cdk_ec2_stack.py 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398
'''
The stack below is for PUT mode.
GET mode is almost the same as PUT mode: just swap the source and destination buckets,
and grant the EC2 roles read and write access to the destination buckets.
Don't forget to change JobType to GET in s3_migration_cluster_config.ini.
'''
  7. import json
  8. from aws_cdk import core
  9. import aws_cdk.aws_ec2 as ec2
  10. import aws_cdk.aws_autoscaling as autoscaling
  11. import aws_cdk.aws_iam as iam
  12. import aws_cdk.aws_s3 as s3
  13. import aws_cdk.aws_cloudwatch as cw
  14. import aws_cdk.aws_cloudwatch_actions as action
  15. import aws_cdk.aws_sns as sns
  16. import aws_cdk.aws_sns_subscriptions as sub
  17. import aws_cdk.aws_logs as logs
  18. # Adjust ec2 type here
  19. worker_type = "c5.large"
  20. jobsender_type = "t3.micro"
  21. # Setup your alarm email
  22. alarm_email = "alarm_your_email@email.com"
  23. # Specify your AMI here
  24. linux_ami = ec2.AmazonLinuxImage(generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2,
  25. edition=ec2.AmazonLinuxEdition.STANDARD,
  26. virtualization=ec2.AmazonLinuxVirt.HVM,
  27. storage=ec2.AmazonLinuxStorage.GENERAL_PURPOSE
  28. )
  29. # Load your user data for ec2
  30. with open("./cdk/user_data_part1.sh") as f:
  31. user_data_part1 = f.read()
  32. with open("./cdk/user_data_part2.sh") as f:
  33. user_data_part2 = f.read()
  34. with open("./cdk/cw_agent_config.json") as f:
  35. cw_agent_config = json.load(f)
  36. with open("./cdk/user_data_worker.sh") as f:
  37. user_data_worker_p = f.read()
  38. with open("./cdk/user_data_jobsender.sh") as f:
  39. user_data_jobsender_p = f.read()
  40. class CdkEc2Stack(core.Stack):
  41. def __init__(self, scope: core.Construct, _id: str, vpc, bucket_para,
  42. # key_name,
  43. ddb_file_list, sqs_queue, sqs_queue_DLQ, ssm_bucket_para, ssm_credential_para, s3bucket, s3_deploy,
  44. **kwargs) -> None:
  45. super().__init__(scope, _id, **kwargs)
  46. # Create environment variable into userdata
  47. env_var = f'export table_queue_name={ddb_file_list.table_name}\n' \
  48. f'export sqs_queue_name={sqs_queue.queue_name}\n' \
  49. f'export ssm_parameter_bucket={ssm_bucket_para.parameter_name}\n'
  50. env_var_st = f'echo \"export table_queue_name={ddb_file_list.table_name}\" >> /etc/rc.local\n' \
  51. f'echo \"export sqs_queue_name={sqs_queue.queue_name}\" >> /etc/rc.local\n' \
  52. f'echo \"export ssm_parameter_bucket={ssm_bucket_para.parameter_name}\" >> /etc/rc.local\n'
  53. # Create log group and put group name into userdata
  54. s3_migrate_log = logs.LogGroup(self, "applog")
  55. cw_agent_config['logs']['logs_collected']['files']['collect_list'][0][
  56. 'log_group_name'] = s3_migrate_log.log_group_name
  57. cw_agent_config['logs']['logs_collected']['files']['collect_list'][1][
  58. 'log_group_name'] = s3_migrate_log.log_group_name
  59. cw_agent_config['metrics']['append_dimensions']['AutoScalingGroupName'] = "\\${aws:AutoScalingGroupName}"
  60. cw_agent_config['metrics']['append_dimensions']['InstanceId'] = "\\${aws:InstanceId}"
  61. cw_agent_config_str = json.dumps(cw_agent_config, indent=4).replace("\\\\", "\\")
  62. userdata_head = user_data_part1 + cw_agent_config_str + user_data_part2 + \
  63. s3_deploy.bucket_name + " .\n" + env_var + env_var_st
  64. jobsender_userdata = userdata_head + user_data_jobsender_p
  65. worker_userdata = userdata_head + user_data_worker_p
  66. # Create jobsender ec2 node
  67. jobsender = autoscaling.AutoScalingGroup(self, "jobsender",
  68. instance_type=ec2.InstanceType(
  69. instance_type_identifier=jobsender_type),
  70. machine_image=linux_ami,
  71. # key_name=key_name,
  72. user_data=ec2.UserData.custom(jobsender_userdata),
  73. vpc=vpc,
  74. vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
  75. desired_capacity=1,
  76. min_capacity=0,
  77. max_capacity=1
  78. )
  79. # jobsender.connections.allow_from_any_ipv4(ec2.Port.tcp(22), "Internet access SSH")
  80. # Don't need SSH since we use Session Manager
  81. # Assign EC2 Policy to use SSM and CWAgent
  82. jobsender.role.add_managed_policy(
  83. iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore"))
  84. jobsender.role.add_managed_policy(
  85. iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchAgentServerPolicy"))
  86. # jobsender.role.add_managed_policy(
  87. # iam.ManagedPolicy.from_aws_managed_policy_name("AmazonS3FullAccess"))
  88. # Don't give full access s3 to ec2, violate security rule
  89. # Create Autoscaling Group with fixed 2*EC2 hosts
  90. worker_asg = autoscaling.AutoScalingGroup(self, "worker-asg",
  91. vpc=vpc,
  92. vpc_subnets=ec2.SubnetSelection(
  93. subnet_type=ec2.SubnetType.PUBLIC),
  94. instance_type=ec2.InstanceType(
  95. instance_type_identifier=worker_type),
  96. machine_image=linux_ami,
  97. # key_name=key_name, # Optional if use SSM-SessionManager
  98. user_data=ec2.UserData.custom(worker_userdata),
  99. desired_capacity=2,
  100. min_capacity=2,
  101. max_capacity=10,
  102. spot_price="0.5"
  103. )
  104. # TODO: There is no MetricsCollection in CDK autoscaling group high level API yet.
  105. # You need to enable "Group Metrics Collection" in EC2 Console Autoscaling Group - Monitoring tab for metric:
  106. # GroupDesiredCapacity, GroupInServiceInstances, GroupPendingInstances and etc.
  107. # worker_asg.connections.allow_from_any_ipv4(ec2.Port.tcp(22), "Internet access SSH")
  108. # Don't need SSH since we use Session Manager
  109. # Assign EC2 Policy to use SSM and CWAgent
  110. worker_asg.role.add_managed_policy(
  111. iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore"))
  112. worker_asg.role.add_managed_policy(
  113. iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchAgentServerPolicy"))
  114. # Allow EC2 access new DynamoDB Table
  115. ddb_file_list.grant_full_access(jobsender)
  116. ddb_file_list.grant_full_access(worker_asg)
  117. # Allow EC2 access new sqs and its DLQ
  118. sqs_queue.grant_consume_messages(jobsender)
  119. sqs_queue.grant_send_messages(jobsender)
  120. sqs_queue.grant_consume_messages(worker_asg)
  121. sqs_queue_DLQ.grant_consume_messages(jobsender)
  122. # Allow EC2 access SSM Parameter Store, get bucket infor and get credential
  123. ssm_bucket_para.grant_read(jobsender)
  124. ssm_credential_para.grant_read(jobsender)
  125. ssm_credential_para.grant_read(worker_asg)
  126. # Allow EC2 access source code on s3_deploy bucket
  127. s3_deploy.grant_read(jobsender)
  128. s3_deploy.grant_read(worker_asg)
  129. # Allow EC2 access new s3 bucket
  130. s3bucket.grant_read(jobsender)
  131. s3bucket.grant_read(worker_asg)
  132. # Allow EC2 access exist s3 bucket for PUT mode: readonly access the source buckets
  133. bucket_name = ''
  134. for b in bucket_para:
  135. if bucket_name != b['src_bucket']: # 如果列了多个相同的Bucket,就跳过
  136. bucket_name = b['src_bucket']
  137. s3exist_bucket = s3.Bucket.from_bucket_name(self,
  138. bucket_name, # 用这个做id
  139. bucket_name=bucket_name)
  140. s3exist_bucket.grant_read(jobsender)
  141. s3exist_bucket.grant_read(worker_asg)
  142. # Allow EC2 access exist s3 bucket for GET mode: read and write access the destination buckets
  143. # bucket_name = ''
  144. # for b in bucket_para:
  145. # if bucket_name != b['des_bucket']: # 如果列了多个相同的Bucket,就跳过
  146. # bucket_name = b['des_bucket']
  147. # s3exist_bucket = s3.Bucket.from_bucket_name(self,
  148. # bucket_name, # 用这个做id
  149. # bucket_name=bucket_name)
  150. # s3exist_bucket.grant_read_write(jobsender)
  151. # s3exist_bucket.grant_read_write(worker_asg)
  152. # Dashboard to monitor SQS and EC2
  153. board = cw.Dashboard(self, "s3_migrate")
  154. ec2_metric_cpu_avg = cw.Metric(namespace="AWS/EC2",
  155. metric_name="CPUUtilization",
  156. dimensions={"AutoScalingGroupName": worker_asg.auto_scaling_group_name},
  157. period=core.Duration.minutes(1))
  158. ec2_metric_net_out = cw.MathExpression(
  159. expression="SEARCH('{AWS/EC2, InstanceId} NetworkOut', 'Average', 60)",
  160. label="EC2-NetworkOut",
  161. using_metrics={}
  162. )
  163. autoscaling_GroupDesiredCapacity = cw.Metric(namespace="AWS/AutoScaling",
  164. metric_name="GroupDesiredCapacity",
  165. dimensions={
  166. "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
  167. period=core.Duration.minutes(1))
  168. autoscaling_GroupInServiceInstances = cw.Metric(namespace="AWS/AutoScaling",
  169. metric_name="GroupInServiceInstances",
  170. dimensions={
  171. "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
  172. period=core.Duration.minutes(1))
  173. autoscaling_GroupMinSize = cw.Metric(namespace="AWS/AutoScaling",
  174. metric_name="GroupMinSize",
  175. dimensions={
  176. "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
  177. period=core.Duration.minutes(1))
  178. autoscaling_GroupMaxSize = cw.Metric(namespace="AWS/AutoScaling",
  179. metric_name="GroupMaxSize",
  180. dimensions={
  181. "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
  182. period=core.Duration.minutes(1))
  183. # CWAgent collected metric
  184. cwagent_mem_avg = cw.MathExpression(
  185. expression="SEARCH('{CWAgent, AutoScalingGroupName, InstanceId} (AutoScalingGroupName=" +
  186. worker_asg.auto_scaling_group_name +
  187. " AND MetricName=mem_used_percent)', 'Average', 60)",
  188. label="mem_avg",
  189. using_metrics={}
  190. )
  191. cwagent_disk_avg = cw.MathExpression(
  192. expression="SEARCH('{CWAgent, path, InstanceId, AutoScalingGroupName, device, fstype} "
  193. "(AutoScalingGroupName=" + worker_asg.auto_scaling_group_name +
  194. " AND MetricName=disk_used_percent AND path=\"/\")', 'Average', 60)",
  195. label="disk_avg",
  196. using_metrics={}
  197. )
  198. cwagent_net_tcp = cw.MathExpression(
  199. expression="SEARCH('{CWAgent, AutoScalingGroupName, InstanceId} (AutoScalingGroupName=" +
  200. worker_asg.auto_scaling_group_name +
  201. " AND MetricName=tcp_established)', 'Average', 60)",
  202. label="tcp_conn",
  203. using_metrics={}
  204. )
  205. # CWAgent collected application logs - filter metric
  206. s3_migrate_log.add_metric_filter("Completed-bytes",
  207. metric_name="Completed-bytes",
  208. metric_namespace="s3_migrate",
  209. metric_value="$bytes",
  210. filter_pattern=logs.FilterPattern.literal(
  211. '[date, time, info, hs, p="--->Complete", bytes, key]'))
  212. s3_migrate_log.add_metric_filter("Uploading-bytes",
  213. metric_name="Uploading-bytes",
  214. metric_namespace="s3_migrate",
  215. metric_value="$bytes",
  216. filter_pattern=logs.FilterPattern.literal(
  217. '[date, time, info, hs, p="--->Uploading", bytes, key]'))
  218. s3_migrate_log.add_metric_filter("Downloading-bytes",
  219. metric_name="Downloading-bytes",
  220. metric_namespace="s3_migrate",
  221. metric_value="$bytes",
  222. filter_pattern=logs.FilterPattern.literal(
  223. '[date, time, info, hs, p="--->Downloading", bytes, key]'))
  224. traffic_metric_Complete = cw.Metric(namespace="s3_migrate",
  225. metric_name="Completed-bytes",
  226. statistic="Sum",
  227. period=core.Duration.minutes(1))
  228. traffic_metric_Upload = cw.Metric(namespace="s3_migrate",
  229. metric_name="Uploading-bytes",
  230. statistic="Sum",
  231. period=core.Duration.minutes(1))
  232. traffic_metric_Download = cw.Metric(namespace="s3_migrate",
  233. metric_name="Downloading-bytes",
  234. statistic="Sum",
  235. period=core.Duration.minutes(1))
  236. s3_migrate_log.add_metric_filter("ERROR",
  237. metric_name="ERROR-Logs",
  238. metric_namespace="s3_migrate",
  239. metric_value="1",
  240. filter_pattern=logs.FilterPattern.literal(
  241. '"ERROR"'))
  242. s3_migrate_log.add_metric_filter("WARNING",
  243. metric_name="WARNING-Logs",
  244. metric_namespace="s3_migrate",
  245. metric_value="1",
  246. filter_pattern=logs.FilterPattern.literal(
  247. '"WARNING"'))
  248. log_metric_ERROR = cw.Metric(namespace="s3_migrate",
  249. metric_name="ERROR-Logs",
  250. statistic="Sum",
  251. period=core.Duration.minutes(1))
  252. log_metric_WARNING = cw.Metric(namespace="s3_migrate",
  253. metric_name="WARNING-Logs",
  254. statistic="Sum",
  255. period=core.Duration.minutes(1))
  256. board.add_widgets(cw.GraphWidget(title="S3-MIGRATION-TOTAL-TRAFFIC",
  257. left=[traffic_metric_Complete, traffic_metric_Upload,
  258. traffic_metric_Download],
  259. left_y_axis=cw.YAxisProps(label="Bytes/min", show_units=False)),
  260. cw.GraphWidget(title="ERROR/WARNING LOGS",
  261. left=[log_metric_ERROR],
  262. left_y_axis=cw.YAxisProps(label="Count", show_units=False),
  263. right=[log_metric_WARNING],
  264. right_y_axis=cw.YAxisProps(label="Count", show_units=False)),
  265. cw.GraphWidget(title="SQS-JOBS",
  266. left=[sqs_queue.metric_approximate_number_of_messages_visible(
  267. period=core.Duration.minutes(1)),
  268. sqs_queue.metric_approximate_number_of_messages_not_visible(
  269. period=core.Duration.minutes(1))]),
  270. cw.SingleValueWidget(title="RUNNING, WAITING & DEATH JOBS",
  271. metrics=[sqs_queue.metric_approximate_number_of_messages_not_visible(
  272. period=core.Duration.minutes(1)),
  273. sqs_queue.metric_approximate_number_of_messages_visible(
  274. period=core.Duration.minutes(1)),
  275. sqs_queue_DLQ.metric_approximate_number_of_messages_not_visible(
  276. period=core.Duration.minutes(1)),
  277. sqs_queue_DLQ.metric_approximate_number_of_messages_visible(
  278. period=core.Duration.minutes(1))],
  279. height=6)
  280. )
  281. board.add_widgets(cw.GraphWidget(title="EC2-AutoscalingGroup-TCP",
  282. left=[cwagent_net_tcp],
  283. left_y_axis=cw.YAxisProps(label="Count", show_units=False)),
  284. cw.GraphWidget(title="EC2-AutoscalingGroup-CPU/MEMORY",
  285. left=[ec2_metric_cpu_avg, cwagent_mem_avg],
  286. left_y_axis=cw.YAxisProps(max=100, min=0, label="%", show_units=False)),
  287. cw.GraphWidget(title="EC2-AutoscalingGroup-DISK",
  288. left=[cwagent_disk_avg],
  289. left_y_axis=cw.YAxisProps(max=100, min=0, label="%", show_units=False)),
  290. cw.SingleValueWidget(title="EC2-AutoscalingGroup-CAPACITY",
  291. metrics=[autoscaling_GroupDesiredCapacity,
  292. autoscaling_GroupInServiceInstances,
  293. autoscaling_GroupMinSize,
  294. autoscaling_GroupMaxSize],
  295. height=6)
  296. )
  297. board.add_widgets(cw.GraphWidget(title="EC2-NetworkOut",
  298. left=[ec2_metric_net_out],
  299. left_y_axis=cw.YAxisProps(label="Bytes/min", show_units=False)))
  300. # Autoscaling up when visible message > 100 in 5 mins
  301. worker_asg.scale_on_metric("scaleup", metric=sqs_queue.metric_approximate_number_of_messages_visible(),
  302. scaling_steps=[autoscaling.ScalingInterval(
  303. change=1,
  304. lower=100,
  305. upper=500
  306. ), autoscaling.ScalingInterval(
  307. change=2,
  308. lower=500
  309. ),
  310. autoscaling.ScalingInterval(
  311. change=0,
  312. upper=100,
  313. lower=0
  314. )],
  315. adjustment_type=autoscaling.AdjustmentType.CHANGE_IN_CAPACITY)
  316. # Alarm for queue empty and ec2 > 1
  317. # 消息队列空(没有Visible+Invisible),并且EC2不止一台,则告警,并设置EC2为1台
  318. # 这里还可以根据场景调整,如果Jobsender也用来做传输,则可以在这里设置没有任务的时候,Autoscaling Group为0
  319. metric_all_message = cw.MathExpression(
  320. expression="IF(((a+b) == 0) AND (c >1), 0, 1)", # a+b且c>1则设置为0,告警
  321. label="empty_queue_expression",
  322. using_metrics={
  323. "a": sqs_queue.metric_approximate_number_of_messages_visible(),
  324. "b": sqs_queue.metric_approximate_number_of_messages_not_visible(),
  325. "c": autoscaling_GroupInServiceInstances
  326. }
  327. )
  328. alarm_0 = cw.Alarm(self, "SQSempty",
  329. alarm_name="s3-migration-cluster-SQS queue empty and ec2 more than 1 in Cluster",
  330. metric=metric_all_message,
  331. threshold=0,
  332. comparison_operator=cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
  333. evaluation_periods=3,
  334. datapoints_to_alarm=3,
  335. treat_missing_data=cw.TreatMissingData.NOT_BREACHING
  336. )
  337. alarm_topic_empty = sns.Topic(self, "SQS queue empty and ec2 more than 1 in Cluster")
  338. # 这个告警可以作为批量传输完成后的通知,而且这样做可以只通知一次,而不会不停地通知
  339. alarm_topic_empty.add_subscription(subscription=sub.EmailSubscription(alarm_email))
  340. alarm_0.add_alarm_action(action.SnsAction(alarm_topic_empty))
  341. # If queue empty, set autoscale down to 1 EC2
  342. action_shutdown = autoscaling.StepScalingAction(self, "shutdown",
  343. auto_scaling_group=worker_asg,
  344. adjustment_type=autoscaling.AdjustmentType.EXACT_CAPACITY
  345. )
  346. action_shutdown.add_adjustment(adjustment=1, upper_bound=0)
  347. alarm_0.add_alarm_action(action.AutoScalingAction(action_shutdown))
  348. # While message in SQS-DLQ, alarm to sns
  349. alarm_DLQ = cw.Alarm(self, "SQS_DLQ",
  350. alarm_name="s3-migration-cluster-SQS DLQ more than 1 message-Cluster",
  351. metric=sqs_queue_DLQ.metric_approximate_number_of_messages_visible(),
  352. threshold=0,
  353. comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
  354. evaluation_periods=3,
  355. datapoints_to_alarm=3,
  356. treat_missing_data=cw.TreatMissingData.IGNORE)
  357. alarm_topic_DLQ = sns.Topic(self, "SQS DLQ more than 1 message-Cluster")
  358. alarm_topic_DLQ.add_subscription(subscription=sub.EmailSubscription(alarm_email))
  359. alarm_DLQ.add_alarm_action(action.SnsAction(alarm_topic_DLQ))
  360. # Output
  361. core.CfnOutput(self, "LogGroup", value=s3_migrate_log.log_group_name)
  362. core.CfnOutput(self, "Dashboard", value="CloudWatch Dashboard name s3_migrate_cluster")
  363. core.CfnOutput(self, "Alarm", value="CloudWatch SQS queue empty Alarm for cluster: " + alarm_email)