PROJECT LONGBOW - Amazon EC2 Autoscaling 集群,支撑海量文件于海外和中国Amazon S3之间传输
Cluster & Serverless Version 0.98
Jobsender 扫描 Amazon S3 派发传输任务:
如果需要定时运行Jobsender任务,可以登录EC2设置定时任务(crontab -e 命令)运行s3_migration_cluster_jobsender.py。
预备阶段(可选):Jobsender 获取源和目的 Bucket List并比对差异,形成Job List。并定期进行。
Jobsender 发送 Job messages to SQS (每个 job 是Amazon S3上的一个文件),并记录到 DDB (仅做统计进度) Amazon S3 新增也可以直接触发Amazon SQS
Amazon EC2 or/and Lambda 从SQS获取Job. 每个EC2 instance获取多个Job,每个Lambda runtime获取一个Job
Amazon EC2 or/and Lambda 对每个Job创建多线程,每个线程各自获取文件分片,并上传到目标 S3
Job Sender 去List多个源和目的桶进行比对
Amazon S3新增文件,直接触发Amazon SQS
批量任务完成后,Job Sender 再次List比对
定时任务触发Job Sender进行比对
测试结果,对于大量的GB级文件,单机 c5.large(该测试设置 5 文件 x 30 线程)可达到跨境 800Mbps以上带宽吞吐。如文件是MB级,则可以设置单节点并发处理更多的文件。上图中可以看到 Autoscaling Group 在逐步增加EC2,多机吞吐叠加,9台 Autoscale 吞吐线性叠加达到900+MB/s (7.2Gbps)。传输 1.2TB 数据 (916个文件) 只用了1小时,从 us-east-1 到 cn-northwest-1。并且在传输完成时自动关闭了服务器,只留下了一台。
各种文件大小的用时统计:
请在 AWS CDK 部署前手工配置 System Manager Parameter Store 新增这个参数
名称:s3_migration_credentials
类型:SecureString
级别:Standard
KMS key source:My current account/alias/aws/ssm 或选择其他你已有的加密 KMS Key
这个 s3_migration_credentials 是用于访问跟EC2不在一个账号系统下的那个S3桶的访问密钥,在目标Account 的IAM user配置获取。配置示例:
{
"aws_access_key_id": "your_aws_access_key_id",
"aws_secret_access_key": "your_aws_secret_access_key",
"region": "cn-northwest-1"
}
配置 AWS CDK 中 app.py 你需要传输的源S3桶/目标S3桶信息,示例如下:
[{
"src_bucket": "your_global_bucket_1",
"src_prefix": "your_prefix",
"des_bucket": "your_china_bucket_1",
"des_prefix": "prefix_1"
}, {
"src_bucket": "your_global_bucket_2",
"src_prefix": "your_prefix",
"des_bucket": "your_china_bucket_2",
"des_prefix": "prefix_2"
}]
这些会被AWS CDK自动部署到 System Manager Parameter Store 的 s3_migration_bucket_para
配置告警通知邮件地址在 cdk_ec2stack.py
如果有需要可以修改默认的 ./cdk-cluster/code/s3_migration_cluster_config.ini 配置(例如JobType,或者并发线程数),CDK会自动部署代码和配置文件到新建的一个s3-migration-cluster-resourc-deploybucket桶。在部署完成后,也可以再次修改配置文件,重新上传S3同样的位置。新启动的EC2就会在Userdata中下载新的配置文件。
CDK 会自动化部署以下所有资源除了 1. 前置配置所要求手工配置的Key:
Amazon VPC(含2AZ,2个公有子网) 和 S3 Endpoint,
Amazon SQS Queue: s3_migrate_file_list
Amazon SQS Queue DLQ: s3_migrate_file_list-DLQ,
Amazon DynamoDB 表: s3_migrate_file_list,
Amazon EC2 JobSender Autoscaling Group with only 1 instance: t3.micro,
Amazon EC2 Workers Autoscaling Group capacity 1-10: c5.large 可以在 cdk_ec2_stack.py 中修改,
Amazon EC2 Autoscaling Policy
Amazon SSM Parameter Store: s3_migration_bucket_para 作为S3桶信息给Jobsender去扫描比对
Amazon EC2 所需要访问各种资源的 IAM Role
Amazon CloudWatch Dashboard 监控
Amazon CloudWatch Alarm 自动Email告警
Amazon S3 Bucket - s3-migration-cluster-resource-newbucket 新增内容桶,新增内容自动触发SQS,所有新存入的对象都会被传输
Amazon S3 Bucket - s3-migration-cluster-resourc-deploybucket 代码部署桶,这个桶是给部署的时候CDK上传应用程序代码的,每个EC2启动的时候都会从这个桶下载代码。如果要修改配置文件,可以直接在这个桶修改。
Amazon EC2 User Data 自动安装 CloudWatch Logs Agent 收集 EC2 初始化运行 User Data 时候的 Logs,以及收集 s3_migrate 程序运行产生的 Logs
Amazon EC2 User Data 自动启用 TCP BBR
Amazon EC2 User data 自动拉 github 上的程序和默认配置,并自动启动。建议把程序和配置放你自己的S3上面,让user data启动时拉取你修改后的配置,或者使用你自己打包的AMI启动EC2。
如果有需要可以修改 Amazon EC2 上的配置文件 s3_migration_config.ini 说明如下:
* JobType = PUT 或 GET (default: PUT)
决定了Worker把自己的IAM Role用来访问源还是访问目的S3, PUT表示EC2跟目标S3不在一个Account,GET表示EC2跟源S3不在一个Account
* Des_bucket_default/Des_prefix_default
是给Amazon S3新增文件触发Amazon SQS的场景,用来配置目标桶/前缀的。
对于Jobsender扫描S3并派发Job的场景,不需要配置这两项。即使配置了,程序看到SQS消息里面有就会使用消息里面的目标桶/前缀
* table_queue_name
访问的Amazon SQS和DynamoDB的表名,需与CloudFormation/CDK创建的ddb/sqs名称一致
* ssm_parameter_bucket
在AWS SSM ParameterStore 上保存的参数名,用于保存buckets的信息,需与CloudFormation/CDK创建的 parameter store 的名称一致
* ssm_parameter_credentials
在 SSM ParameterStore 上保存的另一个账户访问密钥的那个参数名,需与CloudFormation/CDK创建的 parameter store 的名称一致
* StorageClass = STANDARD|REDUCED_REDUNDANCY|STANDARD_IA|ONEZONE_IA|INTELLIGENT_TIERING|GLACIER|DEEP_ARCHIVE
选择目标存储的存储类型, default STANDARD
* ResumableThreshold (default 5MB)
单位MBytes,小于该值的文件,则开始传文件时不走Multipart Upload,不做断点续传,节省性能
* MaxRetry (default 20)
API Call在应用层面的最大重试次数
* MaxThread (default 20)
单文件同时working的Thread进程数量
* MaxParallelFile (default 5)
并行操作文件数量
* JobTimeout (default 3600)
单个文件传输超时时间,Seconds 秒
* LoggingLevel = WARNING | INFO | DEBUG (default: INFO)
* 不建议修改:ifVerifyMD5Twice, ChunkSize, CleanUnfinishedUpload, LocalProfileMode
* 隐藏参数 max_pool_connections=200 在 s3_migration_lib.py
Jobsender 启动之后会检查Amazon SQS 是否空,空则按照 Parameter Store 上所配置的 s3_migration_bucket_para 来获取桶信息,然后进行比对。如果非空,则说明前面还有任务没完成,将不派送新任务。
手工配置时,注意三个超时时间的配合: Amazon SQS, EC2 JobTimeout, Lambda,CDK 默认部署是SQS/EC2 JobTimeout为1小时
CDK 已经自动部署了 CloudWatch Dashbard:
Jobsender 比对S3 Bucket时候可以设置忽略某个文件,或某前缀/某后缀的一批文件,编辑 s3_migration_ignore_list.txt 增加你要忽略对象的 bucket/key,一个文件一行,可以使用通配符如 "*"或"?",例如:
your_src_bucket/your_exact_key.mp4
your_src_bucket/your_exact_key.*
your_src_bucket/your_*
*.jpg
*/readme.md
CDK 默认部署时启动了 Amazon EC2 服务器上的 TCP BBR: Congestion-Based Congestion Control,提高传输效率
Amazon Linux AMI 2017.09.1 Kernel 4.9.51 or later version supported TCP Bottleneck Bandwidth and RTT (BBR) .
当前目录下的CDK配置的是按PUT模式配置。改变为GET模式只需修改:
cdk_ec2_stack.py 里面这一段,即把src_bucket换成des_bucket,并且允许EC2读写现有S3:
# Allow EC2 access exist s3 bucket for GET mode: read and write access the destination buckets
# bucket_name = ''
# for b in bucket_para:
# if bucket_name != b['des_bucket']: # 如果列了多个相同的Bucket,就跳过
# bucket_name = b['des_bucket']
# s3exist_bucket = s3.Bucket.from_bucket_name(self,
# bucket_name,
# bucket_name=bucket_name)
# s3exist_bucket.grant_read_write(jobsender)
# s3exist_bucket.grant_read_write(worker_asg)
另外,注意 s3_migration_cluster_config.ini 设置为 JobType = GET
如果不启用S3版本控制,在大文件正在传输的过程中,如果此时原文件被新的同名文件替换,会导致复制的文件部分是老的文件,部分是新的文件,从而破坏了文件的完整性。以前做法是强调不要在传输过程中去替换源文件,或者只允许新增文件。
而对于无法避免会覆盖源文件的场景下,可以启用S3版本控制,则可以避免破坏文件完整性的情况。本项目已支持S3版本控制,需要配置:
以及设置以下的配置参数,分不同场景说明如下
JobsenderCompareVersionId(True/False):
默认 Flase。
True意味着:Jobsender 在对比S3桶的时候,对源桶获取对象列表时同时获取每个对象的 versionId,来跟目标桶比对。目标桶的 versionId 是在每个对象开始下载的时候保存在 DynamoDB 中。Jobsener 会从 DynamoDB 获取目标桶 versionId 列表。Jobsender发给SQS的Job会带上versionId。
UpdateVersionId(True/False):
默认 Flase。
True意味着:Worker 在开始下载源文件之前是否更新Job所记录的versionId。这功能主要是面向 Jobsender 并没有送 versionId 过来,但又需要启用 versionId 的场景。
GetObjectWithVersionId(True/False):
默认 Flase。
True意味着:Worker 在获取源文件的时候,是否带 versionId 去获取。如果不带 versionId 则获取当前最新文件。
对于Cluster版本,以上参数都在配置文件 s3_migration_config.ini 对于Serverless版本,以上参数分别在 Lambda jobsender 和 worker Python 文件的头部,内部参数定义的位置
S3新增文件触发的SQS Jobs:
UpdateVersionId = False
GetObjectWithVersionId = True
S3新增文件直接触发SQS的情况下,S3启用了版本控制功能,那么SQS消息里面是带 versionId 的,Worker 以这个versionId 去 GetObject。如果出现中断,恢复时仍以该 versionId 去再次获取。 如果你没有GetObjectVersion 权限,则需要把这两个开关设False,否则会因为没权限而获取文件失败 (AccessDenied)。
Jobsender比对S3发送SQS Jobs这里又分两种目的:
完整性:为了绝对保证文件复制过程中,即使被同名文件覆盖,也不至于文件半新半旧。
JobsenderCompareVersionId = False
UpdateVersionId = True
GetObjectWithVersionId = True
为了比对速度更快,所以在比对的时候不获取versionId,这样发送SQS的 versionId 是 'null' 在 Get object 之前获取真实 versionId,并写入 DynamoDB,完成传输之后对比DDB记录的 versionId 。在文件传输结束时,再次校验 DynamoDB 中保存的 versionId,如果因为发生中断重启,而刚好文件被替换,此时versionId会不一致,程序会重新执行传输。
完整性和一致性:为了对比现存源和目的S3的文件不仅仅Size一致,版本也要一致性。并绝对保证文件复制过程中,即使被同名文件覆盖,也不至于文件半新半旧。
JobsenderCompareVersionId = True
UpdateVersionId = False
GetObjectWithVersionId = True
Jobsender会比对 versionId,并且发送SQS带真实 versionId。此时已经有真实的 versionId了,就没必要启用worker UpdateVersionId了,节省时间和请求数。
对于觉得没必要比对文件 version,文件传输过程中也不会被替换,或者没有源S3的 GetObjectVersion 权限,则三个开关都设置False即可(默认情况)
所需内存 = 并发数 * ChunckSize 。小于50GB的文件,ChunckSize为5MB,大于50GB的文件,则ChunkSize会自动调整为约等于: 文件Size/10000。
例如如果平均要处理的文件 Size 是 500GB ,ChunckSize 会自动调整为50MB,并发设置是 5 File x 10 Concurrency/File = 50,所以需要的EC2或Lambda的可运行内存约为 50 x 50MB = 2.5GB。
如需要提高并发度,可以调整配置,但对应的EC2或Lambda内存也要相应增加。
CDK 默认配置的是EC2启动之后,自动通过 Userdata 脚本去yum安装 python,pip安装boto3, CloudwatchAgent,以及下载S3上面的代码和配置。
另外,如果你在中国区部署EC2,下载boto3, CWAgent的速度会慢,建议放到自己的S3上面,或不用userdata(包括jobsender和worker),而是自己手工部署,然后打包成AMI给EC2启动。
不要在开始数据复制之后修改Chunksize。
删除资源则 cdk destroy 。
另外 DynamoDB、CloudWatch Log Group 、自动新建的 S3 bucket 需要手工删除
This library is licensed under the MIT-0 License. See the LICENSE file.