ChaCha20-Poly1305

ChaCha20-Poly1305 is an authenticated encryption with associated data (AEAD) algorithm that combines the ChaCha20 stream cipher with the Poly1305 message authentication code. It provides both confidentiality and authenticity in a single, efficient operation.

Overview

ChaCha20-Poly1305 is specified in RFC 8439 and is widely used in modern protocols like TLS 1.3, WireGuard, and SSH. It’s particularly attractive for mobile and embedded devices due to its excellent software performance.

Key Features

  • Authenticated Encryption: Confidentiality and authenticity combined
  • High Performance: Fast on all platforms
  • Simple Implementation: No complex operations
  • Nonce-Sensitive: NOT nonce-misuse resistant — reusing a nonce with the same key is catastrophic (it leaks the XOR of the plaintexts and enables tag forgery)
  • IETF Standard: RFC 8439

Technical Details

  • Key Size: 256 bits
  • Nonce Size: 96 bits
  • Tag Size: 128 bits
  • Associated Data: Up to 2^64 - 1 bytes
  • Maximum Message: 2^38 - 64 bytes (about 256 GiB) per key/nonce pair, limited by the 32-bit block counter

Algorithm Parameters

Parameter       | Value
Key Size        | 256 bits
Nonce Size      | 96 bits
Tag Size        | 128 bits
ChaCha20 Rounds | 20
Security Level  | 256-bit encryption, 128-bit authentication

Usage Examples

Basic Authenticated Encryption

from metamui_crypto import ChaCha20Poly1305
import os

# A fresh random 256-bit key backs the cipher object
key = os.urandom(32)
cipher = ChaCha20Poly1305(key)

# Encrypt: a unique 96-bit nonce is required for every message under this key
plaintext = b"Secret message"
nonce = os.urandom(12)
ciphertext, tag = cipher.encrypt(plaintext, nonce)

# Decrypt: a ValueError signals that tag verification failed
try:
    decrypted = cipher.decrypt(ciphertext, tag, nonce)
except ValueError:
    print("Authentication failed!")
else:
    print("Authentication successful")
    assert decrypted == plaintext

Using Associated Data (AEAD)

import os

from metamui_crypto import ChaCha20Poly1305

# Associated data is authenticated but not encrypted
key = os.urandom(32)
cipher = ChaCha20Poly1305(key)

# Message with metadata that must travel in the clear but must not be altered
plaintext = b"Transfer $1000"
associated_data = b"from=Alice,to=Bob,timestamp=2024-01-01"
nonce = os.urandom(12)

# Encrypt with associated data: the tag covers both ciphertext and metadata
ciphertext, tag = cipher.encrypt(
    plaintext,
    nonce,
    associated_data=associated_data
)

# Must provide same associated data for decryption
decrypted = cipher.decrypt(
    ciphertext,
    tag,
    nonce,
    associated_data=associated_data
)

# Wrong associated data causes authentication failure
try:
    wrong_ad = b"from=Alice,to=Charlie,timestamp=2024-01-01"
    cipher.decrypt(ciphertext, tag, nonce, associated_data=wrong_ad)
except ValueError:
    print("Associated data mismatch detected")

Streaming Encryption

from metamui_crypto import ChaCha20Poly1305

import os
import struct


