123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547 |
- # Use of this source code is governed by a BSD-style
- # license that can be found in the LICENSE file.
- # Copyright 2019 The OSArchiver Authors. All rights reserved.
- """
- DB base class file which provide helpers and common method for Source and
- Destination Db backend
- The class provides a metadata storage to prevent doing some computations
- several times. It also keeps a reference on a pymysql cursor per table to
- avoid creating too many cursors
- """
- import logging
- import re
- import warnings
- import time
- import timeit
- # need to include datetime to handle some result
- # of pymysql (integrity exception helpers)
- import datetime # noqa
- import pymysql
- from sqlalchemy import create_engine
class DbBase():
    """
    The DbBase class that should be inherited from Source and Destination Db
    backend
    """

    def __init__(self,
                 host=None,
                 user=None,
                 password=None,
                 select_limit=1000,
                 delete_limit=500,
                 port=3306,
                 dry_run=False,
                 deleted_column=None,
                 max_retries=5,
                 bulk_insert=1000,
                 retry_time_limit=2,
                 delete_loop_delay=2,
                 foreign_key_check=True,
                 **kwargs):
        """
        Instantiator of the database base class.

        :param host: database server hostname
        :param user: database user name
        :param password: database password
        :param select_limit: max number of rows fetched per SELECT
        :param delete_limit: max number of rows removed per DELETE
        :param port: database server port
        :param dry_run: when True requests are rolled back, never committed
        :param deleted_column: name of the soft-delete column
        :param max_retries: number of retries when an SQL error occurs
        :param bulk_insert: number of rows per bulk INSERT
        :param retry_time_limit: seconds to wait between two retries
        :param delete_loop_delay: seconds to wait between two delete loops
        :param foreign_key_check: default FOREIGN_KEY_CHECKS value
        """
        self.host = host
        self.user = user
        self.port = int(port)
        self.password = password
        self.delete_limit = int(delete_limit)
        self.deleted_column = deleted_column
        self.connection = None
        self.select_limit = int(select_limit)
        self.bulk_insert = int(bulk_insert)
        self.dry_run = dry_run
        # metadata cache shaped as {database: {table: {key: value}}}
        self.metadata = {}
        self._sqlalchemy_engine = None
        # number of retries when an error occurs; cast to int for
        # consistency with the other numeric parameters, which may arrive
        # as strings when read from a configuration file
        self.max_retries = int(max_retries)
        # how long to wait between two retries (seconds)
        self.retry_time_limit = int(retry_time_limit)
        self.delete_loop_delay = delete_loop_delay
        self.foreign_key_check = foreign_key_check
        # hide some warnings we do not care about
        warnings.simplefilter("ignore")
        self.connect()
