123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235 |
- import json
- import botocore
- class Check:
- def __init__(self, bucket, **kwargs):
- self.bucket = bucket
- self.options = kwargs
- def perform(self):
- try:
- self.status = 'passed' if self._passed() else 'failed'
- except botocore.exceptions.ClientError:
- self.status = 'denied'
- def fix(self, options):
- self._fix(options)
- self.status = 'passed'
- class AclCheck(Check):
- name = 'ACL'
- pass_message = 'not open to public'
- fail_message = 'open to public'
- bad_grantees = [
- 'http://acs.amazonaws.com/groups/global/AllUsers',
- 'http://acs.amazonaws.com/groups/global/AuthenticatedUsers'
- ]
- def _passed(self):
- for grant in self.bucket.Acl().grants:
- if grant['Grantee'].get('URI', None) in self.bad_grantees:
- return False
- return True
- class PolicyCheck(Check):
- name = 'Policy'
- pass_message = 'not open to public'
- fail_message = 'open to public'
- def _passed(self):
- policy = None
- try:
- policy = self.bucket.Policy().policy
- except botocore.exceptions.ClientError as e:
- if 'NoSuchBucket' not in str(e):
- raise
- if policy is not None:
- policy = json.loads(policy)
- for s in policy['Statement']:
- if s['Effect'] == 'Allow' and (s['Principal'] == '*' or s['Principal'] == {'AWS': '*'}):
- return False
- return True
- class PublicAccessCheck(Check):
- name = 'Public access'
- pass_message = 'blocked'
- fail_message = 'not explicitly blocked'
- def _passed(self):
- response = None
- try:
- response = self.bucket.meta.client.get_public_access_block(
- Bucket=self.bucket.name
- )
- except botocore.exceptions.ClientError as e:
- if 'NoSuchPublicAccessBlockConfiguration' not in str(e):
- raise
- return False
- config = response['PublicAccessBlockConfiguration']
- return (config['BlockPublicAcls'] and config['IgnorePublicAcls'] and config['BlockPublicPolicy'] and config['RestrictPublicBuckets'])
- def _fix(self, options):
- self.bucket.meta.client.put_public_access_block(
- Bucket=self.bucket.name,
- PublicAccessBlockConfiguration={
- 'BlockPublicAcls': True,
- 'IgnorePublicAcls': True,
- 'BlockPublicPolicy': True,
- 'RestrictPublicBuckets': True
- }
- )
- class LoggingCheck(Check):
- name = 'Logging'
- pass_message = 'enabled'
- fail_message = 'disabled'
- def _passed(self):
- enabled = self.bucket.Logging().logging_enabled
- log_bucket = self.options.get('log_bucket', None)
- log_prefix = self.options.get('log_prefix', None)
- if log_prefix:
- log_prefix = log_prefix.replace("{bucket}", self.bucket.name)
- if not enabled:
- return False
- elif log_bucket and enabled['TargetBucket'] not in log_bucket:
- self.fail_message = 'to wrong bucket: ' + enabled['TargetBucket']
- return False
- elif log_prefix and enabled['TargetPrefix'] != log_prefix:
- self.fail_message = 'to wrong prefix: ' + enabled['TargetPrefix']
- return False
- self.pass_message = 'to ' + enabled['TargetBucket']
- if enabled['TargetPrefix']:
- self.pass_message = self.pass_message + '/' + enabled['TargetPrefix']
- return True
- def _fix(self, options):
- log_prefix = (options['log_prefix'] or '{bucket}/').replace("{bucket}", self.bucket.name)
- self.bucket.Logging().put(
- BucketLoggingStatus={
- 'LoggingEnabled': {
- 'TargetBucket': options['log_bucket'],
- 'TargetGrants': [
- {
- 'Grantee': {
- 'Type': 'Group',
- 'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
- },
- 'Permission': 'WRITE'
- },
- {
- 'Grantee': {
- 'Type': 'Group',
- 'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
- },
- 'Permission': 'READ_ACP'
- },
- ],
- 'TargetPrefix': log_prefix
- }
- }
- )
- class VersioningCheck(Check):
- name = 'Versioning'
- pass_message = 'enabled'
- fail_message = 'disabled'
- def _passed(self):
- return self.bucket.Versioning().status == 'Enabled'
- def _fix(self, options):
- self.bucket.Versioning().enable()
- class EncryptionCheck(Check):
- name = 'Default encryption'
- pass_message = 'enabled'
- fail_message = 'disabled'
- def _passed(self):
- response = None
- try:
- response = self.bucket.meta.client.get_bucket_encryption(
- Bucket=self.bucket.name
- )
- except botocore.exceptions.ClientError as e:
- if 'ServerSideEncryptionConfigurationNotFoundError' not in str(e):
- raise
- return response is not None
- def _fix(self, options):
- self.bucket.meta.client.put_bucket_encryption(
- Bucket=self.bucket.name,
- ServerSideEncryptionConfiguration={
- 'Rules': [
- {
- 'ApplyServerSideEncryptionByDefault': {
- 'SSEAlgorithm': 'AES256'
- }
- }
- ]
- }
- )
- class ObjectLoggingCheck(Check):
- name = 'CloudTrail object-level logging'
- pass_message = 'enabled'
- fail_message = 'disabled'
- def _passed(self):
- event_selectors = self.options['event_selectors']
- selectors = []
- selectors += event_selectors.get(('global'), [])
-
- if any(k for k in event_selectors.keys() if k[0] == 'region'):
- region = self.bucket.meta.client.get_bucket_location(Bucket=self.bucket.name)['LocationConstraint']
-
- if region is None:
- region = 'us-east-1'
- selectors += event_selectors.get(('region', region), [])
- selectors += event_selectors.get(('bucket', self.bucket.name), [])
- passed = any(selectors)
- if passed:
- messages = []
- for event_selector in selectors:
- message = event_selector['trail'] + ' ('
- if event_selector['read_write_type'] == 'All':
- message += 'read & write'
- elif event_selector['read_write_type'] == 'ReadOnly':
- message += 'read'
- elif event_selector['read_write_type'] == 'WriteOnly':
- message += 'write'
- else:
- message += 'unknown'
- if event_selector['path'] != '':
- message += ' for /' + event_selector['path']
- messages.append(message + ')')
- self.pass_message = 'to ' + ' and '.join(messages)
- return passed
|