X Tutup
Skip to content

Commit dd7d125

Browse files
committed
fixes, tests and examples for job queue
1 parent f813d4f commit dd7d125

File tree

3 files changed

+213
-5
lines changed

3 files changed

+213
-5
lines changed

examples/timerbot.py

Lines changed: 97 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,97 @@
1+
#!/usr/bin/env python
2+
# -*- coding: utf-8 -*-
3+
#
4+
# Simple Bot to send timed Telegram messages
5+
# This program is dedicated to the public domain under the CC0 license.
6+
7+
"""
8+
This Bot uses the Updater class to handle the bot and the JobQueue to send
9+
timed messages.
10+
11+
First, a few handler functions are defined. Then, those functions are passed to
12+
the Dispatcher and registered at their respective places.
13+
Then, the bot is started and runs until we press Ctrl-C on the command line.
14+
15+
Usage:
16+
Basic Alarm Bot example, sends a message after a set time.
17+
Press Ctrl-C on the command line or send a signal to the process to stop the
18+
bot.
19+
"""
20+
21+
from telegram import Updater, JobQueue
22+
import logging
23+
24+
# Enable logging
25+
logging.basicConfig(
26+
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
27+
level=logging.INFO)
28+
29+
logger = logging.getLogger(__name__)
30+
job_queue = None
31+
32+
33+
# Define a few command handlers. These usually take the two arguments bot and
34+
# update. Error handlers also receive the raised TelegramError object in error.
35+
def start(bot, update):
36+
bot.sendMessage(update.message.chat_id, text='Hi! Use /set <seconds> to '
37+
'set a timer')
38+
39+
40+
def set(bot, update, args):
41+
""" Adds a job to the queue """
42+
chat_id = update.message.chat_id
43+
try:
44+
# args[0] should contain the time for the timer in seconds
45+
due = int(args[0])
46+
47+
def alarm(bot):
48+
""" Inner function to send the alarm message """
49+
bot.sendMessage(chat_id, text='Beep!')
50+
51+
# Add job to queue
52+
job_queue.put(alarm, due, repeat=False)
53+
bot.sendMessage(chat_id, text='Timer successfully set!')
54+
55+
except IndexError:
56+
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
57+
except ValueError:
58+
bot.sendMessage(chat_id, text='Usage: /set <seconds>')
59+
60+
61+
def error(bot, update, error):
62+
logger.warn('Update "%s" caused error "%s"' % (update, error))
63+
64+
65+
def main():
66+
global job_queue
67+
68+
updater = Updater("148447715:AAHbczRui6gO3RBlKQ2IwU2hMd226LqZE90")
69+
job_queue = JobQueue(updater.bot, tick_interval=1)
70+
71+
# Get the dispatcher to register handlers
72+
dp = updater.dispatcher
73+
74+
# on different commands - answer in Telegram
75+
dp.addTelegramCommandHandler("start", start)
76+
dp.addTelegramCommandHandler("help", start)
77+
dp.addTelegramCommandHandler("set", set)
78+
79+
# log all errors
80+
dp.addErrorHandler(error)
81+
82+
# start the job queue
83+
job_queue.start()
84+
85+
# Start the Bot
86+
updater.start_polling()
87+
88+
# Block until the you presses Ctrl-C or the process receives SIGINT,
89+
# SIGTERM or SIGABRT. This should be used most of the time, since
90+
# start_polling() is non-blocking and will stop the bot gracefully.
91+
updater.idle()
92+
93+
# After that, also stop the job queue
94+
job_queue.stop()
95+
96+
if __name__ == '__main__':
97+
main()

telegram/jobqueue.py

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,8 @@
3131

3232

3333
class JobQueue(object):
34-
"""This class allows you to periodically perform tasks with the bot.
34+
"""
35+
This class allows you to periodically perform tasks with the bot.
3536
3637
Attributes:
3738
tick_interval (float):
@@ -55,14 +56,15 @@ def __init__(self, bot, tick_interval=1.0):
5556
self.__lock = Lock()
5657
self.running = False
5758

58-
def put(self, run, interval, next_t=None):
59+
def put(self, run, interval, repeat=True, next_t=None):
5960
"""
6061
Queue a new job.
6162
6263
Args:
6364
run (function): A function that takes the parameter `bot`
6465
interval (float): The interval in seconds in which `run` should be
6566
executed
67+
repeat (Optional[bool]): If false, job will only be executed once
6668
next_t (Optional[float]): Time in seconds in which run should be
6769
executed first. Defaults to `interval`
6870
"""
@@ -72,6 +74,7 @@ def put(self, run, interval, next_t=None):
7274
job.run = run
7375
job.interval = interval
7476
job.name = name
77+
job.repeat = repeat
7578

