__init__.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the 'License'). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the 'license' file accompanying this file. This file is
  10. # distributed on an 'AS IS' BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. import botocore
  14. import botocore.session
  15. from botocore.exceptions import WaiterError
  16. from s3transfer.manager import TransferManager
  17. from s3transfer.subscribers import BaseSubscriber
  18. from tests import FileCreator, random_bucket_name, unittest
  19. def recursive_delete(client, bucket_name):
  20. # Ensure the bucket exists before attempting to wipe it out
  21. exists_waiter = client.get_waiter('bucket_exists')
  22. exists_waiter.wait(Bucket=bucket_name)
  23. page = client.get_paginator('list_objects')
  24. # Use pages paired with batch delete_objects().
  25. for page in page.paginate(Bucket=bucket_name):
  26. keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
  27. if keys:
  28. client.delete_objects(Bucket=bucket_name, Delete={'Objects': keys})
  29. for _ in range(5):
  30. try:
  31. client.delete_bucket(Bucket=bucket_name)
  32. break
  33. except client.exceptions.NoSuchBucket:
  34. exists_waiter.wait(Bucket=bucket_name)
  35. except Exception:
  36. # We can sometimes get exceptions when trying to
  37. # delete a bucket. We'll let the waiter make
  38. # the final call as to whether the bucket was able
  39. # to be deleted.
  40. not_exists_waiter = client.get_waiter('bucket_not_exists')
  41. try:
  42. not_exists_waiter.wait(Bucket=bucket_name)
  43. except botocore.exceptions.WaiterError:
  44. continue
  45. class BaseTransferManagerIntegTest(unittest.TestCase):
  46. """Tests for the high level s3transfer module."""
  47. @classmethod
  48. def setUpClass(cls):
  49. cls.region = 'us-west-2'
  50. cls.session = botocore.session.get_session()
  51. cls.client = cls.session.create_client('s3', cls.region)
  52. cls.bucket_name = random_bucket_name()
  53. cls.client.create_bucket(
  54. Bucket=cls.bucket_name,
  55. CreateBucketConfiguration={'LocationConstraint': cls.region},
  56. )
  57. def setUp(self):
  58. self.files = FileCreator()
  59. def tearDown(self):
  60. self.files.remove_all()
  61. @classmethod
  62. def tearDownClass(cls):
  63. recursive_delete(cls.client, cls.bucket_name)
  64. def delete_object(self, key):
  65. self.client.delete_object(Bucket=self.bucket_name, Key=key)
  66. def object_exists(self, key, extra_args=None):
  67. try:
  68. self.wait_object_exists(key, extra_args)
  69. return True
  70. except WaiterError:
  71. return False
  72. def object_not_exists(self, key, extra_args=None):
  73. if extra_args is None:
  74. extra_args = {}
  75. try:
  76. self.client.get_waiter('object_not_exists').wait(
  77. Bucket=self.bucket_name, Key=key, **extra_args
  78. )
  79. return True
  80. except WaiterError:
  81. return False
  82. def wait_object_exists(self, key, extra_args=None):
  83. if extra_args is None:
  84. extra_args = {}
  85. for _ in range(5):
  86. self.client.get_waiter('object_exists').wait(
  87. Bucket=self.bucket_name, Key=key, **extra_args
  88. )
  89. def create_transfer_manager(self, config=None):
  90. return TransferManager(self.client, config=config)
  91. def upload_file(self, filename, key, extra_args=None):
  92. transfer = self.create_transfer_manager()
  93. with open(filename, 'rb') as f:
  94. transfer.upload(f, self.bucket_name, key, extra_args)
  95. self.wait_object_exists(key, extra_args)
  96. self.addCleanup(self.delete_object, key)
  97. class WaitForTransferStart(BaseSubscriber):
  98. def __init__(self, bytes_transfer_started_event):
  99. self._bytes_transfer_started_event = bytes_transfer_started_event
  100. def on_progress(self, **kwargs):
  101. if not self._bytes_transfer_started_event.is_set():
  102. self._bytes_transfer_started_event.set()