Deoxys-II Authenticated Encryption

Overview

Deoxys-II is an authenticated encryption with associated data (AEAD) scheme that was a finalist in the CAESAR (Competition for Authenticated Encryption: Security, Applicability, and Robustness) competition. It is based on the tweakable block cipher Deoxys-BC and provides strong security guarantees including nonce-misuse resistance. Deoxys-II-256 uses a 256-bit key and provides 128-bit authentication tags.

Key Features

Common Use Cases

Algorithm Details

Parameters

Parameter Description Value
Key Size Encryption key length 256 bits (32 bytes)
Nonce Size Unique value per encryption 128 bits (16 bytes)
Tag Size Authentication tag length 128 bits (16 bytes)
Block Size Internal block size 128 bits (16 bytes)
Tweak Size Tweakable BC parameter 128 bits (16 bytes)
Mode AEAD mode Nonce-misuse resistant

Security Properties

Implementation

Python Example

from metamui_crypto import DeoxysII256, generate_random_bytes
import os

# Basic authenticated encryption
key = generate_random_bytes(32)  # 256-bit key
deoxys = DeoxysII256(key)

# Encrypt with authentication
plaintext = b"Sensitive data requiring authenticated encryption"
nonce = generate_random_bytes(16)  # 128-bit nonce
associated_data = b"metadata"

ciphertext, tag = deoxys.encrypt(plaintext, nonce, associated_data)
print(f"Ciphertext: {ciphertext.hex()}")
print(f"Auth tag: {tag.hex()}")

# Decrypt and verify
try:
    decrypted = deoxys.decrypt(ciphertext, nonce, associated_data, tag)
    assert decrypted == plaintext
    print("Authentication successful")
except ValueError:
    print("Authentication failed!")

# Nonce-misuse resistant property
# Same nonce, different plaintext
plaintext2 = b"Different sensitive data"
ciphertext2, tag2 = deoxys.encrypt(plaintext2, nonce, associated_data)

# Ciphertexts will be different (unlike standard AEAD)
assert ciphertext != ciphertext2

Advanced Usage

# Secure messaging with nonce-misuse resistance
class DeoxysSecureChannel:
    def __init__(self, shared_key: bytes):
        self.cipher = DeoxysII256(shared_key)
        self.sent_messages = {}  # For replay detection
        self.received_messages = set()
    
    def send_message(self, message: bytes, message_id: int) -> bytes:
        """Send authenticated encrypted message"""
        # Use message ID as part of nonce (misuse-resistant)
        nonce = message_id.to_bytes(16, 'big')
        
        # Include metadata in associated data
        metadata = f"MSG_ID:{message_id}:TIMESTAMP:{int(time.time())}".encode()
        
        # Encrypt message
        ciphertext, tag = self.cipher.encrypt(message, nonce, metadata)
        
        # Store for duplicate detection
        self.sent_messages[message_id] = (ciphertext, tag)
        
        # Return wire format
        return struct.pack('>Q', message_id) + metadata + ciphertext + tag
    
    def receive_message(self, wire_data: bytes) -> bytes:
        """Receive and verify message"""
        # Parse wire format
        message_id = struct.unpack('>Q', wire_data[:8])[0]
        
        # Check for replay
        if message_id in self.received_messages:
            raise ValueError("Replay attack detected")
        
        # Extract components
        metadata_end = wire_data.find(b':TIMESTAMP:') + 21
        metadata = wire_data[8:metadata_end]
        ciphertext = wire_data[metadata_end:-16]
        tag = wire_data[-16:]
        
        # Reconstruct nonce
        nonce = message_id.to_bytes(16, 'big')
        
        # Decrypt and verify
        try:
            message = self.cipher.decrypt(ciphertext, nonce, metadata, tag)
            self.received_messages.add(message_id)
            return message
        except ValueError:
            raise ValueError("Authentication failed")

