ChaCha20-Poly1305
ChaCha20-Poly1305 is an authenticated encryption with associated data (AEAD) algorithm that combines the ChaCha20 stream cipher with the Poly1305 message authentication code. It provides both confidentiality and authenticity in a single, efficient operation.
Overview
ChaCha20-Poly1305 is specified in RFC 8439 and is widely used in modern protocols like TLS 1.3, WireGuard, and SSH. It’s particularly attractive for mobile and embedded devices due to its excellent software performance.
Key Features
- Authenticated Encryption: Confidentiality and authenticity combined
- High Performance: Fast on all platforms
- Simple Implementation: No complex operations
- Nonce Sensitive: Not nonce-misuse resistant; reusing a nonce with the same key is catastrophic, so every nonce must be unique per key
- IETF Standard: RFC 8439
Technical Details
- Key Size: 256 bits
- Nonce Size: 96 bits
- Tag Size: 128 bits
- Associated Data: Up to 2^64 - 1 bytes
- Maximum Message: 2^38 - 64 bytes (about 256 GiB) per key/nonce pair, limited by ChaCha20's 32-bit block counter
Algorithm Parameters
| Parameter | Value |
|---|---|
| Key Size | 256 bits |
| Nonce Size | 96 bits |
| Tag Size | 128 bits |
| ChaCha20 Rounds | 20 |
| Security Level | 256-bit encryption, 128-bit authentication |
Usage Examples
Basic Authenticated Encryption
from metamui_crypto import ChaCha20Poly1305
import os
# Build a cipher around a fresh random 256-bit key
key = os.urandom(32)
cipher = ChaCha20Poly1305(key)

# Seal the message under a fresh random 96-bit nonce
plaintext = b"Secret message"
nonce = os.urandom(12)
ciphertext, tag = cipher.encrypt(plaintext, nonce)

# Open it again; a ValueError from decrypt() is reported as a failed
# authentication, anything else counts as success.
try:
    decrypted = cipher.decrypt(ciphertext, tag, nonce)
except ValueError:
    print("Authentication failed!")
else:
    print("Authentication successful")
    assert decrypted == plaintext
Using Associated Data (AEAD)
from metamui_crypto import ChaCha20Poly1305
# Only the plaintext is encrypted; the associated data stays readable but
# is still covered by the authentication tag.
key = os.urandom(32)
cipher = ChaCha20Poly1305(key)

# Payload plus the metadata that must stay bound to it
plaintext = b"Transfer $1000"
associated_data = b"from=Alice,to=Bob,timestamp=2024-01-01"
nonce = os.urandom(12)

# Seal the payload together with its metadata
ciphertext, tag = cipher.encrypt(plaintext, nonce, associated_data=associated_data)

# Opening requires exactly the same associated data
decrypted = cipher.decrypt(ciphertext, tag, nonce, associated_data=associated_data)

# Altered metadata makes verification fail
wrong_ad = b"from=Alice,to=Charlie,timestamp=2024-01-01"
try:
    cipher.decrypt(ciphertext, tag, nonce, associated_data=wrong_ad)
except ValueError:
    print("Associated data mismatch detected")
