123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706 |
- # -*- coding: utf-8 -*-
- import sys
- import os.path
- import json
- import fnmatch
- from collections import Counter, OrderedDict
- import boto3
- import botocore
- import click
- from joblib import Parallel, delayed
- from clint.textui import colored, puts, indent
- from .checks import AclCheck, PolicyCheck, PublicAccessCheck, LoggingCheck, VersioningCheck, EncryptionCheck, ObjectLoggingCheck
- __version__ = '0.3.1'
- canned_acls = [
- {
- 'acl': 'private',
- 'grants': []
- },
- {
- 'acl': 'public-read',
- 'grants': [
- {'Grantee': {'Type': 'Group', 'URI': 'http://acs.amazonaws.com/groups/global/AllUsers'}, 'Permission': 'READ'}
- ]
- },
- {
- 'acl': 'public-read-write',
- 'grants': [
- {'Grantee': {'Type': 'Group', 'URI': 'http://acs.amazonaws.com/groups/global/AllUsers'}, 'Permission': 'READ'},
- {'Grantee': {u'Type': 'Group', u'URI': 'http://acs.amazonaws.com/groups/global/AllUsers'}, 'Permission': 'WRITE'}
- ]
- },
- {
- 'acl': 'authenticated-read',
- 'grants': [
- {'Grantee': {'Type': 'Group', 'URI': 'http://acs.amazonaws.com/groups/global/AuthenticatedUsers'}, 'Permission': 'READ'}
- ]
- },
- {
- 'acl': 'aws-exec-read',
- 'grants': [
- {'Grantee': {'Type': 'CanonicalUser', 'DisplayName': 'za-team', 'ID': '6aa5a366c34c1cbe25dc49211496e913e0351eb0e8c37aa3477e40942ec6b97c'}, 'Permission': 'READ'}
- ]
- }
- ]
- cached_s3 = None
- def s3():
- # memoize
- global cached_s3
- if cached_s3 is None:
- cached_s3 = boto3.resource('s3')
- return cached_s3
- def notice(message):
- puts(colored.yellow(message))
- def abort(message):
- puts(colored.red(message))
- sys.exit(1)
- def unicode_key(key):
- if sys.version_info[0] < 3 and isinstance(key, unicode):
- return key.encode('utf-8')
- else:
- return key
- def perform(check):
- check.perform()
- with indent(2):
- if check.status == 'passed':
- puts(colored.green('✔ ' + check.name + ' ' + unicode_key(check.pass_message)))
- elif check.status == 'failed':
- puts(colored.red('✘ ' + check.name + ' ' + check.fail_message))
- else:
- puts(colored.red('✘ ' + check.name + ' access denied'))
- return check
- def fetch_buckets(buckets):
- if buckets:
- if any('*' in b for b in buckets):
- return [b for b in s3().buckets.all() if any(fnmatch.fnmatch(b.name, bn) for bn in buckets)]
- else:
- return [s3().Bucket(bn) for bn in buckets]
- else:
- return s3().buckets.all()
- def fix_check(klass, buckets, dry_run, fix_args={}):
- for bucket in fetch_buckets(buckets):
- check = klass(bucket)
- check.perform()
- if check.status == 'passed':
- message = colored.green('already ' + check.pass_message)
- elif check.status == 'denied':
- message = colored.red('access denied')
- else:
- if dry_run:
- message = colored.yellow('to be ' + check.pass_message)
- else:
- try:
- check.fix(fix_args)
- message = colored.blue('just ' + check.pass_message)
- except botocore.exceptions.ClientError as e:
- message = colored.red(str(e))
- puts(bucket.name + ' ' + message)
- def encrypt_object(bucket_name, key, dry_run, kms_key_id, customer_key):
- obj = s3().Object(bucket_name, key)
- str_key = unicode_key(key)
- try:
- if customer_key:
- obj.load(SSECustomerAlgorithm='AES256', SSECustomerKey=customer_key)
- encrypted = None
- if customer_key:
- encrypted = obj.sse_customer_algorithm is not None
- elif kms_key_id:
- encrypted = obj.server_side_encryption == 'aws:kms'
- else:
- encrypted = obj.server_side_encryption == 'AES256'
- if encrypted:
- puts(str_key + ' ' + colored.green('already encrypted'))
- return 'already encrypted'
- else:
- if dry_run:
- puts(str_key + ' ' + colored.yellow('to be encrypted'))
- return 'to be encrypted'
- else:
- copy_source = {'Bucket': bucket_name, 'Key': obj.key}
- # TODO support going from customer encryption to other forms
- if kms_key_id:
- obj.copy_from(
- CopySource=copy_source,
- ServerSideEncryption='aws:kms',
- SSEKMSKeyId=kms_key_id
- )
- elif customer_key:
- obj.copy_from(
- CopySource=copy_source,
- SSECustomerAlgorithm='AES256',
- SSECustomerKey=customer_key
- )
- else:
- obj.copy_from(
- CopySource=copy_source,
- ServerSideEncryption='AES256'
- )
- puts(str_key + ' ' + colored.blue('just encrypted'))
- return 'just encrypted'
- except (botocore.exceptions.ClientError, botocore.exceptions.NoCredentialsError) as e:
- puts(str_key + ' ' + colored.red(str(e)))
- return 'error'
- def determine_mode(acl):
- owner = acl.owner
- grants = acl.grants
- non_owner_grants = [grant for grant in grants if not (grant['Grantee'].get('ID') == owner['ID'] and grant['Permission'] == 'FULL_CONTROL')]
- # TODO bucket-owner-read and bucket-owner-full-control
- return next((ca['acl'] for ca in canned_acls if ca['grants'] == non_owner_grants), 'custom')
- def scan_object(bucket_name, key):
- obj = s3().Object(bucket_name, key)
- str_key = unicode_key(key)
- try:
- mode = determine_mode(obj.Acl())
- if mode == 'private':
- puts(str_key + ' ' + colored.green(mode))
- else:
- puts(str_key + ' ' + colored.yellow(mode))
- return mode
- except (botocore.exceptions.ClientError, botocore.exceptions.NoCredentialsError) as e:
- puts(str_key + ' ' + colored.red(str(e)))
- return 'error'
- def reset_object(bucket_name, key, dry_run, acl):
- obj = s3().Object(bucket_name, key)
- str_key = unicode_key(key)
- try:
- obj_acl = obj.Acl()
- mode = determine_mode(obj_acl)
- if mode == acl:
- puts(str_key + ' ' + colored.green('ACL already ' + acl))
- return 'ACL already ' + acl
- elif dry_run:
- puts(str_key + ' ' + colored.yellow('ACL to be updated to ' + acl))
- return 'ACL to be updated to ' + acl
- else:
- obj_acl.put(ACL=acl)
- puts(str_key + ' ' + colored.blue('ACL updated to ' + acl))
- return 'ACL updated to ' + acl
- except (botocore.exceptions.ClientError, botocore.exceptions.NoCredentialsError) as e:
- puts(str_key + ' ' + colored.red(str(e)))
- return 'error'
- def delete_unencrypted_version(bucket_name, key, id, dry_run):
- object_version = s3().ObjectVersion(bucket_name, key, id)
- try:
- obj = object_version.get()
- if obj.get('ServerSideEncryption') or obj.get('SSECustomerAlgorithm'):
- puts(key + ' ' + id + ' ' + colored.green('encrypted'))
- return 'encrypted'
- else:
- if dry_run:
- puts(key + ' ' + id + ' ' + colored.blue('to be deleted'))
- return 'to be deleted'
- else:
- puts(key + ' ' + id + ' ' + colored.blue('deleted'))
- object_version.delete()
- return 'deleted'
- except (botocore.exceptions.ClientError, botocore.exceptions.NoCredentialsError) as e:
- puts(key + ' ' + id + ' ' + colored.red(str(e)))
- return 'error'
- def object_matches(key, only, _except):
- match = True
- if only:
- match = fnmatch.fnmatch(key, only)
- if _except and match:
- match = not fnmatch.fnmatch(key, _except)
- return match
- def parallelize(bucket, only, _except, fn, args=(), versions=False):
- bucket = s3().Bucket(bucket)
- # use prefix for performance
- prefix = None
- if only:
- # get the first prefix before wildcard
- prefix = '/'.join(only.split('*')[0].split('/')[:-1])
- if prefix:
- prefix = prefix + '/'
- if versions:
- object_versions = bucket.object_versions.filter(Prefix=prefix) if prefix else bucket.object_versions.all()
- # delete markers have no size
- return Parallel(n_jobs=24)(delayed(fn)(bucket.name, ov.object_key, ov.id, *args) for ov in object_versions if object_matches(ov.object_key, only, _except) and not ov.is_latest and ov.size is not None)
- else:
- objects = bucket.objects.filter(Prefix=prefix) if prefix else bucket.objects.all()
- if only and not '*' in only:
- objects = [s3().Object(bucket, only)]
- return Parallel(n_jobs=24)(delayed(fn)(bucket.name, os.key, *args) for os in objects if object_matches(os.key, only, _except))
- def public_statement(bucket):
- return OrderedDict([
- ('Sid', 'Public'),
- ('Effect', 'Allow'),
- ('Principal', '*'),
- ('Action', 's3:GetObject'),
- ('Resource', 'arn:aws:s3:::%s/*' % bucket.name)
- ])
- def no_object_acl_statement(bucket):
- return OrderedDict([
- ('Sid', 'NoObjectAcl'),
- ('Effect', 'Deny'),
- ('Principal', '*'),
- ('Action', 's3:PutObjectAcl'),
- ('Resource', 'arn:aws:s3:::%s/*' % bucket.name)
- ])
- def public_uploads_statement(bucket):
- return OrderedDict([
- ('Sid', 'PublicUploads'),
- ('Effect', 'Deny'),
- ('Principal', '*'),
- ('Action', ['s3:PutObject', 's3:PutObjectAcl']),
- ('Resource', 'arn:aws:s3:::%s/*' % bucket.name),
- ('Condition', {'StringNotEquals': {'s3:x-amz-acl': 'public-read'}})
- ])
- def no_uploads_statement(bucket):
- return OrderedDict([
- ('Sid', 'NoUploads'),
- ('Effect', 'Deny'),
- ('Principal', '*'),
- ('Action', 's3:PutObject'),
- ('Resource', 'arn:aws:s3:::%s/*' % bucket.name)
- ])
- def encryption_statement(bucket):
- return OrderedDict([
- ('Sid', 'Hash'),
- ('Effect', 'Deny'),
- ('Principal', '*'),
- ('Action', 's3:PutObject'),
- ('Resource', 'arn:aws:s3:::%s/*' % bucket.name),
- ('Condition', {'StringNotEquals': {'s3:x-amz-server-side-encryption': 'AES256'}})
- ])
- def statement_matches(s1, s2):
- s1 = dict(s1)
- s2 = dict(s2)
- s1.pop('Sid', None)
- s2.pop('Sid', None)
- return s1 == s2
- def fetch_policy(bucket):
- policy = None
- try:
- policy = bucket.Policy().policy
- except botocore.exceptions.ClientError as e:
- if 'NoSuchBucket' not in str(e):
- raise
- if policy:
- policy = json.loads(policy, object_pairs_hook=OrderedDict)
- return policy
- def print_dns_bucket(name, buckets, found_buckets):
- if not name in found_buckets:
- puts(name)
- with indent(2):
- if name in buckets:
- puts(colored.green('owned'))
- else:
- puts(colored.red('not owned'))
- puts()
- found_buckets.add(name)
- def print_policy(policy):
- with indent(2):
- if any(policy['Statement']):
- puts(colored.yellow(json.dumps(policy, indent=4)))
- else:
- puts(colored.yellow("None"))
- def summarize(values):
- summary = Counter(values)
- puts()
- puts("Summary")
- for k, v in summary.most_common():
- puts(k + ': ' + str(v))
- def fetch_event_selectors():
- # TODO get trails across all regions
- # even regions without buckets may have multi-region trails
- client = boto3.client('cloudtrail')
- paginator = client.get_paginator('list_trails')
- event_selectors = {}
- for page in paginator.paginate():
- for trail in page['Trails']:
- name = trail['Name']
- region_client = boto3.client('cloudtrail', region_name=trail['HomeRegion'])
- response = region_client.get_event_selectors(TrailName=name)
- for event_selector in response['EventSelectors']:
- read_write_type = event_selector['ReadWriteType']
- for data_resource in event_selector['DataResources']:
- if data_resource['Type'] == 'AWS::S3::Object':
- for value in data_resource['Values']:
- if value == 'arn:aws:s3':
- trail_response = region_client.get_trail(Name=name)['Trail']
- if trail_response['IsMultiRegionTrail']:
- bucket = ('global')
- else:
- bucket = ('region', trail['HomeRegion'])
- path = ''
- else:
- parts = value.split("/", 2)
- bucket = ('bucket', parts[0].replace('arn:aws:s3:::', ''))
- path = parts[1]
- if bucket not in event_selectors:
- event_selectors[bucket] = []
- event_selectors[bucket].append({'trail': name, 'path': path, 'read_write_type': read_write_type})
- return event_selectors
- @click.group()
- @click.version_option(version=__version__)
- def cli():
- pass
- @cli.command()
- @click.argument('buckets', nargs=-1)
- @click.option('--log-bucket', multiple=True, help='Check log bucket(s)')
- @click.option('--log-prefix', help='Check log prefix')
- @click.option('--skip-logging', is_flag=True, help='Skip logging check')
- @click.option('--skip-versioning', is_flag=True, help='Skip versioning check')
- @click.option('--skip-default-encryption', is_flag=True, help='Skip default encryption check')
- @click.option('--default-encryption', is_flag=True) # no op, can't hide from help until click 7 released
- @click.option('--object-level-logging', is_flag=True)
- @click.option('--sns-topic', help='Send SNS notification for failures')
- def scan(buckets, log_bucket=None, log_prefix=None, skip_logging=False, skip_versioning=False, skip_default_encryption=False, default_encryption=True, object_level_logging=False, sns_topic=None):
- event_selectors = fetch_event_selectors() if object_level_logging else {}
- checks = []
- for bucket in fetch_buckets(buckets):
- puts(bucket.name)
- checks.append(perform(AclCheck(bucket)))
- checks.append(perform(PolicyCheck(bucket)))
- checks.append(perform(PublicAccessCheck(bucket)))
- if not skip_logging:
- checks.append(perform(LoggingCheck(bucket, log_bucket=log_bucket, log_prefix=log_prefix)))
- if not skip_versioning:
- checks.append(perform(VersioningCheck(bucket)))
- if not skip_default_encryption:
- checks.append(perform(EncryptionCheck(bucket)))
- if object_level_logging:
- checks.append(perform(ObjectLoggingCheck(bucket, event_selectors=event_selectors)))
- puts()
- failed_checks = [c for c in checks if c.status != 'passed']
- if any(failed_checks):
- if sns_topic:
- topic = boto3.resource('sns').Topic(sns_topic)
- message = ''
- for check in failed_checks:
- msg = check.fail_message if check.status == 'failed' else 'access denied'
- message += check.bucket.name + ': ' + check.name + ' ' + msg + '\n'
- topic.publish(Message=message, Subject='[s3tk] Scan Failures')
- sys.exit(1)
- @cli.command(name='scan-dns')
- def scan_dns():
- buckets = set([b.name for b in s3().buckets.all()])
- found_buckets = set()
- client = boto3.client('route53')
- paginator = client.get_paginator('list_hosted_zones')
- for page in paginator.paginate():
- for hosted_zone in page['HostedZones']:
- paginator2 = client.get_paginator('list_resource_record_sets')
- for page2 in paginator2.paginate(HostedZoneId=hosted_zone['Id']):
- for resource_set in page2['ResourceRecordSets']:
- if resource_set.get('AliasTarget'):
- value = resource_set['AliasTarget']['DNSName']
- if value.startswith('s3-website-') and value.endswith('.amazonaws.com.'):
- print_dns_bucket(resource_set['Name'][:-1], buckets, found_buckets)
- elif resource_set.get('ResourceRecords'):
- for record in resource_set['ResourceRecords']:
- value = record['Value']
- if value.endswith('.s3.amazonaws.com'):
- print_dns_bucket('.'.join(value.split('.')[:-3]), buckets, found_buckets)
- if 's3-website-' in value and value.endswith('.amazonaws.com'):
- print_dns_bucket(resource_set['Name'][:-1], buckets, found_buckets)
- @cli.command(name='block-public-access')
- @click.argument('buckets', nargs=-1)
- @click.option('--dry-run', is_flag=True, help='Dry run')
- def block_public_access(buckets, dry_run=False):
- if not buckets:
- abort('Must specify at least one bucket or wildcard')
- fix_check(PublicAccessCheck, buckets, dry_run)
- @cli.command(name='enable-logging')
- @click.argument('buckets', nargs=-1)
- @click.option('--dry-run', is_flag=True, help='Dry run')
- @click.option('--log-bucket', required=True, help='Bucket to store logs')
- @click.option('--log-prefix', help='Log prefix')
- def enable_logging(buckets, log_bucket=None, log_prefix=None, dry_run=False):
- fix_check(LoggingCheck, buckets, dry_run, {'log_bucket': log_bucket, 'log_prefix': log_prefix})
- @cli.command(name='enable-versioning')
- @click.argument('buckets', nargs=-1)
- @click.option('--dry-run', is_flag=True, help='Dry run')
- def enable_versioning(buckets, dry_run=False):
- fix_check(VersioningCheck, buckets, dry_run)
- @cli.command(name='enable-default-encryption')
- @click.argument('buckets', nargs=-1)
- @click.option('--dry-run', is_flag=True, help='Dry run')
- def enable_versioning(buckets, dry_run=False):
- fix_check(EncryptionCheck, buckets, dry_run)
- @cli.command()
- @click.argument('bucket')
- @click.option('--only', help='Only certain objects')
- @click.option('--except', '_except', help='Except certain objects')
- @click.option('--dry-run', is_flag=True, help='Dry run')
- @click.option('--kms-key-id', help='KMS key id')
- @click.option('--customer-key', help='Customer key')
- def encrypt(bucket, only=None, _except=None, dry_run=False, kms_key_id=None, customer_key=None):
- summarize(parallelize(bucket, only, _except, encrypt_object, (dry_run, kms_key_id, customer_key,)))
- @cli.command(name='scan-object-acl')
- @click.argument('bucket')
- @click.option('--only', help='Only certain objects')
- @click.option('--except', '_except', help='Except certain objects')
- def scan_object_acl(bucket, only=None, _except=None):
- summarize(parallelize(bucket, only, _except, scan_object))
- @cli.command(name='reset-object-acl')
- @click.argument('bucket')
- @click.option('--only', help='Only certain objects')
- @click.option('--except', '_except', help='Except certain objects')
- @click.option('--acl', default='private', help='ACL to use')
- @click.option('--dry-run', is_flag=True, help='Dry run')
- def reset_object_acl(bucket, only=None, _except=None, acl=None, dry_run=False):
- summarize(parallelize(bucket, only, _except, reset_object, (dry_run, acl,)))
- @cli.command(name='delete-unencrypted-versions')
- @click.argument('bucket')
- @click.option('--only', help='Only certain objects')
- @click.option('--except', '_except', help='Except certain objects')
- @click.option('--dry-run', is_flag=True, help='Dry run')
- def delete_unencrypted_versions(bucket, only=None, _except=None, dry_run=False):
- summarize(parallelize(bucket, only, _except, delete_unencrypted_version, (dry_run,), True))
- @cli.command(name='list-policy')
- @click.argument('buckets', nargs=-1)
- @click.option('--named', is_flag=True, help='Print named statements')
- def list_policy(buckets, named=False):
- for bucket in fetch_buckets(buckets):
- puts(bucket.name)
- policy = fetch_policy(bucket)
- with indent(2):
- if policy is None:
- puts(colored.yellow('None'))
- else:
- if named:
- public = public_statement(bucket)
- no_object_acl = no_object_acl_statement(bucket)
- public_uploads = public_uploads_statement(bucket)
- no_uploads = no_uploads_statement(bucket)
- encryption = encryption_statement(bucket)
- for statement in policy['Statement']:
- if statement_matches(statement, public):
- named_statement = 'Public'
- elif statement_matches(statement, no_object_acl):
- named_statement = 'No object ACL'
- elif statement_matches(statement, public_uploads):
- named_statement = 'Public uploads'
- elif statement_matches(statement, no_uploads):
- named_statement = 'No uploads'
- elif statement_matches(statement, encryption):
- named_statement = 'Hash'
- else:
- named_statement = 'Custom'
- puts(colored.yellow(named_statement))
- else:
- puts(colored.yellow(json.dumps(policy, indent=4)))
- puts()
- @cli.command(name='set-policy')
- @click.argument('bucket')
- @click.option('--public', is_flag=True, help='Make all objects public')
- @click.option('--no-object-acl', is_flag=True, help='Prevent object ACL')
- @click.option('--public-uploads', is_flag=True, help='Only public uploads')
- @click.option('--no-uploads', is_flag=True, help='Prevent new uploads')
- @click.option('--encryption', is_flag=True, help='Require encryption')
- @click.option('--dry-run', is_flag=True, help='Dry run')
- def set_policy(bucket, public=False, no_object_acl=False, public_uploads=False, no_uploads=False, encryption=False, dry_run=False):
- bucket = s3().Bucket(bucket)
- bucket_policy = bucket.Policy()
- statements = []
- if public:
- statements.append(public_statement(bucket))
- if no_object_acl:
- statements.append(no_object_acl_statement(bucket))
- if public_uploads:
- statements.append(public_uploads_statement(bucket))
- if no_uploads:
- statements.append(no_uploads_statement(bucket))
- if encryption:
- statements.append(encryption_statement(bucket))
- if any(statements):
- puts('New policy')
- policy = OrderedDict([
- ('Version', '2012-10-17'),
- ('Statement', statements)
- ])
- print_policy(policy)
- if not dry_run:
- bucket_policy.put(Policy=json.dumps(policy))
- else:
- abort('No policies specified')
- # experimental
- @cli.command(name='update-policy')
- @click.argument('bucket')
- @click.option('--encryption/--no-encryption', default=None, help='Require encryption')
- @click.option('--dry-run', is_flag=True, help='Dry run')
- def update_policy(bucket, encryption=None, dry_run=False):
- bucket = s3().Bucket(bucket)
- policy = fetch_policy(bucket)
- if not policy:
- policy = OrderedDict([
- ('Version', '2012-10-17'),
- ('Statement', [])
- ])
- es = encryption_statement(bucket)
- es_index = next((i for i, s in enumerate(policy['Statement']) if statement_matches(s, es)), -1)
- if es_index != -1:
- if encryption:
- puts("No encryption change")
- print_policy(policy)
- elif encryption is False:
- puts("Removing encryption")
- policy['Statement'].pop(es_index)
- print_policy(policy)
- if not dry_run:
- if any(policy['Statement']):
- bucket.Policy().put(Policy=json.dumps(policy))
- else:
- bucket.Policy().delete()
- else:
- if encryption:
- puts("Adding encryption")
- policy['Statement'].append(es)
- print_policy(policy)
- if not dry_run:
- bucket.Policy().put(Policy=json.dumps(policy))
- elif encryption is False:
- puts(colored.yellow("No encryption change"))
- print_policy(policy)
- @cli.command(name='delete-policy')
- @click.argument('bucket')
- def delete_policy(bucket):
- s3().Bucket(bucket).Policy().delete()
- puts('Policy deleted')
|