ML-KEM-768 Security-Focused API Documentation

Version: 1.0
Last Updated: 2025-07-05
**Security Classification:
PUBLIC
Author: Phantom (phantom@metamui.id)

Overview

ML-KEM-768 (Module Lattice-Based Key Encapsulation Mechanism) is a post-quantum key encapsulation mechanism standardized by NIST. Based on the Module Learning With Errors (M-LWE) problem, ML-KEM-768 provides NIST security level 3 (equivalent to 192-bit classical security) and is designed to replace RSA and ECDH key exchange in a post-quantum world.

Security Level: NIST Level 3 (192-bit classical, ~2^96 quantum)
Public Key Size: 1184 bytes
Private Key Size: 2400 bytes
Ciphertext Size: 1088 bytes
Shared Secret Size: 32 bytes
Algorithm Family: Module-LWE lattice-based

Security Warnings ⚠️

  1. Post-Quantum Security: Designed specifically for quantum-resistant key exchange
  2. Key Reuse: Public keys can be reused, but each encapsulation is unique
  3. Implementation Security: Requires protection against side-channel attacks
  4. Hybrid Deployment: Often used alongside classical KEMs for transition
  5. Ciphertext Integrity: No built-in authentication - use with AEAD

API Functions

mlkem768_keygen() -> (PublicKey[1184], PrivateKey[2400])

Security Contract:

Attack Resistance: | Attack Type | Protected | Notes | |————-|———–|——-| | Quantum (Shor’s) | ✅ | Lattice-based security | | Quantum (Grover’s) | ✅ | 192-bit security maintained | | Classical Lattice | ✅ | M-LWE hardness assumption | | Side-Channel | ⚠️ | Implementation dependent | | Key Recovery | ✅ | Provable lattice security | | CCA Attacks | ✅ | Fujisaki-Okamoto transform |

Security Requirements:

Secure Usage Example:

import secrets
import hashlib
import time
import json
from typing import Tuple, Dict, Optional, List
from dataclasses import dataclass

@dataclass
class MLKEMKeyPair:
    public_key: bytes
    private_key: bytes
    key_id: str
    generated_at: float
    security_level: int = 768

