Post-Quantum Cryptography Implementation Security Guide

Version: 1.0
Last Updated: 2025-08-28
Security Classification: PUBLIC
Author: MetaMUI Security Team

Executive Summary

This guide provides comprehensive security guidance for implementing post-quantum cryptographic algorithms across multiple programming languages and platforms. It addresses common security pitfalls, cross-language consistency requirements, and deployment best practices for transitioning to quantum-resistant cryptography.

Table of Contents

  1. Security Foundations
  2. Cross-Language Security Patterns
  3. Algorithm-Specific Security
  4. Implementation Security
  5. Deployment Security
  6. Testing and Validation
  7. Migration Strategies

Security Foundations

Threat Model for PQC

class PQCThreatModel:
    """Comprehensive threat model for PQC implementations"""
    
    quantum_threats = {
        'shors_algorithm': 'Breaks RSA, ECDSA, DH',
        'grovers_algorithm': 'Weakens symmetric crypto',
        'quantum_period_finding': 'Attacks on discrete log',
        'quantum_fourier_transform': 'Speedup for certain problems'
    }
    
    classical_threats = {
        'side_channel': 'Timing, power, EM attacks',
        'fault_injection': 'Hardware manipulation',
        'implementation_bugs': 'Coding errors',
        'protocol_attacks': 'Misuse of primitives',
        'cryptanalysis': 'Mathematical advances'
    }
    
    hybrid_threats = {
        'algorithm_confusion': 'Mixing incompatible schemes',
        'downgrade_attacks': 'Forcing weaker algorithms',
        'state_management': 'Stateful vs stateless confusion',
        'key_management': 'Complex multi-key scenarios'
    }

Security Levels and Parameter Selection

NIST Level Classical Security Quantum Security Recommended Algorithms
Level 1 128-bit 64-bit ML-KEM-512, Dilithium2
Level 3 192-bit 96-bit ML-KEM-768, Dilithium3
Level 5 256-bit 128-bit ML-KEM-1024, Dilithium5

Cross-Language Security Patterns

Memory Security

Python Implementation:

import ctypes
import sys

class SecureMemory:
    """Cross-platform secure memory handling"""
    
    @staticmethod
    def secure_zero(data: bytearray):
        """Securely overwrite memory"""
        # Multiple overwrite passes
        for _ in range(3):
            for i in range(len(data)):
                data[i] = 0xFF
            for i in range(len(data)):
                data[i] = 0x00
        
        # Force memory write
        ctypes.memset(ctypes.addressof(data), 0, len(data))
        
        # Prevent optimization
        if sys.getsizeof(data) > 0:
            pass

Rust Implementation:

use zeroize::{Zeroize, ZeroizeOnDrop};
use std::ptr::write_volatile;

#[derive(ZeroizeOnDrop)]
pub struct SecureBuffer {
    data: Vec<u8>,
}

impl SecureBuffer {
    pub fn secure_clear(&mut self) {
        // Use volatile writes to prevent optimization
        for byte in self.data.iter_mut() {
            unsafe {
                write_volatile(byte, 0);
            }
        }
        
        // Additional zeroization
        self.data.zeroize();
    }
}

TypeScript Implementation:

class SecureMemory {
    static secureClear(buffer: Uint8Array): void {
        // Crypto.getRandomValues to overwrite
        crypto.getRandomValues(buffer);
        
        // Multiple passes
        buffer.fill(0xFF);
        buffer.fill(0x00);
        
        // Use slice to prevent optimization
        const copy = buffer.slice();
        if (copy.length > 0) {
            buffer.fill(0);
        }
    }
}

Constant-Time Operations

Critical for all PQC implementations:

def constant_time_compare(a: bytes, b: bytes) -> bool:
    """Constant-time byte comparison"""
    if len(a) != len(b):
        return False
    
    result = 0
    for x, y in zip(a, b):
        result |= x ^ y
    
    return result == 0

def constant_time_select(condition: bool, a: int, b: int) -> int:
    """Constant-time conditional selection"""
    mask = -int(condition)  # 0xFFFFFFFF if true, 0x00000000 if false
    return (a & mask) | (b & ~mask)

Randomness Management

import secrets
import hashlib
from typing import Optional

