123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293 |
import json
import os
import shutil
import tempfile
import threading
import time
from io import BytesIO
from wsgiref.simple_server import make_server

# must be imported before 'requests'
from warcio.capture_http import capture_http
from pytest import raises
import requests

from warcio.archiveiterator import ArchiveIterator
from warcio.utils import BUFF_SIZE
from warcio.warcwriter import BufferWARCWriter, WARCWriter
# ==================================================================
class TestCaptureHttpBin(object):
    """Integration tests for ``capture_http``.

    Most tests talk to an httpbin WSGI app served from a background thread
    on an OS-assigned localhost port (see ``setup_class``).  Two tests,
    ``test_post_chunked`` and ``test_remote``, contact remote hosts and
    therefore need network access.
    """

    @classmethod
    def setup_class(cls):
        """Create a scratch directory and start the local httpbin server."""
        from httpbin import app as httpbin_app

        cls.temp_dir = tempfile.mkdtemp('warctest')

        # Port 0 asks the OS for any free port; read the real one back
        # from the bound socket.
        server = make_server('localhost', 0, httpbin_app)
        _host, cls.port = server.socket.getsockname()

        def run():
            try:
                server.serve_forever()
            except Exception as e:
                print(e)

        thread = threading.Thread(target=run)
        thread.daemon = True
        thread.start()

        # Kept so teardown_class can stop the server and release its socket.
        cls.server = server
        cls.server_thread = thread

        time.sleep(0.1)

    @classmethod
    def teardown_class(cls):
        """Stop the local server and remove the scratch directory.

        The directory is removed recursively so that a file left behind by
        a failed test does not turn into an additional teardown error.
        """
        try:
            cls.server.shutdown()
            cls.server.server_close()
            cls.server_thread.join(1)
        finally:
            shutil.rmtree(cls.temp_dir)

    def _local_url(self, path):
        """Return the URL for ``path`` (starting with '/') on the local server."""
        return 'http://localhost:{0}{1}'.format(self.port, path)

    def test_get_no_capture(self):
        """A plain GET works when no capture context is active."""
        url = self._local_url('/get?foo=bar')
        res = requests.get(url, headers={'Host': 'httpbin.org'})

        assert res.json()['args'] == {'foo': 'bar'}

    def test_get(self):
        """A captured GET yields a response record followed by a request record."""
        url = self._local_url('/get?foo=bar')
        with capture_http() as warc_writer:
            res = requests.get(url, headers={'Host': 'httpbin.org'})

        assert res.json()['args'] == {'foo': 'bar'}

        ai = ArchiveIterator(warc_writer.get_stream())

        response = next(ai)
        assert response.rec_type == 'response'
        assert response.rec_headers['WARC-Target-URI'] == url
        assert response.rec_headers['WARC-IP-Address'] == '127.0.0.1'
        assert res.json() == json.loads(response.content_stream().read().decode('utf-8'))

        request = next(ai)
        assert request.rec_type == 'request'
        assert request.rec_headers['WARC-Target-URI'] == url
        assert request.rec_headers['WARC-IP-Address'] == '127.0.0.1'

    def test_get_cache_to_file(self):
        """A response body of ``BUFF_SIZE * 2`` bytes is recorded intact.

        NOTE(review): the size is presumably chosen to exceed the recorder's
        in-memory buffer so that it spills to a file -- confirm.
        """
        warc_writer = BufferWARCWriter(gzip=False)

        url = self._local_url('/bytes/{0}'.format(BUFF_SIZE * 2))
        with capture_http(warc_writer):
            res = requests.get(url, headers={'Host': 'httpbin.org'})

        assert len(res.content) == BUFF_SIZE * 2

        ai = ArchiveIterator(warc_writer.get_stream())

        response = next(ai)
        assert response.rec_type == 'response'
        assert response.rec_headers['WARC-Target-URI'] == url
        assert response.rec_headers['WARC-IP-Address'] == '127.0.0.1'
        assert res.content == response.content_stream().read()

        request = next(ai)
        assert request.rec_type == 'request'
        assert request.rec_headers['WARC-Target-URI'] == url
        assert request.rec_headers['WARC-IP-Address'] == '127.0.0.1'

    def test_post_json(self):
        """A JSON POST body and its Content-Type are kept in the request record."""
        warc_writer = BufferWARCWriter(gzip=False)

        with capture_http(warc_writer):
            res = requests.post(self._local_url('/post'),
                                headers={'Host': 'httpbin.org'},
                                json={'some': {'data': 'posted'}})

        assert res.json()['json'] == {'some': {'data': 'posted'}}

        # response
        ai = ArchiveIterator(warc_writer.get_stream())
        response = next(ai)
        assert response.rec_type == 'response'
        assert res.json() == json.loads(response.content_stream().read().decode('utf-8'))

        # request
        request = next(ai)
        assert request.rec_type == 'request'
        assert request.http_headers['Content-Type'] == 'application/json'

        data = request.content_stream().read().decode('utf-8')
        assert data == '{"some": {"data": "posted"}}'

    def test_post_stream(self):
        """A POST whose body is a file-like object is recorded in full."""
        warc_writer = BufferWARCWriter(gzip=False)

        def nop_filter(request, response, recorder):
            # Pass both records through unchanged.
            assert request
            assert response
            return request, response

        postbuff = BytesIO(b'somedatatopost')

        url = self._local_url('/post')

        with capture_http(warc_writer, nop_filter):
            res = requests.post(url, data=postbuff)

        # response
        ai = ArchiveIterator(warc_writer.get_stream())
        response = next(ai)
        assert response.rec_type == 'response'
        assert response.rec_headers['WARC-Target-URI'] == url
        assert response.rec_headers['WARC-IP-Address'] == '127.0.0.1'
        assert res.json() == json.loads(response.content_stream().read().decode('utf-8'))

        # request
        request = next(ai)
        assert request.rec_type == 'request'
        assert request.rec_headers['WARC-Target-URI'] == url
        assert request.rec_headers['WARC-IP-Address'] == '127.0.0.1'

        data = request.content_stream().read().decode('utf-8')
        assert data == 'somedatatopost'

    def test_post_chunked(self):
        """A POST fed from an iterator is recorded, without IP headers.

        ``record_ip=False`` is passed, so neither record may carry a
        ``WARC-IP-Address`` header.  Requires network access.
        """
        warc_writer = BufferWARCWriter(gzip=False)

        def nop_filter(request, response, recorder):
            # Pass both records through unchanged.
            assert request
            assert response
            return request, response

        def gen():
            return iter([b'some', b'data', b'to', b'post'])

        # NOTE(review): this test targets the public httpbin.org instead of
        # the local '/post' endpoint; presumably the local wsgiref server
        # cannot handle the chunked request body -- confirm.
        url = 'https://httpbin.org/post'

        with capture_http(warc_writer, nop_filter, record_ip=False):
            res = requests.post(url, data=gen(), headers={'Content-Type': 'application/json'})

        # response
        ai = ArchiveIterator(warc_writer.get_stream())
        response = next(ai)
        assert response.rec_type == 'response'
        assert response.rec_headers['WARC-Target-URI'] == url
        assert 'WARC-IP-Address' not in response.rec_headers
        assert res.json() == json.loads(response.content_stream().read().decode('utf-8'))

        # request
        request = next(ai)
        assert request.rec_type == 'request'
        assert request.rec_headers['WARC-Target-URI'] == url
        # Check the request record itself (previously re-checked the response).
        assert 'WARC-IP-Address' not in request.rec_headers

        data = request.content_stream().read().decode('utf-8')
        assert data == 'somedatatopost'

    def test_skip_filter(self):
        """Nothing is written when the filter returns ``(None, None)``."""
        warc_writer = BufferWARCWriter(gzip=False)

        def skip_filter(request, response, recorder):
            assert request
            assert response
            return None, None

        with capture_http(warc_writer, skip_filter):
            res = requests.get(self._local_url('/get?foo=bar'),
                               headers={'Host': 'httpbin.org'})

        assert res.json()['args'] == {'foo': 'bar'}

        # skipped, nothing written
        assert warc_writer.get_contents() == b''

    def test_capture_to_temp_file_append(self):
        """Two captures to the same path leave two response/request pairs."""
        full_path = os.path.join(self.temp_dir, 'example.warc.gz')

        url = self._local_url('/get?foo=bar')

        try:
            with capture_http(full_path):
                requests.get(url)

            with capture_http(full_path):
                requests.get(url)

            with open(full_path, 'rb') as stream:
                ai = ArchiveIterator(stream)

                # Each capture contributes a response, then a request.
                for _ in range(2):
                    # response
                    response = next(ai)
                    assert response.rec_type == 'response'
                    assert response.rec_headers['WARC-Target-URI'] == url

                    # request
                    request = next(ai)
                    assert request.rec_type == 'request'
                    assert request.rec_headers['WARC-Target-URI'] == url
        finally:
            # Clean up even when an assertion above fails.
            if os.path.exists(full_path):
                os.remove(full_path)

    def test_error_capture_to_temp_file_no_append_no_overwrite(self):
        """With ``append=False``, capturing to an existing file raises OSError."""
        full_path = os.path.join(self.temp_dir, 'example2.warc.gz')

        url = self._local_url('/get?foo=bar')

        try:
            with capture_http(full_path, append=False):
                requests.get(url)

            with raises(OSError):
                with capture_http(full_path, append=False):
                    requests.get(url)
        finally:
            # Clean up even when an assertion above fails.
            if os.path.exists(full_path):
                os.remove(full_path)

    def test_warc_1_1(self):
        """``warc_version='1.1'`` writes WARC/1.1 records with microsecond dates."""
        full_path = os.path.join(self.temp_dir, 'example3.warc')

        url = self._local_url('/get?foo=bar')

        try:
            with capture_http(full_path, append=False, warc_version='1.1', gzip=False):
                requests.get(url)

            with open(full_path, 'rb') as stream:
                # response
                ai = ArchiveIterator(stream)
                response = next(ai)
                assert response.rec_headers.protocol == 'WARC/1.1'

                warc_date = response.rec_headers['WARC-Date']

                # ISO 8601 date with fractional seconds (microseconds):
                # 'YYYY-MM-DDTHH:MM:SS.ffffffZ' is 27 characters long.
                assert '.' in warc_date
                assert len(warc_date) == 27
        finally:
            # Clean up even when an assertion above fails.
            if os.path.exists(full_path):
                os.remove(full_path)

    def test_remote(self):
        """Remote captures, including a redirect hop, all carry an IP header.

        Requires network access.  The expected list treats the request to
        https://google.com/ as also producing a record pair for
        https://www.google.com/.
        """
        with capture_http(warc_version='1.1', gzip=True) as writer:
            requests.get('http://example.com/')
            requests.get('https://google.com/')

        expected = [('http://example.com/', 'response', True),
                    ('http://example.com/', 'request', True),
                    ('https://google.com/', 'response', True),
                    ('https://google.com/', 'request', True),
                    ('https://www.google.com/', 'response', True),
                    ('https://www.google.com/', 'request', True)
                   ]

        actual = [
            (record.rec_headers['WARC-Target-URI'],
             record.rec_type,
             'WARC-IP-Address' in record.rec_headers)
            for record in ArchiveIterator(writer.get_stream())
        ]

        assert actual == expected
|