test_capture_http.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293
  1. import threading
  2. from wsgiref.simple_server import make_server
  3. from io import BytesIO
  4. import time
  5. # must be imported before 'requests'
  6. from warcio.capture_http import capture_http
  7. from pytest import raises
  8. import requests
  9. import json
  10. import os
  11. import tempfile
  12. from warcio.archiveiterator import ArchiveIterator
  13. from warcio.utils import BUFF_SIZE
  14. from warcio.warcwriter import BufferWARCWriter, WARCWriter
  15. # ==================================================================
  16. class TestCaptureHttpBin(object):
  17. @classmethod
  18. def setup_class(cls):
  19. from httpbin import app as httpbin_app
  20. cls.temp_dir = tempfile.mkdtemp('warctest')
  21. server = make_server('localhost', 0, httpbin_app)
  22. addr, cls.port = server.socket.getsockname()
  23. def run():
  24. try:
  25. server.serve_forever()
  26. except Exception as e:
  27. print(e)
  28. thread = threading.Thread(target=run)
  29. thread.daemon = True
  30. thread.start()
  31. time.sleep(0.1)
  32. @classmethod
  33. def teardown_class(cls):
  34. os.rmdir(cls.temp_dir)
  35. def test_get_no_capture(self):
  36. url = 'http://localhost:{0}/get?foo=bar'.format(self.port)
  37. res = requests.get(url, headers={'Host': 'httpbin.org'})
  38. assert res.json()['args'] == {'foo': 'bar'}
  39. def test_get(self):
  40. url = 'http://localhost:{0}/get?foo=bar'.format(self.port)
  41. with capture_http() as warc_writer:
  42. res = requests.get(url, headers={'Host': 'httpbin.org'})
  43. assert res.json()['args'] == {'foo': 'bar'}
  44. ai = ArchiveIterator(warc_writer.get_stream())
  45. response = next(ai)
  46. assert response.rec_type == 'response'
  47. assert response.rec_headers['WARC-Target-URI'] == url
  48. assert response.rec_headers['WARC-IP-Address'] == '127.0.0.1'
  49. assert res.json() == json.loads(response.content_stream().read().decode('utf-8'))
  50. request = next(ai)
  51. assert request.rec_type == 'request'
  52. assert request.rec_headers['WARC-Target-URI'] == url
  53. assert request.rec_headers['WARC-IP-Address'] == '127.0.0.1'
  54. def test_get_cache_to_file(self):
  55. warc_writer = BufferWARCWriter(gzip=False)
  56. url = 'http://localhost:{0}/bytes/{1}'.format(self.port, BUFF_SIZE * 2)
  57. with capture_http(warc_writer):
  58. res = requests.get(url, headers={'Host': 'httpbin.org'})
  59. assert len(res.content) == BUFF_SIZE * 2
  60. ai = ArchiveIterator(warc_writer.get_stream())
  61. response = next(ai)
  62. assert response.rec_type == 'response'
  63. assert response.rec_headers['WARC-Target-URI'] == url
  64. assert response.rec_headers['WARC-IP-Address'] == '127.0.0.1'
  65. assert res.content == response.content_stream().read()
  66. request = next(ai)
  67. assert request.rec_type == 'request'
  68. assert request.rec_headers['WARC-Target-URI'] == url
  69. assert request.rec_headers['WARC-IP-Address'] == '127.0.0.1'
  70. def test_post_json(self):
  71. warc_writer = BufferWARCWriter(gzip=False)
  72. with capture_http(warc_writer):
  73. res = requests.post('http://localhost:{0}/post'.format(self.port),
  74. headers={'Host': 'httpbin.org'},
  75. json={'some': {'data': 'posted'}})
  76. assert res.json()['json'] == {'some': {'data': 'posted'}}
  77. # response
  78. ai = ArchiveIterator(warc_writer.get_stream())
  79. response = next(ai)
  80. assert response.rec_type == 'response'
  81. assert res.json() == json.loads(response.content_stream().read().decode('utf-8'))
  82. # request
  83. request = next(ai)
  84. assert request.rec_type == 'request'
  85. assert request.http_headers['Content-Type'] == 'application/json'
  86. data = request.content_stream().read().decode('utf-8')
  87. assert data == '{"some": {"data": "posted"}}'
  88. def test_post_stream(self):
  89. warc_writer = BufferWARCWriter(gzip=False)
  90. def nop_filter(request, response, recorder):
  91. assert request
  92. assert response
  93. return request, response
  94. postbuff = BytesIO(b'somedatatopost')
  95. url = 'http://localhost:{0}/post'.format(self.port)
  96. with capture_http(warc_writer, nop_filter):
  97. res = requests.post(url, data=postbuff)
  98. # response
  99. ai = ArchiveIterator(warc_writer.get_stream())
  100. response = next(ai)
  101. assert response.rec_type == 'response'
  102. assert response.rec_headers['WARC-Target-URI'] == url
  103. assert response.rec_headers['WARC-IP-Address'] == '127.0.0.1'
  104. assert res.json() == json.loads(response.content_stream().read().decode('utf-8'))
  105. # request
  106. request = next(ai)
  107. assert request.rec_type == 'request'
  108. assert request.rec_headers['WARC-Target-URI'] == url
  109. assert request.rec_headers['WARC-IP-Address'] == '127.0.0.1'
  110. data = request.content_stream().read().decode('utf-8')
  111. assert data == 'somedatatopost'
  112. def test_post_chunked(self):
  113. warc_writer = BufferWARCWriter(gzip=False)
  114. def nop_filter(request, response, recorder):
  115. assert request
  116. assert response
  117. return request, response
  118. def gen():
  119. return iter([b'some', b'data', b'to', b'post'])
  120. #url = 'http://localhost:{0}/post'.format(self.port)
  121. url = 'https://httpbin.org/post'
  122. with capture_http(warc_writer, nop_filter, record_ip=False):
  123. res = requests.post(url, data=gen(), headers={'Content-Type': 'application/json'})
  124. # response
  125. ai = ArchiveIterator(warc_writer.get_stream())
  126. response = next(ai)
  127. assert response.rec_type == 'response'
  128. assert response.rec_headers['WARC-Target-URI'] == url
  129. assert 'WARC-IP-Address' not in response.rec_headers
  130. assert res.json() == json.loads(response.content_stream().read().decode('utf-8'))
  131. # request
  132. request = next(ai)
  133. assert request.rec_type == 'request'
  134. assert request.rec_headers['WARC-Target-URI'] == url
  135. assert 'WARC-IP-Address' not in response.rec_headers
  136. data = request.content_stream().read().decode('utf-8')
  137. assert data == 'somedatatopost'
  138. def test_skip_filter(self):
  139. warc_writer = BufferWARCWriter(gzip=False)
  140. def skip_filter(request, response, recorder):
  141. assert request
  142. assert response
  143. return None, None
  144. with capture_http(warc_writer, skip_filter):
  145. res = requests.get('http://localhost:{0}/get?foo=bar'.format(self.port),
  146. headers={'Host': 'httpbin.org'})
  147. assert res.json()['args'] == {'foo': 'bar'}
  148. # skipped, nothing written
  149. assert warc_writer.get_contents() == b''
  150. def test_capture_to_temp_file_append(self):
  151. full_path = os.path.join(self.temp_dir, 'example.warc.gz')
  152. url = 'http://localhost:{0}/get?foo=bar'.format(self.port)
  153. with capture_http(full_path):
  154. res = requests.get(url)
  155. with capture_http(full_path):
  156. res = requests.get(url)
  157. with open(full_path, 'rb') as stream:
  158. # response
  159. ai = ArchiveIterator(stream)
  160. response = next(ai)
  161. assert response.rec_type == 'response'
  162. assert response.rec_headers['WARC-Target-URI'] == url
  163. # request
  164. request = next(ai)
  165. assert request.rec_type == 'request'
  166. assert request.rec_headers['WARC-Target-URI'] == url
  167. response = next(ai)
  168. assert response.rec_type == 'response'
  169. assert response.rec_headers['WARC-Target-URI'] == url
  170. # request
  171. request = next(ai)
  172. assert request.rec_type == 'request'
  173. assert request.rec_headers['WARC-Target-URI'] == url
  174. os.remove(full_path)
  175. def test_error_capture_to_temp_file_no_append_no_overwrite(self):
  176. full_path = os.path.join(self.temp_dir, 'example2.warc.gz')
  177. url = 'http://localhost:{0}/get?foo=bar'.format(self.port)
  178. with capture_http(full_path, append=False):
  179. res = requests.get(url)
  180. with raises(OSError):
  181. with capture_http(full_path, append=False):
  182. res = requests.get(url)
  183. os.remove(full_path)
  184. def test_warc_1_1(self):
  185. full_path = os.path.join(self.temp_dir, 'example3.warc')
  186. url = 'http://localhost:{0}/get?foo=bar'.format(self.port)
  187. with capture_http(full_path, append=False, warc_version='1.1', gzip=False):
  188. res = requests.get(url)
  189. with open(full_path, 'rb') as stream:
  190. # response
  191. ai = ArchiveIterator(stream)
  192. response = next(ai)
  193. assert response.rec_headers.protocol == 'WARC/1.1'
  194. warc_date = response.rec_headers['WARC-Date']
  195. # ISO 8601 date with fractional seconds (microseconds)
  196. assert '.' in warc_date
  197. assert len(warc_date) == 27
  198. os.remove(full_path)
  199. def test_remote(self):
  200. with capture_http(warc_version='1.1', gzip=True) as writer:
  201. requests.get('http://example.com/')
  202. requests.get('https://google.com/')
  203. expected = [('http://example.com/', 'response', True),
  204. ('http://example.com/', 'request', True),
  205. ('https://google.com/', 'response', True),
  206. ('https://google.com/', 'request', True),
  207. ('https://www.google.com/', 'response', True),
  208. ('https://www.google.com/', 'request', True)
  209. ]
  210. actual = [
  211. (record.rec_headers['WARC-Target-URI'],
  212. record.rec_type,
  213. 'WARC-IP-Address' in record.rec_headers)
  214. for record in ArchiveIterator(writer.get_stream())
  215. ]
  216. assert actual == expected