class SecureMLKEM768:
    """Secure ML-KEM-768 implementation with best practices"""
    
    def __init__(self):
        self.key_pairs = {}
        self.encapsulation_history = {}
        
    def generate_keypair_with_metadata(
        self,
        key_id: Optional[str] = None,
        additional_entropy: Optional[bytes] = None
    ) -> Dict:
        """Generate ML-KEM-768 key pair with comprehensive metadata"""
        
        if key_id is None:
            key_id = self._generate_key_id()
        
        # Gather entropy from multiple sources
        entropy_pool = bytearray()
        
        # System CSPRNG (primary source)
        entropy_pool.extend(secrets.token_bytes(64))
        
        # High-resolution timing entropy
        timing_entropy = str(time.perf_counter_ns()).encode()
        entropy_pool.extend(hashlib.sha256(timing_entropy).digest())
        
        # Process and thread identifiers
        import os, threading
        process_info = f"{os.getpid()}:{threading.get_ident()}:{time.time()}".encode()
        entropy_pool.extend(hashlib.sha256(process_info).digest())
        
        # Additional entropy if provided
        if additional_entropy:
            entropy_pool.extend(additional_entropy[:64])
        
        # Generate deterministic seed from entropy pool
        final_seed = hashlib.shake_256(
            b"ML-KEM-768-KeyGen-v1:" + bytes(entropy_pool)
        ).digest(64)
        
        # Generate key pair using seed
        public_key, private_key = self._mlkem768_keygen_from_seed(final_seed)
        
        # Create key pair object
        key_pair = MLKEMKeyPair(
            public_key=public_key,
            private_key=private_key,
            key_id=key_id,
            generated_at=time.time()
        )
        
        # Store key pair
        self.key_pairs[key_id] = key_pair
        
        # Clear sensitive data
        for i in range(len(entropy_pool)):
            entropy_pool[i] = 0
        for i in range(len(final_seed)):
            final_seed[i] = 0
        
        # Return metadata
        return {
            'key_id': key_id,
            'public_key': public_key.hex(),
            'public_key_hash': hashlib.sha256(public_key).hexdigest()[:16],
            'algorithm': 'ML-KEM-768',
            'security_level': 768,
            'generated_at': key_pair.generated_at,
            'key_size_info': {
                'public_key_bytes': len(public_key),
                'private_key_bytes': len(private_key),
                'ciphertext_bytes': 1088,
                'shared_secret_bytes': 32
            }
        }
    
    def _mlkem768_keygen_from_seed(self, seed: bytes) -> Tuple[bytes, bytes]:
        """Generate ML-KEM-768 keys from seed (implementation placeholder)"""
        
        # This would call the actual ML-KEM-768 implementation
        # Implementation must include:
        # - Proper polynomial sampling from seed
        # - Matrix generation
        # - Error polynomial sampling
        # - Fujisaki-Okamoto transform
        
        from mlkem_crypto import mlkem768_keygen_seeded
        return mlkem768_keygen_seeded(seed)
    
    def _generate_key_id(self) -> str:
        """Generate unique key identifier"""
        
        entropy = secrets.token_bytes(32)
        timestamp = int(time.time() * 1000000)  # microseconds
        
        id_input = entropy + timestamp.to_bytes(8, 'big')
        key_id = hashlib.sha256(id_input).hexdigest()[:16]
        
        return f"mlkem768-{key_id}"
    
    def get_public_key_info(self, key_id: str) -> Optional[Dict]:
        """Get public key information for sharing"""
        
        if key_id not in self.key_pairs:
            return None
        
        key_pair = self.key_pairs[key_id]
        
        return {
            'key_id': key_id,
            'public_key': key_pair.public_key.hex(),
            'algorithm': 'ML-KEM-768',
            'security_level': 768,
            'key_format': 'raw_bytes',
            'usage': 'key_encapsulation',
            'generated_at': key_pair.generated_at
        }
    
    def export_public_key_der(self, key_id: str) -> Optional[bytes]:
        """Export public key in DER format"""
        
        if key_id not in self.key_pairs:
            return None
        
        public_key = self.key_pairs[key_id].public_key
        
        # DER encoding for ML-KEM-768 public key
        # (This would implement actual DER encoding)
        return self._encode_mlkem768_public_key_der(public_key)
    
    def should_rotate_key(self, key_id: str, max_age_days: int = 365) -> bool:
        """Check if key should be rotated"""
        
        if key_id not in self.key_pairs:
            return True
        
        key_pair = self.key_pairs[key_id]
        age_seconds = time.time() - key_pair.generated_at
        age_days = age_seconds / (24 * 3600)
        
        return age_days > max_age_days

