"""
analystic_dynamodb_table.py

Dump the key columns of a DynamoDB table and save them to a local file.
"""
import csv
import os
import time
from operator import itemgetter

import boto3
  7. def get_ddb():
  8. result = []
  9. try:
  10. response = table.scan()
  11. if "Items" in response:
  12. result.extend(response['Items'])
  13. if "LastEvaluatedKey" in response:
  14. LastEvaluatedKey = response['LastEvaluatedKey']
  15. while LastEvaluatedKey:
  16. response = table.scan(
  17. ExclusiveStartKey=LastEvaluatedKey
  18. )
  19. if "Items" in response:
  20. result.extend(response['Items'])
  21. if "LastEvaluatedKey" in response:
  22. LastEvaluatedKey = response['LastEvaluatedKey']
  23. else:
  24. break
  25. except Exception as e:
  26. print(e)
  27. return result
  28. def get_running(data):
  29. result = []
  30. for i in data:
  31. if 'lastTimeProgress' in i:
  32. if i['lastTimeProgress'] != 100:
  33. result.append(i)
  34. print('Total:', len(result), 'running')
  35. return result
  36. def size_to_str(size):
  37. def loop(integer, remainder, level):
  38. if integer >= 1024:
  39. remainder = integer % 1024
  40. integer //= 1024
  41. level += 1
  42. return loop(integer, remainder, level)
  43. else:
  44. return integer, round(remainder / 1024, 1), level
  45. units = ['B', 'KB', 'MB', 'GB', 'TB', 'PB']
  46. integer, remainder, level = loop(int(size), 0, 0)
  47. if level+1 > len(units):
  48. level = -1
  49. return f'{integer+remainder} {units[level]}'
  50. def display(data, limit=10, mute=False):
  51. count = 0
  52. result = []
  53. for i in data:
  54. p = " "
  55. if 'Size' in i:
  56. size_str = size_to_str(i['Size'])
  57. p += f"Size: \033[0;34;1m{size_str}\033[0m "
  58. else:
  59. i['Size'] = 0
  60. if 'firstTime_f' in i:
  61. p += f"firstTime: \033[0;34;1m{i['firstTime_f']}\033[0m "
  62. else:
  63. i['firstTime_f'] = 'NA'
  64. if 'lastTimeProgress' in i:
  65. p += f"lastTimeProgress: \033[0;34;1m{i['lastTimeProgress']}%\033[0m "
  66. else:
  67. i['lastTimeProgress'] = 'NA'
  68. if 'totalSpentTime' in i:
  69. p += f"totalSpentTime: \033[0;34;1m{i['totalSpentTime']}\033[0m "
  70. else:
  71. i['totalSpentTime'] = 'NA'
  72. if 'tryTimes' in i:
  73. p += f"tryTimes: \033[0;34;1m{i['tryTimes']}\033[0m "
  74. else:
  75. i['tryTimes'] = 'NA'
  76. if 'thisRoundStart_f' in i:
  77. p += f"thisRoundStart: \033[0;34;1m{i['thisRoundStart_f']}\033[0m "
  78. else:
  79. i['thisRoundStart_f'] = 'NA'
  80. if 'endTime_f' in i:
  81. p += f"endTime_f: \033[0;34;1m{i['endTime_f']}\033[0m "
  82. else:
  83. i['endTime_f'] = 'NA'
  84. if 'instanceID' in i:
  85. p += f"instanceID: {str(i['instanceID'])} "
  86. else:
  87. i['instanceID'] = 'NA'
  88. if 'jobStatus' in i:
  89. p += f"jobStatus: {str(i['jobStatus'])}"
  90. else:
  91. i['jobStatus'] = 'NA'
  92. result.append(i)
  93. if mute:
  94. continue
  95. print({i['Key']})
  96. print(p)
  97. count += 1
  98. if count > limit:
  99. break
  100. return result
  101. # Main
  102. if __name__ == '__main__':
  103. table_queue_name = "covid19-s3-migrate-serverless-covid19s3migrateddb4500B92E-A8EFI0ZPV1UE"
  104. src_session = boto3.session.Session(profile_name='iad')
  105. dynamodb = src_session.resource('dynamodb')
  106. table = dynamodb.Table(table_queue_name)
  107. file_path = os.path.split(os.path.abspath(__file__))[0]
  108. ddb_result_path = file_path + '/s3_migration_ddb_result'
  109. os.system('mkdir ' + ddb_result_path)
  110. this_file_name = os.path.splitext(os.path.basename(__file__))[0]
  111. t = time.localtime()
  112. start_time = f'{t.tm_year}-{t.tm_mon}-{t.tm_mday}-{t.tm_hour}-{t.tm_min}-{t.tm_sec}'
  113. ddb_result_filename = f'{ddb_result_path}/{this_file_name}-{start_time}.csv'
  114. '''
  115. MAIN
  116. '''
  117. # 整个表scan下来
  118. ddb_result = get_ddb()
  119. print("DDB records: ", len(ddb_result))
  120. # 查询正在进行中的Job
  121. query_running = get_running(ddb_result)
  122. display(query_running, 10)
  123. # 输出启动时间最晚的记录
  124. print("\nLast firstTime data:")
  125. query_descend = sorted(
  126. ddb_result, key=itemgetter('firstTime'), reverse=True)
  127. display(query_descend, 10)
  128. # 补缺数据,否则文件输出csv会在缺失数据的地方中断
  129. convert_data = display(ddb_result, mute=True)
  130. # 写入本地文件
  131. with open(ddb_result_filename, "w+") as f:
  132. csvwriter = csv.writer(f)
  133. csvwriter.writerow(['Key', 'Size', 'firstTime_f', 'lastTimeProgress', 'totalSpentTime', 'tryTimes',
  134. 'thisRoundStart_f', 'endTime_f', 'instanceID', 'jobStatus']) # 写表头
  135. for i in ddb_result:
  136. csvwriter.writerow([i['Key'], i['Size'], i['firstTime_f'], i['lastTimeProgress'], i['totalSpentTime'],
  137. i['tryTimes'], i['thisRoundStart_f'], i['endTime_f'], str(i['instanceID']), str(i['jobStatus'])])
  138. print(f'\n Full data write to {ddb_result_filename}')