Streaming Encryption
from metamui_crypto import ChaCha20Poly1305
class StreamingAEAD:
    """Streaming authenticated encryption built on ChaCha20-Poly1305.

    Layout written by :meth:`encrypt_stream`::

        stream_nonce (12 bytes)
        per chunk: ciphertext length (uint32, little-endian) | tag | ciphertext
        final_mac (tag over the chunk count and every chunk tag)
    """

    # Nonce index reserved for the final MAC; data chunks must never reach it,
    # otherwise a chunk and the final MAC would share a nonce.
    _FINAL_INDEX = 0xFFFFFFFF

    def __init__(self, key):
        self.cipher = ChaCha20Poly1305(key)
        self.chunk_size = 64 * 1024  # 64KB chunks

    def encrypt_stream(self, input_stream, output_stream):
        """Encrypt ``input_stream`` chunk by chunk into ``output_stream``.

        Args:
            input_stream: Binary file-like object providing ``read(n)``.
            output_stream: Binary file-like object providing ``write(data)``.

        Raises:
            ValueError: If the stream needs more chunks than there are
                distinct per-chunk nonces.
        """
        import struct

        # One random nonce per stream; per-chunk nonces are derived from it
        stream_nonce = os.urandom(12)
        output_stream.write(stream_nonce)

        tags = []
        chunk_index = 0
        while True:
            chunk = input_stream.read(self.chunk_size)
            if not chunk:
                break
            if chunk_index >= self._FINAL_INDEX:
                raise ValueError("Too many chunks for a single stream")

            # Binding the index into the AD stops chunks being swapped around
            ad = f"chunk:{chunk_index}".encode()
            chunk_nonce = self._derive_chunk_nonce(stream_nonce, chunk_index)
            ciphertext, tag = self.cipher.encrypt(chunk, chunk_nonce, ad)

            # Write length, tag, and ciphertext
            output_stream.write(struct.pack('<I', len(ciphertext)))
            output_stream.write(tag)
            output_stream.write(ciphertext)

            tags.append(tag)
            chunk_index += 1

        # The final MAC covers all tags, so a truncated stream is detectable
        output_stream.write(self._compute_final_mac(stream_nonce, tags))

    def _derive_chunk_nonce(self, stream_nonce, index):
        """Return ``stream_nonce`` with ``index`` XORed into its first 4 bytes.

        ``index`` is encoded as 4 little-endian bytes, so it must fit in
        32 bits (``int.to_bytes`` raises ``OverflowError`` otherwise).
        """
        nonce = bytearray(stream_nonce)
        for position, index_byte in enumerate(index.to_bytes(4, 'little')):
            nonce[position] ^= index_byte
        return bytes(nonce)

    def _compute_final_mac(self, stream_nonce, tags):
        """Return a tag authenticating the ordered list of chunk tags.

        An empty message is encrypted under the reserved nonce index with the
        chunk count and all chunk tags as associated data; only the resulting
        tag is kept. Assumes the cipher accepts an empty plaintext.
        """
        final_nonce = self._derive_chunk_nonce(stream_nonce, self._FINAL_INDEX)
        # The "final:" prefix keeps this AD distinct from every "chunk:N" AD
        ad = b"final:" + len(tags).to_bytes(4, 'little') + b"".join(tags)
        _, final_mac = self.cipher.encrypt(b"", final_nonce, ad)
        return final_mac
