Post-Quantum Cryptography Implementation Security Guide
Version: 1.0
Last Updated: 2025-08-28
Security Classification: PUBLIC
Author: MetaMUI Security Team
Executive Summary
This guide provides comprehensive security guidance for implementing post-quantum cryptographic algorithms across multiple programming languages and platforms. It addresses common security pitfalls, cross-language consistency requirements, and deployment best practices for transitioning to quantum-resistant cryptography.
Table of Contents
- Security Foundations
- Cross-Language Security Patterns
- Algorithm-Specific Security
- Implementation Security
- Deployment Security
- Testing and Validation
- Migration Strategies
- Security Checklist
- References
Security Foundations
Threat Model for PQC
```python
class PQCThreatModel:
    """Comprehensive threat model for PQC implementations"""

    quantum_threats = {
        'shors_algorithm': 'Breaks RSA, ECDSA, DH',
        'grovers_algorithm': 'Weakens symmetric crypto',
        'quantum_period_finding': 'Attacks on discrete log',
        'quantum_fourier_transform': 'Speedup for certain problems'
    }

    classical_threats = {
        'side_channel': 'Timing, power, EM attacks',
        'fault_injection': 'Hardware manipulation',
        'implementation_bugs': 'Coding errors',
        'protocol_attacks': 'Misuse of primitives',
        'cryptanalysis': 'Mathematical advances'
    }

    hybrid_threats = {
        'algorithm_confusion': 'Mixing incompatible schemes',
        'downgrade_attacks': 'Forcing weaker algorithms',
        'state_management': 'Stateful vs stateless confusion',
        'key_management': 'Complex multi-key scenarios'
    }
```
Security Levels and Parameter Selection
| NIST Level | Classical Security | Quantum Security | Recommended Algorithms |
|---|---|---|---|
| Level 1 | 128-bit | 64-bit | ML-KEM-512, Dilithium2 |
| Level 3 | 192-bit | 96-bit | ML-KEM-768, Dilithium3 |
| Level 5 | 256-bit | 128-bit | ML-KEM-1024, Dilithium5 |
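For illustration, a minimal sketch of how a target NIST level could be mapped to the algorithms in the table follows; the dictionary, function name, and returned labels are assumptions for this guide, not an existing API.

```python
# Minimal sketch: map a required NIST security level to candidate algorithms.
# The identifiers mirror the table above; adapt them to your own naming scheme.
NIST_LEVEL_PARAMETERS = {
    1: {'kem': 'ML-KEM-512',  'signature': 'Dilithium2'},
    3: {'kem': 'ML-KEM-768',  'signature': 'Dilithium3'},
    5: {'kem': 'ML-KEM-1024', 'signature': 'Dilithium5'},
}

def select_parameters(nist_level: int) -> dict:
    """Return the recommended KEM and signature scheme for a NIST level."""
    if nist_level not in NIST_LEVEL_PARAMETERS:
        raise ValueError(f"Unsupported NIST level: {nist_level}")
    return NIST_LEVEL_PARAMETERS[nist_level]
```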
Cross-Language Security Patterns
Memory Security
Python Implementation:
```python
import ctypes
import sys

class SecureMemory:
    """Cross-platform secure memory handling"""

    @staticmethod
    def secure_zero(data: bytearray):
        """Securely overwrite memory"""
        # Multiple overwrite passes
        for _ in range(3):
            for i in range(len(data)):
                data[i] = 0xFF
            for i in range(len(data)):
                data[i] = 0x00
        # Force memory write (a bytearray exposes a writable buffer to ctypes)
        ctypes.memset((ctypes.c_char * len(data)).from_buffer(data), 0, len(data))
        # Prevent optimization
        if sys.getsizeof(data) > 0:
            pass
```
Rust Implementation:
```rust
use zeroize::{Zeroize, ZeroizeOnDrop};
use std::ptr::write_volatile;

#[derive(ZeroizeOnDrop)]
pub struct SecureBuffer {
    data: Vec<u8>,
}

impl SecureBuffer {
    pub fn secure_clear(&mut self) {
        // Use volatile writes to prevent optimization
        for byte in self.data.iter_mut() {
            unsafe {
                write_volatile(byte, 0);
            }
        }
        // Additional zeroization
        self.data.zeroize();
    }
}
```
TypeScript Implementation:
```typescript
class SecureMemory {
  static secureClear(buffer: Uint8Array): void {
    // Use crypto.getRandomValues to overwrite
    crypto.getRandomValues(buffer);
    // Multiple passes
    buffer.fill(0xFF);
    buffer.fill(0x00);
    // Use slice to prevent optimization
    const copy = buffer.slice();
    if (copy.length > 0) {
      buffer.fill(0);
    }
  }
}
```
Constant-Time Operations
Critical for all PQC implementations:
```python
def constant_time_compare(a: bytes, b: bytes) -> bool:
    """Constant-time byte comparison"""
    if len(a) != len(b):
        return False
    result = 0
    for x, y in zip(a, b):
        result |= x ^ y
    return result == 0

def constant_time_select(condition: bool, a: int, b: int) -> int:
    """Constant-time conditional selection"""
    mask = -int(condition)  # -1 (all bits set) if true, 0 if false
    return (a & mask) | (b & ~mask)
```
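As a short, hypothetical sketch of how these helpers fit into implicit rejection during KEM decapsulation (the function and argument names are illustrative, not an existing API), the selection between the real shared secret and a pseudorandom fallback can be done without branching on the comparison result:

```python
# Illustrative sketch only: select between the real shared secret and a
# pseudorandom fallback without a data-dependent branch.
def implicit_reject(reencrypted_ct: bytes, received_ct: bytes,
                    real_ss: bytes, fallback_ss: bytes) -> bytes:
    ok = constant_time_compare(reencrypted_ct, received_ct)
    return bytes(
        constant_time_select(ok, r, f) for r, f in zip(real_ss, fallback_ss)
    )
```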
Randomness Management
```python
import secrets
import hashlib
from typing import Callable, Optional

class SecureRandomness:
    """Secure randomness for PQC operations"""

    def __init__(self, hardware_rng: Optional[Callable[[int], bytes]] = None):
        self.hardware_rng = hardware_rng
        self.reseed_counter = 0
        self.max_reseed = 10000

    def get_random_bytes(self, length: int) -> bytes:
        """Get cryptographically secure random bytes"""
        # Check reseed requirement
        if self.reseed_counter >= self.max_reseed:
            self.reseed()
        # Gather entropy from multiple sources
        entropy = bytearray()
        # Primary: hardware RNG if available
        if self.hardware_rng:
            entropy.extend(self.hardware_rng(32))
        # Secondary: OS randomness
        entropy.extend(secrets.token_bytes(32))
        # Mix with KDF
        output = hashlib.shake_256(entropy).digest(length)
        # Update counter
        self.reseed_counter += 1
        # Clear entropy buffer
        for i in range(len(entropy)):
            entropy[i] = 0
        return output

    def reseed(self):
        """Reseed the randomness source"""
        self.reseed_counter = 0
        # Additional reseeding logic
```
Algorithm-Specific Security
ML-KEM (Kyber) Security
```python
class MLKEMSecurity:
    """Security wrapper for ML-KEM implementations"""

    @staticmethod
    def validate_ciphertext(ct: bytes, level: int) -> bool:
        """Validate ML-KEM ciphertext format"""
        expected_sizes = {
            512: 768,
            768: 1088,
            1024: 1568
        }
        if level not in expected_sizes:
            return False
        if len(ct) != expected_sizes[level]:
            return False
        # Additional polynomial bound checks
        return MLKEMSecurity._check_polynomial_bounds(ct)

    @staticmethod
    def secure_encapsulation(public_key: bytes) -> tuple:
        """Secure encapsulation with validation"""
        # Validate public key
        if not MLKEMSecurity._validate_public_key(public_key):
            raise ValueError("Invalid public key")
        # Generate fresh randomness
        rand = SecureRandomness().get_random_bytes(32)
        # Perform encapsulation
        ct, ss = mlkem_encaps(public_key, rand)
        # Drop the randomness reference (bytes are immutable and cannot be
        # zeroized in place; use a bytearray if true wiping is required)
        rand = bytes(32)
        return ct, ss
```
Dilithium Security
```python
class DilithiumSecurity:
    """Security wrapper for Dilithium signatures"""

    @staticmethod
    def secure_sign(message: bytes, secret_key: bytes,
                    deterministic: bool = False) -> bytes:
        """Secure signing with side-channel protection"""
        # Add domain separation
        domain = b"MetaMUI-Dilithium-Sign-v1"
        augmented = domain + message
        if deterministic:
            # Deterministic signing for reproducibility
            sig = dilithium_sign_deterministic(augmented, secret_key)
        else:
            # Randomized signing for additional security
            rand = SecureRandomness().get_random_bytes(32)
            sig = dilithium_sign_randomized(augmented, secret_key, rand)
            rand = bytes(32)
        return sig

    @staticmethod
    def batch_verify(messages: list, signatures: list,
                     public_keys: list) -> bool:
        """Efficient batch verification"""
        if len(messages) != len(signatures) or \
                len(messages) != len(public_keys):
            return False
        # Use randomized batching for efficiency
        batch_rand = SecureRandomness().get_random_bytes(len(messages) * 16)
        return dilithium_batch_verify(
            messages, signatures, public_keys, batch_rand
        )
```
Falcon Security
```python
class FalconSecurity:
    """Security wrapper for Falcon signatures"""

    @staticmethod
    def secure_floating_point():
        """Configure floating-point for security"""
        import sys
        # Set floating-point precision
        if hasattr(sys, 'float_info'):
            # Ensure double precision
            assert sys.float_info.mant_dig >= 53
        # Disable unsafe optimizations
        import os
        os.environ['FALCON_NO_FMA'] = '1'

    @staticmethod
    def constant_time_sampling(secret_key: bytes) -> bytes:
        """Constant-time Gaussian sampling"""
        # Use rejection sampling with constant-time checks
        max_iterations = 1000
        for i in range(max_iterations):
            sample = falcon_sample_gaussian(secret_key)
            # Always process all iterations
            if i == max_iterations - 1:
                return sample
        return sample
```
SLH-DSA (SPHINCS+) Security
```python
class SLHDSASecurity:
    """Security wrapper for SLH-DSA signatures"""

    @staticmethod
    def optimize_parameters(use_case: str) -> str:
        """Select optimal parameters for use case"""
        params = {
            'fast_signing': 'slhdsa-128f',
            'small_signatures': 'slhdsa-128s',
            'balanced': 'slhdsa-192f',
            'maximum_security': 'slhdsa-256s'
        }
        return params.get(use_case, 'slhdsa-192f')

    @staticmethod
    def streaming_sign(file_path: str, secret_key: bytes) -> bytes:
        """Sign large files efficiently"""
        import hashlib
        # Hash file in chunks
        hasher = hashlib.shake_256()
        with open(file_path, 'rb') as f:
            while chunk := f.read(1024 * 1024):  # 1 MB chunks
                hasher.update(chunk)
        file_hash = hasher.digest(64)
        # Sign hash
        return slhdsa_sign(file_hash, secret_key)
```
Implementation Security
Side-Channel Protection
```python
import secrets

class SideChannelProtection:
    """Comprehensive side-channel countermeasures"""

    @staticmethod
    def timing_protection(operation: callable) -> callable:
        """Add timing randomization"""
        import time
        import random

        def protected_operation(*args, **kwargs):
            # Random delay
            delay = random.uniform(0.0001, 0.001)
            time.sleep(delay)
            # Execute operation
            result = operation(*args, **kwargs)
            # Additional random delay
            time.sleep(delay)
            return result

        return protected_operation

    @staticmethod
    def power_analysis_protection(data: bytes) -> bytes:
        """Add noise for power analysis protection"""
        # Generate random mask
        mask = secrets.token_bytes(len(data))
        # Apply mask
        masked = bytes(a ^ b for a, b in zip(data, mask))
        # Dummy operations
        dummy = secrets.token_bytes(len(data))
        _ = bytes(a ^ b for a, b in zip(dummy, mask))
        # Unmask
        result = bytes(a ^ b for a, b in zip(masked, mask))
        return result
```
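A quick usage sketch of the timing wrapper, applied to the ML-KEM decapsulation function referenced elsewhere in this guide; the ciphertext and key variables below are placeholders, not defined here:

```python
# Hypothetical usage: add timing jitter around a decapsulation call.
# `ciphertext` and `secret_key` are placeholders for real handshake values.
protected_decaps = SideChannelProtection.timing_protection(mlkem768_decaps)
shared_secret = protected_decaps(ciphertext, secret_key)
```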
Error Handling
```python
import logging
import secrets

class SecureErrorHandling:
    """Secure error handling for PQC operations"""

    @staticmethod
    def handle_decryption_failure(operation: callable):
        """Handle decryption failures securely"""

        def wrapper(*args, **kwargs):
            try:
                return operation(*args, **kwargs)
            except DecryptionError:
                # Return a random value (implicit rejection)
                return secrets.token_bytes(32)
            except Exception as e:
                # Log securely without leaking information
                SecureErrorHandling._secure_log(
                    f"Operation failed: {type(e).__name__}"
                )
                # Return safe default
                return None

        return wrapper

    @staticmethod
    def _secure_log(message: str):
        """Log without leaking sensitive information"""
        # Sanitize message
        sanitized = message.replace('\n', ' ')
        sanitized = sanitized[:200]  # Limit length
        # Log with a PQC tag
        logging.info(f"[PQC] {sanitized}")
```
Deployment Security
Hybrid Deployment
```python
class HybridCryptography:
    """Secure hybrid classical/PQC deployment"""

    def __init__(self):
        self.pqc_kem = MLKEM768()
        self.classical_kem = X25519()
        self.combiner = HKDF()

    def hybrid_key_exchange(self, pqc_pk: bytes, classical_pk: bytes) -> tuple:
        """Perform hybrid key exchange"""
        # PQC key exchange
        pqc_ct, pqc_ss = self.pqc_kem.encapsulate(pqc_pk)
        # Classical key exchange
        classical_ss = self.classical_kem.exchange(classical_pk)
        # Combine secrets securely
        combined_secret = self.combiner.derive(
            pqc_ss + classical_ss,
            salt=b"HYBRID-KEX-v1",
            info=b"MetaMUI-Hybrid",
            length=32
        )
        # Drop references to intermediate secrets (bytes are immutable,
        # so rebinding does not wipe them in place)
        pqc_ss = bytes(32)
        classical_ss = bytes(32)
        return pqc_ct, combined_secret

    def policy_based_selection(self, peer_capabilities: dict) -> str:
        """Select algorithm based on peer capabilities"""
        if 'ml-kem-768' in peer_capabilities:
            if 'x25519' in peer_capabilities:
                return 'hybrid'
            return 'pqc-only'
        elif 'x25519' in peer_capabilities:
            return 'classical-only'
        else:
            raise ValueError("No compatible algorithms")
```
Key Management
```python
import time

class PQCKeyManagement:
    """Secure key management for PQC"""

    def __init__(self, hsm_available: bool = False):
        self.hsm_available = hsm_available
        self.key_store = {}

    def generate_and_store(self, algorithm: str, key_id: str) -> dict:
        """Generate and securely store PQC keys"""
        # Generate based on algorithm
        if algorithm == 'ml-kem-768':
            pk, sk = mlkem768_keygen()
            key_type = 'kem'
        elif algorithm == 'dilithium3':
            pk, sk = dilithium3_keygen()
            key_type = 'signature'
        else:
            raise ValueError(f"Unknown algorithm: {algorithm}")
        # Store with encryption
        if self.hsm_available:
            # Use HSM
            key_handle = self._store_in_hsm(sk, key_id)
        else:
            # Software encryption
            key_handle = self._encrypt_and_store(sk, key_id)
        # Store metadata
        self.key_store[key_id] = {
            'key_id': key_id,
            'algorithm': algorithm,
            'type': key_type,
            'public_key': pk,
            'handle': key_handle,
            'created': time.time(),
            'rotation_due': time.time() + 90 * 24 * 3600  # 90 days
        }
        # Drop the secret key reference (immutable bytes cannot be wiped in place)
        sk = bytes(len(sk))
        return self.key_store[key_id]

    def rotate_keys(self):
        """Automatic key rotation"""
        current_time = time.time()
        # Iterate over a snapshot so new keys can be added during rotation
        for key_id, metadata in list(self.key_store.items()):
            if metadata['rotation_due'] < current_time:
                # Generate new key
                new_metadata = self.generate_and_store(
                    metadata['algorithm'],
                    f"{key_id}-rotated-{int(current_time)}"
                )
                # Mark old key for deletion
                metadata['deprecated'] = True
                metadata['replacement'] = new_metadata['key_id']
```
Testing and Validation
Test Vector Validation
```python
class PQCTestVectorValidation:
    """Validate against official test vectors"""

    @staticmethod
    def validate_nist_vectors(algorithm: str) -> bool:
        """Validate against NIST test vectors"""
        import json
        # Load official test vectors
        with open(f'test-vectors/nist/{algorithm}.json', 'r') as f:
            vectors = json.load(f)
        passed = 0
        failed = 0
        for vector in vectors:
            if algorithm.startswith('ml-kem'):
                result = PQCTestVectorValidation._test_kem(vector)
            elif algorithm.startswith('dilithium'):
                result = PQCTestVectorValidation._test_signature(vector)
            else:
                result = False
            if result:
                passed += 1
            else:
                failed += 1
                print(f"Failed vector: {vector['count']}")
        print(f"{algorithm}: {passed} passed, {failed} failed")
        return failed == 0

    @staticmethod
    def cross_language_validation():
        """Validate consistency across languages"""
        # Generate test data
        test_message = b"Cross-language test message"
        # Test each language implementation
        results = {}
        # Python
        py_pk, py_sk = python_mlkem768_keygen()
        py_ct, py_ss = python_mlkem768_encaps(py_pk)
        results['python'] = py_ss
        # Rust (via FFI)
        rust_ss = rust_mlkem768_decaps(py_ct, py_sk)
        results['rust'] = rust_ss
        # TypeScript (via WASM)
        ts_ss = typescript_mlkem768_decaps(py_ct, py_sk)
        results['typescript'] = ts_ss
        # Verify all match
        reference = results['python']
        for lang, ss in results.items():
            if ss != reference:
                raise ValueError(f"{lang} implementation mismatch")
        return True
```
Security Testing
```python
import secrets

class PQCSecurityTesting:
    """Security-focused testing for PQC implementations"""

    @staticmethod
    def test_side_channels():
        """Test for timing side-channels"""
        import time
        import statistics
        # Generate keys
        pk, sk = mlkem768_keygen()
        # Test with valid ciphertext
        valid_ct, _ = mlkem768_encaps(pk)
        valid_times = []
        for _ in range(1000):
            start = time.perf_counter_ns()
            _ = mlkem768_decaps(valid_ct, sk)
            valid_times.append(time.perf_counter_ns() - start)
        # Test with invalid ciphertext
        invalid_ct = secrets.token_bytes(len(valid_ct))
        invalid_times = []
        for _ in range(1000):
            start = time.perf_counter_ns()
            _ = mlkem768_decaps(invalid_ct, sk)
            invalid_times.append(time.perf_counter_ns() - start)
        # Statistical analysis
        valid_mean = statistics.mean(valid_times)
        invalid_mean = statistics.mean(invalid_times)
        # Check timing difference is negligible
        difference = abs(valid_mean - invalid_mean)
        threshold = valid_mean * 0.01  # 1% threshold
        if difference > threshold:
            print(f"WARNING: Timing difference detected: {difference}ns")
            return False
        return True

    @staticmethod
    def fuzzing_test():
        """Fuzz testing for robustness"""
        import random
        # Generate keys
        pk, sk = mlkem768_keygen()
        for _ in range(10000):
            # Generate random input
            fuzz_length = random.randint(0, 2000)
            fuzz_input = secrets.token_bytes(fuzz_length)
            try:
                # Should handle gracefully
                result = mlkem768_decaps(fuzz_input, sk)
                # Should return consistent length
                if len(result) != 32:
                    return False
            except Exception as e:
                print(f"Fuzzing caused exception: {e}")
                return False
        return True
```
Migration Strategies
Gradual Migration Plan
```python
class PQCMigrationStrategy:
    """Strategic migration to PQC"""

    def __init__(self):
        self.migration_phase = 1
        self.compatibility_matrix = {}

    def get_migration_phase(self) -> dict:
        """Get current migration phase configuration"""
        phases = {
            1: {  # Discovery
                'name': 'Discovery',
                'algorithms': ['classical'],
                'monitoring': True,
                'testing': 'pqc'
            },
            2: {  # Hybrid introduction
                'name': 'Hybrid Introduction',
                'algorithms': ['hybrid'],
                'fallback': 'classical',
                'monitoring': True
            },
            3: {  # PQC primary
                'name': 'PQC Primary',
                'algorithms': ['pqc', 'hybrid'],
                'fallback': 'hybrid',
                'deprecate': 'classical'
            },
            4: {  # PQC only
                'name': 'PQC Only',
                'algorithms': ['pqc'],
                'remove': 'classical',
                'mandatory': True
            }
        }
        return phases[self.migration_phase]

    def select_algorithm(self, peer_capabilities: list) -> str:
        """Select algorithm based on migration phase"""
        phase = self.get_migration_phase()
        # Check capabilities against phase
        if 'pqc' in peer_capabilities and 'pqc' in phase['algorithms']:
            return 'pqc'
        elif 'hybrid' in peer_capabilities and 'hybrid' in phase['algorithms']:
            return 'hybrid'
        elif 'classical' in peer_capabilities and 'classical' in phase.get('algorithms', []):
            return 'classical'
        elif 'fallback' in phase:
            return phase['fallback']
        else:
            raise ValueError("No compatible algorithm for current phase")

    def advance_phase(self) -> bool:
        """Advance to next migration phase"""
        # Check readiness metrics
        if self.check_phase_readiness():
            self.migration_phase += 1
            self.log_phase_transition()
            return True
        return False

    def check_phase_readiness(self) -> bool:
        """Check if ready for next phase"""
        metrics = {
            'pqc_support_percentage': self.get_pqc_support_percentage(),
            'error_rate': self.get_error_rate(),
            'performance_impact': self.get_performance_impact()
        }
        # Phase-specific thresholds: support is a minimum, error rate a maximum
        thresholds = {
            1: {'pqc_support_percentage': 50},
            2: {'pqc_support_percentage': 80, 'error_rate': 0.01},
            3: {'pqc_support_percentage': 95, 'error_rate': 0.001}
        }
        if self.migration_phase not in thresholds:
            return False
        requirements = thresholds[self.migration_phase]
        if metrics['pqc_support_percentage'] < requirements['pqc_support_percentage']:
            return False
        if 'error_rate' in requirements and metrics['error_rate'] > requirements['error_rate']:
            return False
        return True
```
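A brief usage sketch follows, assuming the metric getters above are wired to real deployment telemetry:

```python
# Hypothetical driver loop for the migration strategy above.
strategy = PQCMigrationStrategy()

peer_caps = ['hybrid', 'classical']
algorithm = strategy.select_algorithm(peer_caps)  # resolves to 'classical' in phase 1
print(f"Phase {strategy.migration_phase}: negotiated {algorithm}")

# Periodically re-evaluate readiness and advance when thresholds are met
if strategy.advance_phase():
    print(f"Advanced to phase {strategy.migration_phase}")
```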
Security Checklist
Implementation Review Checklist
- Memory Security
  - All keys zeroized after use
  - No sensitive data in logs
  - Secure memory allocation used
- Randomness
  - Cryptographically secure RNG
  - Multiple entropy sources
  - Proper seeding and reseeding
- Side-Channels
  - Constant-time operations
  - Power analysis countermeasures
  - Timing attack protection
- Error Handling
  - Implicit rejection implemented
  - No information leakage
  - Graceful failure modes
- Testing
  - NIST test vectors pass
  - Cross-language consistency
  - Security testing complete
- Documentation
  - Security considerations documented
  - API usage examples provided
  - Migration guide available
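As a hedged illustration, the testable items above can be gated programmatically using the classes defined earlier in this guide; which algorithms to cover is an assumption for this sketch, and the remaining items (documentation, memory review) stay manual.

```python
# Illustrative release gate built from the testing utilities above.
# The algorithm list is an assumption; extend it to match your deployment.
def pqc_release_gate(algorithms=('ml-kem-768', 'dilithium3')) -> bool:
    """Run the automatable checklist items and report overall readiness."""
    checks = {
        'nist_vectors': all(
            PQCTestVectorValidation.validate_nist_vectors(alg) for alg in algorithms
        ),
        'cross_language': PQCTestVectorValidation.cross_language_validation(),
        'timing_side_channels': PQCSecurityTesting.test_side_channels(),
        'fuzzing': PQCSecurityTesting.fuzzing_test(),
    }
    for name, ok in checks.items():
        print(f"{name}: {'PASS' if ok else 'FAIL'}")
    return all(checks.values())
```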
References
- NIST Post-Quantum Cryptography
- PQC Migration Playbook
- Side-Channel Attacks on PQC
- Hybrid PQC Deployment
- PQC Implementation Security