# Copyright 2016 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Abstractions over S3's upload/download operations.

This module provides high level abstractions for efficient
uploads/downloads. It handles several things for the user:

* Automatically switching to multipart transfers when
  a file is over a specific size threshold
* Uploading/downloading a file in parallel
* Throttling based on max bandwidth
* Progress callbacks to monitor transfers
* Retries. While botocore handles retries for streaming uploads,
  it is not possible for it to handle retries for streaming
  downloads. This module handles retries for both cases so
  you don't need to implement any retry logic yourself.

This module has a reasonable set of defaults. It also allows you
to configure many aspects of the transfer process including:

* Multipart threshold size
* Max parallel downloads
* Max bandwidth
* Socket timeouts
* Retry amounts

There is no support for s3->s3 multipart copies at this
time.


.. _ref_s3transfer_usage:

Usage
=====

The simplest way to use this module is:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    transfer = S3Transfer(client)
    # Upload /tmp/myfile to s3://bucket/key
    transfer.upload_file('/tmp/myfile', 'bucket', 'key')

    # Download s3://bucket/key to /tmp/myfile
    transfer.download_file('bucket', 'key', '/tmp/myfile')

The ``upload_file`` and ``download_file`` methods also accept an
``extra_args`` dictionary whose values are forwarded through to the
corresponding client operation. Here are a few examples using
``upload_file``::

    # Making the object public
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'ACL': 'public-read'})

    # Setting metadata
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         extra_args={'Metadata': {'a': 'b', 'c': 'd'}})

    # Setting content type
    transfer.upload_file('/tmp/myfile.json', 'bucket', 'key',
                         extra_args={'ContentType': "application/json"})

The ``S3Transfer`` class also supports progress callbacks so you can
provide transfer progress to users. Both the ``upload_file`` and
``download_file`` methods take an optional ``callback`` parameter.
Here's an example of how to print a simple progress percentage
to the user:

.. code-block:: python

    class ProgressPercentage(object):
        def __init__(self, filename):
            self._filename = filename
            self._size = float(os.path.getsize(filename))
            self._seen_so_far = 0
            self._lock = threading.Lock()

        def __call__(self, bytes_amount):
            # To simplify we'll assume this is hooked up
            # to a single filename.
            with self._lock:
                self._seen_so_far += bytes_amount
                percentage = (self._seen_so_far / self._size) * 100
                sys.stdout.write(
                    "\r%s  %s / %s  (%.2f%%)" % (
                        self._filename, self._seen_so_far,
                        self._size, percentage))
                sys.stdout.flush()


    transfer = S3Transfer(boto3.client('s3', 'us-west-2'))
    # Upload /tmp/myfile to s3://bucket/key and print upload progress.
    transfer.upload_file('/tmp/myfile', 'bucket', 'key',
                         callback=ProgressPercentage('/tmp/myfile'))

You can also provide a TransferConfig object to the S3Transfer
object that gives you more fine grained control over the
transfer. For example:

.. code-block:: python

    client = boto3.client('s3', 'us-west-2')
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        max_concurrency=10,
        num_download_attempts=10,
    )
    transfer = S3Transfer(client, config)
    transfer.upload_file('/tmp/foo', 'bucket', 'key')
"""
import concurrent.futures
import functools
import logging
import math
import os
import queue
import random
import socket
import string
import threading

from botocore.compat import six  # noqa: F401
from botocore.exceptions import IncompleteReadError
from botocore.vendored.requests.packages.urllib3.exceptions import (
    ReadTimeoutError,
)

import s3transfer.compat
from s3transfer.exceptions import RetriesExceededError, S3UploadFailedError

__author__ = 'Amazon Web Services'
__version__ = '0.6.0'


class NullHandler(logging.Handler):
    def emit(self, record):
        pass


logger = logging.getLogger(__name__)
logger.addHandler(NullHandler())

MB = 1024 * 1024
SHUTDOWN_SENTINEL = object()


def random_file_extension(num_digits=8):
    return ''.join(random.choice(string.hexdigits) for _ in range(num_digits))


def disable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'disable_callback'
    ):
        request.body.disable_callback()


def enable_upload_callbacks(request, operation_name, **kwargs):
    if operation_name in ['PutObject', 'UploadPart'] and hasattr(
        request.body, 'enable_callback'
    ):
        request.body.enable_callback()


class QueueShutdownError(Exception):
    pass


class ReadFileChunk:
    def __init__(
        self,
        fileobj,
        start_byte,
        chunk_size,
        full_file_size,
        callback=None,
        enable_callback=True,
    ):
        """
        Given a file object shown below:

            |___________________________________________________|
            0          |                 |               full_file_size
                       |----chunk_size---|
                    start_byte

        :type fileobj: file
        :param fileobj: File like object

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type full_file_size: int
        :param full_file_size: The entire content length associated
            with ``fileobj``.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        """
        self._fileobj = fileobj
        self._start_byte = start_byte
        self._size = self._calculate_file_size(
            self._fileobj,
            requested_size=chunk_size,
            start_byte=start_byte,
            actual_file_size=full_file_size,
        )
        self._fileobj.seek(self._start_byte)
        self._amount_read = 0
        self._callback = callback
        self._callback_enabled = enable_callback

    @classmethod
    def from_filename(
        cls,
        filename,
        start_byte,
        chunk_size,
        callback=None,
        enable_callback=True,
    ):
        """Convenience factory function to create from a filename.

        :type filename: str
        :param filename: The name of the file to read from. The full
            file size is determined automatically.

        :type start_byte: int
        :param start_byte: The first byte from which to start reading.

        :type chunk_size: int
        :param chunk_size: The max chunk size to read. Trying to read
            past the end of the chunk size will behave like you've
            reached the end of the file.

        :type callback: function(amount_read)
        :param callback: Called whenever data is read from this object.

        :type enable_callback: bool
        :param enable_callback: Indicate whether to invoke callback
            during read() calls.

        :rtype: ``ReadFileChunk``
        :return: A new instance of ``ReadFileChunk``

        """
        f = open(filename, 'rb')
        file_size = os.fstat(f.fileno()).st_size
        return cls(
            f, start_byte, chunk_size, file_size, callback, enable_callback
        )

    def _calculate_file_size(
        self, fileobj, requested_size, start_byte, actual_file_size
    ):
        max_chunk_size = actual_file_size - start_byte
        return min(max_chunk_size, requested_size)

    def read(self, amount=None):
        if amount is None:
            amount_to_read = self._size - self._amount_read
        else:
            amount_to_read = min(self._size - self._amount_read, amount)
        data = self._fileobj.read(amount_to_read)
        self._amount_read += len(data)
        if self._callback is not None and self._callback_enabled:
            self._callback(len(data))
        return data

    def enable_callback(self):
        self._callback_enabled = True

    def disable_callback(self):
        self._callback_enabled = False

    def seek(self, where):
        self._fileobj.seek(self._start_byte + where)
        if self._callback is not None and self._callback_enabled:
            # To also rewind the callback() for an accurate progress report
            self._callback(where - self._amount_read)
        self._amount_read = where

    def close(self):
        self._fileobj.close()

    def tell(self):
        return self._amount_read

    def __len__(self):
        # __len__ is defined because requests will try to determine the length
        # of the stream to set a content length. In the normal case
        # of the file it will just stat the file, but we need to change that
        # behavior. By providing a __len__, requests will use that instead
        # of stat'ing the file.
        return self._size

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        self.close()

    def __iter__(self):
        # This is a workaround for http://bugs.python.org/issue17575
        # Basically httplib will try to iterate over the contents, even
        # if it's a file-like object. This wasn't noticed because we've
        # already exhausted the stream so iterating over the file immediately
        # stops, which is what we're simulating here.
        return iter([])
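

# Illustrative sketch, not part of this module's API: how a caller might
# use ``ReadFileChunk`` to expose one 8 MB window of a larger file. The
# chunk behaves like a file that begins at ``start_byte`` and ends after
# ``chunk_size`` bytes, which is how each part is handed to upload_part.
# The filename and callback below are hypothetical.
#
#     chunk = ReadFileChunk.from_filename(
#         '/tmp/myfile', start_byte=8 * MB, chunk_size=8 * MB,
#         callback=lambda n: print('read %d bytes' % n))
#     data = chunk.read(1024)  # at most 1024 bytes of this window
#     chunk.seek(0)            # rewinds to start_byte, not to byte 0
#     chunk.close()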


class StreamReaderProgress:
    """Wrapper for a read only stream that adds progress callbacks."""

    def __init__(self, stream, callback=None):
        self._stream = stream
        self._callback = callback

    def read(self, *args, **kwargs):
        value = self._stream.read(*args, **kwargs)
        if self._callback is not None:
            self._callback(len(value))
        return value
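

# Illustrative sketch, assuming an existing boto3 client: wrapping a
# botocore streaming body so every read reports progress. Any callable
# that accepts a byte count works as the callback.
#
#     response = client.get_object(Bucket='bucket', Key='key')
#     body = StreamReaderProgress(response['Body'], callback=print)
#     while body.read(8192):
#         pass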


class OSUtils:
    def get_file_size(self, filename):
        return os.path.getsize(filename)

    def open_file_chunk_reader(self, filename, start_byte, size, callback):
        return ReadFileChunk.from_filename(
            filename, start_byte, size, callback, enable_callback=False
        )

    def open(self, filename, mode):
        return open(filename, mode)

    def remove_file(self, filename):
        """Remove a file, noop if file does not exist."""
        # Unlike os.remove, if the file does not exist,
        # then this method does nothing.
        try:
            os.remove(filename)
        except OSError:
            pass

    def rename_file(self, current_filename, new_filename):
        s3transfer.compat.rename_file(current_filename, new_filename)


class MultipartUploader:
    # These are the extra_args that need to be forwarded onto
    # subsequent upload_parts.
    UPLOAD_PART_ARGS = [
        'SSECustomerKey',
        'SSECustomerAlgorithm',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls

    def _extra_upload_part_args(self, extra_args):
        # Only the args in UPLOAD_PART_ARGS actually need to be passed
        # onto the upload_part calls.
        upload_parts_args = {}
        for key, value in extra_args.items():
            if key in self.UPLOAD_PART_ARGS:
                upload_parts_args[key] = value
        return upload_parts_args

    def upload_file(self, filename, bucket, key, callback, extra_args):
        response = self._client.create_multipart_upload(
            Bucket=bucket, Key=key, **extra_args
        )
        upload_id = response['UploadId']
        try:
            parts = self._upload_parts(
                upload_id, filename, bucket, key, callback, extra_args
            )
        except Exception as e:
            logger.debug(
                "Exception raised while uploading parts, "
                "aborting multipart upload.",
                exc_info=True,
            )
            self._client.abort_multipart_upload(
                Bucket=bucket, Key=key, UploadId=upload_id
            )
            raise S3UploadFailedError(
                "Failed to upload {} to {}: {}".format(
                    filename, '/'.join([bucket, key]), e
                )
            )
        self._client.complete_multipart_upload(
            Bucket=bucket,
            Key=key,
            UploadId=upload_id,
            MultipartUpload={'Parts': parts},
        )

    def _upload_parts(
        self, upload_id, filename, bucket, key, callback, extra_args
    ):
        upload_parts_extra_args = self._extra_upload_part_args(extra_args)
        parts = []
        part_size = self._config.multipart_chunksize
        num_parts = int(
            math.ceil(self._os.get_file_size(filename) / float(part_size))
        )
        max_workers = self._config.max_concurrency
        with self._executor_cls(max_workers=max_workers) as executor:
            upload_partial = functools.partial(
                self._upload_one_part,
                filename,
                bucket,
                key,
                upload_id,
                part_size,
                upload_parts_extra_args,
                callback,
            )
            for part in executor.map(upload_partial, range(1, num_parts + 1)):
                parts.append(part)
        return parts

    def _upload_one_part(
        self,
        filename,
        bucket,
        key,
        upload_id,
        part_size,
        extra_args,
        callback,
        part_number,
    ):
        open_chunk_reader = self._os.open_file_chunk_reader
        with open_chunk_reader(
            filename, part_size * (part_number - 1), part_size, callback
        ) as body:
            response = self._client.upload_part(
                Bucket=bucket,
                Key=key,
                UploadId=upload_id,
                PartNumber=part_number,
                Body=body,
                **extra_args,
            )
            etag = response['ETag']
            return {'ETag': etag, 'PartNumber': part_number}
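

# Illustrative sketch, assuming an existing boto3 client: driving
# MultipartUploader directly rather than through S3Transfer, which
# normally constructs it for files at or above multipart_threshold.
#
#     uploader = MultipartUploader(client, TransferConfig(), OSUtils())
#     uploader.upload_file('/tmp/bigfile', 'bucket', 'key',
#                          callback=None, extra_args={})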


class ShutdownQueue(queue.Queue):
    """A queue implementation that can be shutdown.

    Shutting down the queue means that all subsequent calls to
    put() will fail with a ``QueueShutdownError``; the
    ``trigger_shutdown`` method flips the queue into this state.

    It purposefully deviates from queue.Queue, and is *not* meant
    to be a drop-in replacement for ``queue.Queue``.

    """

    def _init(self, maxsize):
        self._shutdown = False
        self._shutdown_lock = threading.Lock()
        # queue.Queue was an old style class in Python 2, which is why
        # this calls _init directly instead of using super().
        return queue.Queue._init(self, maxsize)

    def trigger_shutdown(self):
        with self._shutdown_lock:
            self._shutdown = True
            logger.debug("The IO queue is now shutdown.")

    def put(self, item):
        # Note: this is not sufficient, it's still possible to deadlock!
        # Need to hook into the condition vars used by this class.
        with self._shutdown_lock:
            if self._shutdown:
                raise QueueShutdownError(
                    "Cannot put item to queue when "
                    "queue has been shutdown."
                )
        return queue.Queue.put(self, item)
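

# Illustrative sketch of the shutdown semantics: once trigger_shutdown()
# has been called, any subsequent put() raises QueueShutdownError. The
# downloader relies on this to unblock producer threads when the IO
# thread dies.
#
#     q = ShutdownQueue(maxsize=10)
#     q.put((0, b'data'))    # succeeds
#     q.trigger_shutdown()
#     q.put((1, b'more'))    # raises QueueShutdownError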


class MultipartDownloader:
    def __init__(
        self,
        client,
        config,
        osutil,
        executor_cls=concurrent.futures.ThreadPoolExecutor,
    ):
        self._client = client
        self._config = config
        self._os = osutil
        self._executor_cls = executor_cls
        self._ioqueue = ShutdownQueue(self._config.max_io_queue)

    def download_file(
        self, bucket, key, filename, object_size, extra_args, callback=None
    ):
        with self._executor_cls(max_workers=2) as controller:
            # 1 thread for the future that manages downloading the parts,
            # 1 thread for the future that manages IO writes.
            download_parts_handler = functools.partial(
                self._download_file_as_future,
                bucket,
                key,
                filename,
                object_size,
                callback,
            )
            parts_future = controller.submit(download_parts_handler)

            io_writes_handler = functools.partial(
                self._perform_io_writes, filename
            )
            io_future = controller.submit(io_writes_handler)
            results = concurrent.futures.wait(
                [parts_future, io_future],
                return_when=concurrent.futures.FIRST_EXCEPTION,
            )
            self._process_future_results(results)

    def _process_future_results(self, futures):
        finished, unfinished = futures
        for future in finished:
            future.result()

    def _download_file_as_future(
        self, bucket, key, filename, object_size, callback
    ):
        part_size = self._config.multipart_chunksize
        num_parts = int(math.ceil(object_size / float(part_size)))
        max_workers = self._config.max_concurrency
        download_partial = functools.partial(
            self._download_range,
            bucket,
            key,
            filename,
            part_size,
            num_parts,
            callback,
        )
        try:
            with self._executor_cls(max_workers=max_workers) as executor:
                list(executor.map(download_partial, range(num_parts)))
        finally:
            self._ioqueue.put(SHUTDOWN_SENTINEL)

    def _calculate_range_param(self, part_size, part_index, num_parts):
        start_range = part_index * part_size
        if part_index == num_parts - 1:
            end_range = ''
        else:
            end_range = start_range + part_size - 1
        range_param = f'bytes={start_range}-{end_range}'
        return range_param
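
    # Worked example of the range math above, assuming the default 8 MB
    # multipart_chunksize: a 20 MB object is split into 3 parts, and
    #   part_index 0 -> 'bytes=0-8388607'
    #   part_index 1 -> 'bytes=8388608-16777215'
    #   part_index 2 -> 'bytes=16777216-' (the last part is open ended,
    #   so it picks up the remainder regardless of the exact object size)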

    def _download_range(
        self, bucket, key, filename, part_size, num_parts, callback, part_index
    ):
        try:
            range_param = self._calculate_range_param(
                part_size, part_index, num_parts
            )

            max_attempts = self._config.num_download_attempts
            last_exception = None
            for i in range(max_attempts):
                try:
                    logger.debug("Making get_object call.")
                    response = self._client.get_object(
                        Bucket=bucket, Key=key, Range=range_param
                    )
                    streaming_body = StreamReaderProgress(
                        response['Body'], callback
                    )
                    buffer_size = 1024 * 16
                    current_index = part_size * part_index
                    for chunk in iter(
                        lambda: streaming_body.read(buffer_size), b''
                    ):
                        self._ioqueue.put((current_index, chunk))
                        current_index += len(chunk)
                    return
                except (
                    socket.timeout,
                    OSError,
                    ReadTimeoutError,
                    IncompleteReadError,
                ) as e:
                    logger.debug(
                        "Retrying exception caught (%s), "
                        "retrying request, (attempt %s / %s)",
                        e,
                        i,
                        max_attempts,
                        exc_info=True,
                    )
                    last_exception = e
                    continue
            raise RetriesExceededError(last_exception)
        finally:
            logger.debug("EXITING _download_range for part: %s", part_index)

    def _perform_io_writes(self, filename):
        with self._os.open(filename, 'wb') as f:
            while True:
                task = self._ioqueue.get()
                if task is SHUTDOWN_SENTINEL:
                    logger.debug(
                        "Shutdown sentinel received in IO handler, "
                        "shutting down IO handler."
                    )
                    return
                else:
                    try:
                        offset, data = task
                        f.seek(offset)
                        f.write(data)
                    except Exception as e:
                        logger.debug(
                            "Caught exception in IO thread: %s",
                            e,
                            exc_info=True,
                        )
                        self._ioqueue.trigger_shutdown()
                        raise
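

# Illustrative sketch, assuming an existing boto3 client: a ranged
# download driven directly. S3Transfer normally supplies object_size
# from the head_object call it makes inside download_file.
#
#     downloader = MultipartDownloader(client, TransferConfig(), OSUtils())
#     size = client.head_object(Bucket='bucket', Key='key')['ContentLength']
#     downloader.download_file('bucket', 'key', '/tmp/myfile', size,
#                              extra_args={})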


class TransferConfig:
    def __init__(
        self,
        multipart_threshold=8 * MB,
        max_concurrency=10,
        multipart_chunksize=8 * MB,
        num_download_attempts=5,
        max_io_queue=100,
    ):
        self.multipart_threshold = multipart_threshold
        self.max_concurrency = max_concurrency
        self.multipart_chunksize = multipart_chunksize
        self.num_download_attempts = num_download_attempts
        self.max_io_queue = max_io_queue


class S3Transfer:
    ALLOWED_DOWNLOAD_ARGS = [
        'VersionId',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'RequestPayer',
    ]

    ALLOWED_UPLOAD_ARGS = [
        'ACL',
        'CacheControl',
        'ContentDisposition',
        'ContentEncoding',
        'ContentLanguage',
        'ContentType',
        'Expires',
        'GrantFullControl',
        'GrantRead',
        'GrantReadACP',
        'GrantWriteACP',
        'Metadata',
        'RequestPayer',
        'ServerSideEncryption',
        'StorageClass',
        'SSECustomerAlgorithm',
        'SSECustomerKey',
        'SSECustomerKeyMD5',
        'SSEKMSKeyId',
        'SSEKMSEncryptionContext',
        'Tagging',
    ]

    def __init__(self, client, config=None, osutil=None):
        self._client = client
        if config is None:
            config = TransferConfig()
        self._config = config
        if osutil is None:
            osutil = OSUtils()
        self._osutil = osutil

    def upload_file(
        self, filename, bucket, key, callback=None, extra_args=None
    ):
        """Upload a file to an S3 object.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.upload_file() directly.
        """
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_UPLOAD_ARGS)
        events = self._client.meta.events
        events.register_first(
            'request-created.s3',
            disable_upload_callbacks,
            unique_id='s3upload-callback-disable',
        )
        events.register_last(
            'request-created.s3',
            enable_upload_callbacks,
            unique_id='s3upload-callback-enable',
        )
        if (
            self._osutil.get_file_size(filename)
            >= self._config.multipart_threshold
        ):
            self._multipart_upload(filename, bucket, key, callback, extra_args)
        else:
            self._put_object(filename, bucket, key, callback, extra_args)

    def _put_object(self, filename, bucket, key, callback, extra_args):
        # We're using open_file_chunk_reader so we can take advantage of the
        # progress callback functionality.
        open_chunk_reader = self._osutil.open_file_chunk_reader
        with open_chunk_reader(
            filename,
            0,
            self._osutil.get_file_size(filename),
            callback=callback,
        ) as body:
            self._client.put_object(
                Bucket=bucket, Key=key, Body=body, **extra_args
            )

    def download_file(
        self, bucket, key, filename, extra_args=None, callback=None
    ):
        """Download an S3 object to a file.

        Variants have also been injected into S3 client, Bucket and Object.
        You don't have to use S3Transfer.download_file() directly.
        """
        # This method will issue a ``head_object`` request to determine
        # the size of the S3 object. This is used to determine if the
        # object is downloaded in parallel.
        if extra_args is None:
            extra_args = {}
        self._validate_all_known_args(extra_args, self.ALLOWED_DOWNLOAD_ARGS)
        object_size = self._object_size(bucket, key, extra_args)
        temp_filename = filename + os.extsep + random_file_extension()
        try:
            self._download_file(
                bucket, key, temp_filename, object_size, extra_args, callback
            )
        except Exception:
            logger.debug(
                "Exception caught in download_file, removing partial "
                "file: %s",
                temp_filename,
                exc_info=True,
            )
            self._osutil.remove_file(temp_filename)
            raise
        else:
            self._osutil.rename_file(temp_filename, filename)

    def _download_file(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        if object_size >= self._config.multipart_threshold:
            self._ranged_download(
                bucket, key, filename, object_size, extra_args, callback
            )
        else:
            self._get_object(bucket, key, filename, extra_args, callback)

    def _validate_all_known_args(self, actual, allowed):
        for kwarg in actual:
            if kwarg not in allowed:
                raise ValueError(
                    "Invalid extra_args key '%s', "
                    "must be one of: %s" % (kwarg, ', '.join(allowed))
                )

    def _ranged_download(
        self, bucket, key, filename, object_size, extra_args, callback
    ):
        downloader = MultipartDownloader(
            self._client, self._config, self._osutil
        )
        downloader.download_file(
            bucket, key, filename, object_size, extra_args, callback
        )

    def _get_object(self, bucket, key, filename, extra_args, callback):
        # precondition: num_download_attempts > 0
        max_attempts = self._config.num_download_attempts
        last_exception = None
        for i in range(max_attempts):
            try:
                return self._do_get_object(
                    bucket, key, filename, extra_args, callback
                )
            except (
                socket.timeout,
                OSError,
                ReadTimeoutError,
                IncompleteReadError,
            ) as e:
                # TODO: we need a way to reset the callback if the
                # download failed.
                logger.debug(
                    "Retrying exception caught (%s), "
                    "retrying request, (attempt %s / %s)",
                    e,
                    i,
                    max_attempts,
                    exc_info=True,
                )
                last_exception = e
                continue
        raise RetriesExceededError(last_exception)

    def _do_get_object(self, bucket, key, filename, extra_args, callback):
        response = self._client.get_object(
            Bucket=bucket, Key=key, **extra_args
        )
        streaming_body = StreamReaderProgress(response['Body'], callback)
        with self._osutil.open(filename, 'wb') as f:
            for chunk in iter(lambda: streaming_body.read(8192), b''):
                f.write(chunk)

    def _object_size(self, bucket, key, extra_args):
        return self._client.head_object(Bucket=bucket, Key=key, **extra_args)[
            'ContentLength'
        ]

    def _multipart_upload(self, filename, bucket, key, callback, extra_args):
        uploader = MultipartUploader(self._client, self._config, self._osutil)
        uploader.upload_file(filename, bucket, key, callback, extra_args)