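"""Archive MySQL table rows and ship them to S3.

Flow: copy rows matching a WHERE clause into a <db>_archive.<table>_archive
table (pt-archiver, driven through the project's archive_utils helper), dump
that table to a local file, gzip it, upload the gzip to S3, then drop the
archive table. Connection settings come from config_loader; failures are
reported to Sentry.
"""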
import argparse
import gzip
import logging
import os

import sentry_sdk
from mysql.connector.errors import ProgrammingError

import archive_utils
import db_utils
import s3_utils
from config_loader import database_config, sentry_dsn

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s %(levelname)-8s %(message)s'
)
def start_archival():
    parser = argparse.ArgumentParser(description='MySQL DB Archiver')
    parser.add_argument(
        '--table',
        '-t',
        dest='table',
        type=str,
        required=True,
        help='Table to be archived')
    parser.add_argument(
        '--where',
        '-w',
        dest='where',
        type=str,
        required=True,
        help='Where clause for archiving the table; this is also appended to the archive file name')
    parser.add_argument(
        '--column_name_to_log',
        '-c',
        dest='column_name_to_log',
        required=True,
        help='Smallest and largest values from this column become part of the archive file name')
    parser.add_argument(
        '--index_hint',
        '-i',
        dest='index_hint',
        required=False,
        help="From the pt-archiver docs: The 'i' part deserves special mention. This tells pt-archiver which index "
             "it should scan to archive. This appears in a FORCE INDEX or USE INDEX hint in the SELECT statements "
             "used to fetch archivable rows. If you don't specify anything, pt-archiver will auto-discover a good "
             "index, preferring a PRIMARY KEY if one exists. In my experience this usually works well, so most of "
             "the time you can probably just omit the 'i' part.")
    parser.add_argument('--optimize', dest='optimize', action='store_true')
    args = parser.parse_args()

    table_name = args.table
    where_clause = args.where
    column_name_to_log_in_file = args.column_name_to_log
    index_hint = args.index_hint
    optimize = args.optimize

    if not table_name or not where_clause or not column_name_to_log_in_file:
        raise ValueError(
            f'table: {table_name} | where: {where_clause} | column_name_to_log: {column_name_to_log_in_file};'
            f' these are mandatory values.'
        )

    host = database_config.get('host')
    archive_host = database_config.get('archive_host')
    db_name = database_config.get('database')
    transaction_size = database_config.get('transaction_size')

    logging.info('Starting archive...')
    archive(host, archive_host, db_name, table_name, where_clause, column_name_to_log_in_file, transaction_size,
            optimize, index_hint)
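

# archive() copies matching rows into <db>_archive.<table>_archive via pt-archiver,
# then exports that table to S3 and drops it. If the archive table already exists
# (e.g. left over from a previous run), its rows are exported and the table dropped
# first, and the archive step is then retried.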
def archive(host, archive_host, db_name, table_name, where_clause, column_name_to_log_in_file,
            transaction_size, optimize, index_hint):
    logging.info('')
    logging.info('')
    logging.info(f'------------- archiving {db_name}.{table_name} -------------')
    archive_db_name = db_name + '_archive'
    archive_table_name = table_name + '_archive'
    db_utils.create_archive_database(db_name, archive_db_name)
    try:
        db_utils.create_archive_table(
            db_name, table_name, archive_db_name, archive_table_name)
    except ProgrammingError as er:
        if er.errno == 1050:  # MySQL error 1050: table already exists
            logging.info(
                f'Archive table {archive_db_name}.{archive_table_name} exists,'
                f' archiving older rows'
            )
            fetch_archived_data_upload_to_s3_and_delete(
                archive_host, db_name, table_name, archive_db_name, archive_table_name,
                column_name_to_log_in_file, transaction_size, '')
            archive(host, archive_host, db_name, table_name, where_clause, column_name_to_log_in_file,
                    transaction_size, optimize, index_hint)
            return None
        else:
            raise
    archive_utils.archive_to_db(host, archive_host, db_name, table_name, archive_db_name, archive_table_name,
                                where_clause, transaction_size, optimize, index_hint)
    fetch_archived_data_upload_to_s3_and_delete(
        archive_host, db_name, table_name, archive_db_name, archive_table_name,
        column_name_to_log_in_file, transaction_size, where_clause)
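

# Export the archive table to a local file, gzip it, upload the gzip to S3, then
# remove the local files and drop the archive table. Tables with no archived rows
# are simply dropped.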
def fetch_archived_data_upload_to_s3_and_delete(
        archive_host, db_name, table_name, archive_db_name, archive_table_name,
        column_name_to_log_in_file, transaction_size, where_clause):
    no_of_rows_archived = db_utils.get_count_of_rows_archived(
        archive_db_name, archive_table_name)
    if not no_of_rows_archived:
        logging.info(
            f'Archive table {archive_db_name}.{archive_table_name} '
            f'had no rows, dropping archive table')
        db_utils.drop_archive_table(archive_db_name, archive_table_name)
        return None

    local_file_name, s3_path = db_utils.get_file_names(
        db_name, table_name, archive_db_name, archive_table_name,
        column_name_to_log_in_file, where_clause)
    archive_utils.archive_to_file(
        archive_host, archive_db_name, archive_table_name, transaction_size, local_file_name)

    gzip_file_name = compress_to_gzip(local_file_name)
    gzip_s3_path = f'{s3_path}.gz'
    # s3_utils.upload_to_s3(local_file_name, s3_path)
    s3_utils.upload_to_s3(gzip_file_name, gzip_s3_path)

    logging.info(f'Deleting local file: {local_file_name}')
    os.remove(local_file_name)
    os.remove(gzip_file_name)
    db_utils.drop_archive_table(archive_db_name, archive_table_name)
    return None
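

# Write a gzip-compressed copy of the dump next to the original and return its path;
# only the compressed copy is uploaded to S3.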
def compress_to_gzip(local_file_name):
    gzip_file_name = f'{local_file_name}.gz'
    with open(local_file_name, 'rb') as fp, gzip.open(gzip_file_name, 'wb') as gz_fp:
        gz_fp.write(fp.read())
    return gzip_file_name
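

# Entry point: initialise Sentry, run the archival, and report any failure to
# Sentry before re-raising so the process still exits with an error.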
if __name__ == '__main__':
    sentry_sdk.init(dsn=sentry_dsn)
    try:
        start_archival()
    except Exception as e:
        sentry_sdk.capture_exception(e)
        raise