migrate_task_status.py

'''
Tool to migrate archival data from the TaskStatus and Resource tables to
the Archival table.
'''
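# Example invocation (the ini path is illustrative - point it at your own
# CKAN config). Without --write the script is a dry run that only reports
# stats:
#   python migrate_task_status.py --write /etc/ckan/default/production.ini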
from __future__ import print_function
from optparse import OptionParser
import logging
import json
import datetime

import common
from running_stats import StatsList
# pip install 'ProgressBar==2.3'
from progressbar import ProgressBar, Percentage, Bar, ETA
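
# Sentinel dates used in migrate() below in place of None, so that
# min()/max() over possibly-missing timestamps still work.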
START_OF_TIME = datetime.datetime(1980, 1, 1)
END_OF_TIME = datetime.datetime(9999, 12, 31)

# NB put no CKAN imports here, or logging breaks


def migrate(options):
    from ckan import model
    from ckanext.archiver.model import Archival, Status

    resources = common.get_resources(state='active',
                                     publisher_ref=options.publisher,
                                     resource_id=options.resource,
                                     dataset_name=options.dataset)
    stats = StatsList()
    widgets = ['Resources: ', Percentage(), ' ', Bar(), ' ', ETA()]
    progress = ProgressBar(widgets=widgets)
    for res in progress(resources):
        # Gather the details of archivals from TaskStatus and Resource
        # to fill all properties of Archival apart from:
        # * package_id
        # * resource_id
        fields = {}

        archiver_task_status = model.Session.query(model.TaskStatus)\
            .filter_by(entity_id=res.id)\
            .filter_by(task_type='archiver')\
            .filter_by(key='status')\
            .first()
        if archiver_task_status:
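            # The TaskStatus 'error' column is assumed to hold JSON along
            # these lines (values are illustrative):
            #   {"reason": "Server returned 500 error",
            #    "last_success": "2013-04-05T12:42:00",
            #    "first_failure": "2013-04-10T09:30:00",
            #    "failure_count": "4",
            #    "url_redirected_to": null}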
            ats_error = json.loads(archiver_task_status.error)
            fields['status_id'] = Status.by_text(archiver_task_status.value)
            fields['is_broken'] = Status.is_status_broken(fields['status_id'])
            fields['reason'] = ats_error['reason']
            fields['last_success'] = date_str_to_datetime_or_none(ats_error['last_success'])
            fields['first_failure'] = date_str_to_datetime_or_none(ats_error['first_failure'])
            fields['failure_count'] = int(ats_error['failure_count'])
            fields['url_redirected_to'] = ats_error['url_redirected_to']
            fields['updated'] = archiver_task_status.last_updated
        else:
            # No TaskStatus row - skip resources that hold no archive data
            # on the Resource itself either
            if not (res.cache_url
                    or res.extras.get('cache_filepath')
                    or res.hash
                    or res.size
                    or res.mimetype):
                add_stat('No archive data', res, stats)
                continue
            for field_name in ('status_id', 'is_broken', 'reason',
                               'last_success', 'first_failure',
                               'failure_count', 'url_redirected_to',
                               'updated', 'created'):
                fields[field_name] = None
        fields['cache_filepath'] = res.extras.get('cache_filepath')
        fields['cache_url'] = res.cache_url
        fields['hash'] = res.hash
        fields['size'] = res.size
        fields['mimetype'] = res.mimetype

        revisions_with_hash = model.Session.query(model.ResourceRevision)\
            .filter_by(id=res.id)\
            .order_by(model.ResourceRevision.revision_timestamp)\
            .filter(model.ResourceRevision.hash != '').all()
        if revisions_with_hash:
            # these are not perfect, but not far off
            fields['created'] = revisions_with_hash[0].revision_timestamp
            fields['resource_timestamp'] = revisions_with_hash[-1].revision_timestamp
        else:
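            # Swap None for the END_OF_TIME/START_OF_TIME sentinels so that
            # min()/max() effectively ignore missing timestamps and the
            # earliest/latest known one wins.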
            fields['created'] = min(fields['updated'] or END_OF_TIME,
                                    fields['first_failure'] or END_OF_TIME,
                                    fields['last_success'] or END_OF_TIME)
            fields['resource_timestamp'] = max(
                fields['updated'] or START_OF_TIME,
                fields['first_failure'] or START_OF_TIME,
                fields['last_success'] or START_OF_TIME)

        # Compare with any existing data in the Archival table
        archival = Archival.get_for_resource(res.id)
        if archival:
            changed = None
            for field, value in list(fields.items()):
                if getattr(archival, field) != value:
                    if options.write:
                        setattr(archival, field, value)
                    changed = True
            if not changed:
                add_stat('Already exists correctly in archival table', res, stats)
                continue
            add_stat('Updated in archival table', res, stats)
        else:
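            # Archival.create() is expected to fill in the package_id and
            # resource_id - the two fields deliberately not gathered above.
            # The new row is only added to the session when --write is given.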
            archival = Archival.create(res.id)
            if options.write:
                for field, value in list(fields.items()):
                    setattr(archival, field, value)
                model.Session.add(archival)
            add_stat('Added to archival table', res, stats)

    print('Summary\n', stats.report())
    if options.write:
        model.repo.commit_and_remove()
        print('Written')


def add_stat(outcome, res, stats, extra_info=None):
    try:
        # pre CKAN 2.3 model
        package_name = res.resource_group.package.name
    except AttributeError:
        # CKAN 2.3+ model
        package_name = res.package.name
    res_id = '%s %s' % (package_name, res.id[:4])
    if extra_info:
        res_id += ' %s' % extra_info
    return '\n' + stats.add(outcome, res_id)


def date_str_to_datetime_or_none(date_str):
    from ckan.lib.helpers import date_str_to_datetime
    if date_str:
        return date_str_to_datetime(date_str)
    return None
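# Illustrative behaviour (the exact accepted formats are whatever
# ckan.lib.helpers.date_str_to_datetime parses):
#   date_str_to_datetime_or_none('2013-04-05T12:42:00')  # -> datetime
#   date_str_to_datetime_or_none(None)                   # -> None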


if __name__ == '__main__':
    usage = """Tool to migrate archival data from TaskStatus/Resource to Archival table
usage: %prog [options] <ckan.ini>
"""
    parser = OptionParser(usage=usage)
    parser.add_option("-w", "--write",
                      action="store_true", dest="write",
                      help="write the changes to the archival table")
    parser.add_option('-p', '--publisher', dest='publisher')
    parser.add_option('-d', '--dataset', dest='dataset')
    parser.add_option('-r', '--resource', dest='resource')
    (options, args) = parser.parse_args()
    if len(args) != 1:
        parser.error('Wrong number of arguments (%i)' % len(args))
    config_ini = args[0]
    print('Loading CKAN config...')
    common.load_config(config_ini)
    common.register_translator()
    print('Done')

    # Set up logging so that only this script's debug output is printed
    rootLogger = logging.getLogger()
    rootLogger.setLevel(logging.WARNING)
    localLogger = logging.getLogger(__name__)
    localLogger.setLevel(logging.DEBUG)
    handler = logging.StreamHandler()
    handler.setFormatter(logging.Formatter('%(message)s'))
    localLogger.addHandler(handler)

    migrate(options)