db.py 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. # Use of this source code is governed by a BSD-style
  2. # license that can be found in the LICENSE file.
  3. # Copyright 2019 The OSArchiver Authors. All rights reserved.
  4. """
  5. DB base class file which provides helpers and common methods for the
  6. Source and Destination DB backends.
  7. The class provides a metadata storage to avoid doing some computations
  8. several times. It also keeps a reference to a pymysql cursor per table to
  9. avoid creating too many cursors.
  10. """
  11. import logging
  12. import re
  13. import warnings
  14. import time
  15. import timeit
  16. # need to include datetime to handle some result
  17. # of pymysql (integrity exception helpers)
  18. import datetime # noqa
  19. import pymysql
  20. from sqlalchemy import create_engine
  21. class DbBase():
  22. """
  23. The DbBase class that should be inherited from Source and Destination Db
  24. backend
  25. """
  26. def __init__(self,
  27. host=None,
  28. user=None,
  29. password=None,
  30. select_limit=1000,
  31. delete_limit=500,
  32. port=3306,
  33. dry_run=False,
  34. deleted_column=None,
  35. max_retries=5,
  36. bulk_insert=1000,
  37. retry_time_limit=2,
  38. delete_loop_delay=2,
  39. foreign_key_check=True,
  40. **kwargs):
  41. """
  42. instantiator of database base class
  43. """
  44. self.host = host
  45. self.user = user
  46. self.port = int(port)
  47. self.password = password
  48. self.delete_limit = int(delete_limit)
  49. self.deleted_column = deleted_column
  50. self.connection = None
  51. self.select_limit = int(select_limit)
  52. self.bulk_insert = int(bulk_insert)
  53. self.dry_run = dry_run
  54. self.metadata = {}
  55. self._sqlalchemy_engine = None
  56. # number of retries when an error occure
  57. self.max_retries = max_retries
  58. # how long wait between two retry
  59. self.retry_time_limit = retry_time_limit
  60. self.delete_loop_delay = delete_loop_delay
  61. self.foreign_key_check = foreign_key_check
  62. # hide some warnings we do not care
  63. warnings.simplefilter("ignore")
  64. self.connect()
  65. @property
  66. def sqlalchemy_engine(self):
  67. if self._sqlalchemy_engine is None:
  68. url = "mysql+pymysql://{user}:{password}@".format(
  69. user=self.user, password=self.password)
  70. if self.host is not None:
  71. url += self.host
  72. if self.port is not None:
  73. url += ':{port}'.format(port=self.port)
  74. self._sqlalchemy_engine = create_engine(url)
  75. return self._sqlalchemy_engine
  76. def connect(self):
  77. """
  78. connect to the database and set the connection attribute to
  79. pymysql.connect
  80. """
  81. self.connection = pymysql.connect(host=self.host,
  82. user=self.user,
  83. port=self.port,
  84. password=self.password,
  85. database=None)
  86. logging.debug("Successfully connected to mysql://%s:%s@%s:%s",
  87. self.user, '*' * len(self.password), self.host,
  88. self.port)
  89. def disconnect(self):
  90. """
  91. disconnect from the databse if connection is open
  92. """
  93. if self.connection.open:
  94. self.connection.close()
  95. def add_metadata(self, database=None, table=None, key=None, value=None):
  96. """
  97. store for one database/table a key with a value
  98. """
  99. if database not in self.metadata:
  100. self.metadata[database] = {}
  101. if table not in self.metadata[database]:
  102. self.metadata[database][table] = {}
  103. logging.debug("Adding metadata %s.%s.%s = %s", database, table, key,
  104. value)
  105. self.metadata[database][table][key] = value
  106. return self.metadata[database][table][key]
  107. def get_metadata(self, database=None, table=None, key=None):
  108. """
  109. return the key's value for a database.table
  110. """
  111. if database is None or table is None:
  112. return None
  113. if database in self.metadata and table in self.metadata[database]:
  114. return self.metadata[database][table].get(key)
  115. return None
  116. def disable_fk_check(self, cursor=None):
  117. """
  118. Disable foreign key check for a cursor
  119. """
  120. logging.debug("Disabling foreign_key_check")
  121. cursor.execute("SET FOREIGN_KEY_CHECKS=0;")
  122. def enable_fk_check(self, cursor=None):
  123. """
  124. Enable foreign key check for a cursor
  125. """
  126. logging.debug("Enabling foreign_key_check")
  127. cursor.execute("SET FOREIGN_KEY_CHECKS=1;")
  128. def check_request_retry(self):
  129. """
  130. When an SQL error occured, this method is called and do some check
  131. Right now it only re-open connection if the connection is closed
  132. """
  133. logging.debug("Sleeping %s sec before retrying....",
  134. self.retry_time_limit)
  135. time.sleep(int(self.retry_time_limit))
  136. # Handle auto reconnect
  137. if not self.connection.open:
  138. logging.info("Re-opening connection which seems abnormaly "
  139. "closed")
  140. self.connect()
  141. def set_foreign_key_check(self,
  142. foreign_key_check=None,
  143. cursor=None,
  144. database=None,
  145. table=None,
  146. new_cursor=False):
  147. """
  148. This method set the correct value to foreign key check. Instead of
  149. executing it at each requests which is time consuming and overloading
  150. it checks in metadata the current value and change it if needed
  151. """
  152. fk_check_in_cache = False
  153. current_fk_check = self.get_metadata(
  154. database=database,
  155. table=table,
  156. key='fk_check_{c}'.format(c=cursor))
  157. # nothing in cache we want to apply the foreign_key_check value
  158. # set current_fk_check to negate of foreign_key_check
  159. if current_fk_check is None or new_cursor is True:
  160. logging.debug("foreign key check value not found in cache")
  161. current_fk_check = not foreign_key_check
  162. if foreign_key_check is False and current_fk_check is True:
  163. self.disable_fk_check(cursor=cursor)
  164. elif foreign_key_check is True and current_fk_check is False:
  165. self.enable_fk_check(cursor=cursor)
  166. else:
  167. fk_check_in_cache = True
  168. if database is not None \
  169. and table is not None and not fk_check_in_cache:
  170. self.add_metadata(database=database,
  171. table=table,
  172. key='fk_check_{c}'.format(c=cursor),
  173. value=foreign_key_check)
  174. def get_cursor(self,
  175. database=None,
  176. table=None,
  177. cursor_type=None,
  178. new=False,
  179. fk_check=None):
  180. """
  181. Return the pymysql cursor mapped to a database.table if exists in
  182. metadata otherwise it create a new cursor
  183. """
  184. default_cursor_type = pymysql.cursors.Cursor
  185. cursor_type = cursor_type or default_cursor_type
  186. cursor = None
  187. cursor_in_cache = False
  188. # open db connection if not opened
  189. if not self.connection.open:
  190. self.connect()
  191. cursor = self.connection.cursor(cursor_type)
  192. else:
  193. # if this is not a cursor creation
  194. # try to get the cached one from metadata
  195. if not new:
  196. cursor = self.get_metadata(
  197. database=database,
  198. table=table,
  199. key='cursor_{c}'.format(c=cursor_type))
  200. # if cursor is None (creation or not found in metadata)
  201. # set the cursor type to default one
  202. type_of_cursor = type(cursor)
  203. if cursor is None:
  204. type_of_cursor = default_cursor_type
  205. # Check if the cursor retrieved is well typed
  206. # if not force the cursor to be unset
  207. # it will be re-created after
  208. if cursor is not None and cursor_type != type_of_cursor:
  209. logging.debug(
  210. "Type of cursor found in cache is %s, we want %s"
  211. " instead, need to create a new cursor", type_of_cursor,
  212. cursor_type)
  213. cursor = None
  214. # cursor creation
  215. if cursor is None:
  216. logging.debug("No existing cursor found, creating a new one")
  217. cursor = self.connection.cursor(cursor_type)
  218. else:
  219. cursor_in_cache = True
  220. logging.debug("Using cached cursor %s", cursor)
  221. # set the foreign key check value if needed
  222. # for the cursor
  223. if fk_check is not None:
  224. self.set_foreign_key_check(cursor=cursor,
  225. database=database,
  226. table=table,
  227. foreign_key_check=fk_check,
  228. new_cursor=new)
  229. # Add the cursor in cache
  230. if database is not None and table is not None and not cursor_in_cache:
  231. logging.debug("Caching cursor for %s.%s", database, table)
  232. self.add_metadata(database=database,
  233. table=table,
  234. key='cursor_{c}'.format(c=cursor_type),
  235. value=cursor)
  236. return cursor
  237. def _db_execute(self, sql=None, cursor=None, method=None, values=None):
  238. """
  239. Execute a request on database
  240. """
  241. logging.debug("Executing SQL command: '%s'", sql)
  242. # execute / execute_many method
  243. start = timeit.default_timer()
  244. getattr(cursor, method)(sql, values)
  245. end = timeit.default_timer()
  246. logging.debug("SQL duration: %s sec", end - start)
  247. def _db_fetch(self, fetch_method=None, cursor=None, fetch_args=None):
  248. """
  249. This method fetch data in database
  250. """
  251. start = timeit.default_timer()
  252. fetched_values = getattr(cursor, fetch_method)(**fetch_args)
  253. end = timeit.default_timer()
  254. logging.debug("Data fetch duration: %s sec", end - start)
  255. return fetched_values
  256. def _db_commit(self, cursor=None, sql=None, values_length=None):
  257. """
  258. Commit the executed request, return the number of row modified
  259. """
  260. if self.dry_run:
  261. logging.info(
  262. "[DRY RUN]: here is what I should have "
  263. "commited: '%s'", cursor.mogrify(query=sql))
  264. self.connection.rollback()
  265. return values_length
  266. # Not dry-run mode: commit the request
  267. # return the number of row affected by the request
  268. start = timeit.default_timer()
  269. self.connection.commit()
  270. end = timeit.default_timer()
  271. logging.debug("Commit duration: %s sec", end - start)
  272. return cursor.rowcount
  273. def db_request(self,
  274. sql=None,
  275. values=None,
  276. fetch_method=None,
  277. fetch_args=None,
  278. database=None,
  279. table=None,
  280. cursor_type=None,
  281. foreign_key_check=None,
  282. execute_method='execute'):
  283. """
  284. generic method to do a request to the db
  285. It handles a retry on failure, execept for foreign key exception which
  286. in our case useless
  287. In case of error connection, it sleeps 20 seconds before retrying
  288. """
  289. retry = 0
  290. cursor = None
  291. force_cursor_creation = False
  292. values = values or []
  293. fetch_args = fetch_args or {}
  294. if foreign_key_check is None:
  295. foreign_key_check = self.foreign_key_check
  296. if self.dry_run:
  297. foreign_key_check = False
  298. logging.debug("Force disabling foreign key check because we are in"
  299. " dry run mode")
  300. while retry <= self.max_retries:
  301. try:
  302. if retry > 0:
  303. logging.info("Retry %s/%s", retry, self.max_retries)
  304. self.check_request_retry()
  305. if cursor is None:
  306. cursor = self.get_cursor(database=database,
  307. table=table,
  308. cursor_type=cursor_type,
  309. fk_check=foreign_key_check,
  310. new=force_cursor_creation)
  311. if database is not None:
  312. self.connection.select_db(database)
  313. # Execute the query
  314. self._db_execute(sql=sql,
  315. cursor=cursor,
  316. method=execute_method,
  317. values=values)
  318. # Fetch and return the data
  319. if fetch_method is not None:
  320. return self._db_fetch(fetch_method=fetch_method,
  321. cursor=cursor,
  322. fetch_args=fetch_args)
  323. # no fetch_method means we need to commit the request
  324. # In dry_run mode just display what would have been commited
  325. return self._db_commit(cursor=cursor,
  326. sql=sql,
  327. values_length=len(values))
  328. except pymysql.Error as sql_exception:
  329. logging.error("SQL error: %s", sql_exception.args)
  330. if sql_exception.args[0] == "(0, '')":
  331. logging.debug("Cursor need to be recreated")
  332. if cursor is not None:
  333. cursor.close()
  334. cursor = None
  335. force_cursor_creation = True
  336. # foreign key constraint error, there is no sense in continuing
  337. if sql_exception.args[0] == 1451:
  338. logging.debug("Foreign key constraint error no retry "
  339. "attempted")
  340. retry = self.max_retries
  341. if sql_exception.args[0] == 2003:
  342. self.connection.close()
  343. logging.error("MySQL connection error, sleeping 20 "
  344. "seconds before reconnecting...")
  345. retry += 1
  346. if retry > self.max_retries:
  347. raise sql_exception
  348. continue
  349. finally:
  350. # We want to rollback regardless the error
  351. # This to prevent some undo log to be stacked on server side
  352. self.connection.rollback()
  353. def get_os_databases(self):
  354. """
  355. Return a list of databases available
  356. """
  357. sql = "SHOW DATABASES"
  358. result = self.db_request(sql=sql, fetch_method='fetchall')
  359. logging.debug("DB result: %s", result)
  360. return [i[0] for i in result]
  361. def get_database_tables(self, database=None):
  362. """
  363. Return a list of tables available for a database
  364. """
  365. if database is None:
  366. logging.warning(
  367. "Can not call get_database_tables on None database")
  368. return []
  369. sql = "SHOW TABLES"
  370. return self.db_request(sql=sql,
  371. database=database,
  372. fetch_method='fetchall')
  373. def table_has_column(self, database=None, table=None, column=None):
  374. """
  375. Return True/False after checking that a column exists in a table
  376. """
  377. sql = "SELECT column_name FROM information_schema.columns WHERE "\
  378. "table_schema='{db}' and table_name='{table}' AND "\
  379. "column_name='{column}'".format(
  380. db=database, table=table, column=column)
  381. return bool(
  382. self.db_request(sql=sql,
  383. fetch_method='fetchall',
  384. database=database,
  385. table=table))
  386. def table_has_deleted_column(self, database=None, table=None):
  387. """
  388. Return True/False depending if the table has the deleted column
  389. """
  390. return self.table_has_column(database=database,
  391. table=table,
  392. column=self.deleted_column)
  393. def get_table_primary_key(self, database=None, table=None):
  394. """
  395. Return the first primary key of a table
  396. Store the pk in metadata and return it if exists
  397. """
  398. primary_key = self.get_metadata(database=database,
  399. table=table,
  400. key='primary_key')
  401. if primary_key is not None:
  402. return primary_key
  403. sql = "SHOW KEYS FROM {db}.{table} WHERE "\
  404. "Key_name='PRIMARY'".format(db=database, table=table)
  405. # Dirty but .... Column name is the 5 row
  406. primary_key = self.db_request(sql=sql, fetch_method='fetchone')[4]
  407. logging.debug("Primary key of %s.%s is %s", database, table,
  408. primary_key)
  409. self.add_metadata(database=database,
  410. table=table,
  411. key='primary_key',
  412. value=primary_key)
  413. return primary_key
  414. def get_tables_with_fk(self, database=None, table=None):
  415. """
  416. For a given table return a list of foreign key
  417. """
  418. sql = "SELECT table_schema, table_name, column_name "\
  419. "FROM information_schema.key_column_usage "\
  420. "WHERE referenced_table_name IS NOT NULL" \
  421. " AND referenced_table_schema='{db}'"\
  422. " AND table_name='{table}'".format(
  423. db=database, table=table)
  424. result = self.db_request(sql=sql,
  425. fetch_method='fetchall',
  426. cursor_type=pymysql.cursors.DictCursor)
  427. if result:
  428. logging.debug("Table %s.%s have child tables with foreign key: %s",
  429. database, table, result)
  430. else:
  431. logging.debug(
  432. "Table %s.%s don't have child tables with foreign "
  433. "key", database, table)
  434. return result
  435. def sql_integrity_exception_parser(self, error):
  436. """
  437. Parse a foreign key integrity exception and return a dict of pattern
  438. with useful information
  439. """
  440. result = {}
  441. regexp = r'^.+fails \(`'\
  442. r'(?P<db>.+)`\.`'\
  443. r'(?P<table>.+)`, CONSTRAINT `.+`'\
  444. r' FOREIGN KEY \(`'\
  445. r'(?P<fk>.+)`\) REFERENCES `'\
  446. r'(?P<ref_table>.+)` \(`'\
  447. r'(?P<ref_column>.+)`\)\)$'
  448. match = re.match(regexp, error)
  449. if match:
  450. result = match.groupdict()
  451. else:
  452. logging.warning("SQL error '%s' does not match regexp "
  453. "'%s'", error, regexp)
  454. return result
  455. def integrity_exception_select_statement(self, error="", row=None):
  456. """
  457. Parse a foreign key excpetion and return a SELECT statement to
  458. retrieve the offending children rows
  459. """
  460. row = row or {}
  461. data = self.sql_integrity_exception_parser(error)
  462. # empty dict is when failing to parse exception
  463. if not data:
  464. return "Unable to parse exception, here data: "\
  465. "{row}".format(row=row)
  466. return "SELECT * FROM `{db}`.`{table}` WHERE `{fk}` = "\
  467. "'{value}'".format(value=row[data['ref_column']],
  468. **data)
  469. def integrity_exception_potential_fix(self, error="", row=None):
  470. """
  471. Parse a foerign key exception and return an UPDATE sql statement that
  472. mark non deleted children data as deleted
  473. """
  474. row = row or {}
  475. data = self.sql_integrity_exception_parser(error)
  476. if not data:
  477. return "Unable to parse exception, here data: "\
  478. "{row}".format(row=row)
  479. update = "UPDATE `{db}`.`{table}` INNER JOIN `{db}`.`{ref_table}` ON "\
  480. "`{db}`.`{ref_table}`.`{ref_column}` = `{db}`.`{table}`.`{fk}` "\
  481. "SET `{db}`.`{table}`.`{deleted_column}` = "\
  482. "`{db}`.`{ref_table}`.`{deleted_column}` WHERE {fk} = "
  483. if str(row[data['ref_column']]).isdigit():
  484. update += "{value}"
  485. else:
  486. update += "'{value}'"
  487. update += " AND `{db}`.`{table}`.`{deleted_column}` IS NULL"
  488. update = update.format(deleted_column=self.deleted_column,
  489. value=row[data['ref_column']],
  490. **data)
  491. return update