X Tutup
Skip to content

Commit e2cc8db

Browse files
committed
Merge branch 'master' into inlinebots
2 parents edf4e8a + 04c8681 commit e2cc8db

File tree

7 files changed

+179
-110
lines changed

7 files changed

+179
-110
lines changed

README.rst

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ A Python wrapper around the Telegram Bot API.
3636
:alt: Coveralls
3737

3838
.. image:: https://img.shields.io/badge/Telegram-Group-blue.svg
39-
:target: https://telegram.me/joinchat/ALnA-AJQm5SV4thqGgN9KA
39+
:target: https://telegram.me/joinchat/ALnA-AJQm5TcNEiy2G_4cQ
4040
:alt: Telegram Group
4141

4242
=================
@@ -412,7 +412,7 @@ You may copy, distribute and modify the software provided that modifications are
412412
_`Contact`
413413
==========
414414

415-
Feel free to join to our `Telegram group <https://telegram.me/joinchat/ALnA-AJQm5SV4thqGgN9KA>`_.
415+
Feel free to join to our `Telegram group <https://telegram.me/joinchat/ALnA-AJQm5TcNEiy2G_4cQ>`_.
416416

417417
=======
418418
_`TODO`

examples/legacy/echobot.py

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,9 @@
66

77
import logging
88
import telegram
9+
from telegram.error import NetworkError, Unauthorized
910
from time import sleep
1011

11-
try:
12-
from urllib.error import URLError
13-
except ImportError:
14-
from urllib2 import URLError # python 2
15-
1612

1713
def main():
1814
# Telegram Bot Authorization Token
@@ -31,18 +27,11 @@ def main():
3127
while True:
3228
try:
3329
update_id = echo(bot, update_id)
34-
except telegram.TelegramError as e:
35-
# These are network problems with Telegram.
36-
if e.message in ("Bad Gateway", "Timed out"):
37-
sleep(1)
38-
elif e.message == "Unauthorized":
39-
# The user has removed or blocked the bot.
40-
update_id += 1
41-
else:
42-
raise e
43-
except URLError as e:
44-
# These are network problems on our end.
30+
except NetworkError:
4531
sleep(1)
32+
except Unauthorized:
33+
# The user has removed or blocked the bot.
34+
update_id += 1
4635

4736

4837
def echo(bot, update_id):

telegram/dispatcher.py

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import logging
2323
from functools import wraps
2424
from inspect import getargspec
25-
from threading import Thread, BoundedSemaphore, Lock, Event
25+
from threading import Thread, BoundedSemaphore, Lock, Event, current_thread
2626
from re import match
2727
from time import sleep
2828

@@ -33,7 +33,8 @@
3333
logging.getLogger(__name__).addHandler(H)
3434

3535
semaphore = None
36-
running_async = 0
36+
async_threads = set()
37+
""":type: set[Thread]"""
3738
async_lock = Lock()
3839

3940

@@ -58,23 +59,21 @@ def pooled(*pargs, **kwargs):
5859
"""
5960
A wrapper to run a thread in a thread pool
6061
"""
61-
global running_async, async_lock
6262
result = func(*pargs, **kwargs)
6363
semaphore.release()
6464
with async_lock:
65-
running_async -= 1
65+
async_threads.remove(current_thread())
6666
return result
6767

6868
@wraps(func)
6969
def async_func(*pargs, **kwargs):
7070
"""
7171
A wrapper to run a function in a thread
7272
"""
73-
global running_async, async_lock
7473
thread = Thread(target=pooled, args=pargs, kwargs=kwargs)
7574
semaphore.acquire()
7675
with async_lock:
77-
running_async += 1
76+
async_threads.add(thread)
7877
thread.start()
7978
return thread
8079

telegram/updater.py