# SECURE: Key encapsulation with metadata
class MLKEMEncapsulator:
    def __init__(self, mlkem: SecureMLKEM768):
        self.mlkem = mlkem
        self.encapsulation_counter = 0
        
    def encapsulate_with_context(
        self,
        peer_public_key: bytes,
        context: str,
        session_info: Optional[Dict] = None
    ) -> Dict:
        """Encapsulate shared secret with context information"""
        
        # Validate public key
        if len(peer_public_key) != 1184:
            raise ValueError("Invalid ML-KEM-768 public key size")
        
        if not self._validate_public_key_structure(peer_public_key):
            raise ValueError("Invalid ML-KEM-768 public key structure")
        
        # Build encapsulation metadata
        encap_metadata = {
            'context': context,
            'encapsulator_id': self._get_encapsulator_id(),
            'sequence_number': self.encapsulation_counter,
            'timestamp': int(time.time()),
            'session_info': session_info or {}
        }
        
        # Add metadata to randomness (for domain separation)
        metadata_bytes = json.dumps(encap_metadata, sort_keys=True).encode()
        metadata_hash = hashlib.sha256(metadata_bytes).digest()
        
        # Generate additional entropy for this encapsulation
        encap_entropy = secrets.token_bytes(32)
        
        # Combine entropy sources
        combined_entropy = hashlib.shake_256(
            b"ML-KEM-768-Encap:" + metadata_hash + encap_entropy
        ).digest(32)
        
        # Perform encapsulation
        ciphertext, shared_secret = self._mlkem768_encapsulate(
            peer_public_key, combined_entropy
        )
        
        # Derive session keys from shared secret
        session_keys = self._derive_session_keys(
            shared_secret, metadata_bytes
        )
        
        self.encapsulation_counter += 1
        
        # Clear sensitive data
        for i in range(len(combined_entropy)):
            combined_entropy[i] = 0
        
        return {
            'ciphertext': ciphertext.hex(),
            'session_keys': session_keys,
            'metadata': encap_metadata,
            'algorithm': 'ML-KEM-768'
        }
    
    def _mlkem768_encapsulate(
        self, public_key: bytes, entropy: bytes
    ) -> Tuple[bytes, bytes]:
        """Core ML-KEM-768 encapsulation"""
        
        from mlkem_crypto import mlkem768_encapsulate
        return mlkem768_encapsulate(public_key, entropy)
    
    def _derive_session_keys(
        self, shared_secret: bytes, context: bytes
    ) -> Dict:
        """Derive multiple session keys from shared secret"""
        
        # Use HKDF to derive multiple keys
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.kdf.hkdf import HKDF
        
        # Extract
        hkdf = HKDF(
            algorithm=hashes.SHA256(),
            length=96,  # 3 × 32-byte keys
            salt=b"ML-KEM-768-SessionKeys-v1",
            info=context
        )
        
        key_material = hkdf.derive(shared_secret)
        
        return {
            'encryption_key': key_material[0:32].hex(),
            'authentication_key': key_material[32:64].hex(),
            'confirmation_key': key_material[64:96].hex()
        }
    
    def _validate_public_key_structure(self, public_key: bytes) -> bool:
        """Validate ML-KEM-768 public key structure"""
        
        # Implementation would check:
        # - Polynomial coefficients are in valid range
        # - Matrix structure is correct
        # - Public key follows ML-KEM specification
        
        return True  # Placeholder
    
    def _get_encapsulator_id(self) -> str:
        """Get unique encapsulator identifier"""
        
        import socket
        hostname = socket.gethostname()
        process_id = os.getpid()
        
        id_data = f"{hostname}:{process_id}:{time.time()}".encode()
        return hashlib.sha256(id_data).hexdigest()[:16]

# SECURE: Key decapsulation with validation
class MLKEMDecapsulator:
    def __init__(self, mlkem: SecureMLKEM768):
        self.mlkem = mlkem
        self.decapsulation_history = {}
        
    def decapsulate_with_validation(
        self,
        key_id: str,
        encapsulation_data: Dict
    ) -> Optional[Dict]:
        """Decapsulate and validate shared secret"""
        
        if key_id not in self.mlkem.key_pairs:
            return None
        
        key_pair = self.mlkem.key_pairs[key_id]
        
        # Extract encapsulation data
        ciphertext = bytes.fromhex(encapsulation_data['ciphertext'])
        metadata = encapsulation_data['metadata']
        
        # Validate ciphertext size
        if len(ciphertext) != 1088:
            return None
        
        # Check replay protection
        if self._is_replay_attack(ciphertext, metadata):
            return None
        
        # Perform decapsulation
        shared_secret = self._mlkem768_decapsulate(
            key_pair.private_key, ciphertext
        )
        
        if shared_secret is None:
            return None
        
        # Derive session keys with same context
        metadata_bytes = json.dumps(metadata, sort_keys=True).encode()
        session_keys = self._derive_session_keys(shared_secret, metadata_bytes)
        
        # Record decapsulation for replay protection
        self._record_decapsulation(ciphertext, metadata)
        
        return {
            'shared_secret': shared_secret.hex(),
            'session_keys': session_keys,
            'metadata': metadata,
            'decapsulated_at': time.time()
        }
    
    def _mlkem768_decapsulate(
        self, private_key: bytes, ciphertext: bytes
    ) -> Optional[bytes]:
        """Core ML-KEM-768 decapsulation"""
        
        from mlkem_crypto import mlkem768_decapsulate
        
        try:
            return mlkem768_decapsulate(private_key, ciphertext)
        except Exception:
            return None
    
    def _is_replay_attack(self, ciphertext: bytes, metadata: Dict) -> bool:
        """Check for replay attacks"""
        
        ciphertext_hash = hashlib.sha256(ciphertext).hexdigest()
        
        if ciphertext_hash in self.decapsulation_history:
            # Check if this is a replay within time window
            previous_time = self.decapsulation_history[ciphertext_hash]
            if time.time() - previous_time < 3600:  # 1 hour window
                return True
        
        return False
    
    def _record_decapsulation(self, ciphertext: bytes, metadata: Dict):
        """Record decapsulation for replay protection"""
        
        ciphertext_hash = hashlib.sha256(ciphertext).hexdigest()
        self.decapsulation_history[ciphertext_hash] = time.time()
        
        # Clean old entries (older than 24 hours)
        cutoff_time = time.time() - (24 * 3600)
        self.decapsulation_history = {
            h: t for h, t in self.decapsulation_history.items()
            if t > cutoff_time
        }
    
    def _derive_session_keys(
        self, shared_secret: bytes, context: bytes
    ) -> Dict:
        """Derive session keys (same as encapsulator)"""
        
        from cryptography.hazmat.primitives import hashes
        from cryptography.hazmat.primitives.kdf.hkdf import HKDF
        
        hkdf = HKDF(
            algorithm=hashes.SHA256(),
            length=96,
            salt=b"ML-KEM-768-SessionKeys-v1",
            info=context
        )
        
        key_material = hkdf.derive(shared_secret)
        
        return {
            'encryption_key': key_material[0:32].hex(),
            'authentication_key': key_material[32:64].hex(),
            'confirmation_key': key_material[64:96].hex()
        }

