|
9 | 9 | import struct
|
10 | 10 | import sys
|
11 | 11 | import unittest
|
| 12 | +import zlib |
12 | 13 | from subprocess import PIPE, Popen
|
13 | 14 | from test.support import import_helper
|
14 | 15 | from test.support import os_helper
|
@@ -616,6 +617,54 @@ def test_issue44439(self):
|
616 | 617 | self.assertEqual(f.write(q), LENGTH)
|
617 | 618 | self.assertEqual(f.tell(), LENGTH)
|
618 | 619 |
|
| 620 | + def test_flush_flushes_compressor(self): |
| 621 | + # See issue GH-105808. |
| 622 | + b = io.BytesIO() |
| 623 | + message = b"important message here." |
| 624 | + with gzip.GzipFile(fileobj=b, mode='w') as f: |
| 625 | + f.write(message) |
| 626 | + f.flush() |
| 627 | + partial_data = b.getvalue() |
| 628 | + full_data = b.getvalue() |
| 629 | + self.assertEqual(gzip.decompress(full_data), message) |
| 630 | + # The partial data should contain the gzip header and the complete |
| 631 | + # message, but not the end-of-stream markers (so we can't just |
| 632 | + # decompress it directly). |
| 633 | + with self.assertRaises(EOFError): |
| 634 | + gzip.decompress(partial_data) |
| 635 | + d = zlib.decompressobj(wbits=-zlib.MAX_WBITS) |
| 636 | + f = io.BytesIO(partial_data) |
| 637 | + gzip._read_gzip_header(f) |
| 638 | + read_message = d.decompress(f.read()) |
| 639 | + self.assertEqual(read_message, message) |
| 640 | + |
| 641 | + def test_flush_modes(self): |
| 642 | + # Make sure the argument to flush is properly passed to the |
| 643 | + # zlib.compressobj; see issue GH-105808. |
| 644 | + class FakeCompressor: |
| 645 | + def __init__(self): |
| 646 | + self.modes = [] |
| 647 | + def compress(self, data): |
| 648 | + return b'' |
| 649 | + def flush(self, mode=-1): |
| 650 | + self.modes.append(mode) |
| 651 | + return b'' |
| 652 | + b = io.BytesIO() |
| 653 | + fc = FakeCompressor() |
| 654 | + with gzip.GzipFile(fileobj=b, mode='w') as f: |
| 655 | + f.compress = fc |
| 656 | + f.flush() |
| 657 | + f.flush(50) |
| 658 | + f.flush(zlib_mode=100) |
| 659 | + # The implicit close will also flush the compressor. |
| 660 | + expected_modes = [ |
| 661 | + zlib.Z_SYNC_FLUSH, |
| 662 | + 50, |
| 663 | + 100, |
| 664 | + -1, |
| 665 | + ] |
| 666 | + self.assertEqual(fc.modes, expected_modes) |
| 667 | + |
619 | 668 |
|
620 | 669 | class TestOpen(BaseTest):
|
621 | 670 | def test_binary_modes(self):
|
|
0 commit comments