# cdk_ec2_stack.py
'''
Below stack is for PUT mode.
The GET mode is almost the same as PUT mode, just inverse the Source and Destination buckets,
and grant the read and write access for the EC2 role.
Don't forget to change the JobType to GET in s3_migration_cluster_config.ini
'''
import json
from aws_cdk import core
import aws_cdk.aws_ec2 as ec2
import aws_cdk.aws_autoscaling as autoscaling
import aws_cdk.aws_iam as iam
import aws_cdk.aws_s3 as s3
import aws_cdk.aws_cloudwatch as cw
import aws_cdk.aws_cloudwatch_actions as action
import aws_cdk.aws_sns as sns
import aws_cdk.aws_sns_subscriptions as sub
import aws_cdk.aws_logs as logs

# Adjust ec2 type here
worker_type = "c5.large"
jobsender_type = "t3.micro"

# Setup your alarm email
alarm_email = "alarm_your_email@email.com"

# Specify your AMI here (Amazon Linux 2, HVM, GP2 storage)
linux_ami = ec2.AmazonLinuxImage(generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2,
                                 edition=ec2.AmazonLinuxEdition.STANDARD,
                                 virtualization=ec2.AmazonLinuxVirt.HVM,
                                 storage=ec2.AmazonLinuxStorage.GENERAL_PURPOSE
                                 )

# Load your user data for ec2 (read at synth time, relative to the CDK app cwd)
with open("./cdk/user_data_part1.sh") as f:
    user_data_part1 = f.read()
with open("./cdk/user_data_part2.sh") as f:
    user_data_part2 = f.read()
with open("./cdk/cw_agent_config.json") as f:
    # Parsed as a dict so the stack can inject the log group name and
    # append_dimensions before serializing it back into the user data.
    cw_agent_config = json.load(f)
with open("./cdk/user_data_worker.sh") as f:
    user_data_worker_p = f.read()
with open("./cdk/user_data_jobsender.sh") as f:
    user_data_jobsender_p = f.read()