class StreamingAEAD:
    """Streaming authenticated encryption.

    Output layout written by ``encrypt_stream``::

        stream_nonce (12 bytes)
        per chunk: ciphertext length (uint32 LE) | tag | ciphertext
        final_mac  (tag binding the ordered list of chunk tags)
    """

    # Index reserved for the final MAC so that its nonce can never collide
    # with the nonce of a data chunk.
    _FINAL_INDEX = 0xFFFFFFFF

    def __init__(self, key):
        self.cipher = ChaCha20Poly1305(key)
        self.chunk_size = 64 * 1024  # 64KB chunks

    def encrypt_stream(self, input_stream, output_stream):
        """Encrypt ``input_stream`` into ``output_stream`` chunk by chunk.

        Args:
            input_stream: binary file-like object providing ``read(n)``.
            output_stream: binary file-like object providing ``write(b)``.

        Raises:
            ValueError: if the stream needs more chunks than the 32-bit
                chunk index can address.
        """
        # One random nonce per stream; per-chunk nonces are derived from it
        stream_nonce = os.urandom(12)
        output_stream.write(stream_nonce)

        chunk_index = 0
        tags = []

        while True:
            chunk = input_stream.read(self.chunk_size)
            if not chunk:
                break

            # Going past the index space would repeat a nonce under the same key
            if chunk_index >= self._FINAL_INDEX:
                raise ValueError("Stream too long: chunk index space exhausted")

            # Binding the index into the AD makes reordered chunks fail to verify
            ad = f"chunk:{chunk_index}".encode()
            chunk_nonce = self._derive_chunk_nonce(stream_nonce, chunk_index)
            ciphertext, tag = self.cipher.encrypt(chunk, chunk_nonce, ad)

            # Write length, tag, and ciphertext
            output_stream.write(struct.pack('<I', len(ciphertext)))
            output_stream.write(tag)
            output_stream.write(ciphertext)

            tags.append(tag)
            chunk_index += 1

        # The final MAC covers every chunk tag, so truncation is detectable
        final_mac = self._compute_final_mac(stream_nonce, tags)
        output_stream.write(final_mac)

    def _derive_chunk_nonce(self, stream_nonce, index):
        """Return ``stream_nonce`` with ``index`` (uint32 LE) XORed into its first 4 bytes."""
        nonce = bytearray(stream_nonce)
        for position, value in enumerate(index.to_bytes(4, 'little')):
            nonce[position] ^= value
        return bytes(nonce)

    def _compute_final_mac(self, stream_nonce, tags):
        """Return a tag that authenticates the number and order of chunk tags.

        The tag is produced by encrypting an empty plaintext under the
        reserved final index, with the chunk count and all chunk tags as
        associated data.
        """
        final_nonce = self._derive_chunk_nonce(stream_nonce, self._FINAL_INDEX)
        # The "final:" prefix keeps this AD distinct from any "chunk:N" AD
        ad = b"final:" + struct.pack('<I', len(tags)) + b"".join(tags)
        _, final_mac = self.cipher.encrypt(b"", final_nonce, ad)
        return final_mac

Protocol Implementation

from metamui_crypto import ChaCha20Poly1305
import json
import time

import base64
import struct


class SecureProtocol:
    """Secure communication protocol using ChaCha20-Poly1305.

    Nonces are derived from a per-sender message counter, so one key must
    only ever be used by a single sender: if both peers send under the same
    key, their counters collide and nonces repeat. Use one key per direction.
    """

    def __init__(self, shared_key):
        self.cipher = ChaCha20Poly1305(shared_key)
        self.send_counter = 0
        # Lowest sequence number still acceptable from the peer
        self.recv_counter = 0

    @staticmethod
    def _nonce_for(sequence):
        """Build the 96-bit nonce: 64-bit little-endian counter plus 4 zero bytes."""
        return struct.pack('<Q', sequence) + b'\x00' * 4

    @staticmethod
    def _associated_data(message_type, sequence):
        """Build the associated data that binds message type and sequence number."""
        return f"{message_type}:{sequence}".encode()

    def send_message(self, message_type, payload):
        """Encrypt ``payload`` and return a JSON-friendly wire message.

        Args:
            message_type: label for the message; sent in the clear but
                authenticated through the associated data.
            payload: JSON-serializable content.

        Returns:
            dict with keys ``sequence``, ``type``, ``ciphertext`` and ``tag``
            (the last two base64-encoded).
        """
        sequence = self.send_counter

        message = {
            'type': message_type,
            'payload': payload,
            'timestamp': int(time.time()),
            'sequence': sequence
        }
        plaintext = json.dumps(message).encode()

        nonce = self._nonce_for(sequence)
        ad = self._associated_data(message_type, sequence)
        ciphertext, tag = self.cipher.encrypt(plaintext, nonce, ad)

        self.send_counter = sequence + 1

        # The type travels with the message so the receiver can rebuild the AD
        return {
            'sequence': sequence,
            'type': message_type,
            'ciphertext': base64.b64encode(ciphertext).decode(),
            'tag': base64.b64encode(tag).decode()
        }

    def receive_message(self, wire_message):
        """Verify and decrypt a wire message produced by ``send_message``.

        Returns:
            The decoded message dict.

        Raises:
            ValueError: if the sequence number was already accepted (replay)
                or if authentication fails.
        """
        sequence = wire_message['sequence']

        # Anything below the next expected number has been seen before;
        # sequence 0 is valid for the first message.
        if sequence < self.recv_counter:
            raise ValueError("Replay attack detected")

        ciphertext = base64.b64decode(wire_message['ciphertext'])
        tag = base64.b64decode(wire_message['tag'])

        # Nonce and AD must be rebuilt exactly as the sender built them
        nonce = self._nonce_for(sequence)
        ad = self._associated_data(wire_message['type'], sequence)

        try:
            plaintext = self.cipher.decrypt(ciphertext, tag, nonce, ad)
        except ValueError as err:
            raise ValueError("Message authentication failed") from err

        # Advance only after the message proved authentic
        self.recv_counter = sequence + 1

        return json.loads(plaintext)

