bufferedreaders.py

from io import BytesIO
import zlib
import sys

from warcio.utils import BUFF_SIZE


#=================================================================
def gzip_decompressor():
    """
    Decompressor which can decompress a gzip stream
    """
    # wbits = 16 + MAX_WBITS makes zlib expect a gzip header and trailer
    return zlib.decompressobj(16 + zlib.MAX_WBITS)


def deflate_decompressor():
    # standard zlib-wrapped deflate stream
    return zlib.decompressobj()


def deflate_decompressor_alt():
    # raw deflate stream with no zlib header (negative wbits)
    return zlib.decompressobj(-zlib.MAX_WBITS)
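
# A minimal sketch of how these factories are used (illustration only, not
# part of the module's API): each returns an object with a zlib-style
# decompress() method.
#
#     >>> import gzip
#     >>> gzip_decompressor().decompress(gzip.compress(b'hello'))
#     b'hello'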


#=================================================================
def try_brotli_init():
    try:
        import brotli

        def brotli_decompressor():
            decomp = brotli.Decompressor()
            # brotli's Decompressor lacks zlib's unused_data attribute;
            # add it so BufferedReader can treat all decompressors uniformly
            decomp.unused_data = None
            return decomp

        BufferedReader.DECOMPRESSORS['br'] = brotli_decompressor
    except ImportError:  #pragma: no cover
        pass
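
# Illustration (a sketch; assumes the optional brotli package is installed):
# once try_brotli_init() runs at import time (see the bottom of this module),
# 'br' is registered alongside the built-in types:
#
#     >>> sorted(BufferedReader.get_supported_decompressors())
#     ['br', 'deflate', 'deflate_alt', 'gzip']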


#=================================================================
class BufferedReader(object):
    """
    A buffered reader which wraps an existing stream.
    Read operations operate on an underlying buffer, which is filled to
    block_size (16384 default).

    If an optional decompress type is specified,
    data is fed through the decompressor when read from the buffer.
    Currently supported decompression: gzip, deflate and raw deflate
    ('deflate_alt'), plus brotli ('br') if the brotli package is installed.
    If unspecified, default decompression is None.

    If decompression is specified and decompression fails on the first
    read, the data is assumed to not be compressed and no exception is
    thrown. If a failure occurs after data has been partially
    decompressed, the error is written to stderr and the remainder of
    that block is discarded.
    """

    DECOMPRESSORS = {'gzip': gzip_decompressor,
                     'deflate': deflate_decompressor,
                     'deflate_alt': deflate_decompressor_alt
                    }

    def __init__(self, stream, block_size=BUFF_SIZE,
                 decomp_type=None,
                 starting_data=None,
                 read_all_members=False):
        self.stream = stream
        self.block_size = block_size
        self._init_decomp(decomp_type)
        self.buff = None
        self.starting_data = starting_data
        self.num_read = 0
        self.buff_size = 0
        self.read_all_members = read_all_members

    def set_decomp(self, decomp_type):
        self._init_decomp(decomp_type)

    def _init_decomp(self, decomp_type):
        self.num_block_read = 0
        if decomp_type:
            try:
                self.decomp_type = decomp_type
                self.decompressor = self.DECOMPRESSORS[decomp_type.lower()]()
            except KeyError:
                raise Exception('Decompression type not supported: ' +
                                decomp_type)
        else:
            self.decomp_type = None
            self.decompressor = None

    def _fillbuff(self, block_size=None):
        if not self.empty():
            return

        # can't read past next member
        if self.rem_length() > 0:
            return

        block_size = block_size or self.block_size

        if self.starting_data:
            data = self.starting_data
            self.starting_data = None
        else:
            data = self.stream.read(block_size)

        self._process_read(data)

        # if raw data is not empty and decompressor set, but
        # decompressed buff is empty, keep reading --
        # decompressor likely needs more data to decompress
        while data and self.decompressor and not self.decompressor.unused_data and self.empty():
            data = self.stream.read(block_size)
            self._process_read(data)

    def _process_read(self, data):
        # don't process if no raw data read
        if not data:
            self.buff = None
            return

        data = self._decompress(data)
        self.buff_size = len(data)
        self.num_read += self.buff_size
        self.num_block_read += self.buff_size
        self.buff = BytesIO(data)

    def _decompress(self, data):
        if self.decompressor and data:
            try:
                data = self.decompressor.decompress(data)
            except Exception as e:
                # if first read attempt, assume non-gzipped stream
                if self.num_block_read == 0:
                    if self.decomp_type == 'deflate':
                        self._init_decomp('deflate_alt')
                        data = self._decompress(data)
                    else:
                        self.decompressor = None

                # otherwise (partly decompressed), something is wrong
                else:
                    sys.stderr.write(str(e) + '\n')
                    return b''
        return data
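
    # Fallback sketch: if the stream turns out not to be compressed, the first
    # decompress() call fails, the decompressor is dropped, and the raw bytes
    # pass through unchanged (hypothetical values):
    #
    #     >>> BufferedReader(BytesIO(b'plain text'), decomp_type='gzip').read()
    #     b'plain text'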

    def read(self, length=None):
        """
        Fill the buffer and read up to length bytes
        (or all remaining data, if length is None).

        Fewer than length bytes may be returned if the end of the input
        is reached. If a buffer boundary is hit, reading continues until
        the specified length is read.
        """
        all_buffs = []
        while length is None or length > 0:
            self._fillbuff()
            if self.empty():
                break

            buff = self.buff.read(length)
            all_buffs.append(buff)
            if length:
                length -= len(buff)

        return b''.join(all_buffs)
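
    # Usage sketch for read() (hypothetical values):
    #
    #     >>> reader = BufferedReader(BytesIO(b'abcdef'))
    #     >>> reader.read(4)
    #     b'abcd'
    #     >>> reader.read()  # read the remainder
    #     b'ef'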

    def readline(self, length=None):
        """
        Fill the buffer and read a full line from it
        (up to the specified length, if provided).

        If no newline is found at the end, the buffer is refilled in
        case the line crosses a buffer boundary.
        """
        if length == 0:
            return b''

        self._fillbuff()

        if self.empty():
            return b''

        linebuff = self.buff.readline(length)

        # we may be at a boundary
        while not linebuff.endswith(b'\n'):
            if length:
                length -= len(linebuff)
                if length <= 0:
                    break

            self._fillbuff()

            if self.empty():
                break

            linebuff += self.buff.readline(length)

        return linebuff
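
    # readline() sketch: the line is returned once b'\n' is seen, even when it
    # spans several buffer fills (hypothetical values):
    #
    #     >>> reader = BufferedReader(BytesIO(b'line one\nline two\n'), block_size=4)
    #     >>> reader.readline()
    #     b'line one\n'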

    def tell(self):
        return self.num_read

    def empty(self):
        if not self.buff or self.buff.tell() >= self.buff_size:
            # if reading all members, attempt to get next member automatically
            if self.read_all_members:
                self.read_next_member()

            return True

        return False

    def read_next_member(self):
        if not self.decompressor or not self.decompressor.unused_data:
            return False

        self.starting_data = self.decompressor.unused_data
        self._init_decomp(self.decomp_type)
        return True
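
    # Multi-member sketch: WARC files are often a series of concatenated gzip
    # members, one per record; read_next_member() restarts decompression on
    # the bytes left over from the previous member (hypothetical values):
    #
    #     >>> import gzip
    #     >>> two = gzip.compress(b'first') + gzip.compress(b'second')
    #     >>> reader = BufferedReader(BytesIO(two), decomp_type='gzip')
    #     >>> reader.read()
    #     b'first'
    #     >>> reader.read_next_member() and reader.read()
    #     b'second'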

    def rem_length(self):
        rem = 0
        if self.buff:
            rem = self.buff_size - self.buff.tell()

        if self.decompressor and self.decompressor.unused_data:
            rem += len(self.decompressor.unused_data)
        return rem

    def close(self):
        if self.stream:
            self.stream.close()
            self.stream = None

        self.buff = None
        self.close_decompressor()

    def close_decompressor(self):
        if self.decompressor:
            self.decompressor.flush()
            self.decompressor = None

    @classmethod
    def get_supported_decompressors(cls):
        return cls.DECOMPRESSORS.keys()
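
# End-to-end sketch of BufferedReader over a compressed stream (hypothetical
# values; the small block_size just forces multiple buffer fills):
#
#     >>> import gzip
#     >>> stream = BytesIO(gzip.compress(b'some\ncompressed\ndata\n'))
#     >>> reader = BufferedReader(stream, decomp_type='gzip', block_size=16)
#     >>> reader.readline()
#     b'some\n'
#     >>> reader.read()
#     b'compressed\ndata\n'
#     >>> reader.tell()  # total decompressed bytes read so far
#     21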


#=================================================================
class DecompressingBufferedReader(BufferedReader):
    """
    A BufferedReader which defaults to gzip decompression
    (unless a different type is specified).
    """
    def __init__(self, *args, **kwargs):
        if 'decomp_type' not in kwargs:
            kwargs['decomp_type'] = 'gzip'
        super(DecompressingBufferedReader, self).__init__(*args, **kwargs)
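
# Equivalent to BufferedReader(stream, decomp_type='gzip'); a quick sketch:
#
#     >>> import gzip
#     >>> DecompressingBufferedReader(BytesIO(gzip.compress(b'data'))).read()
#     b'data'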


#=================================================================
class ChunkedDataException(Exception):
    def __init__(self, msg, data=b''):
        Exception.__init__(self, msg)
        self.data = data


#=================================================================
class ChunkedDataReader(BufferedReader):
    r"""
    A ChunkedDataReader is a BufferedReader
    which also supports de-chunking of the data if it happens
    to be http 'chunk-encoded'.

    If at any point the chunked header is not available, the stream is
    assumed to not be chunked and no more dechunking occurs.
    """
    def __init__(self, stream, raise_exceptions=False, **kwargs):
        super(ChunkedDataReader, self).__init__(stream, **kwargs)
        self.all_chunks_read = False
        self.not_chunked = False

        # if False, we'll use best-guess fallback for parse errors
        self.raise_chunked_data_exceptions = raise_exceptions

    def _fillbuff(self, block_size=None):
        if self.not_chunked:
            return super(ChunkedDataReader, self)._fillbuff(block_size)

        # Loop over chunks until there is some data (not empty())
        # In particular, gzipped data may require multiple chunks to
        # return any decompressed result
        while (self.empty() and
               not self.all_chunks_read and
               not self.not_chunked):

            try:
                length_header = self.stream.readline(64)
                self._try_decode(length_header)
            except ChunkedDataException as e:
                if self.raise_chunked_data_exceptions:
                    raise

                # Can't parse the data as chunked.
                # It's possible that non-chunked data is served
                # with a Transfer-Encoding: chunked.
                # Treat this as non-chunk encoded from here on.
                self._process_read(length_header + e.data)
                self.not_chunked = True

                # parse the block as non-chunked
                return super(ChunkedDataReader, self)._fillbuff(block_size)

    def _try_decode(self, length_header):
        # decode length header
        try:
            # ensure line ends with \r\n
            assert(length_header[-2:] == b'\r\n')
            chunk_size = length_header[:-2].split(b';')[0]
            chunk_size = int(chunk_size, 16)
            # sanity check chunk size
            assert(chunk_size <= 2**31)
        except (ValueError, AssertionError):
            raise ChunkedDataException(b"Couldn't decode length header " +
                                       length_header)

        if not chunk_size:
            # chunk_size 0 indicates end of file. read final bytes to compute digest.
            final_data = self.stream.read(2)
            try:
                assert(final_data == b'\r\n')
            except AssertionError:
                raise ChunkedDataException(b"Incorrect \r\n after length header of 0")

            self.all_chunks_read = True
            self._process_read(b'')
            return

        data_len = 0
        data = b''

        # read chunk
        while data_len < chunk_size:
            new_data = self.stream.read(chunk_size - data_len)

            # if we unexpectedly run out of data,
            # either raise an exception or just stop reading,
            # assuming file was cut off
            if not new_data:
                if self.raise_chunked_data_exceptions:
                    msg = 'Ran out of data before end of chunk'
                    raise ChunkedDataException(msg, data)
                else:
                    chunk_size = data_len
                    self.all_chunks_read = True

            data += new_data
            data_len = len(data)

        # if we successfully read a block without running out,
        # it should end in \r\n
        if not self.all_chunks_read:
            crlf = self.stream.read(2)
            if crlf != b'\r\n':
                raise ChunkedDataException(b"Chunk terminator not found.",
                                           data)

        # hand to base class for further processing
        self._process_read(data)
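
# De-chunking sketch (hypothetical values): a standard HTTP chunked body is
# reassembled transparently, and the zero-length final chunk ends the read:
#
#     >>> body = BytesIO(b'4\r\nWiki\r\n5\r\npedia\r\n0\r\n\r\n')
#     >>> ChunkedDataReader(body).read()
#     b'Wikipedia'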


#=================================================================
try_brotli_init()