Common Mistakes:

# INSECURE: Weak randomness for key generation
import random
seed = random.getrandbits(256)  # NOT cryptographically secure!

# INSECURE: Reusing randomness for encapsulation
entropy = secrets.token_bytes(32)
ct1, ss1 = mlkem768_encapsulate(pk, entropy)
ct2, ss2 = mlkem768_encapsulate(pk, entropy)  # SAME ENTROPY!

# INSECURE: No ciphertext validation
shared_secret = mlkem768_decapsulate(sk, ciphertext)  # No error checking

# INSECURE: Using shared secret directly
key = shared_secret  # Should derive keys with KDF

# INSECURE: No replay protection
# Accepting same ciphertext multiple times

mlkem768_encapsulate(public_key: bytes[1184]) -> (ciphertext: bytes[1088], shared_secret: bytes[32])

Security Contract:

Attack Resistance: | Attack Type | Protected | Notes | |————-|———–|——-| | CCA Security | ✅ | Fujisaki-Okamoto transform | | Quantum Attacks | ✅ | Lattice-based security | | Key Recovery | ✅ | M-LWE hardness | | Ciphertext Malleability | ✅ | CCA security prevents | | Replay | ❌ | Application must handle |

Security Requirements:

Secure Usage Example:

use ml_kem::{MlKem768, PublicKey, EncapsulatedSecret};
use rand::{RngCore, thread_rng};
use sha3::{Sha3_256, Digest};
use zeroize::Zeroize;

/// Secure ML-KEM-768 encapsulation
pub struct SecureKemEncapsulator {
    rng: Box<dyn RngCore + Send>,
    session_counter: u64,
}

impl SecureKemEncapsulator {
    pub fn new() -> Self {
        Self {
            rng: Box::new(thread_rng()),
            session_counter: 0,
        }
    }
    
    pub fn encapsulate_with_context(
        &mut self,
        public_key: &PublicKey,
        context: &str,
    ) -> Result<EncapsulationResult, Error> {
        // Validate public key
        if !self.validate_public_key(public_key) {
            return Err(Error::InvalidPublicKey);
        }
        
        // Build context for domain separation
        let mut context_builder = ContextBuilder::new();
        context_builder.add_field("protocol", "ML-KEM-768");
        context_builder.add_field("context", context);
        context_builder.add_field("session", &self.session_counter.to_string());
        context_builder.add_field("timestamp", &chrono::Utc::now().timestamp().to_string());
        
        let context_bytes = context_builder.finalize();
        
        // Generate encapsulation with context
        let (ciphertext, shared_secret) = MlKem768::encapsulate(
            public_key,
            &mut self.rng
        )?;
        
        // Derive session keys
        let session_keys = self.derive_session_keys(&shared_secret, &context_bytes)?;
        
        self.session_counter += 1;
        
        Ok(EncapsulationResult {
            ciphertext,
            session_keys,
            context: context_bytes,
        })
    }
    