class SecureRandomness:
    """Secure randomness for PQC operations"""
    
    def __init__(self, hardware_rng: Optional[callable] = None):
        self.hardware_rng = hardware_rng
        self.reseed_counter = 0
        self.max_reseed = 10000
    
    def get_random_bytes(self, length: int) -> bytes:
        """Get cryptographically secure random bytes"""
        
        # Check reseed requirement
        if self.reseed_counter >= self.max_reseed:
            self.reseed()
        
        # Gather entropy from multiple sources
        entropy = bytearray()
        
        # Primary: Hardware RNG if available
        if self.hardware_rng:
            entropy.extend(self.hardware_rng(32))
        
        # Secondary: OS randomness
        entropy.extend(secrets.token_bytes(32))
        
        # Mix with KDF
        output = hashlib.shake_256(entropy).digest(length)
        
        # Update counter
        self.reseed_counter += 1
        
        # Clear entropy
        for i in range(len(entropy)):
            entropy[i] = 0
        
        return output
    
    def reseed(self):
        """Reseed the randomness source"""
        self.reseed_counter = 0
        # Additional reseeding logic

Algorithm-Specific Security

ML-KEM (Kyber) Security

class MLKEMSecurity:
    """Security wrapper for ML-KEM implementations"""
    
    @staticmethod
    def validate_ciphertext(ct: bytes, level: int) -> bool:
        """Validate ML-KEM ciphertext format"""
        
        expected_sizes = {
            512: 768,
            768: 1088,
            1024: 1568
        }
        
        if level not in expected_sizes:
            return False
        
        if len(ct) != expected_sizes[level]:
            return False
        
        # Additional polynomial bound checks
        return MLKEMSecurity._check_polynomial_bounds(ct)
    
    @staticmethod
    def secure_encapsulation(public_key: bytes) -> tuple:
        """Secure encapsulation with validation"""
        
        # Validate public key
        if not MLKEMSecurity._validate_public_key(public_key):
            raise ValueError("Invalid public key")
        
        # Generate fresh randomness
        rand = SecureRandomness().get_random_bytes(32)
        
        # Perform encapsulation
        ct, ss = mlkem_encaps(public_key, rand)
        
        # Clear randomness
        rand = bytes(32)
        
        return ct, ss

Dilithium Security

class DilithiumSecurity:
    """Security wrapper for Dilithium signatures"""
    
    @staticmethod
    def secure_sign(message: bytes, secret_key: bytes, 
                   deterministic: bool = False) -> bytes:
        """Secure signing with side-channel protection"""
        
        # Add domain separation
        domain = b"MetaMUI-Dilithium-Sign-v1"
        augmented = domain + message
        
        if deterministic:
            # Deterministic signing for reproducibility
            sig = dilithium_sign_deterministic(augmented, secret_key)
        else:
            # Randomized signing for additional security
            rand = SecureRandomness().get_random_bytes(32)
            sig = dilithium_sign_randomized(augmented, secret_key, rand)
            rand = bytes(32)
        
        return sig
    
    @staticmethod
    def batch_verify(messages: list, signatures: list, 
                    public_keys: list) -> bool:
        """Efficient batch verification"""
        
        if len(messages) != len(signatures) or \
           len(messages) != len(public_keys):
            return False
        
        # Use randomized batching for efficiency
        batch_rand = SecureRandomness().get_random_bytes(len(messages) * 16)
        
        return dilithium_batch_verify(
            messages, signatures, public_keys, batch_rand
        )

Falcon Security

class FalconSecurity:
    """Security wrapper for Falcon signatures"""
    
    @staticmethod
    def secure_floating_point():
        """Configure floating-point for security"""
        
        import sys
        
        # Set floating-point precision
        if hasattr(sys, 'float_info'):
            # Ensure double precision
            assert sys.float_info.mant_dig >= 53
        
        # Disable unsafe optimizations
        import os
        os.environ['FALCON_NO_FMA'] = '1'
    
    @staticmethod
    def constant_time_sampling(secret_key: bytes) -> bytes:
        """Constant-time Gaussian sampling"""
        
        # Use rejection sampling with constant-time checks
        max_iterations = 1000
        
        for i in range(max_iterations):
            sample = falcon_sample_gaussian(secret_key)
            
            # Always process all iterations
            if i == max_iterations - 1:
                return sample
        
        return sample

SLH-DSA (SPHINCS+) Security

