app.py

#!/usr/bin/env python3
from aws_cdk import core
import aws_cdk.aws_s3 as s3
import aws_cdk.aws_s3_notifications as s3n
import aws_cdk.aws_dynamodb as ddb
import aws_cdk.aws_sqs as sqs
import aws_cdk.aws_lambda as lam
from aws_cdk.aws_lambda_event_sources import SqsEventSource
import aws_cdk.aws_ssm as ssm
import aws_cdk.aws_cloudwatch as cw
import aws_cdk.aws_cloudwatch_actions as action
import aws_cdk.aws_events as event
import aws_cdk.aws_events_targets as target
import aws_cdk.aws_sns as sns
import aws_cdk.aws_sns_subscriptions as sub
import aws_cdk.aws_logs as logs
import aws_cdk.aws_apigateway as api
import json
# Define the source/destination bucket parameters before deploying the CDK stack
bucket_para = [{
    "src_bucket": "broad-references",
    "src_prefix": "",
    "des_bucket": "s3-migration-test-nx",
    "des_prefix": "broad-references-20200405b"
}, {
    "src_bucket": "covid19-lake",
    "src_prefix": "",
    "des_bucket": "covid19-lake",
    "des_prefix": ""
}]
StorageClass = 'STANDARD'

# Des_bucket_default is only used in the non-jobsender scenario: S3 triggers SQS, which triggers Lambda.
# In that case the SQS message carries no destination bucket information, so you need to set Des_bucket_default.
Des_bucket_default = 's3-migration-test-nx'
Des_prefix_default = 's3-migration-cdk-from-us'

# The credentials for the other region (not the same account as Lambda), stored in SSM Parameter Store
ssm_parameter_credentials = 's3_migration_credentials'
'''
BEFORE DEPLOYING THE CDK STACK, manually create a SecureString parameter named
"s3_migration_credentials" in SSM Parameter Store.
It holds the access key of the account that is NOT the account this stack runs in.
For example, if the stack is deployed in the Global partition, this is the China
account access key, and vice versa. Example value:
{
    "aws_access_key_id": "your_aws_access_key_id",
    "aws_secret_access_key": "your_aws_secret_access_key",
    "region": "cn-northwest-1"
}
CDK does not allow deploying SecureString parameters, so you have to create it
manually; this template then grants the Lambda functions permission to read it.
'''
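# One way to create that SecureString parameter manually is the standard AWS CLI
# `aws ssm put-parameter` call; a sketch (the credential values are placeholders):
#
#   aws ssm put-parameter \
#       --name s3_migration_credentials \
#       --type SecureString \
#       --value '{"aws_access_key_id": "xxx", "aws_secret_access_key": "xxx", "region": "cn-northwest-1"}'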
# Set up your alarm email
alarm_email = "alarm_your_email@email.com"

# Set up the ignore list: put the file names/wildcards to ignore in this txt file:
try:
    with open("s3_migration_ignore_list.txt", "r") as f:
        ignore_list = f.read()
except Exception:
    ignore_list = ""
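# A hypothetical s3_migration_ignore_list.txt, one name/wildcard per line (the exact
# matching rules are implemented by the jobsender Lambda, so treat this as an assumption):
#   test/*
#   *.tmp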
class CdkResourceStack(core.Stack):

    def __init__(self, scope: core.Construct, _id: str, **kwargs) -> None:
        super().__init__(scope, _id, **kwargs)

        # Set up SSM parameters: credentials, bucket_para and ignore_list
        ssm_credential_para = ssm.StringParameter.from_secure_string_parameter_attributes(
            self, "ssm_parameter_credentials",
            parameter_name=ssm_parameter_credentials,
            version=1
        )
        ssm_bucket_para = ssm.StringParameter(self, "s3bucket_serverless",
                                              string_value=json.dumps(bucket_para, indent=4))
        ssm_parameter_ignore_list = ssm.StringParameter(self, "s3_migrate_ignore_list",
                                                        string_value=ignore_list)
        # Setup DynamoDB
        ddb_file_list = ddb.Table(self, "s3migrate_serverless",
                                  partition_key=ddb.Attribute(name="Key", type=ddb.AttributeType.STRING),
                                  billing_mode=ddb.BillingMode.PAY_PER_REQUEST)

        # Setup SQS
        sqs_queue_DLQ = sqs.Queue(self, "s3migrate_serverless_Q_DLQ",
                                  visibility_timeout=core.Duration.minutes(15),
                                  retention_period=core.Duration.days(14))
        sqs_queue = sqs.Queue(self, "s3migrate_serverless_Q",
                              visibility_timeout=core.Duration.minutes(15),
                              retention_period=core.Duration.days(14),
                              dead_letter_queue=sqs.DeadLetterQueue(
                                  max_receive_count=3,
                                  queue=sqs_queue_DLQ
                              ))
        # Set up an API for Lambda to fetch its own IP address (for debugging network routing)
        checkip = api.RestApi(self, "lambda-checkip-api",
                              cloud_watch_role=True,
                              deploy=True,
                              description="For Lambda to get its IP address",
                              default_integration=api.MockIntegration(
                                  integration_responses=[api.IntegrationResponse(
                                      status_code="200",
                                      response_templates={"application/json": "$context.identity.sourceIp"})],
                                  request_templates={"application/json": '{"statusCode": 200}'}
                              ),
                              endpoint_types=[api.EndpointType.REGIONAL])
        checkip.root.add_method("GET", method_responses=[api.MethodResponse(
            status_code="200",
            response_models={"application/json": api.Model.EMPTY_MODEL}
        )])
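        # After deployment, a plain GET on the API root echoes the caller's source IP
        # (the MockIntegration above returns $context.identity.sourceIp), e.g.:
        #   curl https://<api-id>.execute-api.<region>.amazonaws.com/prod/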
        # Setup Lambda functions
        handler = lam.Function(self, "s3-migrate-worker",
                               code=lam.Code.asset("./lambda"),
                               handler="lambda_function_worker.lambda_handler",
                               runtime=lam.Runtime.PYTHON_3_8,
                               memory_size=1024,
                               timeout=core.Duration.minutes(15),
                               tracing=lam.Tracing.ACTIVE,
                               environment={
                                   'table_queue_name': ddb_file_list.table_name,
                                   'Des_bucket_default': Des_bucket_default,
                                   'Des_prefix_default': Des_prefix_default,
                                   'StorageClass': StorageClass,
                                   'checkip_url': checkip.url,
                                   'ssm_parameter_credentials': ssm_parameter_credentials
                               })
        handler_jobsender = lam.Function(self, "s3-migrate-jobsender",
                                         code=lam.Code.asset("./lambda"),
                                         handler="lambda_function_jobsender.lambda_handler",
                                         runtime=lam.Runtime.PYTHON_3_8,
                                         memory_size=1024,
                                         timeout=core.Duration.minutes(15),
                                         tracing=lam.Tracing.ACTIVE,
                                         environment={
                                             'table_queue_name': ddb_file_list.table_name,
                                             'StorageClass': StorageClass,
                                             'checkip_url': checkip.url,
                                             'sqs_queue': sqs_queue.queue_name,
                                             'ssm_parameter_credentials': ssm_parameter_credentials,
                                             'ssm_parameter_ignore_list': ssm_parameter_ignore_list.parameter_name,
                                             'ssm_parameter_bucket': ssm_bucket_para.parameter_name
                                         })
        # Allow Lambda to read/write DDB and send to SQS
        ddb_file_list.grant_read_write_data(handler)
        ddb_file_list.grant_read_write_data(handler_jobsender)
        sqs_queue.grant_send_messages(handler_jobsender)

        # SQS triggers the Lambda worker, one message (batch_size=1) per invocation
        handler.add_event_source(SqsEventSource(sqs_queue, batch_size=1))

        # Option 1: Create a new S3 bucket; all new objects in it will be transmitted by the Lambda worker
        s3bucket = s3.Bucket(self, "s3_new_migrate")
        s3bucket.grant_read(handler)
        s3bucket.add_event_notification(s3.EventType.OBJECT_CREATED,
                                        s3n.SqsDestination(sqs_queue))
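        # The notification delivers the standard S3 event JSON to SQS; the worker is
        # expected to read the bucket/key from Records[*].s3 in each message
        # (assumption: the worker code in ./lambda parses that shape).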
        # Option 2: Allow existing S3 buckets to be read by the Lambda functions.
        # The jobsender Lambda scans and compares these buckets, then triggers Lambda workers to transmit
        bucket_name = ''
        for b in bucket_para:
            if bucket_name != b['src_bucket']:  # skip consecutive duplicates of the same bucket
                bucket_name = b['src_bucket']
                s3exist_bucket = s3.Bucket.from_bucket_name(self,
                                                            bucket_name,  # use the bucket name as the construct id
                                                            bucket_name=bucket_name)
                s3exist_bucket.grant_read(handler_jobsender)
                s3exist_bucket.grant_read(handler)
        # Allow Lambda to read the SSM parameters
        ssm_bucket_para.grant_read(handler_jobsender)
        ssm_credential_para.grant_read(handler)
        ssm_credential_para.grant_read(handler_jobsender)
        ssm_parameter_ignore_list.grant_read(handler_jobsender)

        # Schedule a cron event to trigger the jobsender Lambda once per hour:
        event.Rule(self, 'cron_trigger_jobsender',
                   schedule=event.Schedule.rate(core.Duration.hours(1)),
                   targets=[target.LambdaFunction(handler_jobsender)])
        # Create Lambda log metric filters to build network traffic metrics
        handler.log_group.add_metric_filter("Complete-bytes",
                                            metric_name="Complete-bytes",
                                            metric_namespace="s3_migrate",
                                            metric_value="$bytes",
                                            filter_pattern=logs.FilterPattern.literal(
                                                '[info, date, sn, p="--->Complete", bytes, key]'))
        handler.log_group.add_metric_filter("Uploading-bytes",
                                            metric_name="Uploading-bytes",
                                            metric_namespace="s3_migrate",
                                            metric_value="$bytes",
                                            filter_pattern=logs.FilterPattern.literal(
                                                '[info, date, sn, p="--->Uploading", bytes, key]'))
        handler.log_group.add_metric_filter("Downloading-bytes",
                                            metric_name="Downloading-bytes",
                                            metric_namespace="s3_migrate",
                                            metric_value="$bytes",
                                            filter_pattern=logs.FilterPattern.literal(
                                                '[info, date, sn, p="--->Downloading", bytes, key]'))
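        # These space-delimited patterns assume worker log lines shaped like
        # (hypothetical example; the real format is defined by the worker code):
        #   [INFO] 2020-04-05T12:00:00 sn-0001 --->Complete 1048576 my/object/key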
        lambda_metric_Complete = cw.Metric(namespace="s3_migrate",
                                           metric_name="Complete-bytes",
                                           statistic="Sum",
                                           period=core.Duration.minutes(1))
        lambda_metric_Upload = cw.Metric(namespace="s3_migrate",
                                         metric_name="Uploading-bytes",
                                         statistic="Sum",
                                         period=core.Duration.minutes(1))
        lambda_metric_Download = cw.Metric(namespace="s3_migrate",
                                           metric_name="Downloading-bytes",
                                           statistic="Sum",
                                           period=core.Duration.minutes(1))
        handler.log_group.add_metric_filter("ERROR",
                                            metric_name="ERROR-Logs",
                                            metric_namespace="s3_migrate",
                                            metric_value="1",
                                            filter_pattern=logs.FilterPattern.literal('"ERROR"'))
        handler.log_group.add_metric_filter("WARNING",
                                            metric_name="WARNING-Logs",
                                            metric_namespace="s3_migrate",
                                            metric_value="1",
                                            filter_pattern=logs.FilterPattern.literal('"WARNING"'))
        log_metric_ERROR = cw.Metric(namespace="s3_migrate",
                                     metric_name="ERROR-Logs",
                                     statistic="Sum",
                                     period=core.Duration.minutes(1))
        log_metric_WARNING = cw.Metric(namespace="s3_migrate",
                                       metric_name="WARNING-Logs",
                                       statistic="Sum",
                                       period=core.Duration.minutes(1))
        # Dashboard to monitor SQS and Lambda
        board = cw.Dashboard(self, "s3_migrate_serverless")
        board.add_widgets(
            cw.GraphWidget(title="Lambda-NETWORK",
                           left=[lambda_metric_Download, lambda_metric_Upload, lambda_metric_Complete]),
            # TODO: this monitors concurrency across ALL Lambda functions, not just the worker,
            # a limitation of CDK. Lambda now supports per-function concurrency metrics;
            # switch to those once CDK supports them.
            cw.GraphWidget(title="Lambda-all-concurrent",
                           left=[handler.metric_all_concurrent_executions(
                               period=core.Duration.minutes(1))]),
            cw.GraphWidget(title="Lambda-invocations/errors/throttles",
                           left=[handler.metric_invocations(period=core.Duration.minutes(1)),
                                 handler.metric_errors(period=core.Duration.minutes(1)),
                                 handler.metric_throttles(period=core.Duration.minutes(1))]),
            cw.GraphWidget(title="Lambda-duration",
                           left=[handler.metric_duration(period=core.Duration.minutes(1))])
        )
        board.add_widgets(
            cw.GraphWidget(title="SQS-Jobs",
                           left=[sqs_queue.metric_approximate_number_of_messages_visible(
                                     period=core.Duration.minutes(1)),
                                 sqs_queue.metric_approximate_number_of_messages_not_visible(
                                     period=core.Duration.minutes(1))]),
            cw.GraphWidget(title="SQS-DeadLetterQueue",
                           left=[sqs_queue_DLQ.metric_approximate_number_of_messages_visible(
                                     period=core.Duration.minutes(1)),
                                 sqs_queue_DLQ.metric_approximate_number_of_messages_not_visible(
                                     period=core.Duration.minutes(1))]),
            cw.GraphWidget(title="ERROR/WARNING Logs",
                           left=[log_metric_ERROR],
                           right=[log_metric_WARNING]),
            cw.SingleValueWidget(title="Running/Waiting and Dead Jobs",
                                 metrics=[sqs_queue.metric_approximate_number_of_messages_not_visible(
                                              period=core.Duration.minutes(1)),
                                          sqs_queue.metric_approximate_number_of_messages_visible(
                                              period=core.Duration.minutes(1)),
                                          sqs_queue_DLQ.metric_approximate_number_of_messages_not_visible(
                                              period=core.Duration.minutes(1)),
                                          sqs_queue_DLQ.metric_approximate_number_of_messages_visible(
                                              period=core.Duration.minutes(1))],
                                 height=6)
        )
        # Alarm for the dead letter queue: fires when any message (> 0 visible) lands in the DLQ
        alarm_DLQ = cw.Alarm(self, "SQS_DLQ",
                             metric=sqs_queue_DLQ.metric_approximate_number_of_messages_visible(),
                             threshold=0,
                             comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
                             evaluation_periods=1,
                             datapoints_to_alarm=1)
        alarm_topic = sns.Topic(self, "SQS queue-DLQ has dead letter")
        alarm_topic.add_subscription(subscription=sub.EmailSubscription(alarm_email))
        alarm_DLQ.add_alarm_action(action.SnsAction(alarm_topic))

        core.CfnOutput(self, "Dashboard", value="CloudWatch Dashboard name s3_migrate_serverless")
        # Alarm for queue empty, i.e. no visible messages and no in-flight (not visible) messages
        # metric_all_message = cw.MathExpression(
        #     expression="a + b",
        #     label="empty_queue_expression",
        #     using_metrics={
        #         "a": sqs_queue.metric_approximate_number_of_messages_visible(),
        #         "b": sqs_queue.metric_approximate_number_of_messages_not_visible()
        #     }
        # )
        # alarm_0 = cw.Alarm(self, "SQSempty",
        #                    alarm_name="SQS queue empty-Serverless",
        #                    metric=metric_all_message,
        #                    threshold=0,
        #                    comparison_operator=cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
        #                    evaluation_periods=3,
        #                    datapoints_to_alarm=3,
        #                    treat_missing_data=cw.TreatMissingData.IGNORE)
        # alarm_topic = sns.Topic(self, "SQS queue empty-Serverless")
        # alarm_topic.add_subscription(subscription=sub.EmailSubscription(alarm_email))
        # alarm_0.add_alarm_action(action.SnsAction(alarm_topic))
###############
app = core.App()
CdkResourceStack(app, "s3-migrate-serverless")
app.synth()