Implementation Details

AEAD Construction

ChaCha20-Poly1305 combines two primitives:

  1. ChaCha20 for encryption
  2. Poly1305 for authentication
def chacha20_poly1305_encrypt(key, nonce, plaintext, ad):
    """Encrypt and authenticate with the ChaCha20-Poly1305 AEAD construction.

    Returns the pair ``(ciphertext, tag)``.
    """
    # Block counter 0 is reserved for deriving the one-time Poly1305 key
    one_time_key = chacha20_block(key, 0, nonce)[:32]

    # The payload keystream therefore starts at block counter 1
    ciphertext = chacha20_encrypt(key, 1, nonce, plaintext)

    # MAC input: padded AD, padded ciphertext, then both lengths as 64-bit LE
    padded_ad = pad16(ad)
    padded_ciphertext = pad16(ciphertext)
    ad_length = len(ad).to_bytes(8, 'little')
    ciphertext_length = len(ciphertext).to_bytes(8, 'little')
    mac_data = padded_ad + padded_ciphertext + ad_length + ciphertext_length

    return ciphertext, poly1305_mac(one_time_key, mac_data)

Poly1305 MAC

def poly1305_mac(key, message):
    """Poly1305 one-time authenticator.

    Args:
        key: 32-byte one-time key; the first 16 bytes form ``r`` (clamped),
            the last 16 bytes form ``s``.
        message: bytes to authenticate.

    Returns:
        The 16-byte tag.

    Raises:
        ValueError: if ``key`` is not exactly 32 bytes long.

    Note:
        This is a readable reference version built on Python big integers;
        it makes no attempt to run in constant time.
    """
    # A short key would silently give a truncated r and s = 0
    if len(key) != 32:
        raise ValueError("Poly1305 key must be exactly 32 bytes")

    # Clamp r as the algorithm requires
    r = int.from_bytes(key[:16], 'little') & 0x0ffffffc0ffffffc0ffffffc0fffffff
    s = int.from_bytes(key[16:32], 'little')

    accumulator = 0
    p = (1 << 130) - 5

    # Process message in 16-byte blocks
    for i in range(0, len(message), 16):
        block = message[i:i+16]
        # The appended 0x01 byte sets the bit just above the block's top byte
        n = int.from_bytes(block + b'\x01', 'little')
        accumulator = ((accumulator + n) * r) % p

    # Add secret s and keep the low 128 bits
    accumulator = (accumulator + s) & ((1 << 128) - 1)

    return accumulator.to_bytes(16, 'little')

Advanced Usage

File Encryption with Chunking

from metamui_crypto import ChaCha20Poly1305
import os

import json
import struct


class FileEncryptor:
    """Encrypt large files with ChaCha20-Poly1305, one AEAD call per chunk.

    Output layout written by ``encrypt_file``::

        header length (uint32 LE) | header JSON
        per chunk: nonce (12 bytes) | tag | ciphertext length (uint32 LE) | ciphertext
    """

    def __init__(self, key):
        self.cipher = ChaCha20Poly1305(key)
        self.chunk_size = 1024 * 1024  # 1MB chunks

    def encrypt_file(self, input_path, output_path):
        """Encrypt ``input_path`` into ``output_path`` with per-chunk authentication.

        Each chunk gets a fresh random nonce, and its index, offset and size
        are bound in as associated data so chunks cannot be reordered.
        """
        file_size = os.path.getsize(input_path)

        with open(input_path, 'rb') as infile, \
             open(output_path, 'wb') as outfile:

            # NOTE(review): the header is written in the clear and is not
            # covered by any tag, so `file_size` alone does not protect
            # against truncation of trailing chunks.
            header = {
                'version': 1,
                'chunk_size': self.chunk_size,
                'file_size': file_size
            }
            header_bytes = json.dumps(header).encode()
            outfile.write(struct.pack('<I', len(header_bytes)))
            outfile.write(header_bytes)

            # read() returns b'' at end of file, which stops the iteration
            chunks = iter(lambda: infile.read(self.chunk_size), b'')
            for chunk_index, chunk in enumerate(chunks):
                # Generate per-chunk nonce
                nonce = os.urandom(12)

                # Use chunk metadata as AD
                ad = json.dumps({
                    'index': chunk_index,
                    'offset': chunk_index * self.chunk_size,
                    'size': len(chunk)
                }).encode()

                ciphertext, tag = self.cipher.encrypt(chunk, nonce, ad)

                # Write chunk data
                outfile.write(nonce)
                outfile.write(tag)
                outfile.write(struct.pack('<I', len(ciphertext)))
                outfile.write(ciphertext)

