- """
- Job handlers
- """
- import json
- import os
- import boto3
- from boto3.dynamodb.conditions import Key, Attr
- from boto_utils import DecimalEncoder, utc_timestamp
- from decorators import (
- with_logging,
- request_validator,
- catch_errors,
- add_cors_headers,
- load_schema,
- )
- ddb = boto3.resource("dynamodb")
- table = ddb.Table(os.getenv("JobTable", "S3F2_Jobs"))
- index = os.getenv("JobTableDateGSI", "Date-GSI")
- bucket_count = int(os.getenv("GSIBucketCount", 1))
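
# "GSIBucket" write-shards the date GSI across `bucket_count` partitions
# (presumably to avoid a hot partition key; bucket assignment happens at job
# creation elsewhere). list_jobs_handler fans out one query per bucket and
# merges the results.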

end_statuses = [
    "COMPLETED_CLEANUP_FAILED",
    "COMPLETED",
    "FAILED",
    "FIND_FAILED",
    "FORGET_FAILED",
    "FORGET_PARTIALLY_FAILED",
]
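
# Attributes projected into the job summaries returned by list_jobs_handler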
job_summary_attributes = [
    "Id",
    "CreatedAt",
    "JobStatus",
    "JobFinishTime",
    "JobStartTime",
    "TotalObjectRollbackFailedCount",
    "TotalObjectUpdatedCount",
    "TotalObjectUpdateSkippedCount",
    "TotalObjectUpdateFailedCount",
    "TotalQueryCount",
    "TotalQueryFailedCount",
    "TotalQueryScannedInBytes",
    "TotalQuerySucceededCount",
    "TotalQueryTimeInMillis",
]


@with_logging
@add_cors_headers
@request_validator(load_schema("get_job"))
@catch_errors
def get_job_handler(event, context):
    job_id = event["pathParameters"]["job_id"]
    resp = table.get_item(
        Key={
            "Id": job_id,
            "Sk": job_id,
        }
    )
    item = resp.get("Item")
    if not item:
        return {"statusCode": 404}
    return {"statusCode": 200, "body": json.dumps(item, cls=DecimalEncoder)}
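
# Minimal invocation sketch (assumes the API Gateway Lambda proxy event shape;
# the job ID is hypothetical):
#
#   get_job_handler({"pathParameters": {"job_id": "example-job-id"}}, None)
#   # -> {"statusCode": 200, "body": "{...job item...}"} or {"statusCode": 404}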


@with_logging
@add_cors_headers
@request_validator(load_schema("list_jobs"))
@catch_errors
def list_jobs_handler(event, context):
    qs = event.get("queryStringParameters") or {}
    page_size = int(qs.get("page_size", 10))
    start_at = int(qs.get("start_at", utc_timestamp()))
    items = []
    for gsi_bucket in range(bucket_count):
        response = table.query(
            IndexName=index,
            KeyConditionExpression=Key("GSIBucket").eq(str(gsi_bucket))
            & Key("CreatedAt").lt(start_at),
            ScanIndexForward=False,
            Limit=page_size,
            ProjectionExpression=", ".join(job_summary_attributes),
        )
        items += response.get("Items", [])
    items = sorted(items, key=lambda i: i["CreatedAt"], reverse=True)[:page_size]
    if len(items) < page_size:
        next_start = None
    else:
        next_start = min(item["CreatedAt"] for item in items)
    return {
        "statusCode": 200,
        "body": json.dumps(
            {
                "Jobs": items,
                "NextStart": next_start,
            },
            cls=DecimalEncoder,
        ),
    }
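
# Hedged paging sketch (event shape per the API Gateway proxy integration;
# values illustrative): feed the "NextStart" from one response back in as
# start_at to fetch the next page, until NextStart is null.
#
#   list_jobs_handler({"queryStringParameters": {"page_size": "5"}}, None)
#   list_jobs_handler(
#       {"queryStringParameters": {"page_size": "5", "start_at": "1590000000"}},
#       None,
#   )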


@with_logging
@add_cors_headers
@request_validator(load_schema("list_job_events"))
@catch_errors
def list_job_events_handler(event, context):
    # Input parsing
    job_id = event["pathParameters"]["job_id"]
    qs = event.get("queryStringParameters") or {}
    mvqs = event.get("multiValueQueryStringParameters") or {}
    page_size = int(qs.get("page_size", 20))
    start_at = qs.get("start_at", "0")
    # Check the job exists
    job = table.get_item(
        Key={
            "Id": job_id,
            "Sk": job_id,
        }
    ).get("Item")
    if not job:
        return {"statusCode": 404}
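    # Upper bound for the client-supplied watermark: one second past the job
    # finish time (or "now" while the job is still running), scaled to the
    # finer resolution used by the event sort-key timestamps (an assumption
    # based on the `start_at.split("#")[0]` comparison below).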
    watermark_boundary_mu = (job.get("JobFinishTime", utc_timestamp()) + 1) * 1000
    # Reject watermarks that are in the "future" for this job
    if int(start_at.split("#")[0]) > watermark_boundary_mu:
        raise ValueError("Watermark {} is out of bounds for this job".format(start_at))
    # Apply user-supplied filters of the form "Attribute=prefix"
    filter_expression = Attr("Type").eq("JobEvent")
    user_filters = mvqs.get("filter", [])
    for f in user_filters:
        k, v = f.split("=", 1)
        filter_expression = filter_expression & Attr(k).begins_with(v)
    # Because the results may contain both JobEvent and Job items, we request
    # at most page_size + 1 items and apply the type filter as a
    # FilterExpression. We then cap the list at the requested page size in case
    # the number of items after filtering is still page_size + 1, i.e. the Job
    # item wasn't on the page.
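    # Accumulate up to page_size matching events, paging through DynamoDB via
    # ExclusiveStartKey; `last_evaluated` remembers the last cursor DynamoDB
    # returned so that _get_watermark can hand it back to the client when
    # filtering leaves a short page.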
    items = []
    query_start_key = str(start_at)
    last_evaluated = None
    last_query_size = 0
    while len(items) < page_size:
        resp = table.query(
            KeyConditionExpression=Key("Id").eq(job_id),
            ScanIndexForward=True,
            FilterExpression=filter_expression,
            Limit=100 if user_filters else page_size + 1,
            ExclusiveStartKey={"Id": job_id, "Sk": query_start_key},
        )
        results = resp.get("Items", [])
        last_query_size = len(results)
        items.extend(results[: page_size - len(items)])
        query_start_key = resp.get("LastEvaluatedKey", {}).get("Sk")
        if not query_start_key:
            break
        last_evaluated = query_start_key
    next_start = _get_watermark(
        items, start_at, page_size, job["JobStatus"], last_evaluated, last_query_size
    )
    resp = {
        k: v
        for k, v in {"JobEvents": items, "NextStart": next_start}.items()
        if v is not None
    }
    return {"statusCode": 200, "body": json.dumps(resp, cls=DecimalEncoder)}
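
# Hedged pagination sketch: start from watermark "0" and keep passing the
# returned "NextStart" back as start_at until it is absent. The watermark
# appears to take the form "<timestamp>#<suffix>" given the split above
# (values illustrative):
#
#   list_job_events_handler(
#       {"pathParameters": {"job_id": "example-job-id"}}, None
#   )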


def _get_watermark(
    items,
    initial_start_key,
    page_size,
    job_status,
    last_evaluated_ddb_key,
    last_query_size,
):
    """
    Work out the watermark to return to the user, using the following logic:
    1. If the job is in progress, we always return a watermark, and its source
       is determined as follows:
       a. We've reached the last available items in DynamoDB, but filtering has
          left us with fewer than the desired page size; we still have a
          LastEvaluatedKey that lets the client skip the filtered items next time
       b. There is at least 1 event and there are (or will be) more items available
       c. There are no events after the supplied watermark, so just return
          whatever the user sent
    2. If the job is finished, return a watermark only if the last query executed
       indicates there *might* be more results
    """
    next_start = None
    if job_status not in end_statuses:
        # Job is in progress
        if len(items) < page_size and last_evaluated_ddb_key:
            next_start = last_evaluated_ddb_key
        elif 0 < len(items) <= page_size:
            next_start = items[-1]["Sk"]
        else:
            next_start = str(initial_start_key)
    # Job is finished but there are potentially more results
    elif last_query_size >= page_size:
        next_start = items[-1]["Sk"]
    return next_start