X Tutup
Skip to content

Commit d415a60

Browse files
committed
join() threads instead of guessing if they're running
- new book keeping of dispatcher's async threads so they can be joined when stopping - updater, webhook & dispatcher threads are now kept on Updater.__threads so they can be joined at the end refs python-telegram-bot#175
1 parent fd7baa2 commit d415a60

File tree

2 files changed

+50
-27
lines changed

2 files changed

+50
-27
lines changed

telegram/dispatcher.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import logging
2323
from functools import wraps
2424
from inspect import getargspec
25-
from threading import Thread, BoundedSemaphore, Lock, Event
25+
from threading import Thread, BoundedSemaphore, Lock, Event, current_thread
2626
from re import match
2727
from time import sleep
2828

@@ -33,7 +33,8 @@
3333
logging.getLogger(__name__).addHandler(H)
3434

3535
semaphore = None
36-
running_async = 0
36+
async_threads = set()
37+
""":type: set[Thread]"""
3738
async_lock = Lock()
3839

3940

@@ -58,23 +59,21 @@ def pooled(*pargs, **kwargs):
5859
"""
5960
A wrapper to run a thread in a thread pool
6061
"""
61-
global running_async, async_lock
6262
result = func(*pargs, **kwargs)
6363
semaphore.release()
6464
with async_lock:
65-
running_async -= 1
65+
async_threads.remove(current_thread())
6666
return result
6767

6868
@wraps(func)
6969
def async_func(*pargs, **kwargs):
7070
"""
7171
A wrapper to run a function in a thread
7272
"""
73-
global running_async, async_lock
7473
thread = Thread(target=pooled, args=pargs, kwargs=kwargs)
7574
semaphore.acquire()
7675
with async_lock:
77-
running_async += 1
76+
async_threads.add(thread)
7877
thread.start()
7978
return thread
8079

telegram/updater.py

Lines changed: 45 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ def __init__(self,
8989
self.is_idle = False
9090
self.httpd = None
9191
self.__lock = Lock()
92+
self.__threads = []
93+
""":type: list[Thread]"""
9294

9395
def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2):
9496
"""
@@ -120,6 +122,7 @@ def _init_thread(self, target, name, *args, **kwargs):
120122
thr = Thread(target=self._thread_wrapper, name=name,
121123
args=(target,) + args, kwargs=kwargs)
122124
thr.start()
125+
self.__threads.append(thr)
123126

124127
def _thread_wrapper(self, target, *args, **kwargs):
125128
thr_name = current_thread().name
@@ -211,8 +214,6 @@ def _start_polling(self, poll_interval, timeout, network_delay):
211214

212215
sleep(cur_interval)
213216

214-
self.logger.info('Updater thread stopped')
215-
216217
@staticmethod
217218
def _increase_poll_interval(current_interval):
218219
# increase waiting times on subsequent errors up to 30secs
@@ -256,7 +257,6 @@ def _start_webhook(self, listen, port, url_path, cert, key):
256257
raise TelegramError('SSL Certificate invalid')
257258

258259
self.httpd.serve_forever(poll_interval=1)
259-
self.logger.info('Updater thread stopped')
260260

261261
def stop(self):
262262
"""
@@ -266,25 +266,49 @@ def stop(self):
266266
self.job_queue.stop()
267267
with self.__lock:
268268
if self.running:
269-
self.running = False
270269
self.logger.info('Stopping Updater and Dispatcher...')
271-
self.logger.debug('This might take a long time if you set a '
272-
'high value as polling timeout.')
273-
274-
if self.httpd:
275-
self.logger.info(
276-
'Waiting for current webhook connection to be '
277-
'closed... Send a Telegram message to the bot to exit '
278-
'immediately.')
279-
self.httpd.shutdown()
280-
self.httpd = None
281-
282-
self.logger.debug("Requesting Dispatcher to stop...")
283-
self.dispatcher.stop()
284-
while dispatcher.running_async > 0:
285-
sleep(1)
286-
287-
self.logger.debug("Dispatcher stopped.")
270+
271+
self.running = False
272+
273+
self._stop_httpd()
274+
self._stop_dispatcher()
275+
self._join_threads()
276+
# async threads must be join()ed only after the dispatcher
277+
# thread was joined, otherwise we can still have new async
278+
# threads dispatched
279+
self._join_async_threads()
280+
281+
def _stop_httpd(self):
282+
if self.httpd:
283+
self.logger.info(
284+
'Waiting for current webhook connection to be '
285+
'closed... Send a Telegram message to the bot to exit '
286+
'immediately.')
287+
self.httpd.shutdown()
288+
self.httpd = None
289+
290+
def _stop_dispatcher(self):
291+
self.logger.debug("Requesting Dispatcher to stop...")
292+
self.dispatcher.stop()
293+
294+
def _join_async_threads(self):
295+
with dispatcher.async_lock:
296+
threads = list(dispatcher.async_threads)
297+
total = len(threads)
298+
for i, thr in enumerate(threads):
299+
self.logger.info(
300+
'Waiting for async thread {0}/{1} to end'.format(i, total))
301+
thr.join()
302+
self.logger.debug(
303+
'async thread {0}/{1} has ended'.format(i, total))
304+
305+
def _join_threads(self):
306+
for thr in self.__threads:
307+
self.logger.info(
308+
'Waiting for {0} thread to end'.format(thr.name))
309+
thr.join()
310+
self.logger.debug('{0} thread has ended'.format(thr.name))
311+
self.__threads = []
288312

289313
def signal_handler(self, signum, frame):
290314
self.is_idle = False

0 commit comments

Comments
 (0)
X Tutup