import logging
import time
from cStringIO import StringIO

import boto
from boto.s3.connection import OrdinaryCallingFormat

logger = logging.getLogger(__name__)


def do_part_upload(args):
    """
    Upload a part of a MultiPartUpload

    Open the target file and read in a chunk. Since we can't pickle
    S3Connection or MultiPartUpload objects, we have to reconnect and look up
    the MPU object with each part upload.

    :type args: tuple of (string, string, string, int, int, int, bool, int, int)
    :param args: The actual arguments of this method. Since multiprocessing
        can only pass picklable arguments to a worker, we have to bundle them
        into a tuple and unpack them inside the function. The arguments are:
        S3 bucket name, MultiPartUpload id, file name, the part number,
        part offset, part size, whether to use a secure connection,
        the maximum number of retries, and the current retry count.
    """
    # Unpack the argument tuple (see the docstring for why it's a tuple)
    bucket_name, mpu_id, fname, i, start, size, secure, max_tries, current_tries = args
    logger.debug("do_part_upload got args: %s" % (args,))

    # Connect to S3 and look up the MultiPartUpload by its id
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(bucket_name)
    mpu = None
    for mp in bucket.list_multipart_uploads():
        if mp.id == mpu_id:
            mpu = mp
            break
    if mpu is None:
        raise Exception("Could not find MultiPartUpload %s" % mpu_id)

    # Read the chunk from the file
    fp = open(fname, 'rb')
    fp.seek(start)
    data = fp.read(size)
    fp.close()
    if not data:
        raise Exception("Unexpectedly tried to read an empty chunk")

    def progress(x, y):
        logger.debug("Part %d: %0.2f%%" % (i + 1, 100. * x / y))

    try:
        # Do the upload
        t1 = time.time()
        mpu.upload_part_from_file(StringIO(data), i + 1, cb=progress)

        # Log some timings
        t2 = time.time() - t1
        s = len(data) / 1024. / 1024.
        logger.info("Uploaded part %s (%0.2fM) in %0.2fs at %0.2fMBps" % (
            i + 1, s, t2, s / t2))
    except Exception as err:
        logger.debug("Retry request %d of max %d times" % (current_tries, max_tries))
        if current_tries > max_tries:
            # Out of retries: log and re-raise so the parent process sees the failure
            logger.error(err)
            raise
        else:
            time.sleep(3)
            current_tries += 1
            # Retry this part by calling ourselves (not do_part_download) with
            # the same tuple-style arguments and the incremented retry count
            do_part_upload((bucket_name, mpu_id, fname, i, start, size,
                            secure, max_tries, current_tries))
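
# --- Illustrative driver sketch (not part of the original script) ---
# A minimal sketch of how do_part_upload might be fanned out across worker
# processes, assuming hypothetical names (`upload_file`, `key_name`,
# `split_mb`, `num_processes`) that don't appear above: split the file into
# fixed-size chunks, build one picklable argument tuple per part, map the
# worker over them with a multiprocessing.Pool, then complete the MPU.
import math
import os
from multiprocessing import Pool


def upload_file(fname, bucket_name, key_name, secure=True, max_tries=5,
                split_mb=50, num_processes=4):
    part_size = split_mb * 1024 * 1024
    total = os.stat(fname).st_size
    num_parts = int(math.ceil(total / float(part_size)))

    # Initiate the MPU in the parent; workers re-find it by id since the
    # MultiPartUpload object itself can't be pickled.
    s3 = boto.connect_s3(calling_format=OrdinaryCallingFormat())
    s3.is_secure = secure
    bucket = s3.lookup(bucket_name)
    mpu = bucket.initiate_multipart_upload(key_name)

    # One (bucket name, mpu id, file name, part number, offset, size,
    # secure, max_tries, current_tries) tuple per part
    args = [(bucket_name, mpu.id, fname, i, part_size * i,
             min(part_size, total - part_size * i), secure, max_tries, 0)
            for i in range(num_parts)]

    pool = Pool(processes=num_processes)
    pool.map(do_part_upload, args)
    mpu.complete_upload()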