1234567891011121314151617181920212223242526 |
def encrypt(buf, s3_metadata, kms_client):
    """
    Encrypt an S3 object's content with KMS based client-side encryption.

    The algorithm is taken from the object's existing metadata (HEADER_ALG)
    and the KMS key id from the JSON material description stored under
    HEADER_MATDESC. A fresh AES key is obtained via get_encryption_aes_key
    and the envelope entries of ``s3_metadata`` (content length, wrapping
    algorithm, key, algorithm, tag length for GCM, and the base64 encoded
    IV) are overwritten in place.

    Args:
        buf: readable object; its whole content is read and encrypted.
        s3_metadata: mapping with the object's metadata. It must contain
            HEADER_MATDESC; it is mutated and also returned.
        kms_client: client passed through to get_encryption_aes_key.

    Returns:
        A ``(BytesIO, s3_metadata)`` tuple holding the encrypted content
        and the updated metadata.
    """
    logger.info("Encrypting Object with CSE-KMS")
    plaintext = buf.read()
    algorithm = s3_metadata.get(HEADER_ALG)
    material_description = json.loads(s3_metadata[HEADER_MATDESC])
    data_key, _matdesc_metadata, key_header = get_encryption_aes_key(
        material_description["kms_cmk_id"], kms_client
    )

    # The envelope is written before encrypting, in this exact order.
    s3_metadata.update(
        {
            HEADER_UE_CLENGHT: str(len(plaintext)),
            HEADER_WRAP_ALG: "kms",
            HEADER_KEY: key_header,
            HEADER_ALG: algorithm,
        }
    )

    # Anything other than GCM (including a missing algorithm) goes to CBC.
    use_gcm = algorithm == ALG_GCM
    if use_gcm:
        s3_metadata[HEADER_TAG_LEN] = str(AES_BLOCK_SIZE)
    cipher = encrypt_gcm if use_gcm else encrypt_cbc
    ciphertext, init_vector = cipher(data_key, plaintext)

    encoded_iv = base64.b64encode(init_vector)
    s3_metadata[HEADER_IV] = encoded_iv.decode()
    return BytesIO(ciphertext), s3_metadata
|