- @property
- def sqlalchemy_engine(self):
- if self._sqlalchemy_engine is None:
- url = "mysql+pymysql://{user}:{password}@".format(
- user=self.user, password=self.password)
- if self.host is not None:
- url += self.host
- if self.port is not None:
- url += ':{port}'.format(port=self.port)
- self._sqlalchemy_engine = create_engine(url)
- return self._sqlalchemy_engine
- def connect(self):
- """
- connect to the database and set the connection attribute to
- pymysql.connect
- """
- self.connection = pymysql.connect(host=self.host,
- user=self.user,
- port=self.port,
- password=self.password,
- database=None)
- logging.debug("Successfully connected to mysql://%s:%s@%s:%s",
- self.user, '*' * len(self.password), self.host,
- self.port)
- def disconnect(self):
- """
- disconnect from the databse if connection is open
- """
- if self.connection.open:
- self.connection.close()
- def add_metadata(self, database=None, table=None, key=None, value=None):
- """
- store for one database/table a key with a value
- """
- if database not in self.metadata:
- self.metadata[database] = {}
- if table not in self.metadata[database]:
- self.metadata[database][table] = {}
- logging.debug("Adding metadata %s.%s.%s = %s", database, table, key,
- value)
- self.metadata[database][table][key] = value
- return self.metadata[database][table][key]
- def get_metadata(self, database=None, table=None, key=None):
- """
- return the key's value for a database.table
- """
- if database is None or table is None:
- return None
- if database in self.metadata and table in self.metadata[database]:
- return self.metadata[database][table].get(key)
- return None
- def disable_fk_check(self, cursor=None):
- """
- Disable foreign key check for a cursor
- """
- logging.debug("Disabling foreign_key_check")
- cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
- def enable_fk_check(self, cursor=None):
- """
- Enable foreign key check for a cursor
- """
- logging.debug("Enabling foreign_key_check")
- cursor.execute("SET FOREIGN_KEY_CHECKS=1;")
- def check_request_retry(self):
- """
- When an SQL error occured, this method is called and do some check
- Right now it only re-open connection if the connection is closed
- """
- logging.debug("Sleeping %s sec before retrying....",
- self.retry_time_limit)
- time.sleep(int(self.retry_time_limit))
- # Handle auto reconnect
- if not self.connection.open:
- logging.info("Re-opening connection which seems abnormaly "
- "closed")
- self.connect()
    def set_foreign_key_check(self,
                              foreign_key_check=None,
                              cursor=None,
                              database=None,
                              table=None,
                              new_cursor=False):
        """
        This method set the correct value to foreign key check. Instead of
        executing it at each requests which is time consuming and overloading
        it checks in metadata the current value and change it if needed

        :param foreign_key_check: desired FOREIGN_KEY_CHECKS value
        :param cursor: pymysql cursor on which the setting is applied; its
            repr is part of the metadata cache key, so the value is cached
            per cursor
        :param database: database used as metadata cache key
        :param table: table used as metadata cache key
        :param new_cursor: True when the cursor was just (re)created, which
            invalidates any cached value for it
        """
        fk_check_in_cache = False
        current_fk_check = self.get_metadata(
            database=database,
            table=table,
            key='fk_check_{c}'.format(c=cursor))
        # nothing in cache we want to apply the foreign_key_check value
        # set current_fk_check to negate of foreign_key_check
        # (forces one of the two branches below to issue the SET statement)
        if current_fk_check is None or new_cursor is True:
            logging.debug("foreign key check value not found in cache")
            current_fk_check = not foreign_key_check
        if foreign_key_check is False and current_fk_check is True:
            self.disable_fk_check(cursor=cursor)
        elif foreign_key_check is True and current_fk_check is False:
            self.enable_fk_check(cursor=cursor)
        else:
            # cached value already matches the requested one:
            # nothing was executed and the cache does not need a refresh
            fk_check_in_cache = True
        # remember the value now applied on this cursor for next time
        if database is not None \
                and table is not None and not fk_check_in_cache:
            self.add_metadata(database=database,
                              table=table,
                              key='fk_check_{c}'.format(c=cursor),
                              value=foreign_key_check)
    def get_cursor(self,
                   database=None,
                   table=None,
                   cursor_type=None,
                   new=False,
                   fk_check=None):
        """
        Return the pymysql cursor mapped to a database.table if exists in
        metadata otherwise it create a new cursor

        :param database: database used as metadata cache key
        :param table: table used as metadata cache key
        :param cursor_type: pymysql cursor class, defaults to
            pymysql.cursors.Cursor
        :param new: when True bypass the cache and create a fresh cursor
        :param fk_check: when not None, apply this FOREIGN_KEY_CHECKS value
            on the returned cursor (via set_foreign_key_check)
        """
        default_cursor_type = pymysql.cursors.Cursor
        cursor_type = cursor_type or default_cursor_type
        cursor = None
        cursor_in_cache = False
        # open db connection if not opened
        if not self.connection.open:
            self.connect()
            cursor = self.connection.cursor(cursor_type)
        else:
            # if this is not a cursor creation
            # try to get the cached one from metadata
            if not new:
                cursor = self.get_metadata(
                    database=database,
                    table=table,
                    key='cursor_{c}'.format(c=cursor_type))
            # if cursor is None (creation or not found in metadata)
            # set the cursor type to default one
            type_of_cursor = type(cursor)
            if cursor is None:
                type_of_cursor = default_cursor_type
            # Check if the cursor retrieved is well typed
            # if not force the cursor to be unset
            # it will be re-created after
            if cursor is not None and cursor_type != type_of_cursor:
                logging.debug(
                    "Type of cursor found in cache is %s, we want %s"
                    " instead, need to create a new cursor", type_of_cursor,
                    cursor_type)
                cursor = None
            # cursor creation
            if cursor is None:
                logging.debug("No existing cursor found, creating a new one")
                cursor = self.connection.cursor(cursor_type)
            else:
                cursor_in_cache = True
                logging.debug("Using cached cursor %s", cursor)
        # set the foreign key check value if needed
        # for the cursor
        if fk_check is not None:
            self.set_foreign_key_check(cursor=cursor,
                                       database=database,
                                       table=table,
                                       foreign_key_check=fk_check,
                                       new_cursor=new)
        # Add the cursor in cache
        if database is not None and table is not None and not cursor_in_cache:
            logging.debug("Caching cursor for %s.%s", database, table)
            self.add_metadata(database=database,
                              table=table,
                              key='cursor_{c}'.format(c=cursor_type),
                              value=cursor)
        return cursor