    pub fn batch_encapsulate(
        &mut self,
        public_keys: &[PublicKey],
        shared_context: &str,
    ) -> Result<Vec<EncapsulationResult>, Error> {
        let mut results = Vec::with_capacity(public_keys.len());
        
        for (i, public_key) in public_keys.iter().enumerate() {
            let context = format!("{}:batch_item:{}", shared_context, i);
            let result = self.encapsulate_with_context(public_key, &context)?;
            results.push(result);
        }
        
        Ok(results)
    }
    
    fn validate_public_key(&self, public_key: &PublicKey) -> bool {
        // Implement ML-KEM-768 public key validation
        // - Check polynomial coefficients are in range
        // - Verify matrix structure
        // - Validate format compliance
        
        true // Placeholder
    }
    
    fn derive_session_keys(
        &self,
        shared_secret: &[u8; 32],
        context: &[u8],
    ) -> Result<SessionKeys, Error> {
        // Use HKDF for key derivation
        let hkdf = hkdf::Hkdf::<Sha3_256>::new(
            Some(b"ML-KEM-768-v1"),
            shared_secret
        );
        
        let mut encryption_key = [0u8; 32];
        let mut authentication_key = [0u8; 32];
        let mut confirmation_key = [0u8; 32];
        
        hkdf.expand(b"encryption", &mut encryption_key)
            .map_err(|_| Error::KeyDerivationFailed)?;
        hkdf.expand(b"authentication", &mut authentication_key)
            .map_err(|_| Error::KeyDerivationFailed)?;
        hkdf.expand(b"confirmation", &mut confirmation_key)
            .map_err(|_| Error::KeyDerivationFailed)?;
        
        Ok(SessionKeys {
            encryption_key,
            authentication_key,
            confirmation_key,
        })
    }
}

#[derive(Debug)]
pub struct EncapsulationResult {
    pub ciphertext: EncapsulatedSecret,
    pub session_keys: SessionKeys,
    pub context: Vec<u8>,
}

#[derive(Debug, Zeroize)]
#[zeroize(drop)]
pub struct SessionKeys {
    pub encryption_key: [u8; 32],
    pub authentication_key: [u8; 32],
    pub confirmation_key: [u8; 32],
}

/// Context builder for domain separation
pub struct ContextBuilder {
    fields: Vec<(String, String)>,
}

impl ContextBuilder {
    pub fn new() -> Self {
        Self { fields: Vec::new() }
    }
    
    pub fn add_field(&mut self, name: &str, value: &str) {
        self.fields.push((name.to_string(), value.to_string()));
    }
    
    pub fn finalize(mut self) -> Vec<u8> {
        // Sort fields for canonical representation
        self.fields.sort_by(|a, b| a.0.cmp(&b.0));
        
        let mut context = String::new();
        for (name, value) in &self.fields {
            context.push_str(&format!("{}:{}\n", name, value));
        }
        
        context.into_bytes()
    }
}

mlkem768_decapsulate(private_key: bytes[2400], ciphertext: bytes[1088]) -> Optional[shared_secret: bytes[32]]

Security Contract:

Security Requirements:

Secure Usage Example:

