123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388 |
- '''
- Below stack is for PUT mode.
- The GET mode is almost the same as put mode, just inverse the Source and Destination buckets.
- And auth the read and write access for ec2 role
- Dont forget to change the JobTpe to GET in s3_migration_cluster_config.ini
- '''
- import json
- from aws_cdk import core
- import aws_cdk.aws_ec2 as ec2
- import aws_cdk.aws_autoscaling as autoscaling
- import aws_cdk.aws_iam as iam
- import aws_cdk.aws_s3 as s3
- import aws_cdk.aws_cloudwatch as cw
- import aws_cdk.aws_cloudwatch_actions as action
- import aws_cdk.aws_sns as sns
- import aws_cdk.aws_sns_subscriptions as sub
- import aws_cdk.aws_logs as logs
- # Adjust ec2 type here
- worker_type = "c5.large"
- jobsender_type = "t3.micro"
- # Setup your alarm email
- alarm_email = "alarm_your_email@email.com"
- # Specify your AMI here
- linux_ami = ec2.AmazonLinuxImage(generation=ec2.AmazonLinuxGeneration.AMAZON_LINUX_2,
- edition=ec2.AmazonLinuxEdition.STANDARD,
- virtualization=ec2.AmazonLinuxVirt.HVM,
- storage=ec2.AmazonLinuxStorage.GENERAL_PURPOSE
- )
- # Load your user data for ec2
- with open("./cdk/user_data_part1.sh") as f:
- user_data_part1 = f.read()
- with open("./cdk/user_data_part2.sh") as f:
- user_data_part2 = f.read()
- with open("./cdk/cw_agent_config.json") as f:
- cw_agent_config = json.load(f)
- with open("./cdk/user_data_worker.sh") as f:
- user_data_worker_p = f.read()
- with open("./cdk/user_data_jobsender.sh") as f:
- user_data_jobsender_p = f.read()
- class CdkEc2Stack(core.Stack):
- def __init__(self, scope: core.Construct, _id: str, *, vpc, bucket_para, # key_name,
- ddb_file_list, sqs_queue, sqs_queue_DLQ, ssm_bucket_para, ssm_credential_para, s3bucket, s3_deploy,
- **kwargs) -> None:
- super().__init__(scope, _id, **kwargs)
- # Create environment variable into userdata
- env_var = f'export table_queue_name={ddb_file_list.table_name}\n' \
- f'export sqs_queue_name={sqs_queue.queue_name}\n' \
- f'export ssm_parameter_bucket={ssm_bucket_para.parameter_name}\n'
- env_var_st = f'echo \"export table_queue_name={ddb_file_list.table_name}\" >> /etc/rc.local\n' \
- f'echo \"export sqs_queue_name={sqs_queue.queue_name}\" >> /etc/rc.local\n' \
- f'echo \"export ssm_parameter_bucket={ssm_bucket_para.parameter_name}\" >> /etc/rc.local\n'
- # Create log group and put group name into userdata
- s3_migrate_log = logs.LogGroup(self, "applog")
- cw_agent_config['logs']['logs_collected']['files']['collect_list'][0][
- 'log_group_name'] = s3_migrate_log.log_group_name
- cw_agent_config['logs']['logs_collected']['files']['collect_list'][1][
- 'log_group_name'] = s3_migrate_log.log_group_name
- cw_agent_config['metrics']['append_dimensions']['AutoScalingGroupName'] = "\\${aws:AutoScalingGroupName}"
- cw_agent_config['metrics']['append_dimensions']['InstanceId'] = "\\${aws:InstanceId}"
- cw_agent_config_str = json.dumps(cw_agent_config, indent=4).replace("\\\\", "\\")
- userdata_head = user_data_part1 + cw_agent_config_str + user_data_part2 + \
- s3_deploy.bucket_name + " .\n" + env_var + env_var_st
- jobsender_userdata = userdata_head + user_data_jobsender_p
- worker_userdata = userdata_head + user_data_worker_p
- # Create jobsender ec2 node
- jobsender = autoscaling.AutoScalingGroup(self, "jobsender",
- instance_type=ec2.InstanceType(
- instance_type_identifier=jobsender_type),
- machine_image=linux_ami,
- # key_name=key_name,
- user_data=ec2.UserData.custom(jobsender_userdata),
- vpc=vpc,
- vpc_subnets=ec2.SubnetSelection(subnet_type=ec2.SubnetType.PUBLIC),
- desired_capacity=1,
- min_capacity=0,
- max_capacity=1
- )
- # jobsender.connections.allow_from_any_ipv4(ec2.Port.tcp(22), "Internet access SSH")
- # Don't need SSH since we use Session Manager
- # Assign EC2 Policy to use SSM and CWAgent
- jobsender.role.add_managed_policy(
- iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore"))
- jobsender.role.add_managed_policy(
- iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchAgentServerPolicy"))
- # Create Worker Autoscaling Group
- worker_asg = autoscaling.AutoScalingGroup(self, "worker-asg",
- vpc=vpc,
- vpc_subnets=ec2.SubnetSelection(
- subnet_type=ec2.SubnetType.PUBLIC),
- instance_type=ec2.InstanceType(
- instance_type_identifier=worker_type),
- machine_image=linux_ami,
- # key_name=key_name, # Optional if use SSM-SessionManager
- user_data=ec2.UserData.custom(worker_userdata),
- desired_capacity=2,
- min_capacity=2,
- max_capacity=10,
- spot_price="0.5",
- group_metrics=[autoscaling.GroupMetrics.all()]
- )
- # worker_asg.connections.allow_from_any_ipv4(ec2.Port.tcp(22), "Internet access SSH")
- # Don't need SSH since we use Session Manager
- # Assign EC2 Policy to use SSM and CWAgent
- worker_asg.role.add_managed_policy(
- iam.ManagedPolicy.from_aws_managed_policy_name("AmazonSSMManagedInstanceCore"))
- worker_asg.role.add_managed_policy(
- iam.ManagedPolicy.from_aws_managed_policy_name("CloudWatchAgentServerPolicy"))
- # Allow EC2 access new DynamoDB Table
- ddb_file_list.grant_full_access(jobsender)
- ddb_file_list.grant_full_access(worker_asg)
- # Allow EC2 access new sqs and its DLQ
- sqs_queue.grant_consume_messages(jobsender)
- sqs_queue.grant_send_messages(jobsender)
- sqs_queue.grant_consume_messages(worker_asg)
- sqs_queue_DLQ.grant_consume_messages(jobsender)
- # Allow EC2 access SSM Parameter Store, get bucket infor and get credential
- ssm_bucket_para.grant_read(jobsender)
- ssm_credential_para.grant_read(jobsender)
- ssm_credential_para.grant_read(worker_asg)
- # Allow EC2 access source code on s3_deploy bucket
- s3_deploy.grant_read(jobsender)
- s3_deploy.grant_read(worker_asg)
- # Allow EC2 access new s3 bucket
- s3bucket.grant_read(jobsender)
- s3bucket.grant_read(worker_asg)
- # Allow EC2 access exist s3 bucket for PUT mode: readonly access the source buckets
- bucket_name = ''
- for b in bucket_para:
- if bucket_name != b['src_bucket']: # 如果列了多个相同的Bucket,就跳过
- bucket_name = b['src_bucket']
- s3exist_bucket = s3.Bucket.from_bucket_name(self,
- bucket_name, # 用这个做id
- bucket_name=bucket_name)
- s3exist_bucket.grant_read(jobsender)
- s3exist_bucket.grant_read(worker_asg)
- # Allow EC2 access exist s3 bucket for GET mode: read and write access the destination buckets
- # bucket_name = ''
- # for b in bucket_para:
- # if bucket_name != b['des_bucket']: # 如果列了多个相同的Bucket,就跳过
- # bucket_name = b['des_bucket']
- # s3exist_bucket = s3.Bucket.from_bucket_name(self,
- # bucket_name, # 用这个做id
- # bucket_name=bucket_name)
- # s3exist_bucket.grant_read_write(jobsender)
- # s3exist_bucket.grant_read_write(worker_asg)
- # Dashboard to monitor SQS and EC2
- board = cw.Dashboard(self, "s3_migrate")
- ec2_metric_cpu_avg = cw.Metric(namespace="AWS/EC2",
- metric_name="CPUUtilization",
- dimensions={"AutoScalingGroupName": worker_asg.auto_scaling_group_name},
- period=core.Duration.minutes(1))
- ec2_metric_net_out = cw.MathExpression(
- expression="SEARCH('{AWS/EC2, InstanceId} NetworkOut', 'Average', 60)",
- label="EC2-NetworkOut",
- using_metrics={}
- )
- autoscaling_GroupDesiredCapacity = cw.Metric(namespace="AWS/AutoScaling",
- metric_name="GroupDesiredCapacity",
- dimensions={
- "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
- period=core.Duration.minutes(1))
- autoscaling_GroupInServiceInstances = cw.Metric(namespace="AWS/AutoScaling",
- metric_name="GroupInServiceInstances",
- dimensions={
- "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
- period=core.Duration.minutes(1))
- autoscaling_GroupMinSize = cw.Metric(namespace="AWS/AutoScaling",
- metric_name="GroupMinSize",
- dimensions={
- "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
- period=core.Duration.minutes(1))
- autoscaling_GroupMaxSize = cw.Metric(namespace="AWS/AutoScaling",
- metric_name="GroupMaxSize",
- dimensions={
- "AutoScalingGroupName": worker_asg.auto_scaling_group_name},
- period=core.Duration.minutes(1))
- # CWAgent collected metric
- cwagent_mem_avg = cw.MathExpression(
- expression="SEARCH('{CWAgent, AutoScalingGroupName, InstanceId} (AutoScalingGroupName=" +
- worker_asg.auto_scaling_group_name +
- " AND MetricName=mem_used_percent)', 'Average', 60)",
- label="mem_avg",
- using_metrics={}
- )
- cwagent_disk_avg = cw.MathExpression(
- expression="SEARCH('{CWAgent, path, InstanceId, AutoScalingGroupName, device, fstype} "
- "(AutoScalingGroupName=" + worker_asg.auto_scaling_group_name +
- " AND MetricName=disk_used_percent AND path=\"/\")', 'Average', 60)",
- label="disk_avg",
- using_metrics={}
- )
- cwagent_net_tcp = cw.MathExpression(
- expression="SEARCH('{CWAgent, AutoScalingGroupName, InstanceId} (AutoScalingGroupName=" +
- worker_asg.auto_scaling_group_name +
- " AND MetricName=tcp_established)', 'Average', 60)",
- label="tcp_conn",
- using_metrics={}
- )
- # CWAgent collected application logs - filter metric
- s3_migrate_log.add_metric_filter("Completed-bytes",
- metric_name="Completed-bytes",
- metric_namespace="s3_migrate",
- metric_value="$bytes",
- filter_pattern=logs.FilterPattern.literal(
- '[date, time, info, hs, p="--->Complete", bytes, key]'))
- s3_migrate_log.add_metric_filter("Uploading-bytes",
- metric_name="Uploading-bytes",
- metric_namespace="s3_migrate",
- metric_value="$bytes",
- filter_pattern=logs.FilterPattern.literal(
- '[date, time, info, hs, p="--->Uploading", bytes, key]'))
- s3_migrate_log.add_metric_filter("Downloading-bytes",
- metric_name="Downloading-bytes",
- metric_namespace="s3_migrate",
- metric_value="$bytes",
- filter_pattern=logs.FilterPattern.literal(
- '[date, time, info, hs, p="--->Downloading", bytes, key]'))
- traffic_metric_Complete = cw.Metric(namespace="s3_migrate",
- metric_name="Completed-bytes",
- statistic="Sum",
- period=core.Duration.minutes(1))
- traffic_metric_Upload = cw.Metric(namespace="s3_migrate",
- metric_name="Uploading-bytes",
- statistic="Sum",
- period=core.Duration.minutes(1))
- traffic_metric_Download = cw.Metric(namespace="s3_migrate",
- metric_name="Downloading-bytes",
- statistic="Sum",
- period=core.Duration.minutes(1))
- s3_migrate_log.add_metric_filter("ERROR",
- metric_name="ERROR-Logs",
- metric_namespace="s3_migrate",
- metric_value="1",
- filter_pattern=logs.FilterPattern.literal(
- '"ERROR"'))
- s3_migrate_log.add_metric_filter("WARNING",
- metric_name="WARNING-Logs",
- metric_namespace="s3_migrate",
- metric_value="1",
- filter_pattern=logs.FilterPattern.literal(
- '"WARNING"'))
- log_metric_ERROR = cw.Metric(namespace="s3_migrate",
- metric_name="ERROR-Logs",
- statistic="Sum",
- period=core.Duration.minutes(1))
- log_metric_WARNING = cw.Metric(namespace="s3_migrate",
- metric_name="WARNING-Logs",
- statistic="Sum",
- period=core.Duration.minutes(1))
- board.add_widgets(cw.GraphWidget(title="S3-MIGRATION-TOTAL-TRAFFIC",
- left=[traffic_metric_Complete, traffic_metric_Upload,
- traffic_metric_Download],
- left_y_axis=cw.YAxisProps(label="Bytes/min", show_units=False)),
- cw.GraphWidget(title="ERROR/WARNING LOGS",
- left=[log_metric_ERROR],
- left_y_axis=cw.YAxisProps(label="Count", show_units=False),
- right=[log_metric_WARNING],
- right_y_axis=cw.YAxisProps(label="Count", show_units=False)),
- cw.GraphWidget(title="SQS-JOBS",
- left=[sqs_queue.metric_approximate_number_of_messages_visible(
- period=core.Duration.minutes(1)),
- sqs_queue.metric_approximate_number_of_messages_not_visible(
- period=core.Duration.minutes(1))]),
- cw.SingleValueWidget(title="RUNNING, WAITING & DEATH JOBS",
- metrics=[sqs_queue.metric_approximate_number_of_messages_not_visible(
- period=core.Duration.minutes(1)),
- sqs_queue.metric_approximate_number_of_messages_visible(
- period=core.Duration.minutes(1)),
- sqs_queue_DLQ.metric_approximate_number_of_messages_not_visible(
- period=core.Duration.minutes(1)),
- sqs_queue_DLQ.metric_approximate_number_of_messages_visible(
- period=core.Duration.minutes(1))],
- height=6)
- )
- board.add_widgets(cw.GraphWidget(title="EC2-AutoscalingGroup-TCP",
- left=[cwagent_net_tcp],
- left_y_axis=cw.YAxisProps(label="Count", show_units=False)),
- cw.GraphWidget(title="EC2-AutoscalingGroup-CPU/MEMORY",
- left=[ec2_metric_cpu_avg, cwagent_mem_avg],
- left_y_axis=cw.YAxisProps(max=100, min=0, label="%", show_units=False)),
- cw.GraphWidget(title="EC2-AutoscalingGroup-DISK",
- left=[cwagent_disk_avg],
- left_y_axis=cw.YAxisProps(max=100, min=0, label="%", show_units=False)),
- cw.SingleValueWidget(title="EC2-AutoscalingGroup-CAPACITY",
- metrics=[autoscaling_GroupDesiredCapacity,
- autoscaling_GroupInServiceInstances,
- autoscaling_GroupMinSize,
- autoscaling_GroupMaxSize],
- height=6)
- )
- board.add_widgets(cw.GraphWidget(title="EC2-NetworkOut",
- left=[ec2_metric_net_out],
- left_y_axis=cw.YAxisProps(label="Bytes/min", show_units=False)))
- # Autoscaling up when visible message > 100 in 5 mins
- worker_asg.scale_on_metric("scaleup", metric=sqs_queue.metric_approximate_number_of_messages_visible(),
- scaling_steps=[autoscaling.ScalingInterval(
- change=1,
- lower=100,
- upper=500
- ), autoscaling.ScalingInterval(
- change=2,
- lower=500
- ),
- autoscaling.ScalingInterval(
- change=0,
- upper=100,
- lower=0
- )],
- adjustment_type=autoscaling.AdjustmentType.CHANGE_IN_CAPACITY)
- # Alarm for queue empty and ec2 > 1
- # 消息队列空(没有Visible+Invisible),并且EC2不止一台,则告警,并设置EC2为1台
- # 这里还可以根据场景调整,如果Jobsender也用来做传输,则可以在这里设置没有任务的时候,Autoscaling Group为0
- metric_all_message = cw.MathExpression(
- expression="IF(((a+b) == 0) AND (c >1), 0, 1)", # a+b且c>1则设置为0,告警
- label="empty_queue_expression",
- using_metrics={
- "a": sqs_queue.metric_approximate_number_of_messages_visible(),
- "b": sqs_queue.metric_approximate_number_of_messages_not_visible(),
- "c": autoscaling_GroupInServiceInstances
- }
- )
- alarm_0 = cw.Alarm(self, "SQSempty",
- alarm_name="s3-migration-cluster-SQS queue empty and ec2 more than 1 in Cluster",
- metric=metric_all_message,
- threshold=0,
- comparison_operator=cw.ComparisonOperator.LESS_THAN_OR_EQUAL_TO_THRESHOLD,
- evaluation_periods=3,
- datapoints_to_alarm=3,
- treat_missing_data=cw.TreatMissingData.NOT_BREACHING
- )
- alarm_topic_empty = sns.Topic(self, "SQS queue empty and ec2 more than 1 in Cluster")
- # 这个告警可以作为批量传输完成后的通知,而且这样做可以只通知一次,而不会不停地通知
- alarm_topic_empty.add_subscription(subscription=sub.EmailSubscription(alarm_email))
- alarm_0.add_alarm_action(action.SnsAction(alarm_topic_empty))
- # If queue empty, set autoscale down to 1 EC2
- action_shutdown = autoscaling.StepScalingAction(self, "shutdown",
- auto_scaling_group=worker_asg,
- adjustment_type=autoscaling.AdjustmentType.EXACT_CAPACITY
- )
- action_shutdown.add_adjustment(adjustment=1, upper_bound=0)
- alarm_0.add_alarm_action(action.AutoScalingAction(action_shutdown))
- # While message in SQS-DLQ, alarm to sns
- alarm_DLQ = cw.Alarm(self, "SQS_DLQ",
- alarm_name="s3-migration-cluster-SQS DLQ more than 1 message-Cluster",
- metric=sqs_queue_DLQ.metric_approximate_number_of_messages_visible(),
- threshold=0,
- comparison_operator=cw.ComparisonOperator.GREATER_THAN_THRESHOLD,
- evaluation_periods=3,
- datapoints_to_alarm=3,
- treat_missing_data=cw.TreatMissingData.IGNORE)
- alarm_topic_DLQ = sns.Topic(self, "SQS DLQ more than 1 message-Cluster")
- alarm_topic_DLQ.add_subscription(subscription=sub.EmailSubscription(alarm_email))
- alarm_DLQ.add_alarm_action(action.SnsAction(alarm_topic_DLQ))
- # Output
- core.CfnOutput(self, "LogGroup", value=s3_migrate_log.log_group_name)
- core.CfnOutput(self, "Dashboard", value="CloudWatch Dashboard name s3_migrate_cluster")
- core.CfnOutput(self, "Alarm", value="CloudWatch SQS queue empty Alarm for cluster: " + alarm_email)
|