Dilithium Security-Focused API Documentation

Version: 1.0
Last Updated: 2025-07-05
Security Classification: PUBLIC
Author: Phantom (phantom@metamui.id)

Overview

Dilithium is a post-quantum digital signature scheme whose security rests on the hardness of lattice problems, specifically the Module Learning With Errors (M-LWE) and Module Short Integer Solution (M-SIS) problems. NIST selected Dilithium for standardization and published it, with minor changes, as ML-DSA in FIPS 204. It resists known classical and quantum attacks while keeping signing and verification fast, with keys and signatures a few kilobytes in size.

Security Level: Multiple levels (Dilithium2: 128-bit, Dilithium3: 192-bit, Dilithium5: 256-bit)
Algorithm Family: Module-LWE lattice-based
Signature Type: Fiat-Shamir with aborts
Key Sizes: Variable by security level
Quantum Security: Strong lattice-based resistance

Security Level Variants

| Variant | Classical Security | Quantum Security | Public Key | Private Key | Signature |
|---|---|---|---|---|---|
| Dilithium2 | 128-bit | ~2^64 | 1312 bytes | 2528 bytes | ~2420 bytes |
| Dilithium3 | 192-bit | ~2^96 | 1952 bytes | 4000 bytes | ~3293 bytes |
| Dilithium5 | 256-bit | ~2^128 | 2592 bytes | 4864 bytes | ~4595 bytes |
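
The sizes in the table can be enforced at API boundaries before any cryptographic work is done. The following is a minimal sketch: the `DILITHIUM_SIZES` table and the `check_length` helper are hypothetical names introduced here for illustration, with the byte counts copied from the variant table.

```python
# Hypothetical helper: byte sizes copied from the variant table.
DILITHIUM_SIZES = {
    2: {"public": 1312, "private": 2528, "signature": 2420},
    3: {"public": 1952, "private": 4000, "signature": 3293},
    5: {"public": 2592, "private": 4864, "signature": 4595},
}


def check_length(kind: str, data: bytes, security_level: int) -> None:
    """Raise ValueError if `data` has the wrong length for its kind and level."""
    if security_level not in DILITHIUM_SIZES:
        raise ValueError(f"Unknown Dilithium security level: {security_level}")
    expected = DILITHIUM_SIZES[security_level][kind]
    if len(data) != expected:
        raise ValueError(
            f"Dilithium{security_level} {kind} must be {expected} bytes, got {len(data)}"
        )


# A correctly sized Dilithium3 public key passes; a Dilithium2-sized one is rejected.
check_length("public", bytes(1952), 3)
try:
    check_length("public", bytes(1312), 3)
except ValueError as exc:
    print(exc)
```

A length check of this kind only rejects obviously malformed input; it says nothing about whether the key or signature is valid.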

Security Warnings ⚠️

  1. Post-Quantum Security: Designed specifically for quantum-resistant security
  2. Deterministic Signatures: Dilithium signing is deterministic by default (a randomized variant is also specified), whereas Falcon signing is always randomized
  3. Side-Channel Awareness: Implementation must protect against timing/power attacks
  4. Security Level Choice: Select appropriate variant for security requirements
  5. Key Reuse Safe: Signing is stateless, so one key pair can sign many messages, unlike one-time or stateful hash-based schemes

API Functions

dilithium_keygen(security_level: int) -> (PublicKey, PrivateKey)

Security Contract:

Attack Resistance:

| Attack Type | Protected | Notes |
|---|---|---|
| Quantum (Shor’s) | ✅ | Lattice-based security |
| Quantum (Grover’s) | ✅ | Security levels maintained |
| Classical Lattice | ✅ | M-LWE hardness assumption |
| Side-Channel | ⚠️ | Implementation dependent |
| Key Recovery | ✅ | Provable lattice security |
| Deterministic Forgery | ✅ | Fiat-Shamir security |

Security Requirements:

Secure Usage Example:

import secrets
import hashlib
import json
import time
from typing import Tuple, Dict, Optional
from enum import Enum

class DilithiumLevel(Enum):
    LEVEL2 = 2  # 128-bit security
    LEVEL3 = 3  # 192-bit security  
    LEVEL5 = 5  # 256-bit security

class SecureDilithium:
    """Secure Dilithium implementation with best practices"""
    
    KEY_SIZES = {
        DilithiumLevel.LEVEL2: {'public': 1312, 'private': 2528, 'signature': 2420},
        DilithiumLevel.LEVEL3: {'public': 1952, 'private': 4000, 'signature': 3293},
        DilithiumLevel.LEVEL5: {'public': 2592, 'private': 4864, 'signature': 4595}
    }
    
    def __init__(self, security_level: DilithiumLevel):
        self.security_level = security_level
        self.key_pair = None
        self.generation_entropy = None
        
    def generate_keypair_with_audit(
        self,
        additional_entropy: Optional[bytes] = None
    ) -> Dict:
        """Generate Dilithium key pair with entropy auditing"""
        
        # Collect entropy from multiple sources
        entropy_sources = []
        
        # System CSPRNG
        system_entropy = secrets.token_bytes(64)
        entropy_sources.append(('system', system_entropy))
        
        # High-resolution timer
        timer_entropy = str(time.perf_counter_ns()).encode()
        timer_hash = hashlib.sha256(timer_entropy).digest()
        entropy_sources.append(('timer', timer_hash))
        
        # Process ID and thread info
        import os, threading
        process_entropy = f"{os.getpid()}:{threading.get_ident()}".encode()
        process_hash = hashlib.sha256(process_entropy).digest()
        entropy_sources.append(('process', process_hash))
        
        # Additional entropy if provided
        if additional_entropy:
            entropy_sources.append(('additional', additional_entropy[:64]))
        
        # Combine all entropy sources
        combined_entropy = bytearray()
        for source_name, entropy in entropy_sources:
            combined_entropy.extend(f"{source_name}:".encode())
            combined_entropy.extend(entropy)
        
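        # Generate final seed with SHAKE256. Keep it in a mutable bytearray
        # so the best-effort zeroing below can overwrite it.
        final_seed = bytearray(hashlib.shake_256(
            b"Dilithium-KeyGen-v1:" + bytes(combined_entropy)
        ).digest(64))
        
        # Generate key pair (the callee receives an immutable copy of the seed)
        public_key, private_key = self._dilithium_keygen_from_seed(bytes(final_seed))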
        
        # Store key pair and generation info
        self.key_pair = {
            'public_key': public_key,
            'private_key': private_key,
            'security_level': self.security_level.value,
            'generated_at': time.time(),
            'algorithm': f'Dilithium{self.security_level.value}'
        }
        
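        # Store entropy info for auditing.
        # Note: 'total_entropy_bits' is the length of the mixed input, not an
        # entropy estimate; the timer and process values add very little.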
        self.generation_entropy = {
            'sources': [name for name, _ in entropy_sources],
            'total_entropy_bits': len(combined_entropy) * 8,
            'final_seed_size': len(final_seed)
        }
        
        # Clear sensitive data
        for i in range(len(combined_entropy)):
            combined_entropy[i] = 0
        for i in range(len(final_seed)):
            final_seed[i] = 0
        
        return {
            'public_key': public_key.hex(),
            'key_id': self._compute_key_id(public_key),
            'security_level': self.security_level.value,
            'algorithm': f'Dilithium{self.security_level.value}',
            'generated_at': self.key_pair['generated_at'],
            'entropy_audit': self.generation_entropy
        }
    
    def _dilithium_keygen_from_seed(self, seed: bytes) -> Tuple[bytes, bytes]:
        """Generate Dilithium keys from seed (implementation placeholder)"""
        
        # This would call the actual Dilithium implementation
        # Implementation must include:
        # - Proper polynomial sampling
        # - Rejection sampling for uniform distribution
        # - Side-channel protections
        
        from dilithium_crypto import dilithium_keygen_seeded
        return dilithium_keygen_seeded(self.security_level.value, seed)
    
    def _compute_key_id(self, public_key: bytes) -> str:
        """Compute stable identifier for public key"""
        
        key_hash = hashlib.sha256(
            f"Dilithium{self.security_level.value}-KeyID:".encode() + public_key
        ).digest()
        
        return key_hash[:20].hex()  # 160-bit key ID
    
    def export_public_key_info(self) -> Dict:
        """Export public key with security information"""
        
        if not self.key_pair:
            raise ValueError("No key pair generated")
        
        public_key = self.key_pair['public_key']
        expected_size = self.KEY_SIZES[self.security_level]['public']
        
        if len(public_key) != expected_size:
            raise ValueError(f"Invalid public key size for Dilithium{self.security_level.value}")
        
        return {
            'public_key': public_key.hex(),
            'algorithm': f'Dilithium{self.security_level.value}',
            'security_level': self.security_level.value,
            'key_size': len(public_key),
            'classical_security_bits': {2: 128, 3: 192, 5: 256}[self.security_level.value],
            'quantum_security_bits': {2: 64, 3: 96, 5: 128}[self.security_level.value],
            'key_id': self._compute_key_id(public_key),
            'usage_recommendations': self._get_usage_recommendations()
        }
    
    def _get_usage_recommendations(self) -> Dict:
        """Get usage recommendations for security level"""
        
        recommendations = {
            DilithiumLevel.LEVEL2: {
                'use_cases': ['IoT devices', 'short-term security', 'performance-critical'],
                'max_signatures': 2**32,
                'recommended_expiry_years': 5
            },
            DilithiumLevel.LEVEL3: {
                'use_cases': ['general purpose', 'medium-term security', 'balanced performance'],
                'max_signatures': 2**40,
                'recommended_expiry_years': 10
            },
            DilithiumLevel.LEVEL5: {
                'use_cases': ['high security', 'long-term protection', 'critical infrastructure'],
                'max_signatures': 2**50,
                'recommended_expiry_years': 20
            }
        }
        
        return recommendations[self.security_level]

# SECURE: Enterprise key management
class DilithiumKeyManager:
    def __init__(self, default_security_level: DilithiumLevel = DilithiumLevel.LEVEL3):
        self.default_level = default_security_level
        self.managed_keys = {}
        self.key_policies = {}
        
    def create_managed_key(
        self,
        key_id: str,
        purpose: str,
        security_level: Optional[DilithiumLevel] = None,
        custom_policy: Optional[Dict] = None
    ) -> Dict:
        """Create managed key with policy enforcement"""
        
        level = security_level or self.default_level
        
        # Create Dilithium instance
        dilithium = SecureDilithium(level)
        key_info = dilithium.generate_keypair_with_audit()
        
        # Define key policy
        policy = custom_policy or self._get_default_policy(level, purpose)
        
        # Store managed key
        self.managed_keys[key_id] = {
            'dilithium': dilithium,
            'key_info': key_info,
            'purpose': purpose,
            'created_at': time.time(),
            'signature_count': 0,
            'policy': policy,
            'status': 'active'
        }
        
        self.key_policies[key_id] = policy
        
        return {
            'key_id': key_id,
            'public_key_info': key_info,
            'policy': policy
        }
    
    def _get_default_policy(self, level: DilithiumLevel, purpose: str) -> Dict:
        """Get default policy for key"""
        
        base_policies = {
            'code_signing': {
                'max_signatures_per_day': 1000,
                'require_multi_party_approval': True,
                'allowed_signature_contexts': ['CodeSigning-v1'],
                'max_key_age_days': 365
            },
            'document_signing': {
                'max_signatures_per_day': 10000,
                'require_multi_party_approval': False,
                'allowed_signature_contexts': ['DocumentSigning-v1'],
                'max_key_age_days': 730
            },
            'api_authentication': {
                'max_signatures_per_day': 1000000,
                'require_multi_party_approval': False,
                'allowed_signature_contexts': ['API-Auth-v1'],
                'max_key_age_days': 180
            }
        }
        
        return base_policies.get(purpose, base_policies['document_signing'])
    
    def get_signing_key(self, key_id: str, context: str) -> Optional['ManagedSigningKey']:
        """Get key for signing with policy enforcement"""
        
        if key_id not in self.managed_keys:
            return None
        
        key_data = self.managed_keys[key_id]
        policy = key_data['policy']
        
        # Check key status
        if key_data['status'] != 'active':
            return None
        
        # Check context permission
        if context not in policy['allowed_signature_contexts']:
            return None
        
        # Check daily signature limit
        today = time.time() // (24 * 3600)
        daily_count = key_data.get(f'signatures_{today}', 0)
        
        if daily_count >= policy['max_signatures_per_day']:
            return None
        
        # Check key age
        age_days = (time.time() - key_data['created_at']) / (24 * 3600)
        if age_days > policy['max_key_age_days']:
            return None
        
        return ManagedSigningKey(key_id, key_data['dilithium'], self)
    
    def record_signature_usage(self, key_id: str):
        """Record signature usage for policy enforcement"""
        
        if key_id not in self.managed_keys:
            return
        
        key_data = self.managed_keys[key_id]
        
        # Update signature count
        key_data['signature_count'] += 1
        
        # Update daily count
        today = time.time() // (24 * 3600)
        daily_key = f'signatures_{today}'
        key_data[daily_key] = key_data.get(daily_key, 0) + 1

class ManagedSigningKey:
    def __init__(self, key_id: str, dilithium: SecureDilithium, manager: DilithiumKeyManager):
        self.key_id = key_id
        self.dilithium = dilithium
        self.manager = manager
        
    def sign(self, message: bytes, context: str) -> Dict:
        """Sign with managed key"""
        
        # Perform signing (implementation would call actual Dilithium)
        signature = self._dilithium_sign(message, context)
        
        # Record usage
        self.manager.record_signature_usage(self.key_id)
        
        return signature
    
    def _dilithium_sign(self, message: bytes, context: str) -> Dict:
        """Implementation placeholder for Dilithium signing"""
        
        # This would call the actual implementation
        private_key = self.dilithium.key_pair['private_key']
        
        # Build signing input with context
        signing_input = f"Context:{context}\n".encode() + message
        
        from dilithium_crypto import dilithium_sign
        signature = dilithium_sign(
            private_key,
            signing_input,
            self.dilithium.security_level.value
        )
        
        return {
            'signature': signature.hex(),
            'algorithm': f'Dilithium{self.dilithium.security_level.value}',
            'context': context,
            'key_id': self.key_id,
            'timestamp': int(time.time())
        }

Common Mistakes:

# INSECURE: Wrong security level selection
# Using Dilithium2 for long-term cryptographic protection
dilithium = SecureDilithium(DilithiumLevel.LEVEL2)  # Too weak for long-term!

# INSECURE: No entropy validation
public_key, private_key = dilithium_keygen(3)  # No entropy source verification

# INSECURE: Plaintext key storage
with open('private_key.bin', 'wb') as f:
    f.write(private_key)  # Unencrypted!

# INSECURE: No context in signatures
signature = dilithium_sign(private_key, message, 3)  # Missing domain separation

# INSECURE: Ignoring signature size differences
# Not accounting for the different signature sizes of each security level in protocols
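
For the missing-context mistake, one option is to bind a context label to the message with an unambiguous, length-prefixed encoding before signing. The sketch below is an assumption about how such an encoding could look, not a format defined by this API; `build_signing_input` is a hypothetical helper.

```python
import struct


def build_signing_input(context: str, message: bytes) -> bytes:
    """Prefix the message with a length-delimited context label.

    Length prefixes avoid ambiguity: ("ab", b"c") and ("a", b"bc")
    encode to different byte strings.
    """
    ctx = context.encode("utf-8")
    if len(ctx) > 255:
        raise ValueError("context label too long (max 255 bytes)")
    # Layout: 1-byte context length, context, 8-byte big-endian message length, message
    return bytes([len(ctx)]) + ctx + struct.pack(">Q", len(message)) + message


# The same message under two contexts yields two different signing inputs,
# so a signature made for one context does not verify under the other.
code_input = build_signing_input("CodeSigning-v1", b"release-1.2.3")
doc_input = build_signing_input("DocumentSigning-v1", b"release-1.2.3")
print(code_input != doc_input)
```

The verifier has to rebuild the input with the same function and the context it expects, rather than trusting a context supplied by the sender.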

dilithium_sign(private_key: bytes, message: bytes, security_level: int) -> bytes

Security Contract:

Attack Resistance:

| Attack Type | Protected | Notes |
|---|---|---|
| Existential Forgery | ✅ | M-LWE hardness |
| Key Recovery | ✅ | Lattice-based security |
| Quantum Attacks | ✅ | Post-quantum secure |
| Side-Channel | ⚠️ | Implementation dependent |
| Replay | ❌ | Add timestamps/nonces |
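
Because the signature scheme itself does not prevent replay, the verifier has to track freshness. The sketch below combines a timestamp window with a nonce cache. `ReplayGuard`, the 300-second default window, and the parameter names are assumptions made for illustration.

```python
import time
from typing import Dict, Optional


class ReplayGuard:
    """Reject messages that are too old or whose nonce was already seen."""

    def __init__(self, max_age_seconds: int = 300):
        self.max_age = max_age_seconds
        self._seen: Dict[bytes, float] = {}  # nonce -> timestamp of its message

    def accept(self, nonce: bytes, timestamp: float, now: Optional[float] = None) -> bool:
        now = time.time() if now is None else now
        # Forget nonces that have aged out of the window to bound memory use
        self._seen = {n: t for n, t in self._seen.items() if now - t <= self.max_age}
        if abs(now - timestamp) > self.max_age:
            return False  # stale, or too far in the future
        if nonce in self._seen:
            return False  # replayed within the window
        self._seen[nonce] = timestamp
        return True


guard = ReplayGuard()
print(guard.accept(b"nonce-1", timestamp=1000.0, now=1010.0))
print(guard.accept(b"nonce-1", timestamp=1000.0, now=1020.0))
```

This only helps if the nonce and timestamp are part of the signed input and `accept` is called after the signature has verified; otherwise an attacker can simply alter them.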

Security Requirements:

Secure Usage Example:

use dilithium::{Dilithium2, Dilithium3, Dilithium5, PrivateKey, Signature};
use sha3::{Sha3_256, Digest};
use chrono::{DateTime, Utc};

/// Secure Dilithium signing with metadata
pub struct DilithiumSigner {
    private_key: PrivateKey,
    security_level: u8,
    signing_counter: u64,
    context: String,
}

impl DilithiumSigner {
    pub fn new(private_key: PrivateKey, security_level: u8, context: String) -> Self {
        Self {
            private_key,
            security_level,
            signing_counter: 0,
            context,
        }
    }
    
    pub fn sign_with_metadata(
        &mut self,
        message: &[u8],
        timestamp: Option<DateTime<Utc>>,
        additional_context: Option<&str>,
    ) -> Result<SignatureEnvelope, Error> {
        // Build comprehensive signing input
        let mut signing_input = Vec::new();
        
        // Add primary context
        signing_input.extend_from_slice(b"DilithiumSignature:");
        signing_input.extend_from_slice(self.context.as_bytes());
        signing_input.push(b'\n');
        
        // Add security level
        signing_input.extend_from_slice(b"SecurityLevel:");
        signing_input.push(b'0' + self.security_level);
        signing_input.push(b'\n');
        
        // Add signing counter
        signing_input.extend_from_slice(b"Counter:");
        signing_input.extend_from_slice(&self.signing_counter.to_le_bytes());
        signing_input.push(b'\n');
        
        // Add timestamp if provided
        let ts = timestamp.unwrap_or_else(Utc::now);
        signing_input.extend_from_slice(b"Timestamp:");
        signing_input.extend_from_slice(ts.timestamp().to_string().as_bytes());
        signing_input.push(b'\n');
        
        // Add additional context if provided
        if let Some(ctx) = additional_context {
            signing_input.extend_from_slice(b"AdditionalContext:");
            signing_input.extend_from_slice(ctx.as_bytes());
            signing_input.push(b'\n');
        }
        
        // Add message hash for large messages
        let message_hash = if message.len() > 1024 {
            let hash = Sha3_256::digest(message);
            signing_input.extend_from_slice(b"MessageHash:");
            signing_input.extend_from_slice(&hash);
            hash.to_vec()
        } else {
            signing_input.extend_from_slice(b"Message:");
            signing_input.extend_from_slice(message);
            Vec::new()
        };
        
        // Sign the complete input
        let signature = match self.security_level {
            2 => Dilithium2::sign(&self.private_key, &signing_input)?,
            3 => Dilithium3::sign(&self.private_key, &signing_input)?,
            5 => Dilithium5::sign(&self.private_key, &signing_input)?,
            _ => return Err(Error::InvalidSecurityLevel),
        };
        
        self.signing_counter += 1;
        
        Ok(SignatureEnvelope {
            signature,
            metadata: SignatureMetadata {
                context: self.context.clone(),
                additional_context: additional_context.map(|s| s.to_string()),
                security_level: self.security_level,
                counter: self.signing_counter - 1,
                timestamp: ts,
                message_hash: if message_hash.is_empty() { None } else { Some(message_hash) },
            },
        })
    }
    
    pub fn sign_document(
        &mut self,
        document: &Document,
    ) -> Result<SignedDocument, Error> {
        // Create canonical representation
        let canonical_doc = document.canonicalize()?;
        
        // Extract metadata for signature
        let doc_metadata = DocumentMetadata {
            title: document.title.clone(),
            version: document.version,
            created_at: document.created_at,
            hash: Sha3_256::digest(&canonical_doc).to_vec(),
        };
        
        // Sign document with metadata
        let signature_envelope = self.sign_with_metadata(
            &canonical_doc,
            Some(Utc::now()),
            Some(&format!("Document:{}", document.title)),
        )?;
        
        Ok(SignedDocument {
            document: document.clone(),
            signature: signature_envelope,
            doc_metadata,
        })
    }
}

#[derive(Debug, Clone)]
pub struct SignatureEnvelope {
    pub signature: Signature,
    pub metadata: SignatureMetadata,
}

#[derive(Debug, Clone)]
pub struct SignatureMetadata {
    pub context: String,
    pub additional_context: Option<String>,
    pub security_level: u8,
    pub counter: u64,
    pub timestamp: DateTime<Utc>,
    pub message_hash: Option<Vec<u8>>,
}

/// Batch signing for efficiency
pub struct BatchDilithiumSigner {
    signer: DilithiumSigner,
}

impl BatchDilithiumSigner {
    pub fn sign_batch(
        &mut self,
        messages: &[&[u8]],
        shared_context: &str,
    ) -> Result<Vec<SignatureEnvelope>, Error> {
        let mut signatures = Vec::with_capacity(messages.len());
        
        for (i, message) in messages.iter().enumerate() {
            let additional_context = format!("{}:BatchItem:{}", shared_context, i);
            let signature = self.signer.sign_with_metadata(
                message,
                Some(Utc::now()),
                Some(&additional_context),
            )?;
            signatures.push(signature);
        }
        
        Ok(signatures)
    }
    
    pub fn sign_merkle_batch(
        &mut self,
        leaves: &[&[u8]],
    ) -> Result<MerkleSignature, Error> {
        // Build Merkle tree
        let tree = MerkleTree::new(leaves)?;
        
        // Sign Merkle root
        let root_signature = self.signer.sign_with_metadata(
            &tree.root(),
            Some(Utc::now()),
            Some("MerkleTreeRoot"),
        )?;
        
        Ok(MerkleSignature {
            tree,
            root_signature,
        })
    }
}
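
The Merkle batch approach signs one root instead of every leaf; each recipient then checks an inclusion proof against the signed root. `MerkleTree` is not defined in this document, so the following Python sketch shows one possible construction using SHA3-256 with separate prefixes for leaf and inner-node hashes. The function names and the rule of promoting an unpaired node unchanged are assumptions.

```python
import hashlib
from typing import List, Tuple

Proof = List[Tuple[bytes, bool]]  # (sibling_hash, sibling_is_right)


def _leaf(data: bytes) -> bytes:
    return hashlib.sha3_256(b"\x00" + data).digest()  # 0x00 marks a leaf


def _node(left: bytes, right: bytes) -> bytes:
    return hashlib.sha3_256(b"\x01" + left + right).digest()  # 0x01 marks an inner node


def merkle_root_and_proof(leaves: List[bytes], index: int) -> Tuple[bytes, Proof]:
    """Return the Merkle root and the inclusion proof for leaves[index]."""
    if not leaves or not 0 <= index < len(leaves):
        raise ValueError("need at least one leaf and a valid index")
    level = [_leaf(x) for x in leaves]
    proof: Proof = []
    while len(level) > 1:
        nxt = [_node(level[i], level[i + 1]) for i in range(0, len(level) - 1, 2)]
        if len(level) % 2:
            nxt.append(level[-1])  # an unpaired node is promoted unchanged
        sibling = index ^ 1
        if sibling < len(level):  # a promoted node has no sibling at this level
            proof.append((level[sibling], sibling > index))
        index //= 2
        level = nxt
    return level[0], proof


def verify_inclusion(leaf: bytes, proof: Proof, root: bytes) -> bool:
    """Recompute the root from a leaf and its proof and compare."""
    h = _leaf(leaf)
    for sibling, sibling_is_right in proof:
        h = _node(h, sibling) if sibling_is_right else _node(sibling, h)
    return h == root


batch = [b"msg-a", b"msg-b", b"msg-c"]
root, proof = merkle_root_and_proof(batch, 2)
print(verify_inclusion(b"msg-c", proof, root))
```

With this layout a batch of any size costs one Dilithium signature over the 32-byte root, and each recipient needs only its message, a proof of logarithmic length, and the signed root.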

dilithium_verify(public_key: bytes, message: bytes, signature: bytes, security_level: int) -> bool

Security Contract:

Security Requirements:

Secure Usage Example:

class DilithiumVerifier:
    """Secure Dilithium signature verification"""
    
    def __init__(self):
        self.trusted_keys = {}
        self.verification_cache = {}
        self.max_cache_size = 10000
        
    def add_trusted_public_key(
        self,
        key_id: str,
        public_key: bytes,
        security_level: int,
        metadata: dict
    ):
        """Add trusted public key with validation"""
        
        # Validate security level
        if security_level not in [2, 3, 5]:
            raise ValueError(f"Invalid Dilithium security level: {security_level}")
        
        # Validate key size
        expected_sizes = {2: 1312, 3: 1952, 5: 2592}
        if len(public_key) != expected_sizes[security_level]:
            raise ValueError(f"Invalid key size for Dilithium{security_level}")
        
        # Validate key structure (implementation specific)
        if not self._validate_public_key_structure(public_key, security_level):
            raise ValueError("Invalid public key structure")
        
        self.trusted_keys[key_id] = {
            'public_key': public_key,
            'security_level': security_level,
            'metadata': metadata,
            'added_at': time.time()
        }
    
    def verify_signature_envelope(
        self,
        envelope: dict,
        key_id: str,
        original_message: bytes
    ) -> bool:
        """Verify complete signature envelope"""
        
        if key_id not in self.trusted_keys:
            return False
        
        key_info = self.trusted_keys[key_id]
        
        # Reconstruct signing input
        signing_input = self._reconstruct_signing_input(
            envelope, original_message
        )
        
        # Verify signature
        signature = bytes.fromhex(envelope['signature'])
        
        return self._dilithium_verify_core(
            key_info['public_key'],
            signing_input,
            signature,
            key_info['security_level']
        )
    
    def _reconstruct_signing_input(
        self,
        envelope: dict,
        original_message: bytes
    ) -> bytes:
        """Reconstruct signing input from envelope"""
        
        signing_input = bytearray()
        
        # Add context
        if 'context' in envelope:
            signing_input.extend(b"DilithiumSignature:")
            signing_input.extend(envelope['context'].encode())
            signing_input.extend(b"\n")
        
        # Add security level
        if 'security_level' in envelope:
            signing_input.extend(b"SecurityLevel:")
            signing_input.extend(str(envelope['security_level']).encode())
            signing_input.extend(b"\n")
        
        # Add counter
        if 'counter' in envelope:
            signing_input.extend(b"Counter:")
            signing_input.extend(envelope['counter'].to_bytes(8, 'little'))
            signing_input.extend(b"\n")
        
        # Add timestamp
        if 'timestamp' in envelope:
            signing_input.extend(b"Timestamp:")
            signing_input.extend(str(envelope['timestamp']).encode())
            signing_input.extend(b"\n")
        
        # Add additional context
        if envelope.get('additional_context') is not None:
            signing_input.extend(b"AdditionalContext:")
            signing_input.extend(envelope['additional_context'].encode())
            signing_input.extend(b"\n")
        
        # Add message or message hash.
        # The signer hashes large messages with SHA3-256, so the verifier must too.
        if envelope.get('message_hash'):
            message_hash = bytes.fromhex(envelope['message_hash'])
            computed_hash = hashlib.sha3_256(original_message).digest()
            
            if message_hash != computed_hash:
                return b""  # Hash mismatch
            
            signing_input.extend(b"MessageHash:")
            signing_input.extend(message_hash)
        else:
            signing_input.extend(b"Message:")
            signing_input.extend(original_message)
        
        return bytes(signing_input)
    
    def batch_verify(
        self,
        verifications: list
    ) -> list:
        """Batch verify multiple signatures efficiently"""
        
        results = []
        
        for verification in verifications:
            key_id = verification['key_id']
            message = verification['message']
            envelope = verification['envelope']
            
            # Use caching for performance
            cache_key = self._compute_cache_key(key_id, message, envelope)
            
            if cache_key in self.verification_cache:
                result = self.verification_cache[cache_key]
            else:
                result = self.verify_signature_envelope(envelope, key_id, message)
                
                # Cache result
                if len(self.verification_cache) < self.max_cache_size:
                    self.verification_cache[cache_key] = result
            
            results.append(result)
        
        return results
    
    def verify_document_signature(
        self,
        signed_document: dict,
        key_id: str
    ) -> dict:
        """Verify document signature with detailed result"""
        
        document = signed_document['document']
        signature_envelope = signed_document['signature_envelope']
        doc_metadata = signed_document['doc_metadata']
        
        # Canonicalize document
        canonical_doc = self._canonicalize_document(document)
        
        # Verify document hash (SHA3-256, matching the hash the signer stores)
        computed_hash = hashlib.sha3_256(canonical_doc).digest()
        stored_hash = bytes.fromhex(doc_metadata['hash'])
        
        if computed_hash != stored_hash:
            return {
                'valid': False,
                'error': 'Document hash mismatch',
                'details': {
                    'computed_hash': computed_hash.hex(),
                    'stored_hash': stored_hash.hex()
                }
            }
        
        # Verify signature
        signature_valid = self.verify_signature_envelope(
            signature_envelope,
            key_id,
            canonical_doc
        )
        
        return {
            'valid': signature_valid,
            'document_hash_valid': True,
            'signature_metadata': signature_envelope.get('metadata', {}),
            'verification_timestamp': time.time()
        }
    
    def _validate_public_key_structure(
        self,
        public_key: bytes,
        security_level: int
    ) -> bool:
        """Validate Dilithium public key structure"""
        
        # Implementation would check:
        # - Polynomial coefficients are in valid range
        # - Matrix dimensions match security level
        # - Key follows Dilithium format specification
        
        # Placeholder implementation
        return True
    
    def _dilithium_verify_core(
        self,
        public_key: bytes,
        message: bytes,
        signature: bytes,
        security_level: int
    ) -> bool:
        """Core Dilithium verification"""
        
        from dilithium_crypto import dilithium_verify
        
        try:
            return dilithium_verify(
                public_key,
                message,
                signature,
                security_level
            )
        except Exception:
            return False
    
    def _compute_cache_key(self, key_id: str, message: bytes, envelope: dict) -> str:
        """Compute cache key for verification result"""
        
        import json
        
        cache_data = {
            'key_id': key_id,
            'message_hash': hashlib.sha256(message).hexdigest(),
            'envelope': envelope
        }
        
        cache_json = json.dumps(cache_data, sort_keys=True)
        return hashlib.sha256(cache_json.encode()).hexdigest()[:32]
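
Verification only succeeds if the verifier rebuilds exactly the bytes the signer signed. As a cross-check, the sketch below restates the field layout used by `sign_with_metadata` as a standalone Python function. `build_envelope_input` is a hypothetical name, and the layout is inferred from the signer code in this document rather than from a published format.

```python
import hashlib
from typing import Optional


def build_envelope_input(
    context: str,
    security_level: int,
    counter: int,
    timestamp: int,
    message: bytes,
    additional_context: Optional[str] = None,
) -> bytes:
    """Rebuild the byte string that sign_with_metadata signs."""
    out = bytearray()
    out += b"DilithiumSignature:" + context.encode() + b"\n"
    out += b"SecurityLevel:" + str(security_level).encode() + b"\n"
    # The counter is written as 8 raw little-endian bytes, not as decimal text
    out += b"Counter:" + counter.to_bytes(8, "little") + b"\n"
    out += b"Timestamp:" + str(timestamp).encode() + b"\n"
    if additional_context is not None:
        out += b"AdditionalContext:" + additional_context.encode() + b"\n"
    if len(message) > 1024:
        # Messages over 1024 bytes are replaced by their SHA3-256 digest
        out += b"MessageHash:" + hashlib.sha3_256(message).digest()
    else:
        out += b"Message:" + message
    return bytes(out)


signing_input = build_envelope_input("API-Auth-v1", 3, 0, 1700000000, b"hi")
print(signing_input)
```

Keeping one such function as the shared reference for signer and verifier tests makes mismatches in hash choice, counter encoding, or optional fields show up as a simple byte comparison failure.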

Security Best Practices

Certificate Authority with Dilithium

class DilithiumCertificateAuthority:
    """Post-quantum CA using Dilithium"""
    
    def __init__(self, ca_key_manager: DilithiumKeyManager, ca_cert: dict):
        self.key_manager = ca_key_manager
        self.ca_cert = ca_cert
        self.issued_certificates = {}
        self.certificate_templates = {}
        
    def create_certificate_template(
        self,
        template_name: str,
        template_config: dict
    ):
        """Create certificate template for consistent issuance"""
        
        self.certificate_templates[template_name] = {
            'security_level': template_config.get('security_level', 3),
            'validity_years': template_config.get('validity_years', 2),
            'key_usage': template_config.get('key_usage', ['digital_signature']),
            'extended_key_usage': template_config.get('extended_key_usage', []),
            'subject_constraints': template_config.get('subject_constraints', {}),
            'extensions': template_config.get('extensions', {})
        }
    
    def issue_certificate(
        self,
        csr: dict,
        template_name: str,
        ca_key_id: str
    ) -> dict:
        """Issue post-quantum certificate using Dilithium"""
        
        if template_name not in self.certificate_templates:
            raise ValueError(f"Unknown template: {template_name}")
        
        template = self.certificate_templates[template_name]
        
        # Validate CSR
        if not self._validate_csr(csr, template):
            raise ValueError("CSR validation failed")
        
        # Create certificate structure
        cert_data = {
            'version': 3,
            'serial_number': self._generate_serial_number(),
            'issuer': self.ca_cert['subject'],
            'subject': csr['subject'],
            'public_key': {
                'algorithm': f"Dilithium{template['security_level']}",
                'key': csr['public_key'],
                'security_level': template['security_level']
            },
            'validity': {
                'not_before': int(time.time()),
                'not_after': int(time.time()) + (template['validity_years'] * 365 * 24 * 3600)
            },
            'extensions': {
                'key_usage': template['key_usage'],
                'extended_key_usage': template['extended_key_usage'],
                **template['extensions']
            }
        }
        
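        # Sign certificate. The CA key's policy must list 'X.509-Certificate-v3'
        # in allowed_signature_contexts, or get_signing_key returns None.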
        ca_signing_key = self.key_manager.get_signing_key(
            ca_key_id,
            'X.509-Certificate-v3'
        )
        
        if not ca_signing_key:
            raise ValueError("CA signing key not available")
        
        # Serialize and sign
        cert_bytes = self._serialize_certificate(cert_data)
        signature_envelope = ca_signing_key.sign(
            cert_bytes,
            'X.509-Certificate-v3'
        )
        
        # Complete certificate
        certificate = {
            **cert_data,
            'signature': signature_envelope
        }
        
        # Store issued certificate
        serial = cert_data['serial_number']
        self.issued_certificates[serial] = {
            'certificate': certificate,
            'issued_at': time.time(),
            'template_used': template_name,
            'ca_key_id': ca_key_id
        }
        
        return certificate
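
The `_serialize_certificate` helper is not shown in this document. Because the CA signs the serialized bytes, a verifier has to be able to reproduce them exactly, so the encoding must be deterministic. The following is a minimal sketch, not the actual helper: the function name `serialize_certificate`, the canonical-JSON encoding, and the `__b64__` wrapper for raw key bytes are all assumptions made for illustration (a production CA would more likely use DER).

```python
import base64
import json


def serialize_certificate(cert_data: dict) -> bytes:
    """Hypothetical stand-in for _serialize_certificate: canonical JSON bytes."""

    def encode(value):
        # Raw bytes (e.g. the public key) are not JSON-serializable,
        # so wrap them as base64 text. The "__b64__" tag is an assumption.
        if isinstance(value, (bytes, bytearray)):
            return {"__b64__": base64.b64encode(bytes(value)).decode("ascii")}
        raise TypeError(f"Cannot serialize {type(value).__name__}")

    # sort_keys and fixed separators make the output independent of
    # dict insertion order and of whitespace choices.
    return json.dumps(
        cert_data,
        sort_keys=True,
        separators=(",", ":"),
        ensure_ascii=True,
        default=encode,
    ).encode("ascii")
```

Two dictionaries with the same fields in a different order serialize to identical bytes, which is the property the signature depends on.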

Multi-Level Security Architecture

/// Multi-level security with different Dilithium variants
pub struct MultiLevelSecurity {
    level2_signer: Option<DilithiumSigner>,  // Fast operations
    level3_signer: Option<DilithiumSigner>,  // Balanced security
    level5_signer: Option<DilithiumSigner>,  // High security
}

impl MultiLevelSecurity {
    pub fn new() -> Self {
        Self {
            level2_signer: None,
            level3_signer: None,
            level5_signer: None,
        }
    }
    
    pub fn sign_by_classification(
        &mut self,
        message: &[u8],
        classification: SecurityClassification,
    ) -> Result<SignatureEnvelope, Error> {
        // Pick the signer and metadata label for the classification, then sign once.
        let (signer, label) = match classification {
            SecurityClassification::Internal => (self.level2_signer.as_mut(), "Internal"),
            SecurityClassification::Confidential => (self.level3_signer.as_mut(), "Confidential"),
            SecurityClassification::Secret => (self.level5_signer.as_mut(), "Secret"),
        };

        // Fail closed: a missing signer is an error, never a fallback to another level.
        signer
            .ok_or(Error::SignerNotAvailable)?
            .sign_with_metadata(message, None, Some(label))
    }
}

#[derive(Debug, Clone, Copy)]
pub enum SecurityClassification {
    Internal,    // Dilithium2
    Confidential,// Dilithium3
    Secret,      // Dilithium5
}
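
The same classification-to-variant mapping can be sketched in Python, the language of the other examples in this document. This is an illustrative sketch only: `LEVEL_BY_CLASSIFICATION` and `select_signer` are hypothetical names, and the `signers` dictionary stands in for however configured signer objects are actually held.

```python
from enum import Enum


class SecurityClassification(Enum):
    INTERNAL = "Internal"
    CONFIDENTIAL = "Confidential"
    SECRET = "Secret"


# Classification -> Dilithium level, mirroring the comments on the Rust enum.
LEVEL_BY_CLASSIFICATION = {
    SecurityClassification.INTERNAL: 2,
    SecurityClassification.CONFIDENTIAL: 3,
    SecurityClassification.SECRET: 5,
}


def select_signer(signers: dict, classification: SecurityClassification):
    """Return the signer for a classification.

    `signers` maps a Dilithium level (2, 3 or 5) to a configured signer
    object (hypothetical structure). A missing signer raises instead of
    silently falling back to a different security level.
    """
    level = LEVEL_BY_CLASSIFICATION[classification]
    signer = signers.get(level)
    if signer is None:
        raise LookupError(f"No Dilithium{level} signer configured")
    return signer
```

Raising on a missing signer matches the `SignerNotAvailable` behavior of the Rust version: a Secret message is never signed with a lower-level key just because the Dilithium5 signer is absent.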

Common Integration Patterns

Blockchain with Post-Quantum Security

import hashlib
import json
import secrets
import time

def dilithium_blockchain_integration():
    """Example: signing block headers with Dilithium"""
    
    class PostQuantumBlock:
        def __init__(self, previous_hash: str, transactions: list):
            self.previous_hash = previous_hash
            self.transactions = transactions
            self.timestamp = int(time.time())
            self.nonce = secrets.token_bytes(32).hex()
            
        def compute_merkle_root(self) -> str:
            """Compute Merkle root of transactions"""
            
            if not self.transactions:
                return hashlib.sha256(b"empty").hexdigest()
            
            # Hash all transactions
            tx_hashes = [
                hashlib.sha256(json.dumps(tx, sort_keys=True).encode()).digest()
                for tx in self.transactions
            ]
            
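            # Build Merkle tree: hash adjacent pairs until one root remains
            while len(tx_hashes) > 1:
                next_level = []
                for i in range(0, len(tx_hashes), 2):
                    if i + 1 < len(tx_hashes):
                        combined = tx_hashes[i] + tx_hashes[i + 1]
                    else:
                        # Odd node out: it is re-hashed on its own rather than
                        # duplicated, so verifiers must apply the same rule
                        combined = tx_hashes[i]
                    next_level.append(hashlib.sha256(combined).digest())
                tx_hashes = next_level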
            
            return tx_hashes[0].hex()
        
        def sign_block(self, dilithium_signer) -> dict:
            """Sign block with Dilithium"""
            
            # Create block header
            header = {
                'previous_hash': self.previous_hash,
                'merkle_root': self.compute_merkle_root(),
                'timestamp': self.timestamp,
                'nonce': self.nonce
            }
            
            # Serialize and sign
            header_bytes = json.dumps(header, sort_keys=True).encode()
            signature = dilithium_signer.sign(
                header_bytes,
                'Blockchain-Block-v1'
            )
            
            return {
                'header': header,
                'transactions': self.transactions,
                'signature': signature
            }
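
The verifying side of this pattern could look like the sketch below. It recomputes the Merkle root from the transactions, compares it with the signed header, and only then checks the signature over the re-serialized header. The `verify` parameter is a hypothetical callable `(message, signature, context) -> bool` standing in for whatever Dilithium verification API is in use; it is not a function defined by this document.

```python
import hashlib
import json


def merkle_root(transactions: list) -> str:
    """Same construction as compute_merkle_root in the signing example."""
    if not transactions:
        return hashlib.sha256(b"empty").hexdigest()
    level = [
        hashlib.sha256(json.dumps(tx, sort_keys=True).encode()).digest()
        for tx in transactions
    ]
    while len(level) > 1:
        # Hash adjacent pairs; an odd node at the end is hashed on its own.
        level = [
            hashlib.sha256(b"".join(level[i:i + 2])).digest()
            for i in range(0, len(level), 2)
        ]
    return level[0].hex()


def verify_block(signed_block: dict, verify) -> bool:
    """Check a block of the shape returned by sign_block.

    `verify` is a hypothetical callable (message, signature, context) -> bool.
    """
    header = signed_block["header"]
    # 1. The transactions must match the Merkle root that was signed.
    if merkle_root(signed_block["transactions"]) != header["merkle_root"]:
        return False
    # 2. Rebuild the exact bytes that were signed and check the signature
    #    under the same context string used at signing time.
    header_bytes = json.dumps(header, sort_keys=True).encode()
    return bool(verify(header_bytes, signed_block["signature"], "Blockchain-Block-v1"))
```

Checking the Merkle root first means a block with altered transactions is rejected without spending time on signature verification.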

Performance Considerations

| Operation | Dilithium2 | Dilithium3 | Dilithium5 | Notes |
|-----------|------------|------------|------------|-------|
| Key Generation | ~2.5 ms | ~4.2 ms | ~6.8 ms | One-time cost |
| Signing | ~3.1 ms | ~4.9 ms | ~8.2 ms | Deterministic |
| Verification | ~1.4 ms | ~2.1 ms | ~3.5 ms | Fast verification |
| Public Key | 1312 bytes | 1952 bytes | 2592 bytes | Moderate size |
| Signature | ~2420 bytes | ~3293 bytes | ~4595 bytes | Larger than classical |
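
To put these figures in context, the sketch below derives the per-message wire overhead (public key plus signature) and a rough single-threaded throughput from the table. The Dilithium numbers are copied from the table above; the Ed25519 sizes (32-byte public key, 64-byte signature) are added here as a classical baseline, and the helper name `summarize` is hypothetical. The throughput values are simple reciprocals of the listed timings and will vary by platform.

```python
# Sizes in bytes and timings in milliseconds, copied from the table above.
DILITHIUM = {
    2: {"public_key": 1312, "signature": 2420, "sign_ms": 3.1, "verify_ms": 1.4},
    3: {"public_key": 1952, "signature": 3293, "sign_ms": 4.9, "verify_ms": 2.1},
    5: {"public_key": 2592, "signature": 4595, "sign_ms": 8.2, "verify_ms": 3.5},
}

# Classical baseline for comparison (not part of the table above).
ED25519 = {"public_key": 32, "signature": 64}


def summarize(level: int) -> dict:
    """Wire overhead and estimated throughput for one Dilithium variant."""
    params = DILITHIUM[level]
    wire_bytes = params["public_key"] + params["signature"]
    baseline = ED25519["public_key"] + ED25519["signature"]
    return {
        "wire_bytes": wire_bytes,
        "vs_ed25519": wire_bytes / baseline,
        "signs_per_sec": 1000 / params["sign_ms"],
        "verifies_per_sec": 1000 / params["verify_ms"],
    }


if __name__ == "__main__":
    for level in DILITHIUM:
        print(f"Dilithium{level}:", summarize(level))
```

For Dilithium2 this gives 3732 bytes per public-key-plus-signature pair, roughly 39 times the 96 bytes needed for Ed25519, which is the main cost to budget for in bandwidth-constrained protocols.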

Optimization Strategies

Security Auditing

Verification Checklist

Common Vulnerabilities

# AUDIT: Look for these patterns

# ❌ Wrong security level
# Using Dilithium2 for 20-year protection
signer = DilithiumSigner(level=2)  # Too weak for long-term!

# ❌ No domain separation
signature = dilithium_sign(key, message)  # Missing context

# ❌ Insecure key storage
pickle.dump(private_key, open('key.pkl', 'wb'))  # Unencrypted!

# ❌ Side-channel vulnerable implementation
# Timing attacks on polynomial operations

# ❌ Improper signature format validation
if len(signature) > 0:  # Should validate proper format
    verify_signature(...)
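
For contrast, two of the patterns above can be corrected along the following lines. This is a sketch under stated assumptions: `has_valid_signature_format` and `with_context` are hypothetical helpers, the signature sizes are the ones listed in this document's performance table, and the length-prefixed context encoding is one possible way to achieve domain separation, not necessarily the one used by the library.

```python
# Signature sizes in bytes, taken from the performance table in this document.
SIGNATURE_SIZES = {2: 2420, 3: 3293, 5: 4595}


def has_valid_signature_format(signature, level: int) -> bool:
    """Cheap structural check to run before the more expensive verification."""
    expected = SIGNATURE_SIZES.get(level)
    return (
        expected is not None
        and isinstance(signature, (bytes, bytearray))
        and len(signature) == expected
    )


def with_context(context: str, message: bytes) -> bytes:
    """Bind a message to its purpose by prefixing a length-delimited context.

    The 2-byte length prefix keeps (context, message) pairs unambiguous:
    ("A", b"BC") and ("AB", b"C") encode to different byte strings.
    """
    ctx = context.encode("utf-8")
    if not 0 < len(ctx) <= 0xFFFF:
        raise ValueError("context must be between 1 and 65535 bytes")
    return len(ctx).to_bytes(2, "big") + ctx + message
```

A caller would reject any signature that fails `has_valid_signature_format` outright, and would sign and verify `with_context(context, message)` rather than the bare message.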

Security Analysis

Threat Model: Dilithium (ML-DSA-65) Threat Model

The comprehensive threat analysis covers:

For complete security analysis and risk assessment, see the dedicated threat model documentation.

References

  1. NIST PQC Standardization - Dilithium
  2. Dilithium Specification
  3. NIST FIPS 204 - Module-Lattice-Based Digital Signature Standard (ML-DSA)

Support

Security Issues: security@metamui.id
Documentation Updates: phantom@metamui.id
Vulnerability Disclosure: See SECURITY.md


Document Version: 1.0
Review Cycle: Quarterly
Next Review: 2025-10-05
Classification: PUBLIC