import logging

from ckan import model
from ckan import plugins as p

from ckanext.report.interfaces import IReport
from ckanext.archiver.interfaces import IPipe
from ckanext.archiver.logic import action, auth
from ckanext.archiver import helpers
from ckanext.archiver import lib
from ckanext.archiver.model import Archival, aggregate_archivals_for_a_dataset
from ckanext.archiver import cli

log = logging.getLogger(__name__)


class ArchiverPlugin(p.SingletonPlugin, p.toolkit.DefaultDatasetForm):
    """
    Registers to be notified whenever CKAN resources are created or their URLs
    change, and will create a new ckanext.archiver celery task to archive the
    resource.
    """
    p.implements(p.IDomainObjectModification, inherit=True)
    p.implements(IReport)
    p.implements(p.IConfigurer, inherit=True)
    p.implements(p.IActions)
    p.implements(p.IAuthFunctions)
    p.implements(p.ITemplateHelpers)
    p.implements(p.IPackageController, inherit=True)
    if p.toolkit.check_ckan_version(min_version='2.9.0'):
        p.implements(p.IClick)
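        # p.implements() in the class body runs at import time, so the
        # click-based CLI (IClick) is only registered when the plugin is
        # loaded on CKAN >= 2.9.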

    # IDomainObjectModification

    def notify(self, entity, operation=None):
        if not isinstance(entity, model.Package):
            return

        log.debug('Notified of package event: %s %s', entity.name, operation)

        run_archiver = \
            self._is_it_sufficient_change_to_run_archiver(entity, operation)
        if not run_archiver:
            return

        log.debug('Creating archiver task: %s', entity.name)
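        # 'priority' is the queue name used for user-triggered changes; bulk
        # re-archival (e.g. kicked off from the CLI) typically goes on a
        # separate queue.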
        lib.create_archiver_package_task(entity, 'priority')

    def _is_it_sufficient_change_to_run_archiver(self, package, operation):
        ''' Returns True if in this revision any of these happened:
        * it is a new dataset
        * dataset licence changed (affects qa)
        * there are resources that have been added or deleted
        * resources have changed their URL or format (affects qa)
        '''
        if operation == 'new':
            log.debug('New package - will archive')
            # even if it has no resources, QA needs to show 0 stars against it
            return True
        elif operation == 'deleted':
            log.debug('Deleted package - won\'t archive')
            return False
        # therefore operation=changed

        # 2.9 does not have revisions so archive anyway
        if p.toolkit.check_ckan_version(min_version='2.9.0'):
            return True

        # check to see if resources are added, deleted or URL changed

        # look for the latest revision
        rev_list = package.all_related_revisions
        if not rev_list:
            log.debug('No sign of previous revisions - will archive')
            return True
        # I am not confident we can rely on the info about the current
        # revision, because we are still in the 'before_commit' stage. So
        # simply ignore that if it's returned.
        if rev_list[0][0].id == model.Session.revision.id:
            rev_list = rev_list[1:]
        if not rev_list:
            log.warn('No sign of previous revisions - will archive')
            return True
        previous_revision = rev_list[0][0]
        log.debug('Comparing with revision: %s %s',
                  previous_revision.timestamp, previous_revision.id)

        # get the package as it was at that previous revision
        context = {'model': model, 'session': model.Session,
                   # 'user': c.user or c.author,
                   'ignore_auth': True,
                   'revision_id': previous_revision.id}
        data_dict = {'id': package.id}
        try:
            old_pkg_dict = p.toolkit.get_action('package_show')(
                context, data_dict)
        except p.toolkit.NotFound:
            log.warn('No sign of previous package - will archive anyway')
            return True

        # has the licence changed?
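        # (compares both the license_id field and any 'licence' extra, so a
        # change to either triggers re-archiving)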
        old_licence = (old_pkg_dict['license_id'],
                       lib.get_extra_from_pkg_dict(old_pkg_dict, 'licence')
                       or None)
        new_licence = (package.license_id,
                       package.extras.get('licence') or None)
        if old_licence != new_licence:
            log.debug('Licence has changed - will archive: %r->%r',
                      old_licence, new_licence)
            return True

        # have any resources been added or deleted?
        old_resources = dict((res['id'], res)
                             for res in old_pkg_dict['resources'])
        old_res_ids = set(old_resources.keys())
        new_res_ids = set((res.id for res in package.resources))
        deleted_res_ids = old_res_ids - new_res_ids
        if deleted_res_ids:
            log.debug('Deleted resources - will archive. res_ids=%r',
                      deleted_res_ids)
            return True
        added_res_ids = new_res_ids - old_res_ids
        if added_res_ids:
            log.debug('Added resources - will archive. res_ids=%r',
                      added_res_ids)
            return True

        # have any resources' url/format changed?
        for res in package.resources:
            for key in ('url', 'format'):
                old_res_value = old_resources[res.id][key]
                new_res_value = getattr(res, key)
                if old_res_value != new_res_value:
                    log.debug('Resource %s changed - will archive. '
                              'id=%s pos=%s url="%s"->"%s"',
                              key, res.id[:4], res.position,
                              old_res_value, new_res_value)
                    return True
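            # 'upload_in_progress' is a resource extra that is presumably set
            # while a file upload is still running; when it changes, the
            # upload has finished and the resource should be re-archived.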
            was_in_progress = old_resources[res.id].get('upload_in_progress', None)
            is_in_progress = res.extras.get('upload_in_progress', None)
            if was_in_progress != is_in_progress:
                log.debug('Resource upload finished - will archive. id=%s',
                          res.id[:4])
                return True
            log.debug('Resource unchanged. pos=%s id=%s',
                      res.position, res.id[:4])
        log.debug('No new, deleted or changed resources - won\'t archive')
        return False

    # IReport

    def register_reports(self):
        """Register details of an extension's reports"""
        from ckanext.archiver import reports
        return [reports.broken_links_report_info,
                ]

    # IConfigurer

    def update_config(self, config):
        p.toolkit.add_template_directory(config, 'templates')

    # IActions

    def get_actions(self):
        return {
            'archiver_resource_show': action.archiver_resource_show,
            'archiver_dataset_show': action.archiver_dataset_show,
        }

    # IAuthFunctions

    def get_auth_functions(self):
        return {
            'archiver_resource_show': auth.archiver_resource_show,
            'archiver_dataset_show': auth.archiver_dataset_show,
        }

    # ITemplateHelpers

    def get_helpers(self):
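        # Expose every public callable defined in ckanext.archiver.helpers as
        # a template helper (names starting with '_' are skipped).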
        return dict((name, function) for name, function
                    in list(helpers.__dict__.items())
                    if callable(function) and name[0] != '_')

    # IPackageController

    def after_show(self, context, pkg_dict):
        """ Old CKAN function name """
        return self.after_dataset_show(context, pkg_dict)

    def after_dataset_show(self, context, pkg_dict):
        # Insert the archival info into the package_dict so that it is
        # available on the API.
        # When you edit the dataset, these values will not show in the form,
        # but they will be saved in the resources (not the dataset). I can't
        # see an easy way to stop this, but I think it is harmless. It will
        # get overwritten here when output again.
        archivals = Archival.get_for_package(pkg_dict['id'])
        if not archivals:
            return
        # dataset
        dataset_archival = aggregate_archivals_for_a_dataset(archivals)
        pkg_dict['archiver'] = dataset_archival

        # resources
        archivals_by_res_id = dict((a.resource_id, a) for a in archivals)
        for res in pkg_dict['resources']:
            archival = archivals_by_res_id.get(res['id'])
            if archival:
                archival_dict = archival.as_dict()
                del archival_dict['id']
                del archival_dict['package_id']
                del archival_dict['resource_id']
                res['archiver'] = archival_dict

    def before_dataset_index(self, pkg_dict):
        '''
        remove `archiver` from index
        '''
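        # The nested 'archiver' dict added in after_dataset_show() is dropped
        # here to keep it out of the search index (presumably it would
        # otherwise upset indexing).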
        pkg_dict.pop('archiver', None)
        return pkg_dict

    # IClick

    def get_commands(self):
        return cli.get_commands()


class TestIPipePlugin(p.SingletonPlugin):
    """
    A minimal IPipe implementation used in tests: it records every piece of
    data received from the archiver pipeline so assertions can be made on it.
    """
    p.implements(IPipe, inherit=True)

    def __init__(self, *args, **kwargs):
        self.calls = []

    def reset(self):
        self.calls = []

    def receive_data(self, operation, queue, **params):
        self.calls.append([operation, queue, params])
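
# Deployment sketch (assumed setup, not part of this module): the plugin is
# enabled alongside ckanext-report in the CKAN config, e.g.
#
#   ckan.plugins = ... report archiver
#
# and a background job worker must be running so the archival tasks enqueued
# in notify() are actually processed.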