-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathasyncio_echo_client_protocol.py
More file actions
78 lines (62 loc) · 2.02 KB
/
asyncio_echo_client_protocol.py
File metadata and controls
78 lines (62 loc) · 2.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
import asyncio
import functools
import logging
import sys
MESSAGES = [
b"This is the message. ",
b"It will be sent ",
b"in parts.",
]
SERVER_ADDRESS = ("localhost", 10000)
logging.basicConfig(
level=logging.DEBUG,
format="%(name)s: %(message)s",
stream=sys.stderr,
)
log = logging.getLogger("main")
event_loop = asyncio.get_event_loop()
class EchoClient(asyncio.Protocol):
def __init__(self, messages, future):
super().__init__()
self.messages = messages
self.log = logging.getLogger("EchoClient")
self.f = future
def connection_made(self, transport):
self.transport = transport
self.address = transport.get_extra_info("peername")
self.log.debug("connecting to {} port {}".format(*self.address))
# This could be transport.writelines() except that
# would make it harder to show each part of the message
# being sent
for msg in self.messages:
transport.write(msg)
self.log.debug("sending {!r}".format(msg))
if transport.can_write_eof():
transport.write_eof()
def data_received(self, data: bytes) -> None:
self.log.debug("received {!r}".format(data))
def eof_received(self):
self.log.debug("received EOF")
self.transport.close()
if not self.f.done():
self.f.set_result(True)
def connection_lost(self, exc):
self.log.debug("server closed connection")
self.transport.close()
if not self.f.done():
self.f.set_result(True)
super().connection_lost(exc)
client_completed = asyncio.Future()
client_factory = functools.partial(
EchoClient,
messages=MESSAGES,
future=client_completed,
)
factory_coroutine = event_loop.create_connection(client_factory, *SERVER_ADDRESS)
log.debug("waiting for client to complete")
try:
event_loop.run_until_complete(factory_coroutine)
event_loop.run_until_complete(client_completed)
finally:
log.debug("closing event loop")
event_loop.close()