class SecureMLKEMDecapsulator:
    """Secure ML-KEM-768 decapsulation with protections"""
    
    def __init__(self, private_key: bytes):
        if len(private_key) != 2400:
            raise ValueError("Invalid ML-KEM-768 private key size")
        
        self.private_key = private_key
        self.decapsulation_cache = {}
        self.failure_count = 0
        self.max_failures = 1000
        
    def decapsulate_secure(
        self,
        ciphertext: bytes,
        context: bytes = b"",
        expected_session_id: Optional[str] = None
    ) -> Optional[Dict]:
        """Secure decapsulation with validation"""
        
        # Validate ciphertext format
        if not self._validate_ciphertext_format(ciphertext):
            self._handle_decapsulation_failure()
            return None
        
        # Check for excessive failures (DoS protection)
        if self.failure_count > self.max_failures:
            raise RuntimeError("Too many decapsulation failures")
        
        # Check decapsulation cache for replay detection
        ciphertext_hash = hashlib.sha256(ciphertext).hexdigest()
        if ciphertext_hash in self.decapsulation_cache:
            cache_entry = self.decapsulation_cache[ciphertext_hash]
            if time.time() - cache_entry['timestamp'] < 3600:  # 1 hour
                return None  # Potential replay attack
        
        # Perform decapsulation
        try:
            shared_secret = self._mlkem768_decapsulate_core(
                self.private_key, ciphertext
            )
            
            if shared_secret is None:
                self._handle_decapsulation_failure()
                return None
            
            # Derive keys with context
            session_keys = self._derive_contextual_keys(shared_secret, context)
            
            # Cache successful decapsulation
            self.decapsulation_cache[ciphertext_hash] = {
                'timestamp': time.time(),
                'success': True
            }
            
            # Clean cache periodically
            if len(self.decapsulation_cache) > 10000:
                self._clean_cache()
            
            result = {
                'shared_secret': shared_secret.hex(),
                'session_keys': session_keys,
                'context': context.hex() if context else "",
                'decapsulated_at': time.time()
            }
            
            # Clear shared secret from memory
            for i in range(len(shared_secret)):
                shared_secret[i] = 0
            
            return result
            
        except Exception as e:
            self._handle_decapsulation_failure()
            return None
    
    def _validate_ciphertext_format(self, ciphertext: bytes) -> bool:
        """Validate ML-KEM-768 ciphertext format"""
        
        # Check size
        if len(ciphertext) != 1088:
            return False
        
        # Additional format validation would go here
        # - Check polynomial coefficient ranges
        # - Verify structure constraints
        # - Validate encoding
        
        return True
    
    def _mlkem768_decapsulate_core(
        self, private_key: bytes, ciphertext: bytes
    ) -> Optional[bytes]:
        """Core ML-KEM-768 decapsulation implementation"""
        
        from mlkem_crypto import mlkem768_decapsulate
        
        try:
            # Implementation should be constant-time
            return mlkem768_decapsulate(private_key, ciphertext)
        except Exception:
            return None
    
    def _derive_contextual_keys(
        self, shared_secret: bytes, context: bytes
    ) -> Dict:
        """Derive keys with context binding"""
        
        # Build key derivation input
        kdf_input = b"ML-KEM-768-Keys-v1:" + context + shared_secret
        
        # Use SHAKE256 for key derivation
        key_material = hashlib.shake_256(kdf_input).digest(128)  # 4 × 32-byte keys
        
        return {
            'encryption_key': key_material[0:32].hex(),
            'authentication_key': key_material[32:64].hex(),
            'key_confirmation': key_material[64:96].hex(),
            'export_key': key_material[96:128].hex()
        }
    
    def _handle_decapsulation_failure(self):
        """Handle decapsulation failure securely"""
        
        self.failure_count += 1
        
        # Add delay to prevent timing attacks
        import time
        time.sleep(0.001)  # 1ms delay
        
        # Log failure for monitoring (in real implementation)
        # logger.warning(f"ML-KEM decapsulation failure #{self.failure_count}")
    
    def _clean_cache(self):
        """Clean old entries from decapsulation cache"""
        
        current_time = time.time()
        cutoff_time = current_time - 3600  # 1 hour
        
        self.decapsulation_cache = {
            h: entry for h, entry in self.decapsulation_cache.items()
            if entry['timestamp'] > cutoff_time
        }

Security Best Practices

Hybrid Classical/Post-Quantum Key Exchange

