12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027 |
# SAM template for the Amazon S3 Find and Forget state machines and the
# Lambda task functions they invoke.
AWSTemplateFormatVersion: "2010-09-09"
Transform: "AWS::Serverless-2016-10-31"
Description: "Amazon S3 Find and Forget State Machine"
# Defaults inherited by every AWS::Serverless::Function in this template.
Globals:
  Function:
    Runtime: "python3.9"
    Timeout: 900  # seconds (15 minutes)
    Tracing: Active
    Layers: !Ref CommonLayers
    Environment:
      Variables:
        # DynamoDB tables
        DataMapperTable: !Ref DataMapperTableName
        DeletionQueueTable: !Ref DeletionQueueTableName
        JobTable: !Ref JobTableName
        # Glue catalog
        GlueDatabase: !Ref GlueDatabase
        JobManifestsGlueTable: !Ref JobManifestsGlueTable
        # S3 buckets (StateBucket is the ResultBucket parameter)
        StateBucket: !Ref ResultBucket
        ManifestsBucket: !Ref ManifestsBucket
        LogLevel: !Ref LogLevel
# Stack inputs. Names/URLs of resources created outside this template.
Parameters:
  AthenaWorkGroup:
    Description: WorkGroup to use for Athena queries
    Type: String
    Default: primary
  CommonLayers:
    Type: CommaDelimitedList
    Description: Common layers supplied to all functions
  DataMapperTableName:
    Description: Table name for Data Mapper Table
    Type: String
  DeleteQueueUrl:
    # Must be a full queue URL (https://sqs.<region>.<suffix>/<account>/<name>):
    # IAM policies below take the queue name from its 5th "/"-separated segment.
    Description: URL of the Object Deletion Queue (SQS)
    Type: String
  DeleteServiceName:
    Description: Name of the ECS service scaled to process the Object Deletion Queue
    Type: String
  DeletionQueueTableName:
    Description: Table name for Deletion Queue Table
    Type: String
  ECSCluster:
    Description: Name of the ECS cluster that runs the deletion service
    Type: String
  GlueDatabase:
    Description: Name of the Glue database containing the job manifests table
    Type: String
  JobTableName:
    Description: Table name for Jobs Table
    Type: String
  LogLevel:
    Description: Log level passed to all functions
    Type: String
    Default: INFO
    AllowedValues:
      - CRITICAL
      - FATAL
      - ERROR
      - WARNING
      - INFO
      - DEBUG
      - NOTSET
  JobManifestsGlueTable:
    Description: Name of the job manifests Glue table (in GlueDatabase)
    Type: String
  ManifestsBucket:
    Description: Bucket name for job manifests
    Type: String
  ResultBucket:
    Description: Bucket name for Athena query results and state
    Type: String
  StateMachinePrefix:
    # IAM statements below match state machines/executions by this prefix.
    Description: Prefix for the names of the state machines created by this stack
    Type: String
Resources:
  # Role assumed by the three state machines defined below
  # (AthenaStateMachine, DeleteStateMachine, StateMachine).
  StatesExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - !Sub states.${AWS::Region}.amazonaws.com
            Action: "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: StatesExecutionPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Lambda functions used as Task state resources.
              - Effect: Allow
                Action: "lambda:InvokeFunction"
                Resource:
                  - !GetAtt CheckQueueSize.Arn
                  - !GetAtt ExecuteQuery.Arn
                  - !GetAtt CheckQueryStatus.Arn
                  - !GetAtt CheckTaskCount.Arn
                  - !GetAtt SubmitQueryResults.Arn
                  - !GetAtt GenerateQueries.Arn
                  - !GetAtt OrchestrateECSServiceScaling.Arn
                  - !GetAtt WorkQueryQueue.Arn
                  - !GetAtt DeleteQueueMessage.Arn
                  - !GetAtt PurgeQueue.Arn
                  - !GetAtt EmitEvent.Arn
              # The states:startExecution.sync integration ("Start Fargate Workflow")
              # tracks child executions through this Step Functions-managed rule,
              # so the grant is scoped to that rule instead of rule/*.
              - Effect: Allow
                Action:
                  - "events:PutTargets"
                  - "events:PutRule"
                  - "events:DescribeRule"
                Resource: !Sub arn:${AWS::Partition}:events:${AWS::Region}:${AWS::AccountId}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule
              - Effect: Allow
                Action: "states:ListStateMachines"
                Resource: "*"
              # StopExecution is required by startExecution.sync: when the parent
              # execution is stopped, Step Functions stops the child it waits on.
              - Effect: Allow
                Action:
                  - "states:DescribeExecution"
                  - "states:DescribeStateMachineForExecution"
                  - "states:StopExecution"
                Resource: !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:execution:${StateMachinePrefix}*:*"
              - Effect: Allow
                Action:
                  - "states:DescribeStateMachine"
                  - "states:ListExecutions"
                  - "states:StartExecution"
                Resource: !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachinePrefix}*"
