test_cse.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. from botocore.exceptions import ClientError
  2. from mock import patch, MagicMock, call, ANY
  3. import pytest
  4. import base64
  5. import json
  6. from io import BytesIO
  7. import os
  8. from cryptography.hazmat.primitives.ciphers import Cipher
  9. from cryptography.hazmat.primitives.ciphers.algorithms import AES
  10. from cryptography.hazmat.primitives.padding import PKCS7
  11. from cryptography.hazmat.primitives.ciphers.modes import ECB, GCM
  12. from backend.ecs_tasks.delete_files.cse import is_kms_cse_encrypted, encrypt, decrypt
  13. pytestmark = [pytest.mark.unit, pytest.mark.ecs_tasks]
  14. class KmsMock:
  15. """
  16. This is not what the real KMS does.
  17. Only used for unit testing.
  18. """
  19. def __init__(self, key_id):
  20. plaintext = os.urandom(32)
  21. iv = os.urandom(16)
  22. encryptor = Cipher(AES(base64.b64decode(key_id)), GCM(iv)).encryptor()
  23. aad = BytesIO()
  24. aad.write("kms_cmk_id".encode("utf-8"))
  25. aad.write(key_id.encode("utf-8"))
  26. encryptor.authenticate_additional_data(aad.getvalue())
  27. ciphertext = encryptor.update(plaintext) + encryptor.finalize()
  28. self.key_id = key_id
  29. self.plaintext = plaintext
  30. self.ciphertext = ciphertext
  31. def generate_data_key(self):
  32. return {
  33. "Plaintext": self.plaintext,
  34. "CiphertextBlob": self.ciphertext,
  35. "KeyId": self.key_id,
  36. }
  37. def decrypt(self):
  38. return {"Plaintext": self.plaintext}
  39. def test_it_recognises_supported_kms_cse_object():
  40. valid_cbc = {
  41. "x-amz-key-v2": "key",
  42. "x-amz-wrap-alg": "kms",
  43. "x-amz-cek-alg": "AES/CBC/PKCS5Padding",
  44. }
  45. valid_gcm = {
  46. "x-amz-key-v2": "key",
  47. "x-amz-wrap-alg": "kms",
  48. "x-amz-cek-alg": "AES/GCM/NoPadding",
  49. }
  50. not_encrypted = {}
  51. assert is_kms_cse_encrypted(valid_cbc)
  52. assert is_kms_cse_encrypted(valid_gcm)
  53. assert not is_kms_cse_encrypted(not_encrypted)
  54. def test_it_throws_exception_for_encryption_sdk_v1():
  55. old_sdk = {
  56. "x-amz-key": "key",
  57. "x-amz-wrap-alg": "kms",
  58. "x-amz-cek-alg": "AES/CBC/PKCS5Padding",
  59. }
  60. with pytest.raises(ValueError) as e:
  61. is_kms_cse_encrypted(old_sdk)
  62. assert e.value.args[0] == "Unsupported Amazon S3 Hash Client Version"
  63. def test_it_throws_exception_for_unsupported_encryption_algorithm():
  64. invalid_algorithm = {
  65. "x-amz-key-v2": "key",
  66. "x-amz-wrap-alg": "kms",
  67. "x-amz-cek-alg": "AES/FOO/Bar",
  68. }
  69. with pytest.raises(ValueError) as e:
  70. is_kms_cse_encrypted(invalid_algorithm)
  71. assert e.value.args[0] == "Unsupported Hash algorithm"
  72. def test_it_throws_exception_for_unsupported_encryption_strategy():
  73. not_kms = {"x-amz-key-v2": "key", "x-amz-cek-alg": "AES/CBC/PKCS5Padding"}
  74. with pytest.raises(ValueError) as e:
  75. is_kms_cse_encrypted(not_kms)
  76. assert e.value.args[0] == "Unsupported Hash strategy"
  77. def test_it_encrypts_and_decrypts_data_cbc():
  78. key_id = "1234abcd-12ab-34cd-56ef-1234567890ab"
  79. kms_client = MagicMock()
  80. kms_mock = KmsMock(key_id)
  81. kms_client.generate_data_key.return_value = kms_mock.generate_data_key()
  82. kms_client.decrypt.return_value = kms_mock.decrypt()
  83. metadata = {
  84. "x-amz-key-v2": "key",
  85. "x-amz-iv": "IV",
  86. "x-amz-matdesc": json.dumps({"kms_cmk_id": key_id}),
  87. "x-amz-wrap-alg": "kms",
  88. "x-amz-cek-alg": "AES/CBC/PKCS5Padding",
  89. "x-amz-unencrypted-content-length": "890",
  90. }
  91. content = b'{"customer_id":12345}\n'
  92. encrypted, new_metadata = encrypt(BytesIO(content), metadata, kms_client)
  93. decrypted = decrypt(encrypted, new_metadata, kms_client)
  94. assert new_metadata == {
  95. "x-amz-key-v2": ANY,
  96. "x-amz-iv": ANY,
  97. "x-amz-matdesc": json.dumps({"kms_cmk_id": key_id}),
  98. "x-amz-wrap-alg": "kms",
  99. "x-amz-cek-alg": "AES/CBC/PKCS5Padding",
  100. "x-amz-unencrypted-content-length": "22",
  101. }
  102. assert new_metadata["x-amz-key-v2"] != "key"
  103. assert new_metadata["x-amz-iv"] != "IV"
  104. assert encrypted != content
  105. assert decrypted.read() == content
  106. kms_client.generate_data_key.assert_called_with(
  107. KeyId=key_id,
  108. EncryptionContext={"kms_cmk_id": key_id},
  109. KeySpec="AES_256",
  110. )
  111. kms_client.decrypt.assert_called_with(
  112. CiphertextBlob=base64.b64decode(new_metadata["x-amz-key-v2"]),
  113. EncryptionContext={"kms_cmk_id": key_id},
  114. )
  115. def test_it_encrypts_and_decrypts_data_gcm():
  116. key_id = "1234abcd-12ab-34cd-56ef-1234567890ab"
  117. kms_client = MagicMock()
  118. kms_mock = KmsMock(key_id)
  119. kms_client.generate_data_key.return_value = kms_mock.generate_data_key()
  120. kms_client.decrypt.return_value = kms_mock.decrypt()
  121. metadata = {
  122. "x-amz-key-v2": "key",
  123. "x-amz-iv": "IV",
  124. "x-amz-matdesc": json.dumps({"kms_cmk_id": key_id}),
  125. "x-amz-wrap-alg": "kms",
  126. "x-amz-cek-alg": "AES/GCM/NoPadding",
  127. "x-amz-unencrypted-content-length": "11",
  128. "x-amz-tag-len": "111",
  129. }
  130. content = b'{"customer_id":12345}\n'
  131. encrypted, new_metadata = encrypt(BytesIO(content), metadata, kms_client)
  132. decrypted = decrypt(encrypted, new_metadata, kms_client)
  133. assert new_metadata == {
  134. "x-amz-key-v2": ANY,
  135. "x-amz-iv": ANY,
  136. "x-amz-matdesc": json.dumps({"kms_cmk_id": key_id}),
  137. "x-amz-wrap-alg": "kms",
  138. "x-amz-cek-alg": "AES/GCM/NoPadding",
  139. "x-amz-unencrypted-content-length": "22",
  140. "x-amz-tag-len": "128",
  141. }
  142. assert new_metadata["x-amz-key-v2"] != "key"
  143. assert new_metadata["x-amz-iv"] != "IV"
  144. assert encrypted != content
  145. assert decrypted.read() == content
  146. kms_client.generate_data_key.assert_called_with(
  147. KeyId=key_id,
  148. EncryptionContext={"kms_cmk_id": key_id},
  149. KeySpec="AES_256",
  150. )
  151. kms_client.decrypt.assert_called_with(
  152. CiphertextBlob=base64.b64decode(new_metadata["x-amz-key-v2"]),
  153. EncryptionContext={"kms_cmk_id": key_id},
  154. )