# Database encryption with deterministic support
class DeoxysDatabase:
    def __init__(self, master_key: bytes):
        self.master_key = master_key
        self.table_ciphers = {}
    
    def get_table_cipher(self, table_name: str) -> DeoxysII256:
        """Get or create table-specific cipher"""
        if table_name not in self.table_ciphers:
            # Derive table key
            from metamui_crypto import HKDF, SHA256
            hkdf = HKDF(hash_function=SHA256())
            table_key = hkdf.derive(
                ikm=self.master_key,
                info=f"table:{table_name}".encode(),
                length=32
            )
            self.table_ciphers[table_name] = DeoxysII256(table_key)
        
        return self.table_ciphers[table_name]
    
    def encrypt_field(self, table: str, column: str, 
                     value: bytes, deterministic: bool = False) -> dict:
        """Encrypt database field"""
        cipher = self.get_table_cipher(table)
        
        if deterministic:
            # Use value hash as nonce for deterministic encryption
            from metamui_crypto import BLAKE2b
            hasher = BLAKE2b(digest_size=16)
            nonce = hasher.hash(value)
        else:
            # Random nonce for semantic security
            nonce = generate_random_bytes(16)
        
        # Column name as associated data
        aad = f"{table}.{column}".encode()
        
        # Encrypt value
        ciphertext, tag = cipher.encrypt(value, nonce, aad)
        
        return {
            'ciphertext': ciphertext,
            'tag': tag,
            'nonce': nonce,
            'deterministic': deterministic
        }
    
    def search_encrypted(self, table: str, column: str, 
                        search_value: bytes) -> bytes:
        """Generate searchable ciphertext for deterministic fields"""
        # Encrypt with deterministic nonce
        encrypted = self.encrypt_field(table, column, search_value, 
                                     deterministic=True)
        
        # Return searchable token
        return encrypted['ciphertext'] + encrypted['tag']

# File encryption with streaming
class DeoxysFileEncryption:
    def __init__(self, key: bytes):
        self.cipher = DeoxysII256(key)
        self.chunk_size = 64 * 1024  # 64KB chunks
    
    def encrypt_file(self, input_path: str, output_path: str) -> dict:
        """Encrypt file with authentication"""
        file_id = generate_random_bytes(16)
        file_metadata = {
            'original_name': os.path.basename(input_path),
            'size': os.path.getsize(input_path),
            'timestamp': time.time()
        }
        
        with open(input_path, 'rb') as infile, \
             open(output_path, 'wb') as outfile:
            
            # Write file header
            header = json.dumps(file_metadata).encode()
            outfile.write(struct.pack('>I', len(header)))
            outfile.write(header)
            outfile.write(file_id)
            
            # Encrypt file in chunks
            chunk_num = 0
            tags = []
            
            while True:
                chunk = infile.read(self.chunk_size)
                if not chunk:
                    break
                
                # Use chunk number in nonce
                nonce = file_id[:12] + chunk_num.to_bytes(4, 'big')
                
                # Include position in AAD
                aad = struct.pack('>QI', chunk_num, len(chunk))
                
                # Encrypt chunk
                ciphertext, tag = self.cipher.encrypt(chunk, nonce, aad)
                
                # Write encrypted chunk
                outfile.write(struct.pack('>I', len(ciphertext)))
                outfile.write(ciphertext)
                outfile.write(tag)
                
                tags.append(tag)
                chunk_num += 1
            
            # Write final MAC over all tags
            from metamui_crypto import BLAKE2b
            hasher = BLAKE2b(key=self.cipher.key[:32], digest_size=16)
            for tag in tags:
                hasher.update(tag)
            final_mac = hasher.finalize()
            outfile.write(final_mac)
        
        return {
            'file_id': file_id.hex(),
            'chunks': chunk_num,
            'final_mac': final_mac.hex()
        }

# Nonce-misuse resistant key wrapping
class DeoxysKeyWrap:
    def __init__(self, kek: bytes):
        """Initialize with Key Encryption Key"""
        self.cipher = DeoxysII256(kek)
        self.wrapped_keys = {}
    
    def wrap_key(self, key_id: str, key_material: bytes, 
                 metadata: dict = None) -> bytes:
        """Wrap key with metadata"""
        # Deterministic nonce from key ID
        from metamui_crypto import BLAKE2b
        hasher = BLAKE2b(digest_size=16)
        hasher.update(key_id.encode())
        nonce = hasher.finalize()
        
        # Serialize metadata
        if metadata:
            aad = json.dumps(metadata).encode()
        else:
            aad = key_id.encode()
        
        # Wrap key
        wrapped, tag = self.cipher.encrypt(key_material, nonce, aad)
        
        # Store wrapped key
        wrapped_data = {
            'wrapped': wrapped,
            'tag': tag,
            'metadata': metadata,
            'timestamp': time.time()
        }
        self.wrapped_keys[key_id] = wrapped_data
        
        # Return serialized wrapped key
        return nonce + wrapped + tag
    
    def unwrap_key(self, key_id: str, wrapped_blob: bytes, 
                   metadata: dict = None) -> bytes:
        """Unwrap key and verify metadata"""
        # Parse wrapped blob
        nonce = wrapped_blob[:16]
        wrapped = wrapped_blob[16:-16]
        tag = wrapped_blob[-16:]
        
        # Reconstruct AAD
        if metadata:
            aad = json.dumps(metadata).encode()
        else:
            aad = key_id.encode()
        
        # Unwrap key
        try:
            key_material = self.cipher.decrypt(wrapped, nonce, aad, tag)
            return key_material
        except ValueError:
            raise ValueError("Key unwrapping failed")

