test_copy.py 2.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. # Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
  2. #
  3. # Licensed under the Apache License, Version 2.0 (the "License"). You
  4. # may not use this file except in compliance with the License. A copy of
  5. # the License is located at
  6. #
  7. # http://aws.amazon.com/apache2.0/
  8. #
  9. # or in the "license" file accompanying this file. This file is
  10. # distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
  11. # ANY KIND, either express or implied. See the License for the specific
  12. # language governing permissions and limitations under the License.
  13. from s3transfer.manager import TransferConfig
  14. from tests import RecordingSubscriber
  15. from tests.integration import BaseTransferManagerIntegTest
  class TestCopy(BaseTransferManagerIntegTest):
      """Integration tests that copy an uploaded object to a new key.

      Every test uploads a freshly created local file to ``self.bucket_name``
      and then asks a transfer manager to copy it to a second key in that
      same bucket.
      """

      def setUp(self):
          """Build a transfer config with a 5 MiB multipart threshold."""
          super().setUp()
          threshold = 5 * 1024 * 1024
          self.multipart_threshold = threshold
          self.config = TransferConfig(multipart_threshold=threshold)

      def _upload_then_copy(self, key, new_key, filesize, **copy_kwargs):
          """Upload a file of ``filesize`` bytes as ``key`` and copy it.

          The copy targets ``new_key`` in the same bucket. Any additional
          keyword arguments are forwarded unchanged to the manager's
          ``copy`` call. The method returns only after the copy future has
          produced its result.
          """
          manager = self.create_transfer_manager(self.config)
          local_path = self.files.create_file_with_size(key, filesize=filesize)
          self.upload_file(local_path, key)
          copy_future = manager.copy(
              copy_source={'Bucket': self.bucket_name, 'Key': key},
              bucket=self.bucket_name,
              key=new_key,
              **copy_kwargs,
          )
          copy_future.result()

      def test_copy_below_threshold(self):
          """A 1 MiB object (smaller than the threshold) is copied."""
          destination = '1mb-copy.txt'
          self._upload_then_copy('1mb.txt', destination, 1024 * 1024)
          self.assertTrue(self.object_exists(destination))

      def test_copy_above_threshold(self):
          """A 20 MiB object (larger than the threshold) is copied."""
          destination = '20mb-copy.txt'
          self._upload_then_copy('20mb.txt', destination, 20 * 1024 * 1024)
          self.assertTrue(self.object_exists(destination))

      def test_progress_subscribers_on_copy(self):
          """A subscriber attached to the copy sees every byte reported."""
          recorder = RecordingSubscriber()
          object_size = 20 * 1024 * 1024
          self._upload_then_copy(
              '20mb.txt',
              '20mb-copy.txt',
              object_size,
              subscribers=[recorder],
          )
          # Summing the "amount" argument across all progress callbacks
          # must give exactly the size of the object that was uploaded.
          self.assertEqual(recorder.calculate_bytes_seen(), object_size)