Crypto shredding is the practice of 'deleting' data through the destruction of the cryptographic keys protecting the data.
You can find the source on GitHub.
- Python 3.6+
Note
If you have not already installed cryptography, you might need to install additional prerequisites as detailed in the cryptography installation guide for your operating system.
$ pip install cryptoshredding import boto3 from cryptoshredding import DynamodbKeyStore from dynamodb_encryption_sdk.material_providers.aws_kms import AwsKmsCryptographicMaterialsProvider aws_cmk_id = "arn:aws:kms:YOUR_KEY" aws_kms_cmp = AwsKmsCryptographicMaterialsProvider(key_id=aws_cmk_id) table = boto3.resource("dynamodb").Table("key_store_table") key_store = DynamodbKeyStore(table=table, materials_provider=aws_kms_cmp) key_id = "key4711" key_store.create_main_key(key_id) main_key = key_store.get_main_key(key_id) key_store.delete_main_key(key_id) # shreddingimport boto3 from cryptoshredding import MainKey main_key = key_store.get_main_key(key_id) data_key, encrypted_data_key = main_key.generate_data_key() decrypted_data_key = main_key.decrypt(encrypted_data_key) assert data_key == decrypted_data_keyimport boto3 from cryptoshredding.dynamodb import CryptoTable table = boto3.resource("dynamodb").Table("data_table") crypto_table = CryptoTable( table=table, key_store=key_store, ) crypto_table.put_item( CSEKeyId=key_id, Item=plaintext_item ) index_key = {"id": "foo"} encrypted_item = table.get_item(Key=index_key)["Item"] decrypted_item = crypto_table.get_item(Key=index_key)["Item"] encrypted_items = table.scan()["Items"] decrypted_items = crypto_table.scan()["Items"] assert len(encrypted_items) == 1 assert len(decrypted_items) == 1 key_store.delete_main_key(key_id) # shredding encrypted_items = table.scan()["Items"] decrypted_items = crypto_table.scan()["Items"] assert len(encrypted_items) == 1 assert len(decrypted_items) == 0 # !!!import boto3 from cryptoshredding.s3 import CryptoClient s3 = boto3.client("s3", region_name="us-east-1") crypto_client = CryptoClient( client=s3, key_store=key_store, ) crypto_s3.put_object( CSEKeyId=key_id, Bucket=bucket.name, Key="object", Body="foo bar"", ) encrypted_obj = s3.get_object( Bucket=bucket.name, Key="object", ) decrypted_obj = crypto_s3.get_object( Bucket=bucket.name, Key="object", )from cryptoshredding.raw import CryptoFile crypto_file = CryptoFile( key_store=key_store, ) crypto_file.encrypt( key_id=key_id, plaintext_filename="plain.txt", ciphertext_filename="cipher.txt" ) crypto_file.decrypt( ciphertext_filename="cipher.txt", plaintext_filename="decrypt.txt", )from cryptoshredding.raw import CryptoBytes crypto_bytes = CryptoBytes( key_store=key_store, ) encrypted, encrypted_header = crypto_bytes.encrypt( key_id=key_id, data=plain, ) decrypted, decrypted_header = crypto_bytes.decrypt( data=encrypted, )import boto3 from cryptoshredding.kinesis import CryptoClient kinesis = boto3.client("kinesis", region_name="us-east-1") crypto_kinesis = CryptoClient( client=kinesis, key_store=key_store, ) data = b"foo bar" crypto_kinesis.put_record( CSEKeyId=key_id, StreamName=stream_name, Data=data, PartitionKey="key1", ) response = crypto_kinesis.describe_stream( StreamName=stream_name, ) shard_id = response["StreamDescription"]["Shards"][0]["ShardId"] response = crypto_kinesis.get_shard_iterator( StreamName=stream_name, ShardId=shard_id, ShardIteratorType="TRIM_HORIZON", ) shard_iterator = response["ShardIterator"] encrypred_response = kinesis.get_records(ShardIterator=shard_iterator) decrypred_response = crypto_kinesis.get_records(ShardIterator=shard_iterator) assert len(encrypred_response["Records"]) == 1 assert data != encrypred_response["Records"][0]["Data"] assert len(decrypred_response["Records"]) == 1 assert data == decrypred_response["Records"][0]["Data"]