12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849 |
- from builtins import str
- import logging
- import ckan.plugins as p
- from ckanext.archiver.tasks import update_package, update_resource
- log = logging.getLogger(__name__)
- def compat_enqueue(name, fn, queue, args=None):
- u'''
- Enqueue a background job using Celery or RQ.
- '''
- try:
- # Try to use RQ
- from ckan.plugins.toolkit import enqueue_job
- enqueue_job(fn, args=args, queue=queue)
- except ImportError:
- # Fallback to Celery
- import uuid
- from ckan.lib.celery_app import celery
- celery.send_task(name, args=args + [queue], task_id=str(uuid.uuid4()))
- def create_archiver_resource_task(resource, queue):
- if p.toolkit.check_ckan_version(max_version='2.2.99'):
- # earlier CKANs had ResourceGroup
- package = resource.resource_group.package
- else:
- package = resource.package
- compat_enqueue('archiver.update_resource', update_resource, queue, [resource.id])
- log.debug('Archival of resource put into celery queue %s: %s/%s url=%r',
- queue, package.name, resource.id, resource.url)
- def create_archiver_package_task(package, queue):
- compat_enqueue('archiver.update_package', update_package, queue, [package.id])
- log.debug('Archival of package put into celery queue %s: %s',
- queue, package.name)
- def get_extra_from_pkg_dict(pkg_dict, key, default=None):
- for extra in pkg_dict.get('extras', []):
- if extra['key'] == key:
- return extra['value']
- return default
|