Protocol Implementation
from metamui_crypto import ChaCha20Poly1305
import json
import time
class SecureProtocol:
    """Secure communication protocol using ChaCha20-Poly1305.

    Each message is encrypted under a nonce derived from a per-sender
    sequence number, and the message type plus sequence number are bound in
    as associated data.

    NOTE: nonces come from a counter that starts at 0, so two peers that
    both *send* with the same ``shared_key`` would reuse nonces. Use one
    key per direction.
    """

    def __init__(self, shared_key):
        self.cipher = ChaCha20Poly1305(shared_key)
        self.send_counter = 0  # sequence number of the next outgoing message
        self.recv_counter = 0  # lowest incoming sequence number still accepted

    @staticmethod
    def _nonce_for(sequence):
        """Build the 12-byte nonce: 8-byte little-endian sequence + 4 zero bytes."""
        import struct

        return struct.pack('<Q', sequence) + b'\x00' * 4

    def send_message(self, message_type, payload):
        """Encrypt ``payload`` and return the wire-format dict.

        Args:
            message_type: Label bound into the associated data; also sent in
                the clear so the receiver can rebuild that associated data.
            payload: JSON-serializable message body.

        Returns:
            Dict with ``type``, ``sequence`` and base64 text fields
            ``ciphertext`` and ``tag``.
        """
        import base64

        sequence = self.send_counter
        message = {
            'type': message_type,
            'payload': payload,
            'timestamp': int(time.time()),
            'sequence': sequence,
        }
        plaintext = json.dumps(message).encode()

        nonce = self._nonce_for(sequence)
        ad = f"{message_type}:{sequence}".encode()
        ciphertext, tag = self.cipher.encrypt(plaintext, nonce, ad)

        # Only consume the sequence number once encryption has succeeded
        self.send_counter += 1

        return {
            'type': message_type,
            'sequence': sequence,
            'ciphertext': base64.b64encode(ciphertext).decode(),
            'tag': base64.b64encode(tag).decode(),
        }

    def receive_message(self, wire_message):
        """Verify, decrypt and return the message carried by ``wire_message``.

        Raises:
            ValueError: If the sequence number was already accepted (replay),
                if authentication fails, or if the decrypted body is not
                valid JSON.
        """
        import base64

        sequence = wire_message['sequence']

        # Anything below the next expected sequence number has been seen
        if sequence < self.recv_counter:
            raise ValueError("Replay attack detected")

        ciphertext = base64.b64decode(wire_message['ciphertext'])
        tag = base64.b64decode(wire_message['tag'])

        nonce = self._nonce_for(sequence)
        # Must match the sender byte for byte; a forged 'type' fails the tag
        ad = f"{wire_message['type']}:{sequence}".encode()

        try:
            plaintext = self.cipher.decrypt(ciphertext, tag, nonce, ad)
        except ValueError as exc:
            raise ValueError("Message authentication failed") from exc

        message = json.loads(plaintext)

        # Advance the replay window only after successful verification
        self.recv_counter = sequence + 1
        return message
Implementation Details
AEAD Construction
ChaCha20-Poly1305 combines two primitives:
- ChaCha20 for encryption
- Poly1305 for authentication
def chacha20_poly1305_encrypt(key, nonce, plaintext, ad):
    """ChaCha20-Poly1305 AEAD encryption (construction of RFC 8439, section 2.8).

    Args:
        key: Cipher key passed to the ChaCha20 helpers.
        nonce: Nonce passed to the ChaCha20 helpers.
        plaintext: Bytes to encrypt.
        ad: Associated data (bytes) to authenticate without encrypting.

    Returns:
        ``(ciphertext, tag)``.
    """
    def _zero_pad16(data):
        # Data followed by zero bytes up to the next 16-byte boundary
        # (nothing is appended when the length is already a multiple of 16).
        return data + bytes(-len(data) % 16)

    # One-time Poly1305 key: first 32 bytes of the ChaCha20 block at counter 0
    poly_key = chacha20_block(key, 0, nonce)[:32]

    # The payload is encrypted starting at block counter 1
    ciphertext = chacha20_encrypt(key, 1, nonce, plaintext)

    # MAC input: padded AD, padded ciphertext, then both lengths as
    # 8-byte little-endian integers
    mac_data = (
        _zero_pad16(ad)
        + _zero_pad16(ciphertext)
        + len(ad).to_bytes(8, 'little')
        + len(ciphertext).to_bytes(8, 'little')
    )

    tag = poly1305_mac(poly_key, mac_data)
    return ciphertext, tag
Poly1305 MAC
def poly1305_mac(key, message):
    """Poly1305 one-time authenticator.

    Args:
        key: 32-byte one-time key; the first 16 bytes form ``r`` (clamped),
            the last 16 bytes form ``s``.
        message: Bytes to authenticate.

    Returns:
        The 16-byte tag.

    Raises:
        ValueError: If ``key`` is not exactly 32 bytes long.
    """
    # A short key would silently give a truncated (or zero) ``s`` half
    if len(key) != 32:
        raise ValueError("Poly1305 key must be 32 bytes")

    # Clamp r by clearing the bits selected by the mask
    r = int.from_bytes(key[:16], 'little') & 0x0ffffffc0ffffffc0ffffffc0fffffff
    s = int.from_bytes(key[16:32], 'little')
    p = (1 << 130) - 5

    accumulator = 0
    # Process message in 16-byte blocks; the last one may be shorter
    for offset in range(0, len(message), 16):
        block = bytes(message[offset:offset + 16])
        # The appended 0x01 byte sets one bit just above the block's data
        n = int.from_bytes(block + b'\x01', 'little')
        accumulator = ((accumulator + n) * r) % p

    # Add the secret s and keep the low 128 bits
    tag_value = (accumulator + s) & ((1 << 128) - 1)
    return tag_value.to_bytes(16, 'little')
