#!/usr/bin/env python
# -*- coding: utf8 -*-
# vim:set cc=80:
if 0:
# test OOB {{{1}}}
class Student(object):
__slots__ = ('name', 'age', 'score', '__privatename', '__grade')
def __init__(self, name, score):
self.name = name
self.score = score
self.__privatename = name
def print_score(self):
print '%s\'s score is %s' % (self.name, self.score)
# use __grade to test controlling method to a member
@property
def grade(self):
return self.__grade
@grade.setter
def grade(self, grade):
self.__grade = grade
jeremy = Student('jeremy', 92)
print "jeremy's name is %s" % jeremy.name
jeremy.print_score()
print "jeremy's private name is %s" % jeremy._Student__privatename
# print "private name is %s" % jeremy.__privatename
class Student_in_america(Student):
def print_score(self):
print '%s\'name is %s' % (self.name, self.score * 20)
xixi = Student_in_america('xixi', 4.7)
print "xixi's name is %s" % xixi.name
xixi.print_score()
'''
# test duotai {{{2}}}
print "is jeremy a student?"
isinstance(jeremy,Student)
print "is jeremy an america student?"
print isinstance(jeremy,Student_in_america)
print "is xixi a student?"
print isinstance(xixi,Student)
print "is xixi an america student?"
print isinstance(xixi,Student_in_america)
'''
'''
# test dir, len
print "dir of Student looks:-------------------------"
print dir('Student')
print len('Student')
print "dir of Student_in_america looks:-----------------------"
print dir('Student_in_america')
'''
# jeremy.set_grade(4)
# print jeremy.get_grade()
# test decoration {{{2}}}
jeremy.grade = 4
print jeremy.grade
# test for loop for object {{{2}}}
class Fib(object):
def __init__(self):
self.a, self.b = 0, 1 # 初始化两个计数器 a, b
def __iter__(self):
return self # 实例本身就是迭代对象,故返回自己
def next(self):
self.a, self.b = self.b, self.a + self.b # 计算下一个值
if self.a > 3: # 退出循环的条件
raise StopIteration()
return self.a # 返回下一个值
# pdb.set_trace()
for n in Fib():
print n
# test jason {{{1}}}
if 0:
import json
d = dict(name='Bob', age=20, score=88)
print "original dict looks %s" % d
j = json.dumps(d)
print "jsondump looks %s" % j
d1 = json.loads(j)
print "jsonloads looks %s" % d1
# test jason {{{2}}}
class Student(object):
def __init__(self, name, age, score):
self.name = name
self.age = age
self.score = score
s = Student('Bob', 20, 88)
# print(json.dumps(s))
def student2dict(std):
return {
'name': std.name,
'age': std.age,
'score': std.score
}
print "json dump looks %s" % json.dumps(s, default=student2dict)
# multi-tasking {{{1}}}
# multi-processing:
# multiprocessing -> Process, Pool, Queue
# multi-threading:
# threading ->
# multiprocessing {{{2}}}
'''
import os
print 'Process (%s) start...' % os.getpid()
pid = os.fork()
if pid==0:
print 'I am child process (%s) and my parent is %s.' % \
(os.getpid(), os.getppid())
else:
print 'I (%s) just created a child process (%s).' % \
(os.getpid(), pid)
'''
# multiprocessing.Process {{{3}}}
# create 1 or a few child processes...
if 0:
print "---------------multiprocessing.Process----------------------"
from multiprocessing import Process, Pool, Queue
import time
import os
import random
# 子进程要执行的代码
def run_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
print 'child process %s sleep 1s now' % name
time.sleep(1)
if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=run_proc, args=('test',))
print 'will start a child process...'
p.start()
# wait for child process to end, before proceeding...
p.join()
print 'parent Process end.'
# multiprocessing.Pool {{{3}}}
# create a lot of child processes...
if 1:
print "----------------multiprocessing.Pool---------------------"
from multiprocessing import Process, Pool, Queue
import time
import os
import random
def long_time_task(name):
print 'Run task %s (%s)...' % (name, os.getpid())
start = time.time()
time.sleep(random.random() * 3)
end = time.time()
print 'Task %s runs %0.2f seconds.' % (name, (end - start))
if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
p = Pool()
for i in range(10):
p.apply_async(long_time_task, args=(i,))
print 'Waiting for all subprocesses done...'
p.close()
p.join()
# multiprocessing.Queue {{{3}}}
# inter process communication...
if 0:
print "----------------multiprocessing.Queue---------------------"
from multiprocessing import Process, Pool, Queue
import time
import os
import random
# 写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print 'Put %s to queue...' % value
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码:
def read(q):
while True:
value = q.get(True)
print 'Get %s from queue.' % value
if __name__ == '__main__':
# 父进程创建 Queue,并传给各个子进程:
q = Queue()
pw = Process(target=write, args=(q,))
pr = Process(target=read, args=(q,))
# 启动子进程 pw,写入:
pw.start()
# 启动子进程 pr,读取:
pr.start()
# 等待 pw 结束:
pw.join()
# pr 进程里是死循环,无法等待其结束,只能强行终止:
pr.terminate()
# multiprocessing {{{2}}}
if 0:
import multiprocessing
import time
def func(msg):
for i in xrange(3):
print msg
time.sleep(1)
if __name__ == "__main__":
pool = multiprocessing.Pool(processes=4)
for i in xrange(10):
msg = "hello %d" %(i)
pool.apply_async(func, (msg, ))
pool.close()
pool.join()
print "Sub-process(es) done."
# multithreading {{{2}}}
if 0:
print "---------------multithreading----------------------"
# 启动一个线程就是把一个函数传入并创建 Thread 实例,然后调用 start()开始执行:
import threading
# 新线程执行的代码:
def loop():
print 'thread %s is running...' % threading.current_thread().name
n = 0
while n < 5:
n = n + 1
print '%s:thread %s >>> %s' % \
(time.time(), threading.current_thread().name, n)
time.sleep(1)
print 'thread %s ended.' % threading.current_thread().name
print 'thread %s is running...' % threading.current_thread().name
t = threading.Thread(target=loop, name='LoopThread')
t.start()
t.join()
print 'thread %s ended.' % threading.current_thread().name
'''
thread MainThread is running...
thread LoopThread is running...
1478741107.56:thread LoopThread >>> 1
1478741107.56:thread LoopThread >>> 2
1478741107.56:thread LoopThread >>> 3
1478741107.56:thread LoopThread >>> 4
1478741107.56:thread LoopThread >>> 5
thread LoopThread ended.
thread MainThread ended.
'''
# use global var without lock: wrong{{{3}}}
'''
problem of multi-threading:
global var are shared by all threads
and in multi-thread environment, things may get wrong
'''
from multiprocessing import Process, Pool, Queue
import threading
import time
import os
import random
balance = 0
def change_it(n):
# 先存后取,结果应该为 0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
change_it(n)
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print balance
'''
ping@ubuntu47-3:~/Dropbox/python$ python test.py
40
ping@ubuntu47-3:~/Dropbox/python$ python test.py
107
ping@ubuntu47-3:~/Dropbox/python$ python test.py
84
ping@ubuntu47-3:~/Dropbox/python$ python test.py
34
'''
# use global var withlock {{{3}}}
'''
one solution is to use thead lock
'''
from multiprocessing import Process, Pool, Queue
import time
import os
import random
import threading
balance = 0
lock = threading.Lock()
def change_it(n):
# 先存后取,结果应该为 0:
global balance
balance = balance + n
balance = balance - n
def run_thread(n):
for i in range(100000):
# 先要获取锁:
lock.acquire()
try:
# 放心地改吧:
change_it(n)
finally:
# 改完了一定要释放锁:
lock.release()
t1 = threading.Thread(target=run_thread, args=(5,))
t2 = threading.Thread(target=run_thread, args=(8,))
t1.start()
t2.start()
t1.join()
t2.join()
print balance
# multi-core support {{{3}}}
'''
python threads' multi-core support
'''
import threading
import multiprocessing
def loop():
x = 0
while True:
x = x ^ 1
for i in range(multiprocessing.cpu_count()):
t = threading.Thread(target=loop)
t.start()
# use local var with threading.local {{{3}}}
'''
thread1:
process_thread:
args: Alice, name:Thread-A
pass: Alice -> name
process_student(name)
thread2:
process_thread:
args: Bob, name:Thread-B
pass: Bob -> name
process_student(name)
'''
import threading
# 创建全局 ThreadLocal 对象:
local_school = threading.local()
def process_student():
print 'Hello, %s (in %s)' % \
(local_school.student, threading.current_thread().name)
def process_thread(name):
# 绑定 ThreadLocal 的 student:
local_school.student = name
process_student()
t1 = threading.Thread(target=process_thread,
args=('Alice',), name='Thread-A')
t2 = threading.Thread(target=process_thread,
args=('Bob',), name='Thread-B')
t1.start()
t2.start()
t1.join()
t2.join()
'''
distributed computing:
it looks for me, that server just dispatch a number (n in this case)
client to get this number from the queue, and then do the task
then client return the result (also a number) to server,
then server display it.
need to understand: how to use this in practical?
'''
role = 'server'
# role = 'client'
if role == 'server':
# taskmanager.py {{{4}}}
import random
import time
import Queue
from multiprocessing.managers import BaseManager
# 发送任务的队列:
task_queue = Queue.Queue()
# 接收结果的队列:
result_queue = Queue.Queue()
# 从 BaseManager 继承的 QueueManager:
class QueueManager(BaseManager):
pass
# 把两个 Queue 都注册到网络上, callable 参数关联了 Queue 对象:
QueueManager.register('get_task_queue', callable=lambda: task_queue)
QueueManager.register('get_result_queue', callable=lambda: result_queue)
# 绑定端口 5000, 设置验证码'abc':
manager = QueueManager(address=('', 5000), authkey='abc')
# 启动 Queue:
manager.start()
# 获得通过网络访问的 Queue 对象:
task = manager.get_task_queue()
result = manager.get_result_queue()
# 放几个任务进去:
for i in range(10):
n = random.randint(0, 10000)
print('Put task %d...' % n)
task.put(n)
# 从 result 队列读取结果:
print('Try get results...')
for i in range(10):
r = result.get(timeout=100)
print('Result: %s' % r)
# 关闭:
manager.shutdown()
else:
# taskworker.py {{{4}}}
import time
import sys
import Queue
from multiprocessing.managers import BaseManager
# 创建类似的 QueueManager:
class QueueManager(BaseManager):
pass
# 由于这个 QueueManager 只从网络上获取 Queue,所以注册时只提供名字:
QueueManager.register('get_task_queue')
QueueManager.register('get_result_queue')
# 连接到服务器,也就是运行 taskmanager.py 的机器:
server_addr = '127.0.0.1'
print('Connect to server %s...' % server_addr)
# 端口和验证码注意保持与 taskmanager.py 设置的完全一致:
m = QueueManager(address=(server_addr, 5000), authkey='abc')
# 从网络连接:
m.connect()
# 获取 Queue 的对象:
task = m.get_task_queue()
result = m.get_result_queue()
# 从 task 队列取任务,并把结果写入 result 队列:
for i in range(10):
try:
n = task.get(timeout=1)
print('run task %d * %d...' % (n, n))
r = '%d * %d = %d' % (n, n, n*n)
time.sleep(1)
result.put(r)
except Queue.Empty:
print('task queue is empty.')
# 处理结束:
print('worker exit.')
# pyez {{{1}}}
# Device module {{{2}}}
if 0:
# pprint print things in a prettier format
from pprint import pprint
from jnpr.junos import Device
# if __name__ == '__main__':
# connect to router {{{3}}}
# create an instance of class Device
myrouter = Device(host='172.19.161.129', user='labroot', passwd='lab123')
# start netconf-over-ssh connection to router
# this will return the same instance 'myrouter', represented as
# "Device(myrouter)"
myrouter.open()
from IPython.Shell import IPShellEmbed
IPShellEmbed([])()
# #above 2 lines can be merged into one line
# myrouter=Device(host='172.19.161.129',user='labroot',passwd='lab123').open()
# close the netconf connection
myrouter.close()
# myrouter.facts dict {{{3}}}
if 0:
# facts gathering is enabled by default, can be disabled to speed up:
# dev.open(gather_facts=False)
# and later can still be enabled:
# dev.facts_refresh()
pprint(myrouter.facts)
interface_lxml_element = myrouter.rpc.get_interface_information(terse=True)
list_of_interfaces = interface_lxml_element.findall('physical-interface')
print "list_of_interfaces looks %s:" % list_of_interfaces
for interface in list_of_interfaces:
# print "interface in the list looks: %s" % interface
print "Interface: %s Status: %s " % (
interface.findtext('name').strip(),
interface.findtext('oper-status').strip()
)
# myrouter.cli: send a command
print myrouter.cli("show interfaces terse fxp0")
# myrouter.close()
# yaml module {{{2}}}
if 0:
import yaml
ethport_yaml_file_path = \
'/usr/local/lib/python2.7/dist-packages/jnpr/junos/op/ethport.yml'
yaml_file = open(ethport_yaml_file_path, "r")
data = yaml.load(yaml_file)
print data
for key in data:
print "key: %s" % key
print "value: %s" % data[key]
# EthPortTable module {{{2}}}
if 0:
'''
EthPortTable module
'''
from jnpr.junos import Device
from jnpr.junos.op.ethport import EthPortTable
from pprint import pprint
myrouter = Device(host='172.19.161.129', user='labroot', passwd='lab123')
myrouter.open()
# create EthPortTable obj
eth_interfaces = EthPortTable(myrouter)
eth_interfaces.get()
print "eth_interface.keys() looks:\n ",
pprint(eth_interfaces.keys())
for interface in eth_interfaces:
print "interface.keys() looks:\n ",
print interface.keys()
print interface.name + "'s keys,name,admin,oper,macadd looks:"
print " " + interface.name, interface.admin,\
interface.oper, interface.macaddr
myrouter.close()
# Config module {{{2}}}
if 0:
from jnpr.junos import Device
from jnpr.junos.utils.config import Config
import time
import os
import sys
from jnpr.junos.exception import *
MAX_ATTEMPTS = 3
WAIT_BEFORE_RECONNECT = 10
# Assumes r0 already exists and is a connected device instance
for attempt in range(MAX_ATTEMPTS):
try:
myrouter = Device(host='172.19.161.129', user='labroot',
passwd='lab123')
myrouter.open()
cfg = Config(myrouter)
print "connection opened"
# load override abc.conf {{{3}}}
# this won't work, for some reason, has to use either absolute path,
# or change the dir first
config_file = "~/download_folder/backup.botanix.161101.jtac.conf"
# this works
config_file = \
"/home/ping/download_folder/backup.botanix.161101.jtac.conf"
cfg.load(path=config_file, format="text", merge=True)
# #this works
# os.chdir('/home/ping/download_folder')
# cfg.load(path="backup.botanix.161101.jtac.conf",overwrite=True)
print "config loaded"
# commit confirm {{{3}}}
cfg.commit(confirm=2)
time.sleep(10)
print "commit confirm 2"
# commit {{{3}}}
cfg.commit()
print "commited"
# edit exclusive {{{3}}}
cfg.lock()
# rollback 2 {{{3}}}
cfg.rollback(rb_id=2)
print "rollback 2"
# commit {{{3}}}
cfg.commit()
print "commited"
# unlock {{{3}}}
cfg.unlock()
# config one statement {{{3}}}
cfg.load("set system host-name mytest_python",
format="set", merge=True)
print cfg.diff()
cfg.rollback(0)
myrouter.close
except ConnectAuthError:
print "Authentication error!"
myrouter.close()
sys.exit()
except ConnectTimeoutError:
print "Timeout error!"
myrouter.close()
sys.exit()
except ConfigLoadError:
print "Couldn't unlock the config db!"
myrouter.close()
sys.exit()
except Exception as error:
if isinstance(error, RpcTimeoutError):
print "Rpc Timeout while loading the config!"
myrouter.close()
# elif type(error) is CommitError:
else:
print "Encountered an expected exception"
print error
sys.exit()
# route table {{{2}}}
if 0:
from jnpr.junos.op.routes import RouteTable
route_table = RouteTable(myrouter)
route_table.get('192.168.1.0/24')
route_table.get('12.123.73.43/32')
route_table.get()
route_table.get('192.168.4.0/24', table='inet.0', protocol='static')
route_table.get(protocol='isis')
for key in route_table:
print key.name + ' Nexthop: ' + key.nexthop + ' Via: ' + \
key.via + ' Protocol: ' + key.protocol
# a working example {{{2}}}
if 0:
#!/usr/bin/env python
"""Use interface descriptions to track the topology reported by LLDP.
This includes the following steps:
1) Gather LLDP neighbor information.
2) Gather interface descriptions.
3) Parse LLDP neighbor information previously stored in the descriptions.
4) Compare LLDP neighbor info to previous LLDP info from the descriptions.
5) Print LLDP Up / Change / Down events.
6) Store the updated LLDP neighbor info in the interface descriptions.
Interface descriptions are in the format:
[user-configured description ]LLDP: [(DOWN)]
The '(DOWN)' string indicates an LLDP neighbor which was previously
present, but is now not present.
"""
import sys
import getpass
import jxmlease
from jnpr.junos import Device
from jnpr.junos.utils.config import Config
import jnpr.junos.exception
TEMPLATE_PATH = 'interface_descriptions_template.xml'
# Create a jxmlease parser with desired defaults.
parser = jxmlease.EtreeParser()
class DoneWithDevice(Exception): pass
def main(): # {{{3}}}
"""The main loop.
Prompt for a username and password.
Loop over each device specified on the command line.
Perform the following steps on each device:
1) Get LLDP information from the current device state.
2) Get interface descriptions from the device configuration.
3) Compare the LLDP information against the previous snapshot of LLDP
information stored in the interface descriptions. Print changes.
4) Build a configuration snippet with new interface descriptions.
5) Commit the configuration changes.
Return an integer suitable for passing to sys.exit().
"""
if len(sys.argv) == 1:
print("\nUsage: %s device1 [device2 [...]]\n\n" % sys.argv[0])
return 1
rc = 0
# Get username and password as user input.
user = raw_input('Device Username: ')
password = getpass.getpass('Device Password: ')
for hostname in sys.argv[1:]:
try:
print("Connecting to %s..." % hostname)
dev = Device(host=hostname,
user=user,
password=password,
normalize=True
)
dev.open()
# retrieving LLDP info {{{4}}}
print("Getting LLDP information from %s..." % hostname)
lldp_info = get_lldp_neighbors(device=dev)
if lldp_info == None:
print(" Error retrieving LLDP info on " +\
hostname +\
". Make sure LLDP is enabled."
)
rc = 1
raise DoneWithDevice
# retrieving int desc info {{{4}}}
print("Getting interface descriptions from %s..." % hostname)
desc_info = get_description_info_for_interfaces(device=dev)
if desc_info == None:
print(" Error retrieving interface descriptions on %s." %
hostname)
rc = 1
raise DoneWithDevice
# check lldp change {{{4}}}
desc_changes = check_lldp_changes(lldp_info, desc_info)
if not desc_changes:
print(" No LLDP changes to configure on %s." % hostname)
raise DoneWithDevice
# load merge temp config {{{4}}}
if load_merge_template_config(
device=dev,
template_path=TEMPLATE_PATH,
template_vars={'descriptions': desc_changes}
):
print("Successfully committed configuration changes on %s."
% hostname)
else:
print(" Error committing description changes on %s." %
hostname)
rc = 1
raise DoneWithDevice
except jnpr.junos.exception.ConnectError as err:
print(" Error connecting: " + repr(err))
rc = 1
except DoneWithDevice:
pass
finally:
print(" Closing connection to %s." % hostname)
try:
dev.close()
except:
pass
return rc
def get_lldp_neighbors(device): # {{{3}}}
"""
Get current LLDP neighbor information.
Return a two-level dictionary with the LLDP neighbor information.
The first-level key is the local port (aka interface) name.
The second-level keys are 'system' for the remote system name
and 'port' for the remote port ID. On error, return None.
For example:
{'ge-0/0/1': {'system': 'r1', 'port', 'ge-0/0/10'}}
"""
lldp_info = {}
try:
resp = device.rpc.get_lldp_neighbors_information()
except (jnpr.junos.exception.RpcError,
jnpr.junos.exception.ConnectError) as err:
print " " + repr(err)
return None
for nbr in resp.findall('lldp-neighbor-information'):
local_port = nbr.findtext('lldp-local-port-id')
remote_system = nbr.findtext('lldp-remote-system-name')
remote_port = nbr.findtext('lldp-remote-port-id')
if local_port and (remote_system or remote_port):
lldp_info[local_port] = {
'system': remote_system,
'port': remote_port
}
return lldp_info
def get_description_info_for_interfaces(device): # {{{3}}}
"""
Get current interface description for each interface.
Parse the description into the user-configured description, remote system,
and remote port components.
Return a two-level dictionary. The first-level key is the local port (aka
interface) name. The second-level keys are 'user_desc' for the
user-configured description, 'system' for the remote system name, 'port'
for the remote port, and 'down', which is a Boolean indicating if LLDP was
previously down. On error, return None. For example: {'ge-0/0/1':
{'user_desc': 'test description', 'system': 'r1', 'port': 'ge-0/0/10',
'down': True}}
"""
desc_info = {}
try:
resp = parser(device.rpc.get_interface_information(descriptions=True))
except (jnpr.junos.exception.RpcError,
jnpr.junos.exception.ConnectError) as err:
print " " + repr(err)
return None
try:
pi = resp['interface-information']['physical-interface'].jdict()
except KeyError:
return desc_info
for (local_port, port_info) in pi.items():
try:
(udesc, _, ldesc) = port_info['description'].partition('LLDP: ')
udesc = udesc.rstrip()
(remote_system, _, remote_port) = ldesc.partition(' ')
(remote_port, down_string, _) = remote_port.partition('(DOWN)')
desc_info[local_port] = {
'user_desc': udesc,
'system': remote_system,
'port': remote_port,
'down': True if down_string else False
}
except (KeyError, TypeError):
pass
return desc_info
def check_lldp_changes(lldp_info, desc_info): # {{{3}}}
"""
Compare current LLDP info with previous snapshot from descriptions.
Given the dictionaries produced by get_lldp_neighbors() and
get_description_info_for_interfaces(), print LLDP up, change, and down
messages.
Return a dictionary containing information for the new descriptions
to configure.
"""
desc_changes = {}
# Iterate through the current LLDP neighbor state. Compare this
# to the saved state as retrieved from the interface descriptions.
for local_port in lldp_info:
lldp_system = lldp_info[local_port]['system']
lldp_port = lldp_info[local_port]['port']
has_lldp_desc = desc_info.has_key(local_port)
desc_system = desc_port = False
if has_lldp_desc:
desc_system = desc_info[local_port]['system']
desc_port = desc_info[local_port]['port']
down = desc_info[local_port]['down']
if not desc_system or not desc_port:
has_lldp_desc = False
if not has_lldp_desc:
print(" %s LLDP Up. Now: %s %s" \
% \
(local_port,lldp_system,lldp_port)
)
elif down:
print(" %s LLDP Up. Was: %s %s Now: %s %s" %
(local_port,desc_system,desc_port,lldp_system,lldp_port))
elif lldp_system != desc_system or lldp_port != desc_port:
print(" %s LLDP Change. Was: %s %s Now: %s %s" %
(local_port,desc_system,desc_port,lldp_system,lldp_port))
else:
# No change. LLDP was not down. Same system and port.
continue
desc_changes[local_port] = "LLDP: %s %s" % (lldp_system,lldp_port)
# Iterate through the saved state as retrieved from the interface
# descriptions. Look for any neighbors that are present in the
# saved state, but are not present in the current LLDP neighbor
# state.
for local_port in desc_info:
desc_system = desc_info[local_port]['system']
desc_port = desc_info[local_port]['port']
down = desc_info[local_port]['down']
if (desc_system and desc_port and not down
and not lldp_info.has_key(local_port)):
print(" %s LLDP Down. Was: %s %s" %
(local_port,desc_system,desc_port))
desc_changes[local_port] = "LLDP: %s %s(DOWN)" % \
(desc_system, desc_port)
# Iterate through the list of interface descriptions we are going
# to change. Prepend the user description, if any.
for local_port in desc_changes:
try:
udesc = desc_info[local_port]['user_desc']
except KeyError:
continue
if udesc:
desc_changes[local_port] = udesc + " " + desc_changes[local_port]
return desc_changes
def load_merge_template_config(device, template_path, template_vars): # {{{3}}}
"""
Load templated config with "configure private" and "load merge".
Given a template_path and template_vars, do:
configure private,
load merge of the templated config,
commit,
and check the results.
Return True if the config was committed successfully, False otherwise.
"""
class LoadNotOKError(Exception):
pass
device.bind(cu=Config)
rc = False
try:
try:
# configure private
resp = device.rpc.open_configuration(private=True)
except jnpr.junos.exception.RpcError as err:
if not (err.rpc_error['severity'] == 'warning' and
'uncommitted changes will be discarded on exit' in
err.rpc_error['message']):
raise
resp = device.cu.load(
template_path=template_path,
template_vars=template_vars,
merge=True
)
if resp.find("ok") is None:
raise LoadNotOKError
device.cu.commit(comment="made by %s" % sys.argv[0])
except (jnpr.junos.exception.RpcError,
jnpr.junos.exception.ConnectError,
LoadNotOKError) as err:
print " " + repr(err)
except:
print " Unknown error occurred loading or committing configuration."
else:
rc = True
try:
device.rpc.close_configuration()
except jnpr.junos.exception.RpcError as err:
print " " + repr(err)
rc = False
return rc
if __name__ == "__main__":
sys.exit(main())
# upgrade sw {{{2}}}
if 0:
import os, sys, logging
from jnpr.junos import Device
from jnpr.junos.utils.sw import SW
from jnpr.junos.exception import *
host = 'dc1a.example.com'
package = '/var/tmp/junos-install/jinstall-13.3R1.8-domestic-signed.tgz'
remote_path = '/var/tmp'
validate = True
logfile = '/var/log/junos-pyez/install.log'
def do_log(msg, level='info'):
getattr(logging, level)(msg)
def update_progress(dev, report):
# log the progress of the installing process
do_log(report)
def main():
# initialize logging
logging.basicConfig(filename=logfile, level=logging.INFO,
format='%(asctime)s:%(name)s: %(message)s')
logging.getLogger().name = host
sys.stdout.write('Information logged in {0}\n'.format(logfile))
# verify package exists
if (os.path.isfile(package)):
found = True
else:
msg = 'Software package does not exist: {0}. '.format(package)
sys.exit(msg + '\nExiting program')
dev = Device(host=host)
try:
dev.open()
except Exception as err:
sys.stderr.write('Cannot connect to device: {0}\n'.format(err))
return
# Increase the default RPC timeout to accommodate install operations
dev.timeout = 300
# Create an instance of SW
sw = SW(dev)
try:
do_log('Starting the software upgrade process: {0}'.format(package))
ok = sw.install(package=package, remote_path=remote_path,
progress=update_progress, validate=validate)
except Exception as err:
msg = 'Unable to install software, {0}'.format(err)
do_log(msg, level='error')
else:
if ok is True:
do_log('Software installation complete. Rebooting')
rsp = sw.reboot()
do_log('Upgrade pending reboot cycle, please be patient.')
do_log(rsp)
# End the NETCONF session and close the connection
dev.close()
if __name__ == "__main__":
main()
# issu {{{2}}}
if 0:
from jnpr.junos import Device
from pprint import pprint
from jnpr.junos.utils.sw import SW
with Device(host=xxxx, user=xxxx, password=xxx) as dev:
pprint(dev.facts)
sw = SW(dev)
try:
ok = sw.install(package='/var/home/regress/junos-srx5000-12.1X47-D43-domestic.tgz', issu=True,
no_copy=True, progress=True)
print 'Install Status %s' % ok
if ok is not True:
print "sw.install returned False"
except Exception as ex:
print "ERROR: %s" % ex.message
# web {{{1}}}
# crawl {{{2}}}
if 0:
# work flow {{{3}}}
# class:
# Crawler
# get_page
# go
# Retriever
# get_file
# download
# parse_links
# main:
# robot=Crawler.go
# robot.go
# get_page
# r=Retriever
# get_file
# download
# parse_links
# r.download
# r.parse_links
# #add links to q
# !/usr/bin/env python
import cStringIO
import formatter
import htmllib
import httplib
import os
import sys
import urllib
import urlparse
class Retriever(object): # {{{3}}}
# the only attributes that instances can have are self.url and
# self.file.
__slots__ = ('url', 'file')
def __init__(self, url):
self.url, self.file = self.get_file(url)
def get_file(self, url, default='index.html'): # {{{4}}}
'Create usable local filename from URL'
parsed = urlparse.urlparse(url)
# find host: www.null.com
host = parsed.netloc.split('@')[-1].split(':')[0]
# set file full path:
# www.null.com+/home/index.html
filepath = '%s%s' % (host, parsed.path)
# if filepath no '.html', meaning only www.null.com/a/b/c
# assume index.html
if not os.path.splitext(parsed.path)[1]:
filepath = os.path.join(filepath, default)
# set dir name: 'www.null.com/home'
linkdir = os.path.dirname(filepath)
# on confliction of this folder name, erase and create the folder
if not os.path.isdir(linkdir):
if os.path.exists(linkdir):
os.unlink(linkdir)
os.makedirs(linkdir)
return url, filepath
def download(self): # {{{4}}}
'Download URL to specific named file'
try:
retval = urllib.urlretrieve(self.url, self.file)
except (IOError, httplib.InvalidURL) as e:
retval = (('*** ERROR: bad URL "%s": %s' % (self.url, e)),)
return retval
def parse_links(self): # {{{4}}}
'Parse out the links found in downloaded HTML file'
f = open(self.file, 'r')
data = f.read()
f.close()
parser = htmllib.HTMLParser(
formatter.AbstractFormatter(
formatter.DumbWriter(
cStringIO.StringIO()
)
)
)
parser.feed(data)
parser.close()
# return list of url
return parser.anchorlist
class Crawler(object): # {{{3}}}
count = 0
def __init__(self, url): # {{{4}}}
self.q = [url]
self.seen = set()
# retrieve domain name
parsed = urlparse.urlparse(url)
host = parsed.netloc.split('@')[-1].split(':')[0]
self.dom = '.'.join(host.split('.')[-2:])
def get_page(self, url, media=False): # {{{4}}}
'Download page & parse links, add to queue if nec'
r = Retriever(url)
# download the url
fname = r.download()[0]
if fname[0] == '*':
print fname, '... skipping parse'
return
Crawler.count += 1
print '\n(', Crawler.count, ')'
print 'URL:', url
print 'FILE:', fname
self.seen.add(url)
ftype = os.path.splitext(fname)[1]
if ftype not in ('.htm', '.html'):
return
for link in r.parse_links():
if link.startswith('mailto:'):
print '... discarded, mailto link'
continue
if not media:
ftype = os.path.splitext(link)[1]
if ftype in ('.mp3', '.mp4', '.m4v', '.wav'):
print '... discarded, media file'
continue
if not link.startswith('http://'):
link = urlparse.urljoin(url, link)
print '*', link,
if link not in self.seen:
if self.dom not in link:
print '... discarded, not in domain'
else:
if link not in self.q:
self.q.append(link)
print '... new, added to Q'
else:
print '... discarded, already in Q'
else:
print '... discarded, already processed'
def go(self, media=False):
'Process next page in queue (if any)'
while self.q:
url = self.q.pop()
self.get_page(url, media)
def main(): # {{{3}}}
# get url
if len(sys.argv) > 1:
url = sys.argv[1]
else:
try:
url = raw_input('Enter starting URL: ')
except (KeyboardInterrupt, EOFError):
url = ''
if not url:
return
if not url.startswith('http://') and \
not url.startswith('ftp://'):
url = 'http://%s/' % url
#
robot = Crawler(url)
robot.go()
if __name__ == '__main__':
main()
# parse_links.py {{{2}}}
if 0:
from HTMLParser import HTMLParser
from cStringIO import StringIO
from urllib2 import urlopen
from urlparse import urljoin
from BeautifulSoup import BeautifulSoup, SoupStrainer
from html5lib import parse, treebuilders
URLS = (
'http://python.org',
'http://google.com',
)
def output(x):
print '\n'.join(sorted(set(x)))
def simpleBS(url, f):
'simpleBS() - use BeautifulSoup to parse all tags to get anchors'
output(
urljoin(url, x['href'])
for x in BeautifulSoup(f).findAll('a')
)
def fasterBS(url, f):
'fasterBS() - use BeautifulSoup to parse only anchor tags'
output(
urljoin(url, x['href'])
for x in BeautifulSoup(
f,
parseOnlyThese=SoupStrainer('a')
)
)
def htmlparser(url, f):
'htmlparser() - use HTMLParser to parse anchor tags'
class AnchorParser(HTMLParser):
# called every time a new tag is encountered in the file stream
def handle_starttag(self, tag, attrs):
if tag != 'a':
return
if not hasattr(self, 'data'):
self.data = []
for attr in attrs:
if attr[0] == 'href':
self.data.append(attr[1])
parser = AnchorParser()
parser.feed(f.read())
output(urljoin(url, x) for x in parser.data)
def html5libparse(url, f):
'html5libparse() - use html5lib to parse anchor tags'
output(
urljoin(url, x.attributes['href'])
for x in parse(f)
if isinstance(x, treebuilders.simpletree.Element) and
x.name == 'a'
)
def process(url, data):
print '\n*** simple BS'
simpleBS(url, data)
data.seek(0)
print '\n*** faster BS'
fasterBS(url, data)
data.seek(0)
print '\n*** HTMLParser'
htmlparser(url, data)
data.seek(0)
print '\n*** HTML5lib'
html5libparse(url, data)
def main():
for url in URLS:
f = urlopen(url)
data = StringIO(f.read())
f.close()
process(url, data)
if __name__ == '__main__':
main()
# wsgi {{{2}}}
if 0:
from pprint import pprint
def application(environ, start_response):
pprint(environ)
start_response('200 OK', [('Content-Type', 'text/html')])
return 'Hello, %s!
' % (environ['PATH_INFO'][1:] or 'ping')
# return 'Hello, web!
'
# server.py
# 从 wsgiref 模块导入:
from wsgiref.simple_server import make_server
# 导入我们自己编写的 application 函数:
# from hello import application
# 创建一个服务器, IP 地址为空,端口是 8000,处理函数是 application:
httpd = make_server('', 8000, application)
print "Serving HTTP on port 8000..."
# 开始监听 HTTP 请求:
httpd.serve_forever()
# flask {{{3}}}
if 0:
from flask import Flask
from flask import request
app = Flask(__name__)
@app.route('/', methods=['GET', 'POST'])
def home():
return 'Home
'
@app.route('/signin', methods=['GET'])
def signin_form():
return ''''''
@app.route('/signin', methods=['POST'])
def signin():
# 需要从 request 对象读取表单内容:
if request.form['username'] == 'admin' and\
request.form['password'] == 'password':
return 'Hello, dmin!
'
return 'Bad username or password.
'
if __name__ == '__main__':
app.run(debug=True, host='0.0.0.0')
# getopt {{{1}}}
if 0:
import sys
import getopt
def usage():
print("Usage:%s [-a|-o|-c] [--help|--output] args...." % sys.argv[0])
if "__main__" == __name__:
try:
opts, args = getopt.getopt(
sys.argv[1:], "ao:c", ["help", "output="]
)
print("============ opts ==================")
print(opts)
print("============ args ==================")
print(args)
# check all param
for opt, arg in opts:
if opt in ("-h", "--help"):
usage()
sys.exit(1)
elif opt in ("-t", "--test"):
print("for test option")
else:
print("%s ==> %s" % (opt, arg))
except getopt.GetoptError as e:
print("getopt error!")
print e
usage()
sys.exit(1)
# twisted {{{1}}}
if 0:
from twisted.internet import task
from twisted.internet import reactor
def runEverySecond():
print "a second has passed"
# print "a second has passed"
l = task.LoopingCall(runEverySecond)
l.start(1.0) # call every second
# l.stop() will stop the looping calls
reactor.run()
# game {{{1}}}
if 0:
# Tic Tac Toe
# This is the solution for the Milestone Project! A two player game made
# within a Jupyter Notebook. Feel free to download the notebook to
# understand how it works!
# First some imports we'll need to use for displaying output and set the
# global variables
# Specifically for the iPython Notebook environment for clearing output.
from IPython.display import clear_output
# Global variables
board = [' '] * 10
game_state = True
announce = ''
# Next make a function that will reset the board, in this case we'll store
# values as a list.
# Note: Game will ignore the 0 index
def reset_board(): # {{{2}}}
global board , game_state
board = [' '] * 10
game_state = True
# Now create a function to display the board, I'll use the num pad as the
# board reference. Note: Should probably just make board and player classes
# later....
def display_board(): # {{{2}}}
''' This function prints out the board so the numpad can be used as a
reference '''
# Clear current cell output
clear_output()
# Print board
print " "+board[7]+" |"+board[8]+" | "+board[9]+" "
print "------------"
print " "+board[4]+" |"+board[5]+" | "+board[6]+" "
print "------------"
print " "+board[1]+" |"+board[2]+" | "+board[3]+" "
# Define a function to check for a win by comparing inputs in the board
# list. Note: Maybe should just have a list of winning combos and cycle
# through them?
def win_check(board, player): # {{{2}}}
''' Check Horizontals,Verticals, and Diagonals for a win '''
if (board[7] == board[8] == board[9] == player) or \
(board[4] == board[5] == board[6] == player) or \
(board[1] == board[2] == board[3] == player) or \
(board[7] == board[4] == board[1] == player) or \
(board[8] == board[5] == board[2] == player) or \
(board[9] == board[6] == board[3] == player) or \
(board[1] == board[5] == board[9] == player) or \
(board[3] == board[5] == board[7] == player):
return True
else:
return False
# Define function to check if the board is already full in case of a tie.
# (This is straightforward with our board stored as a list) Just remember
# index 0 is always empty.
def full_board_check(board): # {{{2}}}
''' Function to check if any remaining blanks are in the board '''
if " " in board[1:]:
return False
else:
return True
# Now define a function to get player input and do various checks on it.
def ask_player(mark): # {{{2}}}
''' Asks player where to place X or O mark, checks validity '''
global board
req = "Choose where to place your '" + mark + "': "
while True:
try:
choice = int(raw_input(req))
except ValueError:
print("Sorry, please input a number between 1-9.")
continue
if board[choice] == " ":
board[choice] = mark
break
else:
print "That space isn't empty!"
continue
# Now have a function that takes in the player's choice (via the ask_player
# function) then returns the game_state.
def player_choice(mark): # {{{2}}}
global board, game_state, announce
# Set game blank game announcement
announce = ''
# Get Player Input
mark = str(mark)
# Validate input
ask_player(mark)
# Check for player win
if win_check(board, mark):
clear_output()
display_board()
announce = mark + " wins! Congratulations"
game_state = False
# Show board
clear_output()
display_board()
# Check for a tie
if full_board_check(board):
announce = "Tie!"
game_state = False
return game_state, announce
# Finally put it all together in a function to play the game.
def play_game(): # {{{2}}}
reset_board()
global announce
# Set marks
X = 'X'
O = 'O'
while True:
# Show board
clear_output()
display_board()
# Player X turn
game_state, announce = player_choice(X)
print announce
if game_state is False:
break
# Player O turn
game_state, announce = player_choice(O)
print announce
if game_state is False:
break
# Ask player for a rematch
rematch = raw_input('Would you like to play again? y/n')
if rematch == 'y':
play_game()
else:
print "Thanks for playing!"
# Let's play!
if __name__ == "__main__":
play_game()
# O |X |
# ------------
# X |X | O
# ------------
# O |X |
# X wins! Congratulations
# Would you like to play again? y/nn
# Thanks for playing!
# re {{{1}}}
if 0:
# In [3]: run test.py testre.txt local2:80 /tmp
# test data file: restre.txt
# NameVirtualHost 127.0.0.1:80
#
# DocumentRoot /var/www/
#
# Options FollowSymLinks
# AllowOverride None
#
# ErrorLog /var/log/apache2/error.log
# LogLevel warn
# CustomLog /var/log/apache2/access.log combined
# ServerSignature On
#
#
# DocumentRoot /var/www2/
#
# Options FollowSymLinks
# AllowOverride None
#
# ErrorLog /var/log/apache2/error2.log
# LogLevel warn
# CustomLog /var/log/apache2/access2.log combined
# ServerSignature On
#
from cStringIO import StringIO
import re
import ipdb; ipdb.set_trace() # XXX BREAKPOINT
vhost_start = re.compile(r'')
vhost_end = re.compile(r')
vhost_start_match = vhost_start.search(line)
# if yes, get the vhost name from the first word in the matched
# part, that is first word #after "VirtualHost ", in this line
# below, it should be "local2:80"
#
#
#
# then we are sure we are now "in" the block that we are looking
# for
if vhost_start_match:
curr_vhost = vhost_start_match.groups()[0]
in_vhost = True
# if we are in the block, and the vhost is what we are looking for
# start to search the docroot "DocumentRoot"
if in_vhost and (curr_vhost == vhost):
docroot_match = docroot_re.search(line)
# once found, replace first match group with new_docroot
if docroot_match:
# \0 \1 ?
# ________________ ___
# docroot_re = re.compile(r'(DocumentRoot\s+)(\S+)')
sub_line = docroot_re.sub(r'\1%s' % new_docroot, line)
# then replace old line with new line, for yield (generator)
line = sub_line
# check if the line is the end of block, if yes, then we are
# stepping out of the block
vhost_end_match = vhost_end.search(line)
if vhost_end_match:
in_vhost = False
yield line
if __name__ == '__main__':
import sys
# test.py testre.txt local2:80 /tmp
# ---------- --------- ----
# name of the file to be processed
conf_file = sys.argv[1]
# name of the VirtualHost section to be located
#
# ---------
vhost = sys.argv[2]
# name of the new DocumentRoot value to be applied
# DocumentRoot /var/www/
# ---------
docroot = sys.argv[3]
conf_string = open(conf_file).read()
for line in replace_docroot(conf_string, vhost, docroot):
print line,
# unit test {{{1}}}
# module -> function to be tested {{{2}}}
if 0:
"""
USAGE:
apache_log_parser_split.py some_log_file
This script takes one command line argument: the name of a log file
to parse. It then parses the log file and generates a report which
associates remote hosts with number of bytes transferred to them.
"""
import sys
import re
# dictify_logline is the function to be tested
# regex with these features:
# VERBOSE, comments, named group
log_line_re = re.compile(r'''
(?P\S+) #IP ADDRESS
\s+ # whitespace
\S+ # remote logname
\s+ # whitespace
\S+ # remote user
\s+ # whitespace
\[[^\[\]]+\] # time
\s+ # whitespace
"[^"]+" # first line of request
\s+ # whitespace
(?P\d+) # status
\s+ # whitespace
(?P-|\d+) # bytes_sent
\s* # whitespace
''', re.VERBOSE)
# version1: Example 3-24. Apache logfile parser—split on white space
def dictify_logline(line):
'''return a dictionary of the pertinent pieces of an apache combined
log file Currently, the only fields we are interested in are remote
host and bytes sent, but we are putting status in there just for
good measure.
'''
split_line = line.split()
return {'remote_host': split_line[0],
'status': split_line[8],
'bytes_sent': split_line[9],
}
# version2: Example 3-26. Apache logfile parser—regex
def dictify_logline(line):
'''return a dictionary of the pertinent pieces of an apache combined log
file Currently, the only fields we are interested in are remote host and
bytes sent, but we are putting status in there just for good measure.
'''
m = log_line_re.match(line)
if m:
groupdict = m.groupdict()
if groupdict['bytes_sent'] == '-':
groupdict['bytes_sent'] = '0'
return groupdict
else:
return {'remote_host': None,
'status': None,
'bytes_sent': "0",
}
def generate_log_report(logfile):
'''return a dictionary of format remote_host=>[list of bytes sent]
This function takes a file object, iterates through all the lines in
the file, and generates a report of the number of bytes transferred
to each remote host for each hit on the webserver.
'''
report_dict = {}
for line in logfile:
line_dict = dictify_logline(line)
print line_dict
try:
bytes_sent = int(line_dict['bytes_sent'])
except ValueError:
# totally disregard anything we don't understand
continue
report_dict.setdefault(line_dict['remote_host'], []).append(bytes_sent)
return report_dict
if __name__ == "__main__":
if not len(sys.argv) > 1:
print __doc__
sys.exit(1)
infile_name = sys.argv[1]
try:
infile = open(infile_name, 'r')
except IOError:
print "You must specify a valid file to parse"
print __doc__
sys.exit(1)
log_report = generate_log_report(infile)
print log_report
infile.close()
# unit test code {{{2}}}
if 0:
# unit test code
# save below code in another file, say utest.py, and run :
# ./utest.py
# test result:
# In [64]: run /tmp/temp.py
# ...F
# ======================================================================
# FAIL: testMalformed (__main__.TestApacheLogParser)
# ----------------------------------------------------------------------
# Traceback (most recent call last):
# File "/tmp/temp.py", line 68, in testMalformed
# 'bytes_sent': '2326'})
# AssertionError: {'status': 'space.html', 'bytes_sent': 'HTTP/1.0"', 'remote_host': '127.0.0.1'} != {'status': '200', 'bytes_sent': '2326', 'remote_host': '127.0.0.1'}
# - {'bytes_sent': 'HTTP/1.0"', 'remote_host': '127.0.0.1',
# 'status': 'space.html'}
# ? ^^^^^^^^^ ^^^^^^^^^^
# + {'bytes_sent': '2326', 'remote_host': '127.0.0.1', 'status': '200'}
# ? ^^^^ ^^^
# ----------------------------------------------------------------------
# Ran 4 tests in 0.012s
# FAILED (failures=1)
# An exception has occurred, use %tb to see the full traceback.
# SystemExit: True
import imp
import unittest
test = imp.load_source('test', '/home/pings/python/test.py')
from test import dictify_logline
class TestApacheLogParser(unittest.TestCase):
def setUp(self):
pass
def testCombinedExample(self):
# test the combined example from apache.org
combined_log_entry = \
'127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] ' \
'"GET /apache_pb.gif HTTP/1.0" 200 2326 ' \
"http://www.example.com/start.html" \
"Mozilla/4.08 [en] (Win98; I;Nav)"
self.assertEqual(
dictify_logline(combined_log_entry),
{
'remote_host': '127.0.0.1',
'status': '200',
'bytes_sent': '2326'
}
)
def testCommonExample(self):
# test the common example from apache.org
common_log_entry = \
'127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] ' \
'"GET /apache_pb.gif HTTP/1.0" 200 2326'
self.assertEqual(
dictify_logline(common_log_entry),
{
'remote_host': '127.0.0.1',
'status': '200',
'bytes_sent': '2326'
}
)
def testExtraWhitespace(self):
# test for extra whitespace between fields
common_log_entry = \
'127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '\
'"GET /apache_pb.gif HTTP/1.0" 200 2326'
self.assertEqual(
dictify_logline(common_log_entry),
{
'remote_host': '127.0.0.1', 'status': '200',
'bytes_sent': '2326'
}
)
def testMalformed(self):
# test for extra whitespace between fields
common_log_entry = \
'127.0.0.1 - frank [10/Oct/2000:13:55:36 -0700] '\
'"GET /some/url/with white space.html HTTP/1.0" 200 2326'
self.assertEqual(
dictify_logline(common_log_entry),
{'remote_host': '127.0.0.1', 'status': '200',
'bytes_sent': '2326'})
if __name__ == '__main__':
unittest.main()
# pdf {{{1}}}
## simplest {{{2}}}
if 0:
from reportlab.pdfgen import canvas
def hello():
c = canvas.Canvas("helloworld.pdf")
c.drawString(100, 100, "Hello World")
c.showPage()
c.save()
hello()
## medium {{{2}}}
if 1:
import subprocess
import datetime
from reportlab.pdfgen import canvas
from reportlab.lib.units import inch
def disk_report():
p = subprocess.Popen("df -h", shell=True, stdout=subprocess.PIPE)
return p.stdout.readlines()
def create_pdf(input, output="disk_report.pdf"):
now = datetime.datetime.today()
date = now.strftime("%h %d %Y %H:%M:%S")
c = canvas.Canvas(output)
textobject = c.beginText()
textobject.setTextOrigin(inch, 11*inch)
textobject.textLines('''Disk Capacity Report: %s''' % date)
for line in input:
textobject.textLine(line.strip())
c.drawText(textobject)
c.showPage()
c.save()
report = disk_report()
create_pdf(report)