Deoxys-II is an authenticated encryption with associated data (AEAD) scheme from the CAESAR (Competition for Authenticated Encryption: Security, Applicability, and Robustness) competition, where it was selected for the final portfolio in the defense-in-depth use case. It is built on the tweakable block cipher Deoxys-BC and provides strong security guarantees, including nonce-misuse resistance. Deoxys-II-256 uses a 256-bit key and produces 128-bit authentication tags.
| Parameter | Description | Value |
|---|---|---|
| Key Size | Encryption key length | 256 bits (32 bytes) |
| Nonce Size | Unique value per encryption | 128 bits (16 bytes) |
| Tag Size | Authentication tag length | 128 bits (16 bytes) |
| Block Size | Internal block size | 128 bits (16 bytes) |
| Tweak Size | Tweakable BC parameter | 128 bits (16 bytes) |
| Mode | AEAD mode | Nonce-misuse resistant |
import base64
import json
import os
import struct
import time

from metamui_crypto import DeoxysII256, generate_random_bytes
# Basic authenticated encryption
key = generate_random_bytes(32)  # 256-bit key
deoxys = DeoxysII256(key)

# Encrypt with authentication. The associated data is authenticated but is
# not part of the ciphertext, so the same bytes must be supplied to decrypt().
plaintext = b"Sensitive data requiring authenticated encryption"
nonce = generate_random_bytes(16)  # 128-bit nonce
associated_data = b"metadata"
ciphertext, tag = deoxys.encrypt(plaintext, nonce, associated_data)
print(f"Ciphertext: {ciphertext.hex()}")
print(f"Auth tag: {tag.hex()}")

# Decrypt and verify. Only the decrypt() call sits inside the ``try`` so that
# nothing except a failed verification is reported as an authentication error.
try:
    decrypted = deoxys.decrypt(ciphertext, nonce, associated_data, tag)
except ValueError:
    print("Authentication failed!")
else:
    assert decrypted == plaintext
    print("Authentication successful")

# Nonce-misuse resistant property
# Same nonce, different plaintext
plaintext2 = b"Different sensitive data"
ciphertext2, tag2 = deoxys.encrypt(plaintext2, nonce, associated_data)
# Different plaintexts give different ciphertexts under any AEAD, so the
# assertion below is only a sanity check. What misuse resistance adds is that
# the repeated nonce is meant to reveal nothing beyond whether the exact same
# (nonce, associated data, plaintext) triple was encrypted twice, whereas a
# conventional nonce-based AEAD can leak relations between the plaintexts.
assert ciphertext != ciphertext2
# Secure messaging with nonce-misuse resistance
class DeoxysSecureChannel:
    """Send and receive authenticated, replay-checked messages over a shared key.

    Wire format produced by :meth:`send_message`::

        message_id (8 bytes, big-endian) || metadata || ciphertext || tag (16 bytes)

    ``metadata`` is the ASCII string ``MSG_ID:<id>:TIMESTAMP:<unix seconds>``
    and is passed to the cipher as associated data.
    """

    _ID_SIZE = 8
    _TAG_SIZE = 16
    _TIMESTAMP_MARKER = b':TIMESTAMP:'
    # The wire format carries no metadata length, so the parser relies on
    # int(time.time()) being rendered with exactly ten decimal digits.
    _TIMESTAMP_DIGITS = 10

    def __init__(self, shared_key: bytes):
        self.cipher = DeoxysII256(shared_key)
        self.sent_messages = {}  # message_id -> (ciphertext, tag) of sent messages
        self.received_messages = set()  # IDs already accepted (replay detection)

    def send_message(self, message: bytes, message_id: int) -> bytes:
        """Encrypt ``message`` and return it in wire format.

        Args:
            message: Plaintext to protect.
            message_id: Identifier of the message; it is used as the nonce and
                is written into both the header and the metadata.

        Returns:
            The serialized message described in the class docstring.
        """
        # Use message ID as part of nonce (misuse-resistant)
        nonce = message_id.to_bytes(16, 'big')
        # Include metadata in associated data
        metadata = f"MSG_ID:{message_id}:TIMESTAMP:{int(time.time())}".encode()
        ciphertext, tag = self.cipher.encrypt(message, nonce, metadata)
        # Remember what was sent under this ID
        self.sent_messages[message_id] = (ciphertext, tag)
        return struct.pack('>Q', message_id) + metadata + ciphertext + tag

    def receive_message(self, wire_data: bytes) -> bytes:
        """Parse, authenticate and decrypt a message in wire format.

        Args:
            wire_data: Bytes as produced by :meth:`send_message`.

        Returns:
            The decrypted message.

        Raises:
            ValueError: If the data is malformed, the message ID was already
                accepted, or authentication fails.
        """
        if len(wire_data) < self._ID_SIZE + self._TAG_SIZE:
            raise ValueError("Malformed message")
        message_id = struct.unpack('>Q', wire_data[:self._ID_SIZE])[0]
        # Check for replay
        if message_id in self.received_messages:
            raise ValueError("Replay attack detected")
        # Locate the end of the metadata; a missing marker used to yield a
        # silently wrong slice (find() returns -1), so reject it explicitly.
        marker_at = wire_data.find(self._TIMESTAMP_MARKER, self._ID_SIZE)
        if marker_at == -1:
            raise ValueError("Malformed message")
        metadata_end = (
            marker_at + len(self._TIMESTAMP_MARKER) + self._TIMESTAMP_DIGITS
        )
        if metadata_end > len(wire_data) - self._TAG_SIZE:
            raise ValueError("Malformed message")
        metadata = wire_data[self._ID_SIZE:metadata_end]
        ciphertext = wire_data[metadata_end:-self._TAG_SIZE]
        tag = wire_data[-self._TAG_SIZE:]
        # Reconstruct nonce
        nonce = message_id.to_bytes(16, 'big')
        try:
            message = self.cipher.decrypt(ciphertext, nonce, metadata, tag)
        except ValueError as exc:
            raise ValueError("Authentication failed") from exc
        # Only authenticated messages mark their ID as consumed.
        self.received_messages.add(message_id)
        return message
