1

I have a CloudFormation template which I am using to configure lambda to decrypt AWS RDS database activity stream logs

I used the lambda function from: https://github.com/aws-samples/optimising-aws-kms-decryption-cost-for-das/blob/main/lambda.py

However, in lambda logs, I see InvalidCipherTextException, and I do not understand why I am getting it, because everything seems to be alright.

Lambda logs from CloudWatch:

2025-05-01T18:50:01.777Z Processing record: { "kinesis": { "kinesisSchemaVersion": "1.0", "partitionKey": "741a3939-d892-4df5-934a-9479252cb48d", "sequenceNumber": "49662849271037707186170471169412978102974199784156430338", "data": "eyJ0eXBlIjogIkRhdGFiYXNlQWN0aXZpdHlNb25pdG9yaW5nUmVjb3JkcyIsInZlcnNpb24iOiAiMS4yIiw...", "approximateArrivalTimestamp": 1746125400.734 }, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49662849271037707186170471169412978102974199784156430338", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::xxx:role/ops-rds-stream-decrypted-logs-DevDecryptLambdaRole-g0cLkmhkldp9", "awsRegion": "eu-central-1", "eventSourceARN": "arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxx" } Processing record: {"kinesis": {"kinesisSchemaVersion": "1.0", "partitionKey": "741a3939-d892-4df5-934a-9479252cb48d", "sequenceNumber": "49662849271037707186170471169412978102974199784156430338", "data": "eyJ0eXBlIjogIkRhdGFiYXNlQWN0aXZpdHlNb25pdG9yaW5nUmVjb3JkcyIsInZlcnNpb24iOiAiMS4yIiwiZGF0YWJhc2VBY3Rpdml0eUV2ZW50cyI6ICJBWUFEZUNTbG1tUTFOTjE1UHJmZjZXcVBpMjRBWHdBQkFCVmhkM010WTNKNWNIUnZMWEIxWW14cFl5MXJaWGtBUkVGNFEwd3JNSGhJWm5WSVNXaFZSbGhDTTNZeGNWQkVLMHBDY1dkelNIUmpWMmwwZEdwUFREZ3hNSGhEVG10clJGQXZUa00wVWpoMVRWZGFZVEZyUTFKNlVUMDlBQUVBQWtKREFCdEVZWFJoUzJWNUFBQUFnQUFBQUF3Wnkwdk8vWmVhWDRWZ3g3NEFNSlBTVVlrb1dSM0FKV1QyU2VvcUZJbWl4cHB3Y0g5VzhqdkN2bDdBOVkxNHpHdjR5Q081dnBLTUhGYllBVms4eGdJQUFBQUFEQUFFQUFBQUFBQUFBQUFBQUFBQUFBQXNUZ2lCSEhBSXNUQXpScmZFcFZDci8vLy8vd0FBQUFFQUFBQUFBQUFBQUFBQUFBRUFBQUlJeXoxWnBFLyt4OXF6WmZPUW9EYXB4U3dSRkwwbGJaQ0ppeHhkVnB0dmRodUhRNkM3Z3NJUGlBdTU4SURueGt2WHlNTDRYZTBNZnhNMUh5bWdsYmVCS2FWVDlKQkZtWVpxMTRwa3dGUTlob1dxTUVqc3cxMllwN2IzcDZyRmJIaEkyVDIzdU91TnF5cVNLTGloVVdWbE5PN0FqcWZYNjNWeDhaVXdITXNEWWRkQmFvRkJKVm9WRXpIS3RnbE5DMU9KZXlQa2VqMzIzeFFJUG5JY0RBbFRzdkxpYVlzSWtPdm4rYVRXZVdqaWNoZEdhUngrRkg0ZVJxQmp4ODRBK1VIQ21tbG45WG02QWk3eTB4UjM1N0l5YW5LWW40RUdJT2ZiN0NtSmx0YXE5OHYweUpRVUFrL0lFNzFuSkFpRC9WWkRtdUFjRUJLZkl1V0czaXNieFhIQXJ2bWFRVWdYcVdRaXRlelgvS1hlcE9KeG8zYXd0ZXJUYVVnNmtVeUtzZkpyVHRYUEhKdmd4STY0QWp6eUNXb1l4UXRnR0NZZERrK09NVXEvVmVyOUZ1Ky9IcXRuV2krRzJEMUpwU1gwb1VaTlk3bm80N09FUUE5OFE0bnRaeWNseFptRW1Qc2F2V0hIYWtXNHAyajZhbVRoNjd3NVBnR2E0T0dUZzNEdXhOYjJOeWdGNzJTKzFib1VJSWJHVXQ1ZFJYQkRTUmNodHNYQjNMZ2lRazUrOEZOc3kvYWlVa2NXTjIrRys4Ym5RS250bmlUTThISVIvbnphYTI0dmJIR3VrT0V2WE5QMFc3WmhNTDRmNEU1NHJRY1lnQjN5aWJKNGRMcnB3Nms3dGRnaTRyM3NoYWNPVS9jbmlMaVBYSEZjSkhud1g0NXNRQWkxNVFOcVZFTUN6bE82M2NKL3o0T2VyTmN1N1Frd1lNd01JbENiYkNPUEhVNEFaekJsQWpFQXhpTndDNTc5L2QrdXVHTWxhRTE1dlZiSWdDZ1hPN3Zad3czeXFQS25zMUFUNTgzejBMclVaTngvQTlwU0c3RWZBakFYbk16dkkyMVRhYU5Zem1iekRLZ2czMkJ3ajhCdWhESUhSUDYrNmU2aWViNXR0VHMzQ3B4blkvaHUwY0l0Z1VZPSIsImtleSIgOiAiQVFJREFIZ0RIVGN6V0VXejB1MFB0N0VPMkpISHhTTUVIZFd5bk1rUS9xMDZuVGpmMndGbEcvVDYxNW1TRzI0VzRobm1Fa2JnQUFBQWZqQjhCZ2txaGtpRzl3MEJCd2FnYnpCdEFnRUFNR2dHQ1NxR1NJYjNEUUVIQVRBZUJnbGdoa2dCWlFNRUFTNHdFUVFNTWU0Y0djVFhxZmVXRkJTa0FnRVFnRHY4K3ZKcUM1VTFhZEk3YlZLbkE1eGVNS3ByTHIyQ3hiRVRJUnMxaGdrTURudVl5a2gzb2JtUzdISmNFVHJQeThnMDMwNzdieWVrM1dwMXVnPT0ifQ==", "approximateArrivalTimestamp": 1746125400.734}, "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000000:49662849271037707186170471169412978102974199784156430338", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::618550961437:role/ops-rds-stream-decrypted-logs-DevDecryptLambdaRole-g0cLkmhkldp9", "awsRegion": "eu-central-1", "eventSourceARN": "arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxx"} 2025-05-01T18:50:01.777Z Base64-decoded data: b' { "type": "DatabaseActivityMonitoringRecords", "version": "1.2", "databaseActivityEvents": "AYADeCSlmmQ1NN15Prff6WqPi24AXwABABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREF4Q0wrMHhIZnVISWh...", "key": "AQIDAHgDHTczWEWz0u0Pt7EO2JHHxSMEHdWynMkQ/q06nTjf2wFlG/T615mSG24W4hnmEkbgAAAAfjB8Bgk..." } ' Decoded record data: { "type": "DatabaseActivityMonitoringRecords", "version": "1.2", "databaseActivityEvents": "AYADeCSlmmQ1NN15Prff6WqPi24AXwABABVhd3MtY3J5cHRvLXB1YmxpYy1rZXkAREF4Q0wrMHhIZnVISWh...", "key": "AQIDAHgDHTczWEWz0u0Pt7EO2JHHxSMEHdWynMkQ/q06nTjf2wFlG/T615mSG24W4hnmEkbgAAAAfjB8Bgk..." } Decoded payload: b'\x01\x80\x03x$\xa5\x9ad54\xddy>\xb7\xdf\xe9j\x8f\x8bn\x00_\x00\x01\x00\x15aws-crypto-public-...' Decoded data key: b"\x01\x02\x03\x00x\x03\x1d73XE\xb3\xd2\xed\x0f\xb7\xb1\x0e\xd8\x91\xc7\xc5#\x04\x1d\xd5\xb2\x9c\xc9\x10\xfe\xad:\x9d8\xdf\xdb\x01e\x1b\xf4\xfa\xd7\x99\x92\x1bn\x16\xe2..." 2025-05-01T18:50:01.789Z Decrypting data key: b"\x01\x02\x03\x00x\x03\x1d73XE\xb3\xd2\xed\x0f\xb7\xb1\x0e\xd8\x91\xc7\xc5#\x04\x1d\xd5\xb2\x9c\xc9\x10\xfe\xad:\x9d8\xdf\xdb\x01e\x1b\xf4\xfa\xd7\x99\x92\x1bn\x16\xe2..." Error processing record: An error occurred (InvalidCiphertextException) when calling the Decrypt operation: 

Cloudformation template that I have written:

AWSTemplateFormatVersion: 2010-09-09 Description: Dev-only setup to decrypt RDS DAS logs using KMS and store them in S3 Parameters: SourceKinesisStreamARN: Type: String Default: arn:aws:kinesis:eu-central-1:xxx:stream/aws-rds-das-cluster-xxxx KMSKeyARN: Type: String Default: arn:aws:kms:eu-central-1:xxx:key/xxx Region: Type: String Default: eu-central-1 ResourceId: Type: String Default: srv-rds-cluster-databasecluster-xxx # Aurora RDS Cluster ID Resources: # S3 Bucket to store decrypted logs DevDecryptedLogsBucket: Type: AWS::S3::Bucket Properties: BucketName: rds-das-dev-decrypted-logs # New S3 Bucket in dev account LifecycleConfiguration: Rules: - Status: Enabled Transitions: - StorageClass: GLACIER TransitionInDays: 30 # IAM Role for Lambda to read from Kinesis and write to S3 DevDecryptLambdaRole: Type: AWS::IAM::Role Properties: PermissionsBoundary: !Sub arn:aws:iam::${AWS::AccountId}:policy/ScopePermissions AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: lambda.amazonaws.com Action: sts:AssumeRole Policies: - PolicyName: DevDecryptLambdaPolicy PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - kinesis:GetRecords - kinesis:GetShardIterator - kinesis:DescribeStream - kinesis:ListStreams Resource: !Ref SourceKinesisStreamARN - Effect: Allow Action: - s3:PutObject Resource: arn:aws:s3:::rds-das-dev-decrypted-logs/* - Effect: Allow Action: - kms:Decrypt Resource: !Ref KMSKeyARN - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: '*' # Lambda function to decrypt the logs and store them in S3 DevDASDecryptLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Runtime: python3.10 Architectures: - arm64 Timeout: 60 MemorySize: 512 Role: !GetAtt DevDecryptLambdaRole.Arn Environment: Variables: S3_BUCKET: !Ref DevDecryptedLogsBucket # S3 bucket for decrypted logs KMS_KEY_ID: !Ref KMSKeyARN # KMS key ID for decryption REGION_NAME: !Ref Region # Region for decryption RESOURCE_ID: !Ref ResourceId # RDS Aurora cluster ID STREAM_NAME: !Ref SourceKinesisStreamARN # Kinesis stream name Code: ZipFile: | import json import zlib import aws_encryption_sdk import boto3 import base64 import os from aws_encryption_sdk import CommitmentPolicy from aws_encryption_sdk.internal.crypto import WrappingKey from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider from aws_encryption_sdk.identifiers import WrappingAlgorithm, EncryptionKeyType # Constants for AWS resources and region. Please use environment variables instead REGION_NAME = os.environ['REGION_NAME'] RESOURCE_ID = os.environ['RESOURCE_ID'] STREAM_NAME = os.environ['STREAM_NAME'] # Initialize the AWS Encryption SDK client with a specific commitment policy enc_client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.FORBID_ENCRYPT_ALLOW_DECRYPT) # Custom key provider class for raw encryption/decryption operations class MyRawMasterKeyProvider(RawMasterKeyProvider): provider_id = "BC" # Custom provider ID def __new__(cls, *args, **kwargs): # Overriding the object creation process for proper initialization obj = super(RawMasterKeyProvider, cls).__new__(cls) return obj def __init__(self, plain_key): # Initializing the parent class and setting up a wrapping key super().__init__() self.wrapping_key = WrappingKey( wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING, wrapping_key=plain_key, wrapping_key_type=EncryptionKeyType.SYMMETRIC) def _get_raw_key(self, key_id): # Method to retrieve the raw key; here, it returns the initialized wrapping key return self.wrapping_key # Class for caching decrypted data keys using AWS KMS class KMSDataKeyCache(): def __init__(self, session): # Initialize the KMS client and a simple dictionary for caching keys self.kms_client = session.client('kms', region_name=REGION_NAME) self.key_cache = {} def getDecrypted(self, data_key_decoded): # Attempt to retrieve the decrypted key from cache or decrypt it using KMS if data_key_decoded in self.key_cache: print("Cache hit for data key.") return self.key_cache[data_key_decoded] else: print(f"Decrypting data key: {data_key_decoded}") # Decrypt the key using KMS and store it in the cache data_key_decrypt_result = self.kms_client.decrypt( CiphertextBlob=data_key_decoded, EncryptionContext={'aws:rds:dbc-id': RESOURCE_ID}) self.key_cache[data_key_decoded] = data_key_decrypt_result['Plaintext'] print(f"Data key decrypted successfully.") return data_key_decrypt_result['Plaintext'] # Function to decrypt payload with a provided data key def decrypt_payload(payload, data_key): print(f"Decrypting payload with data key: {data_key}") # Debugging: Print the full data key # Setup the key provider and decrypt the payload my_key_provider = MyRawMasterKeyProvider(data_key) my_key_provider.add_master_key("DataKey") decrypted_plaintext, header = enc_client.decrypt( source=payload, materials_manager=aws_encryption_sdk.materials_managers.default.DefaultCryptoMaterialsManager( master_key_provider=my_key_provider)) print(f"Decrypted payload: {decrypted_plaintext}") # Debugging: Print the full decrypted payload return decrypted_plaintext # Function to decrypt and then decompress the payload def decrypt_decompress(payload, key): print(f"Decompressing payload...") # Debugging: Indicate decompression decrypted = decrypt_payload(payload, key) decompressed = zlib.decompress(decrypted, zlib.MAX_WBITS + 16) print(f"Decompressed payload: {decompressed}") # Debugging: Print the full decompressed payload return decompressed # The main Lambda handler function def lambda_handler(event, context): # Initialize a session and the KMS data key cache print("Initializing KMS Data Key Cache...") session = boto3.session.Session() kms_data_key_cache = KMSDataKeyCache(session) # Process each record in the event for record in event['Records']: try: print(f"Processing record: {json.dumps(record)}") # Debugging: Print the full record # Decode and parse the incoming data data = base64.b64decode(record['kinesis']['data']) print(f"Base64-decoded data: {data}") # Debugging: Print the full base64-decoded data record_data = json.loads(data) print(f"Decoded record data: {json.dumps(record_data)}") # Debugging: Print the full decoded record data payload_decoded = base64.b64decode(record_data['databaseActivityEvents']) print(f"Decoded payload: {payload_decoded}") # Debugging: Print the full decoded payload data_key_decoded = base64.b64decode(record_data['key']) print(f"Decoded data key: {data_key_decoded}") # Debugging: Print the full decoded data key # Get the decrypted data key from the cache or KMS decrypted_data_key = kms_data_key_cache.getDecrypted(data_key_decoded) print(f"Decrypted data key: {decrypted_data_key}") # Debugging: Print the full decrypted data key # Decrypt and decompress the payload decrypted_decompressed_payload = decrypt_decompress(payload_decoded, decrypted_data_key) plaintext = decrypted_decompressed_payload.decode('utf-8') print(f"Decrypted and decompressed payload: {plaintext}") # Debugging: Print the full plaintext # Load the JSON events and log them events = json.loads(plaintext) print("Processed events:", events) # Debugging: Print the full events processed except Exception as e: # Log any errors encountered during processing print(f"Error processing record: {str(e)}") # Return a success status code and message return { 'statusCode': 200, 'body': json.dumps('Processing Complete') } # Event source mapping to trigger the Lambda function from Kinesis stream DevDecryptLambdaEventSource: Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 100 EventSourceArn: !Ref SourceKinesisStreamARN FunctionName: !Ref DevDASDecryptLambda StartingPosition: LATEST Outputs: DevDecryptedLogsBucketName: Description: The S3 bucket where decrypted logs will be stored Value: rds-das-dev-decrypted-logs 
3
  • 1
    Fixed! The database cluster has the Resource ID in the format cluster-xxxxxxx. You need to specify this in the encryption context. Decryption works now. Commented May 2 at 11:14
  • 2
    you could put it as answer - and later mark your answer as accepted. It will be more visible for others. Commented May 2 at 11:25
  • Alright, will do :) Commented May 2 at 21:31

1 Answer 1

1

Fixed! The database cluster has Resource ID in the format cluster-xxxxxxx. You need to specify this in the encryption context. Decryption works now

Sign up to request clarification or add additional context in comments.

Comments

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.