archiveiterator.py

from warcio.bufferedreaders import DecompressingBufferedReader
from warcio.exceptions import ArchiveLoadFailed
from warcio.recordloader import ArcWarcRecordLoader
from warcio.utils import BUFF_SIZE

import sys
import six


# ============================================================================
class UnseekableYetTellable:
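    """ Wrap a file-like object whose tell() is unavailable (for example,
    a pipe or other non-seekable stream) and count the bytes read, so that
    tell() still reports the current read offset.
    """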

    def __init__(self, fh):
        self.fh = fh
        self.offset = 0

    def tell(self):
        return self.offset

    def read(self, size=-1):
        result = self.fh.read(size)
        self.offset += len(result)
        return result


# ============================================================================
class ArchiveIterator(six.Iterator):
    """ Iterate over records in WARC and ARC files, both gzip
    chunk-compressed and uncompressed.

    The iterator automatically detects the format and decompresses
    if necessary.
    """

    GZIP_ERR_MSG = """
    ERROR: non-chunked gzip file detected, gzip block continues
    beyond single record.

    This file is probably not a multi-member gzip but a single gzip file.

    To allow seek, a gzipped {1} must have each record compressed into
    a single gzip member and concatenated together.

    This file is likely still valid and can be fixed by running:

    warcio recompress <path/to/file> <path/to/new_file>
"""

    INC_RECORD = """\
    WARNING: Record not followed by newline, perhaps Content-Length is invalid
    Offset: {0}
    Remainder: {1}
"""

    def __init__(self, fileobj, no_record_parse=False,
                 verify_http=False, arc2warc=False,
                 ensure_http_headers=False, block_size=BUFF_SIZE,
                 check_digests=False):

        self.fh = fileobj

        self.loader = ArcWarcRecordLoader(verify_http=verify_http,
                                          arc2warc=arc2warc)
        self.known_format = None
        self.mixed_arc_warc = arc2warc

        self.member_info = None
        self.no_record_parse = no_record_parse
        self.ensure_http_headers = ensure_http_headers

        try:
            self.offset = self.fh.tell()
        except:
            self.fh = UnseekableYetTellable(self.fh)
            self.offset = self.fh.tell()

        self.reader = DecompressingBufferedReader(self.fh,
                                                  block_size=block_size)

        self.next_line = None
        self.check_digests = check_digests
        self.err_count = 0
        self.record = None

        self.the_iter = self._iterate_records()

    def __iter__(self):
        return self.the_iter

    def __next__(self):
        return six.next(self.the_iter)

    def close(self):
        self.record = None
        if self.reader:
            self.reader.close_decompressor()
            self.reader = None

    def _iterate_records(self):
        """ iterate over each record
        """
        raise_invalid_gzip = False
        empty_record = False

        while True:
            try:
                self.record = self._next_record(self.next_line)
                if raise_invalid_gzip:
                    self._raise_invalid_gzip_err()

                yield self.record

            except EOFError:
                empty_record = True

            self.read_to_end()

            if self.reader.decompressor:
                # if another gzip member, continue
                if self.reader.read_next_member():
                    continue

                # if empty record, then we're done
                elif empty_record:
                    break

                # otherwise, probably a gzip
                # containing multiple non-chunked records
                # raise this as an error
                else:
                    raise_invalid_gzip = True

            # non-gzip, so we're done
            elif empty_record:
                break

        self.close()

    def _raise_invalid_gzip_err(self):
        """ A non-chunked gzip file containing multiple ARC/WARC records
        has been detected. This is not valid for replay, so notify the user.
        """
        frmt = 'warc/arc'
        if self.known_format:
            frmt = self.known_format

        frmt_up = frmt.upper()

        msg = self.GZIP_ERR_MSG.format(frmt, frmt_up)
        raise ArchiveLoadFailed(msg)

    def _consume_blanklines(self):
        """ Consume blank lines that are between records
        - For warcs, there are usually 2
        - For arcs, may be 1 or 0
        - For block gzipped files, these are at the end of each gzip envelope
          and are included in the record length, which is the full gzip envelope
        - For uncompressed, they are between records and so are NOT part of
          the record length

        count empty_size so that it can be subtracted from
        the record length for uncompressed

        if the first line read is not blank, there is likely an error in the
        WARC/ARC, so display a warning
        """
        empty_size = 0
        first_line = True

        while True:
            line = self.reader.readline()
            if len(line) == 0:
                return None, empty_size

            stripped = line.rstrip()

            if len(stripped) == 0 or first_line:
                empty_size += len(line)

                if len(stripped) != 0:
                    # if first line is not blank,
                    # likely content-length was invalid, display warning
                    err_offset = self.fh.tell() - self.reader.rem_length() - empty_size
                    sys.stderr.write(self.INC_RECORD.format(err_offset, line))
                    self.err_count += 1

                first_line = False
                continue

            return line, empty_size

    def read_to_end(self, record=None):
        """ Read remainder of the stream
        If a digester is included, update it
        with the data read
        """
        # no current record to read
        if not self.record:
            return None

        # already at end of this record, don't read until it is consumed
        if self.member_info:
            return None

        curr_offset = self.offset

        while True:
            b = self.record.raw_stream.read(BUFF_SIZE)
            if not b:
                break

        """
        - For compressed files, blank lines are consumed
          since they are part of record length
        - For uncompressed files, blank lines are read later,
          and not included in the record length
        """
        #if self.reader.decompressor:
        self.next_line, empty_size = self._consume_blanklines()

        self.offset = self.fh.tell() - self.reader.rem_length()
        #if self.offset < 0:
        #    raise Exception('Not Gzipped Properly')

        if self.next_line:
            self.offset -= len(self.next_line)

        length = self.offset - curr_offset

        if not self.reader.decompressor:
            length -= empty_size

        self.member_info = (curr_offset, length)
        #return self.member_info
        #return next_line

    def get_record_offset(self):
        if not self.member_info:
            self.read_to_end()

        return self.member_info[0]

    def get_record_length(self):
        if not self.member_info:
            self.read_to_end()

        return self.member_info[1]
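
    # Offset/length usage (an illustrative sketch; 'stream' is a placeholder
    # for an open binary file object). Each record's (offset, length) pair is
    # available after the record has been consumed, which is how a simple
    # index could be gathered while iterating:
    #
    #     it = ArchiveIterator(stream)
    #     for record in it:
    #         ...  # consume the record as needed
    #         entry = (it.get_record_offset(), it.get_record_length())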

    def _next_record(self, next_line):
        """ Use loader to parse the record from the reader stream
        Supporting warc and arc records
        """
        record = self.loader.parse_record_stream(self.reader,
                                                 next_line,
                                                 self.known_format,
                                                 self.no_record_parse,
                                                 self.ensure_http_headers,
                                                 self.check_digests)

        self.member_info = None

        # Track known format for faster parsing of other records
        if not self.mixed_arc_warc:
            self.known_format = record.format

        return record


# ============================================================================
class WARCIterator(ArchiveIterator):
    def __init__(self, *args, **kwargs):
        super(WARCIterator, self).__init__(*args, **kwargs)
        self.known_format = 'warc'


# ============================================================================
class ARCIterator(ArchiveIterator):
    def __init__(self, *args, **kwargs):
        super(ARCIterator, self).__init__(*args, **kwargs)
        self.known_format = 'arc'
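

# WARCIterator and ARCIterator pin known_format, so the loader is told the
# record type up front rather than detecting it for each record. A brief
# usage sketch ('example.arc.gz' is a placeholder path):
#
#     with open('example.arc.gz', 'rb') as stream:
#         for record in ARCIterator(stream):
#             print(record.rec_headers)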