X Tutup
Skip to content

Commit 5617755

Browse files
authored
v1.7.3
scheduling algorithm
1 parent 756a55e commit 5617755

File tree

3 files changed

+38
-14
lines changed

3 files changed

+38
-14
lines changed

pproxy/__doc__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
__title__ = "pproxy"
2-
__version__ = "1.7.2"
2+
__version__ = "1.7.3"
33
__license__ = "MIT"
44
__description__ = "Proxy server that can tunnel among remote servers by regex rules."
55
__keywords__ = "proxy socks http shadowsocks shadowsocksr ssr redirect pf tunnel cipher ssl udp"

pproxy/proto.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,8 +244,9 @@ async def parse(self, header, reader, writer, auth, authtable, httpget, **kw):
244244
async def connect(self, reader_remote, writer_remote, rauth, host_name, port, **kw):
245245
writer_remote.write(f'CONNECT {host_name}:{port} HTTP/1.1'.encode() + (b'\r\nProxy-Authorization: Basic '+base64.b64encode(rauth) if rauth else b'') + b'\r\n\r\n')
246246
await reader_remote.read_until(b'\r\n\r\n')
247-
async def http_channel(self, reader, writer, stat_bytes, _):
247+
async def http_channel(self, reader, writer, stat_bytes, stat_conn):
248248
try:
249+
stat_conn(1)
249250
while True:
250251
data = await reader.read_()
251252
if not data:
@@ -266,6 +267,7 @@ async def http_channel(self, reader, writer, stat_bytes, _):
266267
except Exception:
267268
pass
268269
finally:
270+
stat_conn(-1)
269271
writer.close()
270272

271273
class Transparent(BaseProtocol):

pproxy/server.py

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import argparse, time, re, asyncio, functools, base64, urllib.parse
1+
import argparse, time, re, asyncio, functools, base64, random, urllib.parse
22
from . import proto
33
from .__doc__ import *
44

@@ -33,7 +33,24 @@ async def prepare_ciphers(cipher, reader, writer, bind=None, server_side=True):
3333
else:
3434
return None, None
3535