# Database encryption with deterministic support
class DeoxysDatabase:
    """Field-level encryption with one derived key per table."""

    def __init__(self, master_key: bytes):
        self.master_key = master_key
        self.table_ciphers = {}  # table name -> DeoxysII256 keyed for that table

    def get_table_cipher(self, table_name: str) -> DeoxysII256:
        """Return the cipher for ``table_name``, deriving its key on first use."""
        if table_name not in self.table_ciphers:
            # Derive table key
            from metamui_crypto import HKDF, SHA256
            hkdf = HKDF(hash_function=SHA256())
            table_key = hkdf.derive(
                ikm=self.master_key,
                info=f"table:{table_name}".encode(),
                length=32
            )
            self.table_ciphers[table_name] = DeoxysII256(table_key)
        return self.table_ciphers[table_name]

    def encrypt_field(self, table: str, column: str,
                      value: bytes, deterministic: bool = False) -> dict:
        """Encrypt one field value.

        Args:
            table: Table name; selects the key.
            column: Column name; ``"<table>.<column>"`` is the associated data.
            value: Plaintext field value.
            deterministic: When true, equal values in the same column encrypt
                to equal ciphertexts so they can be matched by equality.

        Returns:
            A dict with keys ``ciphertext``, ``tag``, ``nonce`` and
            ``deterministic``.
        """
        cipher = self.get_table_cipher(table)
        # Column name as associated data
        aad = f"{table}.{column}".encode()
        if deterministic:
            # The nonce is returned to the caller and stored next to the
            # ciphertext, so it must not be derived from the plaintext: an
            # unkeyed hash of the value would let anyone holding the record
            # test guesses of the value offline. A per-column constant keeps
            # the encryption deterministic without exposing anything about
            # the value; this relies on the nonce-misuse resistance that this
            # document describes for DeoxysII256.
            from metamui_crypto import BLAKE2b
            hasher = BLAKE2b(digest_size=16)
            nonce = hasher.hash(aad)
        else:
            # Random nonce for semantic security
            nonce = generate_random_bytes(16)
        ciphertext, tag = cipher.encrypt(value, nonce, aad)
        return {
            'ciphertext': ciphertext,
            'tag': tag,
            'nonce': nonce,
            'deterministic': deterministic
        }

    def search_encrypted(self, table: str, column: str,
                         search_value: bytes) -> bytes:
        """Return the token (``ciphertext || tag``) that a deterministically
        encrypted ``search_value`` has in the given column."""
        encrypted = self.encrypt_field(table, column, search_value,
                                       deterministic=True)
        return encrypted['ciphertext'] + encrypted['tag']
# File encryption with streaming
class DeoxysFileEncryption:
    """Encrypt a file chunk by chunk, with a separate tag for every chunk."""

    def __init__(self, key: bytes):
        self.cipher = DeoxysII256(key)
        self.chunk_size = 64 * 1024  # plaintext bytes per chunk (64 KiB)

    def encrypt_file(self, input_path: str, output_path: str) -> dict:
        """Write an encrypted copy of ``input_path`` to ``output_path``.

        Output layout::

            header length (4 bytes, big-endian) || JSON header || file_id (16)
            then per chunk: ciphertext length (4) || ciphertext || tag
            then a final MAC computed over all chunk tags

        Returns:
            A dict with the hex ``file_id``, the number of ``chunks`` and the
            hex ``final_mac``.
        """
        # NOTE(review): the JSON header and file_id are written in the clear
        # and are not covered by any tag or by the final MAC - confirm whether
        # the reader authenticates them some other way.
        file_id = generate_random_bytes(16)
        file_metadata = {
            'original_name': os.path.basename(input_path),
            'size': os.path.getsize(input_path),
            'timestamp': time.time()
        }
        chunk_tags = []
        with open(input_path, 'rb') as source, \
                open(output_path, 'wb') as sink:
            serialized_header = json.dumps(file_metadata).encode()
            sink.write(struct.pack('>I', len(serialized_header)))
            sink.write(serialized_header)
            sink.write(file_id)

            # read() returns b"" at end of file, which ends the iteration.
            for piece in iter(lambda: source.read(self.chunk_size), b""):
                index = len(chunk_tags)
                # Nonce: first 12 bytes of the file ID plus the chunk counter.
                chunk_nonce = file_id[:12] + index.to_bytes(4, 'big')
                # The chunk's position and length are bound as associated data.
                chunk_aad = struct.pack('>QI', index, len(piece))
                sealed, piece_tag = self.cipher.encrypt(
                    piece, chunk_nonce, chunk_aad
                )
                sink.write(struct.pack('>I', len(sealed)))
                sink.write(sealed)
                sink.write(piece_tag)
                chunk_tags.append(piece_tag)

            # Final MAC over every chunk tag, in order.
            from metamui_crypto import BLAKE2b
            mac = BLAKE2b(key=self.cipher.key[:32], digest_size=16)
            for piece_tag in chunk_tags:
                mac.update(piece_tag)
            final_mac = mac.finalize()
            sink.write(final_mac)

        return {
            'file_id': file_id.hex(),
            'chunks': len(chunk_tags),
            'final_mac': final_mac.hex()
        }
# Nonce-misuse resistant key wrapping
class DeoxysKeyWrap:
    """Wrap and unwrap key material under a key-encryption key (KEK).

    A wrapped blob is ``nonce (16 bytes) || wrapped key || tag (16 bytes)``,
    where the nonce is derived from the key ID.
    """

    _NONCE_SIZE = 16
    _TAG_SIZE = 16

    def __init__(self, kek: bytes):
        """Initialize with Key Encryption Key"""
        self.cipher = DeoxysII256(kek)
        self.wrapped_keys = {}  # key_id -> record of the most recent wrap

    @staticmethod
    def _derive_nonce(key_id: str) -> bytes:
        """Return the deterministic 16-byte nonce for ``key_id``."""
        from metamui_crypto import BLAKE2b
        hasher = BLAKE2b(digest_size=16)
        hasher.update(key_id.encode())
        return hasher.finalize()

    @staticmethod
    def _build_aad(key_id: str, metadata) -> bytes:
        """Return the associated data: canonical JSON of ``metadata`` when it
        is non-empty, otherwise the encoded key ID."""
        if metadata:
            # sort_keys makes the encoding independent of dict insertion
            # order, so equal metadata always authenticates.
            return json.dumps(metadata, sort_keys=True).encode()
        return key_id.encode()

    def wrap_key(self, key_id: str, key_material: bytes,
                 metadata: dict = None) -> bytes:
        """Wrap ``key_material`` and return the serialized blob.

        Args:
            key_id: Identifier of the key; determines the nonce.
            key_material: The key bytes to protect.
            metadata: Optional JSON-serializable dict bound as associated data.

        Returns:
            ``nonce || wrapped key || tag``.
        """
        nonce = self._derive_nonce(key_id)
        aad = self._build_aad(key_id, metadata)
        wrapped, tag = self.cipher.encrypt(key_material, nonce, aad)
        # Store wrapped key
        self.wrapped_keys[key_id] = {
            'wrapped': wrapped,
            'tag': tag,
            'metadata': metadata,
            'timestamp': time.time()
        }
        return nonce + wrapped + tag

    def unwrap_key(self, key_id: str, wrapped_blob: bytes,
                   metadata: dict = None) -> bytes:
        """Unwrap a blob produced by :meth:`wrap_key`.

        Args:
            key_id: Identifier the key was wrapped under.
            wrapped_blob: ``nonce || wrapped key || tag``.
            metadata: The same metadata that was passed when wrapping.

        Returns:
            The recovered key material.

        Raises:
            ValueError: If the blob is too short, was not wrapped under
                ``key_id``, or fails authentication.
        """
        if len(wrapped_blob) < self._NONCE_SIZE + self._TAG_SIZE:
            raise ValueError("Key unwrapping failed")
        nonce = wrapped_blob[:self._NONCE_SIZE]
        wrapped = wrapped_blob[self._NONCE_SIZE:-self._TAG_SIZE]
        tag = wrapped_blob[-self._TAG_SIZE:]
        # When metadata is supplied the associated data does not contain the
        # key ID, so the nonce is the only thing tying the blob to key_id;
        # check it instead of trusting the value carried in the blob.
        if nonce != self._derive_nonce(key_id):
            raise ValueError("Key unwrapping failed")
        aad = self._build_aad(key_id, metadata)
        try:
            return self.cipher.decrypt(wrapped, nonce, aad, tag)
        except ValueError as exc:
            raise ValueError("Key unwrapping failed") from exc
