# multipart_upload_2.py (599 B)
  def check_access(self):
      """
      Check whether the destination S3 bucket can be written to.

      Lists the buckets visible to ``self.s3_client``, creates
      ``DESTINATION_BUCKET`` when no listed bucket carries that name, and
      then uploads a small ``access_test`` object into it.

      Any exception raised along the way is caught and printed instead of
      being propagated to the caller.

      :return: None
      """
      try:
          listing = self.s3_client.list_buckets()
          bucket_exists = any(
              bucket["Name"] == DESTINATION_BUCKET
              for bucket in listing["Buckets"]
          )
          if not bucket_exists:
              self.s3_client.create_bucket(Bucket=DESTINATION_BUCKET)
          # The actual write probe: runs whether or not the bucket was
          # just created.
          self.s3_client.put_object(
              Bucket=DESTINATION_BUCKET,
              Key="access_test",
              Body="This is a access test"
          )
      except Exception as e:
          # Exceptions have no ``message`` attribute on Python 3; reading it
          # here would raise AttributeError and hide the real failure.
          print(e)