|
| 1 | +''' |
| 2 | +4.1 传统多线程问题? |
| 3 | + 传统多线程方案会使用“即时创建, 即时销毁”的策略。尽管与创建进程相比,创建线程的时间已经大大的缩短,但是如果提交给线程的任务是执行时间较短,而且执行次数极其频繁,那么服务器将处于不停的创建线程,销毁线程的状态。 |
| 4 | +
|
| 5 | + 一个线程的运行时间可以分为3部分:线程的启动时间、线程体的运行时间和线程的销毁时间。在多线程处理的情景中,如果线程不能被重用,就意味着每次创建都需要经过启动、销毁和运行3个过程。这必然会增加系统相应的时间,降低了效率。 |
| 6 | +
|
| 7 | +有没有一种高效的解决方案呢? —— 线程池 |
| 8 | +
|
| 9 | +4.2 线程池基本原理: |
| 10 | + 我们把任务放进队列中去,然后开N个线程,每个线程都去队列中取一个任务,执行完了之后告诉系统说我执行完了,然后接着去队列中取下一个任务,直至队列中所有任务取空,退出线程。 |
| 11 | +
|
| 12 | +使用线程池: |
| 13 | + 由于线程预先被创建并放入线程池中,同时处理完当前任务之后并不销毁而是被安排处理下一个任务,因此能够避免多次创建线程,从而节省线程创建和销毁的开销,能带来更好的性能和系统稳定性。 |
| 14 | +
|
| 15 | +线程池要设置为多少? |
| 16 | +
|
| 17 | +服务器CPU核数有限,能够同时并发的线程数有限,并不是开得越多越好,以及线程切换是有开销的,如果线程切换过于频繁,反而会使性能降低 |
| 18 | +
|
| 19 | +线程执行过程中,计算时间分为两部分: |
| 20 | +
|
| 21 | +CPU计算,占用CPU |
| 22 | +不需要CPU计算,不占用CPU,等待IO返回,比如recv(), accept(), sleep()等操作,具体操作就是比如 |
| 23 | +访问cache、RPC调用下游service、访问DB,等需要网络调用的操作 |
| 24 | +那么如果计算时间占50%, 等待时间50%,那么为了利用率达到最高,可以开2个线程: |
| 25 | +假如工作时间是2秒, CPU计算完1秒后,线程等待IO的时候需要1秒,此时CPU空闲了,这时就可以切换到另外一个线程,让CPU工作1秒后,线程等待IO需要1秒,此时CPU又可以切回去,第一个线程这时刚好完成了1秒的IO等待,可以让CPU继续工作,就这样循环的在两个线程之前切换操作。 |
| 26 | +
|
| 27 | +那么如果计算时间占20%, 等待时间80%,那么为了利用率达到最高,可以开5个线程: |
| 28 | +可以想象成完成任务需要5秒,CPU占用1秒,等待时间4秒,CPU在线程等待时,可以同时再激活4个线程,这样就把CPU和IO等待时间,最大化的重叠起来 |
| 29 | +
|
| 30 | +抽象一下,计算线程数设置的公式就是: |
| 31 | +N核服务器,通过执行业务的单线程分析出本地计算时间为x,等待时间为y,则工作线程数(线程池线程数)设置为 N*(x+y)/x,能让CPU的利用率最大化。 |
| 32 | +由于有GIL的影响,python只能使用到1个核,所以这里设置N=1 |
| 33 | +
|
| 34 | +''' |
| 35 | + |
| 36 | +import queue |
| 37 | +import threading |
| 38 | +import time |
| 39 | + |
| 40 | + |
| 41 | +# 声明线程池管理类 |
| 42 | +class WorkManager(object): |
| 43 | + def __init__(self, work_num=1000, thread_num=2): |
| 44 | + self.work_queue = queue.Queue() # 任务队列 |
| 45 | + self.threads = [] # 线程池 |
| 46 | + self.__init_work_queue(work_num) # 初始化任务队列,添加任务 |
| 47 | + self.__init_thread_pool(thread_num) # 初始化线程池,创建线程 |
| 48 | + |
| 49 | + """ |
| 50 | + 初始化线程池 |
| 51 | + """ |
| 52 | + def __init_thread_pool(self, thread_num): |
| 53 | + for i in range(thread_num): |
| 54 | + # 创建工作线程(线程池中的对象) |
| 55 | + self.threads.append(Work(self.work_queue)) |
| 56 | + |
| 57 | + """ |
| 58 | + 初始化工作队列 |
| 59 | + """ |
| 60 | + def __init_work_queue(self, jobs_num): |
| 61 | + for i in range(jobs_num): |
| 62 | + self.add_job(do_job, i) |
| 63 | + |
| 64 | + """ |
| 65 | + 添加一项工作入队 |
| 66 | + """ |
| 67 | + def add_job(self, func, *args): |
| 68 | + self.work_queue.put((func, list(args))) # 任务入队,Queue内部实现了同步机制 |
| 69 | + |
| 70 | + """ |
| 71 | + 等待所有线程运行完毕 |
| 72 | + """ |
| 73 | + def wait_allcomplete(self): |
| 74 | + for item in self.threads: |
| 75 | + if item.isAlive(): item.join() |
| 76 | + |
| 77 | + |
| 78 | +class Work(threading.Thread): |
| 79 | + def __init__(self, work_queue): |
| 80 | + threading.Thread.__init__(self) |
| 81 | + self.work_queue = work_queue |
| 82 | + self.start() |
| 83 | + |
| 84 | + def run(self): |
| 85 | + # 死循环,从而让创建的线程在一定条件下关闭退出 |
| 86 | + while True: |
| 87 | + try: |
| 88 | + # 任务异步出队,Queue内部实现了同步机制 |
| 89 | + do, args = self.work_queue.get(block=False) |
| 90 | + do(args) |
| 91 | + # 通知系统任务完成 |
| 92 | + self.work_queue.task_done() |
| 93 | + except: |
| 94 | + break |
| 95 | + |
| 96 | + |
| 97 | +# 具体要做的任务 |
| 98 | +def do_job(args): |
| 99 | + time.sleep(0.1) # 模拟处理时间 |
| 100 | + print(threading.current_thread()) |
| 101 | + print(list(args)) |
| 102 | + |
| 103 | + |
| 104 | +if __name__ == '__main__': |
| 105 | + start = time.time() |
| 106 | + work_manager = WorkManager(100, 10) |
| 107 | + # 或者work_manager = WorkManager(10000, 20) |
| 108 | + work_manager.wait_allcomplete() |
| 109 | + end = time.time() |
| 110 | + print("cost all time: %s" % (end - start)) |
0 commit comments