# Deoxys-BC tweakable block cipher (simplified)
class DeoxysBCCore:
    """Simplified sketch of a tweakable block cipher round structure.

    NOTE(review): ``key_schedule``, ``add_round_key``, ``sub_bytes``,
    ``shift_rows`` and ``mix_columns`` are called here but are not defined in
    this class; they must be supplied before the class can run.
    """

    # Rounds after which the (permuted) tweak is mixed into the state.
    _TWEAK_ROUNDS = frozenset((1, 3, 5, 7, 9, 11, 13))

    def __init__(self, key: bytes):
        self.round_keys = self.key_schedule(key)
        self.rounds = 14  # For 256-bit key

    def encrypt_block(self, plaintext: bytes, tweak: bytes) -> bytes:
        """Encrypt single block with tweak

        Args:
            plaintext: The block to encrypt (16 bytes are processed).
            tweak: 16-byte tweak mixed into the state on selected rounds.

        Returns:
            The encrypted block as ``bytes``.
        """
        state = bytearray(plaintext)
        # Initial key addition
        self.add_round_key(state, self.round_keys[0])
        self.add_tweak(state, tweak, 0)
        # Main rounds (the loop variable no longer shadows builtin round())
        for round_index in range(1, self.rounds):
            self.sub_bytes(state)
            self.shift_rows(state)
            # MixColumns (except last round)
            if round_index < self.rounds - 1:
                self.mix_columns(state)
            self.add_round_key(state, self.round_keys[round_index])
            # AddTweak (specific rounds)
            if round_index in self._TWEAK_ROUNDS:
                self.add_tweak(state, tweak, round_index)
        return bytes(state)

    def add_tweak(self, state: bytearray, tweak: bytes, round: int):
        """XOR the tweak, permuted for ``round``, into ``state`` in place."""
        permuted_tweak = self.permute_tweak(tweak, round)
        for i in range(16):
            state[i] ^= permuted_tweak[i]

    def permute_tweak(self, tweak: bytes, round: int) -> bytes:
        """LFSR-based tweak permutation

        Applies ``round`` steps of a 128-bit left shift by one bit; when the
        bit shifted out of the first byte is set, ``0x87`` is XORed into the
        last byte.

        Args:
            tweak: 16-byte tweak.
            round: Number of LFSR steps to apply (0 returns the tweak as is).

        Returns:
            The permuted 16-byte tweak.
        """
        result = bytearray(tweak)
        for _ in range(round):
            feedback = result[0] >> 7
            for i in range(15):
                # Mask to one byte: a bytearray element must stay in 0..255,
                # and the unmasked shift raised ValueError for bytes >= 0x80.
                result[i] = ((result[i] << 1) | (result[i + 1] >> 7)) & 0xFF
            result[15] = ((result[15] << 1) & 0xFF) ^ (0x87 if feedback else 0)
        return bytes(result)
# Deoxys-II AEAD mode
class DeoxysIIMode:
    """Simplified, illustrative AEAD mode built on ``DeoxysBCCore``.

    Tweak layout (16 bytes): bytes 0-11 hold the first 12 nonce bytes; bytes
    12-15 hold a 32-bit big-endian word whose top two bits select the domain
    and whose low 30 bits are the block counter.

    NOTE(review): this is a teaching sketch, not a verified implementation of
    the Deoxys-II specification - confirm against the spec before relying on it.
    """

    BLOCK_SIZE = 16
    _NONCE_BYTES = 12
    _COUNTER_BITS = 30
    # Domain selector stored in the two bits above the counter.
    _DOMAIN_BITS = {'A': 0b00, 'M': 0b10, 'F': 0b01}

    def __init__(self, key: bytes):
        self.bc = DeoxysBCCore(key)
        self.tag_size = 16

    @staticmethod
    def pad(block: bytes) -> bytes:
        """Pad a partial block to 16 bytes with ``0x80`` followed by zeros."""
        return block + b'\x80' + bytes(DeoxysIIMode.BLOCK_SIZE - len(block) - 1)

    @staticmethod
    def xor_bytes(left: bytes, right: bytes) -> bytes:
        """Return the byte-wise XOR of two equal-length byte strings."""
        return bytes(a ^ b for a, b in zip(left, right))

    def process_aad(self, aad: bytes, nonce: bytes) -> bytes:
        """Process associated data

        Encrypts each (padded) 16-byte block of ``aad`` under its own tweak
        and XORs the results together.

        Returns:
            The 16-byte accumulator; all zeros for empty ``aad``.
        """
        auth = bytes(16)  # Zero initialization
        for index, offset in enumerate(range(0, len(aad), 16)):
            block = aad[offset:offset + 16]
            if len(block) < 16:
                block = self.pad(block)
            tweak = self.create_tweak(nonce, index, 'A')
            auth = self.xor_bytes(auth, self.bc.encrypt_block(block, tweak))
        return auth

    def encrypt_payload(self, plaintext: bytes, nonce: bytes,
                        auth: bytes) -> tuple:
        """Encrypt plaintext with authentication

        Args:
            plaintext: Data to encrypt.
            nonce: Nonce used to build the per-block tweaks.
            auth: Accumulator returned by :meth:`process_aad`.

        Returns:
            ``(ciphertext, tag)``; the ciphertext has the plaintext's length.
        """
        ciphertext = bytearray()
        checksum = bytes(16)  # Zero initialization
        for index, offset in enumerate(range(0, len(plaintext), 16)):
            block = plaintext[offset:offset + 16]
            if len(block) < 16:
                block = self.pad(block)
            checksum = self.xor_bytes(checksum, block)
            tweak = self.create_tweak(nonce, index, 'M')
            ciphertext.extend(self.bc.encrypt_block(block, tweak))
        # Final authentication
        final_tweak = self.create_tweak(nonce, 0, 'F')
        tag = self.bc.encrypt_block(self.xor_bytes(auth, checksum), final_tweak)
        # NOTE(review): when the last block was padded, the slice below drops
        # part of a block-cipher output, which looks impossible to invert on
        # decryption - confirm how partial final blocks are meant to be handled.
        return bytes(ciphertext[:len(plaintext)]), tag

    def create_tweak(self, nonce: bytes, counter: int,
                     domain: str) -> bytes:
        """Create tweak from nonce, counter, and domain

        Args:
            nonce: Nonce; only its first 12 bytes fit into the tweak.
            counter: Block counter, ``0 <= counter < 2**30``.
            domain: ``'A'`` (associated data), ``'M'`` (message) or ``'F'``
                (final/tag).

        Returns:
            A 16-byte tweak.

        Raises:
            OverflowError: If ``counter`` does not fit in 30 bits.
            KeyError: If ``domain`` is not one of the known domains.
        """
        if not 0 <= counter < (1 << self._COUNTER_BITS):
            raise OverflowError("block counter does not fit in 30 bits")
        tweak = bytearray(16)
        # Slice the nonce explicitly: assigning a longer nonce used to grow
        # the tweak past 16 bytes, and bytes 12-15 were overwritten anyway.
        prefix = nonce[:self._NONCE_BYTES]
        tweak[:len(prefix)] = prefix
        # Domain bits live above the counter instead of being XORed into its
        # low byte, where e.g. counter 0x80 in domain 'A' produced the same
        # tweak as counter 0 in domain 'M'.
        word = (self._DOMAIN_BITS[domain] << self._COUNTER_BITS) | counter
        tweak[12:16] = word.to_bytes(4, 'big')
        return bytes(tweak)
