# test_bufferedreaders.py
  1. r"""
  2. # DecompressingBufferedReader Tests
  3. #=================================================================
  4. # decompress with on the fly compression, default gzip compression
  5. >>> print_str(DecompressingBufferedReader(BytesIO(compress('ABC\n1234\n'))).read())
  6. 'ABC\n1234\n'
  7. # decompress with on the fly compression, default 'inflate' compression
  8. >>> print_str(DecompressingBufferedReader(BytesIO(compress_alt('ABC\n1234\n')), decomp_type='deflate').read())
  9. 'ABC\n1234\n'
  10. # error: invalid compress type
  11. >>> DecompressingBufferedReader(BytesIO(compress('ABC')), decomp_type = 'bzip2').read()
  12. Traceback (most recent call last):
  13. Exception: Decompression type not supported: bzip2
  14. # invalid output when reading compressed data as not compressed
  15. >>> DecompressingBufferedReader(BytesIO(compress('ABC')), decomp_type = None).read() != b'ABC'
  16. True
  17. # test very small block size
  18. >>> dbr = DecompressingBufferedReader(BytesIO(b'ABCDEFG\nHIJKLMN\nOPQR\nXYZ'), block_size = 3)
  19. >>> print_str(dbr.readline()); print_str(dbr.readline(4)); print_str(dbr.readline()); print_str(dbr.readline()); print_str(dbr.readline(2)); print_str(dbr.readline()); print_str(dbr.readline())
  20. 'ABCDEFG\n'
  21. 'HIJK'
  22. 'LMN\n'
  23. 'OPQR\n'
  24. 'XY'
  25. 'Z'
  26. ''
  27. # test zero length reads
  28. >>> x = DecompressingBufferedReader(LimitReader(BytesIO(b'\r\n'), 1))
  29. >>> print_str(x.readline(0)); print_str(x.read(0))
  30. ''
  31. ''
  32. # Chunk-Decoding Buffered Reader Tests
  33. #=================================================================
  34. Properly formatted chunked data:
  35. >>> c = ChunkedDataReader(BytesIO(b"4\r\n1234\r\n0\r\n\r\n"));
  36. >>> print_str(c.read() + c.read(1) + c.read() + c.read())
  37. '1234'
  38. Non-chunked data:
  39. >>> print_str(ChunkedDataReader(BytesIO(b"xyz123!@#")).read())
  40. 'xyz123!@#'
  41. Non-chunked data, numbers only:
  42. >>> print_str(ChunkedDataReader(BytesIO(b"ABCDE" * 10)).read())
  43. 'ABCDEABCDEABCDEABCDEABCDEABCDEABCDEABCDEABCDEABCDE'
  44. Non-chunked data, numbers new line, large:
  45. >>> print_str(ChunkedDataReader(BytesIO(b"ABCDE" * 10 + b'\r\n')).read())
  46. 'ABCDEABCDEABCDEABCDEABCDEABCDEABCDEABCDEABCDEABCDE\r\n'
  47. Non-chunked, compressed data, specify decomp_type
  48. >>> print_str(ChunkedDataReader(BytesIO(compress('ABCDEF')), decomp_type='gzip').read())
  49. 'ABCDEF'
  50. Non-chunked, compressed data, specifiy compression seperately
  51. >>> c = ChunkedDataReader(BytesIO(compress('ABCDEF'))); c.set_decomp('gzip'); print_str(c.read())
  52. 'ABCDEF'
  53. Non-chunked, compressed data, wrap in DecompressingBufferedReader
  54. >>> print_str(DecompressingBufferedReader(ChunkedDataReader(BytesIO(compress('\nABCDEF\nGHIJ')))).read())
  55. '\nABCDEF\nGHIJ'
  56. Chunked compressed data
  57. Split compressed stream into 10-byte chunk and a remainder chunk
  58. >>> b = compress('ABCDEFGHIJKLMNOP')
  59. >>> l = len(b)
  60. >>> in_ = format(10, 'x').encode('utf-8') + b"\r\n" + b[:10] + b"\r\n" + format(l - 10, 'x').encode('utf-8') + b"\r\n" + b[10:] + b"\r\n0\r\n\r\n"
  61. >>> c = ChunkedDataReader(BytesIO(in_), decomp_type='gzip')
  62. >>> print_str(c.read())
  63. 'ABCDEFGHIJKLMNOP'
  64. Starts like chunked data, but isn't:
  65. >>> c = ChunkedDataReader(BytesIO(b"1\r\nxyz123!@#"));
  66. >>> print_str(c.read() + c.read())
  67. '1\r\nx123!@#'
  68. Chunked data cut off part way through:
  69. >>> c = ChunkedDataReader(BytesIO(b"4\r\n1234\r\n4\r\n12"));
  70. >>> print_str(c.read() + c.read())
  71. '123412'
  72. Zero-Length chunk:
  73. >>> print_str(ChunkedDataReader(BytesIO(b"0\r\n\r\n")).read())
  74. ''
  75. """
  76. from io import BytesIO
  77. from warcio.bufferedreaders import ChunkedDataReader, ChunkedDataException
  78. from warcio.bufferedreaders import DecompressingBufferedReader
  79. from warcio.limitreader import LimitReader
  80. from contextlib import closing
  81. import six
  82. import zlib
  83. import pytest
  84. def compress(buff):
  85. buff = buff.encode('utf-8')
  86. compressobj = zlib.compressobj(6, zlib.DEFLATED, zlib.MAX_WBITS + 16)
  87. compressed = compressobj.compress(buff)
  88. compressed += compressobj.flush()
  89. return compressed
  90. # plain "inflate"
  91. def compress_alt(buff):
  92. buff = buff.encode('utf-8')
  93. compressobj = zlib.compressobj(6, zlib.DEFLATED)
  94. compressed = compressobj.compress(buff)
  95. compressed += compressobj.flush()
  96. # drop gzip headers/tail
  97. compressed = compressed[2:-4]
  98. return compressed
  99. # Brotli
  100. @pytest.mark.skipif('br' not in DecompressingBufferedReader.DECOMPRESSORS, reason='brotli not available')
  101. def test_brotli():
  102. brotli_buff = b'[\xff\xaf\x02\xc0"y\\\xfbZ\x8cB;\xf4%U\x19Z\x92\x99\xb15\xc8\x19\x9e\x9e\n{K\x90\xb9<\x98\xc8\t@\xf3\xe6\xd9M\xe4me\x1b\'\x87\x13_\xa6\xe90\x96{<\x15\xd8S\x1c'
  103. with closing(DecompressingBufferedReader(BytesIO(brotli_buff), decomp_type='br')) as x:
  104. assert x.read() == b'The quick brown fox jumps over the lazy dog' * 4096
  105. @pytest.mark.skipif('br' not in DecompressingBufferedReader.DECOMPRESSORS, reason='brotli not available')
  106. def test_brotli_very_small_chunk():
  107. brotli_buff = b'[\xff\xaf\x02\xc0"y\\\xfbZ\x8cB;\xf4%U\x19Z\x92\x99\xb15\xc8\x19\x9e\x9e\n{K\x90\xb9<\x98\xc8\t@\xf3\xe6\xd9M\xe4me\x1b\'\x87\x13_\xa6\xe90\x96{<\x15\xd8S\x1c'
  108. # read 3 bytes at time, will need to read() multiple types before decompressor has enough to return something
  109. with closing(DecompressingBufferedReader(BytesIO(brotli_buff), decomp_type='br', block_size=3)) as x:
  110. assert x.read() == b'The quick brown fox jumps over the lazy dog' * 4096
  111. # Compression
  112. def test_compress_mix():
  113. x = DecompressingBufferedReader(BytesIO(compress('ABC') + b'123'), decomp_type = 'gzip')
  114. b = x.read()
  115. assert b == b'ABC'
  116. x.read_next_member()
  117. assert x.read() == b'123'
  118. # Errors
  119. def test_compress_invalid():
  120. result = compress('ABCDEFG' * 1)
  121. # cut-off part of the block
  122. result = result[:-2] + b'xyz'
  123. x = DecompressingBufferedReader(BytesIO(result), block_size=16)
  124. b = x.read(3)
  125. assert b == b'ABC'
  126. assert b'DE' == x.read()
  127. def test_err_chunk_cut_off():
  128. # Chunked data cut off with exceptions
  129. c = ChunkedDataReader(BytesIO(b"4\r\n1234\r\n4\r\n12"), raise_exceptions=True)
  130. with pytest.raises(ChunkedDataException):
  131. c.read() + c.read()
  132. #ChunkedDataException: Ran out of data before end of chunk
  133. def print_str(string):
  134. return string.decode('utf-8') if six.PY3 else string
if __name__ == "__main__":
    # Run the doctest examples from the module docstring when executed directly.
    import doctest
    doctest.testmod()