db.py

# Copyright 2019 The OSArchiver Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.
"""
OSArchiver's Source class that implements a DB backend
"""
import re
import time
import logging

import arrow
import pymysql
from numpy import array_split
from sqlalchemy import inspect

from osarchiver.source import Source
from osarchiver.common.db import DbBase

NOT_OS_DB = ['mysql', 'performance_schema', 'information_schema']


class Db(Source, DbBase):
    """
    Database backend of OSArchiver's Source
    """

    def __init__(self,
                 databases=None,
                 tables=None,
                 delete_data=0,
                 excluded_databases='',
                 excluded_tables='',
                 where='1=1 LIMIT 0',
                 archive_data=None,
                 name=None,
                 destination=None,
                 **kwargs):
        """
        Create a Source instance with the relevant configuration parameters
        given as arguments
        """
        self.databases = databases
        self.tables = tables
        self.configured_excluded_databases = [
            d for d in re.split(r',|;|\n', excluded_databases.replace(' ', ''))
            if d
        ]
        self._excluded_databases = None
        self.configured_excluded_tables = [
            d for d in re.split(r',|;|\n', excluded_tables.replace(' ', ''))
            if d
        ]
        self._excluded_tables = None
        self.archive_data = archive_data
        self.delete_data = delete_data
        self.destination = destination
        self._databases_to_archive = []
        self._tables_to_archive = {}
        self.tables_with_circular_fk = []
        # When selecting data, be sure to use the same date everywhere to
        # prevent selecting parent data newer than children data. It is the
        # responsibility of the operator to use the {now} formatting value in
        # the 'where' option of the configuration file. If {now} is omitted,
        # foreign key check errors may occur because parent data can be newer
        # than children data.
        self.now = arrow.utcnow().format(fmt='YYYY-MM-DD HH:mm:ss')
        self.where = where.format(now=self.now)
        Source.__init__(self, backend='db', name=name,
                        conf=kwargs.get('conf', None))
        DbBase.__init__(self, **kwargs)
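
    # Illustrative note (not from the original file): a 'where' option using
    # the {now} placeholder described above could look like the following,
    # where the column name and clause are hypothetical:
    #
    #   where: deleted_at IS NOT NULL AND deleted_at < '{now}'
    #
    # {now} is substituted once with self.now, so every table archived during
    # a run is filtered against the same timestamp.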

    def __repr__(self):
        return "Source {name} [Backend:{backend} Host:{host} - DB:{db} - "\
            "Tables:{tables}]".format(backend=self.backend, db=self.databases,
                                      name=self.name, tables=self.tables,
                                      host=self.host)

    @property
    def excluded_databases(self):
        if self._excluded_databases is not None:
            return self._excluded_databases
        excluded_db_set = set(self.configured_excluded_databases)
        excluded_db_set.update(set(NOT_OS_DB))
        self._excluded_databases = list(excluded_db_set)
        return self._excluded_databases

    @property
    def excluded_tables(self):
        if self._excluded_tables is not None:
            return self._excluded_tables
        self._excluded_tables = self.configured_excluded_tables
        return self._excluded_tables

    def databases_to_archive(self):
        """
        Return the list of databases that are eligible for archiving. If no
        database is provided, or the '*' character is used, the method
        basically does a SHOW DATABASES to get the available databases.
        Databases that are explicitly excluded are then filtered out.
        """
        if self._databases_to_archive:
            return self._databases_to_archive
        if self.databases is None or self.databases == '*':
            self._databases_to_archive = self.get_os_databases()
        else:
            self._databases_to_archive = re.split(
                r',|;|\n', self.databases.replace(' ', ''))
        excluded_databases_regex = \
            "^(" + "|".join(self.excluded_databases) + ")$"
        self._databases_to_archive = [
            d for d in self._databases_to_archive
            if not re.match(excluded_databases_regex, d)
        ]
        return self._databases_to_archive
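
    # Illustrative note: with no configured exclusions, the regex built above
    # from NOT_OS_DB is "^(mysql|performance_schema|information_schema)$"
    # (member order may vary, it comes from a set), so for example:
    #
    #   re.match(regex, 'mysql') -> match, database excluded
    #   re.match(regex, 'nova')  -> None, database kept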

    def tables_to_archive(self, database=None):
        """
        For a given database, return the list of tables that are eligible for
        archiving:
        - Retrieve the tables if needed ('*' or empty means all tables)
        - Check that each table has the deleted column (deleted_column
          parameter, e.g. 'deleted_at')
        - Exclude the tables listed in excluded_tables
        - Reorder the tables depending on their foreign keys
        """
        if database is None:
            logging.warning("Can not call tables_to_archive on None database")
            return []
        if database in self._tables_to_archive:
            return self._tables_to_archive[database]
        database_tables = [
            v[0] for v in self.get_database_tables(database)
        ]
        logging.info("Tables list of database '%s': %s", database,
                     database_tables)
        # Step 1: get all the tables we want to archive
        # no table specified or wildcard used means we want all tables,
        # else we filter against the specified tables
        if self.tables is None or self.tables == '*':
            self._tables_to_archive[database] = database_tables
        else:
            self._tables_to_archive[database] = \
                [t for t in re.split(r',|;|\n', self.tables.replace(' ', ''))
                 if t in database_tables]
        # Step 2: verify that all tables have the deleted column
        logging.debug("Verifying that tables have the '%s' column",
                      self.deleted_column)
        tables = []
        for table in self._tables_to_archive[database]:
            if not self.table_has_deleted_column(table=table,
                                                 database=database):
                logging.debug(
                    "Table '%s' has no column named '%s',"
                    " ignoring it", table, self.deleted_column)
                continue
            tables.append(table)
        # update self._tables_to_archive with the filtered tables
        self._tables_to_archive[database] = tables
        # Step 3: exclude the tables explicitly excluded
        excluded_tables_regex = "^(" + "|".join(self.excluded_tables) + ")$"
        logging.debug("Ignoring tables matching '%s'", excluded_tables_regex)
        self._tables_to_archive[database] = [
            t for t in self._tables_to_archive[database]
            if not re.match(excluded_tables_regex, t)
        ]
        # Step 4: for each table retrieve the child tables referencing the
        # parent table and order them children first, parents last
        sorted_tables = self.sort_tables(
            database=database, tables=self._tables_to_archive[database])
        self._tables_to_archive[database] = sorted_tables
        logging.debug(
            "Tables ordered depending on foreign key dependencies: "
            "'%s'", self._tables_to_archive[database])
        return self._tables_to_archive[database]

    def sort_tables(self, database=None, tables=None):
        """
        Given a database and a list of tables, return the list ordered
        depending on foreign key checks, so that child tables come before
        their parent tables
        """
        tables = tables or []
        inspector = inspect(self.sqlalchemy_engine)
        sorted_tables = []
        logging.debug("Tables to sort: %s", tables)
        for table in tables:
            if not self.table_has_deleted_column(table=table,
                                                 database=database):
                continue
            if table not in sorted_tables:
                logging.debug("Table %s added to final list", table)
                sorted_tables.append(table)
            idx = sorted_tables.index(table)
            fks = inspector.get_foreign_keys(table, schema=database)
            logging.debug("Foreign keys of %s: %s", table, fks)
            for fk in fks:
                t = fk['referred_table']
                if t in sorted_tables:
                    if sorted_tables.index(t) > idx:
                        continue
                    sorted_tables.remove(t)
                sorted_tables.insert(idx + 1, t)
        return sorted_tables
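
    # Illustrative note: assuming a hypothetical schema where table
    # 'instance_metadata' holds a foreign key referencing 'instances',
    # sorting moves the parent after its child:
    #
    #   sort_tables(database='nova', tables=['instances', 'instance_metadata'])
    #   -> ['instance_metadata', 'instances']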

    def select(self, limit=None, database=None, table=None):
        """
        Select data from a database.table, applying the given limit or the
        default one. The per-set select depends on the primary key type
        (int vs uuid).
        In case of int:
            SELECT * FROM <db>.<table> WHERE <pk> > <last_selected_id> AND ...
        In case of uuid (UUIDs are not naturally ordered, so we sort them):
            SELECT * FROM <db>.<table> WHERE <pk> > "<last_selected_id>" AND...
            ORDER BY <pk>
        """
        last_selected_id = 0
        # Use the primary key column to improve performance on large
        # data sets vs using OFFSET
        primary_key = self.get_table_primary_key(database=database,
                                                 table=table)
        if limit is None:
            limit = self.select_limit
        sql = "SELECT * FROM `{database}`.`{table}` WHERE {pk} > "\
            "'{last_id}' AND {where} LIMIT {limit}"
        pk_type_checked = False
        while True:
            formatted_sql = sql.format(database=database,
                                       table=table,
                                       where=self.where,
                                       limit=limit,
                                       last_id=last_selected_id,
                                       pk=primary_key)
            result = self.db_request(sql=formatted_sql,
                                     cursor_type=pymysql.cursors.DictCursor,
                                     database=database,
                                     table=table,
                                     fetch_method='fetchall')
            logging.info("Fetched %s results from %s.%s", len(result),
                         database, table)
            if not result:
                break
            last_selected_id = result[-1][primary_key]
            yield result
            if pk_type_checked is False:
                # If the primary key is a digit, remove the single quotes
                # around the last_id variable for performance purposes
                if str(last_selected_id).isdigit():
                    sql = "SELECT * FROM `{database}`.`{table}` WHERE {pk} >"\
                        " {last_id} AND {where} LIMIT {limit}"
                else:
                    # else it is a string, and we force ordering by the
                    # primary key to simulate an integer primary key
                    sql = "SELECT * FROM `{database}`.`{table}` WHERE {pk} >"\
                        " '{last_id}' AND {where} ORDER BY {pk} LIMIT {limit}"
                pk_type_checked = True
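
    # Illustrative note: select() is a generator yielding batches of rows as
    # dicts (DictCursor); a hypothetical caller could drain it like this:
    #
    #   for batch in source.select(database='nova', table='instances'):
    #       print("fetched %s rows" % len(batch))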

    def read(self, limit=None):
        """
        The read method that has to be implemented (Source abstract class)
        """
        databases_to_archive = self.databases_to_archive()
        logging.info("Databases elected for archiving: %s",
                     databases_to_archive)
        for database in databases_to_archive:
            tables_to_archive = self.tables_to_archive(database=database)
            logging.info("Tables elected for archiving: %s",
                         tables_to_archive)
            for table in tables_to_archive:
                logging.info("%s.%s is to archive", database, table)
                yield {
                    'database': database,
                    'table': table,
                    'data': self.select(limit=limit, database=database,
                                        table=table)
                }
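
    # Illustrative note: each item yielded by read() bundles the database and
    # table names with the lazy select() generator, so a consumer (such as a
    # destination backend) might do:
    #
    #   for item in source.read():
    #       for batch in item['data']:
    #           write(item['database'], item['table'], batch)  # hypothetical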

    def delete_set(self, database=None, table=None, limit=None, data=None):
        """
        Delete a set of data using the primary key of the table
        """
        if not self.delete_data:
            logging.info(
                "Ignoring delete step because delete_data is set to"
                " %s", self.delete_data)
            return
        if limit is None:
            limit = self.delete_limit
        primary_key = self.get_table_primary_key(database=database,
                                                 table=table)
        # Check if the primary key is a digit to prevent casting by MySQL and
        # optimize the request; store the value in metadata for caching
        pk_is_digit = self.get_metadata(database=database,
                                        table=table,
                                        key='pk_is_digit')
        if pk_is_digit is None:
            pk_is_digit = str(data[0][primary_key]).isdigit()
            self.add_metadata(database=database,
                              table=table,
                              key='pk_is_digit',
                              value=pk_is_digit)

        def create_array_chunks(array, chunk_size):
            for i in range(0, len(array), chunk_size):
                yield array[i:i + chunk_size]

        # For performance purposes, split the data into subsets of
        # length=limit
        for subdata in list(create_array_chunks(data, limit)):
            if pk_is_digit:
                ids = ', '.join([str(d[primary_key]) for d in subdata])
            else:
                ids = '"' + '", "'.join(
                    [str(d[primary_key]) for d in subdata]) + '"'
            total_deleted_count = 0
            # equivalent to a while True, but we know why we are looping
            while "there are rows to delete":
                if total_deleted_count > 0:
                    logging.debug(
                        "Waiting %s seconds before deleting next"
                        " subset of data", self.delete_loop_delay)
                    time.sleep(int(self.delete_loop_delay))
                sql = "DELETE FROM `{database}`.`{table}` WHERE "\
                    "`{pk}` IN ({ids}) LIMIT {limit}".format(
                        database=database,
                        table=table,
                        ids=ids,
                        pk=primary_key,
                        limit=limit)
                foreign_key_check = None
                if '{db}.{table}'.format(db=database, table=table) \
                        in self.tables_with_circular_fk:
                    foreign_key_check = False
                count = self.db_request(sql=sql,
                                        foreign_key_check=foreign_key_check,
                                        database=database,
                                        table=table)
                logging.info("%s rows deleted from %s.%s", count, database,
                             table)
                total_deleted_count += count
                if int(count) < int(limit) or \
                        total_deleted_count == len(subdata):
                    logging.debug("No more rows to delete in this data set")
                    break
            logging.debug("Waiting %s seconds after a deletion",
                          self.delete_loop_delay)
            time.sleep(int(self.delete_loop_delay))
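
    # Illustrative note: create_array_chunks() slices the data set so that
    # each DELETE handles at most `limit` primary key values, e.g.:
    #
    #   list(create_array_chunks([1, 2, 3, 4, 5], 2)) -> [[1, 2], [3, 4], [5]]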

    def delete(self, database=None, table=None, limit=None, data=None):
        """
        The delete method that has to be implemented (Source abstract class)
        """
        try:
            self.delete_set(database=database,
                            table=table,
                            limit=limit,
                            data=data)
        except pymysql.err.IntegrityError as integrity_error:
            # A foreign key constraint failure usually happens because of
            # errors while processing OpenStack tasks. To prevent never
            # deleting the rest of a data set, we re-run the delete with half
            # of the set when we catch an integrity error (1451), until we
            # catch the offending row
            if integrity_error.args[0] != 1451:
                raise
            # we caught the row causing the integrity error
            if len(data) == 1:
                logging.error("OSArchiver hit a row that will never be"
                              " deleted unless you fix the remaining"
                              " children data")
                logging.error("Parent row that can not be deleted: %s", data)
                logging.error("To get children items:")
                logging.error(
                    self.integrity_exception_select_statement(
                        error=integrity_error.args[1], row=data[0]))
                logging.error("Here is a POTENTIAL fix; ensure BEFORE running"
                              " it that the data should effectively be"
                              " deleted, then run osarchiver again:")
                logging.error(
                    self.integrity_exception_potential_fix(
                        error=integrity_error.args[1], row=data[0]))
            else:
                logging.error("Integrity error caught, deleting with"
                              " dichotomy")
                for subdata in array_split(data, 2):
                    logging.debug(
                        "Dichotomy delete with a data set of length %s",
                        len(subdata))
                    # Add a sleep period because in case of an error in
                    # delete_set we never sleep; it avoids some lock wait
                    # timeouts for incoming requests
                    time.sleep(int(self.delete_loop_delay))
                    self.delete(database=database,
                                table=table,
                                data=subdata,
                                limit=len(subdata))
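
    # Illustrative note: numpy's array_split halves the failing set even when
    # its length is odd, e.g. with three rows:
    #
    #   array_split([row1, row2, row3], 2) -> [array([row1, row2]),
    #                                          array([row3])]
    #
    # so the dichotomy always terminates at single-row sets.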

    def clean_exit(self):
        """
        Tasks to be executed to exit cleanly:
        - Disconnect from the database
        """
        logging.info("Closing source DB connection")
        self.disconnect()