# DON'T: Ignore authentication failures
try:
    plaintext = deoxys.decrypt(ciphertext, nonce, aad, tag)
except ValueError:
    # WRONG: Using partial/corrupted data
    plaintext = ciphertext  # Dangerous!


# DO: Handle authentication failures properly
def decrypt_and_process(ciphertext, nonce, aad, tag):
    """Decrypt a message and hand it to ``process_data`` only if it verifies.

    Returns:
        The decrypted plaintext, or ``None`` when authentication fails.
    """
    try:
        plaintext = deoxys.decrypt(ciphertext, nonce, aad, tag)
    except ValueError:
        log_security_event("Authentication failure")
        return None  # Safe failure
    # Processing happens outside the ``try`` so that a ValueError raised by
    # process_data is not misreported as an authentication failure.
    process_data(plaintext)
    return plaintext
# DON'T: Shorten the authentication tag
tag = computed_tag[:8]  # WRONG: keeps only the first 8 bytes (64 bits) of the tag
stored_data = ciphertext + tag
# DO: Store and verify the complete tag
tag = computed_tag  # Full 128-bit (16-byte) tag
stored_data = ciphertext + tag
# DON'T: Decrypt with associated data that differs from what was encrypted
# Encrypting binds the ciphertext and tag to b"context1"
ciphertext, tag = deoxys.encrypt(data, nonce, b"context1")
# Decrypting with different AAD (b"context2")
plaintext = deoxys.decrypt(ciphertext, nonce, b"context2", tag)
# WRONG: the associated data does not match, so authentication will fail
# DO: Use consistent AAD - keep it in one variable and pass it to both calls
aad = b"context1"
ciphertext, tag = deoxys.encrypt(data, nonce, aad)
plaintext = deoxys.decrypt(ciphertext, nonce, aad, tag)  # Same AAD
| Operation | Data Size | AAD Size | Throughput | Latency |
|---|---|---|---|---|
| Encrypt | 1 KB | 0 B | 285 MB/s | 3.5 μs |
| Encrypt | 1 KB | 32 B | 278 MB/s | 3.6 μs |
| Encrypt | 1 MB | 0 B | 342 MB/s | 2.9 ms |
| Encrypt | 1 MB | 1 KB | 338 MB/s | 3.0 ms |
| Decrypt | 1 KB | 0 B | 283 MB/s | 3.5 μs |
| Decrypt | 1 MB | 0 B | 341 MB/s | 2.9 ms |
| Algorithm | Nonce-Misuse | Throughput | Relative Speed |
|---|---|---|---|
| Deoxys-II-256 | Resistant | 342 MB/s | 1.00x (baseline) |
| AES-256-GCM | Vulnerable | 1.2 GB/s | 3.51x |
| AES-256-GCM-SIV | Resistant | 298 MB/s | 0.87x |
| ChaCha20-Poly1305 | Vulnerable | 892 MB/s | 2.61x |
| AEGIS-256 | Vulnerable | 4.8 GB/s | 14.04x |
# Parallel processing for multiple messages
class ParallelDeoxys:
    """Encrypt a batch of independent messages on a thread pool.

    NOTE(review): whether threads give a real speed-up presumably depends on
    DeoxysII256 releasing the GIL while it works - confirm.
    """

    def __init__(self, key: bytes, num_workers=4):
        self.num_workers = num_workers
        self.key = key

    def _encrypt_one(self, msg_data):
        """Encrypt one ``(plaintext, nonce, aad)`` triple with a fresh cipher."""
        plaintext, nonce, aad = msg_data
        return DeoxysII256(self.key).encrypt(plaintext, nonce, aad)

    def encrypt_batch(self, messages: list) -> list:
        """Encrypt multiple messages in parallel

        Args:
            messages: Iterable of ``(plaintext, nonce, aad)`` triples.

        Returns:
            The encryption results in the same order as ``messages``.
        """
        from concurrent.futures import ThreadPoolExecutor

        with ThreadPoolExecutor(max_workers=self.num_workers) as pool:
            return list(pool.map(self._encrypt_one, messages))
