@@ -89,6 +89,8 @@ def __init__(self,
8989 self .is_idle = False
9090 self .httpd = None
9191 self .__lock = Lock ()
92+ self .__threads = []
93+ """:type: list[Thread]"""
9294
9395 def start_polling (self , poll_interval = 0.0 , timeout = 10 , network_delay = 2 ):
9496 """
@@ -120,9 +122,10 @@ def _init_thread(self, target, name, *args, **kwargs):
120122 thr = Thread (target = self ._thread_wrapper , name = name ,
121123 args = (target ,) + args , kwargs = kwargs )
122124 thr .start ()
125+ self .__threads .append (thr )
123126
124127 def _thread_wrapper (self , target , * args , ** kwargs ):
125- thr_name = current_thread ()
128+ thr_name = current_thread (). name
126129 self .logger .debug ('{0} - started' .format (thr_name ))
127130 try :
128131 target (* args , ** kwargs )
@@ -160,20 +163,10 @@ def start_webhook(self,
160163 if not self .running :
161164 self .running = True
162165
163- # Create Thread objects
164- dispatcher_thread = Thread (target = self .dispatcher .start ,
165- name = "dispatcher" )
166- updater_thread = Thread (target = self ._start_webhook ,
167- name = "updater" ,
168- args = (listen ,
169- port ,
170- url_path ,
171- cert ,
172- key ))
173-
174- # Start threads
175- dispatcher_thread .start ()
176- updater_thread .start ()
166+ # Create & start threads
167+ self ._init_thread (self .dispatcher .start , "dispatcher" ),
168+ self ._init_thread (self ._start_webhook , "updater" , listen ,
169+ port , url_path , cert , key )
177170
178171 # Return the update queue so the main thread can insert updates
179172 return self .update_queue
@@ -221,8 +214,6 @@ def _start_polling(self, poll_interval, timeout, network_delay):
221214
222215 sleep (cur_interval )
223216
224- self .logger .info ('Updater thread stopped' )
225-
226217 @staticmethod
227218 def _increase_poll_interval (current_interval ):
228219 # increase waiting times on subsequent errors up to 30secs
@@ -266,7 +257,6 @@ def _start_webhook(self, listen, port, url_path, cert, key):
266257 raise TelegramError ('SSL Certificate invalid' )
267258
268259 self .httpd .serve_forever (poll_interval = 1 )
269- self .logger .info ('Updater thread stopped' )
270260
271261 def stop (self ):
272262 """
@@ -276,25 +266,49 @@ def stop(self):
276266 self .job_queue .stop ()
277267 with self .__lock :
278268 if self .running :
279- self .running = False
280269 self .logger .info ('Stopping Updater and Dispatcher...' )
281- self .logger .debug ('This might take a long time if you set a '
282- 'high value as polling timeout.' )
283-
284- if self .httpd :
285- self .logger .info (
286- 'Waiting for current webhook connection to be '
287- 'closed... Send a Telegram message to the bot to exit '
288- 'immediately.' )
289- self .httpd .shutdown ()
290- self .httpd = None
291-
292- self .logger .debug ("Requesting Dispatcher to stop..." )
293- self .dispatcher .stop ()
294- while dispatcher .running_async > 0 :
295- sleep (1 )
296-
297- self .logger .debug ("Dispatcher stopped." )
270+
271+ self .running = False
272+
273+ self ._stop_httpd ()
274+ self ._stop_dispatcher ()
275+ self ._join_threads ()
276+ # async threads must be join()ed only after the dispatcher
277+ # thread was joined, otherwise we can still have new async
278+ # threads dispatched
279+ self ._join_async_threads ()
280+
281+ def _stop_httpd (self ):
282+ if self .httpd :
283+ self .logger .info (
284+ 'Waiting for current webhook connection to be '
285+ 'closed... Send a Telegram message to the bot to exit '
286+ 'immediately.' )
287+ self .httpd .shutdown ()
288+ self .httpd = None
289+
290+ def _stop_dispatcher (self ):
291+ self .logger .debug ("Requesting Dispatcher to stop..." )
292+ self .dispatcher .stop ()
293+
294+ def _join_async_threads (self ):
295+ with dispatcher .async_lock :
296+ threads = list (dispatcher .async_threads )
297+ total = len (threads )
298+ for i , thr in enumerate (threads ):
299+ self .logger .info (
300+ 'Waiting for async thread {0}/{1} to end' .format (i , total ))
301+ thr .join ()
302+ self .logger .debug (
303+ 'async thread {0}/{1} has ended' .format (i , total ))
304+
305+ def _join_threads (self ):
306+ for thr in self .__threads :
307+ self .logger .info (
308+ 'Waiting for {0} thread to end' .format (thr .name ))
309+ thr .join ()
310+ self .logger .debug ('{0} thread has ended' .format (thr .name ))
311+ self .__threads = []
298312
299313 def signal_handler (self , signum , frame ):
300314 self .is_idle = False
0 commit comments