Key Commitment

from metamui_crypto import ChaCha20Poly1305, SHA256

import hmac


class CommittingAEAD:
    """ChaCha20-Poly1305 with key commitment.

    A 16-byte commitment derived from the key and nonce is prepended to the
    associated data and returned to the caller, so a ciphertext can only be
    opened under the key it was created with.
    """

    def __init__(self, key):
        self.key = key
        self.cipher = ChaCha20Poly1305(key)

    def _commitment(self, nonce):
        """Return the 16-byte commitment for this key and ``nonce``."""
        return SHA256.hash(self.key + nonce)[:16]

    def encrypt_with_commitment(self, plaintext, nonce, ad=b''):
        """Encrypt with key commitment.

        Returns:
            Tuple ``(ciphertext, tag, commitment)``.
        """
        commitment = self._commitment(nonce)

        # The commitment goes in front of the AD so the tag covers it too
        extended_ad = commitment + ad

        ciphertext, tag = self.cipher.encrypt(
            plaintext, nonce, extended_ad
        )

        return ciphertext, tag, commitment

    def decrypt_with_commitment(self, ciphertext, tag, nonce, commitment, ad=b''):
        """Decrypt and verify commitment.

        Raises:
            ValueError: if ``commitment`` does not match this key and nonce.
        """
        expected = self._commitment(nonce)
        # Constant-time comparison: a plain != can leak, through timing,
        # how many leading bytes of a guessed commitment were correct.
        # NOTE(review): assumes SHA256.hash returns bytes — confirm.
        if not hmac.compare_digest(bytes(commitment), bytes(expected)):
            raise ValueError("Key commitment mismatch")

        # Decrypt with extended AD
        extended_ad = commitment + ad
        return self.cipher.decrypt(
            ciphertext, tag, nonce, extended_ad
        )

Multi-Recipient Encryption

class MultiRecipientAEAD:
    """Envelope encryption: one ciphertext, one wrapped key per recipient."""

    @staticmethod
    def encrypt_for_multiple(plaintext, recipient_keys):
        """Encrypt ``plaintext`` once and wrap the content key for each recipient.

        Returns a dict holding the ciphertext, its tag and nonce, and a list
        with one wrapped-key entry per recipient key, in input order.
        """
        # Random content-encryption key, used for this message only
        cek = os.urandom(32)
        content_cipher = ChaCha20Poly1305(cek)

        # The payload is encrypted exactly once
        content_nonce = os.urandom(12)
        ciphertext, content_tag = content_cipher.encrypt(
            plaintext, content_nonce
        )

        def wrap_cek(index, recipient_key):
            # Each recipient's own key encrypts the CEK; the AD ties the
            # wrapped key to the recipient's position in the list.
            key_cipher = ChaCha20Poly1305(recipient_key)
            wrap_nonce = os.urandom(12)
            wrapped_cek, wrap_tag = key_cipher.encrypt(
                cek,
                wrap_nonce,
                associated_data=f"recipient:{index}".encode()
            )
            return {
                'id': index,
                'wrapped_key': wrapped_cek,
                'wrap_tag': wrap_tag,
                'wrap_nonce': wrap_nonce
            }

        recipients = [
            wrap_cek(index, recipient_key)
            for index, recipient_key in enumerate(recipient_keys)
        ]

        return {
            'ciphertext': ciphertext,
            'content_tag': content_tag,
            'content_nonce': content_nonce,
            'recipients': recipients
        }

Security Considerations

Nonce Generation

import os
import time