# Optimized for small messages
class DeoxysSmallMessage:
    """Convenience wrapper for encrypting short messages without AAD."""

    def __init__(self, key: bytes):
        self.cipher = DeoxysII256(key)
        # Kept for backward compatibility; no longer populated (see below).
        self.precomputed_tweaks = {}

    def encrypt_small(self, data: bytes, nonce: bytes) -> tuple:
        """Encrypt ``data`` with empty associated data.

        The former single-block fast path was removed because it could not
        work: it called ``self.compute_tweak`` and ``self.pad``, which this
        class never defines, it returned a truncated block-cipher output that
        the regular ``decrypt`` cannot invert, and it cached one entry per
        nonce, so the cache grew without ever being hit when nonces are
        unique. Messages of every size now use the standard AEAD path.

        Args:
            data: Plaintext of any length.
            nonce: Nonce for this encryption.

        Returns:
            ``(ciphertext, tag)`` exactly as returned by the cipher.
        """
        return self.cipher.encrypt(data, nonce, b"")
# Memory-efficient streaming
class StreamingDeoxys:
    """Encrypt a stream chunk by chunk while tracking a running checksum.

    NOTE(review): this relies on the cipher exposing ``process_aad``,
    ``encrypt_chunk`` and ``finalize_stream`` - confirm that DeoxysII256
    provides them.
    """

    BLOCK_SIZE = 16

    def __init__(self, key: bytes):
        self.cipher = DeoxysII256(key)
        self.buffer_size = 1024 * 1024  # 1MB buffer

    @staticmethod
    def pad(block: bytes) -> bytes:
        """Pad a partial block to 16 bytes with ``0x80`` followed by zeros."""
        return block + b'\x80' + bytes(StreamingDeoxys.BLOCK_SIZE - len(block) - 1)

    @staticmethod
    def xor_bytes(left: bytes, right: bytes) -> bytes:
        """Return the byte-wise XOR of two equal-length byte strings."""
        return bytes(a ^ b for a, b in zip(left, right))

    def encrypt_stream(self, input_stream, output_stream,
                       nonce: bytes, aad: bytes = b""):
        """Stream encryption with minimal memory

        This is a generator: it yields the number of plaintext bytes
        processed after each chunk and, as its last item, the tag.

        Args:
            input_stream: Object with ``read(n)`` returning bytes.
            output_stream: Object with ``write(data)``.
            nonce: Nonce for the whole stream.
            aad: Associated data, processed before any payload.
        """
        # Process AAD first
        auth_state = self.cipher.process_aad(aad, nonce)
        position = 0
        checksum = bytes(16)
        # Bytes of an incomplete block carried over to the next read. Padding
        # every short chunk (as before) corrupted the checksum whenever
        # read() returned a length that is not a multiple of 16 mid-stream.
        pending = b""
        while True:
            chunk = input_stream.read(self.buffer_size)
            if not chunk:
                break
            data = pending + chunk if pending else chunk
            whole = len(data) - len(data) % self.BLOCK_SIZE
            for offset in range(0, whole, self.BLOCK_SIZE):
                checksum = self.xor_bytes(
                    checksum, data[offset:offset + self.BLOCK_SIZE]
                )
            pending = data[whole:]
            # Encrypt chunk
            encrypted = self.cipher.encrypt_chunk(chunk, nonce, position)
            output_stream.write(encrypted)
            position += len(chunk)
            yield position  # Progress callback
        # Only the very last block of the stream is padded.
        if pending:
            checksum = self.xor_bytes(checksum, self.pad(pending))
        # Compute final tag
        tag = self.cipher.finalize_stream(auth_state, checksum, nonce)
        output_stream.write(tag)
        yield tag
