cse_4.py 1.2 KB

1234567891011121314151617181920212223242526
  1. def encrypt(buf, s3_metadata, kms_client):
  2. """
  3. Method to encrypt an S3 object with KMS based Client-side encryption (CSE).
  4. The original object's metadata (previously used to decrypt the content) is
  5. used to infer some parameters such as the algorithm originally used to encrypt
  6. the previous version (which is left unchanged) and to store the new envelope,
  7. including the initialization vector (IV).
  8. """
  9. logger.info("Encrypting Object with CSE-KMS")
  10. content = buf.read()
  11. alg = s3_metadata.get(HEADER_ALG, None)
  12. matdesc = json.loads(s3_metadata[HEADER_MATDESC])
  13. aes_key, matdesc_metadata, key_metadata = get_encryption_aes_key(
  14. matdesc["kms_cmk_id"], kms_client
  15. )
  16. s3_metadata[HEADER_UE_CLENGHT] = str(len(content))
  17. s3_metadata[HEADER_WRAP_ALG] = "kms"
  18. s3_metadata[HEADER_KEY] = key_metadata
  19. s3_metadata[HEADER_ALG] = alg
  20. if alg == ALG_GCM:
  21. s3_metadata[HEADER_TAG_LEN] = str(AES_BLOCK_SIZE)
  22. result, iv = encrypt_gcm(aes_key, content)
  23. else:
  24. result, iv = encrypt_cbc(aes_key, content)
  25. s3_metadata[HEADER_IV] = base64.b64encode(iv).decode()
  26. return BytesIO(result), s3_metadata