#!/usr/bin/env python
# Parallel uploads to Amazon AWS S3
#
# The MIT License (MIT)
#
# Copyright (c) 2011-2014 Tom Payne
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
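
# Example invocation (hypothetical bucket and prefix; the script reads AWS
# credentials from AWS_ACCESS_KEY_ID/AWS_SECRET_ACCESS_KEY, or from
# AWS_SECURITY_TOKEN for temporary credentials):
#
#   s3-parallel-put --bucket=my-bucket --prefix=backups/ --put=update --processes=8 /data
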
try:
    from cStringIO import StringIO
except ImportError:
    from StringIO import StringIO
from datetime import datetime
from fnmatch import fnmatch
from gzip import GzipFile
from itertools import chain, imap, islice
import logging
import mimetypes
from multiprocessing import JoinableQueue, Process, current_process
from optparse import OptionGroup, OptionParser
import os.path
import re
from ssl import SSLError
import sys
import tarfile
import time

import magic  # python-magic

import boto
from boto.s3.connection import S3Connection
from boto.s3.acl import CannedACLStrings
from boto.utils import compute_md5
from boto.exception import BotoServerError

current_time = time.time()
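
# Matches the log lines written by putter() so that --resume can skip keys
# that were already uploaded in a previous run.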
DONE_RE = re.compile(r'\AINFO:s3-parallel-put\[putter-\d+\]:\S+\s+->\s+(\S+)\s*\Z')

# These content types are amenable to compression
# WISHLIST more types means more internets
GZIP_CONTENT_TYPES = (
    'application/javascript',
    'application/x-javascript',
    'text/css',
    'text/html',
    'text/javascript',
)

GZIP_ALL = 'all'
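

# Yields func(*args, **kwargs) forever; combined with islice() in main() to
# build a fixed number of putter Process objects.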
def repeatedly(func, *args, **kwargs):
    while True:
        yield func(*args, **kwargs)


class FileObjectCache(object):

    def __init__(self):
        self.name = None
        self.file_object = None

    def open(self, name, *args):
        if name != self.name:
            self.name = name
            self.file_object = open(self.name, *args)
        return self

    def __enter__(self):
        return self.file_object

    def __exit__(self, exc_type, exc_value, traceback):
        pass
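

# A lazily-evaluated source object: content can come from a slice of a tar
# file (filename/offset/size), from a file on disk (path), or, for
# --put=copy, from an existing S3 key (bucket_name).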
class Value(object):

    def __init__(self, file_object_cache, content=None, filename=None, md5=None, offset=None, path=None, size=None, bucket_name=None):
        self.file_object_cache = file_object_cache
        self.content = content
        self.filename = filename
        self.md5 = md5
        self.offset = offset
        self.path = path
        self.size = size
        self.bucket_name = bucket_name

    def get_content(self):
        if self.content is None:
            if self.filename:
                with self.file_object_cache.open(self.filename) as file_object:
                    file_object.seek(self.offset)
                    self.content = file_object.read(self.size)
            elif self.path:
                with open(self.path) as file_object:
                    self.content = file_object.read()
            else:
                assert False
        return self.content

    def calculate_md5(self):
        if self.md5 is None:
            self.md5 = compute_md5(StringIO(self.get_content()))
        return self.md5

    def get_size(self):
        if self.size is None:
            if self.content:
                self.size = len(self.content)
            elif self.path:
                self.size = os.stat(self.path).st_size
            else:
                assert False
        return self.size

    def should_copy_content(self):
        return self.bucket_name is None
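

# --include patterns override --exclude patterns; the optional age filter
# skips paths whose ctime is older than --ignore-files-older-than-days.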
def excluded(pathname, options):
    for glob in options.include:
        if fnmatch(pathname, glob):
            return False
    for glob in options.exclude:
        if fnmatch(pathname, glob):
            return True
    if options.ignore_files_older_than_days > 0:
        if os.path.isdir(pathname):
            creation_time = os.path.getctime(pathname)
            if (current_time - creation_time) // (24 * 3600) >= options.ignore_files_older_than_days:
                logger = logging.getLogger('%s[walker-%d]' % (os.path.basename(sys.argv[0]), current_process().pid))
                logger.info('ignoring %s because its ctime (%s) is older than %s days' % (pathname, datetime.fromtimestamp(creation_time).strftime('%d.%m.%Y, %H:%M:%S'), options.ignore_files_older_than_days))
                return True
    return False
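

# Each walk_* generator yields (key_name, kwargs) pairs, where kwargs are the
# keyword arguments used to construct a Value in the putter processes.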
def walk_filesystem(source, options):
    if os.path.isdir(source):
        for dirpath, dirnames, filenames in os.walk(source):
            if excluded(dirpath, options):
                continue
            for filename in filenames:
                abs_path = os.path.join(dirpath, filename)
                if not os.path.isfile(abs_path):
                    continue
                if excluded(filename, options):
                    continue
                rel_path = os.path.relpath(abs_path, source)
                key_name = '/'.join([options.prefix] + rel_path.split(os.sep))
                yield (key_name, dict(path=abs_path))
    elif os.path.isfile(source):
        if excluded(source, options):
            return
        key_name = os.path.normpath(os.path.join(options.prefix, source))
        yield (key_name, dict(path=source))


def walk_tar(source, options):
    try:
        tar_file = tarfile.open(source, 'r:')
        for tarinfo in tar_file:
            if tarinfo.isfile():
                path = tarinfo.name
                if excluded(path, options):
                    continue
                key_name = os.path.normpath(os.path.join(options.prefix, path))
                filename = source
                offset = tarinfo.offset_data
                size = tarinfo.size
                yield (key_name, dict(filename=filename, offset=offset, path=path, size=size))
            # http://blogs.oucs.ox.ac.uk/inapickle/2011/06/20/high-memory-usage-when-using-pythons-tarfile-module/
            tar_file.members = []
    except tarfile.ReadError:
        tar_file = tarfile.open(source)
        for tarinfo in tar_file:
            if tarinfo.isfile():
                path = tarinfo.name
                if excluded(path, options):
                    continue
                key_name = os.path.normpath(os.path.join(options.prefix, path))
                content = tar_file.extractfile(tarinfo).read()
                yield (key_name, dict(content=content, path=path))


def walk_s3(source, options):
    connection = S3Connection(host=options.host, is_secure=options.secure)
    for key in connection.get_bucket(source).list():
        if excluded(key.name, options):
            continue
        yield (
            key.name,
            dict(
                bucket_name=key.bucket.name,
                md5=key.etag,
                size=key.size,
                path='%s/%s' % (source, key.name)))
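

# The walker process feeds (key_name, kwargs) pairs into put_queue, skipping
# keys recorded in any --resume log and stopping after --limit pairs.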
def walker(walk, put_queue, sources, options):
    logger = logging.getLogger('%s[walker-%d]' % (os.path.basename(sys.argv[0]), current_process().pid))
    pairs = chain(*imap(lambda source: walk(source, options), sources))
    if options.resume:
        done = set()
        for filename in options.resume:
            with open(filename) as file_object:
                for line in file_object:
                    match = DONE_RE.match(line)
                    if match:
                        done.add(match.group(1))
        pairs = ((key_name, args) for key_name, args in pairs if key_name not in done)
    if options.limit:
        pairs = islice(pairs, options.limit)
    for pair in pairs:
        put_queue.put(pair)
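

# The put_* strategies return the Key to write, or None to skip: 'add' only
# uploads missing keys, 'stupid' always uploads, 'update' compares the local
# MD5 against the key's ETag, and 'copy' does a server-side copy between
# buckets.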
def put_add(bucket, key_name, value):
    key = bucket.get_key(key_name)
    if key is None:
        return bucket.new_key(key_name)
    else:
        return None


def put_stupid(bucket, key_name, value):
    return bucket.new_key(key_name)


def put_update(bucket, key_name, value):
    key = bucket.get_key(key_name)
    if key is None:
        return bucket.new_key(key_name)
    else:
        # Boto's compute_md5 actually returns a 3-tuple: (hexdigest, base64, size)
        value.calculate_md5()
        if key.etag == '"%s"' % value.md5[0]:
            return None
        else:
            return key


def put_copy(bucket, key_name, value):
    return bucket.copy_key(key_name, value.bucket_name, key_name)
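

# Worker loop: each putter process keeps its own S3 connection, optionally
# gzips the content, and requeues work on SSLError or BotoServerError.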
def putter(put, put_queue, stat_queue, options):
    logger = logging.getLogger('%s[putter-%d]' % (os.path.basename(sys.argv[0]), current_process().pid))
    connection, bucket = None, None
    file_object_cache = FileObjectCache()
    # Figure out what content types we want to gzip
    if not options.gzip_type:  # default
        gzip_content_types = GZIP_CONTENT_TYPES
    elif 'all' in options.gzip_type:
        gzip_content_types = GZIP_ALL
    else:
        gzip_content_types = options.gzip_type
    if 'guess' in gzip_content_types:
        # don't bother removing 'guess' from the list since nothing will match it
        gzip_content_types.extend(GZIP_CONTENT_TYPES)
    if options.gzip:
        logger.debug('These content types will be gzipped: %s' % unicode(gzip_content_types))
    while True:
        args = put_queue.get()
        if args is None:
            put_queue.task_done()
            break
        key_name, value_kwargs = args
        value = Value(file_object_cache, **value_kwargs)
        should_gzip = False
        try:
            if connection is None:
                connection = S3Connection(is_secure=options.secure, host=options.host)
            if bucket is None:
                bucket = connection.get_bucket(options.bucket)
            key = put(bucket, key_name, value)
            if key:
                if value.should_copy_content():
                    if options.headers:
                        headers = dict(tuple(header.split(':', 1)) for header in options.headers)
                    else:
                        headers = {}
                    content_type = None
                    if options.content_type:
                        if options.content_type == 'guess':
                            content_type = mimetypes.guess_type(value.path)[0]
                        elif options.content_type == 'magic':
                            content_type = mimetypes.guess_type(value.path)[0]
                            if content_type is None:
                                content_type = magic.from_file(value.path, mime=True)
                        else:
                            content_type = options.content_type
                        headers['Content-Type'] = content_type
                    content = value.get_content()
                    md5 = value.md5
                    should_gzip = options.gzip and (
                        (content_type and content_type in gzip_content_types) or
                        gzip_content_types == GZIP_ALL)
                    if should_gzip:
                        headers['Content-Encoding'] = 'gzip'
                        string_io = StringIO()
                        gzip_file = GzipFile(compresslevel=9, fileobj=string_io, mode='w')
                        gzip_file.write(content)
                        gzip_file.close()
                        content = string_io.getvalue()
                        md5 = compute_md5(StringIO(content))
                    if not options.dry_run:
                        key.set_contents_from_string(content, headers, md5=md5, policy=options.grant, encrypt_key=options.encrypt_key)
                logger.info('%s %s> %s' % (
                    value.path, 'z' if should_gzip else '-', key.name))
                stat_queue.put(dict(size=value.get_size()))
            else:
                logger.info('skipping %s -> %s' % (value.path, key_name))
        except SSLError as exc:
            logger.error('%s -> %s (%s)' % (value.path, key_name, exc))
            put_queue.put(args)
            connection, bucket = None, None
        except IOError as exc:
            logger.error('%s -> %s (%s)' % (value.path, key_name, exc))
        except BotoServerError as exc:
            logger.error('%s -> %s (%s)' % (value.path, key_name, exc))
            put_queue.put(args)
        put_queue.task_done()
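

# Aggregates per-file sizes from stat_queue and reports overall throughput
# once the putters have drained the queue.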
def statter(stat_queue, start, options):
    logger = logging.getLogger('%s[statter-%d]' % (os.path.basename(sys.argv[0]), current_process().pid))
    count, total_size = 0, 0
    while True:
        kwargs = stat_queue.get()
        if kwargs is None:
            stat_queue.task_done()
            break
        count += 1
        total_size += kwargs.get('size', 0)
        stat_queue.task_done()
    duration = time.time() - start
    logger.info('put %d bytes in %d files in %.1f seconds (%d bytes/s, %.1f files/s)' % (total_size, count, duration, total_size / duration, count / duration))
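

# Process topology: one walker enumerates sources into put_queue, --processes
# putter workers upload in parallel, and a single statter tallies results.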
def main(argv):
    parser = OptionParser()

    group = OptionGroup(parser, 'S3 options')
    group.add_option('--bucket', metavar='BUCKET',
                     help='set bucket')
    group.add_option('--bucket_region', default='us-east-1',
                     help='set bucket region if not in us-east-1 (default new bucket region)')
    group.add_option('--host', default='s3.amazonaws.com',
                     help='set AWS host name')
    group.add_option('--insecure', action='store_false', dest='secure',
                     help='use insecure connection')
    group.add_option('--secure', action='store_true', default=True, dest='secure',
                     help='use secure connection')
    parser.add_option_group(group)

    group = OptionGroup(parser, 'Source options')
    group.add_option('--walk', choices=('filesystem', 'tar', 's3'), default='filesystem', metavar='MODE',
                     help='set walk mode (filesystem, tar or s3)')
    group.add_option('--exclude', action='append', default=[], metavar='PATTERN',
                     help='exclude files matching PATTERN')
    group.add_option('--include', action='append', default=[], metavar='PATTERN',
                     help='don\'t exclude files matching PATTERN')
    group.add_option('--ignore-files-older-than-days', default=0, type=int,
                     help='ignore files older than the given number of days')
    parser.add_option_group(group)

    group = OptionGroup(parser, 'Put options')
    group.add_option('--content-type', default='guess', metavar='CONTENT-TYPE',
                     help='set content type; "guess" guesses based on the file name, '
                          '"magic" guesses by file name and then libmagic')
    group.add_option('--gzip', action='store_true',
                     help='gzip values and set content encoding')
    group.add_option('--gzip-type', action='append', default=[],
                     help='if --gzip is set, sets which content types to gzip; defaults '
                          'to a list of known text content types, "all" gzips everything; '
                          'specify multiple times for multiple content types '
                          '[default: "guess"]')
    group.add_option('--put', choices=('add', 'stupid', 'update', 'copy'), default='update', metavar='MODE',
                     help='set put mode (add, stupid, copy or update)')
    group.add_option('--prefix', default='', metavar='PREFIX',
                     help='set key prefix')
    group.add_option('--resume', action='append', default=[], metavar='FILENAME',
                     help='resume from log file')
    group.add_option('--grant', metavar='GRANT', default=None, choices=CannedACLStrings,
                     help='a canned ACL policy to be applied to each file uploaded; choices: %s' %
                          ', '.join(CannedACLStrings))
    group.add_option('--header', metavar='HEADER:VALUE', dest='headers', action='append',
                     help='extra headers to add to the file, can be specified multiple times')
    group.add_option('--encrypt-key', action='store_true', default=False, dest='encrypt_key',
                     help='use server side encryption')
    parser.add_option_group(group)

    group = OptionGroup(parser, 'Logging options')
    group.add_option('--log-filename', metavar='FILENAME',
                     help='set log filename')
    group.add_option('--quiet', '-q', action='count', default=0,
                     help='less output')
    group.add_option('--verbose', '-v', action='count', default=0,
                     help='more output')
    parser.add_option_group(group)

    group = OptionGroup(parser, 'Debug and performance tuning options')
    group.add_option('--dry-run', action='store_true',
                     help='don\'t write to S3')
    group.add_option('--limit', metavar='N', type=int,
                     help='set maximum number of keys to put')
    group.add_option('--processes', default=8, metavar='PROCESSES', type=int,
                     help='set number of putter processes')
    parser.add_option_group(group)

    options, args = parser.parse_args(argv[1:])
    logging.basicConfig(filename=options.log_filename, level=logging.INFO + 10 * (options.quiet - options.verbose))
    logger = logging.getLogger(os.path.basename(sys.argv[0]))

    if len(args) < 1:
        logger.error('missing source operand')
        return 1
    if not options.bucket:
        logger.error('missing bucket')
        return 1
    if not options.bucket_region:
        options.bucket_region = 'us-east-1'
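
    # Prefer temporary credentials when AWS_SECURITY_TOKEN is set; otherwise
    # fall back to the standard access key environment variables.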
    if 'AWS_SECURITY_TOKEN' in os.environ:
        connection = boto.s3.connect_to_region(options.bucket_region,
                                               security_token=os.environ.get('AWS_SECURITY_TOKEN'),
                                               is_secure=True,
                                               host=options.host,
                                               calling_format=boto.s3.connection.OrdinaryCallingFormat())
    else:
        connection = boto.s3.connect_to_region(options.bucket_region,
                                               aws_access_key_id=os.environ.get('AWS_ACCESS_KEY_ID'),
                                               aws_secret_access_key=os.environ.get('AWS_SECRET_ACCESS_KEY'),
                                               is_secure=True,
                                               host=options.host,
                                               calling_format=boto.s3.connection.OrdinaryCallingFormat())
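
    # Workaround for hosts whose certificates fail verification on Python
    # versions that verify HTTPS certificates by default (PEP 476): disable
    # certificate verification globally.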
    import ssl
    if hasattr(ssl, '_create_unverified_context'):
        ssl._create_default_https_context = ssl._create_unverified_context
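
    # Fail fast if the bucket is missing or unreachable before forking any
    # worker processes.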
    bucket = connection.get_bucket(options.bucket)
    del bucket
    del connection

    start = time.time()

    put_queue = JoinableQueue(1024 * options.processes)
    stat_queue = JoinableQueue()

    walk = {'filesystem': walk_filesystem, 'tar': walk_tar, 's3': walk_s3}[options.walk]
    walker_process = Process(target=walker, args=(walk, put_queue, args, options))
    walker_process.start()

    put = {'add': put_add, 'stupid': put_stupid, 'update': put_update, 'copy': put_copy}[options.put]
    putter_processes = list(islice(repeatedly(Process, target=putter, args=(put, put_queue, stat_queue, options)), options.processes))
    for putter_process in putter_processes:
        putter_process.start()

    statter_process = Process(target=statter, args=(stat_queue, start, options))
    statter_process.start()

    walker_process.join()
    for putter_process in putter_processes:
        put_queue.put(None)
    put_queue.close()
    for putter_process in putter_processes:
        putter_process.join()
    stat_queue.put(None)
    stat_queue.close()
    statter_process.join()
    put_queue.join_thread()
    stat_queue.join_thread()


if __name__ == '__main__':
    sys.exit(main(sys.argv))