class HybridKeyExchange:
    """Hybrid classical (ECDH) + post-quantum (ML-KEM) key exchange"""
    
    def __init__(self):
        self.mlkem = SecureMLKEM768()
        self.classical_keys = {}
        
    def create_hybrid_keypair(self, key_id: str) -> Dict:
        """Create hybrid key pair"""
        
        # Generate ML-KEM-768 key pair
        mlkem_keys = self.mlkem.generate_keypair_with_metadata(
            key_id=f"{key_id}-mlkem"
        )
        
        # Generate ECDH key pair (P-256)
        from cryptography.hazmat.primitives.asymmetric import ec
        private_key = ec.generate_private_key(ec.SECP256R1())
        public_key = private_key.public_key()
        
        # Store classical keys
        self.classical_keys[key_id] = {
            'private_key': private_key,
            'public_key': public_key
        }
        
        # Export public keys
        classical_public_bytes = public_key.public_bytes(
            encoding=serialization.Encoding.X962,
            format=serialization.PublicFormat.UncompressedPoint
        )
        
        return {
            'key_id': key_id,
            'mlkem_public_key': mlkem_keys['public_key'],
            'classical_public_key': classical_public_bytes.hex(),
            'algorithm': 'Hybrid-ECDH-P256-ML-KEM-768'
        }
    
    def hybrid_encapsulate(
        self,
        peer_mlkem_public_key: bytes,
        peer_classical_public_key: bytes,
        context: str
    ) -> Dict:
        """Perform hybrid key encapsulation"""
        
        # ML-KEM-768 encapsulation
        encapsulator = MLKEMEncapsulator(self.mlkem)
        mlkem_result = encapsulator.encapsulate_with_context(
            peer_mlkem_public_key,
            f"Hybrid-{context}"
        )
        
        # ECDH key exchange
        from cryptography.hazmat.primitives.asymmetric import ec
        from cryptography.hazmat.primitives import serialization
        
        # Generate ephemeral ECDH key
        ephemeral_private = ec.generate_private_key(ec.SECP256R1())
        ephemeral_public = ephemeral_private.public_key()
        
        # Load peer's classical public key
        peer_public = ec.EllipticCurvePublicKey.from_encoded_point(
            ec.SECP256R1(), peer_classical_public_key
        )
        
        # Perform ECDH
        ecdh_shared = ephemeral_private.exchange(
            ec.ECDH(), peer_public
        )
        
        # Combine shared secrets
        combined_secret = self._combine_shared_secrets(
            mlkem_result['session_keys']['encryption_key'],
            ecdh_shared,
            context.encode()
        )
        
        # Export ephemeral public key
        ephemeral_public_bytes = ephemeral_public.public_bytes(
            encoding=serialization.Encoding.X962,
            format=serialization.PublicFormat.UncompressedPoint
        )
        
        return {
            'mlkem_ciphertext': mlkem_result['ciphertext'],
            'ecdh_ephemeral_public': ephemeral_public_bytes.hex(),
            'combined_session_keys': combined_secret,
            'algorithm': 'Hybrid-ECDH-P256-ML-KEM-768',
            'context': context
        }
    
    def _combine_shared_secrets(
        self,
        mlkem_secret: str,
        ecdh_secret: bytes,
        context: bytes
    ) -> Dict:
        """Securely combine classical and post-quantum secrets"""
        
        # Convert ML-KEM secret back to bytes
        mlkem_bytes = bytes.fromhex(mlkem_secret)
        
        # Combine secrets using key combiner
        combiner_input = (
            b"HybridKeyCombiner-v1:" +
            context + b":" +
            len(mlkem_bytes).to_bytes(2, 'big') + mlkem_bytes +
            len(ecdh_secret).to_bytes(2, 'big') + ecdh_secret
        )
        
        # Derive combined keys
        combined_material = hashlib.shake_256(combiner_input).digest(128)
        
        return {
            'encryption_key': combined_material[0:32].hex(),
            'authentication_key': combined_material[32:64].hex(),
            'key_confirmation': combined_material[64:96].hex(),
            'export_key': combined_material[96:128].hex()
        }

TLS Integration Pattern

/// TLS 1.3 style key schedule with ML-KEM
pub struct TlsKeySchedule {
    transcript_hash: Vec<u8>,
    early_secret: [u8; 32],
    handshake_secret: [u8; 32],
    master_secret: [u8; 32],
}

impl TlsKeySchedule {
    pub fn new() -> Self {
        Self {
            transcript_hash: Vec::new(),
            early_secret: [0u8; 32],
            handshake_secret: [0u8; 32],
            master_secret: [0u8; 32],
        }
    }
    
    pub fn update_transcript(&mut self, data: &[u8]) {
        use sha3::{Sha3_256, Digest};
        
        let mut hasher = Sha3_256::new();
        hasher.update(&self.transcript_hash);
        hasher.update(data);
        self.transcript_hash = hasher.finalize().to_vec();
    }
    
    pub fn derive_handshake_secret(&mut self, mlkem_shared_secret: &[u8; 32]) {
        // HKDF-Extract(early_secret, ML-KEM shared secret)
        let hkdf = hkdf::Hkdf::<Sha3_256>::new(
            Some(&self.early_secret),
            mlkem_shared_secret
        );
        
        hkdf.expand(b"handshake secret", &mut self.handshake_secret)
            .expect("handshake secret derivation failed");
    }
    