class SecureNonceGenerator:
    """Secure nonce generation strategies.

    Supported strategies:
        'random'  - 12 random bytes per nonce.
        'counter' - 8-byte random prefix plus a 32-bit big-endian counter.
        'time'    - 8-byte microsecond timestamp plus 4 random bytes.
    """

    def __init__(self, strategy='random'):
        self.strategy = strategy
        self.counter = 0
        self.prefix = os.urandom(8) if strategy == 'counter' else None

    def get_nonce(self):
        """Return a 12-byte nonce according to the configured strategy.

        Raises:
            ValueError: if the counter strategy has issued all 2**32 nonces,
                or if the strategy name is not recognised.
        """
        if self.strategy == 'random':
            # Random 96-bit nonces can collide (birthday bound), so rotate
            # the key long before a large number of messages is reached.
            return os.urandom(12)

        if self.strategy == 'counter':
            # Check before use so the last counter value (2**32 - 1) is still
            # issued and every later call fails the same way.
            if self.counter >= 2**32:
                raise ValueError("Counter exhausted")

            nonce = self.prefix + self.counter.to_bytes(4, 'big')
            self.counter += 1
            return nonce

        if self.strategy == 'time':
            # Time-based with randomness
            timestamp = int(time.time() * 1000000).to_bytes(8, 'big')
            random_part = os.urandom(4)
            return timestamp + random_part

        # Falling through would hand the caller None instead of a nonce
        raise ValueError(f"Unknown nonce strategy: {self.strategy!r}")

Tag Truncation

# ChaCha20-Poly1305 produces 128-bit tags
# Truncation reduces security