class CdkEc2Stack(core.Stack):
    """EC2 tier of the s3 migration cluster (PUT mode).

    Creates:
      * a single-instance "jobsender" autoscaling group,
      * a spot-instance "worker" autoscaling group,
      * IAM grants for DynamoDB / SQS / SSM Parameter Store / S3 buckets,
      * a CloudWatch dashboard, log metric filters, scaling policies,
        and SNS-backed alarms for empty-queue and DLQ conditions.

    NOTE(review): vpc, bucket_para, ddb_file_list, sqs_queue, sqs_queue_DLQ,
    ssm_bucket_para, ssm_credential_para, s3bucket and s3_deploy are
    project-declared constructs passed in from sibling stacks — their exact
    types are not visible in this file.
    """

    def __init__(self, scope: core.Construct, _id: str, *, vpc, bucket_para,  # key_name,
                 ddb_file_list, sqs_queue, sqs_queue_DLQ, ssm_bucket_para, ssm_credential_para, s3bucket, s3_deploy,
                 **kwargs) -> None:
        super().__init__(scope, _id, **kwargs)

        # Create environment variables to inject into userdata
        env_var = f'export table_queue_name={ddb_file_list.table_name}\n' \
                  f'export sqs_queue_name={sqs_queue.queue_name}\n' \
                  f'export ssm_parameter_bucket={ssm_bucket_para.parameter_name}\n'
        # Same variables appended to /etc/rc.local so they survive reboots
        env_var_st = f'echo \"export table_queue_name={ddb_file_list.table_name}\" >> /etc/rc.local\n' \
                     f'echo \"export sqs_queue_name={sqs_queue.queue_name}\" >> /etc/rc.local\n' \
                     f'echo \"export ssm_parameter_bucket={ssm_bucket_para.parameter_name}\" >> /etc/rc.local\n'

        # Create log group and put its name into the CWAgent config in userdata
        s3_migrate_log = logs.LogGroup(self, "applog")
        cw_agent_config['logs']['logs_collected']['files']['collect_list'][0][
            'log_group_name'] = s3_migrate_log.log_group_name
        cw_agent_config['logs']['logs_collected']['files']['collect_list'][1][
            'log_group_name'] = s3_migrate_log.log_group_name
        # CWAgent placeholders; the doubled backslash is collapsed below so the
        # shipped JSON contains the literal ${aws:...} tokens CWAgent expects.
        cw_agent_config['metrics']['append_dimensions']['AutoScalingGroupName'] = "\\${aws:AutoScalingGroupName}"
        cw_agent_config['metrics']['append_dimensions']['InstanceId'] = "\\${aws:InstanceId}"
        cw_agent_config_str = json.dumps(cw_agent_config, indent=4).replace("\\\\", "\\")
        # part1 + agent config + part2 ends in an "aws s3 cp" style command line,
        # hence the trailing " .\n" after the deploy bucket name.
        userdata_head = user_data_part1 + cw_agent_config_str + user_data_part2 + \
            s3_deploy.bucket_name + " .\n" + env_var + env_var_st
        jobsender_userdata = userdata_head + user_data_jobsender_p
        worker_userdata = userdata_head + user_data_worker_p

        # Create jobsender ec2 node: exactly one on-demand instance
        jobsender = autoscaling.AutoScalingGroup(self, "jobsender",
                                                 instance_type=ec2.InstanceType(
                                                     instance_type_identifier=jobsender_type),
                                                 machine_image=linux_ami,
                                                 # key_name=key_name,
                                                 user_data=ec2.UserData.custom(jobsender_userdata),
                                                 vpc=vpc,
                                                 vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
                                                 desired_capacity=1,
                                                 min_capacity=0,
                                                 max_capacity=1
                                                 )
        # jobsender.connections.allow_from_any_ipv4(ec2.Port.tcp(22), "Internet access SSH")
        # Don't need SSH since we use Session Manager

        # Assign EC2 Policy to use SSM and CWAgent
        jobsender.role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore"))
        jobsender.role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchAgentServerPolicy"))

        # Create Worker Autoscaling Group (spot instances, 2-10 nodes)
        worker_asg = autoscaling.AutoScalingGroup(self, "worker-asg",
                                                  vpc=vpc,
                                                  vpc_subnets=ec2.SubnetSelection(
                                                      subnet_type=ec2.SubnetType.PUBLIC),
                                                  instance_type=ec2.InstanceType(
                                                      instance_type_identifier=worker_type),
                                                  machine_image=linux_ami,
                                                  # key_name=key_name,  # Optional if use SSM-SessionManager
                                                  user_data=ec2.UserData.custom(worker_userdata),
                                                  desired_capacity=2,
                                                  min_capacity=2,
                                                  max_capacity=10,
                                                  spot_price="0.5",
                                                  group_metrics=[autoscaling.GroupMetrics.all()]
                                                  )
        # worker_asg.connections.allow_from_any_ipv4(ec2.Port.tcp(22), "Internet access SSH")
        # Don't need SSH since we use Session Manager

        # Assign EC2 Policy to use SSM and CWAgent
        worker_asg.role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore"))
        worker_asg.role.add_managed_policy(
            iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchAgentServerPolicy"))

        # Allow EC2 access to the new DynamoDB Table
        ddb_file_list.grant_full_access(jobsender)
        ddb_file_list.grant_full_access(worker_asg)

        # Allow EC2 access to the new sqs and its DLQ
        sqs_queue.grant_consume_messages(jobsender)
        sqs_queue.grant_send_messages(jobsender)
        sqs_queue.grant_consume_messages(worker_asg)
        sqs_queue_DLQ.grant_consume_messages(jobsender)

        # Allow EC2 access to SSM Parameter Store, get bucket info and get credential
        ssm_bucket_para.grant_read(jobsender)
        ssm_credential_para.grant_read(jobsender)
        ssm_credential_para.grant_read(worker_asg)

        # Allow EC2 access to the source code on the s3_deploy bucket
        s3_deploy.grant_read(jobsender)
        s3_deploy.grant_read(worker_asg)

        # Allow EC2 access to the new s3 bucket
        s3bucket.grant_read(jobsender)
        s3bucket.grant_read(worker_asg)

        # Allow EC2 access to existing s3 buckets for PUT mode:
        # readonly access to the source buckets
        bucket_name = ''
        for b in bucket_para:
            if bucket_name != b['src_bucket']:  # skip duplicates when the same bucket is listed more than once
                bucket_name = b['src_bucket']
                s3exist_bucket = s3.Bucket.from_bucket_name(self,
                                                            bucket_name,  # bucket name doubles as construct id
                                                            bucket_name=bucket_name)
                s3exist_bucket.grant_read(jobsender)
                s3exist_bucket.grant_read(worker_asg)

        # Allow EC2 access to existing s3 buckets for GET mode:
        # read and write access to the destination buckets
        # bucket_name = ''
        # for b in bucket_para:
        #     if bucket_name != b['des_bucket']:  # skip duplicates when the same bucket is listed more than once
        #         bucket_name = b['des_bucket']
        #         s3exist_bucket = s3.Bucket.from_bucket_name(self,
        #                                                     bucket_name,  # bucket name doubles as construct id
        #                                                     bucket_name=bucket_name)
        #         s3exist_bucket.grant_read_write(jobsender)
        #         s3exist_bucket.grant_read_write(worker_asg)

        # Dashboard to monitor SQS and EC2
        board = cw.Dashboard(self, "s3_migrate")
        ec2_metric_cpu_avg = cw.Metric(namespace="AWS/EC2",
                                       metric_name="CPUUtilization",
                                       dimensions={"AutoScalingGroupName": worker_asg.auto_scaling_group_name},
                                       period=core.Duration.minutes(1))
        # SEARCH expression: per-instance NetworkOut, not limited to this ASG
        ec2_metric_net_out = cw.MathExpression(
            expression="SEARCH('{AWS/EC2, InstanceId} NetworkOut', 'Average', 60)",
            label="EC2-NetworkOut",
            using_metrics={}
        )
        autoscaling_GroupDesiredCapacity = cw.Metric(namespace="AWS/AutoScaling",
                                                     metric_name="GroupDesiredCapacity",
                                                     dimensions={
                                                         "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
                                                     period=core.Duration.minutes(1))
        autoscaling_GroupInServiceInstances = cw.Metric(namespace="AWS/AutoScaling",
                                                        metric_name="GroupInServiceInstances",
                                                        dimensions={
                                                            "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
                                                        period=core.Duration.minutes(1))
        autoscaling_GroupMinSize = cw.Metric(namespace="AWS/AutoScaling",
                                             metric_name="GroupMinSize",
                                             dimensions={
                                                 "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
                                             period=core.Duration.minutes(1))
        autoscaling_GroupMaxSize = cw.Metric(namespace="AWS/AutoScaling",
                                             metric_name="GroupMaxSize",
                                             dimensions={
                                                 "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
                                             period=core.Duration.minutes(1))

        # CWAgent collected metrics, scoped to this worker ASG via SEARCH
        cwagent_mem_avg = cw.MathExpression(
            expression="SEARCH('{CWAgent, AutoScalingGroupName, InstanceId} (AutoScalingGroupName=" +
                       worker_asg.auto_scaling_group_name +
                       " AND MetricName=mem_used_percent)', 'Average', 60)",
            label="mem_avg",
            using_metrics={}
        )
        cwagent_disk_avg = cw.MathExpression(
            expression="SEARCH('{CWAgent, path, InstanceId, AutoScalingGroupName, device, fstype} "
                       "(AutoScalingGroupName=" + worker_asg.auto_scaling_group_name +
                       " AND MetricName=disk_used_percent AND path=\"/\")', 'Average', 60)",
            label="disk_avg",
            using_metrics={}
        )
        cwagent_net_tcp = cw.MathExpression(
            expression="SEARCH('{CWAgent, AutoScalingGroupName, InstanceId} (AutoScalingGroupName=" +
                       worker_asg.auto_scaling_group_name +
                       " AND MetricName=tcp_established)', 'Average', 60)",
            label="tcp_conn",
            using_metrics={}
        )

        # CWAgent collected application logs - filter metrics.
        # Patterns match space-delimited app log lines like:
        #   <date> <time> <info> <hs> --->Complete <bytes> <key>
        s3_migrate_log.add_metric_filter("Completed-bytes",
                                         metric_name="Completed-bytes",
                                         metric_namespace="s3_migrate",
                                         metric_value="$bytes",
                                         filter_pattern=logs.FilterPattern.literal(
                                             '[date, time, info, hs, p="--->Complete", bytes, key]'))
        s3_migrate_log.add_metric_filter("Uploading-bytes",
                                         metric_name="Uploading-bytes",
                                         metric_namespace="s3_migrate",
                                         metric_value="$bytes",
                                         filter_pattern=logs.FilterPattern.literal(
                                             '[date, time, info, hs, p="--->Uploading", bytes, key]'))
        s3_migrate_log.add_metric_filter("Downloading-bytes",
                                         metric_name="Downloading-bytes",
                                         metric_namespace="s3_migrate",
                                         metric_value="$bytes",
                                         filter_pattern=logs.FilterPattern.literal(
                                             '[date, time, info, hs, p="--->Downloading", bytes, key]'))
        traffic_metric_Complete = cw.Metric(namespace="s3_migrate",
                                            metric_name="Completed-bytes",
                                            statistic="Sum",
                                            period=core.Duration.minutes(1))
        traffic_metric_Upload = cw.Metric(namespace="s3_migrate",
                                          metric_name="Uploading-bytes",
                                          statistic="Sum",
                                          period=core.Duration.minutes(1))
        traffic_metric_Download = cw.Metric(namespace="s3_migrate",
                                            metric_name="Downloading-bytes",
                                            statistic="Sum",
                                            period=core.Duration.minutes(1))
        s3_migrate_log.add_metric_filter("ERROR",
                                         metric_name="ERROR-Logs",
                                         metric_namespace="s3_migrate",
                                         metric_value="1",
                                         filter_pattern=logs.FilterPattern.literal(
                                             '"ERROR"'))
        s3_migrate_log.add_metric_filter("WARNING",
                                         metric_name="WARNING-Logs",
                                         metric_namespace="s3_migrate",
                                         metric_value="1",
                                         filter_pattern=logs.FilterPattern.literal(
                                             '"WARNING"'))
        log_metric_ERROR = cw.Metric(namespace="s3_migrate",
                                     metric_name="ERROR-Logs",
                                     statistic="Sum",
                                     period=core.Duration.minutes(1))
        log_metric_WARNING = cw.Metric(namespace="s3_migrate",
                                       metric_name="WARNING-Logs",
                                       statistic="Sum",
                                       period=core.Duration.minutes(1))

        # Dashboard row 1: traffic, logs and queue state
        board.add_widgets(cw.GraphWidget(title="S3-MIGRATION-TOTAL-TRAFFIC",
                                         left=[traffic_metric_Complete, traffic_metric_Upload,
                                               traffic_metric_Download],
                                         left_y_axis=cw.YAxisProps(label="Bytes/min", show_units=False)),
                          cw.GraphWidget(title="ERROR/WARNING LOGS",
                                         left=[log_metric_ERROR],
                                         left_y_axis=cw.YAxisProps(label="Count", show_units=False),
                                         right=[log_metric_WARNING],
                                         right_y_axis=cw.YAxisProps(label="Count", show_units=False)),
                          cw.GraphWidget(title="SQS-JOBS",
                                         left=[sqs_queue.metric_approximate_number_of_messages_visible(
                                             period=core.Duration.minutes(1)),
                                             sqs_queue.metric_approximate_number_of_messages_not_visible(
                                                 period=core.Duration.minutes(1))]),
                          cw.SingleValueWidget(title="RUNNING, WAITING & DEATH JOBS",
                                               metrics=[sqs_queue.metric_approximate_number_of_messages_not_visible(
                                                   period=core.Duration.minutes(1)),
                                                   sqs_queue.metric_approximate_number_of_messages_visible(
                                                       period=core.Duration.minutes(1)),
                                                   sqs_queue_DLQ.metric_approximate_number_of_messages_not_visible(
                                                       period=core.Duration.minutes(1)),
                                                   sqs_queue_DLQ.metric_approximate_number_of_messages_visible(
                                                       period=core.Duration.minutes(1))],
                                               height=6)
                          )
        # Dashboard row 2: worker ASG health
        board.add_widgets(cw.GraphWidget(title="EC2-AutoscalingGroup-TCP",
                                         left=[cwagent_net_tcp],
                                         left_y_axis=cw.YAxisProps(label="Count", show_units=False)),
                          cw.GraphWidget(title="EC2-AutoscalingGroup-CPU/MEMORY",
                                         left=[ec2_metric_cpu_avg, cwagent_mem_avg],
                                         left_y_axis=cw.YAxisProps(max=100, min=0, label="%", show_units=False)),
                          cw.GraphWidget(title="EC2-AutoscalingGroup-DISK",
                                         left=[cwagent_disk_avg],
                                         left_y_axis=cw.YAxisProps(max=100, min=0, label="%", show_units=False)),
                          cw.SingleValueWidget(title="EC2-AutoscalingGroup-CAPACITY",
                                               metrics=[autoscaling_GroupDesiredCapacity,
                                                        autoscaling_GroupInServiceInstances,
                                                        autoscaling_GroupMinSize,
                                                        autoscaling_GroupMaxSize],
                                               height=6)
                          )
        # Dashboard row 3: network out
        board.add_widgets(cw.GraphWidget(title="EC2-NetworkOut",
                                         left=[ec2_metric_net_out],
                                         left_y_axis=cw.YAxisProps(label="Bytes/min", show_units=False)))

        # Autoscaling up when visible message > 100 in 5 mins
        worker_asg.scale_on_metric("scaleup", metric=sqs_queue.metric_approximate_number_of_messages_visible(),
                                   scaling_steps=[autoscaling.ScalingInterval(
                                       change=1,
                                       lower=100,
                                       upper=500
                                   ), autoscaling.ScalingInterval(
                                       change=2,
                                       lower=500
                                   ),
                                       autoscaling.ScalingInterval(
                                           change=0,
                                           upper=100,
                                           lower=0
                                       )],
                                   adjustment_type=autoscaling.AdjustmentType.CHANGE_IN_CAPACITY)

        # Alarm when the queue is empty (no visible or in-flight messages) while
        # more than one EC2 is in service; the alarm action then scales the ASG
        # down to 1 instance. Adjust to your scenario: if the jobsender also
        # transfers data, you could scale the group to 0 when no jobs remain.
        metric_all_message = cw.MathExpression(
            expression="IF(((a+b) == 0) AND (c >1), 0, 1)",  # 0 (alarm) when queue empty and instances > 1, else 1
            label="empty_queue_expression",
            using_metrics={
                "a": sqs_queue.metric_approximate_number_of_messages_visible(),
                "b": sqs_queue.metric_approximate_number_of_messages_not_visible(),
                "c": autoscaling_GroupInServiceInstances
            }
        )
        alarm_0 = cw.Alarm(self, "SQSempty",
                           alarm_name="s3-migration-cluster-SQS queue empty and ec2 more than 1 in Cluster",
                           metric=metric_all_message,
                           threshold=0,
                           comparison_operator=cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
                           evaluation_periods=3,
                           datapoints_to_alarm=3,
                           treat_missing_data=cw.TreatMissingData.NOT_BREACHING
                           )
        alarm_topic_empty = sns.Topic(self, "SQS queue empty and ec2 more than 1 in Cluster")
        # This alarm doubles as a "batch transfer finished" notification, and it
        # fires only once instead of notifying repeatedly.
        alarm_topic_empty.add_subscription(subscription=sub.EmailSubscription(alarm_email))
        alarm_0.add_alarm_action(action.SnsAction(alarm_topic_empty))

        # If queue empty, set autoscale down to 1 EC2
        action_shutdown = autoscaling.StepScalingAction(self, "shutdown",
                                                        auto_scaling_group=worker_asg,
                                                        adjustment_type=autoscaling.AdjustmentType.EXACT_CAPACITY
                                                        )
        action_shutdown.add_adjustment(adjustment=1, upper_bound=0)
        alarm_0.add_alarm_action(action.AutoScalingAction(action_shutdown))

        # While message lands in SQS-DLQ, alarm to sns
        alarm_DLQ = cw.Alarm(self, "SQS_DLQ",
                             alarm_name="s3-migration-cluster-SQS DLQ more than 1 message-Cluster",
                             metric=sqs_queue_DLQ.metric_approximate_number_of_messages_visible(),
                             threshold=0,
                             comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
                             evaluation_periods=3,
                             datapoints_to_alarm=3,
                             treat_missing_data=cw.TreatMissingData.IGNORE)
        alarm_topic_DLQ = sns.Topic(self, "SQS DLQ more than 1 message-Cluster")
        alarm_topic_DLQ.add_subscription(subscription=sub.EmailSubscription(alarm_email))
        alarm_DLQ.add_alarm_action(action.SnsAction(alarm_topic_DLQ))

        # Output
        core.CfnOutput(self, "LogGroup", value=s3_migrate_log.log_group_name)
        core.CfnOutput(self, "Dashboard", value="CloudWatch Dashboard name s3_migrate_cluster")
        core.CfnOutput(self, "Alarm", value="CloudWatch SQS queue empty Alarm for cluster: " + alarm_email)