- def _db_execute(self, sql=None, cursor=None, method=None, values=None):
- """
- Execute a request on database
- """
- logging.debug("Executing SQL command: '%s'", sql)
- # execute / execute_many method
- start = timeit.default_timer()
- getattr(cursor, method)(sql, values)
- end = timeit.default_timer()
- logging.debug("SQL duration: %s sec", end - start)
- def _db_fetch(self, fetch_method=None, cursor=None, fetch_args=None):
- """
- This method fetch data in database
- """
- start = timeit.default_timer()
- fetched_values = getattr(cursor, fetch_method)(**fetch_args)
- end = timeit.default_timer()
- logging.debug("Data fetch duration: %s sec", end - start)
- return fetched_values
- def _db_commit(self, cursor=None, sql=None, values_length=None):
- """
- Commit the executed request, return the number of row modified
- """
- if self.dry_run:
- logging.info(
- "[DRY RUN]: here is what I should have "
- "commited: '%s'", cursor.mogrify(query=sql))
- self.connection.rollback()
- return values_length
- # Not dry-run mode: commit the request
- # return the number of row affected by the request
- start = timeit.default_timer()
- self.connection.commit()
- end = timeit.default_timer()
- logging.debug("Commit duration: %s sec", end - start)
- return cursor.rowcount
    def db_request(self,
                   sql=None,
                   values=None,
                   fetch_method=None,
                   fetch_args=None,
                   database=None,
                   table=None,
                   cursor_type=None,
                   foreign_key_check=None,
                   execute_method='execute'):
        """
        generic method to do a request to the db
        It handles a retry on failure, execept for foreign key exception which
        in our case useless
        In case of error connection, it sleeps 20 seconds before retrying

        :param sql: SQL statement, possibly with %s placeholders
        :param values: values bound to the placeholders
        :param fetch_method: cursor fetch method name ('fetchall',
            'fetchone', ...); when None the request is committed instead
        :param fetch_args: keyword arguments passed to the fetch method
        :param database: database selected before execution, also used as
            cursor/metadata cache key
        :param table: table used as cursor/metadata cache key
        :param cursor_type: pymysql cursor class to use
        :param foreign_key_check: FOREIGN_KEY_CHECKS value for this request
            (defaults to self.foreign_key_check, forced off in dry-run)
        :param execute_method: cursor method, 'execute' or 'executemany'
        """
        retry = 0
        cursor = None
        force_cursor_creation = False
        values = values or []
        fetch_args = fetch_args or {}
        if foreign_key_check is None:
            foreign_key_check = self.foreign_key_check
        if self.dry_run:
            foreign_key_check = False
            logging.debug("Force disabling foreign key check because we are in"
                          " dry run mode")
        while retry <= self.max_retries:
            try:
                if retry > 0:
                    logging.info("Retry %s/%s", retry, self.max_retries)
                    # waits retry_time_limit and reconnects if needed
                    self.check_request_retry()
                if cursor is None:
                    cursor = self.get_cursor(database=database,
                                             table=table,
                                             cursor_type=cursor_type,
                                             fk_check=foreign_key_check,
                                             new=force_cursor_creation)
                if database is not None:
                    self.connection.select_db(database)
                # Execute the query
                self._db_execute(sql=sql,
                                 cursor=cursor,
                                 method=execute_method,
                                 values=values)
                # Fetch and return the data
                if fetch_method is not None:
                    return self._db_fetch(fetch_method=fetch_method,
                                          cursor=cursor,
                                          fetch_args=fetch_args)
                # no fetch_method means we need to commit the request
                # In dry_run mode just display what would have been commited
                return self._db_commit(cursor=cursor,
                                       sql=sql,
                                       values_length=len(values))
            except pymysql.Error as sql_exception:
                logging.error("SQL error: %s", sql_exception.args)
                if sql_exception.args[0] == "(0, '')":
                    # stale/broken cursor: drop it and force a fresh one
                    # on the next loop iteration
                    logging.debug("Cursor need to be recreated")
                    if cursor is not None:
                        cursor.close()
                        cursor = None
                    force_cursor_creation = True
                # foreign key constraint error, there is no sense in continuing
                # (jump the retry counter to the limit so we raise below)
                if sql_exception.args[0] == 1451:
                    logging.debug("Foreign key constraint error no retry "
                                  "attempted")
                    retry = self.max_retries
                if sql_exception.args[0] == 2003:
                    # connection refused: close so check_request_retry
                    # re-opens it on the next attempt
                    self.connection.close()
                    logging.error("MySQL connection error, sleeping 20 "
                                  "seconds before reconnecting...")
                retry += 1
                if retry > self.max_retries:
                    raise sql_exception
                continue
            finally:
                # We want to rollback regardless the error
                # This to prevent some undo log to be stacked on server side
                self.connection.rollback()
