# export.py
# encoding=utf-8
# Standard library.
import collections
import gzip
import logging
import lzma
import os
import pickle
import shutil
import zipfile

# Third-party.
from sqlalchemy.sql.expression import bindparam, delete

# Project-local.
from terroroftinytown.format import registry
from terroroftinytown.format.projectsettings import ProjectSettingsWriter
from terroroftinytown.format.urlformat import quote
from terroroftinytown.tracker.bootstrap import Bootstrap
from terroroftinytown.tracker.model import Project, Result, new_session
from terroroftinytown.util.externalsort import GNUExternalSort

# Module-level logger; handlers use lazy %-style arguments throughout.
logger = logging.getLogger(__name__)
  18. ResultContainer = collections.namedtuple(
  19. 'ResultContainer',
  20. ['id', 'shortcode', 'url', 'encoding', 'datetime']
  21. )
  22. class Exporter:
  23. def __init__(self, output_dir, format="beacon", settings={}):
  24. super().__init__()
  25. self.setup_format(format)
  26. self.output_dir = output_dir
  27. self.settings = settings
  28. self.after = self.settings['after']
  29. self.max_items = self.settings['max_items']
  30. self.projects_count = 0
  31. self.items_count = 0
  32. self.last_date = None
  33. self.lzma = True
  34. self.extension = 'txt.xz'
  35. # Length of directory name
  36. self.dir_length = settings['dir_length']
  37. # Number of characters from the right are not used in directory name
  38. # in other words, number of _
  39. self.max_right = settings['max_right']
  40. # Number of characters from the left that are used in file name
  41. # in other words, number of characters that are not in directory name and not _
  42. self.file_length = settings['file_length']
  43. # Example of settings:
  44. # dir_length = 2
  45. # max_right = 4
  46. # file_length = 2
  47. # output: projectname/00/01/000100____.txt, projectname/01/01__.txt
  48. self.fp = None
  49. self.writer = None
  50. self.project_result_sorters = {}
  51. self.working_set_filename = os.path.join(output_dir,
  52. 'current_working_set.pickle.gz')
  53. def setup_format(self, format):
  54. self.format = registry[format]
  55. def make_output_dir(self):
  56. if not os.path.isdir(self.output_dir):
  57. os.makedirs(self.output_dir)
  58. def dump(self):
  59. self.make_output_dir()
  60. database_busy_file = self.settings.get('database_busy_file')
  61. if database_busy_file:
  62. with open(database_busy_file, 'w'):
  63. pass
  64. self._drain_to_working_set()
  65. if database_busy_file:
  66. os.remove(database_busy_file)
  67. self._feed_input_sorters()
  68. with new_session() as session:
  69. for project_id, sorter in self.project_result_sorters.items():
  70. project = session.query(Project).filter_by(name=project_id).first()
  71. if self.settings['include_settings']:
  72. self.dump_project_settings(project)
  73. self.dump_project(project, sorter)
  74. if self.settings['zip']:
  75. self.zip_project(project)
  76. os.remove(self.working_set_filename)
  77. def _drain_to_working_set(self, size=1000):
  78. logger.info('Draining to working set %s', self.working_set_filename)
  79. assert not os.path.exists(self.working_set_filename)
  80. with new_session() as session:
  81. query = session.query(Result)
  82. if self.after:
  83. query = query.filter(Result.datetime > self.after)
  84. with gzip.open(self.working_set_filename, 'wb', compresslevel=1) as work_file:
  85. last_id = -1
  86. num_results = 0
  87. running = True
  88. while running:
  89. # Optimized for SQLite scrolling window
  90. rows = query.filter(Result.id > last_id).limit(size).all()
  91. if not rows:
  92. break
  93. delete_ids = []
  94. for result in rows:
  95. pickle.dump({
  96. 'id': result.id,
  97. 'project_id': result.project_id,
  98. 'shortcode': result.shortcode,
  99. 'url': result.url,
  100. 'encoding': result.encoding,
  101. 'datetime': result.datetime,
  102. }, work_file)
  103. num_results += 1
  104. self.items_count += 1
  105. delete_ids.append(result.id)
  106. if num_results % 10000 == 0:
  107. logger.info('Drain progress: %d', num_results)
  108. if num_results % 100000 == 0:
  109. # Risky, but need to do this since WAL
  110. # performance is low on large transactions
  111. logger.info("Checkpoint. (Don't delete stray files if program crashes!)")
  112. work_file.flush()
  113. session.commit()
  114. if self.max_items and num_results >= self.max_items:
  115. logger.info('Reached max items %d.', self.max_items)
  116. running = False
  117. break
  118. if self.settings['delete']:
  119. delete_query = delete(Result).where(
  120. Result.id == bindparam('id')
  121. )
  122. session.execute(
  123. delete_query,
  124. [{'id': result_id} for result_id in delete_ids]
  125. )
  126. pickle.dump('eof', work_file)
  127. def _feed_input_sorters(self):
  128. num_results = 0
  129. with gzip.open(self.working_set_filename, 'rb') as work_file:
  130. while True:
  131. result = pickle.load(work_file)
  132. if result == 'eof':
  133. break
  134. if result['project_id'] not in self.project_result_sorters:
  135. self.project_result_sorters[result['project_id']] = \
  136. GNUExternalSort(temp_dir=self.output_dir,
  137. temp_prefix='tott-{0}-'.format(
  138. result['project_id']
  139. )
  140. )
  141. self.projects_count += 1
  142. sorter = self.project_result_sorters[result['project_id']]
  143. sorter.input(
  144. result['shortcode'],
  145. (result['id'], result['url'], result['encoding'],
  146. result['datetime'])
  147. )
  148. num_results += 1
  149. if num_results % 10000 == 0:
  150. logger.info('Sort progress: %d', num_results)
  151. def dump_project(self, project, sorter):
  152. logger.info('Looking in project %s', project.name)
  153. if project.url_template.endswith('{shortcode}'):
  154. site = project.url_template.replace('{shortcode}', '')
  155. else:
  156. site = project.url_template
  157. last_filename = None
  158. for i, (key, value) in enumerate(sorter.sort()):
  159. if i % 10000 == 0:
  160. logger.info('Format progress: %d/%d', i, sorter.rows)
  161. id_, url, encoding, datetime_ = value
  162. result = ResultContainer(id_, key, url, encoding, datetime_)
  163. # we can do this as the query is sorted
  164. # so that item that would end up together
  165. # would returned together
  166. filename = self.get_filename(project, result)
  167. if filename != last_filename:
  168. self.close_fp()
  169. logger.info('Writing results to file %s.', filename)
  170. assert not os.path.isfile(filename), 'Target file %s already exists' % (filename)
  171. self.fp = self.get_fp(filename)
  172. self.writer = self.format(self.fp)
  173. self.writer.write_header(site)
  174. last_filename = filename
  175. for encoding in (result.encoding, 'latin-1', 'cp437', 'utf-8'):
  176. try:
  177. result.url.encode(encoding)
  178. except UnicodeError:
  179. logger.warning('Encoding failed %s|%s %s.',
  180. result.shortcode, repr(result.url),
  181. encoding,
  182. exc_info=True)
  183. continue
  184. else:
  185. self.writer.write_shortcode(
  186. result.shortcode, result.url, encoding
  187. )
  188. break
  189. else:
  190. raise Exception(
  191. 'Unable to encode {}|{} {}'
  192. .format(result.shortcode, repr(result.url),
  193. result.encoding)
  194. )
  195. if not self.last_date or result.datetime > self.last_date:
  196. self.last_date = result.datetime
  197. self.close_fp()
  198. def dump_project_settings(self, project):
  199. path = os.path.join(self.output_dir, project.name,
  200. '{0}.meta.json.xz'.format(project.name))
  201. self.fp = self.get_fp(path)
  202. self.writer = ProjectSettingsWriter(self.fp)
  203. self.writer.write_project(project)
  204. self.close_fp()
  205. def zip_project(self, project):
  206. project_path = os.path.join(self.output_dir, project.name)
  207. filename = project.name
  208. if self.settings.get('zip_filename_infix'):
  209. filename += self.settings['zip_filename_infix']
  210. zip_path = os.path.join(self.output_dir, '{0}.zip'.format(filename))
  211. assert not os.path.isfile(zip_path), 'Target file %s already exists' % (zip_path)
  212. with zipfile.ZipFile(zip_path, mode='w',
  213. compression=zipfile.ZIP_STORED) as zip_file:
  214. for root, dirs, files in os.walk(project_path):
  215. for file in files:
  216. file_path = os.path.join(root, file)
  217. arc_filename = os.path.relpath(file_path, self.output_dir)
  218. zip_file.write(file_path, arc_filename)
  219. shutil.rmtree(project_path)
  220. def get_fp(self, filename):
  221. dirname = os.path.dirname(filename)
  222. if not os.path.isdir(dirname):
  223. os.makedirs(dirname)
  224. if self.lzma:
  225. return lzma.open(filename, 'wb')
  226. else:
  227. return open(filename, 'wb')
  228. def close_fp(self):
  229. if not self.fp or not self.writer:
  230. return
  231. self.writer.write_footer()
  232. self.fp.close()
  233. def get_filename(self, project, item):
  234. path = os.path.join(self.output_dir, project.name)
  235. dirs, prefix, underscores = self.split_shortcode(
  236. item.shortcode, self.dir_length, self.max_right, self.file_length)
  237. dirs = [quote(dirname.encode('ascii')) for dirname in dirs]
  238. path = os.path.join(path, *dirs)
  239. path = os.path.join(path, '%s%s.%s' % (
  240. quote(prefix.encode('ascii')),
  241. '_' * len(underscores),
  242. self.extension
  243. ))
  244. return path
  245. @classmethod
  246. def split_shortcode(cls, shortcode, dir_length=2, max_right=4,
  247. file_length=2):
  248. assert dir_length >= 0
  249. assert max_right >= 0
  250. assert file_length >= 0
  251. # 0001asdf
  252. # dir_length max_right file_length
  253. dirs = []
  254. # create directories until we left only max_right or less characters
  255. length = 0
  256. shortcode_temp = shortcode
  257. while dir_length and len(shortcode_temp) > max_right + file_length:
  258. dirname = shortcode_temp[:dir_length]
  259. dirs.append(dirname)
  260. length += len(dirname)
  261. shortcode_temp = shortcode_temp[dir_length:]
  262. # name the file
  263. code_length = len(shortcode)
  264. length_left = code_length - length
  265. underscores = min(length_left, max_right)
  266. return dirs, shortcode[:code_length - underscores], shortcode[code_length - underscores:]
  267. class ExporterBootstrap(Bootstrap):
  268. def start(self, args=None):
  269. super().start(args=args)
  270. logging.basicConfig(level=logging.INFO)
  271. self.exporter = Exporter(self.args.output_dir, self.args.format, vars(self.args))
  272. self.exporter.dump()
  273. self.write_stats()
  274. def setup_args(self):
  275. super().setup_args()
  276. self.arg_parser.add_argument(
  277. '--format', default='beacon',
  278. choices=registry.keys(), help='Output file format')
  279. self.arg_parser.add_argument(
  280. '--after',
  281. help='Only export items submitted after specified time. '
  282. '(ISO8601 format YYYY-MM-DDTHH:MM:SS.mmmmmm)')
  283. self.arg_parser.add_argument(
  284. '--include-settings',
  285. help='Include project settings', action='store_true')
  286. self.arg_parser.add_argument(
  287. '--zip', help='Zip the projects after exporting',
  288. action='store_true')
  289. self.arg_parser.add_argument(
  290. '--dir-length', type=int, default=2,
  291. help='Number of characters per directory name'
  292. )
  293. self.arg_parser.add_argument(
  294. '--file-length', type=int, default=2,
  295. help='Number of characters per filename prefix (excluding directory names)'
  296. )
  297. self.arg_parser.add_argument(
  298. '--max-right', type=int, default=4,
  299. help='Number of characters used inside the file (excluding directory and file prefix names)'
  300. )
  301. self.arg_parser.add_argument(
  302. '--delete', action='store_true',
  303. help='Delete the exported rows after export'
  304. )
  305. self.arg_parser.add_argument(
  306. '--max-items', type=int, metavar='N',
  307. help='Export a maximum of N items.')
  308. self.arg_parser.add_argument(
  309. '--zip-filename-infix',
  310. help='Insert string in filename in final zip filename.'
  311. )
  312. self.arg_parser.add_argument(
  313. '--database-busy-file',
  314. help='A sentinel file to indicate the database is likely busy and locked'
  315. )
  316. self.arg_parser.add_argument(
  317. 'output_dir', help='Output directory (will be created)')
  318. def write_stats(self):
  319. logger.info(
  320. 'Written %d items in %d projects',
  321. self.exporter.items_count, self.exporter.projects_count
  322. )
  323. if self.exporter.last_date:
  324. logger.info('Last item timestamp (use --after to dump after this item):')
  325. logger.info(self.exporter.last_date.isoformat())
  326. if __name__ == '__main__':
  327. ExporterBootstrap().start()