# Athena query state machine. Its executions are started by the WorkQueryQueue
# Lambda, whose StateMachineArn env var names this machine.
# Flow: Execute Query -> Get Query Status -> Query Complete?
#   SUCCEEDED          -> Submit Query Results -> Query Succeeded -> Emit Success
#   FAILED / CANCELLED -> Retriable? -> Execute Query (while ExecutionRetriesLeft > 0)
#                                   -> otherwise Raise Query Error -> Handle Error
#   anything else      -> Wait for Query ($.WaitDuration s) -> Get Query Status
# Execute Query, Get Query Status and Submit Query Results catch all errors into
# Handle Error -> Emit Error ("QueryFailed" event) -> Query Failed (Fail state).
# NOTE(review): nothing in this definition decrements $.ExecutionRetriesLeft.
# Get Query Status has no ResultPath, so the CheckQueryStatus output replaces the
# whole state; presumably that handler maintains the counter. Confirm it does,
# otherwise a retriable failure would re-run the query indefinitely.
  AthenaStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub ${StateMachinePrefix}-AthenaStateMachine
      RoleArn: !GetAtt StatesExecutionRole.Arn
      # CloudFormation !Sub template: ${...} are stack substitutions, while
      # $.path / $$.path are Step Functions JSONPath references.
      DefinitionString: !Sub |-
        {
          "StartAt": "Execute Query",
          "States": {
            "Execute Query": {
              "Comment": "Start an Athena query asynchronously",
              "Type": "Task",
              "Parameters": {
                "QueryData.$": "$",
                "Bucket": "${ResultBucket}",
                "Prefix": "queries"
              },
              "Resource": "${ExecuteQuery.Arn}",
              "ResultPath": "$.QueryId",
              "Next": "Get Query Status",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "BackoffRate": 10,
                "MaxAttempts": 2
              }],
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Wait for Query": {
              "Comment": "Waits before checking again whether Athena is done",
              "Type": "Wait",
              "SecondsPath": "$.WaitDuration",
              "Next": "Get Query Status"
            },
            "Get Query Status": {
              "Comment": "Gets the status of the given Athena query",
              "Type": "Task",
              "Resource": "${CheckQueryStatus.Arn}",
              "Next": "Query Complete?",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "BackoffRate": 10,
                "MaxAttempts": 2
              }],
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Query Complete?": {
              "Comment": "Check if the Athena query is still running",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.State",
                  "StringEquals": "SUCCEEDED",
                  "Next": "Submit Query Results"
                },
                {
                  "Or": [
                    {
                      "Variable": "$.State",
                      "StringEquals": "FAILED"
                    },
                    {
                      "Variable": "$.State",
                      "StringEquals": "CANCELLED"
                    }
                  ],
                  "Next": "Retriable?"
                }
              ],
              "Default": "Wait for Query"
            },
            "Retriable?": {
              "Comment": "Check if the Athena query can be retried",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.ExecutionRetriesLeft",
                  "NumericGreaterThan": 0,
                  "Next": "Execute Query"
                }
              ],
              "Default": "Raise Query Error"
            },
            "Raise Query Error": {
              "Type": "Pass",
              "Parameters": {
                "Cause.$": "$.Reason",
                "Error": "Query Failed"
              },
              "ResultPath": "$.ErrorDetails",
              "Next": "Handle Error"
            },
            "Handle Error": {
              "Type": "Pass",
              "Parameters": {
                "Error.$": "$.ErrorDetails.Error",
                "Cause.$": "$.ErrorDetails.Cause",
                "State.$": "$"
              },
              "Next": "Emit Error"
            },
            "Emit Error": {
              "Comment": "Emit the failure event",
              "Type": "Task",
              "Parameters": {
                "EventName": "QueryFailed",
                "EventData.$": "$",
                "EmitterId": "StepFunctions",
                "JobId.$": "$.State.JobId"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Query Failed"
            },
            "Query Failed": {
              "Comment": "The query was unsuccessful",
              "Type": "Fail"
            },
            "Submit Query Results": {
              "Comment": "Obtain the query results from S3 and send them to Fargate",
              "Type": "Task",
              "Resource": "${SubmitQueryResults.Arn}",
              "ResultPath": null,
              "Next": "Query Succeeded",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "MaxAttempts": 0
              }],
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Query Succeeded": {
              "Comment": "The query was successful",
              "Type": "Pass",
              "Parameters": {
                "JobId.$": "$.JobId",
                "QueryId.$": "$.QueryId",
                "PartitionKeys.$": "$.PartitionKeys",
                "Statistics.$": "$.Statistics",
                "DataMapperId.$": "$.DataMapperId",
                "Table.$": "$.Table"
              },
              "Next": "Emit Success"
            },
            "Emit Success": {
              "Comment": "Emit the successful query event",
              "Type": "Task",
              "Parameters": {
                "EventName": "QuerySucceeded",
                "EventData.$": "$",
                "EmitterId": "StepFunctions",
                "JobId.$": "$.JobId"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "End": true,
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 3,
                "BackoffRate": 1.5,
                "MaxAttempts": 1
              }]
            }
          }
        }
