ChaCha20-Poly1305 Security-Focused API Documentation

Version: 1.0
Last Updated: 2025-07-05
Security Classification: PUBLIC
Author: Phantom (phantom@metamui.id)

Overview

ChaCha20-Poly1305 is an authenticated encryption with associated data (AEAD) algorithm that combines the ChaCha20 stream cipher with the Poly1305 authenticator. Standardized in RFC 8439, it provides confidentiality and authenticity in a single operation. It is a common alternative to AES-GCM, especially on platforms without AES hardware acceleration, where it is typically faster in software.

Security Level: 256-bit key, 128-bit authentication
Key Size: 32 bytes (256 bits)
Nonce Size: 12 bytes (96 bits)
Tag Size: 16 bytes (128 bits)

Security Warnings ⚠️

  1. Nonce Reuse Catastrophic: Reusing a nonce with the same key destroys ALL security
  2. Tag Truncation Forbidden: Always use full 16-byte authentication tag
  3. Verify Before Use: Never use decrypted data before verifying authentication tag
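  4. 96-bit Nonce Limit: A 96-bit nonce is too short for random generation at high message volumes; use a counter, or use XChaCha20-Poly1305 (192-bit nonce) for random nonces
  5. Message Length Limit: Maximum 2^38 - 64 bytes (about 256 GiB) per message, because RFC 8439 uses a 32-bit block counter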
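
The difference between 96-bit and 192-bit random nonces can be made concrete with the birthday approximation. The sketch below is illustrative only: the function name `nonce_collision_probability` is not part of the documented API, and it assumes nonces are drawn uniformly at random under a single key.

```python
import math


def nonce_collision_probability(num_messages: int, nonce_bits: int) -> float:
    """Approximate chance that at least two random nonces collide.

    Birthday approximation: p ~= 1 - exp(-n(n-1) / 2^(bits+1)).
    """
    exponent = -num_messages * (num_messages - 1) / 2 ** (nonce_bits + 1)
    # expm1 keeps precision when the probability is tiny
    return -math.expm1(exponent)


for label, bits in (("ChaCha20-Poly1305 (96-bit)", 96),
                    ("XChaCha20-Poly1305 (192-bit)", 192)):
    p = nonce_collision_probability(2 ** 32, bits)
    print(f"{label}: collision probability after 2^32 messages is about {p:.3e}")
```

The 96-bit figure is small but not negligible for a failure that destroys all security, which is why a counter or the extended 192-bit nonce is preferred.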

API Functions

encrypt(key: bytes[32], nonce: bytes[12], plaintext: bytes, associated_data: bytes = b"") -> (bytes, bytes[16])

Security Contract:

Attack Resistance:

| Attack Type | Protected | Notes |
|-------------|-----------|-------|
| Confidentiality | ✅ | 256-bit security |
| Authenticity | ✅ | 128-bit security |
| Nonce Reuse | ❌ | Catastrophic failure |
| Replay Attack | ⚠️ | Need external counter |
| Bit Flipping | ✅ | Detected by Poly1305 |
| Timing Attack | ✅ | Constant time |
| Related Key | ✅ | Keys independent |

Security Requirements:

Secure Usage Example:

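import json
import os
import secrets
from typing import Tuple

class SecurityError(Exception):
    """Raised when a key has reached its usage limit"""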

class SecureChannel:
    def __init__(self):
        self.key = secrets.token_bytes(32)
        self.nonce_counter = 0
        self.max_messages = 2**32  # Conservative limit
    
    def encrypt_message(self, message: bytes, metadata: dict) -> bytes:
        # Check nonce counter limit
        if self.nonce_counter >= self.max_messages:
            raise SecurityError("Key rotation required")
        
        # Generate unique nonce from counter
        nonce = self.nonce_counter.to_bytes(12, 'little')
        self.nonce_counter += 1
        
        # Include metadata in authenticated data
        ad = json.dumps(metadata, sort_keys=True).encode()
        
        # Encrypt with authentication
        ciphertext, tag = chacha20poly1305.encrypt(
            key=self.key,
            nonce=nonce,
            plaintext=message,
            associated_data=ad
        )
        
        # Package for transmission
        return b''.join([
            nonce,           # 12 bytes
            len(ad).to_bytes(4, 'little'),  # AD length
            ad,              # Variable length
            ciphertext,      # Same as plaintext length
            tag              # 16 bytes
        ])
    
    def rotate_key(self) -> bytes:
        """Derive new key and reset counter"""
        # Derive new key from old (one-way)
        new_key = hkdf(
            secret=self.key,
            salt=b"rotation",
            info=b"chacha20poly1305-key-v1",
            length=32
        )
        
        # Securely overwrite old key
        secure_zero(self.key)
        
        # Reset state
        self.key = new_key
        self.nonce_counter = 0
        
        return new_key

# SECURE: File encryption with random nonce
def encrypt_file(filepath: str, key: bytes) -> None:  # key must be 32 bytes
    """Encrypt file with XChaCha20-Poly1305 for random nonces"""
    
    # Read plaintext
    with open(filepath, 'rb') as f:
        plaintext = f.read()
    
    # Generate random 24-byte nonce (safe for XChaCha20)
    nonce = secrets.token_bytes(24)
    
    # Include file metadata
    metadata = {
        'filename': os.path.basename(filepath),
        'size': len(plaintext),
        'modified': os.path.getmtime(filepath),
        'algorithm': 'xchacha20poly1305'
    }
    
    # Serialize metadata once so the authenticated bytes and the stored bytes match
    ad = json.dumps(metadata, sort_keys=True).encode()
    
    # Encrypt
    ciphertext, tag = xchacha20poly1305.encrypt(
        key=key,
        nonce=nonce,
        plaintext=plaintext,
        associated_data=ad
    )
    
    # Write encrypted file
    with open(filepath + '.enc', 'wb') as f:
        f.write(nonce)
        f.write(tag)
        f.write(len(ad).to_bytes(4, 'little'))  # Byte length of the serialized metadata
        f.write(ad)
        f.write(ciphertext)
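
The receiving side of `encrypt_message` has to split the packet back into its fields before it can call `decrypt`. The following is a minimal sketch of that parsing step for the layout used above (nonce, 4-byte little-endian AD length, AD, ciphertext, tag). The names `Packet` and `unpack_message` are hypothetical, and nothing parsed here should be trusted until the tag has been verified.

```python
from typing import NamedTuple

NONCE_LEN = 12      # bytes
AD_LEN_FIELD = 4    # little-endian length prefix for the associated data
TAG_LEN = 16        # bytes


class Packet(NamedTuple):
    nonce: bytes
    associated_data: bytes
    ciphertext: bytes
    tag: bytes


def unpack_message(packet: bytes) -> Packet:
    """Split nonce | ad_len | ad | ciphertext | tag without interpreting any field."""
    header_len = NONCE_LEN + AD_LEN_FIELD
    if len(packet) < header_len + TAG_LEN:
        raise ValueError("packet too short")

    ad_len = int.from_bytes(packet[NONCE_LEN:header_len], "little")
    # Reject a declared AD length that would run into (or past) the tag
    if ad_len > len(packet) - header_len - TAG_LEN:
        raise ValueError("declared AD length exceeds packet size")

    ad_end = header_len + ad_len
    return Packet(
        nonce=packet[:NONCE_LEN],
        associated_data=packet[header_len:ad_end],
        ciphertext=packet[ad_end:len(packet) - TAG_LEN],
        tag=packet[len(packet) - TAG_LEN:],
    )
```

The returned fields map directly onto the `decrypt` parameters; the metadata JSON inside `associated_data` should be parsed only after `decrypt` succeeds.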

Common Mistakes:

# CATASTROPHIC: Nonce reuse
nonce = b"000000000000"  # Fixed nonce
ciphertext1, tag1 = encrypt(key, nonce, message1)
ciphertext2, tag2 = encrypt(key, nonce, message2)  # BROKEN!
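# Attacker can recover message1 XOR message2 and, from the two tags,
# can often recover the one-time Poly1305 key and forge messages

# INSECURE: Timestamp nonces
nonce = int(time.time()).to_bytes(12, 'little')  # REPEATS within the same second or after a clock reset!

# RISKY: Random 96-bit nonces with ChaCha20-Poly1305
nonce = secrets.token_bytes(12)  # Collision risk grows with volume; stay far below 2^32 messages per key
# Use XChaCha20-Poly1305 for random nonces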

# BROKEN: Tag truncation
ciphertext, tag = encrypt(key, nonce, plaintext)
short_tag = tag[:8]  # NO! Reduces security to 64 bits
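
To see why nonce reuse leaks plaintext, it helps to watch the keystream cancel out. The sketch below uses a stand-in keystream built from SHAKE-256 rather than real ChaCha20 (an assumption made so the example runs with the standard library alone); the XOR structure is the same as in any stream cipher. The names `toy_keystream` and `toy_encrypt` are illustrative only.

```python
import hashlib


def toy_keystream(key: bytes, nonce: bytes, length: int) -> bytes:
    """Stand-in keystream (SHAKE-256). NOT ChaCha20; for illustration only."""
    return hashlib.shake_256(key + nonce).digest(length)


def xor_bytes(a: bytes, b: bytes) -> bytes:
    return bytes(x ^ y for x, y in zip(a, b))


def toy_encrypt(key: bytes, nonce: bytes, plaintext: bytes) -> bytes:
    return xor_bytes(plaintext, toy_keystream(key, nonce, len(plaintext)))


key = bytes(32)
nonce = bytes(12)                      # the same nonce used twice
message1 = b"transfer $100 to alice"
message2 = b"transfer $999 to carol"

c1 = toy_encrypt(key, nonce, message1)
c2 = toy_encrypt(key, nonce, message2)

# The identical keystream cancels: c1 XOR c2 == message1 XOR message2
leak = xor_bytes(c1, c2)

# Knowing (or guessing) one plaintext reveals the other without the key
recovered = xor_bytes(leak, message1)
print(recovered)
```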

decrypt(key: bytes[32], nonce: bytes[12], ciphertext: bytes, tag: bytes[16], associated_data: bytes = b"") -> bytes

Security Contract:

Attack Resistance:

| Attack Type | Protected | Notes |
|-------------|-----------|-------|
| Forgery | ✅ | 2^128 attempts needed |
| Tampering | ✅ | Any bit flip detected |
| Timing Oracle | ✅ | Constant-time verification |
| Padding Oracle | ✅ | No padding needed |
| Truncation | ✅ | Length included in auth |
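
Constant-time verification means the tag comparison must not exit early at the first mismatching byte. A minimal sketch using the Python standard library's `hmac.compare_digest` follows; the wrapper name `tags_match` is hypothetical, and a real `decrypt` implementation is expected to do this internally.

```python
import hmac

TAG_LEN = 16


def tags_match(computed_tag: bytes, received_tag: bytes) -> bool:
    """Compare tags without leaking the position of the first mismatch."""
    # Reject truncated or oversized tags outright
    if len(received_tag) != TAG_LEN or len(computed_tag) != TAG_LEN:
        return False
    return hmac.compare_digest(computed_tag, received_tag)
```

A plain `==` on bytes may return as soon as a byte differs, which can give an attacker a timing signal for guessing the tag byte by byte.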

Security Requirements:

Secure Usage Example:

use std::io::{Read, Write};

use constant_time_eq::constant_time_eq;

pub struct SecureDecryptor {
    key: [u8; 32],
}

impl SecureDecryptor {
    pub fn decrypt_message(&self, encrypted: &[u8]) -> Result<(Vec<u8>, Metadata), Error> {
        // Parse message format
        if encrypted.len() < 32 {
            return Err(Error::InvalidMessage);
        }
        
        let (nonce, rest) = encrypted.split_at(12);
        let (ad_len_bytes, rest) = rest.split_at(4);
        let ad_len = u32::from_le_bytes(ad_len_bytes.try_into()?);
        
        if rest.len() < ad_len as usize + 16 {
            return Err(Error::InvalidMessage);
        }
        
        let (ad, rest) = rest.split_at(ad_len as usize);
        let (ciphertext, tag) = rest.split_at(rest.len() - 16);
        
        // Decrypt and verify
        let plaintext = chacha20poly1305::decrypt(
            &self.key,
            nonce.try_into()?,
            ciphertext,
            tag.try_into()?,
            ad
        ).map_err(|_| Error::AuthenticationFailed)?;
        
        // Parse metadata only after authentication
        let metadata: Metadata = serde_json::from_slice(ad)?;
        
        Ok((plaintext, metadata))
    }
    
    pub fn decrypt_streaming<R: Read, W: Write>(
        &self,
        input: &mut R,
        output: &mut W,
        nonce: &[u8; 12],
        expected_tag: &[u8; 16],
        associated_data: &[u8]
    ) -> Result<(), Error> {
        // Create decryption context
        let mut decryptor = ChaCha20Poly1305::new(&self.key, nonce);
        decryptor.update_aad(associated_data);
        
        // Process in chunks, holding plaintext in memory until the tag is verified
        let mut buffer = vec![0u8; 4096];
        let mut pending: Vec<u8> = Vec::new();
        let mut computed_tag = [0u8; 16];
        
        loop {
            let n = input.read(&mut buffer)?;
            if n == 0 {
                break;
            }
            
            // Decrypt chunk (but don't release yet)
            let chunk = decryptor.decrypt_chunk(&buffer[..n]);
            
            // Buffer decrypted data
            pending.extend_from_slice(&chunk);
        }
        
        // Verify tag before releasing any plaintext
        decryptor.finalize_into(&mut computed_tag);
        
        if !constant_time_eq(&computed_tag, expected_tag) {
            // Authentication failed - discard the unverified plaintext (best-effort wipe)
            pending.fill(0);
            return Err(Error::AuthenticationFailed);
        }
        
        // Tag verified: only now write the plaintext to the caller
        output.write_all(&pending)?;
        
        Ok(())
    }
}

generate_key() -> bytes[32]

Security Contract:

Security Notes:

Secure Usage Example:

def secure_key_management():
    # Generate new key
    key = chacha20poly1305.generate_key()
    
    # Derive subkeys for different purposes
    encryption_key = hkdf(key, salt=b"enc", info=b"encryption-v1", length=32)
    storage_key = hkdf(key, salt=b"store", info=b"storage-v1", length=32)
    
    # Encrypt key for storage
    wrapped_key = key_wrap(storage_key, key)
    
    # Secure deletion
    secure_zero(key)
    
    return wrapped_key, encryption_key
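
The `hkdf` helper used above is not defined in this document. As a point of reference, here is a minimal sketch of HKDF as specified in RFC 5869, instantiated with HMAC-SHA-256 and built only on the standard library. Whether the library's own `hkdf` uses the same hash or argument order is an assumption; the name `hkdf_sha256` and the placeholder master key are illustrative.

```python
import hashlib
import hmac


def hkdf_sha256(secret: bytes, salt: bytes, info: bytes, length: int) -> bytes:
    """HKDF (RFC 5869) with HMAC-SHA-256: extract, then expand."""
    hash_len = hashlib.sha256().digest_size
    if length > 255 * hash_len:
        raise ValueError("requested output too long")

    # Extract: concentrate the input keying material into a pseudorandom key
    prk = hmac.new(salt or bytes(hash_len), secret, hashlib.sha256).digest()

    # Expand: chain HMAC blocks until enough output is produced
    okm, block, counter = b"", b"", 1
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


master = bytes(range(32))  # placeholder only; use generate_key() in practice
encryption_key = hkdf_sha256(master, b"enc", b"encryption-v1", 32)
storage_key = hkdf_sha256(master, b"store", b"storage-v1", 32)
```

Distinct `info` labels give independent subkeys, so compromising one derived key does not reveal the other.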

Security Best Practices

Nonce Management Strategies

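import secrets
import time
from enum import Enum
from typing import Optional, Tuple

class NonceStrategy(Enum):
    COUNTER = "counter"          # Best for single-threaded
    RANDOM_XCHACHA = "random"    # Best for multi-threaded
    TIME_BASED = "time"          # Single process only; add a node ID when senders share a key
    DERIVED = "derived"          # For deterministic encryption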

class NonceManager:
    def __init__(self, strategy: NonceStrategy, key: bytes):
        self.strategy = strategy
        self.key = key
        self.counter = 0
        self.last_time = 0
        
    def get_nonce(self, message_id: Optional[str] = None) -> Tuple[bytes, str]:
        if self.strategy == NonceStrategy.COUNTER:
            # Simple counter - fastest, safest for single-threaded
            nonce = self.counter.to_bytes(12, 'little')
            self.counter += 1
            algo = "chacha20poly1305"
            
        elif self.strategy == NonceStrategy.RANDOM_XCHACHA:
            # Random nonce with XChaCha20 - safe for parallel
            nonce = secrets.token_bytes(24)
            algo = "xchacha20poly1305"
            
        elif self.strategy == NonceStrategy.TIME_BASED:
            # Time + monotonic bump: unique within this process only. Senders
            # that share a key must also reserve nonce bytes for a node ID,
            # and a clock reset across restarts can repeat a nonce.
            current_time = int(time.time() * 1000000)  # Microseconds
            if current_time <= self.last_time:
                current_time = self.last_time + 1
            self.last_time = current_time
            
            nonce = current_time.to_bytes(12, 'big')
            algo = "chacha20poly1305"
            
        elif self.strategy == NonceStrategy.DERIVED:
            # Derive from message ID - deterministic. Each message ID must be
            # used for at most one message under this key.
            if not message_id:
                raise ValueError("Message ID required for derived nonces")
            
            derived = hkdf(
                secret=self.key,
                salt=b"nonce-derivation",
                info=message_id.encode(),
                length=12
            )
            nonce = derived
            algo = "chacha20poly1305"
            
        else:
            raise ValueError(f"Unknown nonce strategy: {self.strategy}")
            
        return nonce, algo
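
When several senders or threads share one key, a bare counter is not enough on its own. One common layout, sketched here under assumptions, splits the 96-bit nonce into a 4-byte sender ID and an 8-byte counter guarded by a lock. The class name `CounterNonce` and the big-endian encoding are illustrative choices, not part of the documented API, and the sender IDs are assumed to be assigned uniquely out of band.

```python
import threading


class CounterNonce:
    """96-bit nonce = 4-byte sender ID || 8-byte big-endian counter."""

    def __init__(self, sender_id: int):
        if not 0 <= sender_id < 2 ** 32:
            raise ValueError("sender_id must fit in 32 bits")
        self._prefix = sender_id.to_bytes(4, "big")
        self._counter = 0
        self._lock = threading.Lock()

    def next(self) -> bytes:
        # The lock makes read-and-increment atomic across threads
        with self._lock:
            if self._counter >= 2 ** 64:
                raise OverflowError("nonce space exhausted; rotate the key")
            value = self._counter
            self._counter += 1
        return self._prefix + value.to_bytes(8, "big")


generator = CounterNonce(sender_id=7)
first_nonce = generator.next()
second_nonce = generator.next()
```

The counter must survive restarts (for example by persisting it, or by rotating the key at startup); otherwise a restarted process begins again at zero and repeats nonces.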

Protocol Design

/// Secure protocol with replay protection
pub struct SecureProtocol {
    local_key: [u8; 32],
    remote_key: [u8; 32],
    send_counter: u64,
    recv_counter: u64,
    replay_window: BitSet,
}

impl SecureProtocol {
    pub fn send_message(&mut self, msg_type: MessageType, payload: &[u8]) -> Vec<u8> {
        // Create message header
        let header = MessageHeader {
            version: 1,
            msg_type,
            sequence: self.send_counter,
            timestamp: SystemTime::now(),
        };
        
        // Serialize header for AD
        let ad = bincode::serialize(&header).unwrap();
        
        // Generate nonce from counter
        let mut nonce = [0u8; 12];
        nonce[..8].copy_from_slice(&self.send_counter.to_le_bytes());
        
        // Encrypt
        let (ciphertext, tag) = chacha20poly1305::encrypt(
            &self.local_key,
            &nonce,
            payload,
            &ad
        );
        
        self.send_counter += 1;
        
        // Build message
        let mut message = Vec::new();
        message.extend_from_slice(&ad);
        message.extend_from_slice(&tag);
        message.extend_from_slice(&ciphertext);
        
        message
    }
    
    pub fn receive_message(&mut self, message: &[u8]) -> Result<(MessageHeader, Vec<u8>), Error> {
        // Parse message
        // serialized_size returns u64; convert for slice indexing
        let header_size = bincode::serialized_size(&MessageHeader::default())? as usize;
        
        if message.len() < header_size + 16 {
            return Err(Error::InvalidMessage);
        }
        
        let (header_bytes, rest) = message.split_at(header_size);
        let (tag, ciphertext) = rest.split_at(16);
        
        // Deserialize header
        let header: MessageHeader = bincode::deserialize(header_bytes)?;
        
        // Check replay
        if !self.check_replay_window(header.sequence) {
            return Err(Error::ReplayDetected);
        }
        
        // Generate nonce
        let mut nonce = [0u8; 12];
        nonce[..8].copy_from_slice(&header.sequence.to_le_bytes());
        
        // Decrypt
        let plaintext = chacha20poly1305::decrypt(
            &self.remote_key,
            &nonce,
            ciphertext,
            tag.try_into()?,
            header_bytes
        )?;
        
        // Update replay window
        self.update_replay_window(header.sequence);
        
        Ok((header, plaintext))
    }
}
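
The protocol above calls `check_replay_window` and `update_replay_window` without showing them. The following Python sketch illustrates one way such a sliding window can work, using a bitmap over the most recent sequence numbers. The class name, the 64-entry default, and the method names are assumptions for illustration; the actual Rust implementation may differ.

```python
class ReplayWindow:
    """Sliding window over the most recent `size` sequence numbers."""

    def __init__(self, size: int = 64):
        self.size = size
        self.highest = -1   # highest sequence number accepted so far
        self.bitmap = 0     # bit i set => (highest - i) was already seen

    def check(self, sequence: int) -> bool:
        """Return True if `sequence` is new and not older than the window."""
        if sequence > self.highest:
            return True
        offset = self.highest - sequence
        if offset >= self.size:
            return False    # too old to track, treat as a replay
        return ((self.bitmap >> offset) & 1) == 0

    def update(self, sequence: int) -> None:
        """Record `sequence`; call only after authentication succeeds."""
        if sequence > self.highest:
            shift = sequence - self.highest
            mask = (1 << self.size) - 1
            self.bitmap = ((self.bitmap << shift) | 1) & mask
            self.highest = sequence
        elif self.highest - sequence < self.size:
            self.bitmap |= 1 << (self.highest - sequence)
```

Checking before decryption and updating only after the tag verifies, as `receive_message` does, prevents an attacker from advancing the window with forged sequence numbers.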

Memory Protection

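from contextlib import contextmanager
from typing import Tuple

class SecureMemoryHandler:
    """Protect keys and plaintexts in memory"""
    
    @staticmethod
    def encrypt_with_cleanup(key: bytearray, plaintext: bytearray, nonce: bytes) -> Tuple[bytes, bytes]:
        # key and plaintext must be mutable (bytearray): Python bytes objects
        # are immutable and cannot be wiped in place. The caller's buffers are
        # zeroed on return, so the key is single-use here.
        try:
            # Encrypt
            ciphertext, tag = chacha20poly1305.encrypt(key, nonce, plaintext)
            
            return ciphertext, tag
            
        finally:
            # Ensure cleanup even on error; the nonce is public and needs no wiping
            secure_zero(plaintext)
            secure_zero(key)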
    
    @staticmethod
    @contextmanager
    def temporary_key():
        """Context manager for temporary keys"""
        key = chacha20poly1305.generate_key()
        try:
            yield key
        finally:
            secure_zero(key)

Common Integration Patterns

API Authentication

import base64
import secrets

class APIAuthenticator:
    def __init__(self, shared_key: bytes):
        self.key = shared_key
    
    def create_request(self, method: str, path: str, body: bytes) -> dict:
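        # Create request ID from 12 random bytes so the receiver can rebuild
        # the nonce from the X-Request-ID header (hex keeps it header-safe).
        # Random 96-bit nonce: rotate the shared key well before 2^32 requests.
        nonce = secrets.token_bytes(12)
        request_id = nonce.hex()
        
        # Build canonical request. The body is not included: the receiver must
        # be able to rebuild the AD before decrypting, and the AEAD already
        # authenticates the body as ciphertext.
        canonical = f"{method}\n{path}\n{request_id}\n".encode()
        
        # Encrypt body if present
        if body:
            encrypted_body, tag = chacha20poly1305.encrypt(
                self.key, nonce, body, canonical
            )
            
            return {
                'method': method,
                'path': path,
                'headers': {
                    'X-Request-ID': request_id,
                    'X-Auth-Tag': base64.b64encode(tag).decode(),
                },
                'body': encrypted_body
            }
        else:
            # Just authenticate headers. Poly1305 is a one-time MAC and must not
            # be keyed directly with a long-term key, so run the AEAD over an
            # empty plaintext with the canonical request as associated data.
            _, tag = chacha20poly1305.encrypt(
                self.key, nonce, b"", canonical
            )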
            
            return {
                'method': method,
                'path': path,
                'headers': {
                    'X-Request-ID': request_id,
                    'X-Auth-Tag': base64.b64encode(tag).decode(),
                },
                'body': None
            }

Database Field Encryption

/// Encrypt sensitive database fields
pub struct FieldEncryptor {
    key: [u8; 32],
}

impl FieldEncryptor {
    pub fn encrypt_field(&self, field_name: &str, value: &[u8]) -> EncryptedField {
        // Derive field-specific key
        let field_key = hkdf::expand(
            &self.key,
            format!("field-key-{}", field_name).as_bytes(),
            32
        );
        
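        // Generate random nonce. A random 96-bit nonce is acceptable only while
        // each field key encrypts far fewer than 2^32 values; otherwise prefer
        // XChaCha20-Poly1305 with a 24-byte nonce.
        let nonce: [u8; 12] = random();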
        
        // Include field name in AD
        let ad = field_name.as_bytes();
        
        // Encrypt
        let (ciphertext, tag) = chacha20poly1305::encrypt(
            &field_key,
            &nonce,
            value,
            ad
        );
        
        EncryptedField {
            nonce,
            ciphertext,
            tag,
            algorithm: "chacha20poly1305",
            field_name: field_name.to_string(),
        }
    }
}

Performance Considerations

| Operation | Speed | vs AES-GCM | Notes |
|-----------|-------|------------|-------|
| Encrypt (no AES-NI) | 3 GB/s | 3x faster | Software implementation |
| Encrypt (with AES-NI) | 3 GB/s | 0.7x | AES hardware faster |
| Small messages (<1KB) | ~1 μs | Similar | Setup dominates |
| Parallel | Good | Similar | Both parallelizable |

Platform Optimization

Security Auditing

Verification Checklist

Common Vulnerabilities

# AUDIT: Detect these patterns

# ❌ Nonce reuse
encrypt(key, b"000000000000", msg1)
encrypt(key, b"000000000000", msg2)  # SAME NONCE!

# ❌ Tag truncation
tag = tag[:8]  # NEVER DO THIS

# ❌ Using plaintext before verification
plaintext = decrypt_no_verify(...)
if verify_tag(tag):  # TOO LATE!
    use(plaintext)

# ❌ Weak nonce generation
nonce = str(user_id).encode().ljust(12, b'0')  # SAME NONCE for every message from this user
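
Beyond reading code for these patterns, nonce reuse can also be caught at runtime in tests or staging. The sketch below wraps a set of (key fingerprint, nonce) pairs and raises on the first repeat. The class name `NonceReuseDetector` is hypothetical, and the in-memory set is an assumption that suits test suites, not long-running production services.

```python
import hashlib


class NonceReuseDetector:
    """Record every (key, nonce) pair and fail loudly on a repeat."""

    def __init__(self):
        self._seen = set()

    def record(self, key: bytes, nonce: bytes) -> None:
        # Store a truncated hash of the key, never the key itself
        key_id = hashlib.sha256(b"nonce-audit" + bytes(key)).digest()[:8]
        pair = (key_id, bytes(nonce))
        if pair in self._seen:
            raise RuntimeError(f"nonce {bytes(nonce).hex()} reused under the same key")
        self._seen.add(pair)


detector = NonceReuseDetector()
detector.record(bytes(32), bytes(12))          # first use: accepted
detector.record(bytes(32), b"\x01" + bytes(11))  # different nonce: accepted
```

Calling `record` immediately before each `encrypt` call in a test harness turns a silent catastrophic failure into an immediate exception.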

Troubleshooting

Authentication Failures

  1. Verify all inputs match exactly
  2. Check byte order (endianness)
  3. Ensure AD is identical
  4. Verify tag not truncated
  5. Check for nonce reuse

Performance Issues

  1. Use streaming for large data
  2. Enable CPU optimizations
  3. Consider parallelization
  4. Batch small messages
  5. Profile implementation

Security Analysis

Threat Model: ChaCha20-Poly1305 Threat Model

For the complete security analysis and risk assessment, see the dedicated threat model documentation.

References

  1. RFC 8439 - ChaCha20 and Poly1305
  2. XChaCha20-Poly1305
  3. AEAD Security

Support

Security Issues: security@metamui.id
Documentation Updates: phantom@metamui.id
Vulnerability Disclosure: See SECURITY.md


Review Cycle: Quarterly
Next Review: 2025-04-05