X Tutup
import os, hashlib, hmac class BaseCipher(object): PYTHON = False CACHE = {} def __init__(self, key, ota=False, setup_key=True): if self.KEY_LENGTH > 0 and setup_key: self.key = self.CACHE.get(b'key'+key) if self.key is None: keybuf = [] while len(b''.join(keybuf)) < self.KEY_LENGTH: keybuf.append(hashlib.md5((keybuf[-1] if keybuf else b'') + key).digest()) self.key = self.CACHE[b'key'+key] = b''.join(keybuf)[:self.KEY_LENGTH] else: self.key = key self.ota = ota self.iv = None def setup_iv(self, iv=None): self.iv = os.urandom(self.IV_LENGTH) if iv is None else iv self.setup() return self def decrypt(self, s): return self.cipher.decrypt(s) def encrypt(self, s): return self.cipher.encrypt(s) @classmethod def name(cls): return cls.__name__.replace('_Cipher', '').replace('_', '-').lower() class AEADCipher(BaseCipher): PACKET_LIMIT = 16*1024-1 def setup_iv(self, iv=None): self.iv = os.urandom(self.IV_LENGTH) if iv is None else iv randkey = hmac.new(self.iv, self.key, hashlib.sha1).digest() blocks_needed = (self.KEY_LENGTH + len(randkey) - 1) // len(randkey) okm = bytearray() output_block = b'' for counter in range(blocks_needed): output_block = hmac.new(randkey, output_block + b'ss-subkey' + bytes([counter+1]), hashlib.sha1).digest() okm.extend(output_block) self.key = bytes(okm[:self.KEY_LENGTH]) self._nonce = 0 self._buffer = bytearray() self._declen = None self.setup() return self @property def nonce(self): ret = self._nonce.to_bytes(self.NONCE_LENGTH, 'little') self._nonce = (self._nonce+1) & ((1<= (3, 4) except Exception: cipher = None if cipher is None: cipher = MAP_PY.get(cipher_name) if cipher is None and cipher_name.endswith('-py'): cipher_name = cipher_name[:-3] cipher = MAP_PY.get(cipher_name) if cipher is None: return 'this cipher needs library: "pip3 install pycryptodome"', None cipher_name += ('-py' if cipher.PYTHON else '') def apply_cipher(reader, writer, pdecrypt, pdecrypt2, pencrypt, pencrypt2): reader_cipher, writer_cipher = cipher(key, ota=ota), cipher(key, ota=ota) reader_cipher._buffer = b'' def decrypt(s): s = pdecrypt2(s) if not reader_cipher.iv: s = reader_cipher._buffer + s if len(s) >= reader_cipher.IV_LENGTH: reader_cipher.setup_iv(s[:reader_cipher.IV_LENGTH]) return pdecrypt(reader_cipher.decrypt(s[reader_cipher.IV_LENGTH:])) else: reader_cipher._buffer = s return b'' else: return pdecrypt(reader_cipher.decrypt(s)) if hasattr(reader, 'decrypts'): reader.decrypts.append(decrypt) else: reader.decrypts = [decrypt] def feed_data(s, o=reader.feed_data, p=reader.decrypts): for decrypt in p: s = decrypt(s) if not s: return o(s) reader.feed_data = feed_data if reader._buffer: reader._buffer, buf = bytearray(), reader._buffer feed_data(buf) def write(s, o=writer.write): if not writer_cipher.iv: writer_cipher.setup_iv() o(pencrypt2(writer_cipher.iv)) if not s: return return o(pencrypt2(writer_cipher.encrypt(pencrypt(s)))) writer.write = write return reader_cipher, writer_cipher apply_cipher.cipher = cipher apply_cipher.key = key apply_cipher.name = cipher_name apply_cipher.ota = ota apply_cipher.plugins = [] apply_cipher.datagram = PacketCipher(cipher, key, cipher_name) return None, apply_cipher
X Tutup