This repository was archived by the owner on Apr 14, 2024. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 9
Expand file tree
/
Copy pathtest_example.py
More file actions
45 lines (33 loc) · 1.42 KB
/
test_example.py
File metadata and controls
45 lines (33 loc) · 1.42 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
# Copyright (C) 2010, 2011 Sebastian Thiel (byronimo@gmail.com) and contributors
#
# This module is part of async and is released under
# the New BSD License: http://www.opensource.org/licenses/bsd-license.php
"""Module containing examples from the documentation"""
from .lib import TestBase
from async.pool import ThreadPool
from async.task import (
IteratorThreadTask,
ChannelThreadTask
)
from async.thread import terminate_threads
class TestExamples(TestBase):
    """Executable versions of the usage examples, run against a ThreadPool."""

    @terminate_threads
    def test_usage(self):
        """Run one iterator task on its own, then chain a channel task onto a second one."""
        pool = ThreadPool()

        # default size is 0, synchronous mode
        assert pool.size() == 0

        # --- Example 1: a task that squares items pulled from an iterator ---
        square_task = IteratorThreadTask(
            iter(list(range(10))), "power", lambda n: n * n
        )
        square_reader = pool.add_task(square_task)

        # read all items - they were processed by worker 1
        squares = square_reader.read()
        assert len(squares) == 10
        assert squares[0] == 0
        assert squares[-1] == 81      # 9 * 9

        # --- Example 2: chaining ---
        # Build a fresh squaring task; its reader becomes the input of the
        # next task.
        square_task = IteratorThreadTask(
            iter(list(range(10))), "power", lambda n: n * n
        )
        square_reader = pool.add_task(square_task)

        # chain both by linking their readers: the second task consumes the
        # reader of the first and doubles each item
        double_task = ChannelThreadTask(square_reader, "mult", lambda n: n * 2)
        doubled_reader = pool.add_task(double_task)

        # read all
        doubled = doubled_reader.read()
        assert len(doubled) == 10
        assert doubled[0] == 0
        assert doubled[-1] == 162     # (9 * 9) * 2