@@ -89,6 +89,8 @@ def __init__(self,
8989 self .is_idle = False
9090 self .httpd = None
9191 self .__lock = Lock ()
92+ self .__threads = []
93+ """:type: list[Thread]"""
9294
9395 def start_polling (self , poll_interval = 0.0 , timeout = 10 , network_delay = 2 ):
9496 """
@@ -120,6 +122,7 @@ def _init_thread(self, target, name, *args, **kwargs):
120122 thr = Thread (target = self ._thread_wrapper , name = name ,
121123 args = (target ,) + args , kwargs = kwargs )
122124 thr .start ()
125+ self .__threads .append (thr )
123126
124127 def _thread_wrapper (self , target , * args , ** kwargs ):
125128 thr_name = current_thread ().name
@@ -211,8 +214,6 @@ def _start_polling(self, poll_interval, timeout, network_delay):
211214
212215 sleep (cur_interval )
213216
214- self .logger .info ('Updater thread stopped' )
215-
216217 @staticmethod
217218 def _increase_poll_interval (current_interval ):
218219 # increase waiting times on subsequent errors up to 30secs
@@ -256,7 +257,6 @@ def _start_webhook(self, listen, port, url_path, cert, key):
256257 raise TelegramError ('SSL Certificate invalid' )
257258
258259 self .httpd .serve_forever (poll_interval = 1 )
259- self .logger .info ('Updater thread stopped' )
260260
261261 def stop (self ):
262262 """
@@ -266,25 +266,49 @@ def stop(self):
266266 self .job_queue .stop ()
267267 with self .__lock :
268268 if self .running :
269- self .running = False
270269 self .logger .info ('Stopping Updater and Dispatcher...' )
271- self .logger .debug ('This might take a long time if you set a '
272- 'high value as polling timeout.' )
273-
274- if self .httpd :
275- self .logger .info (
276- 'Waiting for current webhook connection to be '
277- 'closed... Send a Telegram message to the bot to exit '
278- 'immediately.' )
279- self .httpd .shutdown ()
280- self .httpd = None
281-
282- self .logger .debug ("Requesting Dispatcher to stop..." )
283- self .dispatcher .stop ()
284- while dispatcher .running_async > 0 :
285- sleep (1 )
286-
287- self .logger .debug ("Dispatcher stopped." )
270+
271+ self .running = False
272+
273+ self ._stop_httpd ()
274+ self ._stop_dispatcher ()
275+ self ._join_threads ()
276+ # async threads must be join()ed only after the dispatcher
277+ # thread was joined, otherwise we can still have new async
278+ # threads dispatched
279+ self ._join_async_threads ()
280+
281+ def _stop_httpd (self ):
282+ if self .httpd :
283+ self .logger .info (
284+ 'Waiting for current webhook connection to be '
285+ 'closed... Send a Telegram message to the bot to exit '
286+ 'immediately.' )
287+ self .httpd .shutdown ()
288+ self .httpd = None
289+
290+ def _stop_dispatcher (self ):
291+ self .logger .debug ("Requesting Dispatcher to stop..." )
292+ self .dispatcher .stop ()
293+
294+ def _join_async_threads (self ):
295+ with dispatcher .async_lock :
296+ threads = list (dispatcher .async_threads )
297+ total = len (threads )
298+ for i , thr in enumerate (threads ):
299+ self .logger .info (
300+ 'Waiting for async thread {0}/{1} to end' .format (i , total ))
301+ thr .join ()
302+ self .logger .debug (
303+ 'async thread {0}/{1} has ended' .format (i , total ))
304+
305+ def _join_threads (self ):
306+ for thr in self .__threads :
307+ self .logger .info (
308+ 'Waiting for {0} thread to end' .format (thr .name ))
309+ thr .join ()
310+ self .logger .debug ('{0} thread has ended' .format (thr .name ))
311+ self .__threads = []
288312
289313 def signal_handler (self , signum , frame ):
290314 self .is_idle = False
0 commit comments