# recordbuilder.py (7.9 KB)

import datetime
import tempfile
from io import BytesIO

import six

from warcio.recordloader import ArcWarcRecord, ArcWarcRecordLoader
from warcio.statusandheaders import StatusAndHeadersParser, StatusAndHeaders
from warcio.timeutils import datetime_to_iso_date
from warcio.utils import to_native_str, BUFF_SIZE, Digester
  9. #=================================================================
  10. class RecordBuilder(object):
  11. REVISIT_PROFILE = 'http://netpreserve.org/warc/1.0/revisit/identical-payload-digest'
  12. REVISIT_PROFILE_1_1 = 'http://netpreserve.org/warc/1.1/revisit/identical-payload-digest'
  13. WARC_1_0 = 'WARC/1.0'
  14. WARC_1_1 = 'WARC/1.1'
  15. # default warc version
  16. WARC_VERSION = WARC_1_0
  17. WARC_RECORDS = {'warcinfo': 'application/warc-fields',
  18. 'response': 'application/http; msgtype=response',
  19. 'revisit': 'application/http; msgtype=response',
  20. 'request': 'application/http; msgtype=request',
  21. 'metadata': 'application/warc-fields',
  22. }
  23. NO_PAYLOAD_DIGEST_TYPES = ('warcinfo', 'revisit')
  24. def __init__(self, warc_version=None, header_filter=None):
  25. self.warc_version = self._parse_warc_version(warc_version)
  26. self.header_filter = header_filter
  27. def create_warcinfo_record(self, filename, info):
  28. warc_headers = StatusAndHeaders('', [], protocol=self.warc_version)
  29. warc_headers.add_header('WARC-Type', 'warcinfo')
  30. warc_headers.add_header('WARC-Record-ID', self._make_warc_id())
  31. if filename:
  32. warc_headers.add_header('WARC-Filename', filename)
  33. warc_headers.add_header('WARC-Date', self.curr_warc_date())
  34. warcinfo = BytesIO()
  35. for name, value in six.iteritems(info):
  36. if not value:
  37. continue
  38. line = name + ': ' + str(value) + '\r\n'
  39. warcinfo.write(line.encode('utf-8'))
  40. length = warcinfo.tell()
  41. warcinfo.seek(0)
  42. return self.create_warc_record('', 'warcinfo',
  43. warc_headers=warc_headers,
  44. payload=warcinfo,
  45. length=length)
  46. def create_revisit_record(self, uri, digest, refers_to_uri, refers_to_date,
  47. http_headers=None, warc_headers_dict=None):
  48. assert digest, 'Digest can not be empty'
  49. if warc_headers_dict is None:
  50. warc_headers_dict = dict()
  51. record = self.create_warc_record(uri, 'revisit', http_headers=http_headers,
  52. warc_headers_dict=warc_headers_dict)
  53. revisit_profile = self.REVISIT_PROFILE_1_1 if self.warc_version == self.WARC_1_1 else self.REVISIT_PROFILE
  54. record.rec_headers.add_header('WARC-Profile', revisit_profile)
  55. record.rec_headers.add_header('WARC-Refers-To-Target-URI', refers_to_uri)
  56. record.rec_headers.add_header('WARC-Refers-To-Date', refers_to_date)
  57. record.rec_headers.add_header('WARC-Payload-Digest', digest)
  58. return record
  59. def create_warc_record(self, uri, record_type,
  60. payload=None,
  61. length=None,
  62. warc_content_type='',
  63. warc_headers_dict=None,
  64. warc_headers=None,
  65. http_headers=None):
  66. if warc_headers_dict is None:
  67. warc_headers_dict = dict()
  68. if payload and not http_headers:
  69. loader = ArcWarcRecordLoader()
  70. http_headers = loader.load_http_headers(record_type, uri, payload, length)
  71. if http_headers and length is not None:
  72. length -= payload.tell()
  73. if not payload:
  74. payload = BytesIO()
  75. length = 0
  76. if not warc_headers:
  77. warc_headers = self._init_warc_headers(uri, record_type, warc_headers_dict)
  78. # compute Content-Type
  79. if not warc_content_type:
  80. warc_content_type = warc_headers.get_header('Content-Type')
  81. if not warc_content_type:
  82. warc_content_type = self.WARC_RECORDS.get(record_type,
  83. 'application/warc-record')
  84. record = ArcWarcRecord('warc', record_type, warc_headers, payload,
  85. http_headers, warc_content_type, length)
  86. record.payload_length = length
  87. self.ensure_digest(record, block=False, payload=True)
  88. return record
  89. def _init_warc_headers(self, uri, record_type, warc_headers_dict):
  90. warc_headers = StatusAndHeaders('', list(warc_headers_dict.items()), protocol=self.warc_version)
  91. warc_headers.replace_header('WARC-Type', record_type)
  92. if not warc_headers.get_header('WARC-Record-ID'):
  93. warc_headers.add_header('WARC-Record-ID', self._make_warc_id())
  94. if uri:
  95. warc_headers.replace_header('WARC-Target-URI', uri)
  96. if not warc_headers.get_header('WARC-Date'):
  97. warc_headers.add_header('WARC-Date', self.curr_warc_date())
  98. return warc_headers
  99. def curr_warc_date(self):
  100. use_micros = (self.warc_version >= self.WARC_1_1)
  101. return self._make_warc_date(use_micros=use_micros)
  102. def _parse_warc_version(self, version):
  103. if not version:
  104. return self.WARC_VERSION
  105. version = str(version)
  106. if version.startswith('WARC/'):
  107. return version
  108. return 'WARC/' + version
  109. @classmethod
  110. def _make_warc_id(cls):
  111. return StatusAndHeadersParser.make_warc_id()
  112. @classmethod
  113. def _make_warc_date(cls, use_micros=False):
  114. return datetime_to_iso_date(datetime.datetime.utcnow(), use_micros=use_micros)
  115. def ensure_digest(self, record, block=True, payload=True):
  116. if block:
  117. if record.rec_headers.get_header('WARC-Block-Digest'):
  118. block = False
  119. if payload:
  120. if (record.rec_headers.get_header('WARC-Payload-Digest') or
  121. (record.rec_type in self.NO_PAYLOAD_DIGEST_TYPES)):
  122. payload = False
  123. block_digester = self._create_digester() if block else None
  124. payload_digester = self._create_digester() if payload else None
  125. has_length = (record.length is not None)
  126. if not block_digester and not payload_digester and has_length:
  127. return
  128. temp_file = None
  129. try:
  130. # force buffering if no length is set
  131. assert(has_length)
  132. pos = record.raw_stream.tell()
  133. record.raw_stream.seek(pos)
  134. except:
  135. pos = 0
  136. temp_file = self._create_temp_file()
  137. if block_digester and record.http_headers:
  138. if not record.http_headers.headers_buff:
  139. record.http_headers.compute_headers_buffer(self.header_filter)
  140. block_digester.update(record.http_headers.headers_buff)
  141. for buf in self._iter_stream(record.raw_stream):
  142. if block_digester:
  143. block_digester.update(buf)
  144. if payload_digester:
  145. payload_digester.update(buf)
  146. if temp_file:
  147. temp_file.write(buf)
  148. if temp_file:
  149. record.payload_length = temp_file.tell()
  150. temp_file.seek(0)
  151. record._orig_stream = record.raw_stream
  152. record.raw_stream = temp_file
  153. else:
  154. record.raw_stream.seek(pos)
  155. if payload_digester:
  156. record.rec_headers.add_header('WARC-Payload-Digest', str(payload_digester))
  157. if block_digester:
  158. record.rec_headers.add_header('WARC-Block-Digest', str(block_digester))
  159. @staticmethod
  160. def _iter_stream(stream):
  161. while True:
  162. buf = stream.read(BUFF_SIZE)
  163. if not buf:
  164. return
  165. yield buf
  166. @staticmethod
  167. def _create_digester():
  168. return Digester('sha1')
  169. @staticmethod
  170. def _create_temp_file():
  171. return tempfile.SpooledTemporaryFile(max_size=512*1024)