test_archiveiterator.py 15 KB


  1. from warcio.archiveiterator import ArchiveIterator, WARCIterator, ARCIterator
  2. from warcio.exceptions import ArchiveLoadFailed
  3. from warcio.bufferedreaders import DecompressingBufferedReader, BufferedReader
  4. from warcio.warcwriter import BufferWARCWriter
  5. import pytest
  6. from io import BytesIO
  7. import sys
  8. import os
  9. from . import get_test_file
  10. from contextlib import closing, contextmanager
  11. import subprocess
  12. #==============================================================================
  13. class TestArchiveIterator(object):
  14. def _load_archive(self, filename, offset=0, cls=ArchiveIterator,
  15. errs_expected=0, **kwargs):
  16. with open(get_test_file(filename), 'rb') as fh:
  17. fh.seek(offset)
  18. iter_ = cls(fh, **kwargs)
  19. rec_types = [record.rec_type for record in iter_ if record.digest_checker.passed is not False]
  20. assert iter_.err_count == errs_expected
  21. return rec_types
  22. def _load_archive_memory(self, stream, offset=0, cls=ArchiveIterator,
  23. errs_expected=0, full_read=False, **kwargs):
  24. stream.seek(offset)
  25. iter_ = cls(stream, **kwargs)
  26. if full_read:
  27. rec_types = [record.rec_type for record in iter_
  28. if (record.content_stream().read() or True) and record.digest_checker.passed is not False]
  29. else:
  30. rec_types = [record.rec_type for record in iter_ if record.digest_checker.passed is not False]
  31. assert iter_.err_count == errs_expected
  32. return rec_types
  33. def _read_first_response(self, filename):
  34. with self._find_first_by_type(filename, 'response') as record:
  35. if record:
  36. return record.content_stream().read()
  37. @contextmanager
  38. def _find_first_by_type(self, filename, match_type, **params):
  39. with open(get_test_file(filename), 'rb') as fh:
  40. with closing(ArchiveIterator(fh, **params)) as a:
  41. for record in a:
  42. if record.rec_type == match_type:
  43. yield record
  44. break
  45. def test_example_warc_gz(self):
  46. expected = ['warcinfo', 'warcinfo', 'response', 'request', 'revisit', 'request']
  47. assert self._load_archive('example.warc.gz') == expected
  48. def test_example_warc(self):
  49. expected = ['warcinfo', 'warcinfo', 'response', 'request', 'revisit', 'request']
  50. assert self._load_archive('example.warc') == expected
  51. def test_example_warc_2(self):
  52. expected = ['warcinfo', 'response', 'request']
  53. assert self._load_archive('example-iana.org-chunked.warc') == expected
  54. def test_iterator(self):
  55. """ Test iterator semantics on 3 record WARC
  56. """
  57. with open(get_test_file('example-iana.org-chunked.warc'), 'rb') as fh:
  58. with closing(ArchiveIterator(fh)) as a:
  59. for record in a:
  60. assert record.rec_type == 'warcinfo'
  61. assert a.get_record_offset() == 0
  62. assert record.digest_checker.passed is None
  63. assert len(record.digest_checker.problems) == 0
  64. break
  65. record = next(a)
  66. assert record.rec_type == 'response'
  67. assert a.get_record_offset() == 405
  68. assert record.digest_checker.passed is None
  69. assert len(record.digest_checker.problems) == 0
  70. for record in a:
  71. assert record.rec_type == 'request'
  72. assert a.get_record_offset() == 8379
  73. assert record.digest_checker.passed is None
  74. assert len(record.digest_checker.problems) == 0
  75. break
  76. with pytest.raises(StopIteration):
  77. record = next(a)
  78. assert a.record == None
  79. assert a.reader == None
  80. assert a.read_to_end() == None
  81. def test_unseekable(self):
  82. """ Test iterator on unseekable 3 record uncompressed WARC input
  83. """
  84. proc = subprocess.Popen(['cat', get_test_file('example-iana.org-chunked.warc')],
  85. stdout=subprocess.PIPE)
  86. def raise_tell(x):
  87. raise Exception()
  88. # on windows, this tell() exists but doesn't work correctly, so just override (in py3)
  89. # this is designed to emulated stdin, which does not have a tell(), as expected
  90. stdout = proc.stdout
  91. if os.name == 'nt' and hasattr(proc.stdout, 'tell'):
  92. if sys.version_info < (3, 0):
  93. stdout = BufferedReader(stdout)
  94. else:
  95. stdout.tell = raise_tell
  96. with closing(ArchiveIterator(stdout)) as a:
  97. for record in a:
  98. assert record.rec_type == 'warcinfo'
  99. assert a.get_record_offset() == 0
  100. break
  101. record = next(a)
  102. assert record.rec_type == 'response'
  103. assert a.get_record_offset() == 405
  104. for record in a:
  105. assert record.rec_type == 'request'
  106. assert a.get_record_offset() == 8379
  107. break
  108. with pytest.raises(StopIteration):
  109. record = next(a)
  110. assert a.record == None
  111. assert a.reader == None
  112. assert a.read_to_end() == None
  113. proc.stdout.close()
  114. proc.wait()
  115. def test_unseekable_gz(self):
  116. """ Test iterator on unseekable 3 record uncompressed gzipped WARC input
  117. """
  118. proc = subprocess.Popen(['cat', get_test_file('example-resource.warc.gz')],
  119. stdout=subprocess.PIPE)
  120. def raise_tell(x):
  121. raise Exception()
  122. # on windows, this tell() exists but doesn't work correctly, so just override (in py3)
  123. # this is designed to emulated stdin, which does not have a tell(), as expected
  124. stdout = proc.stdout
  125. if os.name == 'nt' and hasattr(proc.stdout, 'tell'):
  126. #can't override tell() in py2
  127. if sys.version_info < (3, 0):
  128. stdout = BufferedReader(stdout)
  129. else:
  130. stdout.tell = raise_tell
  131. with closing(ArchiveIterator(stdout)) as a:
  132. for record in a:
  133. assert record.rec_type == 'warcinfo'
  134. assert a.get_record_offset() == 0
  135. break
  136. record = next(a)
  137. assert record.rec_type == 'warcinfo'
  138. assert a.get_record_offset() == 361
  139. for record in a:
  140. assert record.rec_type == 'resource'
  141. assert a.get_record_offset() == 802
  142. break
  143. with pytest.raises(StopIteration):
  144. record = next(a)
  145. assert a.record == None
  146. assert a.reader == None
  147. assert a.read_to_end() == None
  148. proc.stdout.close()
  149. proc.wait()
  150. def test_example_warc_trunc(self):
  151. """ WARC file with content-length truncated on a response record
  152. Error output printed, but still read
  153. """
  154. expected = ['warcinfo', 'warcinfo', 'response', 'request']
  155. assert self._load_archive('example-trunc.warc', errs_expected=1) == expected
  156. assert self._load_archive('example-trunc.warc', errs_expected=1,
  157. check_digests=True) == expected
  158. with pytest.raises(ArchiveLoadFailed):
  159. assert self._load_archive('example-trunc.warc', errs_expected=1,
  160. check_digests='raise') == expected
  161. def test_example_arc_gz(self):
  162. expected = ['arc_header', 'response']
  163. assert self._load_archive('example.arc.gz') == expected
  164. def test_example_space_in_url_arc(self):
  165. expected = ['arc_header', 'response']
  166. assert self._load_archive('example-space-in-url.arc') == expected
  167. def test_example_arc(self):
  168. expected = ['arc_header', 'response']
  169. assert self._load_archive('example.arc') == expected
  170. def test_example_arc2warc(self):
  171. expected = ['warcinfo', 'response']
  172. assert self._load_archive('example.arc.gz', arc2warc=True) == expected
  173. def test_example_warc_resource(self):
  174. expected = ['warcinfo', 'warcinfo', 'resource']
  175. assert self._load_archive('example-resource.warc.gz') == expected
  176. def test_resource_no_http_headers(self):
  177. with self._find_first_by_type('example-resource.warc.gz', 'resource') as record:
  178. assert record.http_headers == None
  179. assert len(record.content_stream().read()) == int(record.rec_headers.get('Content-Length'))
  180. def test_resource_with_http_headers(self):
  181. with self._find_first_by_type('example-resource.warc.gz', 'resource',
  182. ensure_http_headers=True) as record:
  183. assert record.http_headers != None
  184. assert (record.http_headers.get_header('Content-Length') ==
  185. record.rec_headers.get_header('Content-Length'))
  186. expected = 'HTTP/1.0 200 OK\r\n\
  187. Content-Type: text/html; charset=utf-8\r\n\
  188. Content-Length: 1303\r\n'
  189. assert str(record.http_headers) == expected
  190. assert len(record.content_stream().read()) == int(record.rec_headers.get('Content-Length'))
  191. def test_read_content(self):
  192. assert 'Example Domain' in self._read_first_response('example.warc.gz').decode('utf-8')
  193. assert 'Example Domain' in self._read_first_response('example.warc').decode('utf-8')
  194. assert 'Example Domain' in self._read_first_response('example.arc.gz').decode('utf-8')
  195. assert 'Example Domain' in self._read_first_response('example.arc').decode('utf-8')
  196. def test_read_content_chunked(self):
  197. buff = self._read_first_response('example-iana.org-chunked.warc').decode('utf-8')
  198. assert buff.startswith('<!doctype html>')
  199. assert 'Internet Assigned Numbers Authority' in buff
  200. def test_bad_warc(self):
  201. with pytest.raises(ArchiveLoadFailed):
  202. self._load_archive('example-bad.warc.gz.bad')
  203. def test_bad_offset_warc(self):
  204. with pytest.raises(ArchiveLoadFailed):
  205. self._load_archive('example.warc.gz', offset=10)
  206. def test_bad_arc_invalid_lengths(self):
  207. expected = ['arc_header', 'response', 'response', 'response']
  208. assert self._load_archive('bad.arc') == expected
  209. def test_err_non_chunked_gzip(self):
  210. with pytest.raises(ArchiveLoadFailed):
  211. self._load_archive('example-bad-non-chunked.warc.gz')
  212. def test_err_warc_iterator_on_arc(self):
  213. expected = ['arc_header', 'response']
  214. with pytest.raises(ArchiveLoadFailed):
  215. self._load_archive('example.arc.gz', cls=WARCIterator)
  216. def test_err_arc_iterator_on_warc(self):
  217. expected = ['arc_header', 'response']
  218. with pytest.raises(ArchiveLoadFailed):
  219. self._load_archive('example.warc.gz', cls=ARCIterator)
  220. def test_corrects_wget_bug(self):
  221. with self._find_first_by_type('example-wget-bad-target-uri.warc.gz', 'response') as record:
  222. assert record.rec_headers.get('WARC-Target-URI') == 'http://example.com/'
  223. def test_corrects_space_in_target_uri(self):
  224. with self._find_first_by_type('example-space-in-target-uri.warc.gz', 'resource') as record:
  225. assert record.rec_headers.get('WARC-Target-URI') == 'file:///example%20with%20spaces.png'
  226. def _digests_mutilate_helper(self, contents, expected_t, expected_f, capsys, full_read=False):
  227. with pytest.raises(ArchiveLoadFailed):
  228. assert self._load_archive_memory(BytesIO(contents), check_digests='raise', full_read=full_read) == expected_t
  229. capsys.readouterr()
  230. assert self._load_archive_memory(BytesIO(contents), check_digests='log', full_read=full_read) == expected_t
  231. out, err = capsys.readouterr()
  232. assert err
  233. assert self._load_archive_memory(BytesIO(contents), check_digests=True, full_read=full_read) == expected_t
  234. out, err = capsys.readouterr()
  235. assert not err
  236. assert self._load_archive_memory(BytesIO(contents), check_digests=False, full_read=full_read) == expected_f
  237. out, err = capsys.readouterr()
  238. assert not err
  239. def test_digests_mutilate(self, capsys):
  240. expected_f = ['warcinfo', 'warcinfo', 'response', 'request', 'revisit', 'request']
  241. expected_t = ['warcinfo', 'warcinfo', 'request', 'revisit', 'request']
  242. with open(get_test_file('example.warc'), 'rb') as fh:
  243. contents = fh.read()
  244. contents_sha = contents.replace(b'WARC-Block-Digest: sha1:', b'WARC-Block-Digest: xxx:', 1)
  245. assert contents != contents_sha, 'a replace happened'
  246. self._digests_mutilate_helper(contents_sha, expected_t, expected_f, capsys)
  247. contents_sha = contents.replace(b'WARC-Payload-Digest: sha1:', b'WARC-Payload-Digest: xxx:', 1)
  248. assert contents != contents_sha, 'a replace happened'
  249. self._digests_mutilate_helper(contents_sha, expected_t, expected_f, capsys)
  250. contents_block = contents
  251. thing = b'WARC-Block-Digest: sha1:'
  252. index = contents_block.find(thing)
  253. index += len(thing)
  254. b = contents_block[index:index+3]
  255. contents_block = contents_block.replace(thing+b, thing+b'111')
  256. assert contents != contents_block, 'a replace happened'
  257. '''
  258. If we don't read the stream, the digest check will not happen & all recs will be seen
  259. '''
  260. self._digests_mutilate_helper(contents_block, expected_f, expected_f, capsys)
  261. self._digests_mutilate_helper(contents_block, expected_t, expected_f, capsys, full_read=True)
  262. contents_payload = contents
  263. thing = b'WARC-Payload-Digest: sha1:'
  264. index = contents_payload.find(thing)
  265. index += len(thing)
  266. b = contents_payload[index:index+3]
  267. contents_payload = contents_payload.replace(thing+b, thing+b'111')
  268. assert contents != contents_payload, 'a replace happened'
  269. self._digests_mutilate_helper(contents_payload, expected_f, expected_f, capsys)
  270. self._digests_mutilate_helper(contents_payload, expected_t, expected_f, capsys, full_read=True)
  271. def test_digests_file(self):
  272. expected_f = ['request', 'request', 'request', 'request']
  273. expected_t = ['request', 'request', 'request']
  274. # record 1: invalid payload digest
  275. assert self._load_archive('example-digest.warc', check_digests=True) == expected_t
  276. assert self._load_archive('example-digest.warc', check_digests=False) == expected_f
  277. # record 2: b64 digest; record 3: b64 filename safe digest
  278. assert self._load_archive('example-digest.warc', offset=922, check_digests=True) == expected_t
  279. assert self._load_archive('example-digest.warc', offset=922, check_digests=False) == expected_t