ML-KEM-768 Security-Focused API Documentation
Version: 1.0
Last Updated: 2025-07-05
Security Classification: PUBLIC
Author: Phantom (phantom@metamui.id)
Overview
ML-KEM-768 (Module-Lattice-Based Key-Encapsulation Mechanism) is a post-quantum key encapsulation mechanism standardized by NIST in FIPS 203. Its security rests on the Module Learning With Errors (M-LWE) problem. ML-KEM-768 targets NIST security category 3 (at least as hard to break as key search on AES-192) and is designed to replace RSA and ECDH key exchange in a post-quantum world.
Security Level: NIST Level 3 (192-bit classical, ~2^96 quantum)
Public Key Size: 1184 bytes
Private Key Size: 2400 bytes
Ciphertext Size: 1088 bytes
Shared Secret Size: 32 bytes
Algorithm Family: Module-LWE lattice-based
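The fixed sizes listed above lend themselves to cheap length checks before any cryptographic work is attempted. The following is a minimal sketch; the names `MLKEM768_SIZES` and `check_length` are hypothetical helpers introduced for illustration and are not part of any library API.

```python
# Sizes taken from the parameter list above; the names are illustrative only.
MLKEM768_SIZES = {
    "public_key": 1184,
    "private_key": 2400,
    "ciphertext": 1088,
    "shared_secret": 32,
}


def check_length(kind: str, value: bytes) -> None:
    """Raise ValueError if `value` does not have the ML-KEM-768 size for `kind`."""
    expected = MLKEM768_SIZES[kind]
    if len(value) != expected:
        raise ValueError(f"{kind}: expected {expected} bytes, got {len(value)}")
```

A length check only rejects obviously malformed input; it says nothing about whether the bytes encode a valid key or ciphertext.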
Security Warnings ⚠️
- Post-Quantum Security: Designed specifically for quantum-resistant key exchange
- Key Reuse: Public keys can be reused, but each encapsulation is unique
- Implementation Security: Requires protection against side-channel attacks
- Hybrid Deployment: Often used alongside classical KEMs for transition
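- Ciphertext Integrity: No built-in peer authentication. A modified ciphertext silently yields a different shared secret, so authenticate the exchange (signatures or key confirmation) and protect traffic with an AEAD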
API Functions
mlkem768_keygen() -> (PublicKey[1184], PrivateKey[2400])
Security Contract:
- Preconditions:
- System CSPRNG must be available
- Sufficient entropy for polynomial sampling
- Implementation protects against side-channels
- Postconditions:
- Returns mathematically valid key pair
- Public key can be safely shared
- Private key enables decapsulation
Attack Resistance:
| Attack Type | Protected | Notes |
|---|---|---|
| Quantum (Shor’s) | ✅ | Lattice-based security |
| Quantum (Grover’s) | ✅ | 192-bit security maintained |
| Classical Lattice | ✅ | M-LWE hardness assumption |
| Side-Channel | ⚠️ | Implementation dependent |
| Key Recovery | ✅ | Provable lattice security |
| CCA Attacks | ✅ | Fujisaki-Okamoto transform |
Security Requirements:
- Use cryptographically secure randomness
- Protect key generation from observation
- Implement proper polynomial sampling
- Secure storage of private keys
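The randomness requirement can be met with the operating system CSPRNG alone. FIPS 203 key generation consumes two 32-byte random values (named d and z in the standard), so 64 fresh bytes are sufficient. The sketch below assumes hypothetical helper names (`fresh_keygen_seed`, `wipe`) and leaves the actual key generation call as a comment, since that depends on the library in use.

```python
import secrets


def fresh_keygen_seed() -> bytearray:
    """Return 64 fresh bytes from the OS CSPRNG in a mutable buffer."""
    return bytearray(secrets.token_bytes(64))


def wipe(buf: bytearray) -> None:
    """Best-effort overwrite of a mutable buffer (Python may hold other copies)."""
    for i in range(len(buf)):
        buf[i] = 0


seed = fresh_keygen_seed()
d, z = bytes(seed[:32]), bytes(seed[32:])  # the two 32-byte keygen inputs
# ... pass d and z (or the 64-byte seed) to the actual keygen routine here ...
wipe(seed)
```

Only `bytearray` objects can be overwritten in place. The `bytes` copies `d` and `z` are immutable and remain in memory until garbage collected, so zeroization in pure Python is best effort at most.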
Secure Usage Example:
import secrets
import hashlib
import time
import json
from typing import Tuple, Dict, Optional, List
from dataclasses import dataclass
@dataclass
class MLKEMKeyPair:
public_key: bytes
private_key: bytes
key_id: str
generated_at: float
security_level: int = 768
class SecureMLKEM768:
"""Secure ML-KEM-768 implementation with best practices"""
def __init__(self):
self.key_pairs = {}
self.encapsulation_history = {}
def generate_keypair_with_metadata(
self,
key_id: Optional[str] = None,
additional_entropy: Optional[bytes] = None
) -> Dict:
"""Generate ML-KEM-768 key pair with comprehensive metadata"""
if key_id is None:
key_id = self._generate_key_id()
# Gather entropy from multiple sources
entropy_pool = bytearray()
# System CSPRNG (primary source)
entropy_pool.extend(secrets.token_bytes(64))
# High-resolution timing entropy
timing_entropy = str(time.perf_counter_ns()).encode()
entropy_pool.extend(hashlib.sha256(timing_entropy).digest())
# Process and thread identifiers
import os, threading
process_info = f"{os.getpid()}:{threading.get_ident()}:{time.time()}".encode()
entropy_pool.extend(hashlib.sha256(process_info).digest())
# Additional entropy if provided
if additional_entropy:
entropy_pool.extend(additional_entropy[:64])
# Generate deterministic seed from entropy pool
final_seed = hashlib.shake_256(
b"ML-KEM-768-KeyGen-v1:" + bytes(entropy_pool)
).digest(64)
# Generate key pair using seed
public_key, private_key = self._mlkem768_keygen_from_seed(final_seed)
# Create key pair object
key_pair = MLKEMKeyPair(
public_key=public_key,
private_key=private_key,
key_id=key_id,
generated_at=time.time()
)
# Store key pair
self.key_pairs[key_id] = key_pair
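# Clear sensitive data (best effort)
for i in range(len(entropy_pool)):
entropy_pool[i] = 0
# final_seed is an immutable bytes object, so it cannot be zeroed in place;
# drop the reference and rely on the keygen routine not retaining the seed
del final_seed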
# Return metadata
return {
'key_id': key_id,
'public_key': public_key.hex(),
'public_key_hash': hashlib.sha256(public_key).hexdigest()[:16],
'algorithm': 'ML-KEM-768',
'security_level': 768,
'generated_at': key_pair.generated_at,
'key_size_info': {
'public_key_bytes': len(public_key),
'private_key_bytes': len(private_key),
'ciphertext_bytes': 1088,
'shared_secret_bytes': 32
}
}
def _mlkem768_keygen_from_seed(self, seed: bytes) -> Tuple[bytes, bytes]:
"""Generate ML-KEM-768 keys from seed (implementation placeholder)"""
# This would call the actual ML-KEM-768 implementation
# Implementation must include:
# - Proper polynomial sampling from seed
# - Matrix generation
# - Error polynomial sampling
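# - Storing H(pk) and the implicit-rejection value z in the private key
# (the Fujisaki-Okamoto transform itself runs in encapsulation/decapsulation)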
from mlkem_crypto import mlkem768_keygen_seeded
return mlkem768_keygen_seeded(seed)
def _generate_key_id(self) -> str:
"""Generate unique key identifier"""
entropy = secrets.token_bytes(32)
timestamp = int(time.time() * 1000000) # microseconds
id_input = entropy + timestamp.to_bytes(8, 'big')
key_id = hashlib.sha256(id_input).hexdigest()[:16]
return f"mlkem768-{key_id}"
def get_public_key_info(self, key_id: str) -> Optional[Dict]:
"""Get public key information for sharing"""
if key_id not in self.key_pairs:
return None
key_pair = self.key_pairs[key_id]
return {
'key_id': key_id,
'public_key': key_pair.public_key.hex(),
'algorithm': 'ML-KEM-768',
'security_level': 768,
'key_format': 'raw_bytes',
'usage': 'key_encapsulation',
'generated_at': key_pair.generated_at
}
def export_public_key_der(self, key_id: str) -> Optional[bytes]:
"""Export public key in DER format"""
if key_id not in self.key_pairs:
return None
public_key = self.key_pairs[key_id].public_key
# DER encoding for ML-KEM-768 public key
# (This would implement actual DER encoding)
return self._encode_mlkem768_public_key_der(public_key)
def should_rotate_key(self, key_id: str, max_age_days: int = 365) -> bool:
"""Check if key should be rotated"""
if key_id not in self.key_pairs:
return True
key_pair = self.key_pairs[key_id]
age_seconds = time.time() - key_pair.generated_at
age_days = age_seconds / (24 * 3600)
return age_days > max_age_days
# SECURE: Key encapsulation with metadata
class MLKEMEncapsulator:
def __init__(self, mlkem: SecureMLKEM768):
self.mlkem = mlkem
self.encapsulation_counter = 0
def encapsulate_with_context(
self,
peer_public_key: bytes,
context: str,
session_info: Optional[Dict] = None
) -> Dict:
"""Encapsulate shared secret with context information"""
# Validate public key
if len(peer_public_key) != 1184:
raise ValueError("Invalid ML-KEM-768 public key size")
if not self._validate_public_key_structure(peer_public_key):
raise ValueError("Invalid ML-KEM-768 public key structure")
# Build encapsulation metadata
encap_metadata = {
'context': context,
'encapsulator_id': self._get_encapsulator_id(),
'sequence_number': self.encapsulation_counter,
'timestamp': int(time.time()),
'session_info': session_info or {}
}
# Add metadata to randomness (for domain separation)
metadata_bytes = json.dumps(encap_metadata, sort_keys=True).encode()
metadata_hash = hashlib.sha256(metadata_bytes).digest()
# Generate additional entropy for this encapsulation
encap_entropy = secrets.token_bytes(32)
# Combine entropy sources
combined_entropy = hashlib.shake_256(
b"ML-KEM-768-Encap:" + metadata_hash + encap_entropy
).digest(32)
# Perform encapsulation
ciphertext, shared_secret = self._mlkem768_encapsulate(
peer_public_key, combined_entropy
)
# Derive session keys from shared secret
session_keys = self._derive_session_keys(
shared_secret, metadata_bytes
)
self.encapsulation_counter += 1
# combined_entropy is an immutable bytes object and cannot be zeroed in place;
# drop the reference instead
del combined_entropy
return {
'ciphertext': ciphertext.hex(),
'session_keys': session_keys,
'metadata': encap_metadata,
'algorithm': 'ML-KEM-768'
}
def _mlkem768_encapsulate(
self, public_key: bytes, entropy: bytes
) -> Tuple[bytes, bytes]:
"""Core ML-KEM-768 encapsulation"""
from mlkem_crypto import mlkem768_encapsulate
return mlkem768_encapsulate(public_key, entropy)
def _derive_session_keys(
self, shared_secret: bytes, context: bytes
) -> Dict:
"""Derive multiple session keys from shared secret"""
# Use HKDF to derive multiple keys
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
# Extract
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=96, # 3 × 32-byte keys
salt=b"ML-KEM-768-SessionKeys-v1",
info=context
)
key_material = hkdf.derive(shared_secret)
return {
'encryption_key': key_material[0:32].hex(),
'authentication_key': key_material[32:64].hex(),
'confirmation_key': key_material[64:96].hex()
}
def _validate_public_key_structure(self, public_key: bytes) -> bool:
"""Validate ML-KEM-768 public key structure"""
# Implementation would check:
# - Polynomial coefficients are in valid range
# - Matrix structure is correct
# - Public key follows ML-KEM specification
return True # Placeholder
def _get_encapsulator_id(self) -> str:
"""Get unique encapsulator identifier"""
import os
import socket
hostname = socket.gethostname()
process_id = os.getpid()
id_data = f"{hostname}:{process_id}:{time.time()}".encode()
return hashlib.sha256(id_data).hexdigest()[:16]
# SECURE: Key decapsulation with validation
class MLKEMDecapsulator:
def __init__(self, mlkem: SecureMLKEM768):
self.mlkem = mlkem
self.decapsulation_history = {}
def decapsulate_with_validation(
self,
key_id: str,
encapsulation_data: Dict
) -> Optional[Dict]:
"""Decapsulate and validate shared secret"""
if key_id not in self.mlkem.key_pairs:
return None
key_pair = self.mlkem.key_pairs[key_id]
# Extract encapsulation data
ciphertext = bytes.fromhex(encapsulation_data['ciphertext'])
metadata = encapsulation_data['metadata']
# Validate ciphertext size
if len(ciphertext) != 1088:
return None
# Check replay protection
if self._is_replay_attack(ciphertext, metadata):
return None
# Perform decapsulation
shared_secret = self._mlkem768_decapsulate(
key_pair.private_key, ciphertext
)
if shared_secret is None:
return None
# Derive session keys with same context
metadata_bytes = json.dumps(metadata, sort_keys=True).encode()
session_keys = self._derive_session_keys(shared_secret, metadata_bytes)
# Record decapsulation for replay protection
self._record_decapsulation(ciphertext, metadata)
return {
'shared_secret': shared_secret.hex(),
'session_keys': session_keys,
'metadata': metadata,
'decapsulated_at': time.time()
}
def _mlkem768_decapsulate(
self, private_key: bytes, ciphertext: bytes
) -> Optional[bytes]:
"""Core ML-KEM-768 decapsulation"""
from mlkem_crypto import mlkem768_decapsulate
try:
return mlkem768_decapsulate(private_key, ciphertext)
except Exception:
return None
def _is_replay_attack(self, ciphertext: bytes, metadata: Dict) -> bool:
"""Check for replay attacks"""
ciphertext_hash = hashlib.sha256(ciphertext).hexdigest()
if ciphertext_hash in self.decapsulation_history:
# Check if this is a replay within time window
previous_time = self.decapsulation_history[ciphertext_hash]
if time.time() - previous_time < 3600: # 1 hour window
return True
return False
def _record_decapsulation(self, ciphertext: bytes, metadata: Dict):
"""Record decapsulation for replay protection"""
ciphertext_hash = hashlib.sha256(ciphertext).hexdigest()
self.decapsulation_history[ciphertext_hash] = time.time()
# Clean old entries (older than 24 hours)
cutoff_time = time.time() - (24 * 3600)
self.decapsulation_history = {
h: t for h, t in self.decapsulation_history.items()
if t > cutoff_time
}
def _derive_session_keys(
self, shared_secret: bytes, context: bytes
) -> Dict:
"""Derive session keys (same as encapsulator)"""
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
hkdf = HKDF(
algorithm=hashes.SHA256(),
length=96,
salt=b"ML-KEM-768-SessionKeys-v1",
info=context
)
key_material = hkdf.derive(shared_secret)
return {
'encryption_key': key_material[0:32].hex(),
'authentication_key': key_material[32:64].hex(),
'confirmation_key': key_material[64:96].hex()
}
Common Mistakes:
# INSECURE: Weak randomness for key generation
import random
seed = random.getrandbits(256) # NOT cryptographically secure!
# INSECURE: Reusing randomness for encapsulation
entropy = secrets.token_bytes(32)
ct1, ss1 = mlkem768_encapsulate(pk, entropy)
ct2, ss2 = mlkem768_encapsulate(pk, entropy) # SAME ENTROPY!
# INSECURE: No ciphertext validation
shared_secret = mlkem768_decapsulate(sk, ciphertext) # No error checking
# INSECURE: Using shared secret directly
key = shared_secret # Should derive keys with KDF
# INSECURE: No replay protection
# Accepting same ciphertext multiple times
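The replay mistake listed last can be addressed with a small time-windowed cache of ciphertext digests. The following is a minimal sketch; `ReplayCache` and its `clock` parameter are hypothetical names introduced here (the injectable clock exists only to make the behavior easy to test).

```python
import hashlib
import time
from typing import Callable, Dict


class ReplayCache:
    """Remember ciphertext digests for `window_seconds` and reject repeats."""

    def __init__(self, window_seconds: float = 3600.0,
                 clock: Callable[[], float] = time.monotonic):
        self.window = window_seconds
        self.clock = clock
        self._seen: Dict[bytes, float] = {}

    def check_and_record(self, ciphertext: bytes) -> bool:
        """Return True if the ciphertext is new, False if it is a replay."""
        now = self.clock()
        # Drop entries that have aged out of the window.
        self._seen = {d: t for d, t in self._seen.items()
                      if now - t < self.window}
        digest = hashlib.sha256(ciphertext).digest()
        if digest in self._seen:
            return False
        self._seen[digest] = now
        return True
```

A ciphertext is accepted again once it has aged out of the window, so this approach is only sound when messages also carry a timestamp or nonce that is rejected after the same interval.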
mlkem768_encapsulate(public_key: bytes[1184]) -> (ciphertext: bytes[1088], shared_secret: bytes[32])
Security Contract:
- Preconditions:
- public_key must be a valid ML-KEM-768 public key
- Implementation uses fresh randomness
- Public key format validated
- Postconditions:
- Returns ciphertext and shared secret
- Each call produces different ciphertext
- Shared secret is computationally indistinguishable from random
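The public key precondition can be checked concretely. An ML-KEM-768 public key consists of three polynomials packed at 12 bits per coefficient (3 × 384 bytes) followed by a 32-byte seed, and FIPS 203 asks that every packed coefficient be smaller than the modulus 3329. The sketch below is one way to express that check; the function name is hypothetical, and a production library would normally perform this check internally.

```python
Q = 3329                        # ML-KEM modulus
K = 3                           # module rank for ML-KEM-768
POLY_BYTES = 384                # 256 coefficients x 12 bits
PK_BYTES = K * POLY_BYTES + 32  # 1184: packed vector plus 32-byte seed


def public_key_coefficients_in_range(public_key: bytes) -> bool:
    """Length check plus modulus check on the packed 12-bit coefficients."""
    if len(public_key) != PK_BYTES:
        return False
    packed = public_key[: K * POLY_BYTES]
    # Every 3 bytes hold two little-endian 12-bit coefficients.
    for i in range(0, len(packed), 3):
        b0, b1, b2 = packed[i], packed[i + 1], packed[i + 2]
        c0 = b0 | ((b1 & 0x0F) << 8)
        c1 = (b1 >> 4) | (b2 << 4)
        if c0 >= Q or c1 >= Q:
            return False
    return True
```

The loop exits early on the first bad coefficient, which is acceptable here because a public key is not secret.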
Attack Resistance:
| Attack Type | Protected | Notes |
|---|---|---|
| CCA Security | ✅ | Fujisaki-Okamoto transform |
| Quantum Attacks | ✅ | Lattice-based security |
| Key Recovery | ✅ | M-LWE hardness |
| Ciphertext Malleability | ✅ | CCA security prevents |
| Replay | ❌ | Application must handle |
Security Requirements:
- Use fresh randomness for each encapsulation
- Validate public key format
- Implement side-channel protections
- Derive session keys from shared secret
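The last requirement, deriving session keys instead of using the shared secret directly, can be illustrated with HKDF (RFC 5869) built from the standard library alone. This is a sketch under stated assumptions: the function names are hypothetical, and the salt string and three-key layout simply mirror the values used elsewhere in this document.

```python
import hashlib
import hmac


def hkdf_sha256(ikm: bytes, salt: bytes, info: bytes, length: int) -> bytes:
    """HKDF (RFC 5869) with SHA-256: extract, then expand to `length` bytes."""
    if length > 255 * 32:
        raise ValueError("HKDF-SHA256 output is limited to 8160 bytes")
    # Extract: an absent salt defaults to 32 zero bytes.
    prk = hmac.new(salt or bytes(32), ikm, hashlib.sha256).digest()
    okm, block, counter = b"", b"", 1
    # Expand: T(i) = HMAC(PRK, T(i-1) || info || i)
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]),
                         hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


def derive_session_keys(shared_secret: bytes, context: bytes) -> dict:
    """Split 96 bytes of HKDF output into three independent 32-byte keys."""
    okm = hkdf_sha256(shared_secret, b"ML-KEM-768-SessionKeys-v1", context, 96)
    return {
        "encryption_key": okm[0:32],
        "authentication_key": okm[32:64],
        "confirmation_key": okm[64:96],
    }
```

Binding the context into `info` means that the same shared secret used under two different contexts produces unrelated keys.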
Secure Usage Example:
use ml_kem::{MlKem768, PublicKey, EncapsulatedSecret};
use rand::{rngs::OsRng, RngCore};
use sha3::{Sha3_256, Digest};
use zeroize::Zeroize;
/// Secure ML-KEM-768 encapsulation
pub struct SecureKemEncapsulator {
// OsRng is a CSPRNG and is Send; ThreadRng is not Send, so it cannot be
// boxed as `dyn RngCore + Send`
rng: Box<dyn RngCore + Send>,
session_counter: u64,
}
impl SecureKemEncapsulator {
pub fn new() -> Self {
Self {
rng: Box::new(OsRng),
session_counter: 0,
}
}
pub fn encapsulate_with_context(
&mut self,
public_key: &PublicKey,
context: &str,
) -> Result<EncapsulationResult, Error> {
// Validate public key
if !self.validate_public_key(public_key) {
return Err(Error::InvalidPublicKey);
}
// Build context for domain separation
let mut context_builder = ContextBuilder::new();
context_builder.add_field("protocol", "ML-KEM-768");
context_builder.add_field("context", context);
context_builder.add_field("session", &self.session_counter.to_string());
context_builder.add_field("timestamp", &chrono::Utc::now().timestamp().to_string());
let context_bytes = context_builder.finalize();
// Generate encapsulation with context
let (ciphertext, shared_secret) = MlKem768::encapsulate(
public_key,
&mut self.rng
)?;
// Derive session keys
let session_keys = self.derive_session_keys(&shared_secret, &context_bytes)?;
self.session_counter += 1;
Ok(EncapsulationResult {
ciphertext,
session_keys,
context: context_bytes,
})
}
pub fn batch_encapsulate(
&mut self,
public_keys: &[PublicKey],
shared_context: &str,
) -> Result<Vec<EncapsulationResult>, Error> {
let mut results = Vec::with_capacity(public_keys.len());
for (i, public_key) in public_keys.iter().enumerate() {
let context = format!("{}:batch_item:{}", shared_context, i);
let result = self.encapsulate_with_context(public_key, &context)?;
results.push(result);
}
Ok(results)
}
fn validate_public_key(&self, public_key: &PublicKey) -> bool {
// Implement ML-KEM-768 public key validation
// - Check polynomial coefficients are in range
// - Verify matrix structure
// - Validate format compliance
true // Placeholder
}
fn derive_session_keys(
&self,
shared_secret: &[u8; 32],
context: &[u8],
) -> Result<SessionKeys, Error> {
// Use HKDF for key derivation
let hkdf = hkdf::Hkdf::<Sha3_256>::new(
Some(b"ML-KEM-768-v1"),
shared_secret
);
let mut encryption_key = [0u8; 32];
let mut authentication_key = [0u8; 32];
let mut confirmation_key = [0u8; 32];
hkdf.expand(b"encryption", &mut encryption_key)
.map_err(|_| Error::KeyDerivationFailed)?;
hkdf.expand(b"authentication", &mut authentication_key)
.map_err(|_| Error::KeyDerivationFailed)?;
hkdf.expand(b"confirmation", &mut confirmation_key)
.map_err(|_| Error::KeyDerivationFailed)?;
Ok(SessionKeys {
encryption_key,
authentication_key,
confirmation_key,
})
}
}
#[derive(Debug)]
pub struct EncapsulationResult {
pub ciphertext: EncapsulatedSecret,
pub session_keys: SessionKeys,
pub context: Vec<u8>,
}
#[derive(Debug, Zeroize)]
#[zeroize(drop)]
pub struct SessionKeys {
pub encryption_key: [u8; 32],
pub authentication_key: [u8; 32],
pub confirmation_key: [u8; 32],
}
/// Context builder for domain separation
pub struct ContextBuilder {
fields: Vec<(String, String)>,
}
impl ContextBuilder {
pub fn new() -> Self {
Self { fields: Vec::new() }
}
pub fn add_field(&mut self, name: &str, value: &str) {
self.fields.push((name.to_string(), value.to_string()));
}
pub fn finalize(mut self) -> Vec<u8> {
// Sort fields for canonical representation
self.fields.sort_by(|a, b| a.0.cmp(&b.0));
let mut context = String::new();
for (name, value) in &self.fields {
context.push_str(&format!("{}:{}\n", name, value));
}
context.into_bytes()
}
}
mlkem768_decapsulate(private_key: bytes[2400], ciphertext: bytes[1088]) -> Optional[shared_secret: bytes[32]]
Security Contract:
- Preconditions:
- private_key must be a valid ML-KEM-768 private key
- ciphertext should be well-formed
- Implementation validates inputs
- Postconditions:
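- Returns the shared secret if the ciphertext is valid
- Returns None/error only for malformed input (for example, wrong length); per FIPS 203, a well-formed but invalid ciphertext does not raise an error and instead yields a pseudorandom secret (implicit rejection)
- Constant-time operation required for all secret-dependent steps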
Security Requirements:
- Validate ciphertext format before processing
- Implement constant-time operations
- Protect private key from side-channels
- Handle decapsulation failures securely
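Because FIPS 203 decapsulation uses implicit rejection, a tampered ciphertext produces a different secret and no error. Handling failures securely therefore usually means confirming that both sides derived the same key. The following is a minimal key-confirmation sketch; the function names and the `key-confirmation-v1:` label are assumptions made for illustration.

```python
import hashlib
import hmac


def confirmation_tag(confirmation_key: bytes, transcript: bytes) -> bytes:
    """MAC over the handshake transcript under the derived confirmation key."""
    return hmac.new(confirmation_key,
                    b"key-confirmation-v1:" + transcript,
                    hashlib.sha256).digest()


def verify_confirmation(confirmation_key: bytes, transcript: bytes,
                        tag: bytes) -> bool:
    """Constant-time comparison of the expected and received tags."""
    expected = confirmation_tag(confirmation_key, transcript)
    return hmac.compare_digest(expected, tag)
```

The encapsulating side sends the tag with its first protected message; the decapsulating side aborts the session if verification fails, without revealing which step went wrong.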
Secure Usage Example:
class SecureMLKEMDecapsulator:
"""Secure ML-KEM-768 decapsulation with protections"""
def __init__(self, private_key: bytes):
if len(private_key) != 2400:
raise ValueError("Invalid ML-KEM-768 private key size")
self.private_key = private_key
self.decapsulation_cache = {}
self.failure_count = 0
self.max_failures = 1000
def decapsulate_secure(
self,
ciphertext: bytes,
context: bytes = b"",
expected_session_id: Optional[str] = None
) -> Optional[Dict]:
"""Secure decapsulation with validation"""
# Validate ciphertext format
if not self._validate_ciphertext_format(ciphertext):
self._handle_decapsulation_failure()
return None
# Check for excessive failures (DoS protection)
if self.failure_count > self.max_failures:
raise RuntimeError("Too many decapsulation failures")
# Check decapsulation cache for replay detection
ciphertext_hash = hashlib.sha256(ciphertext).hexdigest()
if ciphertext_hash in self.decapsulation_cache:
cache_entry = self.decapsulation_cache[ciphertext_hash]
if time.time() - cache_entry['timestamp'] < 3600: # 1 hour
return None # Potential replay attack
# Perform decapsulation
try:
shared_secret = self._mlkem768_decapsulate_core(
self.private_key, ciphertext
)
if shared_secret is None:
self._handle_decapsulation_failure()
return None
# Derive keys with context
session_keys = self._derive_contextual_keys(shared_secret, context)
# Cache successful decapsulation
self.decapsulation_cache[ciphertext_hash] = {
'timestamp': time.time(),
'success': True
}
# Clean cache periodically
if len(self.decapsulation_cache) > 10000:
self._clean_cache()
result = {
'shared_secret': shared_secret.hex(),
'session_keys': session_keys,
'context': context.hex() if context else "",
'decapsulated_at': time.time()
}
# shared_secret is an immutable bytes object and cannot be zeroed in place;
# drop the local reference (the hex copy in result still holds the secret)
del shared_secret
return result
except Exception as e:
self._handle_decapsulation_failure()
return None
def _validate_ciphertext_format(self, ciphertext: bytes) -> bool:
"""Validate ML-KEM-768 ciphertext format"""
# Check size
if len(ciphertext) != 1088:
return False
# Additional format validation would go here
# - Check polynomial coefficient ranges
# - Verify structure constraints
# - Validate encoding
return True
def _mlkem768_decapsulate_core(
self, private_key: bytes, ciphertext: bytes
) -> Optional[bytes]:
"""Core ML-KEM-768 decapsulation implementation"""
from mlkem_crypto import mlkem768_decapsulate
try:
# Implementation should be constant-time
return mlkem768_decapsulate(private_key, ciphertext)
except Exception:
return None
def _derive_contextual_keys(
self, shared_secret: bytes, context: bytes
) -> Dict:
"""Derive keys with context binding"""
# Build key derivation input
kdf_input = b"ML-KEM-768-Keys-v1:" + context + shared_secret
# Use SHAKE256 for key derivation
key_material = hashlib.shake_256(kdf_input).digest(128) # 4 × 32-byte keys
return {
'encryption_key': key_material[0:32].hex(),
'authentication_key': key_material[32:64].hex(),
'key_confirmation': key_material[64:96].hex(),
'export_key': key_material[96:128].hex()
}
def _handle_decapsulation_failure(self):
"""Handle decapsulation failure securely"""
self.failure_count += 1
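# Small fixed delay to slow down bulk probing; this is rate limiting only
# and does NOT protect against timing attacks, which requires a
# constant-time decapsulation routine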
import time
time.sleep(0.001) # 1ms delay
# Log failure for monitoring (in real implementation)
# logger.warning(f"ML-KEM decapsulation failure #{self.failure_count}")
def _clean_cache(self):
"""Clean old entries from decapsulation cache"""
current_time = time.time()
cutoff_time = current_time - 3600 # 1 hour
self.decapsulation_cache = {
h: entry for h, entry in self.decapsulation_cache.items()
if entry['timestamp'] > cutoff_time
}
Security Best Practices
Hybrid Classical/Post-Quantum Key Exchange
class HybridKeyExchange:
"""Hybrid classical (ECDH) + post-quantum (ML-KEM) key exchange"""
def __init__(self):
self.mlkem = SecureMLKEM768()
self.classical_keys = {}
def create_hybrid_keypair(self, key_id: str) -> Dict:
"""Create hybrid key pair"""
# Generate ML-KEM-768 key pair
mlkem_keys = self.mlkem.generate_keypair_with_metadata(
key_id=f"{key_id}-mlkem"
)
# Generate ECDH key pair (P-256)
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import ec
private_key = ec.generate_private_key(ec.SECP256R1())
public_key = private_key.public_key()
# Store classical keys
self.classical_keys[key_id] = {
'private_key': private_key,
'public_key': public_key
}
# Export public keys
classical_public_bytes = public_key.public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint
)
return {
'key_id': key_id,
'mlkem_public_key': mlkem_keys['public_key'],
'classical_public_key': classical_public_bytes.hex(),
'algorithm': 'Hybrid-ECDH-P256-ML-KEM-768'
}
def hybrid_encapsulate(
self,
peer_mlkem_public_key: bytes,
peer_classical_public_key: bytes,
context: str
) -> Dict:
"""Perform hybrid key encapsulation"""
# ML-KEM-768 encapsulation
encapsulator = MLKEMEncapsulator(self.mlkem)
mlkem_result = encapsulator.encapsulate_with_context(
peer_mlkem_public_key,
f"Hybrid-{context}"
)
# ECDH key exchange
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives import serialization
# Generate ephemeral ECDH key
ephemeral_private = ec.generate_private_key(ec.SECP256R1())
ephemeral_public = ephemeral_private.public_key()
# Load peer's classical public key
peer_public = ec.EllipticCurvePublicKey.from_encoded_point(
ec.SECP256R1(), peer_classical_public_key
)
# Perform ECDH
ecdh_shared = ephemeral_private.exchange(
ec.ECDH(), peer_public
)
# Combine shared secrets
combined_secret = self._combine_shared_secrets(
mlkem_result['session_keys']['encryption_key'],
ecdh_shared,
context.encode()
)
# Export ephemeral public key
ephemeral_public_bytes = ephemeral_public.public_bytes(
encoding=serialization.Encoding.X962,
format=serialization.PublicFormat.UncompressedPoint
)
return {
'mlkem_ciphertext': mlkem_result['ciphertext'],
'ecdh_ephemeral_public': ephemeral_public_bytes.hex(),
'combined_session_keys': combined_secret,
'algorithm': 'Hybrid-ECDH-P256-ML-KEM-768',
'context': context
}
def _combine_shared_secrets(
self,
mlkem_secret: str,
ecdh_secret: bytes,
context: bytes
) -> Dict:
"""Securely combine classical and post-quantum secrets"""
# Convert ML-KEM secret back to bytes
mlkem_bytes = bytes.fromhex(mlkem_secret)
# Combine secrets using key combiner
combiner_input = (
b"HybridKeyCombiner-v1:" +
context + b":" +
len(mlkem_bytes).to_bytes(2, 'big') + mlkem_bytes +
len(ecdh_secret).to_bytes(2, 'big') + ecdh_secret
)
# Derive combined keys
combined_material = hashlib.shake_256(combiner_input).digest(128)
return {
'encryption_key': combined_material[0:32].hex(),
'authentication_key': combined_material[32:64].hex(),
'key_confirmation': combined_material[64:96].hex(),
'export_key': combined_material[96:128].hex()
}
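The combiner above relies on length prefixes so that the boundary between the two secrets is unambiguous. A standalone sketch of the same idea, using only the standard library, is shown here. It is a variation, not the method above verbatim: it also length-prefixes the context, and `combine_secrets` and `_lp` are hypothetical names.

```python
import hashlib


def _lp(data: bytes) -> bytes:
    """Length-prefix a field with a 2-byte big-endian length."""
    return len(data).to_bytes(2, "big") + data


def combine_secrets(pq_secret: bytes, classical_secret: bytes,
                    context: bytes) -> bytes:
    """Hash both secrets and the context into one 32-byte combined secret."""
    material = (b"HybridKeyCombiner-v1:"
                + _lp(context) + _lp(pq_secret) + _lp(classical_secret))
    return hashlib.shake_256(material).digest(32)
```

Without the prefixes, the inputs `(b"a", b"bc")` and `(b"ab", b"c")` would hash identically; with them, every field boundary is fixed by the encoding.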
TLS Integration Pattern
/// TLS 1.3 style key schedule with ML-KEM
pub struct TlsKeySchedule {
transcript_hash: Vec<u8>,
early_secret: [u8; 32],
handshake_secret: [u8; 32],
master_secret: [u8; 32],
}
impl TlsKeySchedule {
pub fn new() -> Self {
Self {
transcript_hash: Vec::new(),
early_secret: [0u8; 32],
handshake_secret: [0u8; 32],
master_secret: [0u8; 32],
}
}
pub fn update_transcript(&mut self, data: &[u8]) {
use sha3::{Sha3_256, Digest};
let mut hasher = Sha3_256::new();
hasher.update(&self.transcript_hash);
hasher.update(data);
self.transcript_hash = hasher.finalize().to_vec();
}
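pub fn derive_handshake_secret(&mut self, mlkem_shared_secret: &[u8; 32]) {
// HKDF-Extract(early_secret, ML-KEM shared secret)
let hkdf = hkdf::Hkdf::<Sha3_256>::new(
Some(&self.early_secret),
mlkem_shared_secret
);
hkdf.expand(b"handshake secret", &mut self.handshake_secret)
.expect("handshake secret derivation failed");
// Also derive master_secret; otherwise derive_application_keys would run
// on the all-zero initial value and produce fixed, publicly known keys
let master = hkdf::Hkdf::<Sha3_256>::new(
Some(&self.handshake_secret),
&[0u8; 32]
);
master.expand(b"master secret", &mut self.master_secret)
.expect("master secret derivation failed");
}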
pub fn derive_application_keys(&self) -> ApplicationKeys {
let hkdf = hkdf::Hkdf::<Sha3_256>::new(
Some(&self.master_secret),
&[0u8; 32] // No input keying material
);
let mut client_write_key = [0u8; 32];
let mut server_write_key = [0u8; 32];
let mut client_write_iv = [0u8; 12];
let mut server_write_iv = [0u8; 12];
hkdf.expand(b"client write key", &mut client_write_key).unwrap();
hkdf.expand(b"server write key", &mut server_write_key).unwrap();
hkdf.expand(b"client write iv", &mut client_write_iv).unwrap();
hkdf.expand(b"server write iv", &mut server_write_iv).unwrap();
ApplicationKeys {
client_write_key,
server_write_key,
client_write_iv,
server_write_iv,
}
}
}
#[derive(Debug, Zeroize)]
#[zeroize(drop)]
pub struct ApplicationKeys {
pub client_write_key: [u8; 32],
pub server_write_key: [u8; 32],
pub client_write_iv: [u8; 12],
pub server_write_iv: [u8; 12],
}
Common Integration Patterns
VPN Key Exchange
def mlkem_vpn_key_exchange():
"""ML-KEM for VPN key establishment"""
class PostQuantumVPN:
def __init__(self, server_mlkem_keypair):
self.server_keypair = server_mlkem_keypair
self.active_sessions = {}
def handle_client_hello(self, client_hello: dict) -> dict:
"""Handle client hello with ML-KEM public key"""
client_mlkem_public = bytes.fromhex(client_hello['mlkem_public_key'])
session_id = client_hello['session_id']
# Encapsulate shared secret for client
encapsulator = MLKEMEncapsulator(SecureMLKEM768())
encap_result = encapsulator.encapsulate_with_context(
client_mlkem_public,
f"VPN-Session-{session_id}"
)
# Store session keys
self.active_sessions[session_id] = {
'session_keys': encap_result['session_keys'],
'established_at': time.time(),
'client_public_key': client_mlkem_public
}
return {
'session_id': session_id,
'mlkem_ciphertext': encap_result['ciphertext'],
'server_certificate': self._get_server_certificate(),
'algorithm': 'ML-KEM-768-VPN'
}
Performance Considerations
| Operation / Object | Time (ms) or Size | Notes |
|---|---|---|
| Key Generation | 0.8-1.2 | Polynomial sampling |
| Encapsulation | 0.9-1.3 | Fresh randomness required |
| Decapsulation | 0.8-1.1 | Faster than encapsulation |
| Public Key | 1184 bytes | Moderate size |
| Private Key | 2400 bytes | Larger than classical |
| Ciphertext | 1088 bytes | Per encapsulation |
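Timings like those in the table depend heavily on hardware and implementation, so it is worth measuring on the target platform. A minimal measurement sketch follows; `median_ms` is a hypothetical helper, and the callable passed to it would wrap whichever key generation, encapsulation, or decapsulation routine is actually in use.

```python
import statistics
import time
from typing import Callable


def median_ms(operation: Callable[[], object], repeats: int = 200) -> float:
    """Run `operation` repeatedly and return the median wall time in ms."""
    samples = []
    for _ in range(repeats):
        start = time.perf_counter_ns()
        operation()
        samples.append((time.perf_counter_ns() - start) / 1e6)
    return statistics.median(samples)
```

The median is used instead of the mean so that occasional scheduler or garbage-collection pauses do not skew the result.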
Optimization Strategies
- Pre-generate key pairs for batch operations
- Use hardware acceleration where available
- Implement efficient polynomial arithmetic
- Cache public key validations
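Caching public key validations, the last strategy above, can be as simple as memoizing the check. The sketch below uses a length-only placeholder for the validation itself; `is_valid_public_key` is a hypothetical name, and a real check would also verify the coefficient ranges.

```python
from functools import lru_cache


@lru_cache(maxsize=1024)
def is_valid_public_key(public_key: bytes) -> bool:
    """Placeholder validation (length only), memoized per distinct key."""
    return len(public_key) == 1184
```

Memoizing on the key bytes is acceptable because public keys are not secret; the same pattern should not be applied to private keys or shared secrets.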
Security Auditing
Verification Checklist
- Using cryptographically secure randomness
- Validating all input parameters
- Implementing proper error handling
- Protecting against side-channel attacks
- Adding replay protection for ciphertexts
- Deriving session keys properly
- Implementing constant-time operations
Common Vulnerabilities
# AUDIT: Look for these patterns
# ❌ Weak randomness
import random
entropy = random.getrandbits(256) # NOT secure!
# ❌ Reusing encapsulation randomness
ct1, ss1 = mlkem768_encapsulate(pk, entropy)
ct2, ss2 = mlkem768_encapsulate(pk, entropy) # BROKEN!
# ❌ No ciphertext validation
shared_secret = mlkem768_decapsulate(sk, ciphertext) # Always trusts input
# ❌ Direct shared secret usage
encryption_key = shared_secret # Should use KDF
# ❌ No replay protection
# Accepting same ciphertext multiple times
Security Analysis
Threat Model: ML-KEM-768 Threat Model
The comprehensive threat analysis covers:
- Algorithm-specific attack vectors
- Implementation vulnerabilities
- Side-channel considerations
- Quantum resistance analysis (where applicable)
- Deployment recommendations
For complete security analysis and risk assessment, see the dedicated threat model documentation.
Support
Security Issues: security@metamui.id
Documentation Updates: phantom@metamui.id
Vulnerability Disclosure: See SECURITY.md
Document Version: 1.0
Review Cycle: Quarterly
Next Review: 2025-04-05
Classification: PUBLIC