Advanced Usage
File Encryption with Chunking
from metamui_crypto import ChaCha20Poly1305
import os
class FileEncryptor:
    """Encrypt large files with ChaCha20-Poly1305, one chunk at a time.

    Output layout::

        header length (uint32, little-endian) | JSON header
        per chunk: nonce (12 bytes) | tag | ciphertext length (uint32 LE) | ciphertext
    """

    def __init__(self, key):
        self.cipher = ChaCha20Poly1305(key)
        self.chunk_size = 1024 * 1024  # 1MB chunks

    def encrypt_file(self, input_path, output_path):
        """Encrypt ``input_path`` into ``output_path`` with per-chunk authentication.

        Every chunk gets a fresh random nonce, and its index, offset and size
        are bound in as associated data. The JSON header itself is written in
        the clear and is not authenticated.
        """
        import struct

        file_size = os.path.getsize(input_path)

        with open(input_path, 'rb') as infile, \
                open(output_path, 'wb') as outfile:
            # Write header
            header = {
                'version': 1,
                'chunk_size': self.chunk_size,
                'file_size': file_size,
            }
            header_bytes = json.dumps(header).encode()
            outfile.write(struct.pack('<I', len(header_bytes)))
            outfile.write(header_bytes)

            # read() returns b'' at end of file, which ends the iteration
            chunks = iter(lambda: infile.read(self.chunk_size), b'')
            for chunk_index, chunk in enumerate(chunks):
                # Generate per-chunk nonce
                nonce = os.urandom(12)

                # A decryptor must rebuild this AD byte for byte
                ad = json.dumps({
                    'index': chunk_index,
                    'offset': chunk_index * self.chunk_size,
                    'size': len(chunk),
                }).encode()

                ciphertext, tag = self.cipher.encrypt(chunk, nonce, ad)

                # Write chunk data
                outfile.write(nonce)
                outfile.write(tag)
                outfile.write(struct.pack('<I', len(ciphertext)))
                outfile.write(ciphertext)
Key Commitment
from metamui_crypto import ChaCha20Poly1305, SHA256
class CommittingAEAD:
    """ChaCha20-Poly1305 with key commitment.

    A 16-byte commitment derived from the key and nonce is prepended to the
    associated data and returned with the ciphertext, so decryption can
    reject a ciphertext that was produced under a different key.
    """

    def __init__(self, key):
        self.key = key
        self.cipher = ChaCha20Poly1305(key)

    def _commitment(self, nonce):
        """Return the first 16 bytes of ``SHA256.hash(key + nonce)``."""
        return SHA256.hash(self.key + nonce)[:16]

    def encrypt_with_commitment(self, plaintext, nonce, ad=b''):
        """Encrypt and return ``(ciphertext, tag, commitment)``."""
        commitment = self._commitment(nonce)

        # The commitment is authenticated as part of the associated data
        extended_ad = commitment + ad

        ciphertext, tag = self.cipher.encrypt(
            plaintext, nonce, extended_ad
        )
        return ciphertext, tag, commitment

    def decrypt_with_commitment(self, ciphertext, tag, nonce, commitment, ad=b''):
        """Check the key commitment, then decrypt.

        Raises:
            ValueError: If ``commitment`` does not match this key and nonce.
        """
        import hmac

        expected = self._commitment(nonce)
        # compare_digest avoids leaking, through timing, how many leading
        # bytes of the supplied commitment were correct
        if not hmac.compare_digest(commitment, expected):
            raise ValueError("Key commitment mismatch")

        # Decrypt with extended AD
        extended_ad = commitment + ad
        return self.cipher.decrypt(
            ciphertext, tag, nonce, extended_ad
        )
