# test_archiver.py (17 KB)

  1. from __future__ import print_function
  2. import logging
  3. import os
  4. import shutil
  5. import tempfile
  6. import json
  7. from future.moves.urllib.parse import quote_plus
  8. from ckan.plugins.toolkit import config
  9. import pytest
  10. from ckan import model
  11. from ckan import plugins
  12. from ckan.logic import get_action
  13. from ckan.tests import factories as ckan_factories
  14. from ckanext.archiver import model as archiver_model
  15. from ckanext.archiver.model import Archival
  16. from ckanext.archiver.tasks import (link_checker,
  17. update_resource,
  18. update_package,
  19. download,
  20. api_request,
  21. LinkCheckerError,
  22. LinkInvalidError,
  23. response_is_an_api_error
  24. )
  25. # enable celery logging for when you run nosetests -s
  26. log = logging.getLogger('ckanext.archiver.tasks')
  27. def get_logger():
  28. return log
  29. update_resource.get_logger = get_logger
  30. update_package.get_logger = get_logger
  31. class TestLinkChecker:
  32. """
  33. Tests for link checker task
  34. """
  35. @pytest.fixture(autouse=True)
  36. @pytest.mark.usefixtures(u"clean_db")
  37. @pytest.mark.ckan_config("ckan.plugins", "archiver")
  38. def initial_data(self, clean_db):
  39. return {}
  40. def test_file_url(self):
  41. url = u'file:///home/root/test.txt' # schema not allowed
  42. context = json.dumps({})
  43. data = json.dumps({'url': url})
  44. with pytest.raises(LinkInvalidError):
  45. link_checker(context, data)
  46. def test_bad_url(self):
  47. url = u'http:www.buckshealthcare.nhs.uk/freedom-of-information.htm'
  48. context = json.dumps({})
  49. data = json.dumps({'url': url})
  50. with pytest.raises(LinkInvalidError):
  51. link_checker(context, data)
  52. def test_non_escaped_url(self, client):
  53. url = client + '/+/http://www.homeoffice.gov.uk/publications/science-research-statistics/research-statistics/' \
  54. + 'drugs-alcohol-research/hosb1310/hosb1310-ann2tabs?view=Binary'
  55. context = json.dumps({})
  56. data = json.dumps({'url': url})
  57. res = link_checker(context, data)
  58. assert res
  59. def test_empty_url(self):
  60. url = u''
  61. context = json.dumps({})
  62. data = json.dumps({'url': url})
  63. with pytest.raises(LinkCheckerError):
  64. link_checker(context, data)
  65. def test_url_with_503(self, client):
  66. url = client + '/?status=503'
  67. context = json.dumps({})
  68. data = json.dumps({'url': url})
  69. with pytest.raises(LinkCheckerError):
  70. link_checker(context, data)
  71. def test_url_with_404(self, client):
  72. url = client + 'http://localhost:9091/?status=404'
  73. context = json.dumps({})
  74. data = json.dumps({'url': url})
  75. with pytest.raises(LinkCheckerError):
  76. link_checker(context, data)
  77. def test_url_with_405(self, client): # 405: method (HEAD) not allowed
  78. url = client + '/?status=405'
  79. context = json.dumps({})
  80. data = json.dumps({'url': url})
  81. with pytest.raises(LinkCheckerError):
  82. link_checker(context, data)
  83. def test_url_with_30x_follows_redirect(self, client):
  84. redirect_url = client + u'/?status=200&content=test&content-type=text/csv'
  85. url = client + u'/?status=301&location=%s' % quote_plus(redirect_url)
  86. context = json.dumps({})
  87. data = json.dumps({'url': url})
  88. result = json.loads(link_checker(context, data))
  89. assert result
  90. # e.g. "http://www.dasa.mod.uk/applications/newWeb/www/index.php?page=48&thiscontent=180&date=2011-05-26
  91. # &pubType=1&PublishTime=09:30:00&from=home&tabOption=1"
  92. def test_colon_in_query_string(self, client):
  93. # accept, because browsers accept this
  94. # see discussion: http://trac.ckan.org/ticket/318
  95. url = client + '/?time=09:30&status=200'
  96. context = json.dumps({})
  97. data = json.dumps({'url': url})
  98. result = json.loads(link_checker(context, data))
  99. assert result
  100. def test_trailing_whitespace(self, client):
  101. # accept, because browsers accept this
  102. url = client + '/?status=200 '
  103. context = json.dumps({})
  104. data = json.dumps({'url': url})
  105. result = json.loads(link_checker(context, data))
  106. assert result
  107. def test_good_url(self, client):
  108. context = json.dumps({})
  109. url = client + "/?status=200"
  110. data = json.dumps({'url': url})
  111. result = json.loads(link_checker(context, data))
  112. assert result
  113. @pytest.mark.usefixtures('with_plugins')
  114. @pytest.mark.ckan_config("ckanext-archiver.cache_url_root", "http://localhost:50001/resources/")
  115. @pytest.mark.ckan_config("ckanext-archiver.max_content_length", 1000000)
  116. @pytest.mark.ckan_config("ckan.plugins", "testipipe")
  117. class TestArchiver:
  118. """
  119. Tests for Archiver 'update_resource'/'update_package' tasks
  120. """
  121. @pytest.fixture(autouse=True)
  122. @pytest.mark.usefixtures(u"clean_db")
  123. def initial_data(cls, clean_db):
  124. archiver_model.init_tables(model.meta.engine)
  125. cls.temp_dir = tempfile.mkdtemp()
  126. def _test_package(self, url, format=None):
  127. pkg = {'resources': [
  128. {'url': url, 'format': format or 'TXT', 'description': 'Test'}
  129. ]}
  130. pkg = ckan_factories.Dataset(**pkg)
  131. return pkg
  132. def _test_resource(self, url, format=None):
  133. pkg = self._test_package(url, format)
  134. return pkg['resources'][0]
  135. def assert_archival_error(self, error_message_fragment, resource_id):
  136. archival = Archival.get_for_resource(resource_id)
  137. if error_message_fragment not in archival.reason:
  138. print('ERROR: %s (%s)' % (archival.reason, archival.status))
  139. raise AssertionError(archival.reason)
  140. def test_file_url(self):
  141. res_id = self._test_resource('file:///home/root/test.txt')['id'] # scheme not allowed
  142. result = update_resource(res_id)
  143. assert not result, result
  144. self.assert_archival_error('Invalid url scheme', res_id)
  145. def test_bad_url(self):
  146. res_id = self._test_resource('http:host.com')['id'] # no slashes
  147. result = update_resource(res_id)
  148. assert not result, result
  149. self.assert_archival_error('URL parsing failure', res_id)
  150. def test_resource_hash_and_content_length(self, client):
  151. url = client + '/?status=200&content=test&content-type=csv'
  152. res_id = self._test_resource(url)['id']
  153. result = json.loads(update_resource(res_id))
  154. assert result['size'] == len('test')
  155. from hashlib import sha1
  156. assert result['hash'] == sha1('test'.encode('utf-8')).hexdigest(), result
  157. _remove_archived_file(result.get('cache_filepath'))
  158. def test_archived_file(self, client):
  159. url = client + '/?status=200&content=test&content-type=csv'
  160. res_id = self._test_resource(url)['id']
  161. result = json.loads(update_resource(res_id))
  162. assert result['cache_filepath']
  163. assert os.path.exists(result['cache_filepath'])
  164. with open(result['cache_filepath']) as f:
  165. content = f.readlines()
  166. assert len(content) == 1
  167. assert content[0] == "test"
  168. _remove_archived_file(result.get('cache_filepath'))
  169. def test_update_url_with_unknown_content_type(self, client):
  170. url = client + '/?content-type=application/foo&content=test'
  171. res_id = self._test_resource(url, format='foo')['id'] # format has no effect
  172. result = json.loads(update_resource(res_id))
  173. assert result, result
  174. assert result['mimetype'] == 'application/foo' # stored from the header
  175. def test_wms_1_3(self, client):
  176. url = client + '/WMS_1_3/'
  177. res_id = self._test_resource(url)['id']
  178. result = json.loads(update_resource(res_id))
  179. assert result, result
  180. assert result['request_type'] == 'WMS 1.3'
  181. with open(result['cache_filepath']) as f:
  182. content = f.read()
  183. assert '<WMT_MS_Capabilities' in content, content[:1000]
  184. _remove_archived_file(result.get('cache_filepath'))
  185. def test_update_with_zero_length(self, client):
  186. url = client + '/?status=200&content-type=csv'
  187. # i.e. no content
  188. res_id = self._test_resource(url)['id']
  189. result = update_resource(res_id)
  190. assert not result, result
  191. self.assert_archival_error('Content-length after streaming was 0', res_id)
  192. def test_file_not_found(self, client):
  193. url = client + '/?status=404&content=test&content-type=csv'
  194. res_id = self._test_resource(url)['id']
  195. result = update_resource(res_id)
  196. assert not result, result
  197. self.assert_archival_error('Server reported status error: 404 NOT FOUND', res_id)
  198. def test_server_error(self, client):
  199. url = client + '/?status=500&content=test&content-type=csv'
  200. res_id = self._test_resource(url)['id']
  201. result = update_resource(res_id)
  202. assert not result, result
  203. self.assert_archival_error('Server reported status error: 500 INTERNAL SERVER ERROR', res_id)
  204. def test_file_too_large_1(self, client):
  205. url = client + '/?status=200&content=short&length=1000001&content-type=csv'
  206. # will stop after receiving the header
  207. res_id = self._test_resource(url)['id']
  208. result = update_resource(res_id)
  209. assert not result, result
  210. self.assert_archival_error('Content-length 1000001 exceeds maximum allowed value 1000000', res_id)
  211. def test_file_too_large_2(self, client):
  212. url = client + '/?status=200&content_long=test_contents_greater_than_the_max_length&no-content-length&content-type=csv'
  213. # no size info in headers - it stops only after downloading the content
  214. res_id = self._test_resource(url)['id']
  215. result = update_resource(res_id)
  216. assert not result, result
  217. self.assert_archival_error('Content-length 1000001 exceeds maximum allowed value 1000000', res_id)
  218. def test_content_length_not_integer(self, client):
  219. url = client + '/?status=200&content=content&length=abc&content-type=csv'
  220. res_id = self._test_resource(url)['id']
  221. result = json.loads(update_resource(res_id))
  222. assert result, result
  223. def test_content_length_repeated(self, client):
  224. url = client + '/?status=200&content=content&repeat-length&content-type=csv'
  225. # listing the Content-Length header twice causes requests to
  226. # store the value as a comma-separated list
  227. res_id = self._test_resource(url)['id']
  228. result = json.loads(update_resource(res_id))
  229. assert result, result
  230. def test_url_with_30x_follows_and_records_redirect(self, client):
  231. url = client + '/'
  232. redirect_url = url + u'?status=200&content=test&content-type=text/csv'
  233. url += u'?status=301&location=%s' % quote_plus(redirect_url)
  234. res_id = self._test_resource(url)['id']
  235. result = json.loads(update_resource(res_id))
  236. assert result
  237. assert result['url_redirected_to'] == redirect_url
  238. def test_ipipe_notified(self, client):
  239. url = client + '/?status=200&content=test&content-type=csv'
  240. testipipe = plugins.get_plugin('testipipe')
  241. testipipe.reset()
  242. res_id = self._test_resource(url)['id']
  243. update_resource(res_id, 'queue1')
  244. assert len(testipipe.calls) == 1
  245. operation, queue, params = testipipe.calls[0]
  246. assert operation == 'archived'
  247. assert queue == 'queue1'
  248. assert params.get('package_id') is None
  249. assert params.get('resource_id') == res_id
  250. @pytest.mark.ckan_config("ckan.plugins", "archiver testipipe")
  251. def test_ipipe_notified_dataset(self, client):
  252. url = client + '/?status=200&content=test&content-type=csv'
  253. testipipe = plugins.get_plugin('testipipe')
  254. testipipe.reset()
  255. pkg = self._test_package(url)
  256. update_package(pkg['id'], 'queue1')
  257. assert len(testipipe.calls) == 2, len(testipipe.calls)
  258. operation, queue, params = testipipe.calls[0]
  259. assert operation == 'archived'
  260. assert queue == 'queue1'
  261. assert params.get('package_id') is None
  262. assert params.get('resource_id') == pkg['resources'][0]['id']
  263. operation, queue, params = testipipe.calls[1]
  264. assert operation == 'package-archived'
  265. assert queue == 'queue1'
  266. assert params.get('package_id') == pkg['id']
  267. assert params.get('resource_id') is None
  268. class TestDownload:
  269. '''Tests of the download method (and things it calls).
  270. Doesn't need a fake CKAN to get/set the status of.
  271. '''
  272. @pytest.fixture(autouse=True)
  273. @pytest.mark.usefixtures(u"clean_index")
  274. def initialData(cls, clean_db):
  275. config
  276. cls.fake_context = {
  277. 'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
  278. 'cache_url_root': config.get('ckanext-archiver.cache_url_root'),
  279. }
  280. def _test_resource(self, url, format=None):
  281. context = {'model': model, 'ignore_auth': True, 'session': model.Session, 'user': 'test'}
  282. pkg = {'name': 'testpkg', 'resources': [
  283. {'url': url, 'format': format or 'TXT', 'description': 'Test'}
  284. ]}
  285. pkg = get_action('package_create')(context, pkg)
  286. return pkg['resources'][0]
  287. def test_head_unsupported(self, client):
  288. url = client + '/?status=200&method=get&content=test&content-type=csv'
  289. # This test was more relevant when we did HEAD requests. Now servers
  290. # which respond badly to HEAD requests are not an issue.
  291. resource = self._test_resource(url)
  292. # HEAD request will return a 405 error, but it will persevere
  293. # and do a GET request which will work.
  294. result = download(self.fake_context, resource)
  295. assert result['saved_file']
  296. def test_download_file(self, client):
  297. url = client + '/?status=200&content=test&content-type=csv'
  298. resource = self._test_resource(url)
  299. result = download(self.fake_context, resource)
  300. assert result['saved_file']
  301. assert os.path.exists(result['saved_file'])
  302. _remove_archived_file(result.get('saved_file'))
  303. # Modify the resource and check that the resource size gets updated
  304. resource['url'] = url.replace('content=test', 'content=test2')
  305. result = download(self.fake_context, resource)
  306. assert result['size'] == len('test2')
  307. _remove_archived_file(result.get('saved_file'))
  308. def test_wms_1_3(self, client):
  309. url = client + '/WMS_1_3/'
  310. resource = self._test_resource(url)
  311. result = api_request(self.fake_context, resource)
  312. assert result
  313. assert int(result['size']) > 7800, result['length']
  314. assert result['request_type'] == 'WMS 1.3'
  315. _remove_archived_file(result.get('saved_file'))
  316. def test_wms_1_1_1(self, client):
  317. url = client + '/WMS_1_1_1/'
  318. resource = self._test_resource(url)
  319. result = api_request(self.fake_context, resource)
  320. assert result
  321. assert int(result['size']) > 7800, result['length']
  322. assert result['request_type'] == 'WMS 1.1.1'
  323. _remove_archived_file(result.get('saved_file'))
  324. def test_wfs(self, client):
  325. url = client + '/WFS/'
  326. resource = self._test_resource(url)
  327. result = api_request(self.fake_context, resource)
  328. assert result
  329. assert int(result['size']) > 7800, result['length']
  330. assert result['request_type'] == 'WFS 2.0'
  331. _remove_archived_file(result.get('saved_file'))
  332. def test_wms_error(self, client):
  333. wms_error_1 = '''<?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
  334. <ServiceExceptionReport version="1.3.0"
  335. xmlns="http://www.opengis.net/ogc"
  336. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  337. xsi:schemaLocation="http://www.opengis.net/ogc http://schemas.opengis.net/wms/1.3.0/exceptions_1_3_0.xsd">
  338. <ServiceException code="InvalidFormat">
  339. Unknown service requested.
  340. </ServiceException>
  341. </ServiceExceptionReport>'''
  342. assert response_is_an_api_error(wms_error_1) is True
  343. wms_error_2 = '''<ows:ExceptionReport version='1.1.0' language='en' xmlns:ows='http://www.opengis.net/ows'>
  344. <ows:Exception exceptionCode='NoApplicableCode'><ows:ExceptionText>Unknown operation name.</ows:ExceptionText>
  345. </ows:Exception></ows:ExceptionReport>'''
  346. assert response_is_an_api_error(wms_error_2) is True
  347. def _remove_archived_file(cache_filepath):
  348. if cache_filepath:
  349. if os.path.exists(cache_filepath):
  350. resource_folder = os.path.split(cache_filepath)[0]
  351. if 'fake_resource_id' in resource_folder:
  352. shutil.rmtree(resource_folder)