class SLHDSASecurity:
    """Security wrapper for SLH-DSA signatures"""
    
    @staticmethod
    def optimize_parameters(use_case: str) -> str:
        """Select optimal parameters for use case"""
        
        params = {
            'fast_signing': 'slhdsa-128f',
            'small_signatures': 'slhdsa-128s',
            'balanced': 'slhdsa-192f',
            'maximum_security': 'slhdsa-256s'
        }
        
        return params.get(use_case, 'slhdsa-192f')
    
    @staticmethod
    def streaming_sign(file_path: str, secret_key: bytes) -> bytes:
        """Sign large files efficiently"""
        
        import hashlib
        
        # Hash file in chunks
        hasher = hashlib.shake_256()
        
        with open(file_path, 'rb') as f:
            while chunk := f.read(1024 * 1024):  # 1MB chunks
                hasher.update(chunk)
        
        file_hash = hasher.digest(64)
        
        # Sign hash
        return slhdsa_sign(file_hash, secret_key)

Implementation Security

Side-Channel Protection

class SideChannelProtection:
    """Comprehensive side-channel countermeasures"""
    
    @staticmethod
    def timing_protection(operation: callable) -> callable:
        """Add timing randomization"""
        
        import time
        import random
        
        def protected_operation(*args, **kwargs):
            # Random delay
            delay = random.uniform(0.0001, 0.001)
            time.sleep(delay)
            
            # Execute operation
            result = operation(*args, **kwargs)
            
            # Additional random delay
            time.sleep(delay)
            
            return result
        
        return protected_operation
    
    @staticmethod
    def power_analysis_protection(data: bytes) -> bytes:
        """Add noise for power analysis protection"""
        
        # Generate random mask
        mask = secrets.token_bytes(len(data))
        
        # Apply mask
        masked = bytes(a ^ b for a, b in zip(data, mask))
        
        # Dummy operations
        dummy = secrets.token_bytes(len(data))
        _ = bytes(a ^ b for a, b in zip(dummy, mask))
        
        # Unmask
        result = bytes(a ^ b for a, b in zip(masked, mask))
        
        return result

Error Handling

class SecureErrorHandling:
    """Secure error handling for PQC operations"""
    
    @staticmethod
    def handle_decryption_failure(operation: callable):
        """Handle decryption failures securely"""
        
        def wrapper(*args, **kwargs):
            try:
                return operation(*args, **kwargs)
            except DecryptionError:
                # Return random value (implicit rejection)
                return secrets.token_bytes(32)
            except Exception as e:
                # Log securely without leaking information
                SecureErrorHandling._secure_log(
                    f"Operation failed: {type(e).__name__}"
                )
                # Return safe default
                return None
        
        return wrapper
    
    @staticmethod
    def _secure_log(message: str):
        """Log without leaking sensitive information"""
        
        # Sanitize message
        sanitized = message.replace('\n', ' ')
        sanitized = sanitized[:200]  # Limit length
        
        # Log with timestamp
        import logging
        logging.info(f"[PQC] {sanitized}")

Deployment Security

Hybrid Deployment

class HybridCryptography:
    """Secure hybrid classical/PQC deployment"""
    
    def __init__(self):
        self.pqc_kem = MLKEM768()
        self.classical_kem = X25519()
        self.combiner = HKDF()
    
    def hybrid_key_exchange(self, pqc_pk: bytes, classical_pk: bytes) -> tuple:
        """Perform hybrid key exchange"""
        
        # PQC key exchange
        pqc_ct, pqc_ss = self.pqc_kem.encapsulate(pqc_pk)
        
        # Classical key exchange  
        classical_ss = self.classical_kem.exchange(classical_pk)
        
        # Combine secrets securely
        combined_secret = self.combiner.derive(
            pqc_ss + classical_ss,
            salt=b"HYBRID-KEX-v1",
            info=b"MetaMUI-Hybrid",
            length=32
        )
        
        # Clear intermediate secrets
        pqc_ss = bytes(32)
        classical_ss = bytes(32)
        
        return (pqc_ct,), combined_secret
    
    def policy_based_selection(self, peer_capabilities: dict) -> str:
        """Select algorithm based on peer capabilities"""
        
        if 'ml-kem-768' in peer_capabilities:
            if 'x25519' in peer_capabilities:
                return 'hybrid'
            return 'pqc-only'
        elif 'x25519' in peer_capabilities:
            return 'classical-only'
        else:
            raise ValueError("No compatible algorithms")