# Deletion ("Forget") state machine. It scales the ECS deletion service based on
# the Object Deletion Queue size and waits for the queue to drain and the service
# to scale to zero. StateMachine's "Start Fargate Workflow" state starts it with
# states:startExecution.sync, passing DeletionTasksMaxNumber and WaitDuration.
# Flow: Fetch Queue Size -> Adjust Deletion Service Instance Count -> Items in Queue?
#   DesiredCount > 0  -> Wait ($.WaitDuration s) -> Fetch Queue Size Again -> Queue is Empty?
#                          Total > 0  -> Wait
#                          Total == 0 -> Adjust Deletion Service Instance Count
#   DesiredCount == 0 -> Fetch Task Count -> Has Fargate Shutdown?
#                          Total > 0  -> Wait for Fargate Shutdown (10 s) -> Fetch Task Count
#                          Total == 0 -> Done
# A value matching neither "== 0" nor "> 0" (e.g. negative) goes to "Not sure, fail".
  DeleteStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub ${StateMachinePrefix}-DeletionStateMachine
      RoleArn: !GetAtt StatesExecutionRole.Arn
      # CloudFormation !Sub template: ${...} are stack substitutions, while
      # $.path values are Step Functions JSONPath references.
      DefinitionString: !Sub |-
        {
          "StartAt": "Fetch Queue Size",
          "States": {
            "Fetch Queue Size": {
              "Comment": "Checks the number of messages in the Object Deletion Queue",
              "Type": "Task",
              "Resource": "${CheckQueueSize.Arn}",
              "Parameters": {
                "QueueUrl": "${DeleteQueueUrl}"
              },
              "ResultPath": "$.Queue",
              "Next": "Adjust Deletion Service Instance Count",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "MaxAttempts": 1
              }]
            },
            "Adjust Deletion Service Instance Count": {
              "Comment": "Sets the desired instance count based on Object Deletion Queue size",
              "Type": "Task",
              "Resource": "${OrchestrateECSServiceScaling.Arn}",
              "Parameters": {
                "Cluster": "${ECSCluster}",
                "DeleteService": "${DeleteServiceName}",
                "DeletionTasksMaxNumber.$": "$.DeletionTasksMaxNumber",
                "QueueSize.$": "$.Queue.Total"
              },
              "ResultPath": "$.DesiredCount",
              "Next": "Items in Queue?",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "MaxAttempts": 1
              }]
            },
            "Items in Queue?": {
              "Comment": "Checks if any tasks are being created/terminated",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.DesiredCount",
                  "NumericEquals": 0,
                  "Next": "Fetch Task Count"
                },
                {
                  "Variable": "$.DesiredCount",
                  "NumericGreaterThan": 0,
                  "Next": "Wait"
                }
              ],
              "Default": "Not sure, fail"
            },
            "Wait": {
              "Comment": "Waits before checking if the Object Deletion Queue is empty",
              "Type": "Wait",
              "SecondsPath": "$.WaitDuration",
              "Next": "Fetch Queue Size Again"
            },
            "Fetch Queue Size Again": {
              "Comment": "Checks the number of messages in the Object Deletion Queue",
              "Type": "Task",
              "Resource": "${CheckQueueSize.Arn}",
              "Parameters": {
                "QueueUrl": "${DeleteQueueUrl}"
              },
              "ResultPath": "$.Queue",
              "Next": "Queue is Empty?",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "MaxAttempts": 1
              }]
            },
            "Queue is Empty?": {
              "Comment": "Checks if the Total messages are 0",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.Queue.Total",
                  "NumericEquals": 0,
                  "Next": "Adjust Deletion Service Instance Count"
                },
                {
                  "Variable": "$.Queue.Total",
                  "NumericGreaterThan": 0,
                  "Next": "Wait"
                }
              ],
              "Default": "Not sure, fail"
            },
            "Fetch Task Count": {
              "Comment": "Checks the number of tasks for a service",
              "Type": "Task",
              "Resource": "${CheckTaskCount.Arn}",
              "Parameters": {
                "Cluster": "${ECSCluster}",
                "ServiceName": "${DeleteServiceName}"
              },
              "ResultPath": "$.TaskCount",
              "Next": "Has Fargate Shutdown?",
              "Retry": [{
                "ErrorEquals": [ "States.ALL" ],
                "IntervalSeconds": 10,
                "MaxAttempts": 1
              }]
            },
            "Has Fargate Shutdown?": {
              "Comment": "Checks if the remaining tasks is 0",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.TaskCount.Total",
                  "NumericEquals": 0,
                  "Next": "Done"
                },
                {
                  "Variable": "$.TaskCount.Total",
                  "NumericGreaterThan": 0,
                  "Next": "Wait for Fargate Shutdown"
                }
              ],
              "Default": "Not sure, fail"
            },
            "Wait for Fargate Shutdown": {
              "Comment": "Waits before checking if Fargate has reached the desired count",
              "Type": "Wait",
              "Seconds": 10,
              "Next": "Fetch Task Count"
            },
            "Not sure, fail": {
              "Type": "Fail"
            },
            "Done": {
              "Type": "Succeed"
            }
          }
        }
