recordloader.py 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375
  1. from warcio.statusandheaders import StatusAndHeaders
  2. from warcio.statusandheaders import StatusAndHeadersParser
  3. from warcio.statusandheaders import StatusAndHeadersParserException
  4. from warcio.exceptions import ArchiveLoadFailed
  5. from warcio.limitreader import LimitReader
  6. from warcio.digestverifyingreader import DigestVerifyingReader, DigestChecker
  7. from warcio.bufferedreaders import BufferedReader, ChunkedDataReader
  8. from warcio.timeutils import timestamp_to_iso_date
  9. from six.moves import zip
  10. import logging
  11. logger = logging.getLogger(__name__)
  12. #=================================================================
  13. class ArcWarcRecord(object):
  14. def __init__(self, *args, **kwargs):
  15. (self.format, self.rec_type, self.rec_headers, self.raw_stream,
  16. self.http_headers, self.content_type, self.length) = args
  17. self.payload_length = kwargs.get('payload_length', -1)
  18. self.digest_checker = kwargs.get('digest_checker')
  19. def content_stream(self):
  20. if not self.http_headers:
  21. return self.raw_stream
  22. encoding = self.http_headers.get_header('content-encoding')
  23. if encoding:
  24. encoding = encoding.lower()
  25. if encoding not in BufferedReader.get_supported_decompressors():
  26. encoding = None
  27. if self.http_headers.get_header('transfer-encoding') == 'chunked':
  28. return ChunkedDataReader(self.raw_stream, decomp_type=encoding)
  29. elif encoding:
  30. return BufferedReader(self.raw_stream, decomp_type=encoding)
  31. else:
  32. return self.raw_stream
  33. #=================================================================
  34. class ArcWarcRecordLoader(object):
  35. WARC_TYPES = ['WARC/1.1', 'WARC/1.0', 'WARC/0.17', 'WARC/0.18']
  36. HTTP_TYPES = ['HTTP/1.0', 'HTTP/1.1']
  37. HTTP_VERBS = ['GET', 'HEAD', 'POST', 'PUT', 'DELETE', 'TRACE',
  38. 'OPTIONS', 'CONNECT', 'PATCH']
  39. HTTP_RECORDS = ('response', 'request', 'revisit')
  40. NON_HTTP_SCHEMES = ('dns:', 'whois:', 'ntp:')
  41. HTTP_SCHEMES = ('http:', 'https:')
  42. def __init__(self, verify_http=True, arc2warc=True):
  43. if arc2warc:
  44. self.arc_parser = ARC2WARCHeadersParser()
  45. else:
  46. self.arc_parser = ARCHeadersParser()
  47. self.warc_parser = StatusAndHeadersParser(self.WARC_TYPES)
  48. self.http_parser = StatusAndHeadersParser(self.HTTP_TYPES, verify_http)
  49. self.http_req_parser = StatusAndHeadersParser(self.HTTP_VERBS, verify_http)
  50. def parse_record_stream(self, stream,
  51. statusline=None,
  52. known_format=None,
  53. no_record_parse=False,
  54. ensure_http_headers=False,
  55. check_digests=False):
  56. """ Parse file-like stream and return an ArcWarcRecord
  57. encapsulating the record headers, http headers (if any),
  58. and a stream limited to the remainder of the record.
  59. Pass statusline and known_format to detect_type_loader_headers()
  60. to faciliate parsing.
  61. """
  62. (the_format, rec_headers) = (self.
  63. _detect_type_load_headers(stream,
  64. statusline,
  65. known_format))
  66. if the_format == 'arc':
  67. uri = rec_headers.get_header('uri')
  68. length = rec_headers.get_header('length')
  69. content_type = rec_headers.get_header('content-type')
  70. sub_len = rec_headers.total_len
  71. if uri and uri.startswith('filedesc://'):
  72. rec_type = 'arc_header'
  73. else:
  74. rec_type = 'response'
  75. elif the_format in ('warc', 'arc2warc'):
  76. rec_type = rec_headers.get_header('WARC-Type')
  77. uri = self._ensure_target_uri_format(rec_headers)
  78. length = rec_headers.get_header('Content-Length')
  79. content_type = rec_headers.get_header('Content-Type')
  80. if the_format == 'warc':
  81. sub_len = 0
  82. else:
  83. sub_len = rec_headers.total_len
  84. the_format = 'warc'
  85. is_err = False
  86. try:
  87. if length is not None:
  88. length = int(length) - sub_len
  89. if length < 0:
  90. is_err = True
  91. except (ValueError, TypeError):
  92. is_err = True
  93. # err condition
  94. if is_err:
  95. length = 0
  96. is_verifying = False
  97. digest_checker = DigestChecker(check_digests)
  98. # limit stream to the length for all valid records
  99. if length is not None and length >= 0:
  100. stream = LimitReader.wrap_stream(stream, length)
  101. if check_digests:
  102. stream, is_verifying = self.wrap_digest_verifying_stream(stream, rec_type,
  103. rec_headers, digest_checker,
  104. length=length)
  105. http_headers = None
  106. payload_length = -1
  107. # load http headers if parsing
  108. if not no_record_parse:
  109. start = stream.tell()
  110. http_headers = self.load_http_headers(rec_type, uri, stream, length)
  111. if length and http_headers:
  112. payload_length = length - (stream.tell() - start)
  113. # generate validate http headers (eg. for replay)
  114. if not http_headers and ensure_http_headers:
  115. http_headers = self.default_http_headers(length, content_type)
  116. if is_verifying:
  117. stream.begin_payload()
  118. return ArcWarcRecord(the_format, rec_type,
  119. rec_headers, stream, http_headers,
  120. content_type, length, payload_length=payload_length, digest_checker=digest_checker)
  121. def wrap_digest_verifying_stream(self, stream, rec_type, rec_headers, digest_checker, length=None):
  122. payload_digest = rec_headers.get_header('WARC-Payload-Digest')
  123. block_digest = rec_headers.get_header('WARC-Block-Digest')
  124. segment_number = rec_headers.get_header('WARC-Segment-Number')
  125. if not payload_digest and not block_digest:
  126. return stream, False
  127. stream = DigestVerifyingReader(stream, length, digest_checker,
  128. record_type=rec_type,
  129. payload_digest=payload_digest,
  130. block_digest=block_digest,
  131. segment_number=segment_number)
  132. return stream, True
  133. def load_http_headers(self, rec_type, uri, stream, length):
  134. # only if length == 0 don't parse
  135. # try parsing is length is unknown (length is None) or length > 0
  136. if length == 0:
  137. return None
  138. # only certain record types can have http headers
  139. if rec_type not in self.HTTP_RECORDS:
  140. return None
  141. # only http:/https: uris can have http headers
  142. if not uri.startswith(self.HTTP_SCHEMES):
  143. return None
  144. # request record: parse request
  145. if rec_type == 'request':
  146. return self.http_req_parser.parse(stream)
  147. elif rec_type == 'revisit':
  148. try:
  149. return self.http_parser.parse(stream)
  150. except EOFError:
  151. # empty revisit with no http headers, is ok!
  152. return None
  153. # response record or non-empty revisit: parse HTTP status and headers!
  154. else:
  155. return self.http_parser.parse(stream)
  156. def default_http_headers(self, length, content_type=None):
  157. headers = []
  158. if content_type:
  159. headers.append(('Content-Type', content_type))
  160. if length is not None and length >= 0:
  161. headers.append(('Content-Length', str(length)))
  162. return StatusAndHeaders('200 OK', headers=headers, protocol='HTTP/1.0')
  163. def _detect_type_load_headers(self, stream,
  164. statusline=None, known_format=None):
  165. """ If known_format is specified ('warc' or 'arc'),
  166. parse only as that format.
  167. Otherwise, try parsing record as WARC, then try parsing as ARC.
  168. if neither one succeeds, we're out of luck.
  169. """
  170. if known_format != 'arc':
  171. # try as warc first
  172. try:
  173. rec_headers = self.warc_parser.parse(stream, statusline)
  174. return 'warc', rec_headers
  175. except StatusAndHeadersParserException as se:
  176. if known_format == 'warc':
  177. msg = 'Invalid WARC record, first line: '
  178. raise ArchiveLoadFailed(msg + str(se.statusline))
  179. statusline = se.statusline
  180. pass
  181. # now try as arc
  182. try:
  183. rec_headers = self.arc_parser.parse(stream, statusline)
  184. return self.arc_parser.get_rec_type(), rec_headers
  185. except StatusAndHeadersParserException as se:
  186. if known_format == 'arc':
  187. msg = 'Invalid ARC record, first line: '
  188. else:
  189. msg = 'Unknown archive format, first line: '
  190. raise ArchiveLoadFailed(msg + str(se.statusline))
  191. def _ensure_target_uri_format(self, rec_headers):
  192. """Checks the value for the WARC-Target-URI header field to see if it starts
  193. with '<' and ends with '>' (Wget 1.19 bug) and if '<' and '>' are present,
  194. corrects and updates the field returning the corrected value for the field
  195. otherwise just returns the fields value. Also checks for the presence of
  196. spaces and percent-encodes them if present, for more reliable parsing
  197. downstream.
  198. :param StatusAndHeaders rec_headers: The parsed WARC headers
  199. :return: The value for the WARC-Target-URI field
  200. :rtype: str | None
  201. """
  202. uri = rec_headers.get_header('WARC-Target-URI')
  203. if uri is not None and uri.startswith('<') and uri.endswith('>'):
  204. uri = uri[1:-1]
  205. rec_headers.replace_header('WARC-Target-URI', uri)
  206. if uri is not None and " " in uri:
  207. logger.warning("Replacing spaces in invalid WARC-Target-URI: {}".format(uri))
  208. uri = uri.replace(" ", "%20")
  209. rec_headers.replace_header('WARC-Target-URI', uri)
  210. return uri
  211. #=================================================================
  212. class ARCHeadersParser(object):
  213. # ARC 1.0 headers
  214. ARC_HEADERS = ["uri", "ip-address", "archive-date",
  215. "content-type", "length"]
  216. def __init__(self):
  217. self.headernames = self.get_header_names()
  218. def get_rec_type(self):
  219. return 'arc'
  220. def parse(self, stream, headerline=None):
  221. total_read = 0
  222. if headerline is None:
  223. headerline = stream.readline()
  224. headerline = StatusAndHeadersParser.decode_header(headerline)
  225. header_len = len(headerline)
  226. if header_len == 0:
  227. raise EOFError()
  228. headerline = headerline.rstrip()
  229. headernames = self.headernames
  230. # if arc header, consume next two lines
  231. if headerline.startswith('filedesc://'):
  232. version = StatusAndHeadersParser.decode_header(stream.readline()) # skip version
  233. spec = StatusAndHeadersParser.decode_header(stream.readline()) # skip header spec, use preset one
  234. total_read += len(version)
  235. total_read += len(spec)
  236. parts = headerline.rsplit(' ', len(headernames)-1)
  237. if len(parts) != len(headernames):
  238. msg = 'Wrong # of headers, expected arc headers {0}, Found {1}'
  239. msg = msg.format(headernames, parts)
  240. raise StatusAndHeadersParserException(msg, parts)
  241. protocol, headers = self._get_protocol_and_headers(headerline, parts)
  242. return StatusAndHeaders(statusline='',
  243. headers=headers,
  244. protocol='WARC/1.0',
  245. total_len=total_read)
  246. @classmethod
  247. def get_header_names(cls):
  248. return cls.ARC_HEADERS
  249. def _get_protocol_and_headers(self, headerline, parts):
  250. headers = []
  251. for name, value in zip(self.headernames, parts):
  252. headers.append((name, value))
  253. return ('ARC/1.0', headers)
  254. #=================================================================
  255. class ARC2WARCHeadersParser(ARCHeadersParser):
  256. # Headers for converting ARC -> WARC Header
  257. ARC_TO_WARC_HEADERS = ["WARC-Target-URI",
  258. "WARC-IP-Address",
  259. "WARC-Date",
  260. "Content-Type",
  261. "Content-Length"]
  262. def get_rec_type(self):
  263. return 'arc2warc'
  264. @classmethod
  265. def get_header_names(cls):
  266. return cls.ARC_TO_WARC_HEADERS
  267. def _get_protocol_and_headers(self, headerline, parts):
  268. headers = []
  269. if headerline.startswith('filedesc://'):
  270. rec_type = 'warcinfo'
  271. else:
  272. rec_type = 'response'
  273. parts[3] = 'application/http;msgtype=response'
  274. headers.append(('WARC-Type', rec_type))
  275. headers.append(('WARC-Record-ID', StatusAndHeadersParser.make_warc_id()))
  276. for name, value in zip(self.headernames, parts):
  277. if name == 'WARC-Date':
  278. value = timestamp_to_iso_date(value)
  279. if rec_type == 'warcinfo' and name == 'WARC-Target-URI':
  280. name = 'WARC-Filename'
  281. value = value[len('filedesc://'):]
  282. headers.append((name, value))
  283. return ('WARC/1.0', headers)