Key Management

class PQCKeyManagement:
    """Secure key management for PQC"""
    
    def __init__(self, hsm_available: bool = False):
        self.hsm_available = hsm_available
        self.key_store = {}
    
    def generate_and_store(self, algorithm: str, key_id: str) -> dict:
        """Generate and securely store PQC keys"""
        
        # Generate based on algorithm
        if algorithm == 'ml-kem-768':
            pk, sk = mlkem768_keygen()
            key_type = 'kem'
        elif algorithm == 'dilithium3':
            pk, sk = dilithium3_keygen()
            key_type = 'signature'
        else:
            raise ValueError(f"Unknown algorithm: {algorithm}")
        
        # Store with encryption
        if self.hsm_available:
            # Use HSM
            key_handle = self._store_in_hsm(sk, key_id)
        else:
            # Software encryption
            key_handle = self._encrypt_and_store(sk, key_id)
        
        # Store metadata
        self.key_store[key_id] = {
            'algorithm': algorithm,
            'type': key_type,
            'public_key': pk,
            'handle': key_handle,
            'created': time.time(),
            'rotation_due': time.time() + 90*24*3600  # 90 days
        }
        
        # Clear secret key
        sk = bytes(len(sk))
        
        return self.key_store[key_id]
    
    def rotate_keys(self):
        """Automatic key rotation"""
        
        current_time = time.time()
        
        for key_id, metadata in self.key_store.items():
            if metadata['rotation_due'] < current_time:
                # Generate new key
                new_metadata = self.generate_and_store(
                    metadata['algorithm'],
                    f"{key_id}-rotated-{int(current_time)}"
                )
                
                # Mark old key for deletion
                metadata['deprecated'] = True
                metadata['replacement'] = new_metadata['key_id']

Testing and Validation

Test Vector Validation

class PQCTestVectorValidation:
    """Validate against official test vectors"""
    
    @staticmethod
    def validate_nist_vectors(algorithm: str) -> bool:
        """Validate against NIST test vectors"""
        
        import json
        
        # Load official test vectors
        with open(f'test-vectors/nist/{algorithm}.json', 'r') as f:
            vectors = json.load(f)
        
        passed = 0
        failed = 0
        
        for vector in vectors:
            if algorithm.startswith('ml-kem'):
                result = PQCTestVectorValidation._test_kem(vector)
            elif algorithm.startswith('dilithium'):
                result = PQCTestVectorValidation._test_signature(vector)
            else:
                result = False
            
            if result:
                passed += 1
            else:
                failed += 1
                print(f"Failed vector: {vector['count']}")
        
        print(f"{algorithm}: {passed} passed, {failed} failed")
        return failed == 0
    
    @staticmethod
    def cross_language_validation():
        """Validate consistency across languages"""
        
        # Generate test data
        test_message = b"Cross-language test message"
        
        # Test each language implementation
        results = {}
        
        # Python
        py_pk, py_sk = python_mlkem768_keygen()
        py_ct, py_ss = python_mlkem768_encaps(py_pk)
        results['python'] = py_ss
        
        # Rust (via FFI)
        rust_ss = rust_mlkem768_decaps(py_ct, py_sk)
        results['rust'] = rust_ss
        
        # TypeScript (via WASM)
        ts_ss = typescript_mlkem768_decaps(py_ct, py_sk)
        results['typescript'] = ts_ss
        
        # Verify all match
        reference = results['python']
        for lang, ss in results.items():
            if ss != reference:
                raise ValueError(f"{lang} implementation mismatch")
        
        return True

Security Testing

