I have a CloudFormation template that configures a Lambda function to decrypt AWS RDS Database Activity Streams (DAS) logs.
I took the Lambda function from: https://github.com/aws-samples/optimising-aws-kms-decryption-cost-for-das/blob/main/lambda.py
However, the Lambda logs show an InvalidCiphertextException when the function calls the KMS Decrypt operation, and I do not understand why, because everything seems to be configured correctly.
Lambda logs from CloudWatch:
2025-05-01T18:50:01.777Z Processing record: { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "741a3939-d892-4df5-934a-9479252cb48d", "sequenceNumber": "49662849271037707186170471169412978102974199784156430338", "data": "eyJ0eXBlIjogIkRhdGFiYXNlQWN0aXZpdHlNb25pdG9yaW5nUmVjb3JkcyIsInZlcnNpb24iOiAiMS4yIiw...", "approximateArrivalTimestamp": 1746125400.734 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49662849271037707186170471169412978102974199784156430338", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::xxx:role/ops-rds-stream-decrypted-logs-DevDecryptLambdaRole-g0cLkmhkldp9", "awsRegion": "eu-central-1", "eventSourceARN": "arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxx" } Processing record: {"kinesis": {"kinesisSchemaVersion": "1.0", "partitionKey": "741a3939-d892-4df5-934a-9479252cb48d", "sequenceNumber": "49662849271037707186170471169412978102974199784156430338", "data": "eyJ0eXBlIjogIkRhdGFiYXNlQWN0aXZpdHlNb25pdG9yaW5nUmVjb3JkcyIsInZlcnNpb24iOiAiMS4yIiwiZGF0YWJhc2VBY3Rpdml0eUV2ZW50cyI6ICJBWUFEZUNTbG1tUTFOTjE1UHJmZjZXcVBpMjRBWHdBQkFCVmhkM010WTNKNWNIUnZMWEIxWW14cFl5MXJaWGtBUkVGNFEwd3JNSGhJWm5WSVNXaFZSbGhDTTNZeGNWQkVLMHBDY1dkelNIUmpWMmwwZEdwUFREZ3hNSGhEVG10clJGQXZUa00wVWpoMVRWZGFZVEZyUTFKNlVUMDlBQUVBQWtKREFCdEVZWFJoUzJWNUFBQUFnQUFBQUF3Wnkwdk8vWmVhWDRWZ3g3NEFNSlBTVVlrb1dSM0FKV1QyU2VvcUZJbWl4cHB3Y0g5VzhqdkN2bDdBOVkxNHpHdjR5Q081dnBLTUhGYllBVms4eGdJQUFBQUFEQUFFQUFBQUFBQUFBQUFBQUFBQUFBQXNUZ2lCSEhBSXNUQXpScmZFcFZDci8vLy8vd0FBQUFFQUFBQUFBQUFBQUFBQUFBRUFBQUlJeXoxWnBFLyt4OXF6WmZPUW9EYXB4U3dSRkwwbGJaQ0ppeHhkVnB0dmRodUhRNkM3Z3NJUGlBdTU4SURueGt2WHlNTDRYZTBNZnhNMUh5bWdsYmVCS2FWVDlKQkZtWVpxMTRwa3dGUTlob1dxTUVqc3cxMllwN2IzcDZyRmJIaEkyVDIzdU91TnF5cVNLTGloVVdWbE5PN0FqcWZYNjNWeDhaVXdITXNEWWRkQmFvRkJKVm9WRXpIS3RnbE5DMU9KZXlQa2VqMzIzeFFJUG5JY0RBbFRzdkxpYVlzSWtPdm4rYVRXZVdqaWNoZEdhUngrRkg0ZVJxQmp4ODRBK1VIQ21tbG45WG02QWk3eTB4UjM1N0l5YW5LWW40RUdJT2ZiN0NtSmx0YXE5OHYweUpRVUFrL0lFNzFuSkFpRC9WWkRtdU
FjRUJLZkl1V0czaXNieFhIQXJ2bWFRVWdYcVdRaXRlelgvS1hlcE9KeG8zYXd0ZXJUYVVnNmtVeUtzZkpyVHRYUEhKdmd4STY0QWp6eUNXb1l4UXRnR0NZZERrK09NVXEvVmVyOUZ1Ky9IcXRuV2krRzJEMUpwU1gwb1VaTlk3bm80N09FUUE5OFE0bnRaeWNseFptRW1Qc2F2V0hIYWtXNHAyajZhbVRoNjd3NVBnR2E0T0dUZzNEdXhOYjJOeWdGNzJTKzFib1VJSWJHVXQ1ZFJYQkRTUmNodHNYQjNMZ2lRazUrOEZOc3kvYWlVa2NXTjIrRys4Ym5RS250bmlUTThISVIvbnphYTI0dmJIR3VrT0V2WE5QMFc3WmhNTDRmNEU1NHJRY1lnQjN5aWJKNGRMcnB3Nms3dGRnaTRyM3NoYWNPVS9jbmlMaVBYSEZjSkhud1g0NXNRQWkxNVFOcVZFTUN6bE82M2NKL3o0T2VyTmN1N1Frd1lNd01JbENiYkNPUEhVNEFaekJsQWpFQXhpTndDNTc5L2QrdXVHTWxhRTE1dlZiSWdDZ1hPN3Zad3czeXFQS25zMUFUNTgzejBMclVaTngvQTlwU0c3RWZBakFYbk16dkkyMVRhYU5Zem1iekRLZ2czMkJ3ajhCdWhESUhSUDYrNmU2aWViNXR0VHMzQ3B4blkvaHUwY0l0Z1VZPSIsImtleSIgOiAiQVFJREFIZ0RIVGN6V0VXejB1MFB0N0VPMkpISHhTTUVIZFd5bk1rUS9xMDZuVGpmMndGbEcvVDYxNW1TRzI0VzRobm1Fa2JnQUFBQWZqQjhCZ2txaGtpRzl3MEJCd2FnYnpCdEFnRUFNR2dHQ1NxR1NJYjNEUUVIQVRBZUJnbGdoa2dCWlFNRUFTNHdFUVFNTWU0Y0djVFhxZmVXRkJTa0FnRVFnRHY4K3ZKcUM1VTFhZEk3YlZLbkE1eGVNS3ByTHIyQ3hiRVRJUnMxaGdrTURudVl5a2gzb2JtUzdISmNFVHJQeThnMDMwNzdieWVrM1dwMXVnPT0ifQ==", "approximateArrivalTimestamp": 1746125400.734}, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49662849271037707186170471169412978102974199784156430338", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::618550961437:role/ops-rds-stream-decrypted-logs-DevDecryptLambdaRole-g0cLkmhkldp9", "awsRegion": "eu-central-1", "eventSourceARN": "arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxx"} 2025-05-01T18:50:01.777Z Base64-decoded data: b' { "type": "DatabaseActivityMonitoringRecords", "version": "1.2", "databaseActivityEvents": "AYADeCSlmmQ1NN15Prff6WqPi24AXwABABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREF4Q0wrMHhIZnVISWh...", "key": "AQIDAHgDHTczWEWz0u0Pt7EO2JHHxSMEHdWynMkQ/q06nTjf2wFlG/T615mSG24W4hnmEkbgAAAAfjB8Bgk..." 
} '
Decoded record data: { "type": "DatabaseActivityMonitoringRecords", "version": "1.2", "databaseActivityEvents": "AYADeCSlmmQ1NN15Prff6WqPi24AXwABABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREF4Q0wrMHhIZnVISWh...", "key": "AQIDAHgDHTczWEWz0u0Pt7EO2JHHxSMEHdWynMkQ/q06nTjf2wFlG/T615mSG24W4hnmEkbgAAAAfjB8Bgk..." }
Decoded payload: b'\x01\x80\x03x$\xa5\x9ad54\xddy>\xb7\xdf\xe9j\x8f\x8bn\x00_\x00\x01\x00\x15aws-crypto-public-...'
Decoded data key: b"\x01\x02\x03\x00x\x03\x1d73XE\xb3\xd2\xed\x0f\xb7\xb1\x0e\xd8\x91\xc7\xc5#\x04\x1d\xd5\xb2\x9c\xc9\x10\xfe\xad:\x9d8\xdf\xdb\x01e\x1b\xf4\xfa\xd7\x99\x92\x1bn\x16\xe2..."
2025-05-01T18:50:01.789Z Decrypting data key: b"\x01\x02\x03\x00x\x03\x1d73XE\xb3\xd2\xed\x0f\xb7\xb1\x0e\xd8\x91\xc7\xc5#\x04\x1d\xd5\xb2\x9c\xc9\x10\xfe\xad:\x9d8\xdf\xdb\x01e\x1b\xf4\xfa\xd7\x99\x92\x1bn\x16\xe2..."
Error processing record: An error occurred (InvalidCiphertextException) when calling the Decrypt operation:

CloudFormation template that I have written:
AWSTemplateFormatVersion: 2010-09-09
Description: Dev-only setup to decrypt RDS DAS logs using KMS and store them in S3

Parameters:
  SourceKinesisStreamARN:
    Type: String
    Default: arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxxx
  KMSKeyARN:
    Type: String
    Default: arn:aws:kms:eu-central-1:xxx:key/xxx
  Region:
    Type: String
    Default: eu-central-1
  ResourceId:
    Type: String
    Default: srv-rds-cluster-databasecluster-xxx # Aurora RDS Cluster ID

Resources:
  # S3 Bucket to store decrypted logs
  DevDecryptedLogsBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: rds-das-dev-decrypted-logs # New S3 Bucket in dev account
      LifecycleConfiguration:
        Rules:
          - Status: Enabled
            Transitions:
              - StorageClass: GLACIER
                TransitionInDays: 30

  # IAM Role for Lambda to read from Kinesis and write to S3
  DevDecryptLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      PermissionsBoundary: !Sub arn:aws:iam::${AWS::AccountId}:policy/ScopePermissions
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Policies:
        - PolicyName: DevDecryptLambdaPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:GetRecords
                  - kinesis:GetShardIterator
                  - kinesis:DescribeStream
                  - kinesis:ListStreams
                Resource: !Ref SourceKinesisStreamARN
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource: arn:aws:s3:::rds-das-dev-decrypted-logs/*
              - Effect: Allow
                Action:
                  - kms:Decrypt
                Resource: !Ref KMSKeyARN
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: '*'

  # Lambda function to decrypt the logs and store them in S3
  DevDASDecryptLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Runtime: python3.10
      Architectures:
        - arm64
      Timeout: 60
      MemorySize: 512
      Role: !GetAtt DevDecryptLambdaRole.Arn
      Environment:
        Variables:
          S3_BUCKET: !Ref DevDecryptedLogsBucket # S3 bucket for decrypted logs
          KMS_KEY_ID: !Ref KMSKeyARN # KMS key ID for decryption
          REGION_NAME: !Ref Region # Region for decryption
          RESOURCE_ID: !Ref ResourceId # RDS Aurora cluster ID
          STREAM_NAME: !Ref SourceKinesisStreamARN # Kinesis stream name
      Code:
        ZipFile: |
          import json
          import zlib
          import aws_encryption_sdk
          import boto3
          import base64
          import os
          from aws_encryption_sdk import CommitmentPolicy
          from aws_encryption_sdk.internal.crypto import WrappingKey
          from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider
          from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType

          # Constants for AWS resources and region. Please use environment variables instead
          REGION_NAME = os.environ['REGION_NAME']
          RESOURCE_ID = os.environ['RESOURCE_ID']
          STREAM_NAME = os.environ['STREAM_NAME']

          # Initialize the AWS Encryption SDK client with a specific commitment policy
          enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT)

          # Custom key provider class for raw encryption/decryption operations
          class MyRawMasterKeyProvider(RawMasterKeyProvider):
              provider_id = "BC" # Custom provider ID

              def __new__(cls, *args, **kwargs):
                  # Overriding the object creation process for proper initialization
                  obj = super(RawMasterKeyProvider, cls).__new__(cls)
                  return obj

              def __init__(self, plain_key):
                  # Initializing the parent class and setting up a wrapping key
                  super().__init__()
                  self.wrapping_key = WrappingKey(
                      wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
                      wrapping_key=plain_key,
                      wrapping_key_type=EncryptionKeyType.SYMMETRIC)

              def _get_raw_key(self, key_id):
                  # Method to retrieve the raw key; here, it returns the initialized wrapping key
                  return self.wrapping_key

          # Class for caching decrypted data keys using AWS KMS
          class KMSDataKeyCache():
              def __init__(self, session):
                  # Initialize the KMS client and a simple dictionary for caching keys
                  self.kms_client = session.client('kms', region_name=REGION_NAME)
                  self.key_cache = {}

              def getDecrypted(self, data_key_decoded):
                  # Attempt to retrieve the decrypted key from cache or decrypt it using KMS
                  if data_key_decoded in self.key_cache:
                      print("Cache hit for data key.")
                      return self.key_cache[data_key_decoded]
                  else:
                      print(f"Decrypting data key: {data_key_decoded}")
                      # Decrypt the key using KMS and store it in the cache
                      data_key_decrypt_result = self.kms_client.decrypt(
                          CiphertextBlob=data_key_decoded,
                          EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID})
                      self.key_cache[data_key_decoded] = data_key_decrypt_result['Plaintext']
                      print(f"Data key decrypted successfully.")
                      return data_key_decrypt_result['Plaintext']

          # Function to decrypt payload with a provided data key
          def decrypt_payload(payload, data_key):
              print(f"Decrypting payload with data key: {data_key}") # Debugging: Print the full data key
              # Setup the key provider and decrypt the payload
              my_key_provider = MyRawMasterKeyProvider(data_key)
              my_key_provider.add_master_key("DataKey")
              decrypted_plaintext, header = enc_client.decrypt(
                  source=payload,
                  materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager(
                      master_key_provider=my_key_provider))
              print(f"Decrypted payload: {decrypted_plaintext}") # Debugging: Print the full decrypted payload
              return decrypted_plaintext

          # Function to decrypt and then decompress the payload
          def decrypt_decompress(payload, key):
              print(f"Decompressing payload...") # Debugging: Indicate decompression
              decrypted = decrypt_payload(payload, key)
              decompressed = zlib.decompress(decrypted, zlib.MAX_WBITS + 16)
              print(f"Decompressed payload: {decompressed}") # Debugging: Print the full decompressed payload
              return decompressed

          # The main Lambda handler function
          def lambda_handler(event, context):
              # Initialize a session and the KMS data key cache
              print("Initializing KMS Data Key Cache...")
              session = boto3.session.Session()
              kms_data_key_cache = KMSDataKeyCache(session)

              # Process each record in the event
              for record in event['Records']:
                  try:
                      print(f"Processing record: {json.dumps(record)}") # Debugging: Print the full record
                      # Decode and parse the incoming data
                      data = base64.b64decode(record['kinesis']['data'])
                      print(f"Base64-decoded data: {data}") # Debugging: Print the full base64-decoded data
                      record_data = json.loads(data)
                      print(f"Decoded record data: {json.dumps(record_data)}") # Debugging: Print the full decoded record data
                      payload_decoded = base64.b64decode(record_data['databaseActivityEvents'])
                      print(f"Decoded payload: {payload_decoded}") # Debugging: Print the full decoded payload
                      data_key_decoded = base64.b64decode(record_data['key'])
                      print(f"Decoded data key: {data_key_decoded}") # Debugging: Print the full decoded data key

                      # Get the decrypted data key from the cache or KMS
                      decrypted_data_key = kms_data_key_cache.getDecrypted(data_key_decoded)
                      print(f"Decrypted data key: {decrypted_data_key}") # Debugging: Print the full decrypted data key

                      # Decrypt and decompress the payload
                      decrypted_decompressed_payload = decrypt_decompress(payload_decoded, decrypted_data_key)
                      plaintext = decrypted_decompressed_payload.decode('utf-8')
                      print(f"Decrypted and decompressed payload: {plaintext}") # Debugging: Print the full plaintext

                      # Load the JSON events and log them
                      events = json.loads(plaintext)
                      print("Processed events:", events) # Debugging: Print the full events processed
                  except Exception as e:
                      # Log any errors encountered during processing
                      print(f"Error processing record: {str(e)}")

              # Return a success status code and message
              return {
                  'statusCode': 200,
                  'body': json.dumps('Processing Complete')
              }

  # Event source mapping to trigger the Lambda function from Kinesis stream
  DevDecryptLambdaEventSource:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 100
      EventSourceArn: !Ref SourceKinesisStreamARN
      FunctionName: !Ref DevDASDecryptLambda
      StartingPosition: LATEST

Outputs:
  DevDecryptedLogsBucketName:
    Description: The S3 bucket where decrypted logs will be stored
    Value: rds-das-dev-decrypted-logs
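
One thing that may be worth checking, offered as a hypothesis rather than a confirmed cause: the Lambda passes RESOURCE_ID as the KMS encryption context ('aws:rds:dbc-id'), and KMS can reject a Decrypt call whose encryption context differs from the one used at encryption time. The sketch below assumes that the DAS Kinesis stream is named "aws-rds-das-" followed by the cluster resource ID (a value of the form "cluster-..."), and compares that with the configured value. The helper names, the example ARN, and the example cluster name are all hypothetical and not part of the original template.

```python
def expected_resource_id(stream_arn: str) -> str:
    """Derive the resource ID implied by a DAS Kinesis stream ARN.

    Assumption: the stream name is 'aws-rds-das-<resource id>', e.g.
    'aws-rds-das-cluster-ABCD123456' implies 'cluster-ABCD123456'.
    """
    prefix = "aws-rds-das-"
    # ARN layout: arn:aws:kinesis:<region>:<account>:stream/<stream name>
    stream_name = stream_arn.split("/", 1)[-1]
    if not stream_name.startswith(prefix):
        raise ValueError(f"not a DAS stream name: {stream_name!r}")
    return stream_name[len(prefix):]


def check_encryption_context(stream_arn: str, resource_id: str) -> dict:
    """Compare the configured RESOURCE_ID with the one implied by the stream."""
    expected = expected_resource_id(stream_arn)
    return {
        "configured": resource_id,
        "expected": expected,
        "matches": resource_id == expected,
    }


if __name__ == "__main__":
    # Placeholder values for illustration only.
    arn = "arn:aws:kinesis:eu-central-1:123456789012:stream/aws-rds-das-cluster-ABCD123456"
    print(check_encryption_context(arn, "my-cluster-name"))
```

If the two values differ for the real stream and the real ResourceId parameter, that mismatch would be a plausible place to look next; if they match, the cause lies elsewhere (for example, the KMS key or account used for the Decrypt call).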