X Tutup
Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 18 additions & 8 deletions Lib/test/test_bz2.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,18 +66,28 @@ class BaseTest(unittest.TestCase):
EMPTY_DATA = b'BZh9\x17rE8P\x90\x00\x00\x00\x00'
BAD_DATA = b'this is not a valid bzip2 file'

# Some tests need more than one block of uncompressed data. Since one block
# is at least 100,000 bytes, we gather some data dynamically and compress it.
# Note that this assumes that compression works correctly, so we cannot
# simply use the bigger test data for all tests.
# Some tests need more than one block of data. The bz2 module does not
# support flushing a block during compression, so we must read in data until
# there are at least 2 blocks. Since different orderings of Python files may
# be compressed differently, we need to check the compression output for
# more than one bzip2 block header magic, a hex encoding of Pi
# (0x314159265359)
bz2_block_magic = bytes.fromhex('314159265359')
test_size = 0
BIG_TEXT = bytearray(128*1024)
BIG_TEXT = b''
BIG_DATA = b''
compressor = BZ2Compressor(1)
for fname in glob.glob(os.path.join(glob.escape(os.path.dirname(__file__)), '*.py')):
with open(fname, 'rb') as fh:
test_size += fh.readinto(memoryview(BIG_TEXT)[test_size:])
if test_size > 128*1024:
data = fh.read()
BIG_DATA += compressor.compress(data)
BIG_TEXT += data
# TODO(emmatyping): if it is impossible for a block header to cross
# multiple outputs, we can just search the output of each compress call
# which should be more efficient
if BIG_DATA.count(bz2_block_magic) > 1:
BIG_DATA += compressor.flush()
break
BIG_DATA = bz2.compress(BIG_TEXT, compresslevel=1)

def setUp(self):
fd, self.filename = tempfile.mkstemp()
Expand Down
Loading
X Tutup