# Main Find and Forget job state machine. The execution name doubles as the job
# id: $$.Execution.Name is sent as JobId with every emitted event.
# Phases: Start Job -> Emit Job Started -> Purge Queues (query + deletion queues)
#   Find:   Start Find Phase -> Generate Queries -> End Query Planning ->
#           Work Queue <-> Wait for Queries (until RunningExecutions.Total == 0) ->
#           End Find Phase
#   Forget: Start Forget Phase -> Start Fargate Workflow (runs DeleteStateMachine
#           via startExecution.sync) -> End Forget Phase
# Caught errors pass through Handle Error / Handle Find Error / Handle Forget
# Error to Emit Error, which ends the execution without a Fail state. Uncaught
# failures (FAILED/ABORTED/TIMED_OUT executions) are reported by EventRule.
  StateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      StateMachineName: !Sub ${StateMachinePrefix}-StateMachine
      RoleArn: !GetAtt StatesExecutionRole.Arn
      # CloudFormation !Sub template: ${...} are stack substitutions, while
      # $.path / $$.path are Step Functions JSONPath references.
      DefinitionString: !Sub |-
        {
          "Comment": "State machine for processing the S3 Find and Forget deletion queue.",
          "StartAt": "Start Job",
          "States": {
            "Start Job": {
              "Type": "Pass",
              "Parameters": {
                "ExecutionId.$": "$$.Execution.Id",
                "ExecutionName.$": "$$.Execution.Name",
                "AthenaConcurrencyLimit.$": "$.AthenaConcurrencyLimit",
                "AthenaQueryMaxRetries.$": "$.AthenaQueryMaxRetries",
                "DeletionTasksMaxNumber.$": "$.DeletionTasksMaxNumber",
                "ForgetQueueWaitSeconds.$": "$.ForgetQueueWaitSeconds",
                "QueryExecutionWaitSeconds.$": "$.QueryExecutionWaitSeconds",
                "QueryQueueWaitSeconds.$": "$.QueryQueueWaitSeconds"
              },
              "Next": "Emit Job Started"
            },
            "Emit Job Started": {
              "Type": "Task",
              "Parameters": {
                "EventName": "JobStarted",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Purge Queues",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Purge Queues": {
              "Comment": "Purges the query and object deletion queues. Purges wait 61 seconds before retrying to avoid exceeding the max purge attempt rate for SQS.",
              "Type": "Parallel",
              "Next": "Start Find Phase",
              "ResultPath": null,
              "Branches": [{
                "StartAt": "Purge Query Queue",
                "States": {
                  "Purge Query Queue": {
                    "Parameters": {
                      "QueueUrl": "${QueryQueue}"
                    },
                    "Comment": "Purge the query queue",
                    "Type": "Task",
                    "Resource": "${PurgeQueue.Arn}",
                    "End": true,
                    "Retry": [{
                      "ErrorEquals": [ "States.ALL" ],
                      "IntervalSeconds": 61,
                      "MaxAttempts": 1
                    }]
                  }
                }
              }, {
                "StartAt": "Purge Deletion Queue",
                "States": {
                  "Purge Deletion Queue": {
                    "Parameters": {
                      "QueueUrl": "${DeleteQueueUrl}"
                    },
                    "Comment": "Purge the deletion queue.",
                    "Type": "Task",
                    "Resource": "${PurgeQueue.Arn}",
                    "End": true,
                    "Retry": [{
                      "ErrorEquals": [ "States.ALL" ],
                      "IntervalSeconds": 61,
                      "MaxAttempts": 1
                    }]
                  }
                }
              }],
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Start Find Phase": {
              "Type": "Task",
              "Parameters": {
                "EventName": "FindPhaseStarted",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "Next": "Generate Queries",
              "ResultPath": null,
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Generate Queries": {
              "Comment": "Process each of the data mappers and populate the query queue",
              "Type": "Task",
              "Resource": "${GenerateQueries.Arn}",
              "ResultPath": "$.QueriesStats",
              "Next": "End Query Planning",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Find Error"
              }]
            },
            "End Query Planning": {
              "Type": "Task",
              "Parameters": {
                "EventName": "QueryPlanningComplete",
                "EventData.$": "$.QueriesStats",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Work Queue",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Find Error"
              }]
            },
            "Work Queue": {
              "Comment": "Works the query queue by starting Athena State Machine executions",
              "Type": "Task",
              "Resource": "${WorkQueryQueue.Arn}",
              "ResultPath": "$.RunningExecutions",
              "Next": "Outstanding Queries?",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Find Error"
              }]
            },
            "Outstanding Queries?": {
              "Comment": "Checks if any queries are yet to be run or are in progress",
              "Type": "Choice",
              "Choices": [
                {
                  "Variable": "$.RunningExecutions.Total",
                  "NumericEquals": 0,
                  "Next": "End Find Phase"
                },
                {
                  "Variable": "$.RunningExecutions.Total",
                  "NumericGreaterThan": 0,
                  "Next": "Wait for Queries"
                }
              ],
              "Default": "Wait for Queries"
            },
            "Wait for Queries": {
              "Comment": "Waits before working the query queue again",
              "Type": "Wait",
              "SecondsPath": "$.QueryQueueWaitSeconds",
              "Next": "Work Queue"
            },
            "End Find Phase": {
              "Type": "Task",
              "Parameters": {
                "EventName": "FindPhaseEnded",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Start Forget Phase",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Start Forget Phase": {
              "Type": "Task",
              "Parameters": {
                "EventName": "ForgetPhaseStarted",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "Next": "Start Fargate Workflow",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Error"
              }]
            },
            "Start Fargate Workflow": {
              "Type":"Task",
              "Resource":"arn:${AWS::Partition}:states:::states:startExecution.sync",
              "Parameters":{
                "Input":{
                  "AWS_STEP_FUNCTIONS_STARTED_BY_EXECUTION_ID.$": "$$.Execution.Id",
                  "DeletionTasksMaxNumber.$": "$.DeletionTasksMaxNumber",
                  "WaitDuration.$": "$.ForgetQueueWaitSeconds"
                },
                "StateMachineArn":"${DeleteStateMachine}",
                "Name.$": "$$.Execution.Name"
              },
              "ResultPath": null,
              "Next": "End Forget Phase",
              "Catch": [{
                "ErrorEquals": ["States.ALL"],
                "ResultPath": "$.ErrorDetails",
                "Next": "Handle Forget Error"
              }]
            },
            "End Forget Phase": {
              "Type": "Task",
              "Parameters": {
                "EventName": "ForgetPhaseEnded",
                "EventData.$": "$$.State.EnteredTime",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "End": true
            },
            "Handle Error": {
              "Type": "Pass",
              "Parameters": {
                "EventName": "Exception",
                "Error.$": "$.ErrorDetails.Error",
                "Cause.$": "$.ErrorDetails.Cause",
                "State.$": "$"
              },
              "Next": "Emit Error"
            },
            "Handle Find Error": {
              "Type": "Pass",
              "Parameters": {
                "EventName": "FindPhaseFailed",
                "Error.$": "$.ErrorDetails.Error",
                "Cause.$": "$.ErrorDetails.Cause",
                "State.$": "$"
              },
              "Next": "Emit Error"
            },
            "Handle Forget Error": {
              "Type": "Pass",
              "Parameters": {
                "EventName": "ForgetPhaseFailed",
                "Error.$": "$.ErrorDetails.Error",
                "Cause.$": "$.ErrorDetails.Cause",
                "State.$": "$"
              },
              "Next": "Emit Error"
            },
            "Emit Error": {
              "Comment": "Emit the generic failure event",
              "Type": "Task",
              "Parameters": {
                "EventName.$": "$.EventName",
                "EventData.$": "$",
                "EmitterId": "StepFunctions",
                "JobId.$": "$$.Execution.Name"
              },
              "Resource": "${EmitEvent.Arn}",
              "ResultPath": null,
              "End": true
            }
          }
        }