7679
if next_t is None:
7780
next_t = interval
@@ -96,9 +99,14 @@ def tick(self):
9699

97100
if t < now:
98101
self.queue.get()
99-
self.logger.debug("About time! running")
100-
j.run(self.bot)
101-
self.put(j.run, j.interval)
102+
self.logger.info("Running job {}".format(j.name))
103+
try:
104+
j.run(self.bot)
105+
except:
106+
self.logger.exception("An uncaught error was raised while "
107+
"executing job {}".format(j.name))
108+
if j.repeat:
109+
self.put(j.run, j.interval)
102110
continue
103111

104112
self.logger.debug("Next task isn't due yet. Finished!")
@@ -115,6 +123,7 @@ def start(self):
115123
job_queue_thread = Thread(target=self._start,
116124
name="job_queue")
117125
job_queue_thread.start()
126+
self.logger.info('Job Queue thread started')
118127
else:
119128
self.__lock.release()
120129

@@ -127,6 +136,8 @@ def _start(self):
127136
self.tick()
128137
time.sleep(self.tick_interval)
129138

139+
self.logger.info('Job Queue thread stopped')
140+
130141
def stop(self):
131142
"""
132143
Stops the thread
@@ -138,6 +149,7 @@ class Job(object):
138149
""" Inner class that represents a job """
139150
interval = None
140151
name = None
152+
repeat = None
141153

142154
def run(self):
143155
pass

tests/test_jobqueue.py

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
#!/usr/bin/env python
2+
# encoding: utf-8
3+
#
4+
# A library that provides a Python interface to the Telegram Bot API
5+
# Copyright (C) 2015 Leandro Toledo de Souza <devs@python-telegram-bot.org>
6+
#
7+
# This program is free software: you can redistribute it and/or modify
8+
# it under the terms of the GNU General Public License as published by
9+
# the Free Software Foundation, either version 3 of the License, or
10+
# (at your option) any later version.
11+
#
12+
# This program is distributed in the hope that it will be useful,
13+
# but WITHOUT ANY WARRANTY; without even the implied warranty of
14+
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
15+
# GNU General Public License for more details.
16+
#
17+
# You should have received a copy of the GNU General Public License
18+
# along with this program. If not, see [http://www.gnu.org/licenses/].
19+
20+
"""
21+
This module contains a object that represents Tests for JobQueue
22+
"""
23+
import logging
24+
import sys
25+
import re
26+
import os
27+
import signal
28+
from random import randrange
29+
from time import sleep
30+
from datetime import datetime
31+
32+
if sys.version_info[0:2] == (2, 6):
33+
import unittest2 as unittest
34+
else:
35+
import unittest
36+
37+
try:
38+
from urllib2 import urlopen, Request
39+
except ImportError:
40+
from urllib.request import Request, urlopen
41+
42+
sys.path.append('.')
43+
44+
from telegram import JobQueue
45+
from tests.base import BaseTest
46+
47+
# Enable logging
48+
root = logging.getLogger()
49+
root.setLevel(logging.INFO)
50+
51+
ch = logging.StreamHandler(sys.stdout)
52+
ch.setLevel(logging.WARN)
53+
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
54+
ch.setFormatter(formatter)
55+
root.addHandler(ch)
56+
57+
58+
class UpdaterTest(BaseTest, unittest.TestCase):
59+
"""
60+
This object represents Tests for Updater, Dispatcher, WebhookServer and
61+
WebhookHandler
62+
"""
63+
64+
def setUp(self):
65+
self.jq = JobQueue("Bot", tick_interval=0.001)
66+
self.result = 0
67+
68+
def tearDown(self):
69+
if self.jq is not None:
70+
self.jq.stop()
71+
72+
def job1(self, bot):
73+
self.result += 1
74+
75+
def test_basic(self):
76+
print('Testing basic job queue function')
77+
self.jq.put(self.job1, 0.1)
78+
self.jq.start()
79+
sleep(1.05)
80+
self.assertEqual(10, self.result)
81+
82+
def test_noRepeat(self):
83+
print('Testing job queue without repeat')
84+
self.jq.put(self.job1, 0.1, repeat=False)
85+
self.jq.start()
86+
sleep(0.5)
87+
self.assertEqual(1, self.result)
88+
89+
def test_nextT(self):
90+
print('Testing job queue with a set next_t value')
91+
self.jq.put(self.job1, 0.1, next_t=0.5)
92+
self.jq.start()
93+
sleep(0.45)
94+
self.assertEqual(0, self.result)
95+
sleep(0.1)
96+
self.assertEqual(1, self.result)
97+
98+
if __name__ == '__main__':
99+
unittest.main()

0 commit comments

Comments
 (0)
X Tutup