Lines changed: 50 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,8 @@ def __init__(self,
8989
self.is_idle = False
9090
self.httpd = None
9191
self.__lock = Lock()
92+
self.__threads = []
93+
""":type: list[Thread]"""
9294

9395
def start_polling(self, poll_interval=0.0, timeout=10, network_delay=2):
9496
"""
@@ -120,9 +122,10 @@ def _init_thread(self, target, name, *args, **kwargs):
120122
thr = Thread(target=self._thread_wrapper, name=name,
121123
args=(target,) + args, kwargs=kwargs)
122124
thr.start()
125+
self.__threads.append(thr)
123126

124127
def _thread_wrapper(self, target, *args, **kwargs):
125-
thr_name = current_thread()
128+
thr_name = current_thread().name
126129
self.logger.debug('{0} - started'.format(thr_name))
127130
try:
128131
target(*args, **kwargs)
@@ -160,20 +163,10 @@ def start_webhook(self,
160163
if not self.running:
161164
self.running = True
162165

163-
# Create Thread objects
164-
dispatcher_thread = Thread(target=self.dispatcher.start,
165-
name="dispatcher")
166-
updater_thread = Thread(target=self._start_webhook,
167-
name="updater",
168-
args=(listen,
169-
port,
170-
url_path,
171-
cert,
172-
key))
173-
174-
# Start threads
175-
dispatcher_thread.start()
176-
updater_thread.start()
166+
# Create & start threads
167+
self._init_thread(self.dispatcher.start, "dispatcher"),
168+
self._init_thread(self._start_webhook, "updater", listen,
169+
port, url_path, cert, key)
177170

178171
# Return the update queue so the main thread can insert updates
179172
return self.update_queue
@@ -221,8 +214,6 @@ def _start_polling(self, poll_interval, timeout, network_delay):
221214

222215
sleep(cur_interval)
223216

224-
self.logger.info('Updater thread stopped')
225-
226217
@staticmethod
227218
def _increase_poll_interval(current_interval):
228219
# increase waiting times on subsequent errors up to 30secs
@@ -266,7 +257,6 @@ def _start_webhook(self, listen, port, url_path, cert, key):
266257
raise TelegramError('SSL Certificate invalid')
267258

268259
self.httpd.serve_forever(poll_interval=1)
269-
self.logger.info('Updater thread stopped')
270260

271261
def stop(self):
272262
"""
@@ -276,25 +266,49 @@ def stop(self):
276266
self.job_queue.stop()
277267
with self.__lock:
278268
if self.running:
279-
self.running = False
280269
self.logger.info('Stopping Updater and Dispatcher...')
281-
self.logger.debug('This might take a long time if you set a '
282-
'high value as polling timeout.')
283-
284-
if self.httpd:
285-
self.logger.info(
286-
'Waiting for current webhook connection to be '
287-
'closed... Send a Telegram message to the bot to exit '
288-
'immediately.')
289-
self.httpd.shutdown()
290-
self.httpd = None
291-
292-
self.logger.debug("Requesting Dispatcher to stop...")
293-
self.dispatcher.stop()
294-
while dispatcher.running_async > 0:
295-
sleep(1)
296-
297-
self.logger.debug("Dispatcher stopped.")
270+
271+
self.running = False
272+
273+
self._stop_httpd()
274+
self._stop_dispatcher()
275+
self._join_threads()
276+
# async threads must be join()ed only after the dispatcher
277+
# thread was joined, otherwise we can still have new async
278+
# threads dispatched
279+
self._join_async_threads()
280+
281+
def _stop_httpd(self):
282+
if self.httpd:
283+
self.logger.info(
284+
'Waiting for current webhook connection to be '
285+
'closed... Send a Telegram message to the bot to exit '
286+
'immediately.')
287+
self.httpd.shutdown()
288+
self.httpd = None
289+
290+
def _stop_dispatcher(self):
291+
self.logger.debug("Requesting Dispatcher to stop...")
292+
self.dispatcher.stop()
293+
294+
def _join_async_threads(self):
295+
with dispatcher.async_lock:
296+
threads = list(dispatcher.async_threads)
297+
total = len(threads)
298+
for i, thr in enumerate(threads):
299+
self.logger.info(
300+
'Waiting for async thread {0}/{1} to end'.format(i, total))
301+
thr.join()
302+
self.logger.debug(
303+
'async thread {0}/{1} has ended'.format(i, total))
304+
305+
def _join_threads(self):
306+
for thr in self.__threads:
307+
self.logger.info(
308+
'Waiting for {0} thread to end'.format(thr.name))
309+
thr.join()
310+
self.logger.debug('{0} thread has ended'.format(thr.name))
311+
self.__threads = []
298312

299313
def signal_handler(self, signum, frame):
300314
self.is_idle = False

telegram/utils/request.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def decorator(*args, **kwargs):
8484
if errcode in (401, 403):
8585
raise Unauthorized()
8686
if errcode == 502:
87-
raise TelegramError('Bad Gateway')
87+
raise NetworkError('Bad Gateway')
8888

8989
try:
9090
message = _parse(error.read())

telegram/utils/webhookhandler.py

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import logging
22

33
from telegram import Update, NullHandler
4-
from future.utils import bytes_to_native_str as n
4+
from future.utils import bytes_to_native_str
55
from threading import Lock
66
import json
77
try:
@@ -14,6 +14,13 @@
1414
logging.getLogger(__name__).addHandler(H)
1515

1616

17+
class _InvalidPost(Exception):
18+
19+
def __init__(self, http_code):
20+
self.http_code = http_code
21+
super(_InvalidPost, self).__init__()
22+
23+
1724
class WebhookServer(BaseHTTPServer.HTTPServer, object):
1825
def __init__(self, server_address, RequestHandlerClass, update_queue,
1926
webhook_path):
@@ -63,12 +70,15 @@ def do_GET(self):
6370

6471
def do_POST(self):
6572
self.logger.debug("Webhook triggered")
66-
if self.path == self.server.webhook_path and \
67-
'content-type' in self.headers and \
68-
'content-length' in self.headers and \
69-
self.headers['content-type'] == 'application/json':
70-
json_string = \
71-
n(self.rfile.read(int(self.headers['content-length'])))
73+
try:
74+
self._validate_post()
75+
clen = self._get_content_len()
76+
except _InvalidPost as e:
77+
self.send_error(e.http_code)
78+
self.end_headers()
79+
else:
80+
buf = self.rfile.read(clen)
81+
json_string = bytes_to_native_str(buf)
7282

7383
self.send_response(200)
7484
self.end_headers()
@@ -80,6 +90,20 @@ def do_POST(self):
8090
update.update_id)
8191
self.server.update_queue.put(update)
8292

83-
else:
84-
self.send_error(403)
85-
self.end_headers()
93+
def _validate_post(self):
94+
if not (self.path == self.server.webhook_path and
95+
'content-type' in self.headers and
96+
self.headers['content-type'] == 'application/json'):
97+
raise _InvalidPost(403)
98+
99+
def _get_content_len(self):
100+
clen = self.headers.get('content-length')
101+
if clen is None:
102+
raise _InvalidPost(411)
103+
try:
104+
clen = int(clen)
105+
except ValueError:
106+
raise _InvalidPost(403)
107+
if clen < 0:
108+
raise _InvalidPost(403)
109+
return clen

0 commit comments

Comments
 (0)
X Tutup