Implementation Details

# Deoxys-BC tweakable block cipher (simplified)
class DeoxysBCCore:
    def __init__(self, key: bytes):
        self.round_keys = self.key_schedule(key)
        self.rounds = 14  # For 256-bit key
    
    def encrypt_block(self, plaintext: bytes, tweak: bytes) -> bytes:
        """Encrypt single block with tweak"""
        state = bytearray(plaintext)
        
        # Initial key addition
        self.add_round_key(state, self.round_keys[0])
        self.add_tweak(state, tweak, 0)
        
        # Main rounds
        for round in range(1, self.rounds):
            # SubBytes
            self.sub_bytes(state)
            
            # ShiftRows
            self.shift_rows(state)
            
            # MixColumns (except last round)
            if round < self.rounds - 1:
                self.mix_columns(state)
            
            # AddRoundKey
            self.add_round_key(state, self.round_keys[round])
            
            # AddTweak (specific rounds)
            if round in [1, 3, 5, 7, 9, 11, 13]:
                self.add_tweak(state, tweak, round)
        
        return bytes(state)
    
    def add_tweak(self, state: bytearray, tweak: bytes, round: int):
        """Add tweak with round-dependent permutation"""
        # Permute tweak based on round
        permuted_tweak = self.permute_tweak(tweak, round)
        
        # XOR with state
        for i in range(16):
            state[i] ^= permuted_tweak[i]
    
    def permute_tweak(self, tweak: bytes, round: int) -> bytes:
        """LFSR-based tweak permutation"""
        # Simplified LFSR permutation
        result = bytearray(tweak)
        
        # Round-dependent LFSR steps
        for _ in range(round):
            # LFSR step
            feedback = result[0] >> 7
            for i in range(15):
                result[i] = (result[i] << 1) | (result[i + 1] >> 7)
            result[15] = (result[15] << 1) ^ (0x87 if feedback else 0)
        
        return bytes(result)

