plugin.py 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247
  1. import logging
  2. from ckan import model
  3. from ckan import plugins as p
  4. from ckanext.report.interfaces import IReport
  5. from ckanext.archiver.interfaces import IPipe
  6. from ckanext.archiver.logic import action, auth
  7. from ckanext.archiver import helpers
  8. from ckanext.archiver import lib
  9. from ckanext.archiver.model import Archival, aggregate_archivals_for_a_dataset
  10. from ckanext.archiver import cli
  11. log = logging.getLogger(__name__)
  class ArchiverPlugin(p.SingletonPlugin, p.toolkit.DefaultDatasetForm):
      """CKAN plugin that triggers archiving and exposes archival results.

      It is notified of dataset (package) modifications and, when a change
      is judged significant, asks ``lib.create_archiver_package_task`` to
      create an archiver task for the dataset. It also registers the broken
      links report, the ``archiver_resource_show`` / ``archiver_dataset_show``
      actions and auth functions, the public callables of
      ``ckanext.archiver.helpers`` as template helpers, and merges stored
      archival info into dataset dicts when they are shown.
      """
      p.implements(p.IDomainObjectModification, inherit=True)
      p.implements(IReport)
      p.implements(p.IConfigurer, inherit=True)
      p.implements(p.IActions)
      p.implements(p.IAuthFunctions)
      p.implements(p.ITemplateHelpers)
      p.implements(p.IPackageController, inherit=True)
      # IClick is only declared when running on CKAN 2.9.0 or later.
      if p.toolkit.check_ckan_version(min_version='2.9.0'):
          p.implements(p.IClick)

      # IDomainObjectModification

      def notify(self, entity, operation=None):
          """Create an archiver task when a dataset changes significantly.

          Entities that are not ``model.Package`` instances are ignored.

          :param entity: the domain object that was modified
          :param operation: the kind of modification; 'new' and 'deleted'
              are handled specially, anything else is treated as a change
          """
          if not isinstance(entity, model.Package):
              return

          log.debug('Notified of package event: %s %s', entity.name, operation)

          if not self._is_it_sufficient_change_to_run_archiver(entity,
                                                                operation):
              return

          log.debug('Creating archiver task: %s', entity.name)
          # 'priority' is presumably the name of the queue - confirm in lib.
          lib.create_archiver_package_task(entity, 'priority')

      def _is_it_sufficient_change_to_run_archiver(self, package, operation):
          '''Decide whether a package event warrants running the archiver.

          Returns True if any of these hold:
           * it is a new dataset
           * CKAN is 2.9.0 or later (no revisions to compare against)
           * no previous revision / previous package can be found
           * dataset licence changed (affects qa)
           * there are resources that have been added or deleted
           * resources have changed their URL or format (affects qa)
           * a resource's 'upload_in_progress' value changed

          Returns False for a deleted dataset or when nothing relevant
          changed.

          :param package: the ``model.Package`` that was modified
          :param operation: 'new', 'deleted' or anything else (a change)
          '''
          if operation == 'new':
              log.debug('New package - will archive')
              # even if it has no resources, QA needs to show 0 stars against it
              return True
          elif operation == 'deleted':
              log.debug('Deleted package - won\'t archive')
              return False
          # therefore operation=changed

          # 2.9 does not have revisions so archive anyway
          if p.toolkit.check_ckan_version(min_version='2.9.0'):
              return True

          # check to see if resources are added, deleted or URL changed

          # look for the latest revision
          rev_list = package.all_related_revisions
          if not rev_list:
              log.debug('No sign of previous revisions - will archive')
              return True
          # I am not confident we can rely on the info about the current
          # revision, because we are still in the 'before_commit' stage. So
          # simply ignore that if it's returned.
          if rev_list[0][0].id == model.Session.revision.id:
              rev_list = rev_list[1:]
          if not rev_list:
              # Logger.warn is a deprecated alias (removed in Python 3.13).
              log.warning('No sign of previous revisions - will archive')
              return True
          previous_revision = rev_list[0][0]
          log.debug('Comparing with revision: %s %s',
                    previous_revision.timestamp, previous_revision.id)

          # get the package as it was at that previous revision
          context = {'model': model, 'session': model.Session,
                     'ignore_auth': True,
                     'revision_id': previous_revision.id}
          data_dict = {'id': package.id}
          try:
              old_pkg_dict = p.toolkit.get_action('package_show')(
                  context, data_dict)
          except p.toolkit.NotFound:
              log.warning('No sign of previous package - will archive anyway')
              return True

          # has the licence changed?
          old_licence = (old_pkg_dict['license_id'],
                         lib.get_extra_from_pkg_dict(old_pkg_dict, 'licence')
                         or None)
          new_licence = (package.license_id,
                         package.extras.get('licence') or None)
          if old_licence != new_licence:
              log.debug('Licence has changed - will archive: %r->%r',
                        old_licence, new_licence)
              return True

          # have any resources been added or deleted?
          old_resources = {res['id']: res for res in old_pkg_dict['resources']}
          old_res_ids = set(old_resources)
          new_res_ids = {res.id for res in package.resources}
          deleted_res_ids = old_res_ids - new_res_ids
          if deleted_res_ids:
              log.debug('Deleted resources - will archive. res_ids=%r',
                        deleted_res_ids)
              return True
          added_res_ids = new_res_ids - old_res_ids
          if added_res_ids:
              log.debug('Added resources - will archive. res_ids=%r',
                        added_res_ids)
              return True

          # have any resources' url/format changed?
          # From here on both id sets are equal, so every current resource
          # has an entry in old_resources.
          for res in package.resources:
              old_res = old_resources[res.id]
              for key in ('url', 'format'):
                  old_res_value = old_res[key]
                  new_res_value = getattr(res, key)
                  if old_res_value != new_res_value:
                      log.debug('Resource %s changed - will archive. '
                                'id=%s pos=%s url="%s"->"%s"',
                                key, res.id[:4], res.position,
                                old_res_value, new_res_value)
                      return True

              was_in_progress = old_res.get('upload_in_progress', None)
              is_in_progress = res.extras.get('upload_in_progress', None)
              if was_in_progress != is_in_progress:
                  # Identify the resource in the log rather than logging a
                  # constant placeholder value.
                  log.debug('Resource %s upload finished - will archive. ',
                            res.id[:4])
                  return True

              log.debug('Resource unchanged. pos=%s id=%s',
                        res.position, res.id[:4])

          log.debug('No new, deleted or changed resources - won\'t archive')
          return False

      # IReport

      def register_reports(self):
          """Register details of an extension's reports"""
          # Imported here rather than at module level, as in the original.
          from ckanext.archiver import reports
          return [reports.broken_links_report_info,
                  ]

      # IConfigurer

      def update_config(self, config):
          """Add this extension's 'templates' directory to the CKAN config."""
          p.toolkit.add_template_directory(config, 'templates')

      # IActions

      def get_actions(self):
          """Return the action functions provided, keyed by action name."""
          return {
              'archiver_resource_show': action.archiver_resource_show,
              'archiver_dataset_show': action.archiver_dataset_show,
          }

      # IAuthFunctions

      def get_auth_functions(self):
          """Return the auth functions for the actions, keyed by action name."""
          return {
              'archiver_resource_show': auth.archiver_resource_show,
              'archiver_dataset_show': auth.archiver_dataset_show,
          }

      # ITemplateHelpers

      def get_helpers(self):
          """Return every public callable in the ``helpers`` module namespace.

          A name is public when it does not start with an underscore. The
          whole module namespace is scanned, so callables that ``helpers``
          itself imported are included too.
          """
          return {
              name: function
              for name, function in helpers.__dict__.items()
              if callable(function) and not name.startswith('_')
          }

      # IPackageController

      def after_show(self, context, pkg_dict):
          """ Old CKAN function name """
          return self.after_dataset_show(context, pkg_dict)

      def after_dataset_show(self, context, pkg_dict):
          """Add stored archival info to ``pkg_dict`` in place.

          Sets ``pkg_dict['archiver']`` to the aggregated archival for the
          dataset and ``res['archiver']`` on each resource that has an
          archival. Does nothing when the dataset has no archivals.
          Returns None in every case.
          """
          # Insert the archival info into the package_dict so that it is
          # available on the API.
          # When you edit the dataset, these values will not show in the form,
          # but they will be saved in the resources (not the dataset). I can't
          # see an easy way to stop this, but I think it is harmless. It will
          # get overwritten here when output again.
          archivals = Archival.get_for_package(pkg_dict['id'])
          if not archivals:
              return

          # dataset
          pkg_dict['archiver'] = aggregate_archivals_for_a_dataset(archivals)

          # resources
          archivals_by_res_id = {a.resource_id: a for a in archivals}
          for res in pkg_dict['resources']:
              archival = archivals_by_res_id.get(res['id'])
              if archival:
                  archival_dict = archival.as_dict()
                  # Drop identifiers; the enclosing resource dict supplies them.
                  for key in ('id', 'package_id', 'resource_id'):
                      archival_dict.pop(key, None)
                  res['archiver'] = archival_dict

      def before_dataset_index(self, pkg_dict):
          '''
          remove `archiver` from index
          '''
          pkg_dict.pop('archiver', None)
          return pkg_dict

      # IClick

      def get_commands(self):
          """Return the commands defined by ``ckanext.archiver.cli``."""
          return cli.get_commands()
  class TestIPipePlugin(p.SingletonPlugin):
      """IPipe implementation that just records the data it receives.

      Each ``receive_data`` call is stored in ``self.calls`` as
      ``[operation, queue, params]`` (presumably so tests can inspect what
      was sent); ``reset`` empties that record.
      """
      p.implements(IPipe, inherit=True)

      def __init__(self, *args, **kwargs):
          # Positional and keyword arguments are accepted but not used.
          self.calls = list()

      def reset(self):
          """Forget every call recorded so far."""
          self.calls = list()

      def receive_data(self, operation, queue, **params):
          """Record one notification as ``[operation, queue, params]``."""
          record = [operation, queue, params]
          self.calls.append(record)