def truncate_tag(tag, bits):
    """Return the leading part of ``tag`` covering ``bits`` bits (NOT RECOMMENDED).

    Lengths that are not a multiple of 8 are rounded up to whole bytes.
    Raises ValueError when ``bits`` is below 64 or above 128.
    """
    if bits < 64:
        raise ValueError("Tag too short for security")
    if bits > 128:
        raise ValueError("Tag cannot be extended")

    # Ceiling division: number of whole bytes needed to hold `bits` bits
    byte_count = -(-bits // 8)
    return tag[:byte_count]

# Security levels with truncated tags:
# 128 bits: Full security (recommended)
# 96 bits: Reduced security
# 64 bits: Minimal acceptable
# <64 bits: Insecure

Misuse Resistance

class MisuseResistantAEAD:
    """Add misuse resistance layer.

    The nonce actually used for encryption is derived from the caller's
    nonce, the plaintext and the associated data, so a repeated caller nonce
    only repeats the real nonce when the whole input repeats.

    Note:
        The receiver needs the synthetic nonce to decrypt, so it must be
        sent along with the ciphertext; obtain it with ``synthetic_nonce``.
        The synthetic nonce is truncated to 96 bits, so this is weaker than
        a dedicated SIV mode.
    """

    def __init__(self, key):
        # Derive two keys
        from metamui_crypto import HKDF
        self.enc_key = HKDF(key, info=b"encryption", length=32)
        self.mac_key = HKDF(key, info=b"mac", length=32)
        self.cipher = ChaCha20Poly1305(self.enc_key)

    def synthetic_nonce(self, plaintext, nonce, ad=b''):
        """Return the 12-byte synthetic nonce for this (nonce, plaintext, ad)."""
        from metamui_crypto import HMAC
        siv = HMAC(self.mac_key, 'sha256')
        # Length-prefix every field: without framing, ("ab", "c") and
        # ("a", "bc") would hash identically and reuse the same keystream
        # for two different plaintexts.
        for field in (nonce, plaintext, ad):
            siv.update(len(field).to_bytes(8, 'little'))
            siv.update(field)
        return siv.finalize()[:12]

    def encrypt(self, plaintext, nonce, ad=b''):
        """Encrypt with synthetic IV.

        Returns whatever the underlying cipher's ``encrypt`` returns for the
        synthetic nonce (ciphertext and tag).
        """
        derived = self.synthetic_nonce(plaintext, nonce, ad)
        return self.cipher.encrypt(plaintext, derived, ad)

Performance Optimization

Parallel Processing

from concurrent.futures import ThreadPoolExecutor
import multiprocessing

class ParallelChaCha20Poly1305:
    """Encrypt a batch of messages using a pool of worker threads."""

    def __init__(self, key, workers=None):
        self.key = key
        # A falsy worker count falls back to the number of CPUs
        self.workers = workers if workers else multiprocessing.cpu_count()

    def encrypt_batch(self, messages):
        """Encrypt ``(plaintext, ad)`` pairs concurrently.

        Returns a list of ``(nonce, ciphertext, tag)`` tuples in input order.
        """
        shared_cipher = ChaCha20Poly1305(self.key)

        def seal(pair):
            message, associated = pair
            # Every message gets its own random nonce
            fresh_nonce = os.urandom(12)
            sealed, auth_tag = shared_cipher.encrypt(message, fresh_nonce, associated)
            return (fresh_nonce, sealed, auth_tag)

        with ThreadPoolExecutor(max_workers=self.workers) as pool:
            # map() yields results in the order of the inputs
            outcome = [*pool.map(seal, messages)]

        return outcome

Precomputation

class PrecomputedChaCha20Poly1305:
    """Cache ChaCha20 keystream for messages of known length."""

    def __init__(self, key):
        self.key = key
        self.keystream_cache = {}

    def precompute_keystream(self, length, nonce):
        """Return ``length`` bytes of keystream for ``nonce``, generating it once."""
        slot = (length, nonce)

        try:
            return self.keystream_cache[slot]
        except KeyError:
            pass

        # Encrypting zero bytes yields the raw keystream; counter starts at 1
        from metamui_crypto import ChaCha20
        stream_cipher = ChaCha20(self.key, nonce, counter=1)
        generated = stream_cipher.encrypt(b'\x00' * length)
        self.keystream_cache[slot] = generated

        return self.keystream_cache[slot]

Common Pitfalls

1. Nonce Reuse

# Bad: the same nonce used for two messages under one key breaks security
# nonce = b'0' * 12
# ct1, tag1 = cipher.encrypt(msg1, nonce)
# ct2, tag2 = cipher.encrypt(msg2, nonce)  # BROKEN!

# Good: draw a fresh nonce for each encryption
nonce1, nonce2 = os.urandom(12), os.urandom(12)

2. Tag Verification Timing

# Bad: Early return on tag mismatch (timing attack)
# if computed_tag != provided_tag:
#     return None  # Timing leak

# Good: Constant-time comparison (done internally)
def safe_decrypt(cipher, ciphertext, tag, nonce):
    """Return the plaintext, or None if authentication fails.

    The tag check is left to ``cipher.decrypt``; the caller only ever sees a
    generic failure, never which part of the check went wrong.
    """
    try:
        return cipher.decrypt(ciphertext, tag, nonce)
    except ValueError:
        # Generic error
        return None

3. AD Mismatch

# Bad: Forgetting associated data
# ciphertext, tag = cipher.encrypt(data, nonce, associated_data=metadata)
# plaintext = cipher.decrypt(ciphertext, tag, nonce)  # Missing AD!

# Good: Same AD for encrypt and decrypt
# (the keyword is `associated_data`, as in the AEAD usage example)
ciphertext, tag = cipher.encrypt(data, nonce, associated_data=metadata)
plaintext = cipher.decrypt(ciphertext, tag, nonce, associated_data=metadata)

Integration Examples

TLS 1.3 Record Layer

import struct


class TLS13Record:
    """Simplified TLS 1.3 record encryption.

    Key and IV derivation here is a plain HKDF call with a fixed label; it
    illustrates the record layer and is not interoperable with real TLS.
    """

    def __init__(self, traffic_secret):
        # Derive key and IV
        from metamui_crypto import HKDF
        self.key = HKDF(
            traffic_secret,
            info=b"tls13 key",
            length=32
        )
        self.iv = HKDF(
            traffic_secret,
            info=b"tls13 iv",
            length=12
        )
        self.cipher = ChaCha20Poly1305(self.key)
        self.sequence = 0

    def encrypt_record(self, content_type, fragment):
        """Encrypt one record and return ``ciphertext + tag``.

        Args:
            content_type: real content type, appended to the fragment as the
                last plaintext byte.
            fragment: record payload bytes.
        """
        # Build plaintext
        plaintext = fragment + bytes([content_type])

        # Nonce = IV XOR the 64-bit sequence number, right-aligned in 12 bytes
        nonce = bytearray(self.iv)
        for offset, value in enumerate(self.sequence.to_bytes(8, 'big')):
            nonce[4 + offset] ^= value

        # Additional data is the 5-byte record header:
        # type 0x17 | version 0x0303 | length of ciphertext plus 16-byte tag.
        # The version must be packed as one 16-bit field; packing 0x03, 0x03
        # with '>BBH' would emit 17 03 00 03 (four bytes).
        ad = struct.pack('>BHH', 0x17, 0x0303, len(plaintext) + 16)

        # Encrypt
        ciphertext, tag = self.cipher.encrypt(
            plaintext, bytes(nonce), ad
        )

        self.sequence += 1

        return ciphertext + tag

Resources