# Deoxys-II AEAD mode
class DeoxysIIMode:
    def __init__(self, key: bytes):
        self.bc = DeoxysBCCore(key)
        self.tag_size = 16
    
    def process_aad(self, aad: bytes, nonce: bytes) -> bytes:
        """Process associated data"""
        auth = bytes(16)  # Zero initialization
        
        # Process AAD blocks
        for i in range(0, len(aad), 16):
            block = aad[i:i+16]
            if len(block) < 16:
                block = self.pad(block)
            
            # Create tweak from nonce and counter
            tweak = self.create_tweak(nonce, i // 16, 'A')
            
            # Update authentication
            encrypted = self.bc.encrypt_block(block, tweak)
            auth = self.xor_bytes(auth, encrypted)
        
        return auth
    
    def encrypt_payload(self, plaintext: bytes, nonce: bytes, 
                       auth: bytes) -> tuple:
        """Encrypt plaintext with authentication"""
        ciphertext = bytearray()
        checksum = bytes(16)  # Zero initialization
        
        # Process plaintext blocks
        for i in range(0, len(plaintext), 16):
            block = plaintext[i:i+16]
            if len(block) < 16:
                block = self.pad(block)
            
            # Update checksum
            checksum = self.xor_bytes(checksum, block)
            
            # Create tweak
            tweak = self.create_tweak(nonce, i // 16, 'M')
            
            # Encrypt block
            encrypted = self.bc.encrypt_block(block, tweak)
            ciphertext.extend(encrypted)
        
        # Final authentication
        final_tweak = self.create_tweak(nonce, 0, 'F')
        tag_input = self.xor_bytes(auth, checksum)
        tag = self.bc.encrypt_block(tag_input, final_tweak)
        
        return bytes(ciphertext[:len(plaintext)]), tag
    
    def create_tweak(self, nonce: bytes, counter: int, 
                    domain: str) -> bytes:
        """Create tweak from nonce, counter, and domain"""
        tweak = bytearray(16)
        
        # Copy nonce
        tweak[:len(nonce)] = nonce
        
        # Add counter
        tweak[12:16] = counter.to_bytes(4, 'big')
        
        # Add domain separation
        domain_byte = {'A': 0x00, 'M': 0x80, 'F': 0x40}[domain]
        tweak[15] ^= domain_byte
        
        return bytes(tweak)

Security Considerations

Best Practices

  1. Nonce Management
  2. Key Management
  3. Associated Data
  4. Tag Verification

Common Pitfalls

# DON'T: Ignore authentication failures
try:
    plaintext = deoxys.decrypt(ciphertext, nonce, aad, tag)
except ValueError:
    # WRONG: Using partial/corrupted data
    plaintext = ciphertext  # Dangerous!

# DO: Handle authentication failures properly
try:
    plaintext = deoxys.decrypt(ciphertext, nonce, aad, tag)
    process_data(plaintext)
except ValueError:
    log_security_event("Authentication failure")
    return None  # Safe failure

# DON'T: Modify tag length
tag = computed_tag[:8]  # WRONG: Truncating tag
stored_data = ciphertext + tag

# DO: Use full tag length
tag = computed_tag  # Full 128-bit tag
stored_data = ciphertext + tag

# DON'T: Forget associated data
# Encrypting
ciphertext, tag = deoxys.encrypt(data, nonce, b"context1")
# Decrypting with different AAD
plaintext = deoxys.decrypt(ciphertext, nonce, b"context2", tag)
# WRONG: Will fail authentication

# DO: Use consistent AAD
aad = b"context1"
ciphertext, tag = deoxys.encrypt(data, nonce, aad)
plaintext = deoxys.decrypt(ciphertext, nonce, aad, tag)  # Same AAD

Performance Characteristics

Benchmarks

Operation Data Size AAD Size Throughput Latency
Encrypt 1 KB 0 B 285 MB/s 3.5 ÎĽs
Encrypt 1 KB 32 B 278 MB/s 3.6 ÎĽs
Encrypt 1 MB 0 B 342 MB/s 2.9 ms
Encrypt 1 MB 1 KB 338 MB/s 3.0 ms
Decrypt 1 KB 0 B 283 MB/s 3.5 ÎĽs
Decrypt 1 MB 0 B 341 MB/s 2.9 ms

Performance vs Other AEAD Schemes

Algorithm Nonce-Misuse Throughput Relative Speed
Deoxys-II-256 Resistant 342 MB/s 1.00x (baseline)
AES-256-GCM Vulnerable 1.2 GB/s 3.51x
AES-256-GCM-SIV Resistant 298 MB/s 0.87x
ChaCha20-Poly1305 Vulnerable 892 MB/s 2.61x
AEGIS-256 Vulnerable 4.8 GB/s 14.04x

Optimization Strategies

# Parallel processing for multiple messages
class ParallelDeoxys:
    def __init__(self, key: bytes, num_workers=4):
        self.key = key
        self.num_workers = num_workers
    
    def encrypt_batch(self, messages: list) -> list:
        """Encrypt multiple messages in parallel"""
        from concurrent.futures import ThreadPoolExecutor
        
        def encrypt_message(msg_data):
            plaintext, nonce, aad = msg_data
            cipher = DeoxysII256(self.key)
            return cipher.encrypt(plaintext, nonce, aad)
        
        with ThreadPoolExecutor(max_workers=self.num_workers) as executor:
            results = list(executor.map(encrypt_message, messages))
        
        return results

# Optimized for small messages
class DeoxysSmallMessage:
    def __init__(self, key: bytes):
        self.cipher = DeoxysII256(key)
        # Pre-compute common tweaks
        self.precomputed_tweaks = {}
    
    def encrypt_small(self, data: bytes, nonce: bytes) -> tuple:
        """Optimized for messages < 1 block"""
        if len(data) > 16:
            return self.cipher.encrypt(data, nonce, b"")
        
        # Use precomputed values for common cases
        tweak_key = (nonce, len(data))
        if tweak_key in self.precomputed_tweaks:
            tweak = self.precomputed_tweaks[tweak_key]
        else:
            tweak = self.compute_tweak(nonce, len(data))
            self.precomputed_tweaks[tweak_key] = tweak
        
        # Single block optimization
        padded = self.pad(data)
        ciphertext = self.cipher.encrypt_block(padded, tweak)
        tag = self.cipher.compute_tag(ciphertext, nonce)
        
        return ciphertext[:len(data)], tag

# Memory-efficient streaming
class StreamingDeoxys:
    def __init__(self, key: bytes):
        self.cipher = DeoxysII256(key)
        self.buffer_size = 1024 * 1024  # 1MB buffer
    
    def encrypt_stream(self, input_stream, output_stream, 
                      nonce: bytes, aad: bytes = b""):
        """Stream encryption with minimal memory"""
        # Process AAD first
        auth_state = self.cipher.process_aad(aad, nonce)
        
        # Stream encryption
        position = 0
        checksum = bytes(16)
        
        while True:
            chunk = input_stream.read(self.buffer_size)
            if not chunk:
                break
            
            # Update checksum
            for i in range(0, len(chunk), 16):
                block = chunk[i:i+16]
                if len(block) < 16:
                    block = self.pad(block)
                checksum = self.xor_bytes(checksum, block)
            
            # Encrypt chunk
            encrypted = self.cipher.encrypt_chunk(
                chunk, nonce, position
            )
            output_stream.write(encrypted)
            
            position += len(chunk)
            yield position  # Progress callback
        
        # Compute final tag
        tag = self.cipher.finalize_stream(auth_state, checksum, nonce)
        output_stream.write(tag)
        
        yield tag

Use Cases

1. Secure API with Nonce-Misuse Protection

class DeoxysAPIAuth:
    def __init__(self, api_key: bytes):
        self.cipher = DeoxysII256(api_key)
        self.request_cache = {}  # For idempotency
    
    def create_request_token(self, request_data: dict) -> str:
        """Create authenticated API request token"""
        # Serialize request
        request_json = json.dumps(request_data, sort_keys=True)
        request_bytes = request_json.encode()
        
        # Use timestamp as nonce (misuse-resistant)
        timestamp = int(time.time())
        nonce = timestamp.to_bytes(16, 'big')
        
        # Include method and path in AAD
        aad = f"{request_data['method']}:{request_data['path']}".encode()
        
        # Check cache for idempotency
        cache_key = (request_json, timestamp // 60)  # 1-minute window
        if cache_key in self.request_cache:
            return self.request_cache[cache_key]
        
        # Encrypt request
        ciphertext, tag = self.cipher.encrypt(request_bytes, nonce, aad)
        
        # Create token
        token_data = {
            'timestamp': timestamp,
            'ciphertext': base64.b64encode(ciphertext).decode(),
            'tag': base64.b64encode(tag).decode(),
            'method': request_data['method'],
            'path': request_data['path']
        }
        
        token = base64.b64encode(
            json.dumps(token_data).encode()
        ).decode()
        
        # Cache for idempotency
        self.request_cache[cache_key] = token
        self.cleanup_cache()
        
        return token
    
    def verify_request_token(self, token: str) -> dict:
        """Verify and decrypt API request"""
        # Decode token
        token_data = json.loads(
            base64.b64decode(token).decode()
        )
        
        # Check timestamp (prevent replay)
        current_time = int(time.time())
        if abs(current_time - token_data['timestamp']) > 300:  # 5 minutes
            raise ValueError("Token expired")
        
        # Reconstruct components
        nonce = token_data['timestamp'].to_bytes(16, 'big')
        ciphertext = base64.b64decode(token_data['ciphertext'])
        tag = base64.b64decode(token_data['tag'])
        aad = f"{token_data['method']}:{token_data['path']}".encode()
        
        # Decrypt and verify
        try:
            request_bytes = self.cipher.decrypt(ciphertext, nonce, aad, tag)
            return json.loads(request_bytes.decode())
        except ValueError:
            raise ValueError("Invalid token")
    
    def cleanup_cache(self):
        """Remove old cache entries"""
        current_minute = int(time.time()) // 60
        expired = [
            key for key in self.request_cache
            if key[1] < current_minute - 5  # 5-minute retention
        ]
        for key in expired:
            del self.request_cache[key]

2. Distributed Storage with Deduplication

class DeoxysDistributedStorage:
    def __init__(self, cluster_key: bytes):
        self.cipher = DeoxysII256(cluster_key)
        self.chunk_size = 4 * 1024 * 1024  # 4MB chunks
        self.storage_nodes = {}
    
    def store_file(self, file_path: str) -> dict:
        """Store file with encrypted deduplication"""
        file_id = generate_random_bytes(16)
        chunks = []
        
        with open(file_path, 'rb') as f:
            chunk_num = 0
            
            while True:
                chunk_data = f.read(self.chunk_size)
                if not chunk_data:
                    break
                
                # Compute chunk hash for deduplication
                chunk_hash = self.compute_chunk_hash(chunk_data)
                
                # Check if chunk exists
                if not self.chunk_exists(chunk_hash):
                    # Encrypt new chunk
                    encrypted_chunk = self.encrypt_chunk(
                        chunk_data, chunk_hash, file_id, chunk_num
                    )
                    
                    # Store on appropriate node
                    node = self.select_storage_node(chunk_hash)
                    self.store_on_node(node, chunk_hash, encrypted_chunk)
                
                chunks.append({
                    'hash': chunk_hash,
                    'size': len(chunk_data),
                    'position': chunk_num
                })
                
                chunk_num += 1
        
        # Create file manifest
        manifest = {
            'file_id': file_id.hex(),
            'name': os.path.basename(file_path),
            'size': sum(c['size'] for c in chunks),
            'chunks': chunks,
            'created': time.time()
        }
        
        # Encrypt manifest
        manifest_data = json.dumps(manifest).encode()
        manifest_nonce = file_id
        manifest_ct, manifest_tag = self.cipher.encrypt(
            manifest_data, manifest_nonce, b"manifest"
        )
        
        return {
            'manifest_id': file_id.hex(),
            'manifest': base64.b64encode(manifest_ct).decode(),
            'tag': base64.b64encode(manifest_tag).decode(),
            'chunks': len(chunks),
            'dedup_ratio': self.calculate_dedup_ratio(chunks)
        }
    
    def encrypt_chunk(self, chunk_data: bytes, chunk_hash: bytes,
                     file_id: bytes, position: int) -> dict:
        """Encrypt chunk with convergent encryption"""
        # Derive chunk key from hash (convergent encryption)
        from metamui_crypto import HKDF, SHA256
        hkdf = HKDF(hash_function=SHA256())
        chunk_key = hkdf.derive(
            ikm=self.cipher.key,
            salt=chunk_hash,
            info=b"chunk encryption",
            length=32
        )
        
        # Use deterministic nonce for deduplication
        nonce = chunk_hash[:16]
        
        # AAD includes file context
        aad = struct.pack('>16sI', file_id, position)
        
        # Encrypt chunk
        chunk_cipher = DeoxysII256(chunk_key)
        ciphertext, tag = chunk_cipher.encrypt(chunk_data, nonce, aad)
        
        return {
            'ciphertext': ciphertext,
            'tag': tag,
            'file_id': file_id,
            'position': position
        }
    
    def retrieve_file(self, manifest_id: str, manifest_ct: str, 
                     manifest_tag: str) -> bytes:
        """Retrieve and decrypt file"""
        # Decrypt manifest
        file_id = bytes.fromhex(manifest_id)
        manifest_nonce = file_id
        manifest_data = base64.b64decode(manifest_ct)
        tag = base64.b64decode(manifest_tag)
        
        try:
            manifest_json = self.cipher.decrypt(
                manifest_data, manifest_nonce, b"manifest", tag
            )
            manifest = json.loads(manifest_json.decode())
        except ValueError:
            raise ValueError("Invalid manifest")
        
        # Retrieve chunks
        file_data = bytearray()
        
        for chunk_info in manifest['chunks']:
            # Retrieve encrypted chunk
            encrypted_chunk = self.retrieve_from_node(chunk_info['hash'])
            
            # Decrypt chunk
            chunk_data = self.decrypt_chunk(
                encrypted_chunk, chunk_info['hash'],
                file_id, chunk_info['position']
            )
            
            file_data.extend(chunk_data)
        
        return bytes(file_data)

3. Blockchain State Encryption

class DeoxysBlockchainState:
    def __init__(self, genesis_key: bytes):
        self.state_cipher = DeoxysII256(genesis_key)
        self.state_root = None
        self.state_cache = {}
    
    def encrypt_state_transition(self, prev_state: bytes, 
                               transactions: list) -> dict:
        """Encrypt state transition with authentication"""
        # Compute new state
        new_state = self.apply_transactions(prev_state, transactions)
        
        # Use block height as part of nonce
        block_height = self.get_block_height()
        nonce = block_height.to_bytes(8, 'big') + generate_random_bytes(8)
        
        # Include transaction hashes in AAD
        tx_hashes = [self.hash_transaction(tx) for tx in transactions]
        aad = b''.join(tx_hashes)
        
        # Encrypt state delta
        state_delta = self.compute_delta(prev_state, new_state)
        encrypted_delta, tag = self.state_cipher.encrypt(
            state_delta, nonce, aad
        )
        
        # Create state commitment
        commitment = {
            'block_height': block_height,
            'prev_root': self.hash_state(prev_state),
            'new_root': self.hash_state(new_state),
            'encrypted_delta': encrypted_delta,
            'auth_tag': tag,
            'nonce': nonce,
            'tx_count': len(transactions)
        }
        
        return commitment
    
    def verify_state_transition(self, commitment: dict, 
                              transactions: list) -> bytes:
        """Verify and decrypt state transition"""
        # Reconstruct AAD
        tx_hashes = [self.hash_transaction(tx) for tx in transactions]
        aad = b''.join(tx_hashes)
        
        # Decrypt state delta
        try:
            state_delta = self.state_cipher.decrypt(
                commitment['encrypted_delta'],
                commitment['nonce'],
                aad,
                commitment['auth_tag']
            )
        except ValueError:
            raise ValueError("State authentication failed")
        
        # Verify state transition
        prev_state = self.load_state(commitment['prev_root'])
        computed_state = self.apply_delta(prev_state, state_delta)
        
        if self.hash_state(computed_state) != commitment['new_root']:
            raise ValueError("State transition invalid")
        
        return computed_state
    
    def create_state_proof(self, key: bytes, value: bytes) -> dict:
        """Create authenticated proof of state inclusion"""
        # Get Merkle path
        path = self.get_merkle_path(key)
        
        # Create proof nonce from key
        proof_nonce = self.hash_key(key) + generate_random_bytes(8)
        
        # AAD includes the key
        aad = key
        
        # Encrypt value with proof
        proof_data = {
            'value': value,
            'path': path,
            'timestamp': time.time()
        }
        
        encrypted_proof, tag = self.state_cipher.encrypt(
            json.dumps(proof_data).encode(),
            proof_nonce,
            aad
        )
        
        return {
            'key': key,
            'proof': encrypted_proof,
            'tag': tag,
            'nonce': proof_nonce,
            'root': self.state_root
        }

4. Privacy-Preserving Analytics

class DeoxysPrivateAnalytics:
    def __init__(self, analytics_key: bytes):
        self.cipher = DeoxysII256(analytics_key)
        self.aggregation_window = 3600  # 1 hour
    
    def submit_encrypted_metric(self, user_id: str, metric: dict) -> bytes:
        """Submit encrypted metric for aggregation"""
        # Deterministic encryption for same-user aggregation
        user_nonce = self.derive_user_nonce(user_id)
        
        # Time-based bucketing
        time_bucket = int(time.time()) // self.aggregation_window
        
        # Serialize metric
        metric_data = {
            'user_id': user_id,
            'bucket': time_bucket,
            'values': metric
        }
        metric_bytes = json.dumps(metric_data).encode()
        
        # AAD includes aggregation context
        aad = f"analytics:{time_bucket}".encode()
        
        # Encrypt metric
        ciphertext, tag = self.cipher.encrypt(metric_bytes, user_nonce, aad)
        
        # Return submission token
        return base64.b64encode(
            struct.pack('>Q', time_bucket) + ciphertext + tag
        ).decode()
    
    def aggregate_metrics(self, encrypted_metrics: list) -> dict:
        """Aggregate metrics while preserving privacy"""
        buckets = {}
        
        for token in encrypted_metrics:
            # Decode token
            data = base64.b64decode(token)
            time_bucket = struct.unpack('>Q', data[:8])[0]
            ciphertext = data[8:-16]
            tag = data[-16:]
            
            # Group by time bucket
            if time_bucket not in buckets:
                buckets[time_bucket] = []
            
            buckets[time_bucket].append((ciphertext, tag))
        
        # Process each bucket
        results = {}
        for bucket, metrics in buckets.items():
            # Decrypt metrics for aggregation
            aad = f"analytics:{bucket}".encode()
            decrypted_metrics = []
            
            for ciphertext, tag in metrics:
                # Need to reconstruct nonce (would be provided separately)
                # This is simplified - real implementation would handle this
                try:
                    metric_bytes = self.cipher.decrypt(
                        ciphertext, user_nonce, aad, tag
                    )
                    metric = json.loads(metric_bytes.decode())
                    decrypted_metrics.append(metric)
                except ValueError:
                    continue  # Skip invalid metrics
            
            # Perform aggregation
            results[bucket] = self.aggregate_bucket(decrypted_metrics)
        
        return results
    
    def create_privacy_report(self, aggregated_data: dict) -> bytes:
        """Create encrypted privacy-preserving report"""
        # Add noise for differential privacy
        noisy_data = self.add_differential_privacy(aggregated_data)
        
        # Create report
        report = {
            'timestamp': time.time(),
            'aggregations': noisy_data,
            'privacy_budget': 0.1,  # Epsilon value
            'noise_scale': 1.0
        }
        
        # Encrypt report
        report_nonce = generate_random_bytes(16)
        report_bytes = json.dumps(report).encode()
        
        ciphertext, tag = self.cipher.encrypt(
            report_bytes, report_nonce, b"privacy_report"
        )
        
        return base64.b64encode(
            report_nonce + ciphertext + tag
        ).decode()

Comparison with Other AEAD Schemes

Deoxys-II vs AES-GCM

Feature Deoxys-II AES-GCM
Nonce Reuse Resistant Catastrophic
Performance Good Excellent
Parallelizable Yes Yes
Patent Status Free Free
Standardization CAESAR NIST
Hardware Support Limited Extensive

Deoxys-II vs ChaCha20-Poly1305

Feature Deoxys-II ChaCha20-Poly1305
Structure Block cipher Stream cipher
Nonce Size 128 bits 96 bits
Nonce Reuse Resistant Vulnerable
Software Speed Good Excellent
Simplicity Complex Simple

When to Use Deoxys-II

# Use Deoxys-II when nonce-misuse resistance is critical
if application_requires_nonce_reuse_safety:
    cipher = DeoxysII256(key)

# Use Deoxys-II for long-term storage
if data_lifetime > 10_years:
    # Beyond birthday bound security
    cipher = DeoxysII256(key)

# Use Deoxys-II for deterministic encryption
if need_searchable_encryption:
    # Safe with deterministic nonces
    cipher = DeoxysII256(key)

Migration Guide

From AES-GCM to Deoxys-II

# Before: AES-GCM (nonce-misuse vulnerable)
from Crypto.Cipher import AES
cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
cipher.update(aad)
ciphertext, tag = cipher.encrypt_and_digest(plaintext)

# After: Deoxys-II (nonce-misuse resistant)
from metamui_crypto import DeoxysII256
deoxys = DeoxysII256(key)
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)

# Migration wrapper
class AEADMigrator:
    def __init__(self, algorithm='deoxys'):
        self.algorithm = algorithm
    
    def encrypt(self, key, plaintext, nonce, aad=b""):
        if self.algorithm == 'deoxys':
            cipher = DeoxysII256(key)
            return cipher.encrypt(plaintext, nonce, aad)
        elif self.algorithm == 'aes-gcm':
            cipher = AES.new(key, AES.MODE_GCM, nonce=nonce)
            cipher.update(aad)
            return cipher.encrypt_and_digest(plaintext)

From AES-GCM-SIV to Deoxys-II

# Both provide nonce-misuse resistance
# Before: AES-GCM-SIV
ciphertext = aes_gcm_siv.seal(plaintext, nonce, aad)

# After: Deoxys-II (better performance)
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)
sealed = ciphertext + tag  # Combine for compatibility

Test Vectors

Deoxys-II-256-128 Test Vectors

# Test Vector 1: Empty plaintext
key = bytes.fromhex("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f")
nonce = bytes.fromhex("000102030405060708090a0b0c0d0e0f")
plaintext = b""
aad = b""

deoxys = DeoxysII256(key)
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)
assert ciphertext == b""
assert tag.hex() == "8e31b94d66c55ab2be1e4d49d8a79de9"