class DeoxysAPIAuth:
    """Create and verify encrypted, time-limited API request tokens."""

    _MAX_TOKEN_AGE = 300  # seconds a token stays acceptable (5 minutes)

    def __init__(self, api_key: bytes):
        self.cipher = DeoxysII256(api_key)
        self.request_cache = {}  # (request JSON, minute) -> token, for idempotency

    def create_request_token(self, request_data: dict) -> str:
        """Create authenticated API request token

        Args:
            request_data: JSON-serializable request; must contain ``method``
                and ``path``.

        Returns:
            A base64 string wrapping a JSON object with the timestamp, the
            encrypted request, its tag, and the method and path.
        """
        # Serialize request (sorted keys give a canonical form)
        request_json = json.dumps(request_data, sort_keys=True)
        request_bytes = request_json.encode()
        # Use timestamp as nonce (misuse-resistant)
        timestamp = int(time.time())
        nonce = timestamp.to_bytes(16, 'big')
        # Include method and path in AAD
        aad = f"{request_data['method']}:{request_data['path']}".encode()
        # Identical requests within the same minute reuse the cached token
        cache_key = (request_json, timestamp // 60)
        if cache_key in self.request_cache:
            return self.request_cache[cache_key]
        ciphertext, tag = self.cipher.encrypt(request_bytes, nonce, aad)
        token_data = {
            'timestamp': timestamp,
            'ciphertext': base64.b64encode(ciphertext).decode(),
            'tag': base64.b64encode(tag).decode(),
            'method': request_data['method'],
            'path': request_data['path']
        }
        token = base64.b64encode(
            json.dumps(token_data).encode()
        ).decode()
        self.request_cache[cache_key] = token
        self.cleanup_cache()
        return token

    def verify_request_token(self, token: str) -> dict:
        """Verify and decrypt API request

        The token is untrusted input, so every parsing problem is reported
        as ``ValueError`` rather than leaking ``TypeError``/``KeyError``.

        Returns:
            The decoded request dict.

        Raises:
            ValueError: ``"Token expired"`` when the timestamp is more than
                five minutes from now; ``"Invalid token"`` when the token is
                malformed or fails authentication.
        """
        try:
            token_data = json.loads(base64.b64decode(token).decode())
            timestamp = token_data['timestamp']
        except (ValueError, KeyError, TypeError) as exc:
            raise ValueError("Invalid token") from exc
        # bool is a subclass of int, so exclude it explicitly.
        if isinstance(timestamp, bool) or not isinstance(timestamp, int):
            raise ValueError("Invalid token")
        # The freshness check limits the replay window; it does not detect a
        # replay that happens inside that window.
        if abs(int(time.time()) - timestamp) > self._MAX_TOKEN_AGE:
            raise ValueError("Token expired")
        try:
            nonce = timestamp.to_bytes(16, 'big')
            ciphertext = base64.b64decode(token_data['ciphertext'])
            tag = base64.b64decode(token_data['tag'])
            aad = f"{token_data['method']}:{token_data['path']}".encode()
            request_bytes = self.cipher.decrypt(ciphertext, nonce, aad, tag)
            return json.loads(request_bytes.decode())
        except (ValueError, KeyError, TypeError, OverflowError) as exc:
            raise ValueError("Invalid token") from exc

    def cleanup_cache(self):
        """Remove old cache entries"""
        oldest_kept = int(time.time()) // 60 - 5  # 5-minute retention
        expired = [key for key in self.request_cache if key[1] < oldest_kept]
        for key in expired:
            del self.request_cache[key]
class DeoxysDistributedStorage:
    """Chunked file storage with content-derived chunk keys and an encrypted
    manifest.

    NOTE(review): ``compute_chunk_hash``, ``chunk_exists``,
    ``select_storage_node``, ``store_on_node``, ``calculate_dedup_ratio``,
    ``retrieve_from_node`` and ``decrypt_chunk`` are called but not defined in
    this class; they must be supplied elsewhere.
    """

    def __init__(self, cluster_key: bytes):
        self.cipher = DeoxysII256(cluster_key)
        self.chunk_size = 4 * 1024 * 1024  # 4MB chunks
        self.storage_nodes = {}

    def store_file(self, file_path: str) -> dict:
        """Store file with encrypted deduplication

        Reads the file in ``chunk_size`` pieces, stores every chunk whose hash
        is not already present, and encrypts a manifest describing the file.

        Returns:
            A dict with ``manifest_id`` (hex), the base64 ``manifest`` and
            ``tag``, the number of ``chunks`` and the ``dedup_ratio``.
        """
        file_id = generate_random_bytes(16)
        chunks = []
        with open(file_path, 'rb') as f:
            chunk_num = 0
            while True:
                chunk_data = f.read(self.chunk_size)
                if not chunk_data:
                    break
                # Compute chunk hash for deduplication
                chunk_hash = self.compute_chunk_hash(chunk_data)
                # Only chunks that are not stored yet are encrypted and stored
                if not self.chunk_exists(chunk_hash):
                    encrypted_chunk = self.encrypt_chunk(
                        chunk_data, chunk_hash, file_id, chunk_num
                    )
                    node = self.select_storage_node(chunk_hash)
                    self.store_on_node(node, chunk_hash, encrypted_chunk)
                chunks.append({
                    'hash': chunk_hash,
                    'size': len(chunk_data),
                    'position': chunk_num
                })
                chunk_num += 1
        # Create file manifest. json.dumps cannot serialize bytes, so the
        # manifest carries each chunk hash as hex; retrieve_file converts it
        # back. Assumes compute_chunk_hash returns bytes (it is used as an
        # HKDF salt and sliced for the nonce) - TODO confirm.
        manifest = {
            'file_id': file_id.hex(),
            'name': os.path.basename(file_path),
            'size': sum(c['size'] for c in chunks),
            'chunks': [dict(c, hash=c['hash'].hex()) for c in chunks],
            'created': time.time()
        }
        # Encrypt manifest
        manifest_data = json.dumps(manifest).encode()
        manifest_nonce = file_id
        manifest_ct, manifest_tag = self.cipher.encrypt(
            manifest_data, manifest_nonce, b"manifest"
        )
        return {
            'manifest_id': file_id.hex(),
            'manifest': base64.b64encode(manifest_ct).decode(),
            'tag': base64.b64encode(manifest_tag).decode(),
            'chunks': len(chunks),
            'dedup_ratio': self.calculate_dedup_ratio(chunks)
        }

    def encrypt_chunk(self, chunk_data: bytes, chunk_hash: bytes,
                      file_id: bytes, position: int) -> dict:
        """Encrypt chunk with convergent encryption

        The chunk key is derived from the cluster key with the chunk hash as
        salt, and the nonce is the first 16 bytes of the chunk hash.

        Returns:
            A dict with ``ciphertext``, ``tag``, ``file_id`` and ``position``.
        """
        from metamui_crypto import HKDF, SHA256
        hkdf = HKDF(hash_function=SHA256())
        chunk_key = hkdf.derive(
            ikm=self.cipher.key,
            salt=chunk_hash,
            info=b"chunk encryption",
            length=32
        )
        # Use deterministic nonce for deduplication
        nonce = chunk_hash[:16]
        # NOTE(review): the AAD binds the chunk to the first file and position
        # that stored it, yet a deduplicated chunk is later requested with a
        # different file_id/position - confirm that decrypt_chunk uses the
        # file_id/position stored alongside the ciphertext.
        aad = struct.pack('>16sI', file_id, position)
        chunk_cipher = DeoxysII256(chunk_key)
        ciphertext, tag = chunk_cipher.encrypt(chunk_data, nonce, aad)
        return {
            'ciphertext': ciphertext,
            'tag': tag,
            'file_id': file_id,
            'position': position
        }

    def retrieve_file(self, manifest_id: str, manifest_ct: str,
                      manifest_tag: str) -> bytes:
        """Retrieve and decrypt file

        Args:
            manifest_id: Hex file ID returned by :meth:`store_file`.
            manifest_ct: Base64 encrypted manifest.
            manifest_tag: Base64 manifest tag.

        Returns:
            The reassembled file contents.

        Raises:
            ValueError: ``"Invalid manifest"`` if the manifest fails
                authentication or cannot be parsed.
        """
        file_id = bytes.fromhex(manifest_id)
        manifest_nonce = file_id
        manifest_data = base64.b64decode(manifest_ct)
        tag = base64.b64decode(manifest_tag)
        try:
            manifest_json = self.cipher.decrypt(
                manifest_data, manifest_nonce, b"manifest", tag
            )
            manifest = json.loads(manifest_json.decode())
        except ValueError as exc:
            raise ValueError("Invalid manifest") from exc
        # Retrieve chunks in manifest order
        file_data = bytearray()
        for chunk_info in manifest['chunks']:
            chunk_hash = bytes.fromhex(chunk_info['hash'])
            encrypted_chunk = self.retrieve_from_node(chunk_hash)
            chunk_data = self.decrypt_chunk(
                encrypted_chunk, chunk_hash,
                file_id, chunk_info['position']
            )
            file_data.extend(chunk_data)
        return bytes(file_data)
class DeoxysBlockchainState:
    """Encrypt and verify state transitions and state-inclusion proofs.

    NOTE(review): ``apply_transactions``, ``get_block_height``,
    ``hash_transaction``, ``compute_delta``, ``hash_state``, ``load_state``,
    ``apply_delta``, ``get_merkle_path`` and ``hash_key`` are called but not
    defined in this class; they must be supplied elsewhere.
    """

    def __init__(self, genesis_key: bytes):
        self.state_cipher = DeoxysII256(genesis_key)
        self.state_root = None
        self.state_cache = {}

    def encrypt_state_transition(self, prev_state: bytes,
                                 transactions: list) -> dict:
        """Encrypt state transition with authentication

        Returns:
            A commitment dict holding the block height, the previous and new
            state roots, the encrypted delta with its tag and nonce, and the
            transaction count.
        """
        new_state = self.apply_transactions(prev_state, transactions)
        # Nonce: 8-byte block height followed by 8 random bytes
        block_height = self.get_block_height()
        nonce = block_height.to_bytes(8, 'big') + generate_random_bytes(8)
        # Include transaction hashes in AAD
        aad = b''.join(self.hash_transaction(tx) for tx in transactions)
        # Encrypt state delta
        state_delta = self.compute_delta(prev_state, new_state)
        encrypted_delta, tag = self.state_cipher.encrypt(
            state_delta, nonce, aad
        )
        return {
            'block_height': block_height,
            'prev_root': self.hash_state(prev_state),
            'new_root': self.hash_state(new_state),
            'encrypted_delta': encrypted_delta,
            'auth_tag': tag,
            'nonce': nonce,
            'tx_count': len(transactions)
        }

    def verify_state_transition(self, commitment: dict,
                                transactions: list) -> bytes:
        """Verify and decrypt state transition

        Returns:
            The state obtained by applying the decrypted delta.

        Raises:
            ValueError: ``"State authentication failed"`` if the delta does
                not authenticate against the transactions;
                ``"State transition invalid"`` if the resulting state does
                not hash to the committed new root.
        """
        # Reconstruct AAD
        aad = b''.join(self.hash_transaction(tx) for tx in transactions)
        try:
            state_delta = self.state_cipher.decrypt(
                commitment['encrypted_delta'],
                commitment['nonce'],
                aad,
                commitment['auth_tag']
            )
        except ValueError as exc:
            raise ValueError("State authentication failed") from exc
        # Verify state transition
        prev_state = self.load_state(commitment['prev_root'])
        computed_state = self.apply_delta(prev_state, state_delta)
        if self.hash_state(computed_state) != commitment['new_root']:
            raise ValueError("State transition invalid")
        return computed_state

    def create_state_proof(self, key: bytes, value: bytes) -> dict:
        """Create authenticated proof of state inclusion

        Returns:
            A dict with the ``key``, the encrypted ``proof``, its ``tag`` and
            ``nonce``, and the current state ``root``.
        """
        path = self.get_merkle_path(key)
        # Presumably hash_key returns 8 bytes so that the nonce totals 16 -
        # TODO confirm.
        proof_nonce = self.hash_key(key) + generate_random_bytes(8)
        # AAD includes the key
        aad = key
        # json.dumps cannot serialize bytes, so the value is carried as hex.
        # Assumes get_merkle_path returns JSON-serializable data - TODO confirm.
        proof_data = {
            'value': value.hex(),
            'path': path,
            'timestamp': time.time()
        }
        encrypted_proof, tag = self.state_cipher.encrypt(
            json.dumps(proof_data).encode(),
            proof_nonce,
            aad
        )
        return {
            'key': key,
            'proof': encrypted_proof,
            'tag': tag,
            'nonce': proof_nonce,
            'root': self.state_root
        }
class DeoxysPrivateAnalytics:
    """Submit encrypted per-user metrics and aggregate them per time bucket.

    Submission token (before base64)::

        time bucket (8 bytes, big-endian) || nonce (16) || ciphertext || tag (16)

    NOTE(review): ``derive_user_nonce``, ``aggregate_bucket`` and
    ``add_differential_privacy`` are called but not defined in this class.
    """

    _BUCKET_SIZE = 8
    _NONCE_SIZE = 16
    _TAG_SIZE = 16

    def __init__(self, analytics_key: bytes):
        self.cipher = DeoxysII256(analytics_key)
        self.aggregation_window = 3600  # 1 hour

    def submit_encrypted_metric(self, user_id: str, metric: dict) -> str:
        """Submit encrypted metric for aggregation

        Args:
            user_id: Identifier of the submitting user.
            metric: JSON-serializable metric values.

        Returns:
            The base64-encoded submission token (a ``str``).
        """
        # Deterministic per-user nonce for same-user aggregation.
        # NOTE(review): the nonce travels inside the token so the aggregator
        # can decrypt; derive_user_nonce should therefore not be reversible
        # to the user ID by outsiders - confirm how it is derived.
        user_nonce = self.derive_user_nonce(user_id)
        # Time-based bucketing
        time_bucket = int(time.time()) // self.aggregation_window
        metric_data = {
            'user_id': user_id,
            'bucket': time_bucket,
            'values': metric
        }
        metric_bytes = json.dumps(metric_data).encode()
        # AAD includes aggregation context
        aad = f"analytics:{time_bucket}".encode()
        ciphertext, tag = self.cipher.encrypt(metric_bytes, user_nonce, aad)
        return base64.b64encode(
            struct.pack('>Q', time_bucket) + user_nonce + ciphertext + tag
        ).decode()

    def aggregate_metrics(self, encrypted_metrics: list) -> dict:
        """Aggregate metrics while preserving privacy

        Args:
            encrypted_metrics: Tokens from :meth:`submit_encrypted_metric`.

        Returns:
            A dict mapping each time bucket to the result of
            ``aggregate_bucket`` over the metrics that authenticated.
        """
        nonce_end = self._BUCKET_SIZE + self._NONCE_SIZE
        buckets = {}
        for token in encrypted_metrics:
            data = base64.b64decode(token)
            time_bucket = struct.unpack('>Q', data[:self._BUCKET_SIZE])[0]
            # The nonce is read from the token; it used to be referenced
            # through an undefined name, which raised NameError.
            nonce = data[self._BUCKET_SIZE:nonce_end]
            ciphertext = data[nonce_end:-self._TAG_SIZE]
            tag = data[-self._TAG_SIZE:]
            buckets.setdefault(time_bucket, []).append((nonce, ciphertext, tag))
        results = {}
        for bucket, metrics in buckets.items():
            aad = f"analytics:{bucket}".encode()
            decrypted_metrics = []
            for nonce, ciphertext, tag in metrics:
                try:
                    metric_bytes = self.cipher.decrypt(
                        ciphertext, nonce, aad, tag
                    )
                    metric = json.loads(metric_bytes.decode())
                except ValueError:
                    continue  # Skip invalid metrics
                decrypted_metrics.append(metric)
            results[bucket] = self.aggregate_bucket(decrypted_metrics)
        return results

    def create_privacy_report(self, aggregated_data: dict) -> str:
        """Create encrypted privacy-preserving report

        Returns:
            Base64 of ``nonce || ciphertext || tag`` (a ``str``).
        """
        # Add noise for differential privacy
        noisy_data = self.add_differential_privacy(aggregated_data)
        report = {
            'timestamp': time.time(),
            'aggregations': noisy_data,
            'privacy_budget': 0.1,  # Epsilon value
            'noise_scale': 1.0
        }
        report_nonce = generate_random_bytes(16)
        report_bytes = json.dumps(report).encode()
        ciphertext, tag = self.cipher.encrypt(
            report_bytes, report_nonce, b"privacy_report"
        )
        return base64.b64encode(
            report_nonce + ciphertext + tag
        ).decode()
| Feature | Deoxys-II | AES-GCM |
|---|---|---|
| Nonce Reuse | Resistant | Catastrophic |
| Performance | Good | Excellent |
| Parallelizable | Yes | Yes |
| Patent Status | Free | Free |
| Standardization | CAESAR | NIST |
| Hardware Support | Limited | Extensive |
| Feature | Deoxys-II | ChaCha20-Poly1305 |
|---|---|---|
| Structure | Block cipher | Stream cipher |
| Nonce Size | 128 bits | 96 bits |
| Nonce Reuse | Resistant | Vulnerable |
| Software Speed | Good | Excellent |
| Simplicity | Complex | Simple |
# Use Deoxys-II when nonce-misuse resistance is critical
if application_requires_nonce_reuse_safety:
    cipher = DeoxysII256(key)

# Use Deoxys-II for long-term storage
# (``10_years`` is not a valid Python literal; the lifetime is now a number
# of years.)
if data_lifetime_years > 10:
    # Beyond birthday bound security
    # NOTE(review): presumably this holds only while nonces are not
    # repeated - confirm against the Deoxys specification.
    cipher = DeoxysII256(key)

# Use Deoxys-II for deterministic encryption
if need_searchable_encryption:
    # Safe with deterministic nonces
    cipher = DeoxysII256(key)
# Before: AES-GCM (nonce-misuse vulnerable)
# The associated data is fed in with update() before encrypting.
from Crypto.Cipher import AES
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
cipher.update(aad)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)
# After: Deoxys-II (nonce-misuse resistant)
# Nonce and associated data are passed directly to encrypt().
# NOTE(review): the two ciphers presumably produce different ciphertexts and
# tags for the same inputs, so existing data would have to be decrypted with
# AES-GCM and re-encrypted - confirm the migration plan.
from metamui_crypto import DeoxysII256
deoxys = DeoxysII256(key)
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)
# Migration wrapper
class AEADMigrator:
    """Encrypt with either Deoxys-II or AES-GCM behind one call signature."""

    def __init__(self, algorithm='deoxys'):
        self.algorithm = algorithm  # 'deoxys' or 'aes-gcm'

    def encrypt(self, key, plaintext, nonce, aad=b""):
        """Encrypt ``plaintext`` with the configured algorithm.

        Args:
            key: Key for the selected cipher.
            plaintext: Data to encrypt.
            nonce: Nonce for this encryption.
            aad: Associated data to authenticate.

        Returns:
            ``(ciphertext, tag)``.

        Raises:
            ValueError: If ``self.algorithm`` is not a supported name
                (previously this silently returned ``None``).
        """
        if self.algorithm == 'deoxys':
            cipher = DeoxysII256(key)
            return cipher.encrypt(plaintext, nonce, aad)
        if self.algorithm == 'aes-gcm':
            # Imported here so the class does not depend on an import made by
            # an unrelated snippet.
            from Crypto.Cipher import AES
            cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
            cipher.update(aad)
            return cipher.encrypt_and_digest(plaintext)
        raise ValueError(f"Unsupported AEAD algorithm: {self.algorithm!r}")