36-
async def stream_handler(reader, writer, unix, lbind, protos, rserver, block, cipher, verbose=DUMMY, modstat=lambda r,h:lambda i:DUMMY, **kwargs):
36+
def schedule(rserver, salgorithm, host_name):
37+
filter_cond = lambda o: o.alive and (not o.match or o.match(host_name))
38+
if salgorithm == 'fa':
39+
return next(filter(filter_cond, rserver), None)
40+
elif salgorithm == 'rr':
41+
for i, roption in enumerate(rserver):
42+
if filter_cond(roption):
43+
rserver.append(rserver.pop(i))
44+
return roption
45+
elif salgorithm == 'rc':
46+
filters = [i for i in rserver if filter_cond(i)]
47+
return random.choice(filters) if filters else None
48+
elif salgorithm == 'lc':
49+
return min(filter(filter_cond, rserver), default=None, key=lambda i: i.total)
50+
else:
51+
raise Exception('Unknown scheduling algorithm') #Unreachable
52+
53+
async def stream_handler(reader, writer, unix, lbind, protos, rserver, block, cipher, salgorithm, verbose=DUMMY, modstat=lambda r,h:lambda i:DUMMY, **kwargs):
3754
try:
3855
if unix:
3956
remote_ip, server_ip, remote_text = 'local', None, 'unix_local'
@@ -51,8 +68,8 @@ async def stream_handler(reader, writer, unix, lbind, protos, rserver, block, ci
5168
elif block and block(host_name):
5269
raise Exception('BLOCK ' + host_name)
5370
else:
54-
roption = next(filter(lambda o: o.alive and (not o.match or o.match(host_name)), rserver), None)
55-
verbose(f'{lproto.name} {remote_text}' + roption.logtext(host_name, port))
71+
roption = schedule(rserver, salgorithm, host_name) or ProxyURI.DIRECT
72+
verbose(f'{lproto.name} {remote_text}{roption.logtext(host_name, port)}')
5673
try:
5774
reader_remote, writer_remote = await asyncio.wait_for(roption.open_connection(host_name, port, local_addr, lbind), timeout=SOCKET_TIMEOUT)
5875
except asyncio.TimeoutError:
@@ -66,14 +83,14 @@ async def stream_handler(reader, writer, unix, lbind, protos, rserver, block, ci
6683
m = modstat(remote_ip, host_name)
6784
lchannel = lproto.http_channel if initbuf else lproto.channel
6885
asyncio.ensure_future(lproto.channel(reader_remote, writer, m(2+roption.direct), m(4+roption.direct)))
69-
asyncio.ensure_future(lchannel(reader, writer_remote, m(roption.direct), DUMMY))
86+
asyncio.ensure_future(lchannel(reader, writer_remote, m(roption.direct), roption.connection_change))
7087
except Exception as ex:
7188
if not isinstance(ex, asyncio.TimeoutError) and not str(ex).startswith('Connection closed'):
7289
verbose(f'{str(ex) or "Unsupported protocol"} from {remote_ip}')
7390
try: writer.close()
7491
except Exception: pass
7592

76-
async def datagram_handler(writer, data, addr, protos, urserver, block, cipher, verbose=DUMMY, **kwargs):
93+
async def datagram_handler(writer, data, addr, protos, urserver, block, cipher, salgorithm, verbose=DUMMY, **kwargs):
7794
try:
7895
remote_ip, remote_port, *_ = addr
7996
remote_text = f'{remote_ip}:{remote_port}'
@@ -86,8 +103,8 @@ async def datagram_handler(writer, data, addr, protos, urserver, block, cipher,
86103
elif block and block(host_name):
87104
raise Exception('BLOCK ' + host_name)
88105
else:
89-
roption = next(filter(lambda o: not o.match or o.match(host_name), urserver), None)
90-
verbose(f'UDP {lproto.name} {remote_text}' + roption.logtext(host_name, port))
106+
roption = schedule(urserver, salgorithm, host_name) or ProxyURI.DIRECT
107+
verbose(f'UDP {lproto.name} {remote_text}{roption.logtext(host_name, port)}')
91108
data = roption.prepare_udp_connection(host_name, port, data)
92109
def reply(rdata):
93110
writer.sendto(cipher.datagram.encrypt(rdata) if cipher else rdata, addr)
@@ -124,6 +141,7 @@ def pattern_compile(filename):
124141
class ProxyURI(object):
125142
def __init__(self, **kw):
126143
self.__dict__.update(kw)
144+
self.total = 0
127145
self.udpmap = {}
128146
def logtext(self, host, port):
129147
if self.direct:
@@ -132,6 +150,8 @@ def logtext(self, host, port):
132150
return f' ->{(" ssl" if self.sslclient else "")} {self.bind}'
133151
else:
134152
return f' -> {self.rproto.name+("+ssl" if self.sslclient else "")} {self.bind}' + self.relay.logtext(host, port)
153+
def connection_change(self, delta):
154+
self.total += delta
135155
async def open_udp_connection(self, host, port, data, addr, reply):
136156
class Protocol(asyncio.DatagramProtocol):
137157
def __init__(prot, data):
@@ -155,6 +175,9 @@ def connection_lost(prot, exc):
155175
if addr in self.udpmap:
156176
self.udpmap[addr].new_data_arrived(data)
157177
else:
178+
if self.direct and host == 'tunnel':
179+
raise Exception('Unknown tunnel endpoint')
180+
self.connection_change(1)
158181
prot = Protocol(data)
159182
remote_addr = (host, port) if self.direct else (self.host_name, self.port)
160183
await asyncio.get_event_loop().create_datagram_endpoint(lambda: prot, remote_addr=remote_addr)
@@ -175,6 +198,8 @@ def datagram_received(self, data, addr):
175198
return loop.create_datagram_endpoint(Protocol, local_addr=(self.host_name, self.port))
176199
def open_connection(self, host, port, local_addr, lbind):
177200
if self.direct:
201+
if host == 'tunnel':
202+
raise Exception('Unknown tunnel endpoint')
178203
local_addr = local_addr if lbind == 'in' else (lbind, 0) if lbind else None
179204
return asyncio.open_connection(host=host, port=port, local_addr=local_addr)
180205
elif self.unix:
@@ -295,6 +320,7 @@ def main():
295320
parser.add_argument('-ur', dest='urserver', default=[], action='append', type=ProxyURI.compile_relay, help='udp remote server uri (default: direct)')
296321
parser.add_argument('-b', dest='block', type=pattern_compile, help='block regex rules')
297322
parser.add_argument('-a', dest='alived', default=0, type=int, help='interval to check remote alive (default: no check)')
323+
parser.add_argument('-s', dest='salgorithm', default='fa', choices=('fa', 'rr', 'rc', 'lc'), help='scheduling algorithm (default: first_available)')
298324
parser.add_argument('-v', dest='v', action='count', help='print verbose output')
299325
parser.add_argument('--ssl', dest='sslfile', help='certfile[,keyfile] if server listen in ssl mode')
300326
parser.add_argument('--pac', help='http PAC path')
@@ -308,10 +334,6 @@ def main():
308334
return
309335
if not args.listen and not args.ulisten:
310336
args.listen.append(ProxyURI.compile_relay('http+socks4+socks5://:8080/'))
311-
if not args.rserver or args.rserver[-1].match:
312-
args.rserver.append(ProxyURI.DIRECT)
313-
if not args.urserver or args.urserver[-1].match:
314-
args.urserver.append(ProxyURI.DIRECT)
315337
args.httpget = {}
316338
if args.pac:
317339
pactext = 'function FindProxyForURL(u,h){' + (f'var b=/^(:?{args.block.__self__.pattern})$/i;if(b.test(h))return "";' if args.block else '')

0 commit comments

Comments
 (0)
X Tutup