Multi-Recipient Encryption
class MultiRecipientAEAD:
    """Encrypt one payload for several recipients."""

    @staticmethod
    def encrypt_for_multiple(plaintext, recipient_keys):
        """Encrypt the content once and wrap its key for every recipient.

        A random content-encryption key (CEK) encrypts ``plaintext``; the CEK
        is then encrypted under each entry of ``recipient_keys`` with the
        recipient's position bound in as associated data.

        Returns:
            Dict with ``ciphertext``, ``content_tag``, ``content_nonce`` and
            a ``recipients`` list of per-recipient wrap records.
        """
        # Content is encrypted a single time under a throwaway key
        cek = os.urandom(32)
        content_nonce = os.urandom(12)
        ciphertext, content_tag = ChaCha20Poly1305(cek).encrypt(
            plaintext, content_nonce
        )

        def wrap_for(position, recipient_key):
            # Each recipient gets its own cipher and its own random nonce
            wrapper = ChaCha20Poly1305(recipient_key)
            wrap_nonce = os.urandom(12)
            wrapped_cek, wrap_tag = wrapper.encrypt(
                cek,
                wrap_nonce,
                associated_data=f"recipient:{position}".encode()
            )
            return {
                'id': position,
                'wrapped_key': wrapped_cek,
                'wrap_tag': wrap_tag,
                'wrap_nonce': wrap_nonce
            }

        recipients = [
            wrap_for(position, recipient_key)
            for position, recipient_key in enumerate(recipient_keys)
        ]

        return {
            'ciphertext': ciphertext,
            'content_tag': content_tag,
            'content_nonce': content_nonce,
            'recipients': recipients
        }
Security Considerations
Nonce Generation
class SecureNonceGenerator:
    """Secure nonce generation strategies.

    Supported strategies:
        ``'random'``: 12 random bytes per nonce.
        ``'counter'``: random 8-byte prefix + 4-byte big-endian counter.
        ``'time'``: 8-byte microsecond timestamp + 4 random bytes.
    """

    def __init__(self, strategy='random'):
        self.strategy = strategy
        self.counter = 0
        # Only the counter strategy needs a per-generator prefix
        self.prefix = os.urandom(8) if strategy == 'counter' else None

    def get_nonce(self):
        """Return a fresh 12-byte nonce.

        Raises:
            ValueError: If the counter strategy has used all 2**32 values,
                or if ``strategy`` is not a supported name.
        """
        if self.strategy == 'random':
            # Uniqueness is only probabilistic here; keep the number of
            # messages per key low enough that a collision stays unlikely.
            return os.urandom(12)

        if self.strategy == 'counter':
            # Check before encoding so the last value (2**32 - 1) is still
            # usable and exhaustion is always reported as ValueError.
            if self.counter >= 2**32:
                raise ValueError("Counter exhausted")
            nonce = self.prefix + self.counter.to_bytes(4, 'big')
            self.counter += 1
            return nonce

        if self.strategy == 'time':
            # Microsecond timestamp, with random bytes to separate nonces
            # generated within the same microsecond
            timestamp = int(time.time() * 1000000).to_bytes(8, 'big')
            random_part = os.urandom(4)
            return timestamp + random_part

        # Never fall through and hand the caller None as a "nonce"
        raise ValueError(f"Unknown nonce strategy: {self.strategy!r}")