# Supporting Resources
  # SQS queue of pending Athena queries. GenerateQueries sends to it and
  # WorkQueryQueue receives from it.
  QueryQueue:
    Type: "AWS::SQS::Queue"
    Properties:
      VisibilityTimeout: 43200  # seconds (12 hours)
      KmsMasterKeyId: "alias/aws/sqs"
# Tasks
  # Reads SQS queue attributes. It is used by the deletion state machine's
  # "Fetch Queue Size" states.
  CheckQueueSize:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: check_queue_size.handler
      Policies:
        - Statement:
            - Effect: "Allow"
              Action:
                - "sqs:GetQueueAttributes"
              Resource:
                - !GetAtt QueryQueue.Arn
                # Deletion queue ARN rebuilt from its URL: element 4 of the
                # "/"-split URL is the queue name.
                - !Sub
                  - "arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${QueueName}"
                  - QueueName: !Select [4, !Split ["/", !Ref DeleteQueueUrl]]
# Describes the deletion ECS service so that "Fetch Task Count" can check how
# many tasks remain.
  CheckTaskCount:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: check_task_count.handler
      Policies:
        - Statement:
            - Effect: "Allow"
              Action:
                - "ecs:DescribeServices"
              Resource:
                - !Sub arn:${AWS::Partition}:ecs:${AWS::Region}:${AWS::AccountId}:service/${ECSCluster}/${DeleteServiceName}
