lib.py 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. from builtins import str
  2. import logging
  3. import ckan.plugins as p
  4. from ckanext.archiver.tasks import update_package, update_resource
  5. log = logging.getLogger(__name__)
  6. def compat_enqueue(name, fn, queue, args=None):
  7. u'''
  8. Enqueue a background job using Celery or RQ.
  9. '''
  10. try:
  11. # Try to use RQ
  12. from ckan.plugins.toolkit import enqueue_job
  13. enqueue_job(fn, args=args, queue=queue)
  14. except ImportError:
  15. # Fallback to Celery
  16. import uuid
  17. from ckan.lib.celery_app import celery
  18. celery.send_task(name, args=args + [queue], task_id=str(uuid.uuid4()))
  19. def create_archiver_resource_task(resource, queue):
  20. if p.toolkit.check_ckan_version(max_version='2.2.99'):
  21. # earlier CKANs had ResourceGroup
  22. package = resource.resource_group.package
  23. else:
  24. package = resource.package
  25. compat_enqueue('archiver.update_resource', update_resource, queue, [resource.id])
  26. log.debug('Archival of resource put into celery queue %s: %s/%s url=%r',
  27. queue, package.name, resource.id, resource.url)
  28. def create_archiver_package_task(package, queue):
  29. compat_enqueue('archiver.update_package', update_package, queue, [package.id])
  30. log.debug('Archival of package put into celery queue %s: %s',
  31. queue, package.name)
  32. def get_extra_from_pkg_dict(pkg_dict, key, default=None):
  33. for extra in pkg_dict.get('extras', []):
  34. if extra['key'] == key:
  35. return extra['value']
  36. return default