#!/usr/bin/python
# -*- coding: UTF-8 -*-
# db_archive_exec.py
import os
import subprocess
import sys
import time

import db_conn
  7. # get db connection
  8. db = db_conn.db
  9. # use cursor
  10. cursor = db.cursor()
  11. # 获取命令行参数
  12. #server_source = '10.73.129.187'
  13. #db_source = 'test123'
  14. server_source = sys.argv[1]
  15. db_source = sys.argv[2]
  16. try:
  17. # SQL 查询语句
  18. sql = "select id, server_source, port_source, user_source, password_source, db_source, table_source," \
  19. "server_dest, port_dest, user_dest, password_dest, db_dest, table_dest, archive_condition " \
  20. "from db_archive_info " \
  21. "where server_source = '%s' and db_source = '%s' " % (server_source, db_source)
  22. # 执行SQL语句
  23. cursor.execute(sql)
  24. # 获取所有记录列表
  25. results = cursor.fetchall()
  26. for row in results:
  27. id = row[0]
  28. server_source = row[1]
  29. port_source = row[2]
  30. user_source = row[3]
  31. password_source = row[4]
  32. db_source = row[5]
  33. table_source = row[6]
  34. server_dest= row[7]
  35. port_dest = row[8]
  36. user_dest = row[9]
  37. password_dest = row[10]
  38. db_dest = row[11]
  39. table_dest = row[12]
  40. archive_condition = row[13]
  41. # 归档开始时间
  42. archive_starttime = time.strftime('%Y-%m-%d %H:%M:%S')
  43. # 生成pt-archive命令
  44. archive_cmd = "pt-archiver " \
  45. "--source h='%s',P='%s',u='%s',p='%s',D='%s',t='%s' " \
  46. "--dest h='%s',P='%s',u='%s',p='%s',D='%s',t='%s' " \
  47. "--charset=UTF8 --where '%s' --progress 50000 --limit 10000 --txn-size 10000 " \
  48. "--bulk-insert --bulk-delete --statistics --purge " % \
  49. (server_source, port_source, user_source, password_source, db_source, table_source, \
  50. server_dest, port_dest, user_dest, password_dest, db_dest, table_dest, \
  51. archive_condition)
  52. #print archive_cmd
  53. # make a copy of original stdout route
  54. stdout_archive = sys.stdout
  55. # define the log file that receives your log info
  56. log_file = open("/software/python_script/db_archive_%s_%s.log"% (db_source, table_source), "w")
  57. # redirect print output to log file
  58. sys.stdout = log_file
  59. #archive_cmd = os.popen(pt_archive)
  60. with os.popen(archive_cmd) as c:
  61. #with open("db_archive1.log", "r") as c:
  62. archive_log = c.read()
  63. print (archive_log)
  64. # close log file
  65. log_file.close()
  66. # restore the output to initial pattern
  67. sys.stdout = stdout_archive
  68. # 定义归档相关变量
  69. inserted_qty = 0
  70. deleted_qty = 0
  71. # 归档结束时间
  72. archive_endtime = time.strftime('%Y-%m-%d %H:%M:%S')
  73. with open("/software/python_script/db_archive_%s_%s.log"% (db_source, table_source),"r") as f:
  74. for line in f:
  75. if 'INSERT' in line:
  76. i = line.index(" ")
  77. inserted_qty = line[i+1:]
  78. elif 'DELETE' in line:
  79. i = line.index(" ")
  80. deleted_qty = line[i+1:]
  81. #判断归档是否失败
  82. if inserted_qty == deleted_qty:
  83. archive_status = 'Y'
  84. archive_error = ''
  85. else:
  86. archive_status = 'N'
  87. archive_error = 'inserted_qty and deleted_qty are not equal'
  88. # insert sql
  89. sql_insert = "insert into db_archive_log(server_source, db_source, table_source, server_dest, " \
  90. "db_dest, table_dest, archive_qty, archive_cmd, archive_log, archive_start, archive_end, " \
  91. "archive_status, archive_error ) " \
  92. "values('%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s','%s')" % \
  93. (server_source, db_source, table_source, server_dest, \
  94. db_dest, table_dest, inserted_qty, db.escape_string(archive_cmd), archive_log, archive_starttime, archive_endtime, \
  95. archive_status, archive_error)
  96. # exec sql
  97. cursor.execute(sql_insert)
  98. # exec commit
  99. db.commit()
  100. if archive_status == 'Y':
  101. sql_update = "update db_archive_info " \
  102. "set datetime_modified = '%s', last_archive_date = '%s', last_archive_qty = %s " \
  103. "where id = %d" % \
  104. (archive_starttime, archive_endtime, inserted_qty, id)
  105. cursor.execute(sql_update)
  106. # exec commit
  107. db.commit()
  108. except Exception as e:
  109. print (str(Exception))
  110. print (str(e))