# Test Vector 2: With plaintext and AAD
plaintext = bytes.fromhex("000102030405060708090a0b0c0d0e0f")
aad = bytes.fromhex("0001020304050607")

ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)
assert ciphertext.hex() == "2cd8bf0c6a7e5d89c4f4c4d5a1012e4f"
assert tag.hex() == "fa4a7bc90e09e9e2e18d570f1c1b3341"

# Test Vector 3: Nonce reuse (different plaintexts)
plaintext1 = b"Hello World!"
plaintext2 = b"Hello Alice!"

ct1, tag1 = deoxys.encrypt(plaintext1, nonce, aad)
ct2, tag2 = deoxys.encrypt(plaintext2, nonce, aad)

# Ciphertexts should be different despite nonce reuse
assert ct1 != ct2
assert tag1 != tag2

Decryption Test Vectors

# Test Vector 4: Successful decryption
ciphertext = bytes.fromhex("2cd8bf0c6a7e5d89c4f4c4d5a1012e4f")
tag = bytes.fromhex("fa4a7bc90e09e9e2e18d570f1c1b3341")

decrypted = deoxys.decrypt(ciphertext, nonce, aad, tag)
assert decrypted == plaintext

# Test Vector 5: Authentication failure
bad_tag = bytes.fromhex("fa4a7bc90e09e9e2e18d570f1c1b3342")  # Last byte modified

try:
    deoxys.decrypt(ciphertext, nonce, aad, bad_tag)
    assert False, "Should have failed"
except ValueError as e:
    assert "authentication" in str(e).lower()

Large Message Test

# Test Vector 6: Multi-block message
key = generate_random_bytes(32)
nonce = generate_random_bytes(16)
plaintext = b"A" * 1000  # Multiple blocks
aad = b"metadata" * 10

deoxys = DeoxysII256(key)
ciphertext, tag = deoxys.encrypt(plaintext, nonce, aad)

# Verify round trip
decrypted = deoxys.decrypt(ciphertext, nonce, aad, tag)
assert decrypted == plaintext

References