    pub fn derive_application_keys(&self) -> ApplicationKeys {
        let hkdf = hkdf::Hkdf::<Sha3_256>::new(
            Some(&self.master_secret),
            &[0u8; 32] // No input keying material
        );
        
        let mut client_write_key = [0u8; 32];
        let mut server_write_key = [0u8; 32];
        let mut client_write_iv = [0u8; 12];
        let mut server_write_iv = [0u8; 12];
        
        hkdf.expand(b"client write key", &mut client_write_key).unwrap();
        hkdf.expand(b"server write key", &mut server_write_key).unwrap();
        hkdf.expand(b"client write iv", &mut client_write_iv).unwrap();
        hkdf.expand(b"server write iv", &mut server_write_iv).unwrap();
        
        ApplicationKeys {
            client_write_key,
            server_write_key,
            client_write_iv,
            server_write_iv,
        }
    }
}

#[derive(Debug, Zeroize)]
#[zeroize(drop)]
pub struct ApplicationKeys {
    pub client_write_key: [u8; 32],
    pub server_write_key: [u8; 32],
    pub client_write_iv: [u8; 12],
    pub server_write_iv: [u8; 12],
}

Common Integration Patterns

VPN Key Exchange

def mlkem_vpn_key_exchange():
    """ML-KEM for VPN key establishment"""
    
    class PostQuantumVPN:
        def __init__(self, server_mlkem_keypair):
            self.server_keypair = server_mlkem_keypair
            self.active_sessions = {}
            
        def handle_client_hello(self, client_hello: dict) -> dict:
            """Handle client hello with ML-KEM public key"""
            
            client_mlkem_public = bytes.fromhex(client_hello['mlkem_public_key'])
            session_id = client_hello['session_id']
            
            # Encapsulate shared secret for client
            encapsulator = MLKEMEncapsulator(SecureMLKEM768())
            encap_result = encapsulator.encapsulate_with_context(
                client_mlkem_public,
                f"VPN-Session-{session_id}"
            )
            
            # Store session keys
            self.active_sessions[session_id] = {
                'session_keys': encap_result['session_keys'],
                'established_at': time.time(),
                'client_public_key': client_mlkem_public
            }
            
            return {
                'session_id': session_id,
                'mlkem_ciphertext': encap_result['ciphertext'],
                'server_certificate': self._get_server_certificate(),
                'algorithm': 'ML-KEM-768-VPN'
            }

Performance Considerations

Operation Time (ms) Notes
Key Generation 0.8-1.2 Polynomial sampling
Encapsulation 0.9-1.3 Fresh randomness required
Decapsulation 0.8-1.1 Faster than encapsulation
Public Key 1184 bytes Moderate size
Private Key 2400 bytes Larger than classical
Ciphertext 1088 bytes Per encapsulation

Optimization Strategies

Security Auditing

Verification Checklist

Common Vulnerabilities

# AUDIT: Look for these patterns

# ❌ Weak randomness
import random
entropy = random.getrandbits(256)  # NOT secure!

# ❌ Reusing encapsulation randomness
ct1, ss1 = mlkem768_encapsulate(pk, entropy)
ct2, ss2 = mlkem768_encapsulate(pk, entropy)  # BROKEN!

# ❌ No ciphertext validation
shared_secret = mlkem768_decapsulate(sk, ciphertext)  # Always trusts input

# ❌ Direct shared secret usage
encryption_key = shared_secret  # Should use KDF

# ❌ No replay protection
# Accepting same ciphertext multiple times

Security Analysis

Threat Model: ML-KEM-768 Threat Model

The comprehensive threat analysis covers:

For complete security analysis and risk assessment, see the dedicated threat model documentation.

References

  1. NIST PQC Standardization - ML-KEM
  2. FIPS 203 - ML-KEM Standard
  3. Kyber Original Paper

Support

Security Issues: security@metamui.id
Documentation Updates: phantom@metamui.id
Vulnerability Disclosure: See SECURITY.md


Document Version: 1.0
Review Cycle: Quarterly
Next Review: 2025-04-05
Classification: PUBLIC