AWSTemplateFormatVersion: "2010-09-09"
Transform: AWS::Serverless-2016-10-31
Description: Amazon S3 Find and Forget State Machine

# Defaults applied to every AWS::Serverless::Function declared in this template.
Globals:
  Function:
    Runtime: python3.9
    Timeout: 900
    Tracing: Active
    Layers: !Ref CommonLayers
    Environment:
      Variables:
        DataMapperTable: !Ref DataMapperTableName
        DeletionQueueTable: !Ref DeletionQueueTableName
        GlueDatabase: !Ref GlueDatabase
        LogLevel: !Ref LogLevel
        JobManifestsGlueTable: !Ref JobManifestsGlueTable
        JobTable: !Ref JobTableName
        StateBucket: !Ref ResultBucket
        ManifestsBucket: !Ref ManifestsBucket

Parameters:
  AthenaWorkGroup:
    Description: WorkGroup to use for Athena queries
    Type: String
    Default: primary
  CommonLayers:
    Type: CommaDelimitedList
    Description: Common layers supplied to all functions
  DataMapperTableName:
    Description: Table name for Data Mapper Table
    Type: String
  # URL of the object deletion queue. The queue name is derived from it below
  # by taking the 5th "/"-separated segment of the URL.
  DeleteQueueUrl:
    Type: String
  DeleteServiceName:
    Type: String
  DeletionQueueTableName:
    Type: String
  ECSCluster:
    Type: String
  GlueDatabase:
    Type: String
  JobTableName:
    Description: Table name for Jobs Table
    Type: String
  LogLevel:
    Type: String
    Default: INFO
    AllowedValues:
      - CRITICAL
      - FATAL
      - ERROR
      - WARNING
      - INFO
      - DEBUG
      - NOTSET
  JobManifestsGlueTable:
    Type: String
  ManifestsBucket:
    Type: String
  ResultBucket:
    Type: String
  # Prefix for the three state machine names; also used to scope the
  # states:* permissions of StatesExecutionRole.
  StateMachinePrefix:
    Type: String

