🌊 ChaCha20 Stream Cipher
📋 Quick Navigation
📖 Overview
ChaCha20 is a high-speed stream cipher designed by Daniel J. Bernstein. It's an improved variant of Salsa20, offering better diffusion and performance while maintaining excellent security properties. ChaCha20 operates on 512-bit blocks using 256-bit keys and is particularly fast on platforms without hardware AES acceleration.
✨ Key Features
High Performance
Extremely fast on all platforms, especially without AES-NI
Software Optimized
Excellent software performance without hardware acceleration
Simple Design
Easy to implement correctly with minimal complexity
IETF Standard
Standardized in RFC 8439 and RFC 7539
Widely Adopted
Used in TLS 1.3, WireGuard, and many modern protocols
Proven Security
Extensive cryptanalysis with strong security guarantees
Stream Cipher
Generates keystream for XOR-based encryption
Constant Time
Resistant to timing attacks when properly implemented
🎯 Common Use Cases
🔐 Protocol Integration
- TLS 1.3: ChaCha20-Poly1305 cipher suite
- WireGuard VPN: Primary encryption algorithm
- SSH: chacha20-poly1305@openssh.com cipher
- Signal Protocol: End-to-end encryption
💻 Application Areas
- Mobile Devices: Efficient encryption without AES hardware
- IoT Devices: Lightweight encryption for embedded systems
- File Encryption: High-speed bulk data encryption
- Network Security: Real-time communication encryption
Algorithm Parameters
| Parameter | Value |
|---|---|
| Key Size | 256 bits |
| Nonce Size | 96 bits (IETF) |
| Block Size | 512 bits |
| Rounds | 20 |
| Security Level | 256 bits |
Usage Examples
Basic Stream Encryption
from metamui_crypto import ChaCha20
import os
# Generate key and nonce
key = os.urandom(32) # 256 bits
nonce = os.urandom(12) # 96 bits (IETF variant)
# Create cipher
cipher = ChaCha20(key, nonce)
# Encrypt data (any length)
plaintext = b"Secret message of any length"
ciphertext = cipher.encrypt(plaintext)
# Decrypt (same operation)
decipher = ChaCha20(key, nonce)
decrypted = decipher.decrypt(ciphertext)
assert decrypted == plaintext
Counter Management
from metamui_crypto import ChaCha20
# Specify initial counter value
key = os.urandom(32)
nonce = os.urandom(12)
initial_counter = 1 # Start from block 1
cipher = ChaCha20(key, nonce, counter=initial_counter)
# Encrypt in chunks
chunk1 = b"First chunk of data"
chunk2 = b"Second chunk of data"
encrypted1 = cipher.encrypt(chunk1)
encrypted2 = cipher.encrypt(chunk2) # Counter auto-increments
# Decrypt with same initial counter
decipher = ChaCha20(key, nonce, counter=initial_counter)
decrypted1 = decipher.decrypt(encrypted1)
decrypted2 = decipher.decrypt(encrypted2)
Seeking in Stream
from metamui_crypto import ChaCha20
class SeekableChaCha20:
"""ChaCha20 with seek capability"""
def __init__(self, key, nonce):
self.key = key
self.nonce = nonce
self.block_size = 64 # ChaCha20 block size
def encrypt_at_offset(self, plaintext, offset):
"""Encrypt data at specific offset"""
# Calculate block number and position
block_num = offset // self.block_size
block_offset = offset % self.block_size
# Create cipher at correct position
cipher = ChaCha20(self.key, self.nonce, counter=block_num)
# Generate keystream for partial block if needed
if block_offset > 0:
# Burn initial bytes
cipher.encrypt(b'\x00' * block_offset)
return cipher.encrypt(plaintext)
def decrypt_range(self, ciphertext, start, length):
"""Decrypt specific range"""
return self.encrypt_at_offset(
ciphertext[start:start+length],
start
)
Parallel Encryption
from metamui_crypto import ChaCha20
import concurrent.futures
class ParallelChaCha20:
"""Parallel ChaCha20 encryption"""
def __init__(self, key, nonce):
self.key = key
self.nonce = nonce
self.chunk_size = 1024 * 1024 # 1MB chunks
def encrypt_parallel(self, data, workers=4):
"""Encrypt data using multiple threads"""
chunks = []
# Split into chunks with block alignment
for i in range(0, len(data), self.chunk_size):
chunk = data[i:i + self.chunk_size]
block_counter = i // 64 # ChaCha20 block size
chunks.append((chunk, block_counter))
# Encrypt chunks in parallel
encrypted = [None] * len(chunks)
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
futures = {}
for idx, (chunk, counter) in enumerate(chunks):
future = executor.submit(
self._encrypt_chunk,
chunk, counter
)
futures[future] = idx
for future in concurrent.futures.as_completed(futures):
idx = futures[future]
encrypted[idx] = future.result()
return b''.join(encrypted)
def _encrypt_chunk(self, chunk, counter):
"""Encrypt single chunk"""
cipher = ChaCha20(self.key, self.nonce, counter=counter)
return cipher.encrypt(chunk)
Implementation Details
ChaCha20 Core Function
The ChaCha20 core operates on a 4×4 matrix of 32-bit words:
def chacha20_block(key, counter, nonce):
"""Generate single ChaCha20 block"""
# Initial state
state = [
0x61707865, 0x3320646e, 0x79622d32, 0x6b206574, # Constants
key[0:4], key[4:8], key[8:12], key[12:16], # Key
key[16:20], key[20:24], key[24:28], key[28:32], # Key
counter, nonce[0:4], nonce[4:8], nonce[8:12] # Counter + Nonce
]
working_state = state.copy()
# 20 rounds (10 double-rounds)
for _ in range(10):
# Column rounds
quarter_round(working_state, 0, 4, 8, 12)
quarter_round(working_state, 1, 5, 9, 13)
quarter_round(working_state, 2, 6, 10, 14)
quarter_round(working_state, 3, 7, 11, 15)
# Diagonal rounds
quarter_round(working_state, 0, 5, 10, 15)
quarter_round(working_state, 1, 6, 11, 12)
quarter_round(working_state, 2, 7, 8, 13)
quarter_round(working_state, 3, 4, 9, 14)
# Add initial state
for i in range(16):
working_state[i] = (working_state[i] + state[i]) & 0xffffffff
return serialize(working_state)
Quarter Round Function
def quarter_round(state, a, b, c, d):
"""ChaCha20 quarter round"""
state[a] = (state[a] + state[b]) & 0xffffffff
state[d] ^= state[a]
state[d] = rotate_left(state[d], 16)
state[c] = (state[c] + state[d]) & 0xffffffff
state[b] ^= state[c]
state[b] = rotate_left(state[b], 12)
state[a] = (state[a] + state[b]) & 0xffffffff
state[d] ^= state[a]
state[d] = rotate_left(state[d], 8)
state[c] = (state[c] + state[d]) & 0xffffffff
state[b] ^= state[c]
state[b] = rotate_left(state[b], 7)
Advanced Usage
File Encryption with ChaCha20
from metamui_crypto import ChaCha20
import os
import struct
class ChaCha20FileEncryptor:
def __init__(self):
self.chunk_size = 64 * 1024 # 64KB chunks
def encrypt_file(self, input_path, output_path, key):
"""Encrypt file with ChaCha20"""
# Generate random nonce
nonce = os.urandom(12)
# Initialize cipher
cipher = ChaCha20(key, nonce)
with open(input_path, 'rb') as infile, \
open(output_path, 'wb') as outfile:
# Write header: nonce
outfile.write(nonce)
# Encrypt file in chunks
while True:
chunk = infile.read(self.chunk_size)
if not chunk:
break
encrypted_chunk = cipher.encrypt(chunk)
outfile.write(encrypted_chunk)
def decrypt_file(self, input_path, output_path, key):
"""Decrypt file with ChaCha20"""
with open(input_path, 'rb') as infile:
# Read nonce
nonce = infile.read(12)
# Initialize cipher
cipher = ChaCha20(key, nonce)
with open(output_path, 'wb') as outfile:
# Decrypt file in chunks
while True:
chunk = infile.read(self.chunk_size)
if not chunk:
break
decrypted_chunk = cipher.decrypt(chunk)
outfile.write(decrypted_chunk)
Random Number Generation
from metamui_crypto import ChaCha20
import time
class ChaCha20RNG:
"""ChaCha20-based random number generator"""
def __init__(self, seed=None):
if seed is None:
# Use system randomness
seed = os.urandom(32)
self.key = seed
self.counter = 0
self.buffer = b''
self.buffer_pos = 0
def random_bytes(self, n):
"""Generate n random bytes"""
result = b''
while len(result) < n:
# Refill buffer if needed
if self.buffer_pos >= len(self.buffer):
self._refill_buffer()
# Take bytes from buffer
available = len(self.buffer) - self.buffer_pos
needed = n - len(result)
to_take = min(available, needed)
result += self.buffer[self.buffer_pos:self.buffer_pos + to_take]
self.buffer_pos += to_take
return result
def _refill_buffer(self):
"""Generate new random block"""
# Use counter as nonce
nonce = self.counter.to_bytes(12, 'little')
# Generate keystream
cipher = ChaCha20(self.key, nonce)
self.buffer = cipher.encrypt(b'\x00' * 512) # 8 blocks
self.buffer_pos = 0
# Increment counter
self.counter += 1
# Reseed periodically
if self.counter % 1000000 == 0:
self._reseed()
def _reseed(self):
"""Reseed with fresh entropy"""
new_seed = os.urandom(32)
# Mix with current key
cipher = ChaCha20(self.key, b'\x00' * 12)
self.key = cipher.encrypt(new_seed)[:32]
self.counter = 0
Network Protocol Encryption
from metamui_crypto import ChaCha20
import struct
class ChaCha20Protocol:
"""Network protocol using ChaCha20"""
def __init__(self, shared_key):
self.shared_key = shared_key
self.send_counter = 0
self.recv_counter = 0
def send_message(self, message):
"""Encrypt and send message"""
# Use message counter as nonce
nonce = struct.pack('<Q', self.send_counter) + b'\x00' * 4
# Encrypt message
cipher = ChaCha20(self.shared_key, nonce)
ciphertext = cipher.encrypt(message)
# Create packet
packet = struct.pack('<Q', self.send_counter) + ciphertext
self.send_counter += 1
return packet
def receive_message(self, packet):
"""Receive and decrypt message"""
# Extract counter
counter = struct.unpack('<Q', packet[:8])[0]
ciphertext = packet[8:]
# Check counter for replay attacks
if counter <= self.recv_counter:
raise ValueError("Replay attack detected")
# Decrypt message
nonce = struct.pack('<Q', counter) + b'\x00' * 4
cipher = ChaCha20(self.shared_key, nonce)
message = cipher.decrypt(ciphertext)
self.recv_counter = counter
return message
Performance Optimization
SIMD Implementation
class ChaCha20SIMD:
"""SIMD-optimized ChaCha20"""
def __init__(self, key, nonce):
self.key = key
self.nonce = nonce
# Detect SIMD support
import platform
self.has_simd = self._detect_simd()
def _detect_simd(self):
"""Detect SIMD instruction support"""
if platform.machine() in ['x86_64', 'AMD64']:
try:
import cpuinfo
info = cpuinfo.get_cpu_info()
flags = info.get('flags', [])
return 'avx2' in flags or 'sse2' in flags
except:
return False
elif platform.machine().startswith('arm'):
# ARM NEON
return 'neon' in platform.machine().lower()
return False
def encrypt_blocks(self, plaintext, num_blocks=4):
"""Encrypt multiple blocks in parallel"""
if self.has_simd and num_blocks > 1:
# Use SIMD implementation
return self._encrypt_simd(plaintext, num_blocks)
else:
# Fallback to scalar
return self._encrypt_scalar(plaintext)
Cache-Friendly Implementation
def chacha20_encrypt_cached(key, nonce, plaintext):
"""Cache-friendly ChaCha20 encryption"""
# Process in cache-line-sized chunks
CACHE_LINE = 64 # bytes
BLOCKS_PER_CACHE = CACHE_LINE // 64 # ChaCha20 block size
result = bytearray(len(plaintext))
cipher = ChaCha20(key, nonce)
# Process aligned chunks
for i in range(0, len(plaintext), CACHE_LINE):
chunk = plaintext[i:i + CACHE_LINE]
result[i:i + len(chunk)] = cipher.encrypt(chunk)
return bytes(result)
Security Considerations
Nonce Reuse Prevention
class NonceManager:
"""Prevent nonce reuse"""
def __init__(self):
self.used_nonces = set()
self.counter_mode = False
def get_nonce(self):
"""Get unique nonce"""
while True:
if self.counter_mode:
# Counter-based nonce
nonce = self.counter.to_bytes(12, 'little')
self.counter += 1
else:
# Random nonce
nonce = os.urandom(12)
# Check for reuse
if nonce not in self.used_nonces:
self.used_nonces.add(nonce)
return nonce
# Switch to counter mode if too many collisions
if len(self.used_nonces) > 2**32:
self.counter_mode = True
self.counter = 0
Key Rotation
class ChaCha20KeyRotation:
"""Automatic key rotation"""
def __init__(self, master_key):
self.master_key = master_key
self.current_key = self._derive_key(0)
self.key_id = 0
self.message_count = 0
self.rotation_interval = 1000000 # Messages per key
def encrypt(self, plaintext):
"""Encrypt with automatic rotation"""
# Check if rotation needed
if self.message_count >= self.rotation_interval:
self._rotate_key()
# Generate nonce
nonce = os.urandom(12)
# Encrypt
cipher = ChaCha20(self.current_key, nonce)
ciphertext = cipher.encrypt(plaintext)
self.message_count += 1
# Include key ID for decryption
return struct.pack('<I', self.key_id) + nonce + ciphertext
def _rotate_key(self):
"""Rotate to new key"""
self.key_id += 1
self.current_key = self._derive_key(self.key_id)
self.message_count = 0
def _derive_key(self, key_id):
"""Derive key from master key"""
from metamui_crypto import HKDF
return HKDF(
self.master_key,
salt=b"chacha20-rotation",
info=struct.pack('<I', key_id),
length=32
)
Common Pitfalls
1. Nonce Reuse
# Bad: Reusing nonce
# nonce = b'0' * 12
# cipher1 = ChaCha20(key, nonce)
# cipher2 = ChaCha20(key, nonce) # Same nonce!
# Good: Unique nonce per encryption
nonce1 = os.urandom(12)
nonce2 = os.urandom(12)
2. Counter Overflow
# Bad: Not checking counter overflow
# Large file might overflow 32-bit counter
# Good: Check for overflow
def encrypt_large_file(key, nonce, file_size):
max_bytes = 2**32 * 64 # Counter * block_size
if file_size > max_bytes:
raise ValueError(f"File too large: {file_size} > {max_bytes}")
3. Wrong Nonce Size
# Bad: Wrong nonce size
# nonce = os.urandom(8) # Original uses 64-bit
# Good: Use correct size for variant
nonce = os.urandom(12) # IETF variant (96-bit)
# or
nonce = os.urandom(8) # Original variant (64-bit)
Comparison with AES
| Feature | ChaCha20 | AES-256 |
|---|---|---|
| Key Size | 256 bits | 256 bits |
| Speed (no HW) | Faster | Slower |
| Speed (with HW) | Fast | Faster |
| Implementation | Simple | Complex |
| Side-channels | Resistant | Needs care |
| Parallelizable | Yes | Mode-dependent |
Resources
- RFC 8439 - ChaCha20 and Poly1305
- Original Paper - ChaCha design
- Security Analysis - Security properties
- Implementation Guide - Implementation details
- Performance Analysis - Performance comparison