#!/usr/bin/env python3
from aws_cdk import core
import aws_cdk.aws_s3 as s3
import aws_cdk.aws_s3_notifications as s3n
import aws_cdk.aws_dynamodb as ddb
import aws_cdk.aws_sqs as sqs
import aws_cdk.aws_lambda as lam
from aws_cdk.aws_lambda_event_sources import SqsEventSource
import aws_cdk.aws_ssm as ssm
import aws_cdk.aws_cloudwatch as cw
import aws_cdk.aws_cloudwatch_actions as action
import aws_cdk.aws_events as event
import aws_cdk.aws_events_targets as target
import aws_cdk.aws_sns as sns
import aws_cdk.aws_sns_subscriptions as sub
import aws_cdk.aws_logs as logs
import aws_cdk.aws_apigateway as api
from aws_cdk.core import CustomResource
import aws_cdk.custom_resources as cr
import json

# Define bucket and prefix for Jobsender to compare
bucket_para = [{
    "src_bucket": "huangzb-tokyo-video",
    "src_prefix": "",
    "des_bucket": "s3-migration-test-nx",
    "des_prefix": ""
}]
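# Note: you can list multiple src_bucket/src_prefix pairs here; the permission-granting
# loop in the stack below skips repeated bucket names so each bucket is only imported once.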

StorageClass = 'STANDARD'
# Des_bucket_default is only used in the non-jobsender case, where S3 triggers SQS and then Lambda.
# The SQS message carries no destination bucket information in that case, so you need to set Des_bucket_default.
Des_bucket_default = 's3-migration-test-nx'
Des_prefix_default = 'from-jp'
JobType = 'PUT'
# 'PUT': Destination bucket is not in the same account as Lambda.
# 'GET': Source bucket is not in the same account as Lambda.
MaxRetry = '20'  # Max retries per request
MaxThread = '50'  # Max threads per file
MaxParallelFile = '1'  # Recommended to be 1 in AWS Lambda
JobTimeout = '870'  # Timeout in seconds for each job; should be less than the AWS Lambda timeout
JobsenderCompareVersionId = 'False'  # Jobsender compares the versionId in the source S3 bucket with the versionId in DDB
UpdateVersionId = 'False'  # Get the latest version id from S3 before getting the object
GetObjectWithVersionId = 'False'  # Get the object with the specified version id
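# Note: the numeric/boolean settings above are strings because they are passed to the
# Lambda functions as environment variables, which are always strings; the worker code
# is expected to parse them.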

# Set up your alarm email
alarm_email = "alarm_your_email@email.com"
# Name of the SSM Parameter Store entry holding credentials for the other account (not the same account as Lambda)
ssm_parameter_credentials = 's3_migration_credentials'
'''
BEFORE DEPLOYING THE CDK, please manually create a SecureString parameter named
"s3_migration_credentials" in SSM Parameter Store!
This is the access key of the account that is NOT the one Lambda runs in.
For example, if Lambda runs in a Global region, this is the China account access key,
and vice versa. Example value:
{
    "aws_access_key_id": "your_aws_access_key_id",
    "aws_secret_access_key": "your_aws_secret_access_key",
    "region": "cn-northwest-1"
}
CDK does not allow deploying a SecureString parameter, so you have to create it manually.
This template then grants the Lambda role permission to read it.
'''
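# For reference, a minimal AWS CLI sketch for creating that parameter (placeholder values):
#   aws ssm put-parameter --name s3_migration_credentials --type SecureString \
#       --value '{"aws_access_key_id": "xxx", "aws_secret_access_key": "xxx", "region": "cn-northwest-1"}'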

# Set up the ignore-files list; you can put file names/wildcards to be ignored in this txt file:
try:
    with open("s3_migration_ignore_list.txt", "r") as f:
        ignore_list = f.read()
except Exception:
    ignore_list = ""
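# Example s3_migration_ignore_list.txt content (assumed format: one name or wildcard per line):
#   *.tmp
#   test/*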


class CdkResourceStack(core.Stack):

    def __init__(self, scope: core.Construct, _id: str, **kwargs) -> None:
        super().__init__(scope, _id, **kwargs)

        # Setup SSM parameters for credentials, bucket_para, ignore_list
        ssm_credential_para = ssm.StringParameter.from_secure_string_parameter_attributes(
            self, "ssm_parameter_credentials",
            parameter_name=ssm_parameter_credentials,
            version=1
        )
        ssm_bucket_para = ssm.StringParameter(self, "s3bucket_serverless",
                                              string_value=json.dumps(bucket_para, indent=4))
        ssm_parameter_ignore_list = ssm.StringParameter(self, "s3_migrate_ignore_list",
                                                        string_value=ignore_list)

        # Setup DynamoDB
        ddb_file_list = ddb.Table(self, "s3migrate_serverless",
                                  partition_key=ddb.Attribute(name="Key", type=ddb.AttributeType.STRING),
                                  billing_mode=ddb.BillingMode.PAY_PER_REQUEST)
        ddb_file_list.add_global_secondary_index(
            partition_key=ddb.Attribute(name="desBucket", type=ddb.AttributeType.STRING),
            index_name="desBucket-index",
            projection_type=ddb.ProjectionType.INCLUDE,
            non_key_attributes=["desKey", "versionId"]
        )
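        # Assumption: the desBucket-index GSI is what lets the jobsender query objects by
        # destination bucket (with desKey and versionId projected) instead of scanning the table.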

        # Setup SQS
        sqs_queue_DLQ = sqs.Queue(self, "s3migrate_serverless_Q_DLQ",
                                  visibility_timeout=core.Duration.minutes(15),
                                  retention_period=core.Duration.days(14))
        sqs_queue = sqs.Queue(self, "s3migrate_serverless_Q",
                              visibility_timeout=core.Duration.minutes(15),
                              retention_period=core.Duration.days(14),
                              dead_letter_queue=sqs.DeadLetterQueue(
                                  max_receive_count=60,
                                  queue=sqs_queue_DLQ
                              ))
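        # The 15-minute visibility timeout matches the worker Lambda's maximum timeout, so a
        # job being processed is not re-delivered while still running; after 60 failed receives
        # a message is moved to the DLQ, which is alarmed on below.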

        # Setup an API for Lambda to get its IP address (for debugging network routing)
        checkip = api.RestApi(self, "lambda-checkip-api",
                              cloud_watch_role=True,
                              deploy=True,
                              description="For Lambda to get IP address",
                              default_integration=api.MockIntegration(
                                  integration_responses=[api.IntegrationResponse(
                                      status_code="200",
                                      response_templates={"application/json": "$context.identity.sourceIp"})],
                                  request_templates={"application/json": '{"statusCode": 200}'}
                              ),
                              endpoint_types=[api.EndpointType.REGIONAL])
        checkip.root.add_method("GET", method_responses=[api.MethodResponse(
            status_code="200",
            response_models={"application/json": api.Model.EMPTY_MODEL}
        )])
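        # The mock integration echoes the caller's source IP ($context.identity.sourceIp), so a
        # Lambda worker can simply GET this URL to see which public IP its traffic egresses from.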

        # Setup Lambda functions
        handler = lam.Function(self, "s3-migrate-worker",
                               code=lam.Code.asset("./lambda"),
                               handler="lambda_function_worker.lambda_handler",
                               runtime=lam.Runtime.PYTHON_3_8,
                               memory_size=1024,
                               timeout=core.Duration.minutes(15),
                               tracing=lam.Tracing.ACTIVE,
                               environment={
                                   'table_queue_name': ddb_file_list.table_name,
                                   'Des_bucket_default': Des_bucket_default,
                                   'Des_prefix_default': Des_prefix_default,
                                   'StorageClass': StorageClass,
                                   'checkip_url': checkip.url,
                                   'ssm_parameter_credentials': ssm_parameter_credentials,
                                   'JobType': JobType,
                                   'MaxRetry': MaxRetry,
                                   'MaxThread': MaxThread,
                                   'MaxParallelFile': MaxParallelFile,
                                   'JobTimeout': JobTimeout,
                                   'UpdateVersionId': UpdateVersionId,
                                   'GetObjectWithVersionId': GetObjectWithVersionId
                               })
        handler_jobsender = lam.Function(self, "s3-migrate-jobsender",
                                         code=lam.Code.asset("./lambda"),
                                         handler="lambda_function_jobsender.lambda_handler",
                                         runtime=lam.Runtime.PYTHON_3_8,
                                         memory_size=1024,
                                         timeout=core.Duration.minutes(15),
                                         tracing=lam.Tracing.ACTIVE,
                                         environment={
                                             'table_queue_name': ddb_file_list.table_name,
                                             'StorageClass': StorageClass,
                                             'checkip_url': checkip.url,
                                             'sqs_queue': sqs_queue.queue_name,
                                             'ssm_parameter_credentials': ssm_parameter_credentials,
                                             'ssm_parameter_ignore_list': ssm_parameter_ignore_list.parameter_name,
                                             'ssm_parameter_bucket': ssm_bucket_para.parameter_name,
                                             'JobType': JobType,
                                             'MaxRetry': MaxRetry,
                                             'JobsenderCompareVersionId': JobsenderCompareVersionId
                                         })

        # Allow Lambda to read/write DDB and SQS
        ddb_file_list.grant_read_write_data(handler)
        ddb_file_list.grant_read_write_data(handler_jobsender)
        sqs_queue.grant_send_messages(handler_jobsender)

        # SQS triggers the Lambda worker
        handler.add_event_source(SqsEventSource(sqs_queue, batch_size=1))
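        # batch_size=1 means each invocation handles exactly one transfer job, so a single large
        # or failing object cannot block or poison other messages in the same batch.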

        # Option 1: Create a new S3 bucket; all new objects in this bucket will be transmitted by the Lambda worker
        s3bucket = s3.Bucket(self, "s3_new_migrate")
        s3bucket.grant_read(handler)
        s3bucket.add_event_notification(s3.EventType.OBJECT_CREATED,
                                        s3n.SqsDestination(sqs_queue))
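        # S3 publishes an ObjectCreated event notification to the queue for each new object,
        # and the worker consumes each notification as an individual copy job.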

        # Option 2: Allow existing S3 buckets to be read by the Lambda functions.
        # The Lambda jobsender scans and compares these buckets and triggers Lambda workers to transmit
        granted_buckets = set()
        for b in bucket_para:
            bucket_name = b['src_bucket']
            if bucket_name not in granted_buckets:  # Skip buckets that are listed more than once
                granted_buckets.add(bucket_name)
                s3exist_bucket = s3.Bucket.from_bucket_name(self,
                                                            bucket_name,  # Use the bucket name as the construct id
                                                            bucket_name=bucket_name)
                if JobType == 'PUT':
                    s3exist_bucket.grant_read(handler_jobsender)
                    s3exist_bucket.grant_read(handler)
                else:  # 'GET' mode
                    s3exist_bucket.grant_read_write(handler_jobsender)
                    s3exist_bucket.grant_read_write(handler)

        # Allow Lambda to read the SSM parameters
        ssm_bucket_para.grant_read(handler_jobsender)
        ssm_credential_para.grant_read(handler)
        ssm_credential_para.grant_read(handler_jobsender)
        ssm_parameter_ignore_list.grant_read(handler_jobsender)

        # Schedule a cron event to trigger the Lambda jobsender every hour:
        event.Rule(self, 'cron_trigger_jobsender',
                   schedule=event.Schedule.rate(core.Duration.hours(1)),
                   targets=[target.LambdaFunction(handler_jobsender)])

        # TODO: Trigger the event immediately; add a custom resource Lambda to invoke handler_jobsender
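        # One possible sketch for the TODO above (untested assumption), using an
        # AwsCustomResource that invokes the jobsender once at deployment time:
        # cr.AwsCustomResource(self, "trigger_jobsender_once",
        #                      on_create=cr.AwsSdkCall(
        #                          service="Lambda",
        #                          action="invoke",
        #                          parameters={"FunctionName": handler_jobsender.function_name,
        #                                      "InvocationType": "Event"},
        #                          physical_resource_id=cr.PhysicalResourceId.of("trigger_jobsender_once")),
        #                      policy=cr.AwsCustomResourcePolicy.from_sdk_calls(
        #                          resources=[handler_jobsender.function_arn]))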

        # Create Lambda log metric filters to derive network traffic metrics
        handler.log_group.add_metric_filter("Completed-bytes",
                                            metric_name="Completed-bytes",
                                            metric_namespace="s3_migrate",
                                            metric_value="$bytes",
                                            filter_pattern=logs.FilterPattern.literal(
                                                '[info, date, sn, p="--->Complete", bytes, key]'))
        handler.log_group.add_metric_filter("Uploading-bytes",
                                            metric_name="Uploading-bytes",
                                            metric_namespace="s3_migrate",
                                            metric_value="$bytes",
                                            filter_pattern=logs.FilterPattern.literal(
                                                '[info, date, sn, p="--->Uploading", bytes, key]'))
        handler.log_group.add_metric_filter("Downloading-bytes",
                                            metric_name="Downloading-bytes",
                                            metric_namespace="s3_migrate",
                                            metric_value="$bytes",
                                            filter_pattern=logs.FilterPattern.literal(
                                                '[info, date, sn, p="--->Downloading", bytes, key]'))
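        # These space-delimited patterns assume the worker logs lines of the form
        # "<level> <date> <sn> --->Uploading <bytes> <key>" (and likewise for Downloading and
        # Complete), so $bytes captures the transferred byte count.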
        handler.log_group.add_metric_filter("MaxMemoryUsed",
                                            metric_name="MaxMemoryUsed",
                                            metric_namespace="s3_migrate",
                                            metric_value="$memory",
                                            filter_pattern=logs.FilterPattern.literal(
                                                '[head="REPORT", a1, a2, a3, a4, a5, a6, a7, a8, a9, a10, a11, a12, '
                                                'a13, a14, a15, a16, memory, MB="MB", rest]'))
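        # This matches the Lambda runtime's end-of-invocation line, e.g. "REPORT RequestId: ...
        # Duration: ... ms Billed Duration: ... ms Memory Size: 1024 MB Max Memory Used: 345 MB";
        # the sixteen placeholder fields skip ahead so $memory captures the number before the
        # "MB" of "Max Memory Used".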

        lambda_metric_Complete = cw.Metric(namespace="s3_migrate",
                                           metric_name="Completed-bytes",
                                           statistic="Sum",
                                           period=core.Duration.minutes(1))
        lambda_metric_Upload = cw.Metric(namespace="s3_migrate",
                                         metric_name="Uploading-bytes",
                                         statistic="Sum",
                                         period=core.Duration.minutes(1))
        lambda_metric_Download = cw.Metric(namespace="s3_migrate",
                                           metric_name="Downloading-bytes",
                                           statistic="Sum",
                                           period=core.Duration.minutes(1))
        lambda_metric_MaxMemoryUsed = cw.Metric(namespace="s3_migrate",
                                                metric_name="MaxMemoryUsed",
                                                statistic="Maximum",
                                                period=core.Duration.minutes(1))

        handler.log_group.add_metric_filter("ERROR",
                                            metric_name="ERROR-Logs",
                                            metric_namespace="s3_migrate",
                                            metric_value="1",
                                            filter_pattern=logs.FilterPattern.literal('"ERROR"'))
        handler.log_group.add_metric_filter("WARNING",
                                            metric_name="WARNING-Logs",
                                            metric_namespace="s3_migrate",
                                            metric_value="1",
                                            filter_pattern=logs.FilterPattern.literal('"WARNING"'))
        # Task timed out
        handler.log_group.add_metric_filter("TIMEOUT",
                                            metric_name="TIMEOUT-Logs",
                                            metric_namespace="s3_migrate",
                                            metric_value="1",
                                            filter_pattern=logs.FilterPattern.literal('"Task timed out"'))

        log_metric_ERROR = cw.Metric(namespace="s3_migrate",
                                     metric_name="ERROR-Logs",
                                     statistic="Sum",
                                     period=core.Duration.minutes(1))
        log_metric_WARNING = cw.Metric(namespace="s3_migrate",
                                       metric_name="WARNING-Logs",
                                       statistic="Sum",
                                       period=core.Duration.minutes(1))
        log_metric_TIMEOUT = cw.Metric(namespace="s3_migrate",
                                       metric_name="TIMEOUT-Logs",
                                       statistic="Sum",
                                       period=core.Duration.minutes(1))

        # Dashboard to monitor SQS and Lambda
        board = cw.Dashboard(self, "s3_migrate_serverless")
        board.add_widgets(cw.GraphWidget(title="Lambda-NETWORK",
                                         left=[lambda_metric_Download, lambda_metric_Upload, lambda_metric_Complete]),
                          cw.GraphWidget(title="Lambda-concurrent",
                                         left=[handler.metric(metric_name="ConcurrentExecutions",
                                                              period=core.Duration.minutes(1))]),
                          cw.GraphWidget(title="Lambda-invocations/errors/throttles",
                                         left=[handler.metric_invocations(period=core.Duration.minutes(1)),
                                               handler.metric_errors(period=core.Duration.minutes(1)),
                                               handler.metric_throttles(period=core.Duration.minutes(1))]),
                          cw.GraphWidget(title="Lambda-duration",
                                         left=[handler.metric_duration(period=core.Duration.minutes(1))]))
        board.add_widgets(cw.GraphWidget(title="Lambda_MaxMemoryUsed(MB)",
                                         left=[lambda_metric_MaxMemoryUsed]),
                          cw.GraphWidget(title="ERROR/WARNING Logs",
                                         left=[log_metric_ERROR],
                                         right=[log_metric_WARNING, log_metric_TIMEOUT]),
                          cw.GraphWidget(title="SQS-Jobs",
                                         left=[sqs_queue.metric_approximate_number_of_messages_visible(
                                                   period=core.Duration.minutes(1)),
                                               sqs_queue.metric_approximate_number_of_messages_not_visible(
                                                   period=core.Duration.minutes(1))]),
                          cw.SingleValueWidget(title="Running/Waiting and Dead Jobs",
                                               metrics=[sqs_queue.metric_approximate_number_of_messages_not_visible(
                                                            period=core.Duration.minutes(1)),
                                                        sqs_queue.metric_approximate_number_of_messages_visible(
                                                            period=core.Duration.minutes(1)),
                                                        sqs_queue_DLQ.metric_approximate_number_of_messages_not_visible(
                                                            period=core.Duration.minutes(1)),
                                                        sqs_queue_DLQ.metric_approximate_number_of_messages_visible(
                                                            period=core.Duration.minutes(1))],
                                               height=6))

        # Alarm for the queue's DLQ
        alarm_DLQ = cw.Alarm(self, "SQS_DLQ",
                             metric=sqs_queue_DLQ.metric_approximate_number_of_messages_visible(),
                             threshold=0,
                             comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
                             evaluation_periods=1,
                             datapoints_to_alarm=1)
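        # threshold=0 with GREATER_THAN_THRESHOLD means the alarm fires as soon as at least one
        # message becomes visible in the DLQ.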
        alarm_topic = sns.Topic(self, "SQS queue-DLQ has dead letter")
        alarm_topic.add_subscription(subscription=sub.EmailSubscription(alarm_email))
        alarm_DLQ.add_alarm_action(action.SnsAction(alarm_topic))

        core.CfnOutput(self, "Dashboard", value="CloudWatch Dashboard name s3_migrate_serverless")

        # Alarm for empty queue, i.e. no visible and no invisible (in-flight) messages
        # metric_all_message = cw.MathExpression(
        #     expression="a + b",
        #     label="empty_queue_expression",
        #     using_metrics={
        #         "a": sqs_queue.metric_approximate_number_of_messages_visible(),
        #         "b": sqs_queue.metric_approximate_number_of_messages_not_visible()
        #     }
        # )
        # alarm_0 = cw.Alarm(self, "SQSempty",
        #                    alarm_name="SQS queue empty-Serverless",
        #                    metric=metric_all_message,
        #                    threshold=0,
        #                    comparison_operator=cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
        #                    evaluation_periods=3,
        #                    datapoints_to_alarm=3,
        #                    treat_missing_data=cw.TreatMissingData.IGNORE)
        # alarm_topic = sns.Topic(self, "SQS queue empty-Serverless")
        # alarm_topic.add_subscription(subscription=sub.EmailSubscription(alarm_email))
        # alarm_0.add_alarm_action(action.SnsAction(alarm_topic))


###############
app = core.App()
CdkResourceStack(app, "s3-migrate-serverless")
app.synth()