- def get_os_databases(self):
- """
- Return a list of databases available
- """
- sql = "SHOW DATABASES"
- result = self.db_request(sql=sql, fetch_method='fetchall')
- logging.debug("DB result: %s", result)
- return [i[0] for i in result]
- def get_database_tables(self, database=None):
- """
- Return a list of tables available for a database
- """
- if database is None:
- logging.warning(
- "Can not call get_database_tables on None database")
- return []
- sql = "SHOW TABLES"
- return self.db_request(sql=sql,
- database=database,
- fetch_method='fetchall')
- def table_has_column(self, database=None, table=None, column=None):
- """
- Return True/False after checking that a column exists in a table
- """
- sql = "SELECT column_name FROM information_schema.columns WHERE "\
- "table_schema='{db}' and table_name='{table}' AND "\
- "column_name='{column}'".format(
- db=database, table=table, column=column)
- return bool(
- self.db_request(sql=sql,
- fetch_method='fetchall',
- database=database,
- table=table))
- def table_has_deleted_column(self, database=None, table=None):
- """
- Return True/False depending if the table has the deleted column
- """
- return self.table_has_column(database=database,
- table=table,
- column=self.deleted_column)
- def get_table_primary_key(self, database=None, table=None):
- """
- Return the first primary key of a table
- Store the pk in metadata and return it if exists
- """
- primary_key = self.get_metadata(database=database,
- table=table,
- key='primary_key')
- if primary_key is not None:
- return primary_key
- sql = "SHOW KEYS FROM {db}.{table} WHERE "\
- "Key_name='PRIMARY'".format(db=database, table=table)
- # Dirty but .... Column name is the 5 row
- primary_key = self.db_request(sql=sql, fetch_method='fetchone')[4]
- logging.debug("Primary key of %s.%s is %s", database, table,
- primary_key)
- self.add_metadata(database=database,
- table=table,
- key='primary_key',
- value=primary_key)
- return primary_key
- def get_tables_with_fk(self, database=None, table=None):
- """
- For a given table return a list of foreign key
- """
- sql = "SELECT table_schema, table_name, column_name "\
- "FROM information_schema.key_column_usage "\
- "WHERE referenced_table_name IS NOT NULL" \
- " AND referenced_table_schema='{db}'"\
- " AND table_name='{table}'".format(
- db=database, table=table)
- result = self.db_request(sql=sql,
- fetch_method='fetchall',
- cursor_type=pymysql.cursors.DictCursor)
- if result:
- logging.debug("Table %s.%s have child tables with foreign key: %s",
- database, table, result)
- else:
- logging.debug(
- "Table %s.%s don't have child tables with foreign "
- "key", database, table)
- return result
- def sql_integrity_exception_parser(self, error):
- """
- Parse a foreign key integrity exception and return a dict of pattern
- with useful information
- """
- result = {}
- regexp = r'^.+fails \(`'\
- r'(?P<db>.+)`\.`'\
- r'(?P<table>.+)`, CONSTRAINT `.+`'\
- r' FOREIGN KEY \(`'\
- r'(?P<fk>.+)`\) REFERENCES `'\
- r'(?P<ref_table>.+)` \(`'\
- r'(?P<ref_column>.+)`\)\)$'
- match = re.match(regexp, error)
- if match:
- result = match.groupdict()
- else:
- logging.warning("SQL error '%s' does not match regexp "
- "'%s'", error, regexp)
- return result
- def integrity_exception_select_statement(self, error="", row=None):
- """
- Parse a foreign key excpetion and return a SELECT statement to
- retrieve the offending children rows
- """
- row = row or {}
- data = self.sql_integrity_exception_parser(error)
- # empty dict is when failing to parse exception
- if not data:
- return "Unable to parse exception, here data: "\
- "{row}".format(row=row)
- return "SELECT * FROM `{db}`.`{table}` WHERE `{fk}` = "\
- "'{value}'".format(value=row[data['ref_column']],
- **data)
- def integrity_exception_potential_fix(self, error="", row=None):
- """
- Parse a foerign key exception and return an UPDATE sql statement that
- mark non deleted children data as deleted
- """
- row = row or {}
- data = self.sql_integrity_exception_parser(error)
- if not data:
- return "Unable to parse exception, here data: "\
- "{row}".format(row=row)
- update = "UPDATE `{db}`.`{table}` INNER JOIN `{db}`.`{ref_table}` ON "\
- "`{db}`.`{ref_table}`.`{ref_column}` = `{db}`.`{table}`.`{fk}` "\
- "SET `{db}`.`{table}`.`{deleted_column}` = "\
- "`{db}`.`{ref_table}`.`{deleted_column}` WHERE {fk} = "
- if str(row[data['ref_column']]).isdigit():
- update += "{value}"
- else:
- update += "'{value}'"
- update += " AND `{db}`.`{table}`.`{deleted_column}` IS NULL"
- update = update.format(deleted_column=self.deleted_column,
- value=row[data['ref_column']],
- **data)
- return update
|