X Tutup
Skip to content

Commit 22142e7

Browse files
thodnevjh0ker
authored andcommitted
Introduce MessageQueue (python-telegram-bot#537)
* Introduce MessageQueue * minor documentation and terminology fixes according to the review * minor documentation and terminology fixes according to the review * minor documentation and terminology fixes according to the review * pep8 fix
1 parent 85b9236 commit 22142e7

File tree

5 files changed

+430
-0
lines changed

5 files changed

+430
-0
lines changed

AUTHORS.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ The following wonderful people contributed directly or indirectly to this projec
4141
- `Joscha Götzer <https://github.com/Rostgnom>`_
4242
- `Shelomentsev D <https://github.com/shelomentsevd>`_
4343
- `sooyhwang <https://github.com/sooyhwang>`_
44+
- `thodnev <https://github.com/thodnev>`_
4445
- `Valentijn <https://github.com/Faalentijn>`_
4546
- `voider1 <https://github.com/voider1>`_
4647
- `wjt <https://github.com/wjt>`_
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
telegram.ext.messagequeue module
2+
================================
3+
4+
.. automodule:: telegram.ext.messagequeue
5+
:members:
6+
:undoc-members:
7+
:show-inheritance:

docs/source/telegram.ext.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ Submodules
1616
telegram.ext.commandhandler
1717
telegram.ext.inlinequeryhandler
1818
telegram.ext.messagehandler
19+
telegram.ext.messagequeue
1920
telegram.ext.filters
2021
telegram.ext.regexhandler
2122
telegram.ext.stringcommandhandler

telegram/ext/messagequeue.py

Lines changed: 330 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,330 @@
1+
#!/usr/bin/env python
2+
#
3+
# Module author:
4+
# Tymofii A. Khodniev (thodnev) <thodnev@mail.ru>
5+
#
6+
# A library that provides a Python interface to the Telegram Bot API
7+
# Copyright (C) 2015-2017
8+
# Leandro Toledo de Souza <devs@python-telegram-bot.org>
9+
#
10+
# This program is free software: you can redistribute it and/or modify
11+
# it under the terms of the GNU Lesser Public License as published by
12+
# the Free Software Foundation, either version 3 of the License, or
13+
# (at your option) any later version.
14+
#
15+
# This program is distributed in the hope that it will be useful,
16+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
17+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
18+
# GNU Lesser Public License for more details.
19+
#
20+
# You should have received a copy of the GNU Lesser Public License
21+
# along with this program. If not, see [http://www.gnu.org/licenses/]
22+
'''A throughput-limiting message processor for Telegram bots'''
23+
from telegram.utils import promise
24+
25+
import functools
26+
import sys
27+
import time
28+
import threading
29+
if sys.version_info.major > 2:
30+
import queue as q
31+
else:
32+
import Queue as q
33+
34+
# We need to count < 1s intervals, so the most accurate timer is needed
35+
# Starting from Python 3.3 we have time.perf_counter which is the clock
36+
# with the highest resolution available to the system, so let's use it there.
37+
# In Python 2.7, there's no perf_counter yet, so fallback on what we have:
38+
# on Windows, the best available is time.clock while time.time is on
39+
# another platforms (M. Lutz, "Learning Python," 4ed, p.630-634)
40+
if sys.version_info.major == 3 and sys.version_info.minor >= 3:
41+
curtime = time.perf_counter
42+
else:
43+
curtime = time.clock if sys.platform[:3] == 'win' else time.time
44+
45+
46+
class DelayQueueError(RuntimeError):
47+
'''Indicates processing errors'''
48+
pass
49+
50+
51+
class DelayQueue(threading.Thread):
52+
'''Processes callbacks from queue with specified throughput limits.
53+
Creates a separate thread to process callbacks with delays.
54+
55+
Args:
56+
queue (:obj:`queue.Queue`, optional): used to pass callbacks to
57+
thread.
58+
Creates `queue.Queue` implicitly if not provided.
59+
burst_limit (:obj:`int`, optional): number of maximum callbacks to
60+
process per time-window defined by `time_limit_ms`.
61+
Defaults to 30.
62+
time_limit_ms (:obj:`int`, optional): defines width of time-window
63+
used when each processing limit is calculated.
64+
Defaults to 1000.
65+
exc_route (:obj:`callable`, optional): a callable, accepting 1
66+
positional argument; used to route exceptions from processor
67+
thread to main thread; is called on `Exception` subclass
68+
exceptions.
69+
If not provided, exceptions are routed through dummy handler,
70+
which re-raises them.
71+
autostart (:obj:`bool`, optional): if True, processor is started
72+
immediately after object's creation; if False, should be
73+
started manually by `start` method.
74+
Defaults to True.
75+
name (:obj:`str`, optional): thread's name.
76+
Defaults to ``'DelayQueue-N'``, where N is sequential
77+
number of object created.
78+
'''
79+
_instcnt = 0 # instance counter
80+
81+
def __init__(self,
82+
queue=None,
83+
burst_limit=30,
84+
time_limit_ms=1000,
85+
exc_route=None,
86+
autostart=True,
87+
name=None):
88+
self._queue = queue if queue is not None else q.Queue()
89+
self.burst_limit = burst_limit
90+
self.time_limit = time_limit_ms / 1000
91+
self.exc_route = (exc_route if exc_route is not None else self._default_exception_handler)
92+
self.__exit_req = False # flag to gently exit thread
93+
self.__class__._instcnt += 1
94+
if name is None:
95+
name = '%s-%s' % (self.__class__.__name__, self.__class__._instcnt)
96+
super(DelayQueue, self).__init__(name=name)
97+
self.daemon = False
98+
if autostart: # immediately start processing
99+
super(DelayQueue, self).start()
100+
101+
def run(self):
102+
'''Do not use the method except for unthreaded testing purposes,
103+
the method normally is automatically called by `start` method.
104+
'''
105+
times = [] # used to store each callable processing time
106+
while True:
107+
item = self._queue.get()
108+
if self.__exit_req:
109+
return # shutdown thread
110+
# delay routine
111+
now = curtime()
112+
t_delta = now - self.time_limit # calculate early to improve perf.
113+
if times and t_delta > times[-1]:
114+
# if last call was before the limit time-window
115+
# used to impr. perf. in long-interval calls case
116+
times = [now]
117+
else:
118+
# collect last in current limit time-window
119+
times = [t for t in times if t >= t_delta]
120+
times.append(now)
121+
if len(times) >= self.burst_limit: # if throughput limit was hit
122+
time.sleep(times[1] - t_delta)
123+
# finally process one
124+
try:
125+
func, args, kwargs = item
126+
func(*args, **kwargs)
127+
except Exception as exc: # re-route any exceptions
128+
self.exc_route(exc) # to prevent thread exit
129+
130+
def stop(self, timeout=None):
131+
'''Used to gently stop processor and shutdown its thread.
132+
133+
Args:
134+
timeout (:obj:`float`): indicates maximum time to wait for
135+
processor to stop and its thread to exit.
136+
If timeout exceeds and processor has not stopped, method
137+
silently returns. `is_alive` could be used afterwards
138+
to check the actual status. If `timeout` set to None, blocks
139+
until processor is shut down.
140+
Defaults to None.
141+
Returns:
142+
None
143+
'''
144+
self.__exit_req = True # gently request
145+
self._queue.put(None) # put something to unfreeze if frozen
146+
super(DelayQueue, self).join(timeout=timeout)
147+
148+
@staticmethod
149+
def _default_exception_handler(exc):
150+
'''Dummy exception handler which re-raises exception in thread.
151+
Could be possibly overwritten by subclasses.
152+
'''
153+
raise exc
154+
155+
def __call__(self, func, *args, **kwargs):
156+
'''Used to process callbacks in throughput-limiting thread
157+
through queue.
158+
Args:
159+
func (:obj:`callable`): the actual function (or any callable) that
160+
is processed through queue.
161+
*args: variable-length `func` arguments.
162+
**kwargs: arbitrary keyword-arguments to `func`.
163+
Returns:
164+
None
165+
'''
166+
if not self.is_alive() or self.__exit_req:
167+
raise DelayQueueError('Could not process callback in stopped thread')
168+
self._queue.put((func, args, kwargs))
169+
170+
171+
# The most straightforward way to implement this is to use 2 sequenital delay
172+
# queues, like on classic delay chain schematics in electronics.
173+
# So, message path is:
174+
# msg --> group delay if group msg, else no delay --> normal msg delay --> out
175+
# This way OS threading scheduler cares of timings accuracy.
176+
# (see time.time, time.clock, time.perf_counter, time.sleep @ docs.python.org)
177+
class MessageQueue(object):
178+
'''Implements callback processing with proper delays to avoid hitting
179+
Telegram's message limits.
180+
Contains two `DelayQueue`s, for group and for all messages, interconnected
181+
in delay chain. Callables are processed through *group* `DelayQueue`, then
182+
through *all* `DelayQueue` for group-type messages. For non-group messages,
183+
only the *all* `DelayQueue` is used.
184+
185+
Args:
186+
all_burst_limit (:obj:`int`, optional): numer of maximum *all-type*
187+
callbacks to process per time-window defined by
188+
`all_time_limit_ms`.
189+
Defaults to 30.
190+
all_time_limit_ms (:obj:`int`, optional): defines width of *all-type*
191+
time-window used when each processing limit is calculated.
192+
Defaults to 1000 ms.
193+
group_burst_limit (:obj:`int`, optional): numer of maximum *group-type*
194+
callbacks to process per time-window defined by
195+
`group_time_limit_ms`.
196+
Defaults to 20.
197+
group_time_limit_ms (:obj:`int`, optional): defines width of
198+
*group-type* time-window used when each processing limit is
199+
calculated.
200+
Defaults to 60000 ms.
201+
exc_route (:obj:`callable`, optional): a callable, accepting one
202+
positional argument; used to route exceptions from processor
203+
threads to main thread; is called on `Exception` subclass
204+
exceptions.
205+
If not provided, exceptions are routed through dummy handler,
206+
which re-raises them.
207+
autostart (:obj:`bool`, optional): if True, processors are started
208+
immediately after object's creation; if False, should be
209+
started manually by `start` method.
210+
Defaults to True.
211+
212+
Attributes:
213+
_all_delayq (:obj:`telegram.ext.messagequeue.DelayQueue`): actual
214+
`DelayQueue` used for *all-type* callback processing
215+
_group_delayq (:obj:`telegram.ext.messagequeue.DelayQueue`): actual
216+
`DelayQueue` used for *group-type* callback processing
217+
'''
218+
219+
def __init__(self,
220+
all_burst_limit=30,
221+
all_time_limit_ms=1000,
222+
group_burst_limit=20,
223+
group_time_limit_ms=60000,
224+
exc_route=None,
225+
autostart=True):
226+
# create accoring delay queues, use composition
227+
self._all_delayq = DelayQueue(
228+
burst_limit=all_burst_limit,
229+
time_limit_ms=all_time_limit_ms,
230+
exc_route=exc_route,
231+
autostart=autostart)
232+
self._group_delayq = DelayQueue(
233+
burst_limit=group_burst_limit,
234+
time_limit_ms=group_time_limit_ms,
235+
exc_route=exc_route,
236+
autostart=autostart)
237+
238+
def start(self):
239+
'''Method is used to manually start the `MessageQueue` processing
240+
241+
Returns:
242+
None
243+
'''
244+
self._all_delayq.start()
245+
self._group_delayq.start()
246+
247+
def stop(self, timeout=None):
248+
self._group_delayq.stop(timeout=timeout)
249+
self._all_delayq.stop(timeout=timeout)
250+
251+
stop.__doc__ = DelayQueue.stop.__doc__ or '' # reuse docsting if any
252+
253+
def __call__(self, promise, is_group_msg=False):
254+
'''Processes callables in troughput-limiting queues to avoid
255+
hitting limits (specified with \*_burst_limit and *\_time_limit_ms).
256+
Args:
257+
promise (:obj:`callable`): mainly the
258+
:obj:`telegram.utils.promise.Promise` (see Notes for other
259+
callables), that is processed in delay queues
260+
is_group_msg (:obj:`bool`, optional): defines whether `promise`
261+
would be processed in *group*+*all* `DelayQueue`s
262+
(if set to ``True``), or only through *all* `DelayQueue`
263+
(if set to ``False``), resulting in needed delays to avoid
264+
hitting specified limits.
265+
Defaults to ``True``.
266+
267+
Notes:
268+
Method is designed to accept :obj:`telegram.utils.promise.Promise`
269+
as `promise` argument, but other callables could be used too.
270+
For example, lambdas or simple functions could be used to wrap
271+
original func to be called with needed args.
272+
In that case, be sure that either wrapper func does not raise
273+
outside exceptions or the proper `exc_route` handler is provided.
274+
275+
Returns:
276+
:obj:`callable` used as `promise` argument.
277+
'''
278+
if not is_group_msg: # ignore middle group delay
279+
self._all_delayq(promise)
280+
else: # use middle group delay
281+
self._group_delayq(self._all_delayq, promise)
282+
return promise
283+
284+
285+
def queuedmessage(method):
286+
'''A decorator to be used with `telegram.bot.Bot` send* methods.
287+
288+
Note:
289+
As it probably wouldn't be a good idea to make this decorator a
290+
property, it had been coded as decorator function, so it implies that
291+
**first positional argument to wrapped MUST be self**\.
292+
293+
The next object attributes are used by decorator:
294+
295+
Attributes:
296+
self._is_messages_queued_default (:obj:`bool`): Value to provide
297+
class-defaults to `queued` kwarg if not provided during wrapped
298+
method call.
299+
self._msg_queue (:obj:`telegram.ext.messagequeue.MessageQueue`):
300+
The actual `MessageQueue` used to delay outbond messages according
301+
to specified time-limits.
302+
303+
Wrapped method starts accepting the next kwargs:
304+
305+
Args:
306+
queued (:obj:`bool`, optional): if set to ``True``, the `MessageQueue`
307+
is used to process output messages.
308+
Defaults to `self._is_queued_out`.
309+
isgroup (:obj:`bool`, optional): if set to ``True``, the message is
310+
meant to be group-type (as there's no obvious way to determine its
311+
type in other way at the moment). Group-type messages could have
312+
additional processing delay according to limits set in
313+
`self._out_queue`.
314+
Defaults to ``False``.
315+
316+
Returns:
317+
Either :obj:`telegram.utils.promise.Promise` in case call is queued,
318+
or original method's return value if it's not.
319+
'''
320+
321+
@functools.wraps(method)
322+
def wrapped(self, *args, **kwargs):
323+
queued = kwargs.pop('queued', self._is_messages_queued_default)
324+
isgroup = kwargs.pop('isgroup', False)
325+
if queued:
326+
prom = promise.Promise(method, args, kwargs)
327+
return self._msg_queue(prom, isgroup)
328+
return method(self, *args, **kwargs)
329+
330+
return wrapped

0 commit comments

Comments
 (0)
X Tutup