s3grouper.py

#!/usr/bin/env python3
##
## Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
##
## Licensed under the Amazon Software License (the "License"). You may not use this
## file except in compliance with the License. A copy of the License is located at
##
## http://aws.amazon.com/asl/
##
## or in the "license" file accompanying this file. This file is distributed on an
## "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, express or implied.
## See the License for the specific language governing permissions and limitations
## under the License.
##
import argparse
import csv
import json
import logging
import os
import sys

import boto3
from botocore.exceptions import ClientError
from gzip import GzipFile
from io import BytesIO, TextIOWrapper
from tempfile import TemporaryFile
from urllib.parse import urlparse

logger = logging.getLogger(__name__)
logger.addHandler(logging.StreamHandler())
logger.setLevel(logging.ERROR)

# Return the CSV headers and the list of keys for the inventory .gz files
# listed in an S3 Inventory manifest.json.
def get_gz_from_manifest(s3, manifest):
    try:
        obj = s3.Object(manifest['bucket'], manifest['key'])
        data = BytesIO(obj.get()['Body'].read())
        mjson = json.loads(data.getvalue().decode())
        headers = [x.strip() for x in mjson['fileSchema'].split(',')]
        filelist = [d['key'] for d in mjson['files']]
    except ClientError as e:
        logger.error('Unable to get manifest file at s3://{}/{}'.format(manifest['bucket'], manifest['key']))
        logger.debug('Received error: {}'.format(e))
        sys.exit(5)
    except ValueError as e:
        logger.error('Unable to read manifest file.')
        logger.debug('Received error: {}'.format(e))
        sys.exit(3)
    return headers, filelist
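
# For reference, a CSV-format S3 Inventory manifest.json looks roughly like
# the sketch below; this script relies only on the 'fileSchema' and 'files'
# fields, and all bucket/key names here are illustrative, not real:
#
#   {
#     "sourceBucket": "example-source-bucket",
#     "destinationBucket": "arn:aws:s3:::example-inventory-bucket",
#     "fileFormat": "CSV",
#     "fileSchema": "Bucket, Key, Size, StorageClass",
#     "files": [
#       {"key": "example-prefix/data/abc123.csv.gz", "size": 2046, "MD5checksum": "..."}
#     ]
#   }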

# Upload one index file to S3, skipping keys that already exist.
def s3_upload(s3, data, target, out_index_num, norr, extension=".index"):
    keyname = '/'.join([target['prefix'], target['key'] + '.' + str(out_index_num) + extension])
    s3uri = 's3://' + target['bucket'] + '/' + keyname
    data.seek(0)
    extra_args = {'StorageClass': 'REDUCED_REDUNDANCY'}
    if norr:
        extra_args = {}
    try:
        # HeadObject to check whether the key already exists (this needs
        # s3:ListBucket, otherwise S3 returns 403 instead of 404)
        s3.Object(target['bucket'], keyname).load()
        logger.info("{} exists. Skipping...".format(s3uri))
    except ClientError as ce:  # if it does not exist, upload
        if ce.response["Error"]["Code"] == "404":
            try:
                s3.Bucket(target['bucket']).upload_fileobj(data, keyname, ExtraArgs=extra_args)
            except ClientError as e:
                logger.error('Unable to upload to {}'.format(s3uri))
                logger.debug('Received error: {}'.format(e))
                sys.exit(4)
            logger.info("Uploaded to {}".format(s3uri))
        elif ce.response["Error"]["Code"] == "403":
            logger.error('Permission error loading {}\nPlease check your IAM policy.'.format(s3uri))
            logger.debug('Received error: {}'.format(ce))
            sys.exit(7)
        else:
            logger.error('Unknown error loading {}'.format(s3uri))
            logger.debug('Received error: {}'.format(ce))
            sys.exit(7)
    return
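
# Uploaded keys follow the pattern <prefix>/<key>.<N>.index (or
# <prefix>/<key>.<N>.glacier.index in glacier mode), with N counting up
# from 0 as each size/count threshold is crossed.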

# Helper generator: download each inventory .gz file and yield its CSV rows.
def dl_and_parse(s3, headers, keylist, bucket):
    for key in keylist:
        with TemporaryFile() as fp:
            try:
                s3.Bucket(bucket).download_fileobj(key, fp)
            except ClientError as e:
                logger.error('Unable to download s3://{}/{}'.format(bucket, key))
                logger.debug('Received error: {}'.format(e))
                sys.exit(5)
            fp.seek(0)
            with TextIOWrapper(GzipFile(fileobj=fp, mode='r')) as f:
                try:
                    reader = csv.DictReader(f, fieldnames=headers, delimiter=',', quoting=csv.QUOTE_MINIMAL)
                    for row in reader:
                        yield row
                except csv.Error as e:
                    logger.error("Unable to read CSV at line {}".format(reader.line_num))
                    logger.debug('Received error: {}'.format(e))
                    sys.exit(3)
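
# Each yielded row is a dict keyed by the manifest's fileSchema, for example
# (illustrative values):
#   {'Bucket': 'example-bucket', 'Key': 'photos/2017/img001.jpg',
#    'Size': '524288', 'StorageClass': 'STANDARD'}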

# Main parser: stream inventory rows, filter by prefix and storage class,
# and bundle matching objects into index files uploaded to the target bucket.
def parse(s3, args, headers, keylist):
    # init
    data = TemporaryFile()
    out_index_num = 0
    curr_size = curr_count = 0
    newkey = args.manifest['key'].split('/')
    newkey.pop()  # discard manifest.json
    newkey += [args.key]
    target = {"bucket": args.bucket,
              "prefix": args.prefix,
              "key": '-'.join(newkey)}
    norr = args.no_reduced_redundancy
    total_count = 0
    for row in dl_and_parse(s3, headers, keylist, args.manifest['bucket']):
        glacier_match = False
        if args.glacier and row['StorageClass'] == 'GLACIER':
            # glacier mode - only objects in GLACIER match
            glacier_match = True
        elif not args.glacier and row['StorageClass'] != 'GLACIER':
            # non-glacier mode - only objects outside GLACIER match
            glacier_match = True
        # check key prefix
        if glacier_match and row['Key'].startswith(args.include):
            try:
                data.write(','.join([row['Bucket'], row['Key'], row['Size']]).encode())
                data.write(b'\n')
            except (TypeError, OSError) as e:
                logger.error("Unable to write {},{},{} to TemporaryFile".format(row['Bucket'], row['Key'], str(row['Size'])))
                logger.debug('Received error: {}'.format(e))
                sys.exit(2)
            curr_size += int(row['Size'])
            curr_count += 1
            total_count += 1
        # upload data upon hitting either break condition
        if curr_size >= args.size_threshold or curr_count >= args.count_threshold:
            logger.info("Uploading {} objects ({} MiB)".format(str(curr_count), str(curr_size / 1024 / 1024)))
            if args.glacier:
                s3_upload(s3, data, target, out_index_num, norr, ".glacier.index")
            else:
                s3_upload(s3, data, target, out_index_num, norr)
            # reset
            out_index_num += 1
            curr_size = curr_count = 0
            data.close()
            data = TemporaryFile()
    # upload last batch
    logger.info("Uploading {} objects ({} MiB)".format(str(curr_count), str(curr_size / 1024 / 1024)))
    if args.glacier:
        s3_upload(s3, data, target, out_index_num, norr, ".glacier.index")
    else:
        s3_upload(s3, data, target, out_index_num, norr)
    data.close()
    logger.info("Total number of objects processed: {}".format(str(total_count)))
    return
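
# Each uploaded index file is plain CSV with one object per line, e.g.
# (illustrative values):
#   example-bucket,photos/2017/img001.jpg,524288
#   example-bucket,photos/2017/img002.jpg,1048576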

# Parse an s3:// URI into bucket, key, and basename parts (argparse type).
def s3uri(uri):
    if not uri.startswith('s3://'):
        raise argparse.ArgumentTypeError("manifest uri is not an s3uri: s3://bucket/key")
    o = urlparse(uri)
    return {'bucket': o.netloc,
            'key': o.path.lstrip('/'),
            'name': os.path.basename(o.path)}
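
# For example (hypothetical path):
#   s3uri('s3://example-bucket/inventory/2017-01-01/manifest.json')
#   == {'bucket': 'example-bucket',
#       'key': 'inventory/2017-01-01/manifest.json',
#       'name': 'manifest.json'}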

def main(args):
    session = boto3.session.Session(region_name=args.region)
    s3 = session.resource('s3')
    h, l = get_gz_from_manifest(s3, args.manifest)
    parse(s3, args, h, l)
    sys.exit(0)

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Take inventory of objects, group into separate manifests")
    parser.add_argument("-m", "--manifest", metavar="s3://BUCKET/KEY", type=s3uri,
                        help="Manifest produced by S3 Inventory (s3 path to manifest.json)", required=True)
    parser.add_argument("-b", "--bucket", metavar="BUCKET", type=str,
                        help="Target S3 bucket to write indices", required=True)
    parser.add_argument("-p", "--prefix", metavar="PREFIX", type=str,
                        help="Target S3 prefix", default="s3grouper-output")
    parser.add_argument("-k", "--key", metavar="KEY", type=str,
                        help="Target key name for indices on S3", default='manifest')
    parser.add_argument("-s", "--size-threshold", metavar="SIZE_THRESHOLD", type=int,
                        help="Create bundle when over this size (bytes)", default=4 * 1024 * 1024 * 1024)
    parser.add_argument("-c", "--count-threshold", metavar="COUNT_THRESHOLD", type=int,
                        help="Create bundle when over this number of objects", default=65000)
    parser.add_argument("-i", "--include", metavar="INCLUDE_PREFIX", type=str,
                        help="Only include objects with specified prefix, skip otherwise. (Default: include all)",
                        default='')
    parser.add_argument("-no-rr", "--no-reduced-redundancy", action="store_true", default=False,
                        help="Do not use the default REDUCED_REDUNDANCY Storage Class")
    parser.add_argument("-r", "--region", metavar="REGION", type=str,
                        help="Use regional endpoint (Example: us-west-2)")
    # parser.add_argument("--profile")  # not used
    parser.add_argument("-v", "--verbose", action="store_true", default=False,
                        help="Enable verbose messages")
    parser.add_argument("-d", "--debug", action="store_true", default=False,
                        help="Enable debug messages")
    parser.add_argument("-g", "--glacier", action="store_true",
                        help="Glacier mode. Creates manifest only out of Glacier objects.")
    args = parser.parse_args()
    if args.verbose:
        logger.setLevel(logging.INFO)
    if args.debug:
        logger.setLevel(logging.DEBUG)
    main(args)
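
# Example invocation (bucket names and paths below are illustrative):
#   ./s3grouper.py -m s3://example-inventory-bucket/src/daily/2017-01-01T00-00Z/manifest.json \
#                  -b example-target-bucket -c 50000 -r us-west-2 -v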