# Starts Athena queries for the AthenaStateMachine "Execute Query" state in the
# configured work group.
  ExecuteQuery:
    Type: AWS::Serverless::Function
    Properties:
      Handler: execute_query.handler
      CodeUri: ../backend/lambdas/tasks/
      Environment:
        Variables:
          WorkGroup: !Ref AthenaWorkGroup
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref ManifestsBucket
        - S3CrudPolicy:
            BucketName: !Ref ResultBucket
        - Statement:
            - Action:
                - "glue:BatchGetPartition"
                - "glue:GetDatabase*"
                - "glue:GetPartition*"
                - "glue:GetTable*"
              Effect: "Allow"
              Resource:
                - !Sub "arn:${AWS::Partition}:glue:*:*:catalog*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:database*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:table*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:partition*"
            - Action:
                - "athena:StartQueryExecution"
              Effect: "Allow"
              Resource: !Sub "arn:${AWS::Partition}:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}"
            # Data access Athena performs on this function's behalf; it is only
            # usable through Athena because of the aws:CalledVia condition.
            - Action:
                - "s3:GetObject*"
                - "s3:ListBucket*"
                - "lakeformation:GetDataAccess"
              Effect: "Allow"
              Resource: "*"
              Condition:
                ForAnyValue:StringEquals:
                  aws:CalledVia:
                    - !Sub athena.${AWS::URLSuffix}
            # KMS access for Athena (e.g. SSE-KMS encrypted data), gated the same
            # way as the statement above. It must not be expressed as
            # "NotResource: ...:key/*": Allow + NotResource grants everything
            # EXCEPT this account's keys, i.e. it excludes the keys that normally
            # encrypt the data. "*" is a strict superset of that former grant.
            - Action:
                - "kms:Encrypt"
                - "kms:Decrypt"
                - "kms:ReEncrypt*"
                - "kms:GenerateDataKey*"
                - "kms:DescribeKey"
              Effect: "Allow"
              Resource: "*"
              Condition:
                ForAnyValue:StringEquals:
                  aws:CalledVia:
                    - !Sub athena.${AWS::URLSuffix}