Tag Truncation (Not Recommended)
# ChaCha20-Poly1305 produces 128-bit tags
# Truncation reduces security
def truncate_tag(tag, bits):
    """Return the leading bytes of ``tag`` covering ``bits`` bits (NOT RECOMMENDED).

    ``bits`` must lie in the range 64..128; a value that is not a multiple
    of 8 is rounded up to the next whole byte.

    Raises:
        ValueError: If ``bits`` is below 64 or above 128.
    """
    if bits > 128:
        raise ValueError("Tag cannot be extended")
    if bits < 64:
        raise ValueError("Tag too short for security")

    # Ceiling division: number of bytes needed to hold ``bits`` bits
    keep = -(-bits // 8)
    return tag[:keep]
# Security levels with truncated tags:
# 128 bits: Full security (recommended)
# 96 bits: Reduced security
# 64 bits: Minimal acceptable
# <64 bits: Insecure
Misuse Resistance
class MisuseResistantAEAD:
    """Add a misuse-resistance layer by deriving the nonce from the message.

    The nonce handed to ChaCha20-Poly1305 is an HMAC over the caller's nonce,
    the plaintext and the associated data, so repeating a caller nonce with
    a different message still produces a different cipher nonce.
    """

    def __init__(self, key):
        # Derive two independent keys from the input key
        from metamui_crypto import HKDF
        self.enc_key = HKDF(key, info=b"encryption", length=32)
        self.mac_key = HKDF(key, info=b"mac", length=32)
        self.cipher = ChaCha20Poly1305(self.enc_key)

    def derive_nonce(self, plaintext, nonce, ad=b''):
        """Return the 12-byte synthetic nonce for this message.

        The receiver cannot recompute this value before decrypting, so the
        sender must transmit it alongside the ciphertext and tag.
        """
        from metamui_crypto import HMAC
        siv = HMAC(self.mac_key, 'sha256')
        # Length-prefix each field: without framing, ("ab", "c") and
        # ("a", "bc") as (plaintext, ad) would feed identical bytes to the
        # MAC and collide on the same nonce for different plaintexts.
        for field in (nonce, plaintext, ad):
            siv.update(len(field).to_bytes(8, 'little'))
            siv.update(field)
        return siv.finalize()[:12]

    def encrypt(self, plaintext, nonce, ad=b''):
        """Encrypt with the synthetic nonce; returns what ``cipher.encrypt`` returns.

        Call :meth:`derive_nonce` with the same arguments to obtain the
        nonce that must accompany the ciphertext.
        """
        synthetic_nonce = self.derive_nonce(plaintext, nonce, ad)
        return self.cipher.encrypt(plaintext, synthetic_nonce, ad)
Performance Optimization
Parallel Processing
from concurrent.futures import ThreadPoolExecutor
import multiprocessing
class ParallelChaCha20Poly1305:
    """Encrypt batches of messages on a thread pool."""

    def __init__(self, key, workers=None):
        self.key = key
        # A falsy worker count falls back to the number of CPUs
        self.workers = workers if workers else multiprocessing.cpu_count()

    def encrypt_batch(self, messages):
        """Encrypt every ``(plaintext, ad)`` pair in ``messages``.

        Returns:
            List of ``(nonce, ciphertext, tag)`` tuples in input order.
        """
        cipher = ChaCha20Poly1305(self.key)

        def seal(pair):
            message, associated = pair
            # Fresh random nonce for every message in the batch
            fresh_nonce = os.urandom(12)
            sealed, auth_tag = cipher.encrypt(message, fresh_nonce, associated)
            return (fresh_nonce, sealed, auth_tag)

        with ThreadPoolExecutor(max_workers=self.workers) as pool:
            return list(pool.map(seal, messages))
Precomputation
class PrecomputedChaCha20Poly1305:
    """Cache ChaCha20 keystream for message lengths known in advance."""

    def __init__(self, key):
        self.key = key
        self.keystream_cache = {}

    def precompute_keystream(self, length, nonce):
        """Return ``length`` bytes of keystream for ``nonce``, computing it once.

        Results are cached under ``(length, nonce)``; the keystream is
        obtained by encrypting zero bytes starting at counter 1.
        """
        lookup = (length, nonce)
        try:
            return self.keystream_cache[lookup]
        except KeyError:
            pass

        # Cache miss: generate the keystream and remember it
        from metamui_crypto import ChaCha20
        generator = ChaCha20(self.key, nonce, counter=1)
        stream = generator.encrypt(b'\x00' * length)
        self.keystream_cache[lookup] = stream
        return stream
Common Pitfalls
1. Nonce Reuse
# Bad: the same nonce used twice under one key breaks security completely
# nonce = b'0' * 12
# ct1, tag1 = cipher.encrypt(msg1, nonce)
# ct2, tag2 = cipher.encrypt(msg2, nonce)  # BROKEN!

# Good: draw a separate nonce for each encryption
nonce1, nonce2 = os.urandom(12), os.urandom(12)
2. Tag Verification Timing
# Bad: Early return on tag mismatch (timing attack)
# if computed_tag != provided_tag:
#     return None  # Timing leak

# Good: Constant-time comparison (done internally)
def decrypt_or_none(cipher, ciphertext, tag, nonce):
    """Return the decrypted plaintext, or None when decrypt() raises ValueError.

    The caller learns only that decryption failed, not why.
    """
    try:
        return cipher.decrypt(ciphertext, tag, nonce)
    except ValueError:
        # Generic error
        return None
3. AD Mismatch
# Bad: Forgetting associated data
# ciphertext, tag = cipher.encrypt(data, nonce, associated_data=metadata)
# plaintext = cipher.decrypt(ciphertext, tag, nonce)  # Missing AD!

# Good: Same AD for encrypt and decrypt
ciphertext, tag = cipher.encrypt(data, nonce, associated_data=metadata)
plaintext = cipher.decrypt(ciphertext, tag, nonce, associated_data=metadata)
Integration Examples
TLS 1.3 Record Layer
class TLS13Record:
    """Simplified TLS 1.3 record encryption.

    Key and IV derivation here is a simplification; this class illustrates
    the per-record nonce and additional-data construction only.
    """

    def __init__(self, traffic_secret):
        # Derive key and IV
        from metamui_crypto import HKDF
        self.key = HKDF(
            traffic_secret,
            info=b"tls13 key",
            length=32
        )
        self.iv = HKDF(
            traffic_secret,
            info=b"tls13 iv",
            length=12
        )
        self.cipher = ChaCha20Poly1305(self.key)
        self.sequence = 0

    def encrypt_record(self, content_type, fragment):
        """Encrypt one record and return ``ciphertext + tag``.

        Args:
            content_type: Real content type (0-255), appended to the fragment
                as the last plaintext byte.
            fragment: Record payload bytes.
        """
        import struct

        # Inner plaintext: payload followed by the real content type
        plaintext = fragment + bytes([content_type])

        # Per-record nonce: the 8-byte big-endian sequence number is XORed
        # into the last 8 bytes of the 12-byte IV
        nonce = bytearray(self.iv)
        for i, seq_byte in enumerate(self.sequence.to_bytes(8, 'big')):
            nonce[4 + i] ^= seq_byte

        # Additional data is the 5-byte record header:
        # type 0x17 (1 byte) | version 0x0303 (2 bytes) | length (2 bytes),
        # where length counts the ciphertext plus the 16-byte tag
        ad = struct.pack('>BHH', 0x17, 0x0303, len(plaintext) + 16)

        ciphertext, tag = self.cipher.encrypt(
            plaintext, bytes(nonce), ad
        )

        # Each sequence number is used for exactly one record
        self.sequence += 1
        return ciphertext + tag
Related Algorithms
- ChaCha20 - The underlying stream cipher
- Poly1305 - The authentication component
- AES-256-GCM - Alternative AEAD cipher
- Ascon - Lightweight AEAD alternative
Resources
- RFC 8439 - ChaCha20-Poly1305 Specification
- AEAD Security - AEAD Interface
- Security Analysis - Security properties
- Implementation Guide - Implementation details
- TLS 1.3 - Modern usage example