forked from fluent/fluent-logger-python
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathasyncsender.py
More file actions
136 lines (106 loc) · 3.97 KB
/
asyncsender.py
File metadata and controls
136 lines (106 loc) · 3.97 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
# -*- coding: utf-8 -*-
from __future__ import print_function
import threading
try:
from queue import Queue, Full, Empty
except ImportError:
from Queue import Queue, Full, Empty
from fluent import sender
from fluent.sender import EventTime
# Names exported by a star-import of this module.
__all__ = ["EventTime", "FluentSender"]
# Default for FluentSender's ``queue_maxsize`` argument (the ``maxsize`` of its internal Queue).
DEFAULT_QUEUE_MAXSIZE = 100
# Default for FluentSender's ``queue_circular`` argument (False: a full queue does not drop its oldest item).
DEFAULT_QUEUE_CIRCULAR = False
# Unique sentinel that FluentSender.close() enqueues to make the send loop exit.
_TOMBSTONE = object()
# Module-level sender assigned by setup() / _set_global_sender(); None until one of them runs.
_global_sender = None
def _set_global_sender(sender):  # pragma: no cover
    """[For testing] Replace the module-level sender with ``sender``.

    :param sender: object to store in ``_global_sender``; it is stored as-is,
        without any validation.
    """
    global _global_sender
    _global_sender = sender
def setup(tag, **kwargs):  # pragma: no cover
    """Create a ``FluentSender`` and install it as the module-level sender.

    :param tag: passed as the first argument to ``FluentSender``.
    :param kwargs: forwarded unchanged to the ``FluentSender`` constructor.
    """
    global _global_sender
    instance = FluentSender(tag, **kwargs)
    _global_sender = instance
def get_global_sender():  # pragma: no cover
    """Return the current module-level sender (``None`` if none was set)."""
    return _global_sender
def close():  # pragma: no cover
    """Close the module-level sender, if one has been set.

    When no global sender is set (``get_global_sender()`` returns ``None``)
    this is a no-op rather than an ``AttributeError`` on ``None``.
    """
    current = get_global_sender()
    if current is not None:
        current.close()
class FluentSender(sender.FluentSender):
    """Sender that queues records and delivers them from a background thread.

    ``_send`` only places the serialized record on an internal ``Queue``; a
    daemon thread started in ``__init__`` takes records off the queue and
    passes them to the parent class's ``_send_internal``.
    """

    def __init__(self,
                 tag,
                 host='localhost',
                 port=24224,
                 bufmax=1 * 1024 * 1024,
                 timeout=3.0,
                 verbose=False,
                 buffer_overflow_handler=None,
                 nanosecond_precision=False,
                 msgpack_kwargs=None,
                 queue_maxsize=DEFAULT_QUEUE_MAXSIZE,
                 queue_circular=DEFAULT_QUEUE_CIRCULAR,
                 **kwargs):
        """Initialise the parent sender, then start the background send thread.

        ``tag`` through ``msgpack_kwargs`` are forwarded by keyword to the
        parent constructor.

        :param queue_maxsize: ``maxsize`` given to the internal ``Queue``.
        :param queue_circular: when true, a full queue drops its oldest entry
            instead of blocking the caller of ``_send``.
        :param kwargs: extra keyword arguments, forwarded unchanged to the
            parent constructor. The original note marks this argument as
            slated for removal in the next major version.
        """
        super(FluentSender, self).__init__(
            tag=tag,
            host=host,
            port=port,
            bufmax=bufmax,
            timeout=timeout,
            verbose=verbose,
            buffer_overflow_handler=buffer_overflow_handler,
            nanosecond_precision=nanosecond_precision,
            msgpack_kwargs=msgpack_kwargs,
            **kwargs)

        self._queue_maxsize = queue_maxsize
        self._queue_circular = queue_circular
        # Not referenced anywhere else in this class; kept so the attribute
        # continues to exist on instances.
        self._thread_guard = threading.Event()
        self._closed = False
        self._queue = Queue(maxsize=queue_maxsize)

        # Daemon thread, so it does not keep the interpreter alive on exit.
        worker = threading.Thread(target=self._send_loop,
                                  name="AsyncFluentSender %d" % id(self))
        worker.daemon = True
        self._send_thread = worker
        worker.start()

    def close(self, flush=True):
        """Stop the background thread and wait for it to finish.

        Calling it again after the sender is closed does nothing.

        :param flush: when false, everything still queued is discarded before
            the stop sentinel is enqueued; when true, queued records are left
            for the send loop to process first.
        """
        with self.lock:
            if self._closed:
                return
            self._closed = True

            if not flush:
                # Empty the queue; ``Empty`` signals there is nothing left.
                try:
                    while True:
                        self._queue.get_nowait()
                except Empty:
                    pass

            # The sentinel makes the send loop exit; then wait for the thread.
            self._queue.put(_TOMBSTONE)
            self._send_thread.join()

    @property
    def queue_maxsize(self):
        """The ``queue_maxsize`` value given at construction."""
        return self._queue_maxsize

    @property
    def queue_blocking(self):
        """Logical negation of ``queue_circular``."""
        return not self._queue_circular

    @property
    def queue_circular(self):
        """The ``queue_circular`` value given at construction."""
        return self._queue_circular

    def _send(self, bytes_):
        """Enqueue ``bytes_`` for the background thread.

        :param bytes_: item to place on the queue.
        :return: ``False`` if the sender is closed or the queue reported
            ``Full``; ``True`` once the item has been enqueued.
        """
        with self.lock:
            if self._closed:
                return False

            circular = self._queue_circular
            if circular and self._queue.full():
                # Circular mode: make room by dropping the oldest item.
                try:
                    self._queue.get_nowait()
                except Empty:  # pragma: no cover
                    pass

            try:
                # Only non-circular mode waits for free space.
                self._queue.put(bytes_, block=not circular)
            except Full:  # pragma: no cover
                return False  # this actually can't happen
            return True

    def _send_loop(self):
        """Body of the background thread.

        Takes items off the queue and hands each to the parent class's
        ``_send_internal`` until the stop sentinel arrives. ``self._close()``
        (not defined in this class; presumably inherited) runs on the way
        out, including when an exception escapes the loop.
        """
        deliver = super(FluentSender, self)._send_internal
        try:
            while True:
                item = self._queue.get()
                if item is _TOMBSTONE:
                    return
                deliver(item)
        finally:
            self._close()

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Close the sender when leaving a ``with`` block."""
        self.close()