X Tutup
import datetime, zlib, os, binascii, hmac, hashlib, time, random, collections packstr = lambda s, n=2: len(s).to_bytes(n, 'big') + s toint = lambda s, o='big': int.from_bytes(s, o) class BasePlugin(object): async def init_client_data(self, reader, writer, cipher): pass async def init_server_data(self, reader, writer, cipher, raddr): pass def add_cipher(self, cipher): pass @classmethod def name(cls): return cls.__name__.replace('_Plugin', '').replace('__', '.').lower() class Plain_Plugin(BasePlugin): pass class Origin_Plugin(BasePlugin): pass class Http_Simple_Plugin(BasePlugin): async def init_client_data(self, reader, writer, cipher): buf = await reader.read_until(b'\r\n\r\n') data = buf.split(b' ')[:2] data = bytes.fromhex(data[1][1:].replace(b'%',b'').decode()) reader._buffer[0:0] = data writer.write(b'HTTP/1.1 200 OK\r\nConnection: keep-alive\r\nContent-Encoding: gzip\r\nContent-Type: text/html\r\nDate: ' + datetime.datetime.now().strftime('%a, %d %b %Y %H:%M:%S GMT').encode() + b'\r\nServer: nginx\r\nVary: Accept-Encoding\r\n\r\n') async def init_server_data(self, reader, writer, cipher, raddr): writer.write(f'GET / HTTP/1.1\r\nHost: {raddr}\r\nUser-Agent: curl\r\nAccept-Encoding: gzip, deflate\r\nConnection: keep-alive\r\n\r\n'.encode()) await reader.read_until(b'\r\n\r\n') TIMESTAMP_TOLERANCE = 5 * 60 class Tls1__2_Ticket_Auth_Plugin(BasePlugin): CACHE = collections.deque(maxlen = 100) async def init_client_data(self, reader, writer, cipher): key = cipher.cipher(cipher.key).key assert await reader.read_n(3) == b'\x16\x03\x01' header = await reader.read_n(toint(await reader.read_n(2))) assert header[:2] == b'\x01\x00' assert header[4:6] == b'\x03\x03' cacheid = header[6:28] sessionid = header[39:39+header[38]] assert cacheid not in self.CACHE self.CACHE.append(cacheid) utc_time = int(time.time()) assert hmac.new(key+sessionid, cacheid, hashlib.sha1).digest()[:10] == header[28:38] assert abs(toint(header[6:10]) - utc_time) < TIMESTAMP_TOLERANCE addhmac = lambda s: s + hmac.new(key+sessionid, s, hashlib.sha1).digest()[:10] writer.write(addhmac((b"\x16\x03\x03" + packstr(b"\x02\x00" + packstr(b'\x03\x03' + addhmac(utc_time.to_bytes(4, 'big') + os.urandom(18)) + b'\x20' + sessionid + b'\xc0\x2f\x00\x00\x05\xff\x01\x00\x01\x00')) + (b"\x16\x03\x03" + packstr(b"\x04\x00" + packstr(os.urandom(random.randrange(164)*2+64))) if random.randint(0, 8) < 1 else b'') + b"\x14\x03\x03\x00\x01\x01\x16\x03\x03" + packstr(os.urandom(random.choice((32, 40)))))[:-10])) async def init_server_data(self, reader, writer, cipher, raddr): key = cipher.cipher(cipher.key).key sessionid = os.urandom(32) addhmac = lambda s: s + hmac.new(key+sessionid, s, hashlib.sha1).digest()[:10] writer.write(b"\x16\x03\x01" + packstr(b"\x01\x00" + packstr(b'\x03\x03' + addhmac(int(time.time()).to_bytes(4, 'big') + os.urandom(18)) + b"\x20" + sessionid + b"\x00\x1c\xc0\x2b\xc0\x2f\xcc\xa9\xcc\xa8\xcc\x14\xcc\x13\xc0\x0a\xc0\x14\xc0\x09\xc0\x13\x00\x9c\x00\x35\x00\x2f\x00\x0a\x01\x00" + packstr(b"\xff\x01\x00\x01\x00\x00\x00" + packstr(packstr(b"\x00" + packstr(raddr.encode()))) + b"\x00\x17\x00\x00\x00\x23" + packstr(os.urandom((random.randrange(17)+8)*16)) + b"\x00\x0d\x00\x16\x00\x14\x06\x01\x06\x03\x05\x01\x05\x03\x04\x01\x04\x03\x03\x01\x03\x03\x02\x01\x02\x03\x00\x05\x00\x05\x01\x00\x00\x00\x00\x00\x12\x00\x00\x75\x50\x00\x00\x00\x0b\x00\x02\x01\x00\x00\x0a\x00\x06\x00\x04\x00\x17\x00\x18")))) writer.write(addhmac(b'\x14\x03\x03\x00\x01\x01\x16\x03\x03\x00\x20' + os.urandom(22))) def add_cipher(self, cipher): self.buf = bytearray() def decrypt(s): self.buf.extend(s) ret = b'' while len(self.buf) >= 5: l = int.from_bytes(self.buf[3:5], 'big') if len(self.buf) < l: break if self.buf[:3] in (b'\x16\x03\x03', b'\x14\x03\x03'): del self.buf[:5+l] continue assert self.buf[:3] == b'\x17\x03\x03' data = self.buf[5:5+l] ret += data del self.buf[:5+l] return ret def pack(s): return b'\x17\x03\x03' + packstr(s) def encrypt(s): ret = b'' while len(s) > 2048: size = min(random.randrange(4096)+100, len(s)) ret += pack(s[:size]) s = s[size:] if s: ret += pack(s) return ret cipher.pdecrypt2 = decrypt cipher.pencrypt2 = encrypt class Verify_Simple_Plugin(BasePlugin): def add_cipher(self, cipher): self.buf = bytearray() def decrypt(s): self.buf.extend(s) ret = b'' while len(self.buf) >= 2: l = int.from_bytes(self.buf[:2], 'big') if len(self.buf) < l: break data = self.buf[2+self.buf[2]:l-4] crc = (-1 - binascii.crc32(self.buf[:l-4])) & 0xffffffff assert int.from_bytes(self.buf[l-4:l], 'little') == crc ret += data del self.buf[:l] return ret def pack(s): rnd_data = os.urandom(os.urandom(1)[0] % 16) data = bytes([len(rnd_data)+1]) + rnd_data + s data = (len(data)+6).to_bytes(2, 'big') + data crc = (-1 - binascii.crc32(data)) & 0xffffffff return data + crc.to_bytes(4, 'little') def encrypt(s): ret = b'' while len(s) > 8100: ret += pack(s[:8100]) s = s[8100:] if s: ret += pack(s) return ret cipher.pdecrypt = decrypt cipher.pencrypt = encrypt class Verify_Deflate_Plugin(BasePlugin): def add_cipher(self, cipher): self.buf = bytearray() def decrypt(s): self.buf.extend(s) ret = b'' while len(self.buf) >= 2: l = int.from_bytes(self.buf[:2], 'big') if len(self.buf) < l: break ret += zlib.decompress(b'x\x9c' + self.buf[2:l]) del self.buf[:l] return ret def pack(s): packed = zlib.compress(s) return len(packed).to_bytes(2, 'big') + packed[2:] def encrypt(s): ret = b'' while len(s) > 32700: ret += pack(s[:32700]) s = s[32700:] if s: ret += pack(s) return ret cipher.pdecrypt = decrypt cipher.pencrypt = encrypt PLUGIN = {cls.name(): cls for name, cls in globals().items() if name.endswith('_Plugin')} def get_plugin(plugin_name): if plugin_name not in PLUGIN: return f'existing plugins: {sorted(PLUGIN.keys())}', None return None, PLUGIN[plugin_name]()
X Tutup