123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960 |
- def do_part_upload(args):
- """
- Upload a part of a MultiPartUpload
- Open the target file and read in a chunk. Since we can't pickle
- S3Connection or MultiPartUpload objects, we have to reconnect and lookup
- the MPU object with each part upload.
- :type args: tuple of (string, string, string, int, int, int)
- :param args: The actual arguments of this method. Due to lameness of
- multiprocessing, we have to extract these outside of the
- function definition.
- The arguments are: S3 Bucket name, MultiPartUpload id, file
- name, the part number, part offset, part size
- """
- # Multiprocessing args lameness
- bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries = args
- logger.debug("do_part_upload got args: %s" % (args,))
- # Connect to S3, get the MultiPartUpload
- s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
- s3.is_secure = secure
- bucket = s3.lookup(bucket_name)
- mpu = None
- for mp in bucket.list_multipart_uploads():
- if mp.id == mpu_id:
- mpu = mp
- break
- if mpu is None:
- raise Exception("Could not find MultiPartUpload %s" % mpu_id)
- # Read the chunk from the file
- fp = open(fname, 'rb')
- fp.seek(start)
- data = fp.read(size)
- fp.close()
- if not data:
- raise Exception("Unexpectedly tried to read an empty chunk")
- def progress(x,y):
- logger.debug("Part %d: %0.2f%%" % (i+1, 100.*x/y))
- try:
- # Do the upload
- t1 = time.time()
- mpu.upload_part_from_file(StringIO(data), i+1, cb=progress)
- # Print some timings
- t2 = time.time() - t1
- s = len(data)/1024./1024.
- logger.info("Uploaded part %s (%0.2fM) in %0.2fs at %0.2fMBps" % (i+1, s, t2, s/t2))
- except Exception as err:
- logger.debug("Retry request %d of max %d times" % (current_tries, max_tries))
- if (current_tries > max_tries):
- logger.error(err)
- else:
- time.sleep(3)
- current_tries += 1
- do_part_download(bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries)
|