recordbuilder.py 7.9 KB


  1. import datetime
  2. import six
  3. import tempfile
  4. from io import BytesIO
  5. from warcio.recordloader import ArcWarcRecord, ArcWarcRecordLoader
  6. from warcio.statusandheaders import StatusAndHeadersParser, StatusAndHeaders
  7. from warcio.timeutils import datetime_to_iso_date
  8. from warcio.utils import to_native_str, BUFF_SIZE, Digester
  9. #=================================================================
  10. class RecordBuilder(object):
  11. REVISIT_PROFILE = 'http://netpreserve.org/warc/1.0/revisit/identical-payload-digest'
  12. REVISIT_PROFILE_1_1 = 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest'
  13. WARC_1_0 = 'WARC/1.0'
  14. WARC_1_1 = 'WARC/1.1'
  15. # default warc version
  16. WARC_VERSION = WARC_1_0
  17. WARC_RECORDS = {'warcinfo': 'application/warc-fields',
  18. 'response': 'application/http; msgtype=response',
  19. 'revisit': 'application/http; msgtype=response',
  20. 'request': 'application/http; msgtype=request',
  21. 'metadata': 'application/warc-fields',
  22. }
  23. NO_PAYLOAD_DIGEST_TYPES = ('warcinfo', 'revisit')
  24. def __init__(self, warc_version=None, header_filter=None):
  25. self.warc_version = self._parse_warc_version(warc_version)
  26. self.header_filter = header_filter
  27. def create_warcinfo_record(self, filename, info):
  28. warc_headers = StatusAndHeaders('', [], protocol=self.warc_version)
  29. warc_headers.add_header('WARC-Type', 'warcinfo')
  30. warc_headers.add_header('WARC-Record-ID', self._make_warc_id())
  31. if filename:
  32. warc_headers.add_header('WARC-Filename', filename)
  33. warc_headers.add_header('WARC-Date', self.curr_warc_date())
  34. warcinfo = BytesIO()
  35. for name, value in six.iteritems(info):
  36. if not value:
  37. continue
  38. line = name + ': ' + str(value) + '\r\n'
  39. warcinfo.write(line.encode('utf-8'))
  40. length = warcinfo.tell()
  41. warcinfo.seek(0)
  42. return self.create_warc_record('', 'warcinfo',
  43. warc_headers=warc_headers,
  44. payload=warcinfo,
  45. length=length)
  46. def create_revisit_record(self, uri, digest, refers_to_uri, refers_to_date,
  47. http_headers=None, warc_headers_dict=None):
  48. assert digest, 'Digest can not be empty'
  49. if warc_headers_dict is None:
  50. warc_headers_dict = dict()
  51. record = self.create_warc_record(uri, 'revisit', http_headers=http_headers,
  52. warc_headers_dict=warc_headers_dict)
  53. revisit_profile = self.REVISIT_PROFILE_1_1 if self.warc_version == self.WARC_1_1 else self.REVISIT_PROFILE
  54. record.rec_headers.add_header('WARC-Profile', revisit_profile)
  55. record.rec_headers.add_header('WARC-Refers-To-Target-URI', refers_to_uri)
  56. record.rec_headers.add_header('WARC-Refers-To-Date', refers_to_date)
  57. record.rec_headers.add_header('WARC-Payload-Digest', digest)
  58. return record
  59. def create_warc_record(self, uri, record_type,
  60. payload=None,
  61. length=None,
  62. warc_content_type='',
  63. warc_headers_dict=None,
  64. warc_headers=None,
  65. http_headers=None):
  66. if warc_headers_dict is None:
  67. warc_headers_dict = dict()
  68. if payload and not http_headers:
  69. loader = ArcWarcRecordLoader()
  70. http_headers = loader.load_http_headers(record_type, uri, payload, length)
  71. if http_headers and length is not None:
  72. length -= payload.tell()
  73. if not payload:
  74. payload = BytesIO()
  75. length = 0
  76. if not warc_headers:
  77. warc_headers = self._init_warc_headers(uri, record_type, warc_headers_dict)
  78. # compute Content-Type
  79. if not warc_content_type:
  80. warc_content_type = warc_headers.get_header('Content-Type')
  81. if not warc_content_type:
  82. warc_content_type = self.WARC_RECORDS.get(record_type,
  83. 'application/warc-record')
  84. record = ArcWarcRecord('warc', record_type, warc_headers, payload,
  85. http_headers, warc_content_type, length)
  86. record.payload_length = length
  87. self.ensure_digest(record, block=False, payload=True)
  88. return record
  89. def _init_warc_headers(self, uri, record_type, warc_headers_dict):
  90. warc_headers = StatusAndHeaders('', list(warc_headers_dict.items()), protocol=self.warc_version)
  91. warc_headers.replace_header('WARC-Type', record_type)
  92. if not warc_headers.get_header('WARC-Record-ID'):
  93. warc_headers.add_header('WARC-Record-ID', self._make_warc_id())
  94. if uri:
  95. warc_headers.replace_header('WARC-Target-URI', uri)
  96. if not warc_headers.get_header('WARC-Date'):
  97. warc_headers.add_header('WARC-Date', self.curr_warc_date())
  98. return warc_headers
  99. def curr_warc_date(self):
  100. use_micros = (self.warc_version >= self.WARC_1_1)
  101. return self._make_warc_date(use_micros=use_micros)
  102. def _parse_warc_version(self, version):
  103. if not version:
  104. return self.WARC_VERSION
  105. version = str(version)
  106. if version.startswith('WARC/'):
  107. return version
  108. return 'WARC/' + version
  109. @classmethod
  110. def _make_warc_id(cls):
  111. return StatusAndHeadersParser.make_warc_id()
  112. @classmethod
  113. def _make_warc_date(cls, use_micros=False):
  114. return datetime_to_iso_date(datetime.datetime.utcnow(), use_micros=use_micros)
  115. def ensure_digest(self, record, block=True, payload=True):
  116. if block:
  117. if record.rec_headers.get_header('WARC-Block-Digest'):
  118. block = False
  119. if payload:
  120. if (record.rec_headers.get_header('WARC-Payload-Digest') or
  121. (record.rec_type in self.NO_PAYLOAD_DIGEST_TYPES)):
  122. payload = False
  123. block_digester = self._create_digester() if block else None
  124. payload_digester = self._create_digester() if payload else None
  125. has_length = (record.length is not None)
  126. if not block_digester and not payload_digester and has_length:
  127. return
  128. temp_file = None
  129. try:
  130. # force buffering if no length is set
  131. assert(has_length)
  132. pos = record.raw_stream.tell()
  133. record.raw_stream.seek(pos)
  134. except:
  135. pos = 0
  136. temp_file = self._create_temp_file()
  137. if block_digester and record.http_headers:
  138. if not record.http_headers.headers_buff:
  139. record.http_headers.compute_headers_buffer(self.header_filter)
  140. block_digester.update(record.http_headers.headers_buff)
  141. for buf in self._iter_stream(record.raw_stream):
  142. if block_digester:
  143. block_digester.update(buf)
  144. if payload_digester:
  145. payload_digester.update(buf)
  146. if temp_file:
  147. temp_file.write(buf)
  148. if temp_file:
  149. record.payload_length = temp_file.tell()
  150. temp_file.seek(0)
  151. record._orig_stream = record.raw_stream
  152. record.raw_stream = temp_file
  153. else:
  154. record.raw_stream.seek(pos)
  155. if payload_digester:
  156. record.rec_headers.add_header('WARC-Payload-Digest', str(payload_digester))
  157. if block_digester:
  158. record.rec_headers.add_header('WARC-Block-Digest', str(block_digester))
  159. @staticmethod
  160. def _iter_stream(stream):
  161. while True:
  162. buf = stream.read(BUFF_SIZE)
  163. if not buf:
  164. return
  165. yield buf
  166. @staticmethod
  167. def _create_digester():
  168. return Digester('sha1')
  169. @staticmethod
  170. def _create_temp_file():
  171. return tempfile.SpooledTemporaryFile(max_size=512*1024)