X Tutup
Skip to content

Commit 34b91f5

Browse files
committed
properly lock updater and dispatcher start/stop methods
1 parent fc9456e commit 34b91f5

File tree

2 files changed

+81
-66
lines changed

2 files changed

+81
-66
lines changed

telegram/dispatcher.py

Lines changed: 29 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -139,6 +139,7 @@ def __init__(self, bot, update_queue, workers=4):
139139
self.error_handlers = []
140140
self.logger = logging.getLogger(__name__)
141141
self.running = False
142+
self.__lock = Lock()
142143

143144
global semaphore
144145
if not semaphore:
@@ -159,41 +160,46 @@ def start(self):
159160
the update queue.
160161
"""
161162

162-
self.running = True
163-
self.logger.info('Dispatcher thread started')
163+
self.__lock.acquire()
164+
if not self.running:
165+
self.running = True
166+
self.__lock.release()
167+
self.logger.info('Dispatcher thread started')
164168

165-
while True:
166-
update = None
169+
while True:
170+
update = None
167171

168-
try:
169-
# Pop update from update queue.
170-
# Blocks if no updates are available.
171-
update = self.update_queue.get()
172+
try:
173+
# Pop update from update queue.
174+
# Blocks if no updates are available.
175+
update = self.update_queue.get()
172176

173-
if type(update) is self._Stop:
174-
break
177+
if type(update) is self._Stop:
178+
break
175179

176-
self.processUpdate(update)
177-
self.logger.debug('Processed Update: %s' % update)
180+
self.processUpdate(update)
181+
self.logger.debug('Processed Update: %s' % update)
178182

179-
# Dispatch any errors
180-
except TelegramError as te:
181-
self.logger.warn("Error was raised while processing Update.")
182-
self.dispatchError(update, te)
183-
184-
# All other errors should not stop the thread, so just print them
185-
except:
186-
print_exc()
183+
# Dispatch any errors
184+
except TelegramError as te:
185+
self.logger.warn("Error was raised while processing Update.")
186+
self.dispatchError(update, te)
187187

188+
# All other errors should not stop the thread, so just print them
189+
except:
190+
print_exc()
191+
else:
192+
self.__lock.release()
188193
self.logger.info('Dispatcher thread stopped')
189194

190195
def stop(self):
191196
"""
192197
Stops the thread
193198
"""
194-
if self.running:
195-
self.running = False
196-
self.update_queue.put(self._Stop())
199+
with self.__lock:
200+
if self.running:
201+
self.running = False
202+
self.update_queue.put(self._Stop())
197203

198204
def processUpdate(self, update):
199205
"""

telegram/updater.py

Lines changed: 52 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@
2424
import logging
2525
import os
2626
import ssl
27-
from threading import Thread
27+
from threading import Thread, Lock
2828
from time import sleep
2929
import subprocess
3030
from signal import signal, SIGINT, SIGTERM, SIGABRT
@@ -81,6 +81,7 @@ def __init__(self, token, base_url=None, workers=4):
8181
self.running = False
8282
self.is_idle = False
8383
self.httpd = None
84+
self.__lock = Lock()
8485

8586
def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2):
8687
"""
@@ -96,22 +97,24 @@ def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2):
9697
Queue: The update queue that can be filled from the main thread
9798
"""
9899

99-
# Create Thread objects
100-
dispatcher_thread = Thread(target=self.dispatcher.start,
101-
name="dispatcher")
102-
event_handler_thread = Thread(target=self._start_polling,
103-
name="updater",
104-
args=(poll_interval, timeout,
105-
network_delay))
100+
with self.__lock:
101+
if not self.running:
102+
self.running = True
106103

107-
self.running = True
104+
# Create Thread objects
105+
dispatcher_thread = Thread(target=self.dispatcher.start,
106+
name="dispatcher")
107+
updater_thread = Thread(target=self._start_polling,
108+
name="updater",
109+
args=(poll_interval, timeout,
110+
network_delay))
108111

109-
# Start threads
110-
dispatcher_thread.start()
111-
event_handler_thread.start()
112+
# Start threads
113+
dispatcher_thread.start()
114+
updater_thread.start()
112115

113-
# Return the update queue so the main thread can insert updates
114-
return self.update_queue
116+
# Return the update queue so the main thread can insert updates
117+
return self.update_queue
115118

116119
def start_webhook(self,
117120
listen='127.0.0.1',
@@ -137,21 +140,23 @@ def start_webhook(self,
137140
Queue: The update queue that can be filled from the main thread
138141
"""
139142

140-
# Create Thread objects
141-
dispatcher_thread = Thread(target=self.dispatcher.start,
142-
name="dispatcher")
143-
event_handler_thread = Thread(target=self._start_webhook,
144-
name="updater",
145-
args=(listen, port, url_path, cert, key))
143+
with self.__lock:
144+
if not self.running:
145+
self.running = True
146146

147-
self.running = True
147+
# Create Thread objects
148+
dispatcher_thread = Thread(target=self.dispatcher.start,
149+
name="dispatcher")
150+
updater_thread = Thread(target=self._start_webhook,
151+
name="updater",
152+
args=(listen, port, url_path, cert, key))
148153

149-
# Start threads
150-
dispatcher_thread.start()
151-
event_handler_thread.start()
154+
# Start threads
155+
dispatcher_thread.start()
156+
updater_thread.start()
152157

153-
# Return the update queue so the main thread can insert updates
154-
return self.update_queue
158+
# Return the update queue so the main thread can insert updates
159+
return self.update_queue
155160

156161
def _start_polling(self, poll_interval, timeout, network_delay):
157162
"""
@@ -238,24 +243,28 @@ def stop(self):
238243
"""
239244
Stops the polling/webhook thread and the dispatcher
240245
"""
241-
self.logger.info('Stopping Updater and Dispatcher...')
242-
self.logger.debug('This might take a long time if you set a high value'
243-
' as polling timeout.')
244-
self.running = False
245-
246-
if self.httpd:
247-
self.logger.info(
248-
'Waiting for current webhook connection to be closed... '
249-
'Send a Telegram message to the bot to exit immediately.')
250-
self.httpd.shutdown()
251-
self.httpd = None
252-
253-
self.logger.debug("Requesting Dispatcher to stop...")
254-
self.dispatcher.stop()
255-
while dispatcher.running_async > 0:
256-
sleep(1)
257246

258-
self.logger.debug("Dispatcher stopped.")
247+
with self.__lock:
248+
if self.running:
249+
self.running = False
250+
self.logger.info('Stopping Updater and Dispatcher...')
251+
self.logger.debug('This might take a long time if you set a '
252+
'high value as polling timeout.')
253+
254+
if self.httpd:
255+
self.logger.info(
256+
'Waiting for current webhook connection to be '
257+
'closed... Send a Telegram message to the bot to exit '
258+
'immediately.')
259+
self.httpd.shutdown()
260+
self.httpd = None
261+
262+
self.logger.debug("Requesting Dispatcher to stop...")
263+
self.dispatcher.stop()
264+
while dispatcher.running_async > 0:
265+
sleep(1)
266+
267+
self.logger.debug("Dispatcher stopped.")
259268

260269
def signal_handler(self, signum, frame):
261270
self.is_idle = False

0 commit comments

Comments
 (0)
X Tutup