# db_archiver.py
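"""Archive rows from a MySQL table out to S3.

Matching rows are first copied into <db>_archive.<table>_archive, then
dumped to a local file, gzipped, uploaded to S3, and cleaned up.

Example invocation (illustrative values):
    python db_archiver.py --table orders \
        --where "created_at < '2020-01-01'" \
        --column_name_to_log id
"""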

import argparse
import gzip
import logging
import os
import shutil

import sentry_sdk

import archive_utils
import db_utils
import s3_utils
from config_loader import database_config, sentry_dsn
from mysql.connector.errors import ProgrammingError

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)-8s %(message)s'
)


def start_archival():
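    """Parse and validate CLI arguments, then kick off the archive run."""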
    parser = argparse.ArgumentParser(description='MySQL DB Archiver')
    parser.add_argument(
        '--table',
        '-t',
        dest='table',
        type=str,
        required=True,
        help='Table to be archived')
    parser.add_argument(
        '--where',
        '-w',
        dest='where',
        type=str,
        required=True,
        help='WHERE clause used to select rows for archiving; it is also appended to the archive file name')
    parser.add_argument(
        '--column_name_to_log',
        '-c',
        dest='column_name_to_log',
        required=True,
        help='The smallest and largest values from this column become part of the archive file name')
    parser.add_argument(
        '--index_hint',
        '-i',
        dest='index_hint',
        required=False,
        help="From the pt-archiver docs: The 'i' part deserves special mention. This tells pt-archiver which "
             "index it should scan to archive. This appears in a FORCE INDEX or USE INDEX hint in the SELECT "
             "statements used to fetch archivable rows. If you don't specify anything, pt-archiver will "
             "auto-discover a good index, preferring a PRIMARY KEY if one exists. In my experience this usually "
             "works well, so most of the time you can probably just omit the 'i' part.")
    parser.add_argument('--optimize', dest='optimize', action='store_true')
    args = parser.parse_args()

    table_name = args.table
    where_clause = args.where
    column_name_to_log_in_file = args.column_name_to_log
    index_hint = args.index_hint
    optimize = args.optimize
    # argparse enforces presence, but empty strings would still slip through.
    if not table_name or not where_clause or not column_name_to_log_in_file:
        raise ValueError(
            f'table: {table_name} | where: {where_clause} | column_name_to_log: {column_name_to_log_in_file}.'
            f' All three values are mandatory.'
        )

    host = database_config.get('host')
    archive_host = database_config.get('archive_host')
    db_name = database_config.get('database')
    transaction_size = database_config.get('transaction_size')
    logging.info('Starting archive...')
    archive(host, archive_host, db_name, table_name, where_clause, column_name_to_log_in_file, transaction_size,
            optimize, index_hint)


def archive(host, archive_host, db_name, table_name, where_clause, column_name_to_log_in_file,
            transaction_size, optimize, index_hint):
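    """Copy matching rows into an archive table, then ship them to S3.

    If the archive table already exists (MySQL error 1050), a previous run
    likely died midway: its leftover rows are drained to S3 first, and the
    whole run is retried from scratch.
    """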
    logging.info('')
    logging.info('')
    logging.info(f'------------- archiving {db_name}.{table_name} -------------')
    archive_db_name = db_name + '_archive'
    archive_table_name = table_name + '_archive'
    db_utils.create_archive_database(db_name, archive_db_name)
    try:
        db_utils.create_archive_table(
            db_name, table_name, archive_db_name, archive_table_name)
    except ProgrammingError as er:
        if er.errno == 1050:  # 1050: "Table already exists"
            logging.info(
                f'Archive table {archive_db_name}.{archive_table_name} exists,'
                f' archiving older rows'
            )
            # Drain the leftover table to S3, then retry the full run.
            fetch_archived_data_upload_to_s3_and_delete(
                archive_host, db_name, table_name, archive_db_name, archive_table_name,
                column_name_to_log_in_file, transaction_size, '')
            archive(host, archive_host, db_name, table_name, where_clause, column_name_to_log_in_file,
                    transaction_size, optimize, index_hint)
            return None
        else:
            raise
    archive_utils.archive_to_db(host, archive_host, db_name, table_name, archive_db_name, archive_table_name,
                                where_clause, transaction_size, optimize, index_hint)
    fetch_archived_data_upload_to_s3_and_delete(
        archive_host, db_name, table_name, archive_db_name, archive_table_name,
        column_name_to_log_in_file, transaction_size, where_clause)


def fetch_archived_data_upload_to_s3_and_delete(
        archive_host, db_name, table_name, archive_db_name, archive_table_name,
        column_name_to_log_in_file, transaction_size, where_clause):
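    """Dump the archive table to a local file, gzip it, upload it to S3,
    and drop the archive table. An empty archive table is simply dropped."""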
    no_of_rows_archived = db_utils.get_count_of_rows_archived(
        archive_db_name, archive_table_name)
    if not no_of_rows_archived:
        logging.info(
            f'Archive table {archive_db_name}.{archive_table_name} '
            f'had no rows, dropping archive table')
        db_utils.drop_archive_table(archive_db_name, archive_table_name)
        return None
    local_file_name, s3_path = db_utils.get_file_names(
        db_name, table_name, archive_db_name, archive_table_name,
        column_name_to_log_in_file, where_clause)
    archive_utils.archive_to_file(
        archive_host, archive_db_name, archive_table_name, transaction_size, local_file_name)
    gzip_file_name = compress_to_gzip(local_file_name)
    gzip_s3_path = f'{s3_path}.gz'
    # Only the gzipped dump is uploaded; the raw dump stays local until cleanup.
    s3_utils.upload_to_s3(gzip_file_name, gzip_s3_path)
    logging.info(f'Deleting local file: {local_file_name}')
    os.remove(local_file_name)
    os.remove(gzip_file_name)
    db_utils.drop_archive_table(archive_db_name, archive_table_name)
    return None


def compress_to_gzip(local_file_name):
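    """Gzip local_file_name to local_file_name + '.gz' and return the new name."""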
    gzip_file_name = f'{local_file_name}.gz'
    # Stream the dump into the gzip file so large dumps are not read fully
    # into memory, and close both handles promptly.
    with open(local_file_name, 'rb') as fp, gzip.open(gzip_file_name, 'wb') as gz_fp:
        shutil.copyfileobj(fp, gz_fp)
    return gzip_file_name


if __name__ == '__main__':
    sentry_sdk.init(dsn=sentry_dsn)
    try:
        start_archival()
    except Exception as e:
        sentry_sdk.capture_exception(e)
        raise