from __future__ import absolute_import
from builtins import str
import os
import hashlib
import http.client
import requests
import json
import tempfile
import shutil
import datetime
import copy
import mimetypes
import re
from time import sleep

from requests.packages import urllib3
from future.moves.urllib.parse import urlparse, urljoin, quote, urlunparse

from ckan.common import _
from ckan.lib import uploader
from ckan import plugins as p
from ckanext.archiver import interfaces as archiver_interfaces

import logging

log = logging.getLogger(__name__)

toolkit = p.toolkit

ALLOWED_SCHEMES = set(('http', 'https', 'ftp'))

USER_AGENT = 'ckanext-archiver'

# CKAN 2.7 introduces the new jobs system; on older versions fall back to
# defining Celery tasks.
if p.toolkit.check_ckan_version(max_version='2.6.99'):
    from ckan.lib.celery_app import celery

    @celery.task(name="archiver.update_resource")
    def update_resource_celery(*args, **kwargs):
        update_resource(*args, **kwargs)

    @celery.task(name="archiver.update_package")
    def update_package_celery(*args, **kwargs):
        update_package(*args, **kwargs)

    @celery.task(name="archiver.clean")
    def clean_celery(*args, **kwargs):
        clean(*args, **kwargs)

    @celery.task(name="archiver.link_checker")
    def link_checker_celery(*args, **kwargs):
        link_checker(*args, **kwargs)
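
# On CKAN 2.7+ these tasks are expected to be enqueued through the core jobs
# system instead of Celery. A minimal sketch, assuming the standard
# toolkit.enqueue_job API (the queue name here is just this module's default):
#
#     toolkit.enqueue_job(update_resource, [resource_id], queue='bulk')
#     toolkit.enqueue_job(update_package, [package_id], queue='bulk')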


class ArchiverError(Exception):
    pass


class ArchiverErrorBeforeDownloadStarted(ArchiverError):
    pass


class DownloadException(ArchiverError):
    pass


class ArchiverErrorAfterDownloadStarted(ArchiverError):
    def __init__(self, msg, url_redirected_to=None):
        super(ArchiverErrorAfterDownloadStarted, self).__init__(msg)
        self.url_redirected_to = url_redirected_to


class DownloadError(ArchiverErrorAfterDownloadStarted):
    pass


class ArchiveError(ArchiverErrorAfterDownloadStarted):
    pass


class ChooseNotToDownload(ArchiverErrorAfterDownloadStarted):
    pass


class NotChanged(ArchiverErrorAfterDownloadStarted):
    pass


class LinkCheckerError(ArchiverError):
    pass


class LinkInvalidError(LinkCheckerError):
    pass


class LinkHeadRequestError(LinkCheckerError):
    pass


class LinkHeadMethodNotSupported(LinkCheckerError):
    pass


class CkanError(ArchiverError):
    pass


def update_resource(resource_id, queue='bulk'):
    '''
    Archive a resource.
    '''
    log.info('Starting update_resource task: res_id=%r queue=%s',
             resource_id, queue)

    # HACK because of race condition #1481
    sleep(2)

    # Do all work in a sub-routine since it can then be tested without celery.
    # Also put try/except around it, since it is easier to monitor ckan's log
    # than celery's task status.
    try:
        result = _update_resource(resource_id, queue, log)
        return result
    except Exception as e:
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that celery can log it
        # too
        log.error('Error occurred during archiving resource: %s\nResource: %r',
                  e, resource_id)
        raise


def update_package(package_id, queue='bulk'):
    '''
    Archive a package.
    '''
    log.info('Starting update_package task: package_id=%r queue=%s',
             package_id, queue)

    # Do all work in a sub-routine since it can then be tested without celery.
    # Also put try/except around it, since it is easier to monitor ckan's log
    # than celery's task status.
    try:
        _update_package(package_id, queue, log)
    except Exception as e:
        if os.environ.get('DEBUG'):
            raise
        # Any problem at all is logged and reraised so that celery can log it
        # too
        log.error('Error occurred during archiving package: %s\nPackage: %s',
                  e, package_id)
        raise


def _update_package(package_id, queue, log):
    from ckan import model

    get_action = toolkit.get_action

    num_archived = 0
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
    package = get_action('package_show')(context_, {'id': package_id})

    for resource in package['resources']:
        resource_id = resource['id']
        res = _update_resource(resource_id, queue, log)
        if res:
            num_archived += 1

    if num_archived > 0:
        log.info("Notifying package as %d items were archived", num_archived)
        notify_package(package, queue)
    else:
        log.info("Not notifying package as 0 items were archived")

    # Refresh the index for this dataset, so that it contains the latest
    # archive info. However skip it if there are downstream plugins that will
    # do this anyway, since it is an expensive step to duplicate.
    if 'qa' not in get_plugins_waiting_on_ipipe():
        _update_search_index(package_id, log)
    else:
        log.info('Search index skipped %s', package['name'])


def _update_search_index(package_id, log):
    '''
    Tells CKAN to update its search index for a given package.
    '''
    from ckan import model
    from ckan.lib.search.index import PackageSearchIndex
    package_index = PackageSearchIndex()
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session,
                'use_cache': False, 'validate': False}
    package = toolkit.get_action('package_show')(context_, {'id': package_id})
    package_index.index_package(package, defer_commit=False)
    log.info('Search indexed %s', package['name'])


def _update_resource(resource_id, queue, log):
    """
    Link check and archive the given resource.
    If successful, updates the archival table with the cache_url & hash etc.
    Finally, a notification of the archival is broadcast.

    Params:
      resource_id - id of the resource
      queue - name of the queue the task was dispatched to

    Should only raise on a fundamental error:
      ArchiverError
      CkanError

    Returns a JSON string of the merged download and archive results, ready
    to be returned from the celery task, with keys:
        mimetype, size, hash, headers, saved_file, url_redirected_to,
        request_type, cache_filepath, cache_url
    If not successful, returns None.
    """
    from ckan import model
    from ckan.plugins.toolkit import config
    from ckanext.archiver import default_settings as settings
    from ckanext.archiver.model import Status, Archival

    get_action = toolkit.get_action

    assert is_id(resource_id), resource_id
    context_ = {'model': model, 'ignore_auth': True, 'session': model.Session}
    resource = get_action('resource_show')(context_, {'id': resource_id})

    if not os.path.exists(settings.ARCHIVE_DIR):
        log.info("Creating archive directory: %s", settings.ARCHIVE_DIR)
        os.mkdir(settings.ARCHIVE_DIR)

    def _save(status_id, exception, resource, url_redirected_to=None,
              download_result=None, archive_result=None):
        reason = u'%s' % exception
        save_archival(resource, status_id,
                      reason, url_redirected_to,
                      download_result, archive_result,
                      log)
        notify_resource(
            resource,
            queue,
            # NB the archive result dict uses the key 'cache_filepath'
            archive_result.get('cache_filepath') if archive_result else None)

    # Download
    try_as_api = False
    requires_archive = True

    url = resource['url']
    if not url.startswith('http'):
        url = config['ckan.site_url'].rstrip('/') + url

    if resource.get('url_type') == 'upload':
        upload = uploader.get_resource_uploader(resource)
        filepath = upload.get_path(resource['id'])

        hosted_externally = not url.startswith(config['ckan.site_url']) or \
            urlparse(filepath).scheme != ''
        # if resource.get('resource_type') == 'file.upload' and not hosted_externally:
        if not hosted_externally:
            log.info("Won't attempt to archive resource uploaded locally: %s",
                     resource['url'])
            try:
                hash, length = _file_hashnlength(filepath)
            except IOError as e:
                log.error('Error while accessing local resource %s: %s',
                          filepath, e)

                download_status_id = Status.by_text('URL request failed')
                _save(download_status_id, e, resource)
                return

            mimetype = None
            headers = None
            content_type, content_encoding = mimetypes.guess_type(url)
            if content_type:
                mimetype = _clean_content_type(content_type)
                headers = {'Content-Type': content_type}

            download_result_mock = {'mimetype': mimetype,
                                    'size': length,
                                    'hash': hash,
                                    'headers': headers,
                                    'saved_file': filepath,
                                    'url_redirected_to': url,
                                    'request_type': 'GET'}

            archive_result_mock = {'cache_filepath': filepath,
                                   'cache_url': url}

            # Success
            _save(Status.by_text('Archived successfully'), '', resource,
                  download_result_mock['url_redirected_to'],
                  download_result_mock, archive_result_mock)

            # The return value is only used by tests. Serialized for Celery.
            return json.dumps(dict(download_result_mock, **archive_result_mock))
        # endif: processing locally uploaded resource
  225. log.info("Attempting to download resource: %s" % resource['url'])
  226. download_result = None
  227. download_status_id = Status.by_text('Archived successfully')
  228. context = {
  229. 'site_url': config.get('ckan.site_url_internally') or config['ckan.site_url'],
  230. 'cache_url_root': config.get('ckanext-archiver.cache_url_root'),
  231. 'previous': Archival.get_for_resource(resource_id)
  232. }
  233. err = None
  234. try:
  235. download_result = download(context, resource)
  236. except NotChanged as e:
  237. download_status_id = Status.by_text('Content has not changed')
  238. try_as_api = False
  239. requires_archive = False
  240. err = e
  241. except LinkInvalidError as e:
  242. download_status_id = Status.by_text('URL invalid')
  243. try_as_api = False
  244. err = e
  245. except DownloadException as e:
  246. download_status_id = Status.by_text('Download error')
  247. try_as_api = True
  248. err = e
  249. except DownloadError as e:
  250. download_status_id = Status.by_text('Download error')
  251. try_as_api = True
  252. err = e
  253. except ChooseNotToDownload as e:
  254. download_status_id = Status.by_text('Chose not to download')
  255. try_as_api = False
  256. err = e
  257. except Exception as e:
  258. if os.environ.get('DEBUG'):
  259. raise
  260. log.error('Uncaught download failure: %r, %r', e, e.args)
  261. _save(Status.by_text('Download failure'), e, resource)
  262. return
  263. if not Status.is_ok(download_status_id) and err:
  264. log.info('GET error: %s - %r, %r "%s"',
  265. Status.by_id(download_status_id), err, err.args,
  266. resource.get('url'))
  267. if try_as_api:
  268. download_result = api_request(context, resource)
  269. if download_result:
  270. download_status_id = Status.by_text('Archived successfully')
  271. # else the download_status_id (i.e. an error) is left what it was
  272. # from the previous download (i.e. not when we tried it as an API)
  273. if not try_as_api or not Status.is_ok(download_status_id):
  274. extra_args = [err.args.url_redirected_to] if 'url_redirected_to' in err.args else []
  275. _save(download_status_id, err, resource, *extra_args)
  276. return

    if not requires_archive:
        # We don't need to archive if the remote content has not changed
        return None

    # Archival
    log.info('Attempting to archive resource')
    try:
        archive_result = archive_resource(context, resource, log, download_result)
    except ArchiveError as e:
        log.error('System error during archival: %r, %r', e, e.args)
        _save(Status.by_text('System error during archival'), e, resource,
              download_result['url_redirected_to'])
        return

    # Success
    _save(Status.by_text('Archived successfully'), '', resource,
          download_result['url_redirected_to'], download_result, archive_result)

    # The return value is only used by tests. Serialized for Celery.
    return json.dumps(dict(download_result, **archive_result))
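
# A minimal usage sketch for _update_resource() (illustrative; as noted above,
# the return value is only used by tests):
#
#     result_json = _update_resource(resource_id, 'bulk', log)
#     result = json.loads(result_json) if result_json else None
#     # on success, result['cache_url'] points at the archived copy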


def download(context, resource, url_timeout=30,
             max_content_length='default',
             method='GET'):
    '''Given a resource, tries to download it.

    Params:
      resource - dict of the resource

    Exceptions from tidy_url may be propagated:
       LinkInvalidError if the URL is invalid

    If there is an error performing the download, raises:
       DownloadException - connection problems etc.
       DownloadError - HTTP status code is an error or 0 length

    If download is not suitable (e.g. too large), raises:
       ChooseNotToDownload

    If the basic GET fails, the caller may retry with common API
    parameters (SPARQL, WMS etc.) to get a better response - see api_request().

    Returns a dict of results of a successful download:
      mimetype, size, hash, headers, saved_file, url_redirected_to
    '''
    from ckanext.archiver import default_settings as settings
    from ckan.plugins.toolkit import config

    if max_content_length == 'default':
        max_content_length = settings.MAX_CONTENT_LENGTH

    url = resource['url']
    url = tidy_url(url)

    if (resource.get('url_type') == 'upload' and
            not url.startswith('http')):
        url = context['site_url'].rstrip('/') + url

    hosted_externally = not url.startswith(config['ckan.site_url'])
    if resource.get('url_type') == 'upload' and hosted_externally:
        # ckanext-cloudstorage, for example, hosts uploads externally.
        # Enable ckanext-archiver.archive_cloud for qa to work on cloud
        # resources, till https://github.com/ckan/ckanext-qa/issues/48 is
        # resolved.
        # Warning: this will result in double storage of all files below the
        # archival filesize limit.
        if not config.get('ckanext-archiver.archive_cloud', False):
            raise ChooseNotToDownload('Skipping resource hosted externally to '
                                      'download resource: %s' % url, url)

    headers = _set_user_agent_string({})

    # start the download - just get the headers
    # May raise DownloadException
    method_func = {'GET': requests.get, 'POST': requests.post}[method]
    kwargs = {'timeout': url_timeout, 'stream': True, 'headers': headers,
              'verify': verify_https()}
    if 'ckan.download_proxy' in config:
        download_proxy = config.get('ckan.download_proxy')
        log.debug('Downloading via proxy %s', download_proxy)
        kwargs['proxies'] = {'http': download_proxy, 'https': download_proxy}
    res = requests_wrapper(log, method_func, url, **kwargs)
    url_redirected_to = res.url if url != res.url else None

    if context.get('previous') and ('etag' in res.headers):
        if context.get('previous').etag == res.headers['etag']:
            log.info("ETAG matches, not downloading content")
            raise NotChanged("etag suggests content has not changed")

    if not res.ok:  # i.e. 404 or something
        raise DownloadError('Server reported status error: %s %s' %
                            (res.status_code, res.reason),
                            url_redirected_to)
    log.info('GET started successfully. Content headers: %r', res.headers)

    # record headers
    mimetype = _clean_content_type(res.headers.get('content-type', '').lower())

    # make sure resource content-length does not exceed our maximum
    content_length = res.headers.get('content-length')

    if content_length:
        try:
            content_length = int(content_length)
        except ValueError:
            # if there are multiple Content-Length headers, requests
            # will return all the values, comma separated
            if ',' in content_length:
                try:
                    content_length = int(content_length.split(',')[0])
                except ValueError:
                    pass

    if isinstance(content_length, int) and \
            content_length >= max_content_length:
        # record the fact that the resource is too large to archive
        log.warning('Resource too large to download: %s > max (%s). '
                    'Resource: %s %r', content_length,
                    max_content_length, resource['id'], url)
        raise ChooseNotToDownload(_('Content-length %s exceeds maximum '
                                    'allowed value %s') %
                                  (content_length, max_content_length),
                                  url_redirected_to)

    # content_length in the headers is useful but can be unreliable, so when
    # we download, we will monitor it doesn't go over the max.

    # continue the download - stream the response body
    def get_content():
        return res.text
    log.info('Downloading the body')
    content = requests_wrapper(log, get_content)

    # APIs can return status 200, but contain an error message in the body
    if response_is_an_api_error(content):
        raise DownloadError(_('Server content contained an API error message: %s') %
                            content[:250],
                            url_redirected_to)

    content_length = len(content)
    if content_length > max_content_length:
        raise ChooseNotToDownload(_("Content-length %s exceeds maximum allowed value %s") %
                                  (content_length, max_content_length),
                                  url_redirected_to)

    log.info('Saving resource')
    try:
        length, hash, saved_file_path = _save_resource(resource, res, max_content_length)
    except ChooseNotToDownload as e:
        raise ChooseNotToDownload(str(e), url_redirected_to)
    log.info('Resource saved. Length: %s File: %s', length, saved_file_path)

    # zero length (or just one byte) indicates a problem
    if length < 2:
        # record the fact that the resource is zero length
        log.warning('Resource found was length %i - not archiving. Resource: %s %r',
                    length, resource['id'], url)
        raise DownloadError(_("Content-length after streaming was %i") % length,
                            url_redirected_to)

    log.info('Resource downloaded: id=%s url=%r cache_filename=%s length=%s hash=%s',
             resource['id'], url, saved_file_path, length, hash)

    return {'mimetype': mimetype,
            'size': length,
            'hash': hash,
            'headers': dict(res.headers),
            'saved_file': saved_file_path,
            'url_redirected_to': url_redirected_to,
            'request_type': method}
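
# A minimal usage sketch for download() (the 'context' keys are the ones built
# in _update_resource() above; the values here are illustrative):
#
#     context = {'site_url': 'http://localhost:5000',
#                'cache_url_root': 'http://localhost:5000/archive/',
#                'previous': None}
#     result = download(context, resource)  # may raise DownloadError etc.
#     print(result['mimetype'], result['size'], result['saved_file'])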


def _file_hashnlength(local_path):
    BLOCKSIZE = 65536
    hasher = hashlib.sha1()
    length = 0

    with open(local_path, 'rb') as afile:
        buf = afile.read(BLOCKSIZE)
        while len(buf) > 0:
            hasher.update(buf)
            length += len(buf)

            buf = afile.read(BLOCKSIZE)

    return (str(hasher.hexdigest()), length)
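
# Example (illustrative): returns the SHA1 hex digest and the length in bytes,
# e.g. for an empty file:
#
#     _file_hashnlength('/tmp/empty')
#     # -> ('da39a3ee5e6b4b0d3255bfef95601890afd80709', 0)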


def archive_resource(context, resource, log, result=None, url_timeout=30):
    """
    Archive the given resource. Moves the file from the temporary location
    given in download().

    Params:
       result - result of the download(), containing keys: size, saved_file

    If there is a failure, raises ArchiveError.

    Returns: {cache_filepath, cache_url}
    """
    from ckanext.archiver import default_settings as settings
    relative_archive_path = os.path.join(resource['id'][:2], resource['id'])
    archive_dir = os.path.join(settings.ARCHIVE_DIR, relative_archive_path)
    if not os.path.exists(archive_dir):
        os.makedirs(archive_dir)
    # try to get a file name from the url
    parsed_url = urlparse(resource.get('url'))
    try:
        file_name = parsed_url.path.split('/')[-1] or 'resource'
        file_name = file_name.strip()  # trailing spaces cause problems
    except Exception:
        file_name = "resource"

    # move the temp file to the resource's archival directory
    saved_file = os.path.join(archive_dir, file_name)
    shutil.move(result['saved_file'], saved_file)
    log.info('Going to do chmod: %s', saved_file)
    try:
        os.chmod(saved_file, 0o644)  # allow other users to read it
    except Exception as e:
        log.error('chmod failed %s: %s', saved_file, e)
        raise
    log.info('Archived resource as: %s', saved_file)

    # calculate the cache_url
    if not context.get('cache_url_root'):
        log.warning('Not saved cache_url because no value for '
                    'ckanext-archiver.cache_url_root in config')
        raise ArchiveError(_('No value for ckanext-archiver.cache_url_root in config'))
    cache_url = urljoin(str(context['cache_url_root']),
                        '%s/%s' % (str(relative_archive_path), str(file_name)))
    return {'cache_filepath': saved_file,
            'cache_url': cache_url}
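
# Example (illustrative): a resource with id '01234567-...' and a URL ending
# in 'data.csv' is archived at
#     <ARCHIVE_DIR>/01/01234567-.../data.csv
# and, given a cache_url_root of 'http://localhost:5000/archive/', its
# cache_url is
#     http://localhost:5000/archive/01/01234567-.../data.csv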


def notify_resource(resource, queue, cache_filepath):
    '''
    Broadcasts an IPipe notification that a resource archival has taken place
    (or at least that the archival object has changed somehow).
    '''
    archiver_interfaces.IPipe.send_data('archived',
                                        resource_id=resource['id'],
                                        queue=queue,
                                        cache_filepath=cache_filepath)


def notify_package(package, queue):
    '''
    Broadcasts an IPipe notification that a package archival has taken place
    (or at least that the archival object has changed somehow), e.g.
    ckanext-packagezip listens for this.
    '''
    archiver_interfaces.IPipe.send_data('package-archived',
                                        package_id=package['id'],
                                        queue=queue)


def get_plugins_waiting_on_ipipe():
    return [observer.name for observer in
            p.PluginImplementations(archiver_interfaces.IPipe)]


def verify_https():
    from ckan.plugins.toolkit import config
    return toolkit.asbool(config.get('ckanext-archiver.verify_https', True))


def _clean_content_type(ct):
    # For now we should remove the charset from the content type and
    # handle it better, differently, later on.
    if 'charset' in ct:
        return ct[:ct.index(';')]
    return ct
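
# Example (illustrative):
#     _clean_content_type('text/csv; charset=utf-8')  # -> 'text/csv'
#     _clean_content_type('application/json')         # -> 'application/json'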


def _set_user_agent_string(headers):
    '''
    Update the passed headers object with a `User-Agent` key, if there is a
    USER_AGENT_STRING option in settings.
    '''
    from ckanext.archiver import default_settings as settings
    ua_str = settings.USER_AGENT_STRING
    if ua_str is not None:
        headers['User-Agent'] = ua_str
    return headers


def tidy_url(url):
    '''
    Given a URL it does various checks before returning a tidied version
    suitable for calling.

    It may raise LinkInvalidError if the URL has a problem.
    '''
    # Find out if it has unicode characters, and if it does, quote them
    # so we are left with an ascii string. (Under Python 3, str has no
    # .decode(), so the except branch always runs and quotes the path.)
    try:
        url = url.decode('ascii')
    except Exception:
        parts = list(urlparse(url))
        parts[2] = quote(parts[2].encode('utf-8'))
        url = urlunparse(parts)
    url = str(url)

    # strip whitespace from url
    # (browsers appear to do this)
    url = url.strip()

    # Use urllib3 to parse the url ahead of time, since that is what
    # requests uses, but when it does it during a GET, errors are not
    # caught well
    try:
        parsed_url = urllib3.util.parse_url(url)
    except urllib3.exceptions.LocationParseError as e:
        raise LinkInvalidError(_('URL parsing failure: %s') % e)

    # Check we aren't using any schemes we shouldn't be.
    # Scheme is case-insensitive.
    if not parsed_url.scheme or not parsed_url.scheme.lower() in ALLOWED_SCHEMES:
        raise LinkInvalidError(_('Invalid url scheme. Please use one of: %s') %
                               ' '.join(ALLOWED_SCHEMES))

    if not parsed_url.host:
        raise LinkInvalidError(_('URL parsing failure - did not find a host name'))

    return url
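
# Examples (illustrative):
#     tidy_url('http://example.com/caf\xe9')  # -> 'http://example.com/caf%C3%A9'
#     tidy_url('ftp://example.com/data.csv')  # -> unchanged (ftp is allowed)
#     tidy_url('file:///etc/passwd')          # raises LinkInvalidError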


def _save_resource(resource, response, max_file_size, chunk_size=1024*16):
    """
    Write the response content to disk.

    Returns a tuple:
        (file length: int, content hash: string, saved file path: string)
    """
    resource_hash = hashlib.sha1()
    length = 0

    fd, tmp_resource_file_path = tempfile.mkstemp()

    with open(tmp_resource_file_path, 'wb') as fp:
        for chunk in response.iter_content(chunk_size=chunk_size,
                                           decode_unicode=False):
            fp.write(chunk)
            length += len(chunk)
            resource_hash.update(chunk)

            if length >= max_file_size:
                raise ChooseNotToDownload(
                    _("Content-length %s exceeds maximum allowed value %s") %
                    (length, max_file_size))

    os.close(fd)

    content_hash = str(resource_hash.hexdigest())
    return length, content_hash, tmp_resource_file_path
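
# A minimal usage sketch (assuming a streamed requests response; max size is
# in bytes):
#
#     res = requests.get(url, stream=True)
#     length, sha1_hex, tmp_path = _save_resource(resource, res, 50 * 10 ** 6)
#     # archive_resource() is then responsible for moving tmp_path into the
#     # archive directory.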


def save_archival(resource, status_id, reason, url_redirected_to,
                  download_result, archive_result, log):
    '''Writes to the archival table the result of an attempt to download
    the resource.

    May propagate a CkanError.
    '''
    now = datetime.datetime.now()

    from ckanext.archiver.model import Archival, Status
    from ckan import model

    archival = Archival.get_for_resource(resource['id'])
    first_archival = not archival
    previous_archival_was_broken = None
    if not archival:
        archival = Archival.create(resource['id'])
        model.Session.add(archival)
    else:
        log.info('Archival from before: %r', archival)
        previous_archival_was_broken = archival.is_broken

    try:
        revision = model.Session.query(model.Revision) \
            .get(resource['revision_id'])
        archival.resource_timestamp = revision.timestamp
    except AttributeError:
        # CKAN 2.9 doesn't have revisions, so we can't get a timestamp
        pass

    # Details of the latest archival attempt
    archival.status_id = status_id
    archival.is_broken = Status.is_status_broken(status_id)
    archival.reason = reason
    archival.url_redirected_to = url_redirected_to

    # Details of successful archival
    if archival.is_broken is False:
        archival.cache_filepath = archive_result['cache_filepath']
        archival.cache_url = archive_result['cache_url']
        archival.size = download_result['size']
        archival.mimetype = download_result['mimetype']
        archival.hash = download_result['hash']
        archival.etag = download_result['headers'].get('etag')
        archival.last_modified = download_result['headers'].get('last-modified')

    # History
    if archival.is_broken is False:
        archival.last_success = now
        archival.first_failure = None
        archival.failure_count = 0
    else:
        log.info('First_archival=%r Previous_broken=%r Failure_count=%r',
                 first_archival, previous_archival_was_broken,
                 archival.failure_count)
        if first_archival or previous_archival_was_broken is False:
            # i.e. this is the first failure (or the first archival)
            archival.first_failure = now
            archival.failure_count = 1
        else:
            archival.failure_count += 1

    archival.updated = now
    log.info('Archival saved: %r', archival)
    model.repo.commit_and_remove()


def requests_wrapper(log, func, *args, **kwargs):
    '''
    Run a requests command, catching exceptions and reraising them as
    DownloadException. Status errors, such as 404 or 500, do not cause
    exceptions; instead they are exposed as response.ok being False.
    e.g.
    >>> requests_wrapper(log, requests.get, url, timeout=url_timeout)
    runs:
        res = requests.get(url, timeout=url_timeout)
    '''
    from .requests_ssl import SSLv3Adapter
    try:
        try:
            response = func(*args, **kwargs)
        except requests.exceptions.ConnectionError as e:
            if 'SSL23_GET_SERVER_HELLO' not in str(e):
                raise
            log.info('SSLv23 failed so trying again using SSLv3: %r', args)
            requests_session = requests.Session()
            requests_session.mount('https://', SSLv3Adapter())
            func = {requests.get: requests_session.get,
                    requests.post: requests_session.post}[func]
            response = func(*args, **kwargs)

    except requests.exceptions.ConnectionError as e:
        raise DownloadException(_('Connection error: %s') % e)
    except requests.exceptions.HTTPError as e:
        raise DownloadException(_('Invalid HTTP response: %s') % e)
    except requests.exceptions.Timeout:
        raise DownloadException(_('Connection timed out after %ss') % kwargs.get('timeout', '?'))
    except requests.exceptions.TooManyRedirects:
        raise DownloadException(_('Too many redirects'))
    except requests.exceptions.RequestException as e:
        raise DownloadException(_('Error downloading: %s') % e)
    except Exception as e:
        if os.environ.get('DEBUG'):
            raise
        raise DownloadException(_('Error with the download: %s') % e)
    return response


def ogc_request(context, resource, service, wms_version):
    original_url = url = resource['url']
    # Remove parameters
    url = url.split('?')[0]
    # Add WMS GetCapabilities parameters
    url += '?service=%s&request=GetCapabilities&version=%s' % \
           (service, wms_version)
    resource['url'] = url
    # Make the request
    response = download(context, resource)
    # Restore the URL so that it doesn't get saved in the actual resource
    resource['url'] = original_url
    return response
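
# Example (illustrative): for a resource URL 'http://example.com/geo?foo=1',
# ogc_request(context, resource, 'WMS', '1.3') downloads
#     http://example.com/geo?service=WMS&request=GetCapabilities&version=1.3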


def wms_1_3_request(context, resource):
    res = ogc_request(context, resource, 'WMS', '1.3')
    res['request_type'] = 'WMS 1.3'
    return res


def wms_1_1_1_request(context, resource):
    res = ogc_request(context, resource, 'WMS', '1.1.1')
    res['request_type'] = 'WMS 1.1.1'
    return res


def wfs_request(context, resource):
    res = ogc_request(context, resource, 'WFS', '2.0')
    res['request_type'] = 'WFS 2.0'
    return res


def api_request(context, resource):
    '''
    Tries making requests as if the resource were one of several well-known
    sorts of API, to try and get a valid response. If one succeeds, returns
    the download result (which records what sort of request elicited it);
    otherwise returns None.
    '''
    # 'resource' holds the results of the download and will get saved. Only if
    # an API request is successful do we want to save the details of it.
    # However download() gets altered for these API requests. So only give
    # download() a copy of 'resource'.
    for api_request_func in wms_1_3_request, wms_1_1_1_request, wfs_request:
        resource_copy = copy.deepcopy(resource)
        try:
            download_dict = api_request_func(context, resource_copy)
        except ArchiverError as e:
            log.info('API %s error: %r, %r "%s"', api_request_func,
                     e, e.args, resource.get('url'))
            continue
        except Exception as e:
            if os.environ.get('DEBUG'):
                raise
            log.error('Uncaught API %s failure: %r, %r', api_request_func,
                      e, e.args)
            continue

        return download_dict


def is_id(id_string):
    '''Tells the client if the string looks like a CKAN id (UUID) or not'''
    reg_ex = '^[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}$'
    return bool(re.match(reg_ex, id_string))
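
# Example (illustrative):
#     is_id('01234567-89ab-cdef-0123-456789abcdef')  # -> True
#     is_id('not-an-id')                             # -> False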


def response_is_an_api_error(response_body):
    '''Some APIs return errors as the response body, but with HTTP status 200.
    So we need to check response bodies for these error messages.
    '''
    response_sample = response_body[:250]  # to allow for <?xml> and <!DOCTYPE> lines

    # WMS spec
    # e.g. https://map.bgs.ac.uk/ArcGIS/services/BGS_Detailed_Geology/MapServer/WMSServer?service=abc
    # <?xml version="1.0" encoding="UTF-8" standalone="yes" ?>
    # <ServiceExceptionReport version="1.3.0"
    if '<ServiceExceptionReport' in response_sample:
        return True

    # This appears to be an alternative - I can't find the spec.
    # e.g. http://sedsh13.sedsh.gov.uk/ArcGIS/services/HS/Historic_Scotland/MapServer/WFSServer?service=abc
    # <ows:ExceptionReport version='1.1.0' language='en' xmlns:ows='http://www.opengis.net/ows'>
    # <ows:Exception exceptionCode='NoApplicableCode'><ows:ExceptionText>Wrong service type.
    # </ows:ExceptionText></ows:Exception></ows:ExceptionReport>
    if '<ows:ExceptionReport' in response_sample:
        return True

    return False


def clean():
    """
    Remove all archived resources.
    """
    log.error("clean task not implemented yet")


def link_checker(context, data):
    """
    Check that the resource's url is valid, and accepts a HEAD request.

    Redirects are not followed - they simply return 'location' in the headers.

    data is a JSON dict describing the link:
        { 'url': url,
          'url_timeout': url_timeout }

    Raises LinkInvalidError if the URL is invalid
    Raises LinkHeadRequestError if the HEAD request fails
    Raises LinkHeadMethodNotSupported if the server says HEAD is not supported

    Returns a JSON string of the response headers
    """
    data = json.loads(data)
    url_timeout = data.get('url_timeout', 30)

    error_message = ''
    headers = {'User-Agent': USER_AGENT}

    url = tidy_url(data['url'])

    # Send a head request
    try:
        res = requests.head(url, timeout=url_timeout, headers=headers)
        headers = res.headers
    except http.client.InvalidURL as ve:
        log.error("Could not make a head request to %r, error is: %s."
                  " Package is: %r. This sometimes happens when using an old version of requests on a URL"
                  " which issues a 301 redirect. Version=%s",
                  url, ve, data.get('package'), requests.__version__)
        raise LinkHeadRequestError(_("Invalid URL or Redirect Link"))
    except ValueError as ve:
        log.error("Could not make a head request to %r, error is: %s. Package is: %r.",
                  url, ve, data.get('package'))
        raise LinkHeadRequestError(_("Could not make HEAD request"))
    except requests.exceptions.ConnectionError as e:
        raise LinkHeadRequestError(_('Connection error: %s') % e)
    except requests.exceptions.HTTPError as e:
        raise LinkHeadRequestError(_('Invalid HTTP response: %s') % e)
    except requests.exceptions.Timeout:
        raise LinkHeadRequestError(_('Connection timed out after %ss') % url_timeout)
    except requests.exceptions.TooManyRedirects:
        raise LinkHeadRequestError(_('Too many redirects'))
    except requests.exceptions.RequestException as e:
        raise LinkHeadRequestError(_('Error during request: %s') % e)
    except Exception as e:
        raise LinkHeadRequestError(_('Error with the request: %s') % e)
    else:
        if res.status_code == 405:
            # this suggests a GET request may be ok, so proceed to that
            # in the download
            raise LinkHeadMethodNotSupported()
        if not res.ok or res.status_code >= 400:
            error_message = _('Server returned HTTP error status: %s %s') % \
                (res.status_code, res.reason)
            raise LinkHeadRequestError(error_message)
    return json.dumps(dict(headers))
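
# A minimal usage sketch (illustrative):
#
#     headers_json = link_checker({}, json.dumps({
#         'url': 'http://example.com/data.csv',
#         'url_timeout': 10,
#     }))
#     headers = json.loads(headers_json)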