Resources:
  # Role assumed by all three state machines in this template.
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - !Sub states.${AWS::Region}.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Task Lambda functions declared further down in this template.
              - Effect: Allow
                Action: "lambda:InvokeFunction"
                Resource:
                  - !GetAtt CheckQueueSize.Arn
                  - !GetAtt ExecuteQuery.Arn
                  - !GetAtt CheckQueryStatus.Arn
                  - !GetAtt CheckTaskCount.Arn
                  - !GetAtt SubmitQueryResults.Arn
                  - !GetAtt GenerateQueries.Arn
                  - !GetAtt OrchestrateECSServiceScaling.Arn
                  - !GetAtt WorkQueryQueue.Arn
                  - !GetAtt DeleteQueueMessage.Arn
                  - !GetAtt PurgeQueue.Arn
                  - !GetAtt EmitEvent.Arn
              - Effect: Allow
                Action:
                  - "events:PutTargets"
                  - "events:PutRule"
                  - "events:DescribeRule"
                Resource: !Sub "arn:${AWS::Partition}:events:${AWS::Region}:${AWS::AccountId}:rule/*"
              - Effect: Allow
                Action: "states:ListStateMachines"
                Resource: "*"
              - Effect: Allow
                Action:
                  - "states:DescribeExecution"
                  - "states:DescribeStateMachineForExecution"
                Resource: !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:execution:${StateMachinePrefix}*:*"
              - Effect: Allow
                Action:
                  - "states:DescribeStateMachine"
                  - "states:ListExecutions"
                  - "states:StartExecution"
                Resource: !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachinePrefix}*"

  # Runs one Athena query: start it, poll its status, retry while
  # $.ExecutionRetriesLeft > 0, then submit the results or emit QueryFailed.
  AthenaStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub ${StateMachinePrefix}-AthenaStateMachine
      RoleArn: !GetAtt StatesExecutionRole.Arn
      DefinitionString: !Sub |-
        {
          "StartAt": "Execute Query",
          "States": {
            "Execute Query": {
              "Comment": "Start an Athena query asynchronously",
              "Type": "Task",
              "Parameters": {
                "QueryData.$": "$",
                "Bucket": "${ResultBucket}",
                "Prefix": "queries"
              },
              "Resource": "${ExecuteQuery.Arn}",
              "ResultPath": "$.QueryId",
              "Next": "Get Query Status",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "BackoffRate": 10,
                "MaxAttempts": 2
              }],
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Wait for Query": {
              "Comment": "Waits before checking again whether Athena is done",
              "Type": "Wait",
              "SecondsPath": "$.WaitDuration",
              "Next": "Get Query Status"
            },
            "Get Query Status": {
              "Comment": "Gets the status of the given Athena query",
              "Type": "Task",
              "Resource": "${CheckQueryStatus.Arn}",
              "Next": "Query Complete?",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "BackoffRate": 10,
                "MaxAttempts": 2
              }],
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Query Complete?": {
              "Comment": "Check if the Athena query is still running",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.State",
                  "StringEquals": "SUCCEEDED",
                  "Next": "Submit Query Results"
                },
                {
                  "Or": [
                    {
                      "Variable": "$.State",
                      "StringEquals": "FAILED"
                    },
                    {
                      "Variable": "$.State",
                      "StringEquals": "CANCELLED"
                    }
                  ],
                  "Next": "Retriable?"
                }
              ],
              "Default": "Wait for Query"
            },
            "Retriable?": {
              "Comment": "Check if the Athena query can be retried",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.ExecutionRetriesLeft",
                  "NumericGreaterThan": 0,
                  "Next": "Execute Query"
                }
              ],
              "Default": "Raise Query Error"
            },
            "Raise Query Error": {
              "Type": "Pass",
              "Parameters": {
                "Cause.$": "$.Reason",
                "Error": "Query Failed"
              },
              "ResultPath": "$.ErrorDetails",
              "Next": "Handle Error"
            },
            "Handle Error": {
              "Type": "Pass",
              "Parameters": {
                "Error.$": "$.ErrorDetails.Error",
                "Cause.$": "$.ErrorDetails.Cause",
                "State.$": "$"
              },
              "Next": "Emit Error"
            },
            "Emit Error": {
              "Comment": "Emit the failure event",
              "Type": "Task",
              "Parameters": {
                "EventName": "QueryFailed",
                "EventData.$": "$",
                "EmitterId": "StepFunctions",
                "JobId.$": "$.State.JobId"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Query Failed"
            },
            "Query Failed": {
              "Comment": "The query was unsuccessful",
              "Type": "Fail"
            },
            "Submit Query Results": {
              "Comment": "Obtain the query results from S3 and send them to Fargate",
              "Type": "Task",
              "Resource": "${SubmitQueryResults.Arn}",
              "ResultPath": null,
              "Next": "Query Succeeded",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "MaxAttempts": 0
              }],
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Query Succeeded": {
              "Comment": "The query was successful",
              "Type": "Pass",
              "Parameters": {
                "JobId.$": "$.JobId",
                "QueryId.$": "$.QueryId",
                "PartitionKeys.$": "$.PartitionKeys",
                "Statistics.$": "$.Statistics",
                "DataMapperId.$": "$.DataMapperId",
                "Table.$": "$.Table"
              },
              "Next": "Emit Success"
            },
            "Emit Success": {
              "Comment": "Emit the successful query event",
              "Type": "Task",
              "Parameters": {
                "EventName": "QuerySucceeded",
                "EventData.$": "$",
                "EmitterId": "StepFunctions",
                "JobId.$": "$.JobId"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "End": true,
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 3,
                "BackoffRate": 1.5,
                "MaxAttempts": 1
              }]
            }
          }
        }

  # Adjusts the ECS deletion service's desired count from the deletion queue
  # size, then polls the queue and the service's task count until both are 0.
  DeleteStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub ${StateMachinePrefix}-DeletionStateMachine
      RoleArn: !GetAtt StatesExecutionRole.Arn
      DefinitionString: !Sub |-
        {
          "StartAt": "Fetch Queue Size",
          "States": {
            "Fetch Queue Size": {
              "Comment": "Checks the number of messages in the Object Deletion Queue",
              "Type": "Task",
              "Resource": "${CheckQueueSize.Arn}",
              "Parameters": {
                "QueueUrl": "${DeleteQueueUrl}"
              },
              "ResultPath": "$.Queue",
              "Next": "Adjust Deletion Service Instance Count",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "MaxAttempts": 1
              }]
            },
            "Adjust Deletion Service Instance Count": {
              "Comment": "Sets the desired instance count based on Object Deletion Queue size",
              "Type": "Task",
              "Resource": "${OrchestrateECSServiceScaling.Arn}",
              "Parameters": {
                "Cluster": "${ECSCluster}",
                "DeleteService": "${DeleteServiceName}",
                "DeletionTasksMaxNumber.$": "$.DeletionTasksMaxNumber",
                "QueueSize.$": "$.Queue.Total"
              },
              "ResultPath": "$.DesiredCount",
              "Next": "Items in Queue?",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "MaxAttempts": 1
              }]
            },
            "Items in Queue?": {
              "Comment": "Checks if any tasks are being created/terminated",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.DesiredCount",
                  "NumericEquals": 0,
                  "Next": "Fetch Task Count"
                },
                {
                  "Variable": "$.DesiredCount",
                  "NumericGreaterThan": 0,
                  "Next": "Wait"
                }
              ],
              "Default": "Not sure, fail"
            },
            "Wait": {
              "Comment": "Waits before checking if the Object Deletion Queue is empty",
              "Type": "Wait",
              "SecondsPath": "$.WaitDuration",
              "Next": "Fetch Queue Size Again"
            },
            "Fetch Queue Size Again": {
              "Comment": "Checks the number of messages in the Object Deletion Queue",
              "Type": "Task",
              "Resource": "${CheckQueueSize.Arn}",
              "Parameters": {
                "QueueUrl": "${DeleteQueueUrl}"
              },
              "ResultPath": "$.Queue",
              "Next": "Queue is Empty?",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "MaxAttempts": 1
              }]
            },
            "Queue is Empty?": {
              "Comment": "Checks if the Total messages are 0",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.Queue.Total",
                  "NumericEquals": 0,
                  "Next": "Adjust Deletion Service Instance Count"
                },
                {
                  "Variable": "$.Queue.Total",
                  "NumericGreaterThan": 0,
                  "Next": "Wait"
                }
              ],
              "Default": "Not sure, fail"
            },
            "Fetch Task Count": {
              "Comment": "Checks the number of tasks for a service",
              "Type": "Task",
              "Resource": "${CheckTaskCount.Arn}",
              "Parameters": {
                "Cluster": "${ECSCluster}",
                "ServiceName": "${DeleteServiceName}"
              },
              "ResultPath": "$.TaskCount",
              "Next": "Has Fargate Shutdown?",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "MaxAttempts": 1
              }]
            },
            "Has Fargate Shutdown?": {
              "Comment": "Checks if the remaining tasks is 0",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.TaskCount.Total",
                  "NumericEquals": 0,
                  "Next": "Done"
                },
                {
                  "Variable": "$.TaskCount.Total",
                  "NumericGreaterThan": 0,
                  "Next": "Wait for Fargate Shutdown"
                }
              ],
              "Default": "Not sure, fail"
            },
            "Wait for Fargate Shutdown": {
              "Comment": "Waits before checking if Fargate has reached the desired count",
              "Type": "Wait",
              "Seconds": 10,
              "Next": "Fetch Task Count"
            },
            "Not sure, fail": {
              "Type": "Fail"
            },
            "Done": {
              "Type": "Succeed"
            }
          }
        }

  # Top-level job workflow: purge both queues, run the find phase (generate and
  # work the query queue), then run DeleteStateMachine synchronously for the
  # forget phase. Each phase boundary is reported through the EmitEvent task.
  StateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub ${StateMachinePrefix}-StateMachine
      DefinitionString: !Sub |-
        {
          "Comment": "State machine for processing the S3 Find and Forget deletion queue.",
          "StartAt": "Start Job",
          "States": {
            "Start Job": {
              "Type": "Pass",
              "Parameters": {
                "ExecutionId.$": "$$.Execution.Id",
                "ExecutionName.$": "$$.Execution.Name",
                "AthenaConcurrencyLimit.$": "$.AthenaConcurrencyLimit",
                "AthenaQueryMaxRetries.$": "$.AthenaQueryMaxRetries",
                "DeletionTasksMaxNumber.$": "$.DeletionTasksMaxNumber",
                "ForgetQueueWaitSeconds.$": "$.ForgetQueueWaitSeconds",
                "QueryExecutionWaitSeconds.$": "$.QueryExecutionWaitSeconds",
                "QueryQueueWaitSeconds.$": "$.QueryQueueWaitSeconds"
              },
              "Next": "Emit Job Started"
            },
            "Emit Job Started": {
              "Type": "Task",
              "Parameters": {
                "EventName": "JobStarted",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Purge Queues",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Purge Queues": {
              "Comment": "Purges the query and object deletion queues. Purges wait 61 seconds before retrying to avoid exceeding the max purge attempt rate for SQS.",
              "Type": "Parallel",
              "Next": "Start Find Phase",
              "ResultPath": null,
              "Branches": [{
                "StartAt": "Purge Query Queue",
                "States": {
                  "Purge Query Queue": {
                    "Parameters": {
                      "QueueUrl": "${QueryQueue}"
                    },
                    "Comment": "Purge the query queue",
                    "Type": "Task",
                    "Resource": "${PurgeQueue.Arn}",
                    "End": true,
                    "Retry": [{
                      "ErrorEquals": [ "States.ALL" ],
                      "IntervalSeconds": 61,
                      "MaxAttempts": 1
                    }]
                  }
                }
              }, {
                "StartAt": "Purge Deletion Queue",
                "States": {
                  "Purge Deletion Queue": {
                    "Parameters": {
                      "QueueUrl": "${DeleteQueueUrl}"
                    },
                    "Comment": "Purge the deletion queue.",
                    "Type": "Task",
                    "Resource": "${PurgeQueue.Arn}",
                    "End": true,
                    "Retry": [{
                      "ErrorEquals": [ "States.ALL" ],
                      "IntervalSeconds": 61,
                      "MaxAttempts": 1
                    }]
                  }
                }
              }],
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Start Find Phase": {
              "Type": "Task",
              "Parameters": {
                "EventName": "FindPhaseStarted",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "Next": "Generate Queries",
              "ResultPath": null,
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Generate Queries": {
              "Comment": "Process each of the data mappers and populate the query queue",
              "Type": "Task",
              "Resource": "${GenerateQueries.Arn}",
              "ResultPath": "$.QueriesStats",
              "Next": "End Query Planning",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Find Error"
              }]
            },
            "End Query Planning": {
              "Type": "Task",
              "Parameters": {
                "EventName": "QueryPlanningComplete",
                "EventData.$": "$.QueriesStats",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Work Queue",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Find Error"
              }]
            },
            "Work Queue": {
              "Comment": "Works the Object Deletion Queue by starting Athena State Machine executions",
              "Type": "Task",
              "Resource": "${WorkQueryQueue.Arn}",
              "ResultPath": "$.RunningExecutions",
              "Next": "Outstanding Queries?",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Find Error"
              }]
            },
            "Outstanding Queries?": {
              "Comment": "Checks if any queries are yet to be ran or are in progress",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.RunningExecutions.Total",
                  "NumericEquals": 0,
                  "Next": "End Find Phase"
                },
                {
                  "Variable": "$.RunningExecutions.Total",
                  "NumericGreaterThan": 0,
                  "Next": "Wait for Queries"
                }
              ],
              "Default": "Wait for Queries"
            },
            "Wait for Queries": {
              "Comment": "Waits before rechecking if the Object Deletion Queue is empty",
              "Type": "Wait",
              "SecondsPath": "$.QueryQueueWaitSeconds",
              "Next": "Work Queue"
            },
            "End Find Phase": {
              "Type": "Task",
              "Parameters": {
                "EventName": "FindPhaseEnded",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Start Forget Phase",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Start Forget Phase": {
              "Type": "Task",
              "Parameters": {
                "EventName": "ForgetPhaseStarted",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Start Fargate Workflow",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Start Fargate Workflow": {
              "Type": "Task",
              "Resource": "arn:${AWS::Partition}:states:::states:startExecution.sync",
              "Parameters": {
                "Input": {
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id",
                  "DeletionTasksMaxNumber.$": "$.DeletionTasksMaxNumber",
                  "WaitDuration.$": "$.ForgetQueueWaitSeconds"
                },
                "StateMachineArn": "${DeleteStateMachine}",
                "Name.$": "$$.Execution.Name"
              },
              "ResultPath": null,
              "Next": "End Forget Phase",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Forget Error"
              }]
            },
            "End Forget Phase": {
              "Type": "Task",
              "Parameters": {
                "EventName": "ForgetPhaseEnded",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "End": true
            },
            "Handle Error": {
              "Type": "Pass",
              "Parameters": {
                "EventName": "Exception",
                "Error.$": "$.ErrorDetails.Error",
                "Cause.$": "$.ErrorDetails.Cause",
                "State.$": "$"
              },
              "Next": "Emit Error"
            },
            "Handle Find Error": {
              "Type": "Pass",
              "Parameters": {
                "EventName": "FindPhaseFailed",
                "Error.$": "$.ErrorDetails.Error",
                "Cause.$": "$.ErrorDetails.Cause",
                "State.$": "$"
              },
              "Next": "Emit Error"
            },
            "Handle Forget Error": {
              "Type": "Pass",
              "Parameters": {
                "EventName": "ForgetPhaseFailed",
                "Error.$": "$.ErrorDetails.Error",
                "Cause.$": "$.ErrorDetails.Cause",
                "State.$": "$"
              },
              "Next": "Emit Error"
            },
            "Emit Error": {
              "Comment": "Emit the generic failure event",
              "Type": "Task",
              "Parameters": {
                "EventName.$": "$.EventName",
                "EventData.$": "$",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "End": true
            }
          }
        }
      RoleArn: !GetAtt StatesExecutionRole.Arn

  # Supporting Resources

  # Queue of pending queries; populated by GenerateQueries and consumed by
  # WorkQueryQueue (see their QueryQueue / QueueUrl environment variables).
  QueryQueue:
    Type: AWS::SQS::Queue
    Properties:
      KmsMasterKeyId: alias/aws/sqs
      VisibilityTimeout: 43200

  # Tasks

  # May read attributes of the query queue and of the deletion queue.
  CheckQueueSize:
    Type: AWS::Serverless::Function
    Properties:
      Handler: check_queue_size.handler
      CodeUri: ../backend/lambdas/tasks/
      Policies:
        - Statement:
            - Action:
                - "sqs:GetQueueAttributes"
              Effect: "Allow"
              Resource:
                - !GetAtt QueryQueue.Arn
                # Deletion queue ARN rebuilt from the queue name embedded in its URL.
                - !Sub
                  - arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${QueueName}
                  - QueueName: !Select [4, !Split ["/", !Ref DeleteQueueUrl]]

  # May describe the ECS deletion service.
  CheckTaskCount:
    Type: AWS::Serverless::Function
    Properties:
      Handler: check_task_count.handler
      CodeUri: ../backend/lambdas/tasks/
      Policies:
        - Statement:
            - Action:
                - "ecs:DescribeServices"
              Effect: "Allow"
              Resource:
                - !Sub arn:${AWS::Partition}:ecs:${AWS::Region}:${AWS::AccountId}:service/${ECSCluster}/${DeleteServiceName}

  # May start Athena queries in the configured workgroup. The S3, Lake Formation
  # and KMS statements only apply to requests made via Athena (aws:CalledVia).
  ExecuteQuery:
    Type: AWS::Serverless::Function
    Properties:
      Handler: execute_query.handler
      CodeUri: ../backend/lambdas/tasks/
      Environment:
        Variables:
          WorkGroup: !Ref AthenaWorkGroup
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref ManifestsBucket
        - S3CrudPolicy:
            BucketName: !Ref ResultBucket
        - Statement:
            - Action:
                - "glue:BatchGetPartition"
                - "glue:GetDatabase*"
                - "glue:GetPartition*"
                - "glue:GetTable*"
              Effect: "Allow"
              Resource:
                - !Sub "arn:${AWS::Partition}:glue:*:*:catalog*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:database*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:table*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:partition*"
            - Action:
                - "athena:StartQueryExecution"
              Effect: "Allow"
              Resource: !Sub "arn:${AWS::Partition}:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}"
            - Action:
                - "s3:GetObject*"
                - "s3:ListBucket*"
                - "lakeformation:GetDataAccess"
              Effect: "Allow"
              Resource: "*"
              Condition:
                ForAnyValue:StringEquals:
                  aws:CalledVia:
                    - !Sub athena.${AWS::URLSuffix}
            # NOTE(review): Allow + NotResource grants these KMS actions on every
            # resource EXCEPT keys in this account and region. Confirm this is
            # intentional (e.g. same-account keys are expected to grant access
            # through their key policies).
            - Action:
                - "kms:Encrypt"
                - "kms:Decrypt"
                - "kms:ReEncrypt*"
                - "kms:GenerateDataKey*"
                - "kms:DescribeKey"
              Effect: "Allow"
              NotResource: !Sub "arn:${AWS::Partition}:kms:${AWS::Region}:${AWS::AccountId}:key/*"
              Condition:
                ForAnyValue:StringEquals:
                  aws:CalledVia:
                    - !Sub athena.${AWS::URLSuffix}

  # May read query execution status in the configured workgroup.
  CheckQueryStatus:
    Type: AWS::Serverless::Function
    Properties:
      Handler: check_query_status.handler
      CodeUri: ../backend/lambdas/tasks/
      Policies:
        - Statement:
            - Action:
                - "athena:GetQueryExecution"
              Effect: "Allow"
              Resource: !Sub "arn:${AWS::Partition}:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}"

  # May read query results and send messages to the query and deletion queues.
  # Its QueueUrl environment variable is the deletion queue URL.
  SubmitQueryResults:
    Type: AWS::Serverless::Function
    Properties:
      Handler: submit_query_results.handler
      CodeUri: ../backend/lambdas/tasks/
      MemorySize: 512
      Environment:
        Variables:
          QueueUrl: !Ref DeleteQueueUrl
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref ResultBucket
        - Statement:
            - Action:
                - "athena:GetQueryResults"
              Effect: "Allow"
              Resource: !Sub "arn:${AWS::Partition}:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}"
            - Action:
                - "sqs:SendMessage"
                - "sqs:GetQueueAttributes"
              Effect: "Allow"
              Resource:
                - !GetAtt QueryQueue.Arn
                # Deletion queue ARN rebuilt from the queue name embedded in its URL.
                - !Sub
                  - arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${QueueName}
                  - QueueName: !Select [4, !Split ["/", !Ref DeleteQueueUrl]]

  # May read the job, data mapper and deletion queue tables and the Glue
  # catalog, create partitions on the job manifests table, and send messages
  # to the query queue.
  GenerateQueries:
    Type: AWS::Serverless::Function
    Properties:
      Handler: generate_queries.handler
      CodeUri: ../backend/lambdas/tasks/
      MemorySize: 512
      Environment:
        Variables:
          QueryQueue: !Ref QueryQueue
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref ManifestsBucket
        - DynamoDBReadPolicy:
            TableName: !Ref JobTableName
        - DynamoDBReadPolicy:
            TableName: !Ref DataMapperTableName
        - DynamoDBReadPolicy:
            TableName: !Ref DeletionQueueTableName
        - Statement:
            - Action:
                - "glue:BatchGetPartition"
                - "glue:GetDatabase*"
                - "glue:GetPartition*"
                - "glue:GetTable*"
              Effect: "Allow"
              Resource:
                - !Sub "arn:${AWS::Partition}:glue:*:*:catalog*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:database*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:table*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:partition*"
            - Action:
                - "glue:BatchCreatePartition"
              Effect: "Allow"
              Resource:
                - !Sub "arn:${AWS::Partition}:glue:*:*:catalog*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:database/${GlueDatabase}"
                - !Sub "arn:${AWS::Partition}:glue:*:*:table/${GlueDatabase}/${JobManifestsGlueTable}"
            - Effect: Allow
              Action:
                - "sqs:SendMessage*"
                - "sqs:GetQueueAttributes"
              Resource:
                - !GetAtt QueryQueue.Arn

  # May update the ECS deletion service.
  OrchestrateECSServiceScaling:
    Type: AWS::Serverless::Function
    Properties:
      Handler: orchestrate_ecs_service_scaling.handler
      CodeUri: ../backend/lambdas/tasks/
      Policies:
        - Statement:
            - Action:
                - "ecs:UpdateService"
              Effect: "Allow"
              Resource: !Sub "arn:${AWS::Partition}:ecs:${AWS::Region}:${AWS::AccountId}:service/${ECSCluster}/${DeleteServiceName}"

  # May consume the query queue and start/describe AthenaStateMachine
  # executions. The state machine ARN is built from its name rather than
  # referenced with !Ref.
  WorkQueryQueue:
    Type: AWS::Serverless::Function
    Properties:
      Handler: work_query_queue.handler
      CodeUri: ../backend/lambdas/tasks/
      Environment:
        Variables:
          StateMachineArn: !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachinePrefix}-AthenaStateMachine"
          QueueUrl: !Ref QueryQueue
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref ResultBucket
        - Statement:
            - Action:
                - "states:StartExecution"
                - "states:DescribeExecution"
              Effect: Allow
              Resource:
                - !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachinePrefix}-AthenaStateMachine"
                - !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:execution:${StateMachinePrefix}-AthenaStateMachine:*"
            - Action:
                - "sqs:ReceiveMessage*"
                - "sqs:ChangeMessageVisibility"
                - "sqs:PurgeQueue"
                - "sqs:DeleteMessage"
                - "sqs:GetQueueAttributes"
              Effect: "Allow"
              Resource:
                - !GetAtt QueryQueue.Arn

  # May delete messages from the query queue.
  DeleteQueueMessage:
    Type: AWS::Serverless::Function
    Properties:
      Handler: delete_message.handler
      CodeUri: ../backend/lambdas/tasks/
      Environment:
        Variables:
          QueueUrl: !Ref QueryQueue
      Policies:
        - Statement:
            - Action:
                - "sqs:DeleteMessage"
                - "sqs:GetQueueAttributes"
              Effect: "Allow"
              Resource: !GetAtt QueryQueue.Arn

  # May purge the query queue and the deletion queue.
  PurgeQueue:
    Type: AWS::Serverless::Function
    Properties:
      Handler: purge_queue.handler
      CodeUri: ../backend/lambdas/tasks/
      Policies:
        - Statement:
            - Action:
                - "sqs:PurgeQueue"
                - "sqs:GetQueueAttributes"
              Effect: "Allow"
              Resource:
                - !GetAtt QueryQueue.Arn
                # Deletion queue ARN rebuilt from the queue name embedded in its URL.
                - !Sub
                  - arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${QueueName}
                  - QueueName: !Select [4, !Split ["/", !Ref DeleteQueueUrl]]

  # Has CRUD access to the jobs table. Invoked by the state machines and by
  # EventRule below.
  EmitEvent:
    Type: AWS::Serverless::Function
    Properties:
      Handler: emit_event.handler
      CodeUri: ../backend/lambdas/tasks/
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref JobTableName

  # Forwards ABORTED / TIMED_OUT / FAILED executions of the top-level state
  # machine to EmitEvent as an "Exception" event.
  EventRule:
    Type: AWS::Events::Rule
    Properties:
      Description: "Event rule for emitting unknown Step Functions failures"
      EventPattern:
        source:
          - "aws.states"
        detail:
          stateMachineArn:
            - !Sub arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachinePrefix}-StateMachine
          status:
            - "ABORTED"
            - "TIMED_OUT"
            - "FAILED"
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt EmitEvent.Arn
          Id: "TargetEmitEvent"
          InputTransformer:
            InputPathsMap:
              job_id: "$.detail.name"
              status: "$.detail.status"
            # <job_id> and <status> are the InputPathsMap keys above. Without
            # them the template is not valid JSON and carries no job id.
            InputTemplate: '{"JobId": <job_id>, "EventName": "Exception", "EventData": {"Error": <status>, "Cause": "State Machine execution exited unexpectedly"}, "EmitterId": "CloudWatchEvents"}'

  # Lets EventRule invoke the EmitEvent function.
  PermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref EmitEvent
      Action: "lambda:InvokeFunction"
      Principal: !Sub "events.${AWS::URLSuffix}"
      SourceArn: !GetAtt EventRule.Arn

# ExecuteQueryRole and GenerateQueriesRole are not declared in this template;
# they are the roles SAM generates for the functions of the same name.
Outputs:
  AthenaExecutionRole:
    Value: !Ref ExecuteQueryRole
  AthenaExecutionRoleArn:
    Value: !GetAtt ExecuteQueryRole.Arn
  AthenaStateMachineArn:
    Value: !Ref AthenaStateMachine
  GenerateQueriesRole:
    Value: !Ref GenerateQueriesRole
  QueryQueueUrl:
    Value: !Ref QueryQueue
  StateMachineArn:
    Value: !Ref StateMachine
  StateMachineRoleArn:
    Value: !GetAtt StatesExecutionRole.Arn