warcwriter.py 5.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161
  1. import zlib
  2. from socket import gethostname
  3. from warcio.utils import Digester
  4. from warcio.recordbuilder import RecordBuilder
  5. from warcio.statusandheaders import StatusAndHeadersParser
  6. # ============================================================================
  7. class BaseWARCWriter(RecordBuilder):
  8. def __init__(self, gzip=True, *args, **kwargs):
  9. super(BaseWARCWriter, self).__init__(warc_version=kwargs.get('warc_version'),
  10. header_filter=kwargs.get('header_filter'))
  11. self.gzip = gzip
  12. self.hostname = gethostname()
  13. self.parser = StatusAndHeadersParser([], verify=False)
  14. def write_request_response_pair(self, req, resp, params=None):
  15. url = resp.rec_headers.get_header('WARC-Target-URI')
  16. dt = resp.rec_headers.get_header('WARC-Date')
  17. req.rec_headers.replace_header('WARC-Target-URI', url)
  18. req.rec_headers.replace_header('WARC-Date', dt)
  19. resp_id = resp.rec_headers.get_header('WARC-Record-ID')
  20. if resp_id:
  21. req.rec_headers.add_header('WARC-Concurrent-To', resp_id)
  22. self._do_write_req_resp(req, resp, params)
  23. def write_record(self, record, params=None): #pragma: no cover
  24. raise NotImplemented()
  25. def _do_write_req_resp(self, req, resp, params): #pragma: no cover
  26. raise NotImplemented()
  27. def _write_warc_record(self, out, record):
  28. if self.gzip:
  29. out = GzippingWrapper(out)
  30. if record.http_headers:
  31. record.http_headers.compute_headers_buffer(self.header_filter)
  32. # Content-Length is None/unknown
  33. # Fix record by: buffering and recomputing all digests and length
  34. # (since no length, can't trust existing digests)
  35. # Also remove content-type for consistent header ordering
  36. if record.length is None:
  37. record.rec_headers.remove_header('WARC-Block-Digest')
  38. if record.rec_type != 'revisit':
  39. record.rec_headers.remove_header('WARC-Payload-Digest')
  40. record.rec_headers.remove_header('Content-Type')
  41. self.ensure_digest(record, block=True, payload=True)
  42. record.length = record.payload_length
  43. # ensure digests are set
  44. else:
  45. self.ensure_digest(record, block=True, payload=True)
  46. if record.content_type != None:
  47. # ensure proper content type
  48. record.rec_headers.replace_header('Content-Type', record.content_type)
  49. if record.rec_type == 'revisit':
  50. http_headers_only = True
  51. else:
  52. http_headers_only = False
  53. # compute Content-Length
  54. if record.http_headers and record.payload_length >= 0:
  55. actual_len = 0
  56. if record.http_headers:
  57. actual_len = len(record.http_headers.headers_buff)
  58. if not http_headers_only:
  59. actual_len += record.payload_length
  60. record.length = actual_len
  61. record.rec_headers.replace_header('Content-Length', str(record.length))
  62. # write record headers -- encoded as utf-8
  63. # WARC headers can be utf-8 per spec
  64. out.write(record.rec_headers.to_bytes(encoding='utf-8'))
  65. # write headers buffer, if any
  66. if record.http_headers:
  67. out.write(record.http_headers.headers_buff)
  68. if not http_headers_only:
  69. try:
  70. for buf in self._iter_stream(record.raw_stream):
  71. out.write(buf)
  72. finally:
  73. if hasattr(record, '_orig_stream'):
  74. record.raw_stream.close()
  75. record.raw_stream = record._orig_stream
  76. # add two lines
  77. out.write(b'\r\n\r\n')
  78. out.flush()
  79. # ============================================================================
  80. class GzippingWrapper(object):
  81. def __init__(self, out):
  82. self.compressor = zlib.compressobj(9, zlib.DEFLATED, zlib.MAX_WBITS + 16)
  83. self.out = out
  84. def write(self, buff):
  85. #if isinstance(buff, str):
  86. # buff = buff.encode('utf-8')
  87. buff = self.compressor.compress(buff)
  88. self.out.write(buff)
  89. def flush(self):
  90. buff = self.compressor.flush()
  91. self.out.write(buff)
  92. self.out.flush()
  93. # ============================================================================
  94. class WARCWriter(BaseWARCWriter):
  95. def __init__(self, filebuf, *args, **kwargs):
  96. super(WARCWriter, self).__init__(*args, **kwargs)
  97. self.out = filebuf
  98. def write_record(self, record, params=None):
  99. self._write_warc_record(self.out, record)
  100. def _do_write_req_resp(self, req, resp, params):
  101. self._write_warc_record(self.out, resp)
  102. self._write_warc_record(self.out, req)
  103. # ============================================================================
  104. class BufferWARCWriter(WARCWriter):
  105. def __init__(self, *args, **kwargs):
  106. out = self._create_temp_file()
  107. super(BufferWARCWriter, self).__init__(out, *args, **kwargs)
  108. def get_contents(self):
  109. pos = self.out.tell()
  110. self.out.seek(0)
  111. buff = self.out.read()
  112. self.out.seek(pos)
  113. return buff
  114. def get_stream(self):
  115. self.out.seek(0)
  116. return self.out