from io import BytesIO
import zlib
import sys

from warcio.utils import BUFF_SIZE


#=================================================================
def gzip_decompressor():
    """
    Decompressor which can decompress a gzip stream
    """
    return zlib.decompressobj(16 + zlib.MAX_WBITS)


def deflate_decompressor():
    return zlib.decompressobj()


def deflate_decompressor_alt():
    return zlib.decompressobj(-zlib.MAX_WBITS)
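

# Note: the three factories above differ only in zlib's wbits
# parameter. zlib.MAX_WBITS alone expects a zlib (RFC 1950) header,
# -zlib.MAX_WBITS expects a raw deflate stream with no header, and
# 16 + zlib.MAX_WBITS expects a gzip (RFC 1952) header and trailer.
# Some HTTP servers send raw deflate under 'Content-Encoding: deflate',
# which is why the 'deflate_alt' variant exists. A minimal sketch,
# using only the standard library:
#
#   wrapped = zlib.compress(b'abc')                # zlib-wrapped deflate
#   deflate_decompressor().decompress(wrapped)     # -> b'abc'
#
#   raw = zlib.compressobj(wbits=-zlib.MAX_WBITS)  # raw deflate
#   deflate_decompressor_alt().decompress(raw.compress(b'abc') + raw.flush())
#                                                  # -> b'abc'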


#=================================================================
def try_brotli_init():
    try:
        import brotli

        def brotli_decompressor():
            decomp = brotli.Decompressor()
            # brotli's Decompressor has no unused_data attribute;
            # set it to match the zlib decompressor interface
            decomp.unused_data = None
            return decomp

        BufferedReader.DECOMPRESSORS['br'] = brotli_decompressor
    except ImportError:  #pragma: no cover
        pass


#=================================================================
class BufferedReader(object):
    """
    A buffered reader which wraps an existing stream.
    Read operations are served from an internal buffer, which is
    filled from the stream in blocks of up to block_size
    (default 16384) bytes.

    If an optional decompression type is specified, data is fed
    through the matching decompressor as it is read from the buffer.
    Currently supported types: gzip, deflate, and (if the brotli
    package is installed) br. If unspecified, no decompression
    is performed.

    If decompression is specified but fails on the first read, the
    data is assumed to not be compressed and no exception is raised.
    If a failure occurs after data has been partially decompressed,
    the exception is propagated.
    """

    DECOMPRESSORS = {'gzip': gzip_decompressor,
                     'deflate': deflate_decompressor,
                     'deflate_alt': deflate_decompressor_alt
                    }
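
    # Usage sketch (illustrative only): wrap any file-like object and
    # read from it through the internal buffer, e.g.
    #
    #   reader = BufferedReader(BytesIO(b'hello\nworld\n'))
    #   reader.readline()   # -> b'hello\n'
    #   reader.read()       # -> b'world\n'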

    def __init__(self, stream, block_size=BUFF_SIZE,
                 decomp_type=None,
                 starting_data=None,
                 read_all_members=False):
        self.stream = stream
        self.block_size = block_size

        self._init_decomp(decomp_type)

        self.buff = None
        self.starting_data = starting_data
        self.num_read = 0
        self.buff_size = 0

        self.read_all_members = read_all_members

    def set_decomp(self, decomp_type):
        self._init_decomp(decomp_type)

    def _init_decomp(self, decomp_type):
        self.num_block_read = 0

        if decomp_type:
            try:
                self.decomp_type = decomp_type
                self.decompressor = self.DECOMPRESSORS[decomp_type.lower()]()
            except KeyError:
                raise Exception('Decompression type not supported: ' +
                                decomp_type)
        else:
            self.decomp_type = None
            self.decompressor = None

    def _fillbuff(self, block_size=None):
        if not self.empty():
            return

        # can't read past next member
        if self.rem_length() > 0:
            return

        block_size = block_size or self.block_size

        if self.starting_data:
            data = self.starting_data
            self.starting_data = None
        else:
            data = self.stream.read(block_size)

        self._process_read(data)

        # if raw data is not empty and decompressor set, but
        # decompressed buff is empty, keep reading --
        # decompressor likely needs more data to decompress
        while (data and self.decompressor and
               not self.decompressor.unused_data and self.empty()):
            data = self.stream.read(block_size)
            self._process_read(data)

    def _process_read(self, data):
        # don't process if no raw data read
        if not data:
            self.buff = None
            return

        data = self._decompress(data)
        self.buff_size = len(data)
        self.num_read += self.buff_size
        self.num_block_read += self.buff_size
        self.buff = BytesIO(data)

    def _decompress(self, data):
        if self.decompressor and data:
            try:
                data = self.decompressor.decompress(data)
            except Exception as e:
                # if first read attempt, assume non-gzipped stream
                if self.num_block_read == 0:
                    if self.decomp_type == 'deflate':
                        self._init_decomp('deflate_alt')
                        data = self._decompress(data)
                    else:
                        self.decompressor = None
                # otherwise (partly decompressed), something is wrong
                else:
                    sys.stderr.write(str(e) + '\n')
                    return b''
        return data
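
    # Illustration of the fallback above (sketch only): feeding plain,
    # uncompressed bytes through a reader configured for gzip does not
    # raise; the first failed decompress simply disables decompression.
    #
    #   reader = BufferedReader(BytesIO(b'plain text'), decomp_type='gzip')
    #   reader.read()   # -> b'plain text'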

    def read(self, length=None):
        """
        Fill the buffer and read up to length bytes, or until the end
        of the input if length is not specified. Fewer than length
        bytes may be returned if the end of the input is reached.
        At a buffer boundary, reading continues until the requested
        length has been read.
        """
        all_buffs = []
        while length is None or length > 0:
            self._fillbuff()
            if self.empty():
                break

            buff = self.buff.read(length)
            all_buffs.append(buff)
            if length:
                length -= len(buff)

        return b''.join(all_buffs)
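
    # read() sketch: reads spanning a buffer boundary are joined until
    # the requested length is satisfied or the input is exhausted, e.g.
    #
    #   reader = BufferedReader(BytesIO(b'abcdef'), block_size=4)
    #   reader.read(6)   # -> b'abcdef' (served from two buffer fills)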

    def readline(self, length=None):
        """
        Fill the buffer and read a full line from it (up to the
        specified length, if provided). If no newline is found at the
        end, fill the buffer again, since the line may span a buffer
        boundary.
        """
        if length == 0:
            return b''

        self._fillbuff()

        if self.empty():
            return b''

        linebuff = self.buff.readline(length)

        # we may be at a boundary
        while not linebuff.endswith(b'\n'):
            if length:
                length -= len(linebuff)
                if length <= 0:
                    break

            self._fillbuff()

            if self.empty():
                break

            linebuff += self.buff.readline(length)

        return linebuff
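
    # readline() sketch: a length cap limits how many bytes are
    # returned even if no newline has been seen yet, e.g.
    #
    #   reader = BufferedReader(BytesIO(b'abcdef\n'))
    #   reader.readline(4)   # -> b'abcd'
    #   reader.readline()    # -> b'ef\n'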

    def tell(self):
        return self.num_read

    def empty(self):
        if not self.buff or self.buff.tell() >= self.buff_size:
            # if reading all members, attempt to get next member automatically
            if self.read_all_members:
                self.read_next_member()

            return True

        return False

    def read_next_member(self):
        if not self.decompressor or not self.decompressor.unused_data:
            return False

        self.starting_data = self.decompressor.unused_data
        self._init_decomp(self.decomp_type)
        return True
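
    # Multi-member sketch: gzip permits concatenating several members
    # in one stream (WARC files commonly use one member per record).
    # With read_all_members=False, read() stops at the member boundary
    # and read_next_member() advances explicitly, e.g.
    #
    #   import gzip
    #   two = gzip.compress(b'one') + gzip.compress(b'two')
    #   reader = BufferedReader(BytesIO(two), decomp_type='gzip')
    #   reader.read()              # -> b'one'
    #   reader.read_next_member()  # -> True
    #   reader.read()              # -> b'two'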

    def rem_length(self):
        rem = 0
        if self.buff:
            rem = self.buff_size - self.buff.tell()

        if self.decompressor and self.decompressor.unused_data:
            rem += len(self.decompressor.unused_data)

        return rem

    def close(self):
        if self.stream:
            self.stream.close()
            self.stream = None

        self.buff = None
        self.close_decompressor()

    def close_decompressor(self):
        if self.decompressor:
            self.decompressor.flush()
            self.decompressor = None

    @classmethod
    def get_supported_decompressors(cls):
        return cls.DECOMPRESSORS.keys()


#=================================================================
class DecompressingBufferedReader(BufferedReader):
    """
    A BufferedReader which defaults to gzip decompression
    (unless a different type is specified).
    """
    def __init__(self, *args, **kwargs):
        if 'decomp_type' not in kwargs:
            kwargs['decomp_type'] = 'gzip'

        super(DecompressingBufferedReader, self).__init__(*args, **kwargs)
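

# Usage sketch for DecompressingBufferedReader (illustrative only):
#
#   import gzip
#   stream = BytesIO(gzip.compress(b'some data'))
#   DecompressingBufferedReader(stream).read()   # -> b'some data'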


#=================================================================
class ChunkedDataException(Exception):
    def __init__(self, msg, data=b''):
        Exception.__init__(self, msg)
        self.data = data


#=================================================================
class ChunkedDataReader(BufferedReader):
    r"""
    A ChunkedDataReader is a BufferedReader which also supports
    de-chunking of the data if it happens to be HTTP 'chunk-encoded'.

    If at any point the chunked header is not available, the stream is
    assumed to not be chunked and no more dechunking occurs.
    """
    def __init__(self, stream, raise_exceptions=False, **kwargs):
        super(ChunkedDataReader, self).__init__(stream, **kwargs)
        self.all_chunks_read = False
        self.not_chunked = False

        # if False, we'll use best-guess fallback for parse errors
        self.raise_chunked_data_exceptions = raise_exceptions

    def _fillbuff(self, block_size=None):
        if self.not_chunked:
            return super(ChunkedDataReader, self)._fillbuff(block_size)

        # Loop over chunks until there is some data (not empty())
        # In particular, gzipped data may require multiple chunks to
        # return any decompressed result
        while (self.empty() and
               not self.all_chunks_read and
               not self.not_chunked):

            try:
                length_header = self.stream.readline(64)
                self._try_decode(length_header)
            except ChunkedDataException as e:
                if self.raise_chunked_data_exceptions:
                    raise

                # Can't parse the data as chunked.
                # It's possible that non-chunked data is served
                # with a Transfer-Encoding: chunked.
                # Treat this as non-chunk encoded from here on.
                self._process_read(length_header + e.data)
                self.not_chunked = True

                # parse the rest of the block as non-chunked
                return super(ChunkedDataReader, self)._fillbuff(block_size)

    def _try_decode(self, length_header):
        # decode length header
        try:
            # ensure line ends with \r\n
            assert length_header[-2:] == b'\r\n'
            chunk_size = length_header[:-2].split(b';')[0]
            chunk_size = int(chunk_size, 16)
            # sanity check chunk size
            assert chunk_size <= 2**31
        except (ValueError, AssertionError):
            raise ChunkedDataException(b"Couldn't decode length header " +
                                       length_header)

        if not chunk_size:
            # chunk_size 0 indicates the end of the chunked stream;
            # consume the trailing \r\n after the final length header
            final_data = self.stream.read(2)
            if final_data != b'\r\n':
                raise ChunkedDataException(b"Incorrect \\r\\n after "
                                           b"length header of 0")

            self.all_chunks_read = True
            self._process_read(b'')
            return

        data_len = 0
        data = b''

        # read chunk
        while data_len < chunk_size:
            new_data = self.stream.read(chunk_size - data_len)

            # if we unexpectedly run out of data,
            # either raise an exception or just stop reading,
            # assuming file was cut off
            if not new_data:
                if self.raise_chunked_data_exceptions:
                    msg = 'Ran out of data before end of chunk'
                    raise ChunkedDataException(msg, data)
                else:
                    chunk_size = data_len
                    self.all_chunks_read = True

            data += new_data
            data_len = len(data)

        # if we successfully read a block without running out,
        # it should end in \r\n
        if not self.all_chunks_read:
            crlf = self.stream.read(2)
            if crlf != b'\r\n':
                raise ChunkedDataException(b"Chunk terminator not found.",
                                           data)

        # hand to base class for further processing
        self._process_read(data)
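
    # Usage sketch (illustrative only): de-chunk an HTTP
    # 'Transfer-Encoding: chunked' body. Each chunk is a hex length
    # line, the chunk bytes, and a trailing \r\n; a zero-length chunk
    # terminates the stream.
    #
    #   body = b'4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n'
    #   ChunkedDataReader(BytesIO(body)).read()   # -> b'Wikipedia'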


#=================================================================
try_brotli_init()
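

if __name__ == '__main__':
    # Minimal, self-contained demo of the readers above (a sketch,
    # not part of the library API; the sample data is made up here).
    import gzip

    # gzip decompression via DecompressingBufferedReader
    stream = BytesIO(gzip.compress(b'hello gzip\n'))
    print(DecompressingBufferedReader(stream).readline())  # b'hello gzip\n'

    # de-chunking via ChunkedDataReader
    chunked = BytesIO(b'6\r\nchunk1\r\n6\r\nchunk2\r\n0\r\n\r\n')
    print(ChunkedDataReader(chunked).read())  # b'chunk1chunk2'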