X Tutup
Skip to content

Commit b828333

Browse files
committed
多线程/协程操作
1 parent 58196db commit b828333

File tree

9 files changed

+409
-0
lines changed

9 files changed

+409
-0
lines changed
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
import asyncio
2+
3+
4+
@asyncio.coroutine
5+
def hello():
6+
print("Hello world!")
7+
# 异步调用asyncio.sleep(1):
8+
r = yield from asyncio.sleep(1)
9+
print("Hello again!")
10+
11+
12+
# 获取EventLoop:
13+
loop = asyncio.get_event_loop()
14+
# 执行coroutine
15+
loop.run_until_complete(hello())
16+
loop.close()
17+
18+
'''
19+
@asyncio.coroutine把一个generator标记为coroutine类型,然后,我们就把这个coroutine扔到EventLoop中执行。 hello()会首先打印出Hello world!,
20+
然后,yield from语法可以让我们方便地调用另一个generator。由于asyncio.sleep()也是一个coroutine,所以线程不会等待asyncio.sleep(),
21+
而是直接中断并执行下一个消息循环。当asyncio.sleep()返回时,线程就可以从yield from拿到返回值(此处是None),然后接着执行下一行语句。
22+
把asyncio.sleep(1)看成是一个耗时1秒的IO操作,在此期间,主线程并未等待,而是去执行EventLoop中其他可以执行的coroutine了,因此可以实现并发执行。
23+
'''
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
'''
2+
4.1 传统多线程问题?
3+
​ 传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。
4+
5+
​ 一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。
6+
7+
有没有一种高效的解决方案呢? —— 线程池
8+
9+
4.2 线程池基本原理:
10+
​ 我们把任务放进队列中去,然后开N个线程,每个线程都去队列中取一个任务,执行完了之后告诉系统说我执行完了,然后接着去队列中取下一个任务,直至队列中所有任务取空,退出线程。
11+
12+
使用线程池:
13+
​ 由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。
14+
15+
线程池要设置为多少?
16+
17+
服务器CPU核数有限,能够同时并发的线程数有限,并不是开得越多越好,以及线程切换是有开销的,如果线程切换过于频繁,反而会使性能降低
18+
19+
线程执行过程中,计算时间分为两部分:
20+
21+
CPU计算,占用CPU
22+
不需要CPU计算,不占用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具体操作就是比如
23+
访问cache、RPC调用下游service、访问DB,等需要网络调用的操作
24+
那么如果计算时间占50%, 等待时间50%,那么为了利用率达到最高,可以开2个线程:
25+
假如工作时间是2秒, CPU计算完1秒后,线程等待IO的时候需要1秒,此时CPU空闲了,这时就可以切换到另外一个线程,让CPU工作1秒后,线程等待IO需要1秒,此时CPU又可以切回去,第一个线程这时刚好完成了1秒的IO等待,可以让CPU继续工作,就这样循环的在两个线程之前切换操作。
26+
27+
那么如果计算时间占20%, 等待时间80%,那么为了利用率达到最高,可以开5个线程:
28+
可以想象成完成任务需要5秒,CPU占用1秒,等待时间4秒,CPU在线程等待时,可以同时再激活4个线程,这样就把CPU和IO等待时间,最大化的重叠起来
29+
30+
抽象一下,计算线程数设置的公式就是:
31+
N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。
32+
由于有GIL的影响,python只能使用到1个核,所以这里设置N=1
33+
34+
'''
35+
36+
import queue
37+
import threading
38+
import time
39+
40+
41+
# 声明线程池管理类
42+
class WorkManager(object):
43+
def __init__(self, work_num=1000, thread_num=2):
44+
self.work_queue = queue.Queue() # 任务队列
45+
self.threads = [] # 线程池
46+
self.__init_work_queue(work_num) # 初始化任务队列,添加任务
47+
self.__init_thread_pool(thread_num) # 初始化线程池,创建线程
48+
49+
"""
50+
初始化线程池
51+
"""
52+
def __init_thread_pool(self, thread_num):
53+
for i in range(thread_num):
54+
# 创建工作线程(线程池中的对象)
55+
self.threads.append(Work(self.work_queue))
56+
57+
"""
58+
初始化工作队列
59+
"""
60+
def __init_work_queue(self, jobs_num):
61+
for i in range(jobs_num):
62+
self.add_job(do_job, i)
63+
64+
"""
65+
添加一项工作入队
66+
"""
67+
def add_job(self, func, *args):
68+
self.work_queue.put((func, list(args))) # 任务入队,Queue内部实现了同步机制
69+
70+
"""
71+
等待所有线程运行完毕
72+
"""
73+
def wait_allcomplete(self):
74+
for item in self.threads:
75+
if item.isAlive(): item.join()
76+
77+
78+
class Work(threading.Thread):
79+
def __init__(self, work_queue):
80+
threading.Thread.__init__(self)
81+
self.work_queue = work_queue
82+
self.start()
83+
84+
def run(self):
85+
# 死循环,从而让创建的线程在一定条件下关闭退出
86+
while True:
87+
try:
88+
# 任务异步出队,Queue内部实现了同步机制
89+
do, args = self.work_queue.get(block=False)
90+
do(args)
91+
# 通知系统任务完成
92+
self.work_queue.task_done()
93+
except:
94+
break
95+
96+
97+
# 具体要做的任务
98+
def do_job(args):
99+
time.sleep(0.1) # 模拟处理时间
100+
print(threading.current_thread())
101+
print(list(args))
102+
103+
104+
if __name__ == '__main__':
105+
start = time.time()
106+
work_manager = WorkManager(100, 10)
107+
# 或者work_manager = WorkManager(10000, 20)
108+
work_manager.wait_allcomplete()
109+
end = time.time()
110+
print("cost all time: %s" % (end - start))
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
import queue
2+
import threading
3+
import time
4+
5+
'''
6+
线程同步队列queue
7+
8+
python2.x中提供的Queue, Python3.x中提供的是queue
9+
10+
见import queue.
11+
12+
Python的queue模块中提供了同步的、线程安全的队列类,包括FIFO(先入先出)队列Queue,LIFO(后入先出)队列LifoQueue,和优先级队列PriorityQueue。这些队列都实现了锁原语,能够在多线程中直接使用。可以使用队列来实现线程间的同步。
13+
14+
queue模块中的常用方法:
15+
16+
queue.qsize() 返回队列的大小
17+
queue.empty() 如果队列为空,返回True,反之False
18+
queue.full() 如果队列满了,返回True,反之False
19+
queue.full 与 maxsize 大小对应
20+
queue.get([block[, timeout]])获取队列,timeout等待时间
21+
queue.get_nowait() 相当Queue.get(False)
22+
queue.put(item) 写入队列,timeout等待时间
23+
queue.put_nowait(item) 相当Queue.put(item, False)
24+
queue.task_done() 在完成一项工作之后,Queue.task_done()函数向任务已经完成的队列发送一个信号
25+
queue.join() 实际上意味着等到队列为空,再执行别的操作
26+
'''
27+
28+
29+
exitFlag = 0
30+
31+
class myThread(threading.Thread):
32+
def __init__(self, threadID, name, queue):
33+
threading.Thread.__init__(self)
34+
self.threadID = threadID
35+
self.name = name
36+
self.queue = queue
37+
38+
def run(self):
39+
print("Starting " + self.name)
40+
process_data(self.name, self.queue)
41+
print("Exiting " + self.name)
42+
43+
def process_data(threadName, queue):
44+
while not exitFlag:
45+
queueLock.acquire()
46+
if not workQueue.empty():
47+
data = queue.get()
48+
queueLock.release()
49+
print("%s processing %s" % (threadName, data))
50+
else:
51+
queueLock.release()
52+
time.sleep(1)
53+
54+
threadList = ["Thread-1", "Thread-2", "Thread-3"]
55+
nameList = ["One", "Two", "Three", "Four", "Five"]
56+
queueLock = threading.Lock()
57+
workQueue = queue.Queue(10)
58+
threads = []
59+
threadID = 1
60+
61+
# 创建新线程
62+
for tName in threadList:
63+
thread = myThread(threadID, tName, workQueue)
64+
thread.start()
65+
threads.append(thread)
66+
threadID += 1
67+
68+
# 填充队列
69+
queueLock.acquire()
70+
for word in nameList:
71+
workQueue.put(word)
72+
queueLock.release()
73+
74+
# 等待队列清空
75+
while not workQueue.empty():
76+
pass
77+
78+
# 通知线程是时候退出
79+
exitFlag = 1
80+
81+
# 等待所有线程完成
82+
for t in threads:
83+
t.join()
84+
print("Exiting Main Thread")
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
import threading
2+
import time
3+
4+
'''
5+
​ python主要是通过thread和threading这两个模块来实现多线程支持。python的thread模块是比较底层的模块,python的threading模块是对thread做了一些封装,可以更加方便的被使用。但是python(cpython)由于GIL的存在无法使用threading充分利用CPU资源,如果想充分发挥多核CPU的计算能力需要使用multiprocessing模块(Windows下使用会有诸多问题)。
6+
7+
​ python3.x中已经摒弃了Python2.x中采用函数式thread模块中的start_new_thread()函数来产生新线程方式。
8+
9+
​ python3.x中通过threading模块创建新的线程有两种方法:一种是通过threading.Thread(Target=executable Method)-即传递给Thread对象一个可执行方法(或对象);第二种是继承threading.Thread定义子类并重写run()方法。第二种方法中,唯一必须重写的方法是run()
10+
'''
11+
12+
def target():
13+
print("the current threading %s is runing" % (threading.current_thread().name))
14+
time.sleep(1)
15+
print("the current threading %s is ended" % (threading.current_thread().name))
16+
17+
18+
print("the current threading %s is runing" % (threading.current_thread().name))
19+
## 属于线程t的部分
20+
t = threading.Thread(target=target)
21+
t.start()
22+
## 属于线程t的部分
23+
t.join() # join是阻塞当前线程(此处的当前线程时主线程) 主线程直到Thread-1结束之后才结束
24+
print("the current threading %s is ended" % (threading.current_thread().name))
Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
import threading
2+
import time
3+
4+
'''
5+
线程间同步
6+
​ 如果多个线程共同对某个数据修改,则可能出现不可预料的结果,为了保证数据的正确性,需要对多个线程进行同步。
7+
8+
​ 使用Thread对象的Lock和Rlock可以实现简单的线程同步,这两个对象都有acquire方法和release方法,对于那些需要每次只允许一个线程操作的数据,可以将其操作放到acquire和release方法之间。
9+
10+
​ 需要注意的是,Python有一个GIL(Global Interpreter Lock)机制,任何线程在运行之前必须获取这个全局锁才能执行,每当执行完100条字节码,全局锁才会释放,切换到其他线程执行。
11+
12+
多线程实现同步有四种方式:
13+
14+
锁机制,信号量,条件判断和同步队列。
15+
16+
下面我主要关注两种同步机制:锁机制和同步队列。
17+
18+
(1)锁机制
19+
20+
threading的Lock类,用该类的acquire函数进行加锁,用realease函数进行解锁
21+
'''
22+
23+
class myThread(threading.Thread): # 继承父类threading.Thread
24+
def __init__(self, threadID, name, counter):
25+
threading.Thread.__init__(self)
26+
self.threadID = threadID
27+
self.name = name
28+
self.counter = counter
29+
30+
def run(self): # 把要执行的代码写到run函数里面 线程在创建后会直接运行run函数
31+
print("Starting " + self.name)
32+
print_time(self.name, self.counter, 5)
33+
print("Exiting " + self.name)
34+
35+
36+
def print_time(threadName, delay, counter):
37+
while counter:
38+
time.sleep(delay)
39+
print("%s process at: %s" % (threadName, time.ctime(time.time())))
40+
counter -= 1
41+
42+
43+
# 创建新线程
44+
thread1 = myThread(1, "Thread-1", 1)
45+
thread2 = myThread(2, "Thread-2", 2)
46+
47+
# 开启线程
48+
thread1.start()
49+
thread2.start()
50+
51+
# 等待线程结束
52+
thread1.join()
53+
thread2.join()
54+
55+
print("Exiting Main Thread")
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
import time
2+
import threading
3+
import queue
4+
5+
6+
class Worker(threading.Thread):
7+
def __init__(self, name, queue):
8+
threading.Thread.__init__(self)
9+
self.queue = queue
10+
self.start() # 执行run()
11+
12+
def run(self):
13+
# 循环,保证接着跑下一个任务
14+
while True:
15+
# 队列为空则退出线程
16+
if self.queue.empty():
17+
break
18+
# 获取一个队列数据
19+
foo = self.queue.get()
20+
# 延时1S模拟你要做的事情
21+
time.sleep(1)
22+
# 打印
23+
print('{0} process data: {1} \r\n'.format(self.getName(), str(foo)))
24+
# 任务完成
25+
self.queue.task_done()
26+
27+
28+
# 队列
29+
queue = queue.Queue()
30+
# 加入100个任务队列
31+
for i in range(100):
32+
queue.put(i)
33+
# 开10个线程
34+
for i in range(10):
35+
threadName = 'Thread' + str(i)
36+
Worker(threadName, queue)
37+
# 所有线程执行完毕后关闭
38+
queue.join()
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
import asyncio
2+
import re
3+
4+
async def browser(host, port=80):
5+
# 连接host
6+
reader, writer = await asyncio.open_connection(host, port)
7+
print(host, port, '连接成功!')
8+
9+
# 发起 / 主页请求(HTTP协议)
10+
# 发送请求头必须是两个空行
11+
index_get = 'GET {} HTTP/1.1\r\nHost:{}\r\n\r\n'.format('/', host)
12+
writer.write(index_get.encode())
13+
14+
await writer.drain() # 等待向连接写完数据(请求发送完成)
15+
16+
# 开始读取响应的数据报头
17+
while True:
18+
line = await reader.readline() # 等待读取响应数据
19+
if line == b'\r\n':
20+
break
21+
22+
print(host, '<header>', line)
23+
24+
# 读取响应的数据body
25+
body = await reader.read()
26+
print(host, '<content>', body)
27+
28+
29+
if __name__ == '__main__':
30+
loop = asyncio.get_event_loop()
31+
tasks = [browser(host) for host in ['www.dushu.com', 'www.sina.com.cn', 'www.baidu.com']]
32+
loop.run_until_complete(asyncio.wait(tasks))
33+
loop.close()
34+
35+
print('---over---')
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
import asyncio
2+
3+
@asyncio.coroutine
4+
def wget(host):
5+
print('wget %s...' % host)
6+
connect = asyncio.open_connection(host, 80)
7+
reader, writer = yield from connect
8+
header = 'GET / HTTP/1.0\r\nHost: %s\r\n\r\n' % host
9+
writer.write(header.encode('utf-8'))
10+
yield from writer.drain()
11+
while True:
12+
line = yield from reader.readline()
13+
if line == b'\r\n':
14+
break
15+
print('%s header > %s' % (host, line.decode('utf-8').rstrip()))
16+
# Ignore the body, close the socket
17+
writer.close()
18+
19+
loop = asyncio.get_event_loop()
20+
tasks = [wget(host) for host in ['www.sina.com.cn', 'www.sohu.com', 'www.163.com']]
21+
loop.run_until_complete(asyncio.wait(tasks))
22+
loop.close()

0 commit comments

Comments
 (0)
X Tutup