from __future__ import print_function import logging import os import shutil import tempfile import json from future.moves.urllib.parse import quote_plus from ckan.plugins.toolkit import config import pytest from ckan import model from ckan import plugins from ckan.logic import get_action from ckan.tests import factories as ckan_factories from ckanext.archiver import model as archiver_model from ckanext.archiver.model import Archival from ckanext.archiver.tasks import (link_checker, update_resource, update_package, download, api_request, LinkCheckerError, LinkInvalidError, response_is_an_api_error ) # enable celery logging for when you run nosetests -s log = logging.getLogger('ckanext.archiver.tasks') def get_logger(): return log update_resource.get_logger = get_logger update_package.get_logger = get_logger class TestLinkChecker: """ Tests for link checker task """ @pytest.fixture(autouse=True) @pytest.mark.usefixtures(u"clean_db") @pytest.mark.ckan_config("ckan.plugins", "archiver") def initial_data(self, clean_db): return {} def test_file_url(self): url = u'file:///home/root/test.txt' # schema not allowed context = json.dumps({}) data = json.dumps({'url': url}) with pytest.raises(LinkInvalidError): link_checker(context, data) def test_bad_url(self): url = u'' context = json.dumps({}) data = json.dumps({'url': url}) with pytest.raises(LinkInvalidError): link_checker(context, data) def test_non_escaped_url(self, client): url = client + '/+/' \ + 'drugs-alcohol-research/hosb1310/hosb1310-ann2tabs?view=Binary' context = json.dumps({}) data = json.dumps({'url': url}) res = link_checker(context, data) assert res def test_empty_url(self): url = u'' context = json.dumps({}) data = json.dumps({'url': url}) with pytest.raises(LinkCheckerError): link_checker(context, data) def test_url_with_503(self, client): url = client + '/?status=503' context = json.dumps({}) data = json.dumps({'url': url}) with pytest.raises(LinkCheckerError): link_checker(context, data) def test_url_with_404(self, client): url = client + 'http://localhost:9091/?status=404' context = json.dumps({}) data = json.dumps({'url': url}) with pytest.raises(LinkCheckerError): link_checker(context, data) def test_url_with_405(self, client): # 405: method (HEAD) not allowed url = client + '/?status=405' context = json.dumps({}) data = json.dumps({'url': url}) with pytest.raises(LinkCheckerError): link_checker(context, data) def test_url_with_30x_follows_redirect(self, client): redirect_url = client + u'/?status=200&content=test&content-type=text/csv' url = client + u'/?status=301&location=%s' % quote_plus(redirect_url) context = json.dumps({}) data = json.dumps({'url': url}) result = json.loads(link_checker(context, data)) assert result # e.g. " # &pubType=1&PublishTime=09:30:00&from=home&tabOption=1" def test_colon_in_query_string(self, client): # accept, because browsers accept this # see discussion: url = client + '/?time=09:30&status=200' context = json.dumps({}) data = json.dumps({'url': url}) result = json.loads(link_checker(context, data)) assert result def test_trailing_whitespace(self, client): # accept, because browsers accept this url = client + '/?status=200 ' context = json.dumps({}) data = json.dumps({'url': url}) result = json.loads(link_checker(context, data)) assert result def test_good_url(self, client): context = json.dumps({}) url = client + "/?status=200" data = json.dumps({'url': url}) result = json.loads(link_checker(context, data)) assert result @pytest.mark.usefixtures('with_plugins') @pytest.mark.ckan_config("ckanext-archiver.cache_url_root", "http://localhost:50001/resources/") @pytest.mark.ckan_config("ckanext-archiver.max_content_length", 1000000) @pytest.mark.ckan_config("ckan.plugins", "testipipe") class TestArchiver: """ Tests for Archiver 'update_resource'/'update_package' tasks """ @pytest.fixture(autouse=True) @pytest.mark.usefixtures(u"clean_db") def initial_data(cls, clean_db): archiver_model.init_tables(model.meta.engine) cls.temp_dir = tempfile.mkdtemp() def _test_package(self, url, format=None): pkg = {'resources': [ {'url': url, 'format': format or 'TXT', 'description': 'Test'} ]} pkg = ckan_factories.Dataset(**pkg) return pkg def _test_resource(self, url, format=None): pkg = self._test_package(url, format) return pkg['resources'][0] def assert_archival_error(self, error_message_fragment, resource_id): archival = Archival.get_for_resource(resource_id) if error_message_fragment not in archival.reason: print('ERROR: %s (%s)' % (archival.reason, archival.status)) raise AssertionError(archival.reason) def test_file_url(self): res_id = self._test_resource('file:///home/root/test.txt')['id'] # scheme not allowed result = update_resource(res_id) assert not result, result self.assert_archival_error('Invalid url scheme', res_id) def test_bad_url(self): res_id = self._test_resource('')['id'] # no slashes result = update_resource(res_id) assert not result, result self.assert_archival_error('URL parsing failure', res_id) def test_resource_hash_and_content_length(self, client): url = client + '/?status=200&content=test&content-type=csv' res_id = self._test_resource(url)['id'] result = json.loads(update_resource(res_id)) assert result['size'] == len('test') from hashlib import sha1 assert result['hash'] == sha1('test'.encode('utf-8')).hexdigest(), result _remove_archived_file(result.get('cache_filepath')) def test_archived_file(self, client): url = client + '/?status=200&content=test&content-type=csv' res_id = self._test_resource(url)['id'] result = json.loads(update_resource(res_id)) assert result['cache_filepath'] assert os.path.exists(result['cache_filepath']) with open(result['cache_filepath']) as f: content = f.readlines() assert len(content) == 1 assert content[0] == "test" _remove_archived_file(result.get('cache_filepath')) def test_update_url_with_unknown_content_type(self, client): url = client + '/?content-type=application/foo&content=test' res_id = self._test_resource(url, format='foo')['id'] # format has no effect result = json.loads(update_resource(res_id)) assert result, result assert result['mimetype'] == 'application/foo' # stored from the header def test_wms_1_3(self, client): url = client + '/WMS_1_3/' res_id = self._test_resource(url)['id'] result = json.loads(update_resource(res_id)) assert result, result assert result['request_type'] == 'WMS 1.3' with open(result['cache_filepath']) as f: content = assert ' 7800, result['length'] assert result['request_type'] == 'WMS 1.3' _remove_archived_file(result.get('saved_file')) def test_wms_1_1_1(self, client): url = client + '/WMS_1_1_1/' resource = self._test_resource(url) result = api_request(self.fake_context, resource) assert result assert int(result['size']) > 7800, result['length'] assert result['request_type'] == 'WMS 1.1.1' _remove_archived_file(result.get('saved_file')) def test_wfs(self, client): url = client + '/WFS/' resource = self._test_resource(url) result = api_request(self.fake_context, resource) assert result assert int(result['size']) > 7800, result['length'] assert result['request_type'] == 'WFS 2.0' _remove_archived_file(result.get('saved_file')) def test_wms_error(self, client): wms_error_1 = ''' Unknown service requested. ''' assert response_is_an_api_error(wms_error_1) is True wms_error_2 = ''' Unknown operation name. ''' assert response_is_an_api_error(wms_error_2) is True def _remove_archived_file(cache_filepath): if cache_filepath: if os.path.exists(cache_filepath): resource_folder = os.path.split(cache_filepath)[0] if 'fake_resource_id' in resource_folder: shutil.rmtree(resource_folder)