- #!/usr/bin/env python
- # Parallel uploads to Amazon AWS S3
- #
- # The MIT License (MIT)
- #
- # Copyright (c) 2011-2014 Tom Payne
- #
- # Permission is hereby granted, free of charge, to any person obtaining a copy
- # of this software and associated documentation files (the "Software"), to deal
- # in the Software without restriction, including without limitation the rights
- # to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- # copies of the Software, and to permit persons to whom the Software is
- # furnished to do so, subject to the following conditions:
- #
- # The above copyright notice and this permission notice shall be included in
- # all copies or substantial portions of the Software.
- #
- # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- # AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
- # SOFTWARE.
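- #
- # Typical invocation (a sketch only: the bucket name and path are placeholders,
- # and credentials come from AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY or boto's
- # usual configuration):
- #
- #   s3-parallel-put --bucket=my-bucket --prefix=backups/ --put=update \
- #       --processes=16 --content-type=guess /path/to/source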
- try:
- from cStringIO import StringIO
- except ImportError:
- from StringIO import StringIO
- from fnmatch import fnmatch
- from gzip import GzipFile
- from itertools import chain, imap, islice
- import logging
- from multiprocessing import JoinableQueue, Process, current_process
- from optparse import OptionGroup, OptionParser
- import os.path
- import re
- from ssl import SSLError
- import sys
- import tarfile
- import time
- from datetime import datetime
- import magic  # python-magic
- import mimetypes
- import boto
- from boto.s3.connection import S3Connection
- from boto.s3.acl import CannedACLStrings
- from boto.utils import compute_md5
- from boto.exception import BotoServerError
- current_time = time.time()
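- # Matches the "SOURCE -> KEY" lines that putter processes write to the log;
- # --resume uses it to skip keys uploaded by a previous run. The logger name is
- # derived from sys.argv[0], so only logs written by a script invoked as
- # "s3-parallel-put" match (and only non-gzipped uploads, which are logged "->").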
- DONE_RE = re.compile(r'\AINFO:s3-parallel-put\[putter-\d+\]:\S+\s+->\s+(\S+)\s*\Z')
- # These content types are amenable to compression
- # WISHLIST more types means more internets
- GZIP_CONTENT_TYPES = (
- 'application/javascript',
- 'application/x-javascript',
- 'text/css',
- 'text/html',
- 'text/javascript',
- )
- GZIP_ALL = 'all'
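- # Infinite generator of func(*args, **kwargs) results; combined with islice()
- # in main() to create a fixed number of identical putter Process objects.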
- def repeatedly(func, *args, **kwargs):
- while True:
- yield func(*args, **kwargs)
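- # Caches the most recently opened file handle so consecutive tar members read
- # from the same archive do not reopen it; __exit__ deliberately leaves the
- # handle open for reuse.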
- class FileObjectCache(object):
- def __init__(self):
- self.name = None
- self.file_object = None
- def open(self, name, *args):
- if name != self.name:
- self.name = name
- self.file_object = open(self.name, *args)
- return self
- def __enter__(self):
- return self.file_object
- def __exit__(self, exc_type, exc_value, traceback):
- pass
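- # A pending upload. Its content is materialised lazily from one of: an
- # in-memory string, a (filename, offset, size) slice of a tar archive, a plain
- # file path, or an existing key in another bucket (for --put=copy, in which
- # case no content is fetched locally).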
- class Value(object):
- def __init__(self, file_object_cache, content=None, filename=None, md5=None, offset=None, path=None, size=None, bucket_name=None):
- self.file_object_cache = file_object_cache
- self.content = content
- self.filename = filename
- self.md5 = md5
- self.offset = offset
- self.path = path
- self.size = size
- self.bucket_name = bucket_name
- def get_content(self):
- if self.content is None:
- if self.filename:
- with self.file_object_cache.open(self.filename, 'rb') as file_object:
- file_object.seek(self.offset)
- self.content = file_object.read(self.size)
- elif self.path:
- with open(self.path, 'rb') as file_object:
- self.content = file_object.read()
- else:
- assert False
- return self.content
- def calculate_md5(self):
- if self.md5 is None:
- self.md5 = compute_md5(StringIO(self.get_content()))
- return self.md5
- def get_size(self):
- if self.size is None:
- if self.content:
- self.size = len(self.content)
- elif self.path:
- self.size = os.stat(self.path).st_size
- else:
- assert False
- return self.size
- def should_copy_content(self):
- return self.bucket_name is None
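- # Decide whether a path should be skipped: --include patterns win over
- # --exclude, and with --ignore-files-older-than-days, directory paths whose
- # ctime is older than the cutoff are skipped as well.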
- def excluded(pathname, options):
- for glob in options.include:
- if fnmatch(pathname, glob):
- return False
- for glob in options.exclude:
- if fnmatch(pathname, glob):
- return True
- if options.ignore_files_older_than_days > 0:
- if os.path.isdir(pathname):
- creation_time = os.path.getctime(pathname)
- if (current_time - creation_time) // (24 * 3600) >= options.ignore_files_older_than_days:
- logger = logging.getLogger('%s[walker-%d]' % (os.path.basename(sys.argv[0]), current_process().pid))
- logger.info('ignoring %s because its ctime (%s) is older than %s days' % (pathname, datetime.fromtimestamp(creation_time).strftime("%d.%m.%Y, %H:%M:%S"), options.ignore_files_older_than_days))
- return True
- return False
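- # Walk a directory tree (or a single file) and yield (key_name, value_kwargs)
- # pairs; key names are built from --prefix and the file's path.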
- def walk_filesystem(source, options):
- if os.path.isdir(source):
- for dirpath, dirnames, filenames in os.walk(source):
- if excluded(dirpath, options):
- continue
- for filename in filenames:
- abs_path = os.path.join(dirpath, filename)
- if not os.path.isfile(abs_path):
- continue
- if excluded(filename, options):
- continue
- rel_path = os.path.relpath(abs_path, source)
- key_name = '/'.join([options.prefix] + rel_path.split(os.sep))
- yield (key_name, dict(path=abs_path))
- elif os.path.isfile(source):
- if excluded(source, options):
- return
- key_name = os.path.normpath(os.path.join(options.prefix, source))
- yield (key_name, dict(path=source))
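- # Walk a tar archive. Uncompressed archives ('r:') are indexed by
- # (offset, size) so putters can read members straight from the archive file;
- # if that open fails (e.g. a compressed archive), each member is instead read
- # into memory here. Clearing tar_file.members keeps memory usage bounded.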
- def walk_tar(source, options):
- try:
- tar_file = tarfile.open(source, 'r:')
- for tarinfo in tar_file:
- if tarinfo.isfile():
- path = tarinfo.name
- if excluded(path, options):
- continue
- key_name = os.path.normpath(os.path.join(options.prefix, path))
- filename = source
- offset = tarinfo.offset_data
- size = tarinfo.size
- yield (key_name, dict(filename=filename, offset=offset, path=path, size=size))
- # http://blogs.oucs.ox.ac.uk/inapickle/2011/06/20/high-memory-usage-when-using-pythons-tarfile-module/
- tar_file.members = []
- except tarfile.ReadError:
- tar_file = tarfile.open(source)
- for tarinfo in tar_file:
- if tarinfo.isfile():
- path = tarinfo.name
- if excluded(path, options):
- continue
- key_name = os.path.normpath(os.path.join(options.prefix, path))
- content = tar_file.extractfile(tarinfo).read()
- yield (key_name, dict(content=content, path=path))
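- # Walk an existing S3 bucket; each key carries its source bucket name and ETag
- # so --put=copy can do server-side copies into the destination bucket.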
- def walk_s3(source, options):
- connection = S3Connection(host=options.host, is_secure=options.secure)
- for key in connection.get_bucket(source).list():
- if excluded(key.name, options):
- continue
- yield (
- key.name,
- dict(
- bucket_name=key.bucket.name,
- md5=key.etag,
- size=key.size,
- path='%s/%s' % (source, key.name)))
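- # Walker (single producer): chains the selected walk generator over all
- # sources, drops keys already listed in --resume log files, applies --limit,
- # and feeds the put queue.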
- def walker(walk, put_queue, sources, options):
- logger = logging.getLogger('%s[walker-%d]' % (os.path.basename(sys.argv[0]), current_process().pid))
- pairs = chain(*imap(lambda source: walk(source, options), sources))
- if options.resume:
- done = set()
- for filename in options.resume:
- with open(filename) as file_object:
- for line in file_object:
- match = DONE_RE.match(line)
- if match:
- done.add(match.group(1))
- pairs = ((key_name, args) for key_name, args in pairs if key_name not in done)
- if options.limit:
- pairs = islice(pairs, options.limit)
- for pair in pairs:
- put_queue.put(pair)
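- # The four --put modes: "add" uploads only keys that do not exist yet,
- # "stupid" always uploads, "update" skips keys whose ETag already matches the
- # local MD5, and "copy" does a server-side copy from the source bucket
- # (used with --walk=s3).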
- def put_add(bucket, key_name, value):
- key = bucket.get_key(key_name)
- if key is None:
- return bucket.new_key(key_name)
- else:
- return None
- def put_stupid(bucket, key_name, value):
- return bucket.new_key(key_name)
- def put_update(bucket, key_name, value):
- key = bucket.get_key(key_name)
- if key is None:
- return bucket.new_key(key_name)
- else:
- # Boto's md5 function actually returns 3-tuple: (hexdigest, base64, size)
- value.calculate_md5()
- if key.etag == '"%s"' % value.md5[0]:
- return None
- else:
- return key
- def put_copy(bucket, key_name, value):
- return bucket.copy_key(key_name, value.bucket_name, key_name)
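- # Putter worker: pulls (key_name, value_kwargs) pairs off the queue,
- # optionally gzips the content, and uploads it. SSLError and BotoServerError
- # put the item back on the queue for retry; IOError is logged and dropped.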
- def putter(put, put_queue, stat_queue, options):
- logger = logging.getLogger('%s[putter-%d]' % (os.path.basename(sys.argv[0]), current_process().pid))
- connection, bucket = None, None
- file_object_cache = FileObjectCache()
- # Figure out what content types we want to gzip
- if not options.gzip_type: # default
- gzip_content_types = GZIP_CONTENT_TYPES
- elif 'all' in options.gzip_type:
- gzip_content_types = GZIP_ALL
- else:
- gzip_content_types = options.gzip_type
- if 'guess' in gzip_content_types:
- # don't bother removing 'guess' from the list since nothing will match it
- gzip_content_types.extend(GZIP_CONTENT_TYPES)
- if options.gzip:
- logger.debug('These content types will be gzipped: %s' % unicode(gzip_content_types))
- while True:
- args = put_queue.get()
- if args is None:
- put_queue.task_done()
- break
- key_name, value_kwargs = args
- value = Value(file_object_cache, **value_kwargs)
- should_gzip = False
- try:
- if connection is None:
- connection = S3Connection(is_secure=options.secure, host=options.host)
- if bucket is None:
- bucket = connection.get_bucket(options.bucket)
- key = put(bucket, key_name, value)
- if key:
- if value.should_copy_content():
- if options.headers:
- headers = dict(tuple(header.split(':', 1)) for header in options.headers)
- else:
- headers = {}
- content_type = None
- if options.content_type:
- if options.content_type == 'guess':
- content_type = mimetypes.guess_type(value.path)[0]
- elif options.content_type == 'magic':
- content_type = mimetypes.guess_type(value.path)[0]
- if content_type is None:
- content_type = magic.from_file(value.path, mime=True)
- else:
- content_type = options.content_type
- headers['Content-Type'] = content_type
- content = value.get_content()
- md5 = value.md5
- should_gzip = options.gzip and (
- content_type and content_type in gzip_content_types or
- gzip_content_types == GZIP_ALL)
- if should_gzip:
- headers['Content-Encoding'] = 'gzip'
- string_io = StringIO()
- gzip_file = GzipFile(compresslevel=9, fileobj=string_io, mode='w')
- gzip_file.write(content)
- gzip_file.close()
- content = string_io.getvalue()
- md5 = compute_md5(StringIO(content))
- if not options.dry_run:
- key.set_contents_from_string(content, headers, md5=md5, policy=options.grant, encrypt_key=options.encrypt_key)
- logger.info('%s %s> %s' % (
- value.path, 'z' if should_gzip else '-', key.name))
- stat_queue.put(dict(size=value.get_size()))
- else:
- logger.info('skipping %s -> %s' % (value.path, key_name))
- except SSLError as exc:
- logger.error('%s -> %s (%s)' % (value.path, key_name, exc))
- put_queue.put(args)
- connection, bucket = None, None
- except IOError as exc:
- logger.error('%s -> %s (%s)' % (value.path, key_name, exc))
- except BotoServerError as exc:
- logger.error('%s -> %s (%s)' % (value.path, key_name, exc))
- put_queue.put(args)
- put_queue.task_done()
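- # Statter: counts keys and bytes reported by the putters and logs aggregate
- # throughput once the run finishes.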
- def statter(stat_queue, start, options):
- logger = logging.getLogger('%s[statter-%d]' % (os.path.basename(sys.argv[0]), current_process().pid))
- count, total_size = 0, 0
- while True:
- kwargs = stat_queue.get()
- if kwargs is None:
- stat_queue.task_done()
- break
- count += 1
- total_size += kwargs.get('size', 0)
- stat_queue.task_done()
- duration = time.time() - start
- logger.info('put %d bytes in %d files in %.1f seconds (%d bytes/s, %.1f files/s)' % (total_size, count, duration, total_size / duration, count / duration))
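- # main() wires everything together: a single walker process feeds a bounded
- # put queue, --processes putter processes drain it and upload, and one statter
- # process aggregates the byte counts they report.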
- def main(argv):
- parser = OptionParser()
- group = OptionGroup(parser, 'S3 options')
- group.add_option('--bucket', metavar='BUCKET',
- help='set bucket')
- group.add_option('--bucket_region', default='us-east-1',
- help='set bucket region (default: us-east-1)')
- group.add_option('--host', default='s3.amazonaws.com',
- help='set AWS host name')
- group.add_option('--insecure', action='store_false', dest='secure',
- help='use insecure connection')
- group.add_option('--secure', action='store_true', default=True, dest='secure',
- help='use secure connection')
- parser.add_option_group(group)
- group = OptionGroup(parser, 'Source options')
- group.add_option('--walk', choices=('filesystem', 'tar', 's3'), default='filesystem', metavar='MODE',
- help='set walk mode (filesystem, tar or s3)')
- group.add_option('--exclude', action='append', default=[], metavar='PATTERN',
- help='exclude files matching PATTERN')
- group.add_option('--include', action='append', default=[], metavar='PATTERN',
- help='don\'t exclude files matching PATTERN')
- group.add_option('--ignore-files-older-than-days', default=0, type=int,
- help='ignore files older than the given number of days (0 disables the check)')
- parser.add_option_group(group)
- group = OptionGroup(parser, 'Put options')
- group.add_option('--content-type', default='guess', metavar='CONTENT-TYPE',
- help='set content type; "guess" guesses from the file name, '
- '"magic" tries the file name first and falls back to libmagic.')
- group.add_option('--gzip', action='store_true',
- help='gzip values and set content encoding')
- group.add_option('--gzip-type', action='append', default=[],
- help='if --gzip is set, select which content types to gzip; defaults '
- 'to a list of known text content types, "all" gzips everything. '
- 'Specify multiple times for multiple content types. '
- '[default: "guess"]')
- group.add_option('--put', choices=('add', 'stupid', 'update', 'copy'), default='update', metavar='MODE',
- help='set put mode (add, stupid, copy or update)')
- group.add_option('--prefix', default='', metavar='PREFIX',
- help='set key prefix')
- group.add_option('--resume', action='append', default=[], metavar='FILENAME',
- help='resume from log file')
- group.add_option('--grant', metavar='GRANT', default=None, choices=CannedACLStrings,
- help='A canned ACL policy to be applied to each file uploaded.\nChoices: %s' %
- ', '.join(CannedACLStrings))
- group.add_option('--header', metavar='HEADER:VALUE', dest='headers', action='append',
- help='extra headers to add to the file, can be specified multiple times')
- group.add_option('--encrypt-key', action='store_true', default=False, dest='encrypt_key',
- help='use server side encryption')
- parser.add_option_group(group)
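- # Example combining several of the put options above (sketch; bucket name and
- # source path are placeholders):
- #   s3-parallel-put --bucket=my-bucket --put=update --content-type=magic \
- #       --gzip --gzip-type=guess --grant=public-read \
- #       --header=Cache-Control:max-age=86400 /var/www/static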
- group = OptionGroup(parser, 'Logging options')
- group.add_option('--log-filename', metavar='FILENAME',
- help='set log filename')
- group.add_option('--quiet', '-q', action='count', default=0,
- help='less output')
- group.add_option('--verbose', '-v', action='count', default=0,
- help='more output')
- parser.add_option_group(group)
- group = OptionGroup(parser, 'Debug and performance tuning options')
- group.add_option('--dry-run', action='store_true',
- help='don\'t write to S3')
- group.add_option('--limit', metavar='N', type=int,
- help='set maximum number of keys to put')
- group.add_option('--processes', default=8, metavar='PROCESSES', type=int,
- help='set number of putter processes')
- parser.add_option_group(group)
- options, args = parser.parse_args(argv[1:])
- logging.basicConfig(filename=options.log_filename, level=logging.INFO + 10 * (options.quiet - options.verbose))
- logger = logging.getLogger(os.path.basename(sys.argv[0]))
- if len(args) < 1:
- logger.error('missing source operand')
- return 1
- if not options.bucket:
- logger.error('missing bucket')
- return 1
- if not options.bucket_region:
- options.bucket_region = 'us-east-1'
- if 'AWS_SECURITY_TOKEN' in os.environ:
- connection = boto.s3.connect_to_region(options.bucket_region,
- security_token=os.environ.get('AWS_SECURITY_TOKEN'),
- is_secure=True,
- host=options.host,
- calling_format=boto.s3.connection.OrdinaryCallingFormat(),
- )
- else:
- connection = boto.s3.connect_to_region(options.bucket_region,
- aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
- aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
- is_secure=True,
- host=options.host,
- calling_format=boto.s3.connection.OrdinaryCallingFormat(),
- )
-
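- # NB: this disables TLS certificate verification for the whole process,
- # presumably to work around certificate errors seen with some Python/boto
- # combinations; remove it if strict verification is required.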
- import ssl
- if hasattr(ssl, '_create_unverified_context'):
- ssl._create_default_https_context = ssl._create_unverified_context
-
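- # Fail fast: check that the bucket is reachable with the current credentials
- # before any worker processes are spawned.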
- bucket = connection.get_bucket(options.bucket)
- del bucket
- del connection
- start = time.time()
- put_queue = JoinableQueue(1024 * options.processes)
- stat_queue = JoinableQueue()
- walk = {'filesystem': walk_filesystem, 'tar': walk_tar, 's3': walk_s3}[options.walk]
- walker_process = Process(target=walker, args=(walk, put_queue, args, options))
- walker_process.start()
- put = {'add': put_add, 'stupid': put_stupid, 'update': put_update, 'copy': put_copy}[options.put]
- putter_processes = list(islice(repeatedly(Process, target=putter, args=(put, put_queue, stat_queue, options)), options.processes))
- for putter_process in putter_processes:
- putter_process.start()
- statter_process = Process(target=statter, args=(stat_queue, start, options))
- statter_process.start()
- walker_process.join()
- for putter_process in putter_processes:
- put_queue.put(None)
- put_queue.close()
- for putter_process in putter_processes:
- putter_process.join()
- stat_queue.put(None)
- stat_queue.close()
- statter_process.join()
- put_queue.join_thread()
- stat_queue.join_thread()
- if __name__ == '__main__':
- sys.exit(main(sys.argv))