s3_migration_lib_9.py 874 B

12345678910111213141516171819202122232425
  1. def job_upload_sqs_ddb(*, sqs, sqs_queue, job_list):
  2. sqs_batch = 0
  3. sqs_message = []
  4. logger.info(f'Start uploading jobs to queue: {sqs_queue}')
  5. # create ddb writer
  6. # with table.batch_writer() as ddb_batch:
  7. for job in job_list:
  8. # construct sqs messages
  9. sqs_message.append({
  10. "Id": str(sqs_batch),
  11. "MessageBody": json.dumps(job),
  12. })
  13. sqs_batch += 1
  14. # write to sqs in batch 10 or is last one
  15. if sqs_batch == 10 or job == job_list[-1]:
  16. try:
  17. sqs.send_message_batch(QueueUrl=sqs_queue, Entries=sqs_message)
  18. except Exception as e:
  19. logger.error(f'Fail to send sqs message: {str(sqs_message)}, {str(e)}')
  20. sqs_batch = 0
  21. sqs_message = []
  22. logger.info(f'Complete upload job to queue: {sqs_queue}')
  23. return