1- import argparse , time , re , asyncio , functools , base64 , urllib .parse
1+ import argparse , time , re , asyncio , functools , base64 , random , urllib .parse
22from . import proto
33from .__doc__ import *
44
@@ -33,7 +33,24 @@ async def prepare_ciphers(cipher, reader, writer, bind=None, server_side=True):
3333 else :
3434 return None , None
3535
36- async def stream_handler (reader , writer , unix , lbind , protos , rserver , block , cipher , verbose = DUMMY , modstat = lambda r ,h :lambda i :DUMMY , ** kwargs ):
36+ def schedule (rserver , salgorithm , host_name ):
37+ filter_cond = lambda o : o .alive and (not o .match or o .match (host_name ))
38+ if salgorithm == 'fa' :
39+ return next (filter (filter_cond , rserver ), None )
40+ elif salgorithm == 'rr' :
41+ for i , roption in enumerate (rserver ):
42+ if filter_cond (roption ):
43+ rserver .append (rserver .pop (i ))
44+ return roption
45+ elif salgorithm == 'rc' :
46+ filters = [i for i in rserver if filter_cond (i )]
47+ return random .choice (filters ) if filters else None
48+ elif salgorithm == 'lc' :
49+ return min (filter (filter_cond , rserver ), default = None , key = lambda i : i .total )
50+ else :
51+ raise Exception ('Unknown scheduling algorithm' ) #Unreachable
52+
53+ async def stream_handler (reader , writer , unix , lbind , protos , rserver , block , cipher , salgorithm , verbose = DUMMY , modstat = lambda r ,h :lambda i :DUMMY , ** kwargs ):
3754 try :
3855 if unix :
3956 remote_ip , server_ip , remote_text = 'local' , None , 'unix_local'
@@ -51,8 +68,8 @@ async def stream_handler(reader, writer, unix, lbind, protos, rserver, block, ci
5168 elif block and block (host_name ):
5269 raise Exception ('BLOCK ' + host_name )
5370 else :
54- roption = next ( filter ( lambda o : o . alive and ( not o . match or o . match ( host_name )), rserver ), None )
55- verbose (f'{ lproto .name } { remote_text } ' + roption .logtext (host_name , port ))
71+ roption = schedule ( rserver , salgorithm , host_name ) or ProxyURI . DIRECT
72+ verbose (f'{ lproto .name } { remote_text } { roption .logtext (host_name , port )} ' )
5673 try :
5774 reader_remote , writer_remote = await asyncio .wait_for (roption .open_connection (host_name , port , local_addr , lbind ), timeout = SOCKET_TIMEOUT )
5875 except asyncio .TimeoutError :
@@ -66,14 +83,14 @@ async def stream_handler(reader, writer, unix, lbind, protos, rserver, block, ci
6683 m = modstat (remote_ip , host_name )
6784 lchannel = lproto .http_channel if initbuf else lproto .channel
6885 asyncio .ensure_future (lproto .channel (reader_remote , writer , m (2 + roption .direct ), m (4 + roption .direct )))
69- asyncio .ensure_future (lchannel (reader , writer_remote , m (roption .direct ), DUMMY ))
86+ asyncio .ensure_future (lchannel (reader , writer_remote , m (roption .direct ), roption . connection_change ))
7087 except Exception as ex :
7188 if not isinstance (ex , asyncio .TimeoutError ) and not str (ex ).startswith ('Connection closed' ):
7289 verbose (f'{ str (ex ) or "Unsupported protocol" } from { remote_ip } ' )
7390 try : writer .close ()
7491 except Exception : pass
7592
76- async def datagram_handler (writer , data , addr , protos , urserver , block , cipher , verbose = DUMMY , ** kwargs ):
93+ async def datagram_handler (writer , data , addr , protos , urserver , block , cipher , salgorithm , verbose = DUMMY , ** kwargs ):
7794 try :
7895 remote_ip , remote_port , * _ = addr
7996 remote_text = f'{ remote_ip } :{ remote_port } '
@@ -86,8 +103,8 @@ async def datagram_handler(writer, data, addr, protos, urserver, block, cipher,
86103 elif block and block (host_name ):
87104 raise Exception ('BLOCK ' + host_name )
88105 else :
89- roption = next ( filter ( lambda o : not o . match or o . match ( host_name ), urserver ), None )
90- verbose (f'UDP { lproto .name } { remote_text } ' + roption .logtext (host_name , port ))
106+ roption = schedule ( urserver , salgorithm , host_name ) or ProxyURI . DIRECT
107+ verbose (f'UDP { lproto .name } { remote_text } { roption .logtext (host_name , port )} ' )
91108 data = roption .prepare_udp_connection (host_name , port , data )
92109 def reply (rdata ):
93110 writer .sendto (cipher .datagram .encrypt (rdata ) if cipher else rdata , addr )
@@ -124,6 +141,7 @@ def pattern_compile(filename):
124141class ProxyURI (object ):
125142 def __init__ (self , ** kw ):
126143 self .__dict__ .update (kw )
144+ self .total = 0
127145 self .udpmap = {}
128146 def logtext (self , host , port ):
129147 if self .direct :
@@ -132,6 +150,8 @@ def logtext(self, host, port):
132150 return f' ->{ (" ssl" if self .sslclient else "" )} { self .bind } '
133151 else :
134152 return f' -> { self .rproto .name + ("+ssl" if self .sslclient else "" )} { self .bind } ' + self .relay .logtext (host , port )
153+ def connection_change (self , delta ):
154+ self .total += delta
135155 async def open_udp_connection (self , host , port , data , addr , reply ):
136156 class Protocol (asyncio .DatagramProtocol ):
137157 def __init__ (prot , data ):
@@ -155,6 +175,9 @@ def connection_lost(prot, exc):
155175 if addr in self .udpmap :
156176 self .udpmap [addr ].new_data_arrived (data )
157177 else :
178+ if self .direct and host == 'tunnel' :
179+ raise Exception ('Unknown tunnel endpoint' )
180+ self .connection_change (1 )
158181 prot = Protocol (data )
159182 remote_addr = (host , port ) if self .direct else (self .host_name , self .port )
160183 await asyncio .get_event_loop ().create_datagram_endpoint (lambda : prot , remote_addr = remote_addr )
@@ -175,6 +198,8 @@ def datagram_received(self, data, addr):
175198 return loop .create_datagram_endpoint (Protocol , local_addr = (self .host_name , self .port ))
176199 def open_connection (self , host , port , local_addr , lbind ):
177200 if self .direct :
201+ if host == 'tunnel' :
202+ raise Exception ('Unknown tunnel endpoint' )
178203 local_addr = local_addr if lbind == 'in' else (lbind , 0 ) if lbind else None
179204 return asyncio .open_connection (host = host , port = port , local_addr = local_addr )
180205 elif self .unix :
@@ -295,6 +320,7 @@ def main():
295320 parser .add_argument ('-ur' , dest = 'urserver' , default = [], action = 'append' , type = ProxyURI .compile_relay , help = 'udp remote server uri (default: direct)' )
296321 parser .add_argument ('-b' , dest = 'block' , type = pattern_compile , help = 'block regex rules' )
297322 parser .add_argument ('-a' , dest = 'alived' , default = 0 , type = int , help = 'interval to check remote alive (default: no check)' )
323+ parser .add_argument ('-s' , dest = 'salgorithm' , default = 'fa' , choices = ('fa' , 'rr' , 'rc' , 'lc' ), help = 'scheduling algorithm (default: first_available)' )
298324 parser .add_argument ('-v' , dest = 'v' , action = 'count' , help = 'print verbose output' )
299325 parser .add_argument ('--ssl' , dest = 'sslfile' , help = 'certfile[,keyfile] if server listen in ssl mode' )
300326 parser .add_argument ('--pac' , help = 'http PAC path' )
@@ -308,10 +334,6 @@ def main():
308334 return
309335 if not args .listen and not args .ulisten :
310336 args .listen .append (ProxyURI .compile_relay ('http+socks4+socks5://:8080/' ))
311- if not args .rserver or args .rserver [- 1 ].match :
312- args .rserver .append (ProxyURI .DIRECT )
313- if not args .urserver or args .urserver [- 1 ].match :
314- args .urserver .append (ProxyURI .DIRECT )
315337 args .httpget = {}
316338 if args .pac :
317339 pactext = 'function FindProxyForURL(u,h){' + (f'var b=/^(:?{ args .block .__self__ .pattern } )$/i;if(b.test(h))return "";' if args .block else '' )
0 commit comments