# Both provide nonce-misuse resistance
# Before: AES-GCM-SIV (seal() returns one combined value)
ciphertext = aes_gcm_siv.seal(plaintext, nonce, aad)
# After: Deoxys-II (better performance)
# encrypt() returns the ciphertext and the tag separately.
# NOTE(review): AES-GCM-SIV is normally used with a 96-bit nonce while this
# document uses 16-byte nonces for Deoxys-II - confirm the nonce is
# regenerated at the right size when migrating.
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)
sealed = ciphertext + tag  # Combine for compatibility with the single-value layout
# NOTE(review): the expected ciphertext/tag values below are reproduced as
# given; they have not been checked against published Deoxys-II test vectors.

# Test Vector 1: Empty plaintext
key = bytes.fromhex("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f")
nonce = bytes.fromhex("000102030405060708090a0b0c0d0e0f")
plaintext = b""
aad = b""
deoxys = DeoxysII256(key)
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)
# An empty message yields an empty ciphertext but still a full tag.
assert ciphertext == b""
assert tag.hex() == "8e31b94d66c55ab2be1e4d49d8a79de9"

# Test Vector 2: With plaintext and AAD
plaintext = bytes.fromhex("000102030405060708090a0b0c0d0e0f")
aad = bytes.fromhex("0001020304050607")
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)
assert ciphertext.hex() == "2cd8bf0c6a7e5d89c4f4c4d5a1012e4f"
assert tag.hex() == "fa4a7bc90e09e9e2e18d570f1c1b3341"

