import os, hashlib, hmac
class BaseCipher(object):
PYTHON = False
CACHE = {}
def __init__(self, key, ota=False, setup_key=True):
if self.KEY_LENGTH > 0 and setup_key:
self.key = self.CACHE.get(b'key'+key)
if self.key is None:
keybuf = []
while len(b''.join(keybuf)) < self.KEY_LENGTH:
keybuf.append(hashlib.md5((keybuf[-1] if keybuf else b'') + key).digest())
self.key = self.CACHE[b'key'+key] = b''.join(keybuf)[:self.KEY_LENGTH]
else:
self.key = key
self.ota = ota
self.iv = None
def setup_iv(self, iv=None):
self.iv = os.urandom(self.IV_LENGTH) if iv is None else iv
self.setup()
return self
def decrypt(self, s):
return self.cipher.decrypt(s)
def encrypt(self, s):
return self.cipher.encrypt(s)
@classmethod
def name(cls):
return cls.__name__.replace('_Cipher', '').replace('_', '-').lower()
class AEADCipher(BaseCipher):
PACKET_LIMIT = 16*1024-1
def setup_iv(self, iv=None):
self.iv = os.urandom(self.IV_LENGTH) if iv is None else iv
randkey = hmac.new(self.iv, self.key, hashlib.sha1).digest()
blocks_needed = (self.KEY_LENGTH + len(randkey) - 1) // len(randkey)
okm = bytearray()
output_block = b''
for counter in range(blocks_needed):
output_block = hmac.new(randkey, output_block + b'ss-subkey' + bytes([counter+1]), hashlib.sha1).digest()
okm.extend(output_block)
self.key = bytes(okm[:self.KEY_LENGTH])
self._nonce = 0
self._buffer = bytearray()
self._declen = None
self.setup()
@property
def nonce(self):
ret = self._nonce.to_bytes(self.NONCE_LENGTH, 'little')
self._nonce = (self._nonce+1) & ((1<= (3, 4)
except Exception:
cipher = None
if cipher is None:
cipher = MAP_PY.get(cipher_name)
if cipher is None and cipher_name.endswith('-py'):
cipher_name = cipher_name[:-3]
cipher = MAP_PY.get(cipher_name)
if cipher is None:
return 'this cipher needs library: "pip3 install pycryptodome"', None
cipher_name += ('-py' if cipher.PYTHON else '')
def apply_cipher(reader, writer, pdecrypt, pdecrypt2, pencrypt, pencrypt2):
reader_cipher, writer_cipher = cipher(key, ota=ota), cipher(key, ota=ota)
reader_cipher._buffer = b''
def decrypt(s):
s = pdecrypt2(s)
if not reader_cipher.iv:
s = reader_cipher._buffer + s
if len(s) >= reader_cipher.IV_LENGTH:
reader_cipher.setup_iv(s[:reader_cipher.IV_LENGTH])
return pdecrypt(reader_cipher.decrypt(s[reader_cipher.IV_LENGTH:]))
else:
reader_cipher._buffer = s
return b''
else:
return pdecrypt(reader_cipher.decrypt(s))
if hasattr(reader, 'decrypts'):
reader.decrypts.append(decrypt)
else:
reader.decrypts = [decrypt]
def feed_data(s, o=reader.feed_data, p=reader.decrypts):
for decrypt in p:
s = decrypt(s)
if not s:
return
o(s)
reader.feed_data = feed_data
if reader._buffer:
reader._buffer, buf = bytearray(), reader._buffer
feed_data(buf)
def write(s, o=writer.write):
if not writer_cipher.iv:
writer_cipher.setup_iv()
o(pencrypt2(writer_cipher.iv))
if not s:
return
return o(pencrypt2(writer_cipher.encrypt(pencrypt(s))))
writer.write = write
return reader_cipher, writer_cipher
apply_cipher.cipher = cipher
apply_cipher.key = key
apply_cipher.name = cipher_name
apply_cipher.ota = ota
apply_cipher.plugins = []
apply_cipher.datagram = PacketCipher(cipher, key, cipher_name)
return None, apply_cipher