12345678910111213141516171819202122232425 |
def job_upload_sqs_ddb(*, sqs, sqs_queue, job_list):
    """Send every job in ``job_list`` to an SQS queue in batches of up to 10.

    Each job is serialized with ``json.dumps`` and becomes one batch entry
    whose ``Id`` is its zero-based position inside the batch. Despite the
    function name, nothing is written to DynamoDB here.

    Sending is best-effort: an exception raised by ``send_message_batch``
    or a per-entry failure reported in its response is logged, and the
    remaining batches are still sent.

    Args:
        sqs: Client object exposing ``send_message_batch(QueueUrl=...,
            Entries=...)``.
        sqs_queue: Value passed as ``QueueUrl`` on every call.
        job_list: Iterable of JSON-serializable jobs. Any iterable works,
            including generators; it is consumed exactly once.

    Returns:
        None.
    """
    # SendMessageBatch accepts at most 10 entries per request.
    max_batch = 10

    def _flush(entries):
        # Send one batch; log (but do not raise) on any failure.
        try:
            response = sqs.send_message_batch(QueueUrl=sqs_queue, Entries=entries)
        except Exception as e:
            logger.error(f'Fail to send sqs message: {str(entries)}, {str(e)}')
            return
        # The call can succeed overall while individual entries are rejected;
        # those are reported in the response rather than raised.
        failed = response.get('Failed') if isinstance(response, dict) else None
        if failed:
            logger.error(f'Fail to send sqs message: {str(failed)}')

    logger.info(f'Start uploading jobs to queue: {sqs_queue}')

    entries = []
    for job in job_list:
        entries.append({
            "Id": str(len(entries)),
            "MessageBody": json.dumps(job),
        })
        if len(entries) == max_batch:
            _flush(entries)
            entries = []

    # Flush the final partial batch. Doing this after the loop avoids
    # comparing each job to the last one, which mis-fired on duplicate jobs
    # and required an indexable sequence.
    if entries:
        _flush(entries)

    logger.info(f'Complete upload job to queue: {sqs_queue}')
    return
|