# Test Vector 3: Nonce reuse (different plaintexts)
plaintext1 = b"Hello World!"
plaintext2 = b"Hello Alice!"
ct1, tag1 = deoxys.encrypt(plaintext1, nonce, aad)
ct2, tag2 = deoxys.encrypt(plaintext2, nonce, aad)
# Neither the ciphertexts nor the tags may coincide despite the shared nonce
assert ct1 != ct2
assert tag1 != tag2

# Test Vector 4: Successful decryption
# (``plaintext`` and ``aad`` still hold the values from Test Vector 2.)
ciphertext = bytes.fromhex("2cd8bf0c6a7e5d89c4f4c4d5a1012e4f")
tag = bytes.fromhex("fa4a7bc90e09e9e2e18d570f1c1b3341")
decrypted = deoxys.decrypt(ciphertext, nonce, aad, tag)
assert decrypted == plaintext

# Test Vector 5: Authentication failure
bad_tag = bytes.fromhex("fa4a7bc90e09e9e2e18d570f1c1b3342")  # Last byte modified
try:
    deoxys.decrypt(ciphertext, nonce, aad, bad_tag)
except ValueError as e:
    # The error message is expected to mention authentication.
    assert "authentication" in str(e).lower()
else:
    assert False, "Should have failed"
# Test Vector 6: Multi-block message
# Uses a random key and nonce, so only the round trip is checked, not fixed
# ciphertext values.
key = generate_random_bytes(32)
nonce = generate_random_bytes(16)
plaintext = b"A" * 1000  # Multiple blocks (1000 bytes, not a multiple of 16)
aad = b"metadata" * 10  # 80 bytes of associated data
deoxys = DeoxysII256(key)
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)
# Verify round trip: decrypting with the same nonce, AAD and tag must
# return the original plaintext
decrypted = deoxys.decrypt(ciphertext, nonce, aad, tag)
assert decrypted == plaintext