Dilithium Security-Focused API Documentation
Version: 1.0
Last Updated: 2025-07-05
**Security Classification: PUBLIC
Author: Phantom (phantom@metamui.id)
Overview
Dilithium is a post-quantum digital signature scheme based on the hardness of lattice problems, specifically the Module Learning With Errors (M-LWE) problem. Selected as a NIST post-quantum cryptography standard, Dilithium provides strong security against both classical and quantum attacks with reasonable performance and signature sizes.
Security Level: Multiple levels (Dilithium2: 128-bit, Dilithium3: 192-bit, Dilithium5: 256-bit)
Algorithm Family: Module-LWE lattice-based
Signature Type: Fiat-Shamir with aborts
Key Sizes: Variable by security level
Quantum Security: Strong lattice-based resistance
Security Level Variants
| Variant | Classical Security | Quantum Security | Public Key | Private Key | Signature |
|---|---|---|---|---|---|
| Dilithium2 | 128-bit | ~2^64 | 1312 bytes | 2528 bytes | ~2420 bytes |
| Dilithium3 | 192-bit | ~2^96 | 1952 bytes | 4000 bytes | ~3293 bytes |
| Dilithium5 | 256-bit | ~2^128 | 2592 bytes | 4864 bytes | ~4595 bytes |
Security Warnings ⚠️
- Post-Quantum Security: Designed specifically for quantum-resistant security
- Deterministic Signatures: Unlike Falcon, Dilithium produces deterministic signatures
- Side-Channel Awareness: Implementation must protect against timing/power attacks
- Security Level Choice: Select appropriate variant for security requirements
- Key Reuse Safe: More tolerant of key reuse than some alternatives
API Functions
dilithium_keygen(security_level: int) -> (PublicKey, PrivateKey)
Security Contract:
- Preconditions:
security_levelmust be 2, 3, or 5- System CSPRNG must be available
- Sufficient entropy for polynomial sampling
- Postconditions:
- Returns mathematically valid key pair
- Private key enables signing, public key enables verification
- Keys are statistically independent
Attack Resistance: | Attack Type | Protected | Notes | |————-|———–|——-| | Quantum (Shor’s) | ✅ | Lattice-based security | | Quantum (Grover’s) | ✅ | Security levels maintained | | Classical Lattice | ✅ | M-LWE hardness assumption | | Side-Channel | ⚠️ | Implementation dependent | | Key Recovery | ✅ | Provable lattice security | | Deterministic Forgery | ✅ | Fiat-Shamir security |
Security Requirements:
- Use cryptographically secure randomness
- Protect polynomial sampling from observation
- Implement rejection sampling correctly
- Secure key storage and handling
Secure Usage Example:
import secrets
import hashlib
import json
import time
from typing import Tuple, Dict, Optional
from enum import Enum
class DilithiumLevel(Enum):
LEVEL2 = 2 # 128-bit security
LEVEL3 = 3 # 192-bit security
LEVEL5 = 5 # 256-bit security
class SecureDilithium:
"""Secure Dilithium implementation with best practices"""
KEY_SIZES = {
DilithiumLevel.LEVEL2: {'public': 1312, 'private': 2528, 'signature': 2420},
DilithiumLevel.LEVEL3: {'public': 1952, 'private': 4000, 'signature': 3293},
DilithiumLevel.LEVEL5: {'public': 2592, 'private': 4864, 'signature': 4595}
}
def __init__(self, security_level: DilithiumLevel):
self.security_level = security_level
self.key_pair = None
self.generation_entropy = None
def generate_keypair_with_audit(
self,
additional_entropy: Optional[bytes] = None
) -> Dict:
"""Generate Dilithium key pair with entropy auditing"""
# Collect entropy from multiple sources
entropy_sources = []
# System CSPRNG
system_entropy = secrets.token_bytes(64)
entropy_sources.append(('system', system_entropy))
# High-resolution timer
timer_entropy = str(time.perf_counter_ns()).encode()
timer_hash = hashlib.sha256(timer_entropy).digest()
entropy_sources.append(('timer', timer_hash))
# Process ID and thread info
import os, threading
process_entropy = f"{os.getpid()}:{threading.get_ident()}".encode()
process_hash = hashlib.sha256(process_entropy).digest()
entropy_sources.append(('process', process_hash))
# Additional entropy if provided
if additional_entropy:
entropy_sources.append(('additional', additional_entropy[:64]))
# Combine all entropy sources
combined_entropy = bytearray()
for source_name, entropy in entropy_sources:
combined_entropy.extend(f"{source_name}:".encode())
combined_entropy.extend(entropy)
# Generate final seed with SHAKE256
final_seed = hashlib.shake_256(
b"Dilithium-KeyGen-v1:" + bytes(combined_entropy)
).digest(64)
# Generate key pair
public_key, private_key = self._dilithium_keygen_from_seed(final_seed)
# Store key pair and generation info
self.key_pair = {
'public_key': public_key,
'private_key': private_key,
'security_level': self.security_level.value,
'generated_at': time.time(),
'algorithm': f'Dilithium{self.security_level.value}'
}
# Store entropy info for auditing
self.generation_entropy = {
'sources': [name for name, _ in entropy_sources],
'total_entropy_bits': len(combined_entropy) * 8,
'final_seed_size': len(final_seed)
}
# Clear sensitive data
for i in range(len(combined_entropy)):
combined_entropy[i] = 0
for i in range(len(final_seed)):
final_seed[i] = 0
return {
'public_key': public_key.hex(),
'key_id': self._compute_key_id(public_key),
'security_level': self.security_level.value,
'algorithm': f'Dilithium{self.security_level.value}',
'generated_at': self.key_pair['generated_at'],
'entropy_audit': self.generation_entropy
}
def _dilithium_keygen_from_seed(self, seed: bytes) -> Tuple[bytes, bytes]:
"""Generate Dilithium keys from seed (implementation placeholder)"""
# This would call the actual Dilithium implementation
# Implementation must include:
# - Proper polynomial sampling
# - Rejection sampling for uniform distribution
# - Side-channel protections
from dilithium_crypto import dilithium_keygen_seeded
return dilithium_keygen_seeded(self.security_level.value, seed)
def _compute_key_id(self, public_key: bytes) -> str:
"""Compute stable identifier for public key"""
key_hash = hashlib.sha256(
f"Dilithium{self.security_level.value}-KeyID:".encode() + public_key
).digest()
return key_hash[:20].hex() # 160-bit key ID
def export_public_key_info(self) -> Dict:
"""Export public key with security information"""
if not self.key_pair:
raise ValueError("No key pair generated")
public_key = self.key_pair['public_key']
expected_size = self.KEY_SIZES[self.security_level]['public']
if len(public_key) != expected_size:
raise ValueError(f"Invalid public key size for Dilithium{self.security_level.value}")
return {
'public_key': public_key.hex(),
'algorithm': f'Dilithium{self.security_level.value}',
'security_level': self.security_level.value,
'key_size': len(public_key),
'classical_security_bits': {2: 128, 3: 192, 5: 256}[self.security_level.value],
'quantum_security_bits': {2: 64, 3: 96, 5: 128}[self.security_level.value],
'key_id': self._compute_key_id(public_key),
'usage_recommendations': self._get_usage_recommendations()
}
def _get_usage_recommendations(self) -> Dict:
"""Get usage recommendations for security level"""
recommendations = {
DilithiumLevel.LEVEL2: {
'use_cases': ['IoT devices', 'short-term security', 'performance-critical'],
'max_signatures': 2**32,
'recommended_expiry_years': 5
},
DilithiumLevel.LEVEL3: {
'use_cases': ['general purpose', 'medium-term security', 'balanced performance'],
'max_signatures': 2**40,
'recommended_expiry_years': 10
},
DilithiumLevel.LEVEL5: {
'use_cases': ['high security', 'long-term protection', 'critical infrastructure'],
'max_signatures': 2**50,
'recommended_expiry_years': 20
}
}
return recommendations[self.security_level]
# SECURE: Enterprise key management
class DilithiumKeyManager:
def __init__(self, default_security_level: DilithiumLevel = DilithiumLevel.LEVEL3):
self.default_level = default_security_level
self.managed_keys = {}
self.key_policies = {}
def create_managed_key(
self,
key_id: str,
purpose: str,
security_level: Optional[DilithiumLevel] = None,
custom_policy: Optional[Dict] = None
) -> Dict:
"""Create managed key with policy enforcement"""
level = security_level or self.default_level
# Create Dilithium instance
dilithium = SecureDilithium(level)
key_info = dilithium.generate_keypair_with_audit()
# Define key policy
policy = custom_policy or self._get_default_policy(level, purpose)
# Store managed key
self.managed_keys[key_id] = {
'dilithium': dilithium,
'key_info': key_info,
'purpose': purpose,
'created_at': time.time(),
'signature_count': 0,
'policy': policy,
'status': 'active'
}
self.key_policies[key_id] = policy
return {
'key_id': key_id,
'public_key_info': key_info,
'policy': policy
}
def _get_default_policy(self, level: DilithiumLevel, purpose: str) -> Dict:
"""Get default policy for key"""
base_policies = {
'code_signing': {
'max_signatures_per_day': 1000,
'require_multi_party_approval': True,
'allowed_signature_contexts': ['CodeSigning-v1'],
'max_key_age_days': 365
},
'document_signing': {
'max_signatures_per_day': 10000,
'require_multi_party_approval': False,
'allowed_signature_contexts': ['DocumentSigning-v1'],
'max_key_age_days': 730
},
'api_authentication': {
'max_signatures_per_day': 1000000,
'require_multi_party_approval': False,
'allowed_signature_contexts': ['API-Auth-v1'],
'max_key_age_days': 180
}
}
return base_policies.get(purpose, base_policies['document_signing'])
def get_signing_key(self, key_id: str, context: str) -> Optional['ManagedSigningKey']:
"""Get key for signing with policy enforcement"""
if key_id not in self.managed_keys:
return None
key_data = self.managed_keys[key_id]
policy = key_data['policy']
# Check key status
if key_data['status'] != 'active':
return None
# Check context permission
if context not in policy['allowed_signature_contexts']:
return None
# Check daily signature limit
today = time.time() // (24 * 3600)
daily_count = key_data.get(f'signatures_{today}', 0)
if daily_count >= policy['max_signatures_per_day']:
return None
# Check key age
age_days = (time.time() - key_data['created_at']) / (24 * 3600)
if age_days > policy['max_key_age_days']:
return None
return ManagedSigningKey(key_id, key_data['dilithium'], self)
def record_signature_usage(self, key_id: str):
"""Record signature usage for policy enforcement"""
if key_id not in self.managed_keys:
return
key_data = self.managed_keys[key_id]
# Update signature count
key_data['signature_count'] += 1
# Update daily count
today = time.time() // (24 * 3600)
daily_key = f'signatures_{today}'
key_data[daily_key] = key_data.get(daily_key, 0) + 1
class ManagedSigningKey:
def __init__(self, key_id: str, dilithium: SecureDilithium, manager: DilithiumKeyManager):
self.key_id = key_id
self.dilithium = dilithium
self.manager = manager
def sign(self, message: bytes, context: str) -> Dict:
"""Sign with managed key"""
# Perform signing (implementation would call actual Dilithium)
signature = self._dilithium_sign(message, context)
# Record usage
self.manager.record_signature_usage(self.key_id)
return signature
def _dilithium_sign(self, message: bytes, context: str) -> Dict:
"""Implementation placeholder for Dilithium signing"""
# This would call the actual implementation
private_key = self.dilithium.key_pair['private_key']
# Build signing input with context
signing_input = f"Context:{context}\n".encode() + message
from dilithium_crypto import dilithium_sign
signature = dilithium_sign(
private_key,
signing_input,
self.dilithium.security_level.value
)
return {
'signature': signature.hex(),
'algorithm': f'Dilithium{self.dilithium.security_level.value}',
'context': context,
'key_id': self.key_id,
'timestamp': int(time.time())
}
Common Mistakes:
# INSECURE: Wrong security level selection
# Using Dilithium2 for long-term cryptographic protection
dilithium = SecureDilithium(DilithiumLevel.LEVEL2) # Too weak for long-term!
# INSECURE: No entropy validation
private_key, public_key = dilithium_keygen(3) # No entropy source verification
# INSECURE: Plaintext key storage
with open('private_key.bin', 'wb') as f:
f.write(private_key) # Unencrypted!
# INSECURE: No context in signatures
signature = dilithium_sign(private_key, message) # Missing domain separation
# INSECURE: Ignoring signature size variations
# Not accounting for variable signature sizes in protocols
dilithium_sign(private_key: bytes, message: bytes, security_level: int) -> bytes
Security Contract:
- Preconditions:
private_keymust be valid Dilithium private keymessagecan be any lengthsecurity_levelmust match key generation level
- Postconditions:
- Returns valid deterministic signature
- Signature size depends on security level
- Same message produces same signature
Attack Resistance: | Attack Type | Protected | Notes | |————-|———–|——-| | Existential Forgery | ✅ | M-LWE hardness | | Key Recovery | ✅ | Lattice-based security | | Quantum Attacks | ✅ | Post-quantum secure | | Side-Channel | ⚠️ | Implementation dependent | | Replay | ❌ | Add timestamps/nonces |
Security Requirements:
- Include domain separation in message
- Protect signing process from observation
- Validate private key format
- Use appropriate security level
Secure Usage Example:
use dilithium::{Dilithium2, Dilithium3, Dilithium5, PrivateKey, Signature};
use sha3::{Sha3_256, Digest};
use chrono::{DateTime, Utc};
/// Secure Dilithium signing with metadata
pub struct DilithiumSigner {
private_key: PrivateKey,
security_level: u8,
signing_counter: u64,
context: String,
}
impl DilithiumSigner {
pub fn new(private_key: PrivateKey, security_level: u8, context: String) -> Self {
Self {
private_key,
security_level,
signing_counter: 0,
context,
}
}
pub fn sign_with_metadata(
&mut self,
message: &[u8],
timestamp: Option<DateTime<Utc>>,
additional_context: Option<&str>,
) -> Result<SignatureEnvelope, Error> {
// Build comprehensive signing input
let mut signing_input = Vec::new();
// Add primary context
signing_input.extend_from_slice(b"DilithiumSignature:");
signing_input.extend_from_slice(self.context.as_bytes());
signing_input.push(b'\n');
// Add security level
signing_input.extend_from_slice(b"SecurityLevel:");
signing_input.push(b'0' + self.security_level);
signing_input.push(b'\n');
// Add signing counter
signing_input.extend_from_slice(b"Counter:");
signing_input.extend_from_slice(&self.signing_counter.to_le_bytes());
signing_input.push(b'\n');
// Add timestamp if provided
let ts = timestamp.unwrap_or_else(Utc::now);
signing_input.extend_from_slice(b"Timestamp:");
signing_input.extend_from_slice(ts.timestamp().to_string().as_bytes());
signing_input.push(b'\n');
// Add additional context if provided
if let Some(ctx) = additional_context {
signing_input.extend_from_slice(b"AdditionalContext:");
signing_input.extend_from_slice(ctx.as_bytes());
signing_input.push(b'\n');
}
// Add message hash for large messages
let message_hash = if message.len() > 1024 {
let hash = Sha3_256::digest(message);
signing_input.extend_from_slice(b"MessageHash:");
signing_input.extend_from_slice(&hash);
hash.to_vec()
} else {
signing_input.extend_from_slice(b"Message:");
signing_input.extend_from_slice(message);
Vec::new()
};
// Sign the complete input
let signature = match self.security_level {
2 => Dilithium2::sign(&self.private_key, &signing_input)?,
3 => Dilithium3::sign(&self.private_key, &signing_input)?,
5 => Dilithium5::sign(&self.private_key, &signing_input)?,
_ => return Err(Error::InvalidSecurityLevel),
};
self.signing_counter += 1;
Ok(SignatureEnvelope {
signature,
metadata: SignatureMetadata {
context: self.context.clone(),
additional_context: additional_context.map(|s| s.to_string()),
security_level: self.security_level,
counter: self.signing_counter - 1,
timestamp: ts,
message_hash: if message_hash.is_empty() { None } else { Some(message_hash) },
},
})
}
pub fn sign_document(
&mut self,
document: &Document,
) -> Result<SignedDocument, Error> {
// Create canonical representation
let canonical_doc = document.canonicalize()?;
// Extract metadata for signature
let doc_metadata = DocumentMetadata {
title: document.title.clone(),
version: document.version,
created_at: document.created_at,
hash: Sha3_256::digest(&canonical_doc).to_vec(),
};
// Sign document with metadata
let signature_envelope = self.sign_with_metadata(
&canonical_doc,
Some(Utc::now()),
Some(&format!("Document:{}", document.title)),
)?;
Ok(SignedDocument {
document: document.clone(),
signature: signature_envelope,
doc_metadata,
})
}
}
#[derive(Debug, Clone)]
pub struct SignatureEnvelope {
pub signature: Signature,
pub metadata: SignatureMetadata,
}
#[derive(Debug, Clone)]
pub struct SignatureMetadata {
pub context: String,
pub additional_context: Option<String>,
pub security_level: u8,
pub counter: u64,
pub timestamp: DateTime<Utc>,
pub message_hash: Option<Vec<u8>>,
}
/// Batch signing for efficiency
pub struct BatchDilithiumSigner {
signer: DilithiumSigner,
}
impl BatchDilithiumSigner {
pub fn sign_batch(
&mut self,
messages: &[&[u8]],
shared_context: &str,
) -> Result<Vec<SignatureEnvelope>, Error> {
let mut signatures = Vec::with_capacity(messages.len());
for (i, message) in messages.iter().enumerate() {
let additional_context = format!("{}:BatchItem:{}", shared_context, i);
let signature = self.signer.sign_with_metadata(
message,
Some(Utc::now()),
Some(&additional_context),
)?;
signatures.push(signature);
}
Ok(signatures)
}
pub fn sign_merkle_batch(
&mut self,
leaves: &[&[u8]],
) -> Result<MerkleSignature, Error> {
// Build Merkle tree
let tree = MerkleTree::new(leaves)?;
// Sign Merkle root
let root_signature = self.signer.sign_with_metadata(
&tree.root(),
Some(Utc::now()),
Some("MerkleTreeRoot"),
)?;
Ok(MerkleSignature {
tree,
root_signature,
})
}
}
dilithium_verify(public_key: bytes, message: bytes, signature: bytes, security_level: int) -> bool
Security Contract:
- Preconditions:
public_keymust be valid Dilithium public keymessageis the original signed messagesignatureis putative Dilithium signaturesecurity_levelmust match key type
- Postconditions:
- Returns true if signature is valid
- Returns false for any invalid signature
- Verification is deterministic
Security Requirements:
- Validate all input formats before processing
- Use constant-time comparison operations
- Check signature bounds and structure
- Verify security level consistency
Secure Usage Example:
class DilithiumVerifier:
"""Secure Dilithium signature verification"""
def __init__(self):
self.trusted_keys = {}
self.verification_cache = {}
self.max_cache_size = 10000
def add_trusted_public_key(
self,
key_id: str,
public_key: bytes,
security_level: int,
metadata: dict
):
"""Add trusted public key with validation"""
# Validate security level
if security_level not in [2, 3, 5]:
raise ValueError(f"Invalid Dilithium security level: {security_level}")
# Validate key size
expected_sizes = {2: 1312, 3: 1952, 5: 2592}
if len(public_key) != expected_sizes[security_level]:
raise ValueError(f"Invalid key size for Dilithium{security_level}")
# Validate key structure (implementation specific)
if not self._validate_public_key_structure(public_key, security_level):
raise ValueError("Invalid public key structure")
self.trusted_keys[key_id] = {
'public_key': public_key,
'security_level': security_level,
'metadata': metadata,
'added_at': time.time()
}
def verify_signature_envelope(
self,
envelope: dict,
key_id: str,
original_message: bytes
) -> bool:
"""Verify complete signature envelope"""
if key_id not in self.trusted_keys:
return False
key_info = self.trusted_keys[key_id]
# Reconstruct signing input
signing_input = self._reconstruct_signing_input(
envelope, original_message
)
# Verify signature
signature = bytes.fromhex(envelope['signature'])
return self._dilithium_verify_core(
key_info['public_key'],
signing_input,
signature,
key_info['security_level']
)
def _reconstruct_signing_input(
self,
envelope: dict,
original_message: bytes
) -> bytes:
"""Reconstruct signing input from envelope"""
signing_input = bytearray()
# Add context
if 'context' in envelope:
signing_input.extend(b"DilithiumSignature:")
signing_input.extend(envelope['context'].encode())
signing_input.extend(b"\n")
# Add security level
if 'security_level' in envelope:
signing_input.extend(b"SecurityLevel:")
signing_input.extend(str(envelope['security_level']).encode())
signing_input.extend(b"\n")
# Add counter
if 'counter' in envelope:
signing_input.extend(b"Counter:")
signing_input.extend(envelope['counter'].to_bytes(8, 'little'))
signing_input.extend(b"\n")
# Add timestamp
if 'timestamp' in envelope:
signing_input.extend(b"Timestamp:")
signing_input.extend(str(envelope['timestamp']).encode())
signing_input.extend(b"\n")
# Add additional context
if 'additional_context' in envelope:
signing_input.extend(b"AdditionalContext:")
signing_input.extend(envelope['additional_context'].encode())
signing_input.extend(b"\n")
# Add message or message hash
if 'message_hash' in envelope:
message_hash = bytes.fromhex(envelope['message_hash'])
computed_hash = hashlib.sha256(original_message).digest()
if message_hash != computed_hash:
return b"" # Hash mismatch
signing_input.extend(b"MessageHash:")
signing_input.extend(message_hash)
else:
signing_input.extend(b"Message:")
signing_input.extend(original_message)
return bytes(signing_input)
def batch_verify(
self,
verifications: list
) -> list:
"""Batch verify multiple signatures efficiently"""
results = []
for verification in verifications:
key_id = verification['key_id']
message = verification['message']
envelope = verification['envelope']
# Use caching for performance
cache_key = self._compute_cache_key(key_id, message, envelope)
if cache_key in self.verification_cache:
result = self.verification_cache[cache_key]
else:
result = self.verify_signature_envelope(envelope, key_id, message)
# Cache result
if len(self.verification_cache) < self.max_cache_size:
self.verification_cache[cache_key] = result
results.append(result)
return results
def verify_document_signature(
self,
signed_document: dict,
key_id: str
) -> dict:
"""Verify document signature with detailed result"""
document = signed_document['document']
signature_envelope = signed_document['signature_envelope']
doc_metadata = signed_document['doc_metadata']
# Canonicalize document
canonical_doc = self._canonicalize_document(document)
# Verify document hash
computed_hash = hashlib.sha256(canonical_doc).digest()
stored_hash = bytes.fromhex(doc_metadata['hash'])
if computed_hash != stored_hash:
return {
'valid': False,
'error': 'Document hash mismatch',
'details': {
'computed_hash': computed_hash.hex(),
'stored_hash': stored_hash.hex()
}
}
# Verify signature
signature_valid = self.verify_signature_envelope(
signature_envelope,
key_id,
canonical_doc
)
return {
'valid': signature_valid,
'document_hash_valid': True,
'signature_metadata': signature_envelope.get('metadata', {}),
'verification_timestamp': time.time()
}
def _validate_public_key_structure(
self,
public_key: bytes,
security_level: int
) -> bool:
"""Validate Dilithium public key structure"""
# Implementation would check:
# - Polynomial coefficients are in valid range
# - Matrix dimensions match security level
# - Key follows Dilithium format specification
# Placeholder implementation
return True
def _dilithium_verify_core(
self,
public_key: bytes,
message: bytes,
signature: bytes,
security_level: int
) -> bool:
"""Core Dilithium verification"""
from dilithium_crypto import dilithium_verify
try:
return dilithium_verify(
public_key,
message,
signature,
security_level
)
except Exception:
return False
def _compute_cache_key(self, key_id: str, message: bytes, envelope: dict) -> str:
"""Compute cache key for verification result"""
import json
cache_data = {
'key_id': key_id,
'message_hash': hashlib.sha256(message).hexdigest(),
'envelope': envelope
}
cache_json = json.dumps(cache_data, sort_keys=True)
return hashlib.sha256(cache_json.encode()).hexdigest()[:32]
Security Best Practices
Certificate Authority with Dilithium
class DilithiumCertificateAuthority:
"""Post-quantum CA using Dilithium"""
def __init__(self, ca_key_manager: DilithiumKeyManager, ca_cert: dict):
self.key_manager = ca_key_manager
self.ca_cert = ca_cert
self.issued_certificates = {}
self.certificate_templates = {}
def create_certificate_template(
self,
template_name: str,
template_config: dict
):
"""Create certificate template for consistent issuance"""
self.certificate_templates[template_name] = {
'security_level': template_config.get('security_level', 3),
'validity_years': template_config.get('validity_years', 2),
'key_usage': template_config.get('key_usage', ['digital_signature']),
'extended_key_usage': template_config.get('extended_key_usage', []),
'subject_constraints': template_config.get('subject_constraints', {}),
'extensions': template_config.get('extensions', {})
}
def issue_certificate(
self,
csr: dict,
template_name: str,
ca_key_id: str
) -> dict:
"""Issue post-quantum certificate using Dilithium"""
if template_name not in self.certificate_templates:
raise ValueError(f"Unknown template: {template_name}")
template = self.certificate_templates[template_name]
# Validate CSR
if not self._validate_csr(csr, template):
raise ValueError("CSR validation failed")
# Create certificate structure
cert_data = {
'version': 3,
'serial_number': self._generate_serial_number(),
'issuer': self.ca_cert['subject'],
'subject': csr['subject'],
'public_key': {
'algorithm': f"Dilithium{template['security_level']}",
'key': csr['public_key'],
'security_level': template['security_level']
},
'validity': {
'not_before': int(time.time()),
'not_after': int(time.time()) + (template['validity_years'] * 365 * 24 * 3600)
},
'extensions': {
'key_usage': template['key_usage'],
'extended_key_usage': template['extended_key_usage'],
**template['extensions']
}
}
# Sign certificate
ca_signing_key = self.key_manager.get_signing_key(
ca_key_id,
'X.509-Certificate-v3'
)
if not ca_signing_key:
raise ValueError("CA signing key not available")
# Serialize and sign
cert_bytes = self._serialize_certificate(cert_data)
signature_envelope = ca_signing_key.sign(
cert_bytes,
'X.509-Certificate-v3'
)
# Complete certificate
certificate = {
**cert_data,
'signature': signature_envelope
}
# Store issued certificate
serial = cert_data['serial_number']
self.issued_certificates[serial] = {
'certificate': certificate,
'issued_at': time.time(),
'template_used': template_name,
'ca_key_id': ca_key_id
}
return certificate
Multi-Level Security Architecture
/// Multi-level security with different Dilithium variants
pub struct MultiLevelSecurity {
level2_signer: Option<DilithiumSigner>, // Fast operations
level3_signer: Option<DilithiumSigner>, // Balanced security
level5_signer: Option<DilithiumSigner>, // High security
}
impl MultiLevelSecurity {
pub fn new() -> Self {
Self {
level2_signer: None,
level3_signer: None,
level5_signer: None,
}
}
pub fn sign_by_classification(
&mut self,
message: &[u8],
classification: SecurityClassification,
) -> Result<SignatureEnvelope, Error> {
match classification {
SecurityClassification::Internal => {
let signer = self.level2_signer.as_mut()
.ok_or(Error::SignerNotAvailable)?;
signer.sign_with_metadata(message, None, Some("Internal"))
},
SecurityClassification::Confidential => {
let signer = self.level3_signer.as_mut()
.ok_or(Error::SignerNotAvailable)?;
signer.sign_with_metadata(message, None, Some("Confidential"))
},
SecurityClassification::Secret => {
let signer = self.level5_signer.as_mut()
.ok_or(Error::SignerNotAvailable)?;
signer.sign_with_metadata(message, None, Some("Secret"))
},
}
}
}
#[derive(Debug, Clone, Copy)]
pub enum SecurityClassification {
Internal, // Dilithium2
Confidential,// Dilithium3
Secret, // Dilithium5
}
Common Integration Patterns
Blockchain with Post-Quantum Security
def dilithium_blockchain_integration():
"""Example of Dilithium in blockchain"""
class PostQuantumBlock:
def __init__(self, previous_hash: str, transactions: list):
self.previous_hash = previous_hash
self.transactions = transactions
self.timestamp = int(time.time())
self.nonce = secrets.token_bytes(32).hex()
def compute_merkle_root(self) -> str:
"""Compute Merkle root of transactions"""
if not self.transactions:
return hashlib.sha256(b"empty").hexdigest()
# Hash all transactions
tx_hashes = [
hashlib.sha256(json.dumps(tx, sort_keys=True).encode()).digest()
for tx in self.transactions
]
# Build Merkle tree
while len(tx_hashes) > 1:
next_level = []
for i in range(0, len(tx_hashes), 2):
if i + 1 < len(tx_hashes):
combined = tx_hashes[i] + tx_hashes[i + 1]
else:
combined = tx_hashes[i]
next_level.append(hashlib.sha256(combined).digest())
tx_hashes = next_level
return tx_hashes[0].hex()
def sign_block(self, dilithium_signer) -> dict:
"""Sign block with Dilithium"""
# Create block header
header = {
'previous_hash': self.previous_hash,
'merkle_root': self.compute_merkle_root(),
'timestamp': self.timestamp,
'nonce': self.nonce
}
# Serialize and sign
header_bytes = json.dumps(header, sort_keys=True).encode()
signature = dilithium_signer.sign(
header_bytes,
'Blockchain-Block-v1'
)
return {
'header': header,
'transactions': self.transactions,
'signature': signature
}
Performance Considerations
| Operation | Dilithium2 | Dilithium3 | Dilithium5 | Notes |
|---|---|---|---|---|
| Key Generation | ~2.5ms | ~4.2ms | ~6.8ms | One-time cost |
| Signing | ~3.1ms | ~4.9ms | ~8.2ms | Deterministic |
| Verification | ~1.4ms | ~2.1ms | ~3.5ms | Fast verification |
| Public Key | 1312 bytes | 1952 bytes | 2592 bytes | Moderate size |
| Signature | ~2420 bytes | ~3293 bytes | ~4595 bytes | Larger than classical |
Optimization Strategies
- Pre-generate keys for batch operations
- Cache verification results for repeated checks
- Use appropriate security level for use case
- Implement signature aggregation where possible
Security Auditing
Verification Checklist
- Using appropriate security level for threat model
- Implementing proper rejection sampling
- Protecting key generation from side-channels
- Adding domain separation to signatures
- Validating all input parameters
- Implementing secure key lifecycle management
- Using constant-time operations where possible
Common Vulnerabilities
# AUDIT: Look for these patterns
# ❌ Wrong security level
# Using Dilithium2 for 20-year protection
signer = DilithiumSigner(level=2) # Too weak for long-term!
# ❌ No domain separation
signature = dilithium_sign(key, message) # Missing context
# ❌ Insecure key storage
pickle.dump(private_key, open('key.pkl', 'wb')) # Unencrypted!
# ❌ Side-channel vulnerable implementation
# Timing attacks on polynomial operations
# ❌ Improper signature format validation
if len(signature) > 0: # Should validate proper format
verify_signature(...)
Security Analysis
Threat Model: Dilithium (ML-DSA-65) Threat Model
The comprehensive threat analysis covers:
- Algorithm-specific attack vectors
- Implementation vulnerabilities
- Side-channel considerations
- Quantum resistance analysis (where applicable)
- Deployment recommendations
For complete security analysis and risk assessment, see the dedicated threat model documentation.
References
Support
Security Issues: security@metamui.id
Documentation Updates: phantom@metamui.id
Vulnerability Disclosure: See SECURITY.md
Document Version: 1.0
Review Cycle: Quarterly
Next Review: 2025-04-05
Classification: PUBLIC