class PQCSecurityTesting:
    """Security-focused testing for PQC implementations"""
    
    @staticmethod
    def test_side_channels():
        """Test for timing side-channels"""
        
        import time
        import statistics
        
        # Generate keys
        pk, sk = mlkem768_keygen()
        
        # Test with valid ciphertext
        valid_ct, _ = mlkem768_encaps(pk)
        valid_times = []
        
        for _ in range(1000):
            start = time.perf_counter_ns()
            _ = mlkem768_decaps(valid_ct, sk)
            valid_times.append(time.perf_counter_ns() - start)
        
        # Test with invalid ciphertext
        invalid_ct = secrets.token_bytes(len(valid_ct))
        invalid_times = []
        
        for _ in range(1000):
            start = time.perf_counter_ns()
            _ = mlkem768_decaps(invalid_ct, sk)
            invalid_times.append(time.perf_counter_ns() - start)
        
        # Statistical analysis
        valid_mean = statistics.mean(valid_times)
        invalid_mean = statistics.mean(invalid_times)
        
        # Check timing difference is negligible
        difference = abs(valid_mean - invalid_mean)
        threshold = valid_mean * 0.01  # 1% threshold
        
        if difference > threshold:
            print(f"WARNING: Timing difference detected: {difference}ns")
            return False
        
        return True
    
    @staticmethod
    def fuzzing_test():
        """Fuzz testing for robustness"""
        
        import random
        
        # Generate keys
        pk, sk = mlkem768_keygen()
        
        for _ in range(10000):
            # Generate random input
            fuzz_length = random.randint(0, 2000)
            fuzz_input = secrets.token_bytes(fuzz_length)
            
            try:
                # Should handle gracefully
                result = mlkem768_decaps(fuzz_input, sk)
                
                # Should return consistent length
                if len(result) != 32:
                    return False
                    
            except Exception as e:
                print(f"Fuzzing caused exception: {e}")
                return False
        
        return True

Migration Strategies

Gradual Migration Plan

class PQCMigrationStrategy:
    """Strategic migration to PQC"""
    
    def __init__(self):
        self.migration_phase = 1
        self.compatibility_matrix = {}
    
    def get_migration_phase(self) -> dict:
        """Get current migration phase configuration"""
        
        phases = {
            1: {  # Discovery
                'name': 'Discovery',
                'algorithms': ['classical'],
                'monitoring': True,
                'testing': 'pqc'
            },
            2: {  # Hybrid Introduction
                'name': 'Hybrid Introduction',
                'algorithms': ['hybrid'],
                'fallback': 'classical',
                'monitoring': True
            },
            3: {  # PQC Primary
                'name': 'PQC Primary',
                'algorithms': ['pqc', 'hybrid'],
                'fallback': 'hybrid',
                'deprecate': 'classical'
            },
            4: {  # PQC Only
                'name': 'PQC Only',
                'algorithms': ['pqc'],
                'remove': 'classical',
                'mandatory': True
            }
        }
        
        return phases[self.migration_phase]
    
    def select_algorithm(self, peer_capabilities: list) -> str:
        """Select algorithm based on migration phase"""
        
        phase = self.get_migration_phase()
        
        # Check capabilities against phase
        if 'pqc' in peer_capabilities and 'pqc' in phase['algorithms']:
            return 'pqc'
        elif 'hybrid' in peer_capabilities and 'hybrid' in phase['algorithms']:
            return 'hybrid'
        elif 'classical' in peer_capabilities and 'classical' in phase.get('algorithms', []):
            return 'classical'
        elif 'fallback' in phase:
            return phase['fallback']
        else:
            raise ValueError("No compatible algorithm for current phase")
    
    def advance_phase(self) -> bool:
        """Advance to next migration phase"""
        
        # Check readiness metrics
        if self.check_phase_readiness():
            self.migration_phase += 1
            self.log_phase_transition()
            return True
        
        return False
    
    def check_phase_readiness(self) -> bool:
        """Check if ready for next phase"""
        
        metrics = {
            'pqc_support_percentage': self.get_pqc_support_percentage(),
            'error_rate': self.get_error_rate(),
            'performance_impact': self.get_performance_impact()
        }
        
        # Phase-specific thresholds
        thresholds = {
            1: {'pqc_support_percentage': 50},
            2: {'pqc_support_percentage': 80, 'error_rate': 0.01},
            3: {'pqc_support_percentage': 95, 'error_rate': 0.001}
        }
        
        if self.migration_phase not in thresholds:
            return False
        
        for metric, threshold in thresholds[self.migration_phase].items():
            if metrics.get(metric, 0) < threshold:
                return False
        
        return True

Security Checklist

Implementation Review Checklist

References

  1. NIST Post-Quantum Cryptography
  2. PQC Migration Playbook
  3. Side-Channel Attacks on PQC
  4. Hybrid PQC Deployment
  5. PQC Implementation Security