checks.py 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235
  1. import json
  2. import botocore
  3. class Check:
  4. def __init__(self, bucket, **kwargs):
  5. self.bucket = bucket
  6. self.options = kwargs
  7. def perform(self):
  8. try:
  9. self.status = 'passed' if self._passed() else 'failed'
  10. except botocore.exceptions.ClientError:
  11. self.status = 'denied'
  12. def fix(self, options):
  13. self._fix(options)
  14. self.status = 'passed'
  15. class AclCheck(Check):
  16. name = 'ACL'
  17. pass_message = 'not open to public'
  18. fail_message = 'open to public'
  19. bad_grantees = [
  20. 'http://acs.amazonaws.com/groups/global/AllUsers',
  21. 'http://acs.amazonaws.com/groups/global/AuthenticatedUsers'
  22. ]
  23. def _passed(self):
  24. for grant in self.bucket.Acl().grants:
  25. if grant['Grantee'].get('URI', None) in self.bad_grantees:
  26. return False
  27. return True
  28. class PolicyCheck(Check):
  29. name = 'Policy'
  30. pass_message = 'not open to public'
  31. fail_message = 'open to public'
  32. def _passed(self):
  33. policy = None
  34. try:
  35. policy = self.bucket.Policy().policy
  36. except botocore.exceptions.ClientError as e:
  37. if 'NoSuchBucket' not in str(e):
  38. raise
  39. if policy is not None:
  40. policy = json.loads(policy)
  41. for s in policy['Statement']:
  42. if s['Effect'] == 'Allow' and (s['Principal'] == '*' or s['Principal'] == {'AWS': '*'}):
  43. return False
  44. return True
  45. class PublicAccessCheck(Check):
  46. name = 'Public access'
  47. pass_message = 'blocked'
  48. fail_message = 'not explicitly blocked'
  49. def _passed(self):
  50. response = None
  51. try:
  52. response = self.bucket.meta.client.get_public_access_block(
  53. Bucket=self.bucket.name
  54. )
  55. except botocore.exceptions.ClientError as e:
  56. if 'NoSuchPublicAccessBlockConfiguration' not in str(e):
  57. raise
  58. return False
  59. config = response['PublicAccessBlockConfiguration']
  60. return (config['BlockPublicAcls'] and config['IgnorePublicAcls'] and config['BlockPublicPolicy'] and config['RestrictPublicBuckets'])
  61. def _fix(self, options):
  62. self.bucket.meta.client.put_public_access_block(
  63. Bucket=self.bucket.name,
  64. PublicAccessBlockConfiguration={
  65. 'BlockPublicAcls': True,
  66. 'IgnorePublicAcls': True,
  67. 'BlockPublicPolicy': True,
  68. 'RestrictPublicBuckets': True
  69. }
  70. )
  71. class LoggingCheck(Check):
  72. name = 'Logging'
  73. pass_message = 'enabled'
  74. fail_message = 'disabled'
  75. def _passed(self):
  76. enabled = self.bucket.Logging().logging_enabled
  77. log_bucket = self.options.get('log_bucket', None)
  78. log_prefix = self.options.get('log_prefix', None)
  79. if log_prefix:
  80. log_prefix = log_prefix.replace("{bucket}", self.bucket.name)
  81. if not enabled:
  82. return False
  83. elif log_bucket and enabled['TargetBucket'] not in log_bucket:
  84. self.fail_message = 'to wrong bucket: ' + enabled['TargetBucket']
  85. return False
  86. elif log_prefix and enabled['TargetPrefix'] != log_prefix:
  87. self.fail_message = 'to wrong prefix: ' + enabled['TargetPrefix']
  88. return False
  89. self.pass_message = 'to ' + enabled['TargetBucket']
  90. if enabled['TargetPrefix']:
  91. self.pass_message = self.pass_message + '/' + enabled['TargetPrefix']
  92. return True
  93. def _fix(self, options):
  94. log_prefix = (options['log_prefix'] or '{bucket}/').replace("{bucket}", self.bucket.name)
  95. self.bucket.Logging().put(
  96. BucketLoggingStatus={
  97. 'LoggingEnabled': {
  98. 'TargetBucket': options['log_bucket'],
  99. 'TargetGrants': [
  100. {
  101. 'Grantee': {
  102. 'Type': 'Group',
  103. 'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
  104. },
  105. 'Permission': 'WRITE'
  106. },
  107. {
  108. 'Grantee': {
  109. 'Type': 'Group',
  110. 'URI': 'http://acs.amazonaws.com/groups/s3/LogDelivery'
  111. },
  112. 'Permission': 'READ_ACP'
  113. },
  114. ],
  115. 'TargetPrefix': log_prefix
  116. }
  117. }
  118. )
  119. class VersioningCheck(Check):
  120. name = 'Versioning'
  121. pass_message = 'enabled'
  122. fail_message = 'disabled'
  123. def _passed(self):
  124. return self.bucket.Versioning().status == 'Enabled'
  125. def _fix(self, options):
  126. self.bucket.Versioning().enable()
  127. class EncryptionCheck(Check):
  128. name = 'Default encryption'
  129. pass_message = 'enabled'
  130. fail_message = 'disabled'
  131. def _passed(self):
  132. response = None
  133. try:
  134. response = self.bucket.meta.client.get_bucket_encryption(
  135. Bucket=self.bucket.name
  136. )
  137. except botocore.exceptions.ClientError as e:
  138. if 'ServerSideEncryptionConfigurationNotFoundError' not in str(e):
  139. raise
  140. return response is not None
  141. def _fix(self, options):
  142. self.bucket.meta.client.put_bucket_encryption(
  143. Bucket=self.bucket.name,
  144. ServerSideEncryptionConfiguration={
  145. 'Rules': [
  146. {
  147. 'ApplyServerSideEncryptionByDefault': {
  148. 'SSEAlgorithm': 'AES256'
  149. }
  150. }
  151. ]
  152. }
  153. )
  154. class ObjectLoggingCheck(Check):
  155. name = 'CloudTrail object-level logging'
  156. pass_message = 'enabled'
  157. fail_message = 'disabled'
  158. def _passed(self):
  159. event_selectors = self.options['event_selectors']
  160. selectors = []
  161. selectors += event_selectors.get(('global'), [])
  162. # handle single-region trails
  163. if any(k for k in event_selectors.keys() if k[0] == 'region'):
  164. region = self.bucket.meta.client.get_bucket_location(Bucket=self.bucket.name)['LocationConstraint']
  165. # https://github.com/aws/aws-sdk-net/issues/323
  166. if region is None:
  167. region = 'us-east-1'
  168. selectors += event_selectors.get(('region', region), [])
  169. selectors += event_selectors.get(('bucket', self.bucket.name), [])
  170. passed = any(selectors)
  171. if passed:
  172. messages = []
  173. for event_selector in selectors:
  174. message = event_selector['trail'] + ' ('
  175. if event_selector['read_write_type'] == 'All':
  176. message += 'read & write'
  177. elif event_selector['read_write_type'] == 'ReadOnly':
  178. message += 'read'
  179. elif event_selector['read_write_type'] == 'WriteOnly':
  180. message += 'write'
  181. else:
  182. message += 'unknown'
  183. if event_selector['path'] != '':
  184. message += ' for /' + event_selector['path']
  185. messages.append(message + ')')
  186. self.pass_message = 'to ' + ' and '.join(messages)
  187. return passed