# Looks up an Athena query's execution state for the "Get Query Status" Task.
  CheckQueryStatus:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: check_query_status.handler
      Policies:
        - Statement:
            - Effect: "Allow"
              Action:
                - "athena:GetQueryExecution"
              Resource: !Sub "arn:${AWS::Partition}:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}"
# Used by "Submit Query Results": reads Athena query results and sends messages
# to the queue named by QueueUrl (the Object Deletion Queue).
  SubmitQueryResults:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: submit_query_results.handler
      MemorySize: 512
      Environment:
        Variables:
          QueueUrl: !Ref DeleteQueueUrl
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref ResultBucket
        - Statement:
            - Effect: "Allow"
              Action:
                - "athena:GetQueryResults"
              Resource: !Sub "arn:${AWS::Partition}:athena:${AWS::Region}:${AWS::AccountId}:workgroup/${AthenaWorkGroup}"
            - Effect: "Allow"
              Action:
                - "sqs:SendMessage"
                - "sqs:GetQueueAttributes"
              Resource:
                - !GetAtt QueryQueue.Arn
                # Deletion queue ARN rebuilt from its URL (element 4 = queue name).
                - !Sub
                  - "arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${QueueName}"
                  - QueueName: !Select [4, !Split ["/", !Ref DeleteQueueUrl]]
# Used by "Generate Queries": reads the job, data mapper and deletion queue
# tables and populates QueryQueue. Per its policy, it may also write to
# ManifestsBucket and create partitions in the job manifests Glue table.
  GenerateQueries:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: generate_queries.handler
      MemorySize: 512
      Environment:
        Variables:
          QueryQueue: !Ref QueryQueue
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref ManifestsBucket
        - DynamoDBReadPolicy:
            TableName: !Ref JobTableName
        - DynamoDBReadPolicy:
            TableName: !Ref DataMapperTableName
        - DynamoDBReadPolicy:
            TableName: !Ref DeletionQueueTableName
        - Statement:
            # Read-only Glue catalog access
            - Effect: "Allow"
              Action:
                - "glue:BatchGetPartition"
                - "glue:GetDatabase*"
                - "glue:GetPartition*"
                - "glue:GetTable*"
              Resource:
                - !Sub "arn:${AWS::Partition}:glue:*:*:catalog*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:database*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:table*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:partition*"
            # Partition creation, limited to the job manifests table
            - Effect: "Allow"
              Action:
                - "glue:BatchCreatePartition"
              Resource:
                - !Sub "arn:${AWS::Partition}:glue:*:*:catalog*"
                - !Sub "arn:${AWS::Partition}:glue:*:*:database/${GlueDatabase}"
                - !Sub "arn:${AWS::Partition}:glue:*:*:table/${GlueDatabase}/${JobManifestsGlueTable}"
            - Effect: Allow
              Action:
                - "sqs:SendMessage*"
                - "sqs:GetQueueAttributes"
              Resource:
                - !GetAtt QueryQueue.Arn
# Used by "Adjust Deletion Service Instance Count": updates the deletion ECS
# service.
  OrchestrateECSServiceScaling:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: orchestrate_ecs_service_scaling.handler
      Policies:
        - Statement:
            - Effect: "Allow"
              Action:
                - "ecs:UpdateService"
              Resource: !Sub "arn:${AWS::Partition}:ecs:${AWS::Region}:${AWS::AccountId}:service/${ECSCluster}/${DeleteServiceName}"
