# Copyright 2019 The OSArchiver Authors. All rights reserved.
# Use of this source code is governed by a BSD-style
# license that can be found in the LICENSE file.
"""
This module implements the database destination backend which handles writing
data into a MySQL/MariaDB backend.
"""
import logging
import difflib
import re
import time

import arrow

from osarchiver.destination import Destination
from osarchiver.common.db import DbBase
from . import errors as db_errors


class Db(Destination, DbBase):
    """
    Db class, instantiated when the Db backend is required.
    """

    def __init__(self,
                 table=None,
                 archive_data=None,
                 name=None,
                 source=None,
                 db_suffix='',
                 table_suffix='',
                 database=None,
                 **kwargs):
        """
        Instantiate an osarchiver.destination.Db backend.
        """
        self.database = database
        self.table = table
        self.archive_data = archive_data
        self.source = source
        self.archive_db_name = None
        self.archive_table_name = None
        self.table_db_name = None
        self.db_suffix = db_suffix
        self.table_suffix = table_suffix
        self.normalized_db_suffixes = {}
        Destination.__init__(self, backend='db', name=name)
        DbBase.__init__(self, **kwargs)

    def __repr__(self):
        return "Destination {name} [Backend:{backend} - Host:{host}]".format(
            backend=self.backend, host=self.host, name=self.name)

    def normalize_db_suffix(self, db_suffix='', database=None):
        """
        Return the suffix that is appended to the database name to build the
        archive database name in which data are archived. It checks that data
        are not archived in the same DB as the Source.
        The suffix may contain '{date}' which will be replaced by the date of
        archiving in the format '2019-01-17_10:42:42'.
        """
        if database is not None and database in self.normalized_db_suffixes:
            logging.debug("Using cached db suffix '%s' of '%s' database",
                          self.normalized_db_suffixes[database], database)
            return self.normalized_db_suffixes[database]

        if db_suffix:
            self.db_suffix = db_suffix

        # In case source and destination are the same server, archiving in
        # the same db makes no sense, so force the db suffix to '_archive'
        # unless table_suffix is set
        if self.source.host == self.host and \
                self.source.port == self.port and \
                not self.db_suffix:
            self.db_suffix = '_archive'
            logging.warning(
                "Your destination host is the same as the source "
                "host; to prevent writing to the same database, "
                "which could result in data loss, the DB suffix "
                "is forced to %s", self.db_suffix)

        if self.source.host == self.host and \
                self.source.port != self.port and \
                not self.db_suffix and not self.table_suffix:
            logging.warning("I can't verify that the destination database is "
                            "different from the source database, you may "
                            "lose data, BE CAREFUL!")
            logging.warning("Sleeping 10 sec...")
            time.sleep(10)

        self.db_suffix = str(
            self.db_suffix).format(date=arrow.now().strftime('%F_%T'))

        if database is not None:
            self.normalized_db_suffixes[database] = self.db_suffix
            logging.debug("Caching db suffix '%s' of '%s' database",
                          self.normalized_db_suffixes[database], database)

        return self.db_suffix

    def normalize_table_suffix(self, table_suffix=None):
        """
        Return the suffix of the table in which data are archived.
        The suffix may contain '{date}' which will be replaced by the date of
        archiving in the format '2019-01-17_10:42:42'.
        """
        if table_suffix:
            self.table_suffix = table_suffix
        self.table_suffix = str(
            self.table_suffix).format(date=arrow.now().strftime('%F_%T'))
        return self.table_suffix
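
    # Illustrative sketch (not part of the original module): how the '{date}'
    # placeholder in a db or table suffix expands. The suffix value
    # '_archive_{date}' is a hypothetical configuration example.
    #
    #     import arrow
    #     suffix = '_archive_{date}'.format(date=arrow.now().strftime('%F_%T'))
    #     # suffix == '_archive_2019-01-17_10:42:42' at that archiving date,
    #     # so 'nova' + suffix gives a per-run archive database name.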

    def get_archive_db_name(self, database=None):
        """
        Return the name of the archiving database, which is built from the
        name of the source database plus a suffix.
        """
        self.archive_db_name = database + \
            self.normalize_db_suffix(database=database)
        return self.archive_db_name

    def archive_db_exists(self, database=None):
        """
        Check if a database already exists, return True/False.
        """
        self.get_archive_db_name(database=database)
        show_db_sql = "SHOW DATABASES LIKE "\
            "'{db}'".format(db=self.archive_db_name)
        return bool(self.db_request(sql=show_db_sql, fetch_method='fetchall'))

    def get_src_create_db_statement(self, database=None):
        """
        Return the result of SHOW CREATE DATABASE on the Source.
        """
        src_db_create_sql = "SHOW CREATE DATABASE "\
            "{db}".format(db=database)
        src_db_create_statement = self.source.db_request(
            sql=src_db_create_sql, fetch_method='fetchone')[1]
        logging.debug("Source database '%s' CREATE statement: '%s'", database,
                      src_db_create_statement)
        return src_db_create_statement

    def get_dst_create_db_statement(self, database=None):
        """
        Return the result of SHOW CREATE DATABASE on the Destination.
        """
        dst_db_create_sql = "SHOW CREATE DATABASE "\
            "{db}".format(db=database)
        dst_db_create_statement = self.db_request(sql=dst_db_create_sql,
                                                  fetch_method='fetchone')[1]
        logging.debug("Destination database '%s' CREATE statement: '%s'",
                      database, dst_db_create_statement)
        return dst_db_create_statement

    def create_archive_db(self, database=None):
        """
        Create the Destination database.
        If the Destination database already exists, check that its CREATE
        statement is the same as the Source's, which is useful to detect a
        DB schema upgrade.
        """
        # Check if the db exists
        archive_db_exists = self.archive_db_exists(database=database)

        # Retrieve the source db CREATE statement:
        # if the archive database exists, compare CREATE statements,
        # else use the statement to create it
        src_db_create_statement = self.get_src_create_db_statement(
            database=database)

        if archive_db_exists:
            logging.debug("Destination DB has '%s' database",
                          self.archive_db_name)
            dst_db_create_statement = self.get_dst_create_db_statement(
                database=self.archive_db_name)
            # Compare CREATE statements, substituting the db name in dst
            # (arbitrary choice)
            to_compare_dst_db_create_statement = re.sub(
                'DATABASE `{dst_db}`'.format(dst_db=self.archive_db_name),
                'DATABASE `{src_db}`'.format(src_db=database),
                dst_db_create_statement)
            if src_db_create_statement == to_compare_dst_db_create_statement:
                logging.info("Source and destination databases are identical")
            else:
                logging.debug(
                    difflib.SequenceMatcher(
                        None, src_db_create_statement,
                        to_compare_dst_db_create_statement))
                raise db_errors.OSArchiverNotEqualDbCreateStatements
        else:
            logging.debug("'%s' does not exist on the remote DB",
                          self.archive_db_name)
            sql = re.sub('`{db}`'.format(db=database),
                         '`{db}`'.format(db=self.archive_db_name),
                         src_db_create_statement)
            self.db_request(sql=sql)
            if not self.dry_run:
                logging.debug("Successfully created '%s'",
                              self.archive_db_name)
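
    # Illustrative sketch (assumption, not from the original source): what
    # the re.sub above does when the archive db is created. The database
    # names 'nova' and 'nova_archive' are hypothetical.
    #
    #     import re
    #     src = "CREATE DATABASE `nova` /*!40100 DEFAULT CHARACTER SET utf8 */"
    #     sql = re.sub('`nova`', '`nova_archive`', src)
    #     # sql == "CREATE DATABASE `nova_archive` /*!40100 DEFAULT CHARACTER SET utf8 */"
    #     # i.e. the Source's own CREATE statement is replayed on the
    #     # Destination with only the database name swapped.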

    def archive_table_exists(self, database=None, table=None):
        """
        Check if the archiving table exists, return True or False.
        """
        self.archive_table_name = table + self.normalize_table_suffix()
        show_table_sql = 'SHOW TABLES LIKE '\
            '\'{table}\''.format(table=self.archive_table_name)
        return bool(
            self.db_request(sql=show_table_sql,
                            fetch_method='fetchall',
                            database=self.archive_db_name))

    def get_src_create_table_statement(self, database=None, table=None):
        """
        Return the SHOW CREATE TABLE of the Source database.
        """
        src_table_create_sql = 'SHOW CREATE TABLE '\
            '{table}'.format(table=table)
        src_table_create_statement = self.source.db_request(
            sql=src_table_create_sql,
            fetch_method='fetchone',
            database=database)[1]
        logging.debug("Source table '%s' CREATE statement: '%s'", table,
                      src_table_create_statement)
        return src_table_create_statement

    def get_dst_create_table_statement(self, database=None, table=None):
        """
        Return the SHOW CREATE TABLE of the Destination database.
        """
        dst_table_create_sql = 'SHOW CREATE TABLE '\
            '{table}'.format(table=table)
        dst_table_create_statement = self.db_request(sql=dst_table_create_sql,
                                                     fetch_method='fetchone',
                                                     database=database)[1]
        logging.debug("Destination table '%s' CREATE statement: '%s'", table,
                      dst_table_create_statement)
        return dst_table_create_statement

    def compare_src_and_dst_create_table_statement(self,
                                                   src_statement=None,
                                                   dst_statement=None,
                                                   src_table=None,
                                                   dst_table=None):
        """
        Check that the Source and Destination tables are identical, to
        prevent errors due to a DB schema upgrade.
        Raise an exception and log the difference if the statements differ.
        """
        # Compare CREATE statements, substituting the table name in dst
        # (arbitrary choice)
        dst_statement = re.sub(
            'TABLE `{dst_table}`'.format(dst_table=dst_table),
            'TABLE `{src_table}`'.format(src_table=src_table), dst_statement)
        # Remove the AUTO_INCREMENT clause, which legitimately differs
        dst_statement = re.sub(r'AUTO_INCREMENT=\d+ ', '', dst_statement)
        src_statement = re.sub(r'AUTO_INCREMENT=\d+ ', '', src_statement)
        logging.debug("Comparing source create statement %s", src_statement)
        logging.debug("Comparing dest create statement %s", dst_statement)
        if dst_statement == src_statement:
            logging.info("Source and destination tables are identical")
        else:
            for diff in difflib.context_diff(src_statement.split('\n'),
                                             dst_statement.split('\n')):
                logging.debug(diff.strip())
            raise db_errors.OSArchiverNotEqualTableCreateStatements
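
    # Illustrative sketch (assumption, not from the original source): the
    # AUTO_INCREMENT normalization above in isolation. The CREATE statement
    # below is a hypothetical example.
    #
    #     import re
    #     stmt = ("CREATE TABLE `t` (`id` int(11) NOT NULL AUTO_INCREMENT, "
    #             "PRIMARY KEY (`id`)) ENGINE=InnoDB AUTO_INCREMENT=42 "
    #             "DEFAULT CHARSET=utf8")
    #     normalized = re.sub(r'AUTO_INCREMENT=\d+ ', '', stmt)
    #     # The 'AUTO_INCREMENT=42 ' counter is stripped, so two otherwise
    #     # identical tables with different counters still compare as equal.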

    def create_archive_table(self, database=None, table=None):
        """
        Create the archive table in the archive database.
        It checks that the Source and Destination tables are identical.
        """
        # Call create db if archive_db_name is None
        if self.archive_db_name is None:
            self.create_archive_db(database=database)
        else:
            logging.debug("Archive db is '%s'", self.archive_db_name)

        # Check if the table exists
        archive_table_exists = False
        if self.archive_db_exists(database=database):
            archive_table_exists = self.archive_table_exists(
                database=database, table=table)

        # Retrieve the source table CREATE statement:
        # if the archive table exists, compare CREATE statements,
        # else use the statement to create it
        src_create_table_statement = self.get_src_create_table_statement(
            database=database, table=table)

        if archive_table_exists:
            logging.debug("Remote DB has '%s.%s' table", self.archive_db_name,
                          self.archive_table_name)
            dst_table_create_statement = self.get_dst_create_table_statement(
                database=self.archive_db_name, table=self.archive_table_name)
            self.compare_src_and_dst_create_table_statement(
                src_statement=src_create_table_statement,
                dst_statement=dst_table_create_statement,
                src_table=table,
                dst_table=self.archive_table_name)
        else:
            logging.debug("'%s' table does not exist on the remote DB",
                          self.archive_table_name)
            sql = re.sub(
                'TABLE `{table}`'.format(table=table),
                'TABLE `{table}`'.format(table=self.archive_table_name),
                src_create_table_statement)
            self.db_request(sql=sql,
                            database=self.archive_db_name,
                            foreign_key_check=False)
            if not self.dry_run:
                logging.debug("Successfully created '%s.%s'",
                              self.archive_db_name, self.archive_table_name)

    def prerequisites(self, database=None, table=None):
        """
        Check that the destination database and table exist before
        proceeding with archiving. Keep the result in metadata for
        performance purposes.
        """
        if database in self.metadata and table in self.metadata[database]:
            logging.debug("Using cached prerequisites metadata")
            return
        # setdefault keeps entries already cached for other tables of the
        # same database
        self.metadata.setdefault(database, {})
        logging.info("Checking prerequisites")
        self.create_archive_db(database=database)
        self.create_archive_table(database=database, table=table)
        self.metadata[database][table] = \
            {'checked': True,
             'primary_key': self.get_table_primary_key(database=database,
                                                       table=table)}

    def db_bulk_insert(self,
                       sql=None,
                       database=None,
                       table=None,
                       values=None,
                       force_commit=False):
        """
        Insert a set of data when enough values are buffered or when
        force_commit is True.
        Return the remaining values to insert.
        """
        values = values or []
        # Execute and commit if we have enough data to commit (bulk_insert)
        # or if the commit is forced
        if len(values) >= self.bulk_insert or (values and force_commit):
            logging.info("Processing bulk insert")
            count = self.db_request(sql=sql,
                                    values=values,
                                    database=database,
                                    table=table,
                                    foreign_key_check=False,
                                    execute_method='executemany')
            values = []
            logging.info("%s rows inserted into %s.%s", count, database,
                         table)
        return values
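
    # Illustrative sketch (assumption, not from the original source): the
    # batching contract of db_bulk_insert as used by write() below. 'dst' is
    # a hypothetical Db instance whose bulk_insert threshold is 1000.
    #
    #     values = []
    #     for row in rows:
    #         values.append(list(row.values()))
    #         # Flushes and returns [] once 1000 values are buffered,
    #         # otherwise returns the buffer unchanged
    #         values = dst.db_bulk_insert(sql=sql, values=values,
    #                                     database=db, table=table)
    #     # Flush whatever is left, even below the threshold
    #     dst.db_bulk_insert(sql=sql, values=values, database=db,
    #                        table=table, force_commit=True)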

    def write(self, database=None, table=None, data=None):
        """
        Write implementation, in charge of writing data from the Source into
        the archive database. It calls the db_bulk_insert method to write by
        sets of data.
        """
        if not self.archive_data:
            logging.info(
                "Ignoring data archiving because archive_data is "
                "set to %s", self.archive_data)
            return
        self.prerequisites(database=database, table=table)
        primary_key = self.get_table_primary_key(database=database,
                                                 table=table)
        values = []
        sql = None
        for item in data:
            placeholders = ', '.join(['%s'] * len(item))
            columns = '`' + '`, `'.join(item.keys()) + '`'
            sql = "INSERT INTO {database}.{table} ({columns}) VALUES "\
                "({placeholders}) ON DUPLICATE KEY UPDATE {pk} = {pk}".format(
                    database=self.archive_db_name,
                    table=table,
                    columns=columns,
                    placeholders=placeholders,
                    pk=primary_key)
            values.append(list(item.values()))
            values = self.db_bulk_insert(sql=sql,
                                         values=values,
                                         database=self.archive_db_name,
                                         table=table)
        # Force a commit of the remaining data even if the bulk_insert limit
        # is not reached
        self.db_bulk_insert(sql=sql,
                            database=self.archive_db_name,
                            table=table,
                            values=values,
                            force_commit=True)
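
    # Illustrative sketch (assumption, not from the original source): the SQL
    # generated by write() for one hypothetical row of an 'instances' table
    # whose primary key is 'id', archived into a 'nova_archive' database.
    #
    #     item = {'id': 1, 'uuid': 'abc', 'deleted_at': '2019-01-17'}
    #     # -> INSERT INTO nova_archive.instances (`id`, `uuid`, `deleted_at`)
    #     #    VALUES (%s, %s, %s) ON DUPLICATE KEY UPDATE id = id
    #     # The 'ON DUPLICATE KEY UPDATE pk = pk' clause makes the insert
    #     # idempotent: re-inserting an already archived row is a no-op.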

    def clean_exit(self):
        """
        Tasks to be executed to exit cleanly:
        - disconnect from the db
        """
        logging.info("Closing destination DB connection")
        self.disconnect()
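
# Illustrative usage sketch (assumption, not from the original source): how
# this destination might be wired up. The Source instance 'src', the import
# path, and the connection kwargs forwarded to DbBase are hypothetical.
#
#     from osarchiver.destination.db import Db
#
#     dst = Db(name='archive-db',
#              source=src,                   # an osarchiver Source instance
#              db_suffix='_archive_{date}',  # '{date}' expands at runtime
#              host='archive.example.com',
#              port=3306)
#     dst.write(database='nova', table='instances', data=rows)
#     dst.clean_exit()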