12345678910111213141516171819202122 |
- def check_access(self):
- """
- 检查目标 s3 是否能写入
- :return:
- """
- bucket_flag = True
- try:
- res = self.s3_client.list_buckets()
- for bucket in res["Buckets"]:
- if bucket["Name"] == DESTINATION_BUCKET:
- bucket_flag = False
- if bucket_flag:
- self.s3_client.create_bucket(Bucket=DESTINATION_BUCKET)
- self.s3_client.put_object(
- Bucket=DESTINATION_BUCKET,
- Key="access_test",
- Body="This is a access test"
- )
- except Exception as e:
- print (e.message)
|