# Used by "Work Queue": receives messages from QueryQueue and starts
# AthenaStateMachine executions.
  WorkQueryQueue:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: work_query_queue.handler
      Environment:
        Variables:
          QueueUrl: !Ref QueryQueue
          # Built from the name instead of !Ref AthenaStateMachine: that machine
          # uses StatesExecutionRole, whose policy references this function's
          # ARN, so a !Ref here would create a dependency cycle.
          StateMachineArn: !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachinePrefix}-AthenaStateMachine"
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref ResultBucket
        - Statement:
            - Effect: Allow
              Action:
                - "states:StartExecution"
                - "states:DescribeExecution"
              Resource:
                - !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachinePrefix}-AthenaStateMachine"
                - !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:execution:${StateMachinePrefix}-AthenaStateMachine:*"
            - Effect: "Allow"
              Action:
                - "sqs:ReceiveMessage*"
                - "sqs:ChangeMessageVisibility"
                - "sqs:PurgeQueue"
                - "sqs:DeleteMessage"
                - "sqs:GetQueueAttributes"
              Resource:
                - !GetAtt QueryQueue.Arn
# Deletes messages from QueryQueue.
# NOTE(review): no state in this template's definitions invokes it, although
# StatesExecutionRole may invoke it; confirm whether it is still needed.
  DeleteQueueMessage:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: delete_message.handler
      Environment:
        Variables:
          QueueUrl: !Ref QueryQueue
      Policies:
        - Statement:
            - Effect: "Allow"
              Action:
                - "sqs:DeleteMessage"
                - "sqs:GetQueueAttributes"
              Resource: !GetAtt QueryQueue.Arn
# Used by both branches of "Purge Queues" to purge the query and deletion queues.
  PurgeQueue:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: purge_queue.handler
      Policies:
        - Statement:
            - Effect: "Allow"
              Action:
                - "sqs:PurgeQueue"
                - "sqs:GetQueueAttributes"
              Resource:
                - !GetAtt QueryQueue.Arn
                # Deletion queue ARN rebuilt from its URL (element 4 = queue name).
                - !Sub
                  - "arn:${AWS::Partition}:sqs:${AWS::Region}:${AWS::AccountId}:${QueueName}"
                  - QueueName: !Select [4, !Split ["/", !Ref DeleteQueueUrl]]
# Records job events. It is invoked by the state machines' Emit* states and by
# EventRule, and has read/write access to the job table.
  EmitEvent:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: ../backend/lambdas/tasks/
      Handler: emit_event.handler
      Policies:
        - DynamoDBCrudPolicy:
            TableName: !Ref JobTableName
# Reports StateMachine executions that end ABORTED, TIMED_OUT or FAILED by
# invoking EmitEvent with an "Exception" event whose JobId is the execution name.
  EventRule:
    Type: AWS::Events::Rule
    Properties:
      Description: "Event rule for emitting unknown Step Functions failures"
      State: "ENABLED"
      EventPattern:
        source:
          - "aws.states"
        detail:
          stateMachineArn:
            - !Sub arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:stateMachine:${StateMachinePrefix}-StateMachine
          status:
            - "ABORTED"
            - "TIMED_OUT"
            - "FAILED"
      Targets:
        - Id: "TargetEmitEvent"
          Arn: !GetAtt EmitEvent.Arn
          InputTransformer:
            InputPathsMap:
              job_id: "$.detail.name"
              status: "$.detail.status"
            InputTemplate: '{"JobId": <job_id>, "EventName": "Exception", "EventData": {"Error": <status>, "Cause": "State Machine execution exited unexpectedly"}, "EmitterId": "CloudWatchEvents"}'
# Allows EventRule (and only that rule) to invoke EmitEvent.
  PermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      Action: "lambda:InvokeFunction"
      FunctionName: !Ref EmitEvent
      Principal: !Sub "events.${AWS::URLSuffix}"
      SourceArn: !GetAtt EventRule.Arn
Outputs:
  # Roles SAM generates for the ExecuteQuery / GenerateQueries functions
  # (logical id = function logical id + "Role").
  AthenaExecutionRole:
    Value: !Ref ExecuteQueryRole
  AthenaExecutionRoleArn:
    Value: !GetAtt ExecuteQueryRole.Arn
  GenerateQueriesRole:
    Value: !Ref GenerateQueriesRole
  # State machines and the role they run as
  AthenaStateMachineArn:
    Value: !Ref AthenaStateMachine
  StateMachineArn:
    Value: !Ref StateMachine
  StateMachineRoleArn:
    Value: !GetAtt StatesExecutionRole.Arn
  # Query queue URL
  QueryQueueUrl:
    Value: !Ref QueryQueue
|