utils.py

import itertools
import logging
import sys
from time import sleep
import os
import re
import shutil

from sqlalchemy import func

import ckan.plugins as p
from ckan.plugins.toolkit import config

try:
    from collections import OrderedDict  # from python 2.7
except ImportError:
    from sqlalchemy.util import OrderedDict

log = logging.getLogger(__name__)


def update(identifiers, queue):
    from ckanext.archiver import lib
    for pkg_or_res, is_pkg, num_resources_for_pkg, pkg_for_res in \
            _get_packages_and_resources_in_args(identifiers, queue):
        if is_pkg:
            package = pkg_or_res
            log.info('Queuing dataset %s (%s resources) Q:%s',
                     package.name, num_resources_for_pkg, queue)
            lib.create_archiver_package_task(package, queue)
            sleep(0.1)  # to try to avoid Redis getting overloaded
        else:
            resource = pkg_or_res
            package = pkg_for_res
            log.info('Queuing resource %s/%s', package.name, resource.id)
            lib.create_archiver_resource_task(resource, queue)
            sleep(0.05)  # to try to avoid Redis getting overloaded
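
# Illustrative only: update() is intended to be driven from a CKAN CLI/paster
# command. The identifiers below are invented examples, not values the
# extension ships with:
#
#     update(['my-dataset', 'my-org-name'], queue=None)  # queue chosen per id
#     update([], queue='bulk')                           # queue every dataset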


def _get_packages_and_resources_in_args(identifiers, queue):
    '''Given identifiers that specify one or more datasets or resources,
    generates those packages & resources along with some basic properties.

    Yields tuples of the form:
        (pkg_or_res, is_pkg, num_resources_for_pkg, pkg_for_res)

    When is_pkg=True:
        pkg_or_res - package object
        num_resources_for_pkg - number of resources it has
        pkg_for_res - None
    When is_pkg=False:
        pkg_or_res - resource object
        num_resources_for_pkg - None
        pkg_for_res - package object relating to the given resource
    '''
    from ckan import model
    packages = []
    resources = []
    if identifiers:
        for identifier in identifiers:
            # try arg as a group id/name
            group = model.Group.get(identifier)
            if group:
                if group.is_organization:
                    packages.extend(
                        model.Session.query(model.Package)
                             .filter_by(owner_org=group.id))
                else:
                    packages.extend(group.packages(with_private=True))
                if not queue:
                    queue = 'bulk'
                continue
            # try arg as a package id/name
            pkg = model.Package.get(identifier)
            if pkg:
                packages.append(pkg)
                if not queue:
                    queue = 'priority'
                continue
            # try arg as a resource id
            res = model.Resource.get(identifier)
            if res:
                resources.append(res)
                if not queue:
                    queue = 'priority'
                continue
            else:
                log.error('Could not recognize as a group, package '
                          'or resource: %r', identifier)
                sys.exit(1)
    else:
        # all packages
        pkgs = model.Session.query(model.Package) \
                    .filter_by(state='active') \
                    .order_by('name').all()
        packages.extend(pkgs)
        if not queue:
            queue = 'bulk'

    log.info('Datasets to archive: %d', len(packages))
    if resources:
        log.info('Resources to archive: %d', len(resources))
    if not (packages or resources):
        log.error('No datasets or resources to process')
        sys.exit(1)

    log.info('Queue: %s', queue)
    for package in packages:
        if p.toolkit.check_ckan_version(max_version='2.2.99'):
            # earlier CKANs had ResourceGroup
            pkg_resources = \
                [resource for resource in
                 itertools.chain.from_iterable(
                     (rg.resources_all
                      for rg in package.resource_groups_all)
                 )
                 if resource.state == 'active']
        else:
            pkg_resources = \
                [resource for resource in package.resources_all
                 if resource.state == 'active']
        yield package, True, len(pkg_resources), None

    for resource in resources:
        if p.toolkit.check_ckan_version(max_version='2.2.99'):
            package = resource.resource_group.package
        else:
            package = resource.package
        yield resource, False, None, package
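
# For illustration, the generator above yields tuples shaped like these
# (object ids invented):
#
#     (<Package my-dataset>, True, 3, None)          # a dataset, 3 resources
#     (<Resource 0fa3...>, False, None, <Package>)   # a resource + its dataset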


def update_test(identifiers, queue):
    from ckanext.archiver import tasks
    # Prevent it loading config again
    tasks.load_config = lambda x: None
    for pkg_or_res, is_pkg, num_resources_for_pkg, pkg_for_res in \
            _get_packages_and_resources_in_args(identifiers, queue):
        if is_pkg:
            package = pkg_or_res
            log.info('Archiving dataset %s (%s resources)',
                     package.name, num_resources_for_pkg)
            tasks._update_package(package.id, queue, log)
        else:
            resource = pkg_or_res
            package = pkg_for_res
            log.info('Archiving resource %s/%s', package.name, resource.id)
            tasks._update_resource(resource.id, queue, log)


def init():
    import ckan.model as model
    from ckanext.archiver.model import init_tables
    init_tables(model.meta.engine)


def view(package_ref=None):
    from ckan import model
    from ckanext.archiver.model import Archival

    r_q = model.Session.query(model.Resource).filter_by(state='active')
    print('Resources: %i total' % r_q.count())
    a_q = model.Session.query(Archival)
    print('Archived resources: %i total' % a_q.count())
    num_with_cache_url = a_q.filter(Archival.cache_url != '').count()
    print('  %i with cache_url' % num_with_cache_url)
    last_updated_res = a_q.order_by(Archival.updated.desc()).first()
    print('Latest archival: %s' % (
        last_updated_res.updated.strftime('%Y-%m-%d %H:%M')
        if last_updated_res else '(none)'))

    if package_ref:
        pkg = model.Package.get(package_ref)
        print('Package %s %s' % (pkg.name, pkg.id))
        for res in pkg.resources:
            print('Resource %s' % res.id)
            for archival in a_q.filter_by(resource_id=res.id):
                print('* %r' % archival)


def clean_status():
    from ckan import model
    from ckanext.archiver.model import Archival

    print('Before:')
    view()

    q = model.Session.query(Archival)
    q.delete()
    model.Session.commit()

    print('After:')
    view()


def clean_cached_resources():
    from ckan import model
    from ckanext.archiver.model import Archival

    print('Before:')
    view()

    q = model.Session.query(Archival).filter(Archival.cache_url != '')
    archivals = q.all()
    num_archivals = len(archivals)
    progress = 0
    for archival in archivals:
        archival.cache_url = None
        archival.cache_filepath = None
        archival.size = None
        archival.mimetype = None
        archival.hash = None
        progress += 1
        if progress % 1000 == 0:
            print('Done %i/%i' % (progress, num_archivals))
            model.Session.commit()
    model.Session.commit()
    model.Session.remove()

    print('After:')
    view()


def report(output_file, delete=False):
    """
    Generates a report containing orphans (either files or resources)
    """
    import csv
    from ckan import model

    archive_root = config.get('ckanext-archiver.archive_dir')
    if not archive_root:
        log.error("Could not find archiver root")
        return

    # We'll use this to match the UUID part of the path
    uuid_re = re.compile(".*([0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-"
                         "[0-9a-f]{4}-[0-9a-f]{12}).*")

    not_cached_active = 0
    not_cached_deleted = 0
    file_not_found_active = 0
    file_not_found_deleted = 0
    perm_error = 0
    file_no_resource = 0
    with open(output_file, "w") as f:
        writer = csv.writer(f)
        writer.writerow(["Resource ID", "Filepath", "Problem"])

        resources = {}
        for resource in model.Session.query(model.Resource).all():
            resources[resource.id] = True

            # Check the resource's cache_filepath
            fp = resource.extras.get('cache_filepath')
            if fp is None:
                if resource.state == 'active':
                    not_cached_active += 1
                else:
                    not_cached_deleted += 1
                writer.writerow([resource.id, str(resource.extras),
                                 "Resource not cached: {0}".format(resource.state)])
                continue

            # Check that the cached file is there and readable
            if not os.path.exists(fp):
                if resource.state == 'active':
                    file_not_found_active += 1
                else:
                    file_not_found_deleted += 1
                writer.writerow([resource.id, fp,
                                 "File not found: {0}".format(resource.state)])
                continue

            try:
                os.stat(fp)
            except OSError:
                perm_error += 1
                writer.writerow([resource.id, fp, "File not readable"])
                continue
        # Iterate over the archive root and check each file by matching the
        # resource_id part of the path to the resources dict
        for root, _, files in os.walk(archive_root):
            for filename in files:
                archived_path = os.path.join(root, filename)
                m = uuid_re.match(archived_path)
                if not m:
                    writer.writerow([resource.id, archived_path,
                                     "Malformed path (no UUID)"])
                    continue

                if not resources.get(m.groups(0)[0].strip(), False):
                    file_no_resource += 1

                    if delete:
                        try:
                            os.unlink(archived_path)
                            log.info("Unlinked {0}".format(archived_path))
                            os.rmdir(root)
                            log.info("Removed {0}".format(root))
                            writer.writerow([m.groups(0)[0], archived_path,
                                             "Resource not found, file deleted"])
                        except Exception as e:
                            log.error("Failed to unlink {0}: {1}".format(archived_path, e))
                    else:
                        writer.writerow([m.groups(0)[0], archived_path,
                                         "Resource not found"])
                    continue
  256. print("General info:")
  257. print(" Permission error reading file: {0}".format(perm_error))
  258. print(" file on disk but no resource: {0}".format(file_no_resource))
  259. print(" Total resources: {0}".format(model.Session.query(model.Resource).count()))
  260. print("Active resource info:")
  261. print(" No cache_filepath: {0}".format(not_cached_active))
  262. print(" cache_filepath not on disk: {0}".format(file_not_found_active))
  263. print("Deleted resource info:")
  264. print(" No cache_filepath: {0}".format(not_cached_deleted))
  265. print(" cache_filepath not on disk: {0}".format(file_not_found_deleted))
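
# A sketch of the kind of rows the report CSV can contain (ids and paths
# invented for illustration):
#
#     Resource ID,Filepath,Problem
#     d7c1...,{'cache_filepath': None},Resource not cached: active
#     a41b...,/srv/archive/a4/a41b.../data.csv,File not found: deleted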


def migrate():
    """ Adds any missing columns to the database table for Archival by
    checking the schema and adding those that are missing.

    If you wish to add a column, add the column name and SQL
    statement to MIGRATIONS_ADD, which will check that the column is
    not present before running the query.

    If you wish to modify or delete a column, add the column name and
    query to MIGRATIONS_MODIFY, which only runs if the column
    does exist.
    """
    from ckan import model

    MIGRATIONS_ADD = OrderedDict({
        "etag": "ALTER TABLE archival ADD COLUMN etag character varying",
        "last_modified": "ALTER TABLE archival ADD COLUMN last_modified character varying"
    })

    MIGRATIONS_MODIFY = OrderedDict({
    })

    q = "select column_name from INFORMATION_SCHEMA.COLUMNS where table_name = 'archival';"
    current_cols = list([m[0] for m in model.Session.execute(q)])

    for k, v in MIGRATIONS_ADD.items():
        if k not in current_cols:
            log.info(u"Adding column '{0}'".format(k))
            log.info(u"Executing '{0}'".format(v))
            model.Session.execute(v)
            model.Session.commit()

    for k, v in MIGRATIONS_MODIFY.items():
        if k in current_cols:
            log.info(u"Modifying column '{0}'".format(k))
            log.info(u"Executing '{0}'".format(v))
            model.Session.execute(v)
            model.Session.commit()

    log.info("Migrations complete")
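
# MIGRATIONS_MODIFY is empty above; a hypothetical entry would look like
# this (the ALTER statement is invented for illustration, not a shipped
# migration):
#
#     MIGRATIONS_MODIFY = OrderedDict({
#         "etag": "ALTER TABLE archival ALTER COLUMN etag TYPE text",
#     })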


def migrate_archiver_dirs():
    from ckan import model
    from ckan.logic import get_action

    site_user = get_action('get_site_user')(
        {'model': model, 'ignore_auth': True, 'defer_commit': True}, {})

    site_url_base = config['ckanext-archiver.cache_url_root'].rstrip('/')
    old_dir_regex = re.compile(r'(.*)/([a-f0-9\-]+)/([^/]*)$')
    new_dir_regex = re.compile(r'(.*)/[a-f0-9]{2}/[a-f0-9\-]{36}/[^/]*$')
    for resource in model.Session.query(model.Resource) \
            .filter(model.Resource.state != model.State.DELETED):
        if not resource.cache_url or resource.cache_url == 'None':
            continue
        if new_dir_regex.match(resource.cache_url):
            print('Resource with new url already: %s' % resource.cache_url)
            continue
        match = old_dir_regex.match(resource.cache_url)
        if not match:
            print('ERROR Could not match url: %s' % resource.cache_url)
            continue
        url_base, res_id, filename = match.groups()

        # check the package isn't deleted
        # Need to refresh the resource's session
        resource = model.Session.query(model.Resource).get(resource.id)
        if p.toolkit.check_ckan_version(max_version='2.2.99'):
            package = None
            if resource.resource_group:
                package = resource.resource_group.package
        else:
            package = resource.package
        if package and package.state == model.State.DELETED:
            print('Package is deleted')
            continue

        if url_base != site_url_base:
            print('ERROR Base URL is incorrect: %r != %r' %
                  (url_base, site_url_base))
            continue

        # move the file
        filepath_base = config['ckanext-archiver.archive_dir']
        old_path = os.path.join(filepath_base, resource.id)
        new_dir = os.path.join(filepath_base, resource.id[:2])
        new_path = os.path.join(filepath_base, resource.id[:2], resource.id)
        new_filepath = os.path.join(new_path, filename)
        if not os.path.exists(new_dir):
            os.mkdir(new_dir)
        if os.path.exists(new_path) and not os.path.exists(old_path):
            print('File already moved: %s' % new_path)
        else:
            print('File: "%s" -> "%s"' % (old_path, new_path))
            try:
                shutil.move(old_path, new_path)
            except IOError as e:
                print('ERROR moving resource: %s' % e)
                continue

        # change the cache_url and cache_filepath
        new_cache_url = '/'.join((url_base, res_id[:2], res_id, filename))
        print('cache_filepath: "%s" -> "%s"' %
              (resource.extras.get('cache_filepath'), new_filepath))
        print('cache_url: "%s" -> "%s"' % (resource.cache_url, new_cache_url))
        context = {'model': model, 'user': site_user['name'],
                   'ignore_auth': True, 'session': model.Session}
        data_dict = {'id': resource.id}
        res_dict = get_action('resource_show')(context, data_dict)
        res_dict['cache_filepath'] = new_filepath
        res_dict['cache_url'] = new_cache_url
        data_dict = res_dict
        result = get_action('resource_update')(context, data_dict)
        if result.get('id') == res_id:
            print('Successfully updated resource')
        else:
            print('ERROR updating resource: %r' % result)
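
# The migration above converts the flat archive layout to one sharded on the
# first two characters of the resource id, e.g. (paths invented):
#
#     <archive_dir>/0fa33bb9-.../data.csv
#         -> <archive_dir>/0f/0fa33bb9-.../data.csv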


def size_report():
    from ckan import model
    from ckanext.archiver.model import Archival

    kb = 1024
    mb = 1024 * 1024
    gb = pow(1024, 3)
    size_bins = [
        (kb, '<1 KB'), (10*kb, '1-10 KB'), (100*kb, '10-100 KB'),
        (mb, '100 KB - 1 MB'), (10*mb, '1-10 MB'), (100*mb, '10-100 MB'),
        (gb, '100 MB - 1 GB'), (10*gb, '1-10 GB'), (100*gb, '10-100 GB'),
        (gb*gb, '>100 GB'),
    ]
    previous_bin = (0, '')
    counts = []
    total_sizes = []
    print('{:>15}{:>10}{:>20}'.format(
        'file size', 'no. files', 'files size (bytes)'))
    for size_bin in size_bins:
        q = model.Session.query(Archival) \
            .filter(Archival.size > previous_bin[0]) \
            .filter(Archival.size <= size_bin[0]) \
            .filter(Archival.cache_filepath != '') \
            .join(model.Resource,
                  Archival.resource_id == model.Resource.id) \
            .filter(model.Resource.state != 'deleted') \
            .join(model.Package,
                  Archival.package_id == model.Package.id) \
            .filter(model.Package.state != 'deleted')
        count = q.count()
        counts.append(count)
        total_size = model.Session.query(func.sum(Archival.size)) \
            .filter(Archival.size > previous_bin[0]) \
            .filter(Archival.size <= size_bin[0]) \
            .filter(Archival.cache_filepath != '') \
            .join(model.Resource,
                  Archival.resource_id == model.Resource.id) \
            .filter(model.Resource.state != 'deleted') \
            .join(model.Package,
                  Archival.package_id == model.Package.id) \
            .filter(model.Package.state != 'deleted') \
            .all()[0][0]
        total_size = int(total_size or 0)
        total_sizes.append(total_size)
        print('{:>15}{:>10,}{:>20,}'.format(size_bin[1], count, total_size))
        previous_bin = size_bin
    print('Totals: {:,} {:,}'.format(sum(counts), sum(total_sizes)))


def delete_files_larger_than_max_content_length():
    from ckan import model
    from ckanext.archiver.model import Archival
    from ckanext.archiver import default_settings as settings

    max_size = settings.MAX_CONTENT_LENGTH
    archivals = model.Session.query(Archival) \
        .filter(Archival.size > max_size) \
        .filter(Archival.cache_filepath != '') \
        .all()
    total_size = int(model.Session.query(func.sum(Archival.size))
                     .filter(Archival.size > max_size)
                     .all()[0][0] or 0)
    print('{} archivals above the {:,} threshold with total size {:,}'.format(
        len(archivals), max_size, total_size))
    input('Press Enter to DELETE them')
    for archival in archivals:
        print('Deleting %r' % archival)
        resource = model.Resource.get(archival.resource_id)
        if resource.state == 'deleted':
            print('Nothing to delete - Resource is deleted - deleting archival')
            model.Session.delete(archival)
            model.Session.commit()
            model.Session.flush()
            continue
        pkg = model.Package.get(archival.package_id)
        if pkg.state == 'deleted':
            print('Nothing to delete - Dataset is deleted - deleting archival')
            model.Session.delete(archival)
            model.Session.commit()
            model.Session.flush()
            continue
        filepath = archival.cache_filepath
        if not os.path.exists(filepath):
            print('Skipping - file not on disk')
            continue
        try:
            os.unlink(filepath)
        except OSError:
            print('ERROR deleting %s' % filepath)
        else:
            archival.cache_filepath = None
            model.Session.commit()
            model.Session.flush()
            print('..deleted %s' % filepath)