BLAKE2b is a cryptographic hash function that is faster than MD5, SHA-1, SHA-2, and SHA-3, yet is at least as secure as the latest standard SHA-3. BLAKE2b is optimized for 64-bit platforms and produces digests of any size between 1 and 64 bytes. It is a further development of the BLAKE hash function, which was a finalist in the SHA-3 competition.
| Parameter | Description | Default | Range/Type |
|---|---|---|---|
| digest_size | Output hash size in bytes | 64 | 1-64 |
| key | Optional key for MAC mode | None | 0-64 bytes |
| salt | Salt for randomization | Zero | 16 bytes |
| personalization | Domain separation | Zero | 16 bytes |
| fanout | Tree fanout | 1 | 0-255 |
| depth | Tree depth | 1 | 0-255 |
| leaf_size | Leaf maximal size | 0 | 0-2^32 |
| node_offset | Node offset | 0 | 0-2^64 |
| node_depth | Node depth | 0 | 0-255 |
| inner_size | Inner hash size | 0 | 0-64 |
import hmac
import os
import struct

from metamui_crypto import BLAKE2b
# Basic hashing (default digest_size is 64 bytes / 512 bits)
blake2b = BLAKE2b()
hash_value = blake2b.hash(b"Hello, World!")
print(f"BLAKE2b hash: {hash_value.hex()}")
# Custom digest size
blake2b_256 = BLAKE2b(digest_size=32) # 256-bit output
hash_256 = blake2b_256.hash(b"Hello, World!")
print(f"BLAKE2b-256: {hash_256.hex()}")
# Keyed hashing (MAC mode): keyed BLAKE2b replaces the HMAC construction
key = os.urandom(32)
blake2b_mac = BLAKE2b(key=key)
mac = blake2b_mac.hash(b"Authenticated message")
print(f"BLAKE2b MAC: {mac.hex()}")
# Incremental hashing: feed data in pieces, then finalize once
hasher = BLAKE2b()
hasher.update(b"Hello, ")
hasher.update(b"World!")
final_hash = hasher.finalize()
# Personalization for domain separation (same input, different digests per context)
blake2b_files = BLAKE2b(personalization=b"FilesHasher2024")
blake2b_passwords = BLAKE2b(personalization=b"PasswordHash001")
# Salt for randomization (16 bytes)
salt = os.urandom(16)
blake2b_salted = BLAKE2b(salt=salt)
salted_hash = blake2b_salted.hash(b"Data to hash")
# File hashing with progress
def hash_large_file(filepath, chunk_size=8192):
    """Hash a large file with progress reporting.

    Args:
        filepath: Path of the file to hash.
        chunk_size: Number of bytes read per iteration.

    Returns:
        The BLAKE2b digest of the file contents as bytes.
    """
    hasher = BLAKE2b()
    file_size = os.path.getsize(filepath)
    bytes_read = 0
    with open(filepath, 'rb') as f:
        while chunk := f.read(chunk_size):
            hasher.update(chunk)
            bytes_read += len(chunk)
            progress = (bytes_read / file_size) * 100
            print(f"\rHashing: {progress:.1f}%", end='')
    # Finalize exactly once: the original called finalize() twice
    # (once for the print, once for the return), which many hash APIs
    # reject or answer with an undefined result.
    digest = hasher.finalize()
    print("\nHash:", digest.hex())
    return digest
# Tree hashing for parallel processing
class TreeHasher:
    """Hashes data chunks as a two-level BLAKE2b tree: leaves, then a root."""

    def __init__(self, fanout=2, depth=2):
        self.fanout = fanout
        self.depth = depth

    def hash_chunks(self, data_chunks):
        """Hash data chunks in parallel tree structure"""
        # First pass: each chunk becomes a leaf node at tree depth 0.
        leaf_hashes = [
            BLAKE2b(
                fanout=self.fanout,
                depth=self.depth,
                leaf_size=len(chunk),
                node_offset=offset,
                node_depth=0,
                inner_size=64,
            ).hash(chunk)
            for offset, chunk in enumerate(data_chunks)
        ]
        # Second pass: fold every leaf digest into the root node
        # (marked as the last node of its level).
        root_hasher = BLAKE2b(
            fanout=self.fanout,
            depth=self.depth,
            node_depth=1,
            inner_size=64,
            last_node=True
        )
        for digest in leaf_hashes:
            root_hasher.update(digest)
        return root_hasher.finalize()
# Content-based addressing
class ContentStore:
    """In-memory store keyed by BLAKE2b-256 content addresses."""

    def __init__(self):
        self.store = {}
        self.hasher = BLAKE2b(digest_size=32)  # 256-bit addresses

    def _address_of(self, content: bytes) -> str:
        # The hex-encoded 32-byte digest is the content's address.
        return self.hasher.hash(content).hex()

    def put(self, content: bytes) -> str:
        """Store content and return its address"""
        address = self._address_of(content)
        self.store[address] = content
        return address

    def get(self, address: str) -> bytes:
        """Retrieve content by address"""
        return self.store.get(address)

    def exists(self, content: bytes) -> bool:
        """Check if content already exists"""
        return self._address_of(content) in self.store
# Password hashing with BLAKE2b
class PasswordHasher:
    """Salted, personalized BLAKE2b password hashing with simple stretching.

    NOTE(review): for production password storage prefer a memory-hard
    KDF such as Argon2 (which itself builds on BLAKE2b).
    """

    def __init__(self):
        # Fixed 16-byte personalization: app-level domain separation.
        self.personalization = b"MyAppPasswords01"

    def hash_password(self, password: str, salt: bytes = None) -> tuple:
        """Hash password with salt.

        Args:
            password: Plaintext password.
            salt: Optional 16-byte salt; a random one is generated if omitted.

        Returns:
            Tuple of (hash_value, salt).
        """
        if salt is None:
            salt = os.urandom(16)
        hasher = BLAKE2b(
            digest_size=32,
            salt=salt,
            personalization=self.personalization
        )
        # Add password stretching
        intermediate = hasher.hash(password.encode())
        for _ in range(1000):  # Simple stretching
            intermediate = hasher.hash(intermediate)
        return intermediate, salt

    def verify_password(self, password: str, hash_value: bytes, salt: bytes) -> bool:
        """Verify password against hash"""
        computed_hash, _ = self.hash_password(password, salt)
        # Constant-time comparison: plain `==` leaks how many leading
        # bytes matched, enabling a timing side channel.
        return hmac.compare_digest(computed_hash, hash_value)
# Keyed hashing for API authentication
class APIAuthenticator:
    """Signs and verifies API requests with keyed BLAKE2b (MAC mode)."""

    def __init__(self, secret_key: bytes):
        self.hasher = BLAKE2b(key=secret_key, digest_size=32)

    def sign_request(self, method: str, path: str, body: bytes, timestamp: int) -> str:
        """Create request signature"""
        # Newline-delimited canonical form binds method/path/timestamp to body.
        message = f"{method}\n{path}\n{timestamp}\n".encode() + body
        signature = self.hasher.hash(message)
        return signature.hex()

    def verify_request(self, method: str, path: str, body: bytes,
                       timestamp: int, signature: str) -> bool:
        """Verify request signature"""
        expected = self.sign_request(method, path, body, timestamp)
        # Constant-time comparison: `==` on the hex strings leaks match
        # length through timing, helping an attacker forge signatures.
        return hmac.compare_digest(expected, signature)
# BLAKE2b core operations (simplified)
class BLAKE2bCore:
    """Minimal pure-Python BLAKE2b compression core (RFC 7693).

    Callers are responsible for parameter-block initialization (XOR the
    parameter words into ``h``), message padding to 128-byte blocks, and
    maintaining the byte counter ``t``.

    The original version was not runnable: ``compress`` referenced
    undefined names ``m`` and ``sigma``, ``rotr64`` was never defined,
    and the working vector ``v`` was built with only 12 words so
    ``v[12..15]`` did not exist.
    """

    MASK64 = 0xFFFFFFFFFFFFFFFF

    # BLAKE2b initialization vector (same constants as SHA-512).
    IV = (
        0x6a09e667f3bcc908, 0xbb67ae8584caa73b,
        0x3c6ef372fe94f82b, 0xa54ff53a5f1d36f1,
        0x510e527fade682d1, 0x9b05688c2b3e6c1f,
        0x1f83d9abfb41bd6b, 0x5be0cd19137e2179,
    )

    # Message word schedule per round; rounds 10 and 11 reuse rows 0 and 1.
    SIGMA = (
        (0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15),
        (14, 10, 4, 8, 9, 15, 13, 6, 1, 12, 0, 2, 11, 7, 5, 3),
        (11, 8, 12, 0, 5, 2, 15, 13, 10, 14, 3, 6, 7, 1, 9, 4),
        (7, 9, 3, 1, 13, 12, 11, 14, 2, 6, 5, 10, 4, 0, 15, 8),
        (9, 0, 5, 7, 2, 4, 10, 15, 14, 1, 11, 12, 6, 8, 3, 13),
        (2, 12, 6, 10, 0, 11, 8, 3, 4, 13, 7, 5, 15, 14, 1, 9),
        (12, 5, 1, 15, 14, 13, 4, 10, 0, 7, 6, 3, 9, 2, 8, 11),
        (13, 11, 7, 14, 12, 1, 3, 9, 5, 0, 15, 4, 8, 6, 2, 10),
        (6, 15, 14, 9, 11, 3, 0, 8, 12, 2, 13, 7, 1, 4, 10, 5),
        (10, 2, 8, 4, 7, 6, 1, 5, 15, 11, 9, 14, 3, 12, 13, 0),
    )

    def __init__(self):
        self.h = list(self.IV)  # Chaining state (before parameter XOR)
        self.t = [0, 0]  # 128-bit message byte counter (low, high)
        self.f = [0, 0]  # Finalization flags

    @staticmethod
    def rotr64(x, n):
        """Rotate a 64-bit word right by n bits."""
        return ((x >> n) | (x << (64 - n))) & BLAKE2bCore.MASK64

    def G(self, v, a, b, c, d, x, y):
        """BLAKE2b G function: mix four state words with two message words."""
        v[a] = (v[a] + v[b] + x) & self.MASK64
        v[d] = self.rotr64(v[d] ^ v[a], 32)
        v[c] = (v[c] + v[d]) & self.MASK64
        v[b] = self.rotr64(v[b] ^ v[c], 24)
        v[a] = (v[a] + v[b] + y) & self.MASK64
        v[d] = self.rotr64(v[d] ^ v[a], 16)
        v[c] = (v[c] + v[d]) & self.MASK64
        v[b] = self.rotr64(v[b] ^ v[c], 63)

    def compress(self, block, last=False):
        """Compress one 128-byte message block into the chaining state.

        Args:
            block: Exactly 128 bytes of (padded) message data.
            last: True when this is the final block (inverts v[14]).
        """
        # Message words: 16 little-endian 64-bit integers.
        m = struct.unpack("<16Q", block)
        # Working vector: 8 chaining words followed by the full 8-word IV.
        v = self.h[:] + list(self.IV)
        v[12] ^= self.t[0]
        v[13] ^= self.t[1]
        if last:
            v[14] ^= self.MASK64
        # 12 rounds of mixing
        for r in range(12):
            s = self.SIGMA[r % 10]
            # Column step
            self.G(v, 0, 4, 8, 12, m[s[0]], m[s[1]])
            self.G(v, 1, 5, 9, 13, m[s[2]], m[s[3]])
            self.G(v, 2, 6, 10, 14, m[s[4]], m[s[5]])
            self.G(v, 3, 7, 11, 15, m[s[6]], m[s[7]])
            # Diagonal step
            self.G(v, 0, 5, 10, 15, m[s[8]], m[s[9]])
            self.G(v, 1, 6, 11, 12, m[s[10]], m[s[11]])
            self.G(v, 2, 7, 8, 13, m[s[12]], m[s[13]])
            self.G(v, 3, 4, 9, 14, m[s[14]], m[s[15]])
        # Fold the working vector back into the chaining state.
        for i in range(8):
            self.h[i] ^= v[i] ^ v[i + 8]
# DON'T: Use short output for security-critical applications
# (names like sensitive_data / file_data / password below are placeholders)
blake2b_weak = BLAKE2b(digest_size=8) # Only 64 bits!
# WRONG: 64-bit digests fall to birthday attacks (~2^32 work to collide)
weak_hash = blake2b_weak.hash(sensitive_data)
# DO: Use adequate output size
blake2b_secure = BLAKE2b(digest_size=32) # 256 bits minimum
secure_hash = blake2b_secure.hash(sensitive_data)
# DON'T: Reuse keys across different contexts
shared_key = os.urandom(32)
# WRONG: Same key for different purposes
file_mac = BLAKE2b(key=shared_key).hash(file_data)
api_mac = BLAKE2b(key=shared_key).hash(api_data)
# DO: Derive separate keys or use personalization for domain separation
file_hasher = BLAKE2b(key=shared_key, personalization=b"FileMAC_v1.0")
api_hasher = BLAKE2b(key=shared_key, personalization=b"APIMAC_v1.0")
# DON'T: Use BLAKE2b directly for password storage
# WRONG: No salt, no stretching, vulnerable to rainbow tables
password_hash = BLAKE2b().hash(password.encode())
# DO: Use proper password hashing with a per-user random salt
salt = os.urandom(16)
# Better: Use Argon2 which uses BLAKE2b internally
from metamui_crypto import Argon2
argon2 = Argon2()
password_hash = argon2.hash(password.encode(), salt)
| Operation | Data Size | Throughput | Time | CPU Cycles/Byte |
|---|---|---|---|---|
| BLAKE2b-512 | 64 B | 122 MB/s | 0.52 μs | 21.3 |
| BLAKE2b-512 | 1 KB | 952 MB/s | 1.05 μs | 2.73 |
| BLAKE2b-512 | 64 KB | 1.82 GB/s | 35.2 μs | 1.43 |
| BLAKE2b-512 | 1 MB | 1.91 GB/s | 524 μs | 1.36 |
| BLAKE2b-256 | 1 KB | 961 MB/s | 1.04 μs | 2.70 |
| Keyed Mode | 1 KB | 945 MB/s | 1.06 μs | 2.75 |
| Tree Mode (4-way) | 4 MB | 6.84 GB/s | 584 μs | 0.38 |
| Algorithm | 64 KB Throughput | Relative Speed |
|---|---|---|
| BLAKE2b | 1.82 GB/s | 1.00x (baseline) |
| SHA-256 | 0.54 GB/s | 0.30x |
| SHA-512 | 0.85 GB/s | 0.47x |
| SHA3-256 | 0.42 GB/s | 0.23x |
| BLAKE3 | 2.91 GB/s | 1.60x |
| MD5 | 0.65 GB/s | 0.36x |
# Efficient batch hashing
def hash_many_files(file_paths):
    """Hash multiple files efficiently, reusing one hasher instance."""
    digests = {}
    shared_hasher = BLAKE2b()
    for file_path in file_paths:
        shared_hasher.reset()  # Clear any state left from the previous file
        with open(file_path, 'rb') as stream:
            while block := stream.read(65536):  # 64KB chunks
                shared_hasher.update(block)
        digests[file_path] = shared_hasher.finalize()
    return digests
# Parallel hashing with multiprocessing
from multiprocessing import Pool

def _hash_chunk(chunk):
    """Worker: hash one chunk (module-level so Pool can pickle it)."""
    return BLAKE2b().hash(chunk)

def parallel_hash(data_chunks):
    """Hash chunks in parallel.

    The worker must be a module-level function: ``multiprocessing``
    pickles the callable given to ``Pool.map``, and locally defined
    functions are not picklable — the original nested ``hash_chunk``
    therefore failed at runtime.
    """
    with Pool() as pool:
        return pool.map(_hash_chunk, data_chunks)
# Memory-efficient streaming
class StreamHasher:
    """Incrementally hashes a readable stream without buffering it all."""

    def __init__(self, stream):
        self.stream = stream
        self.hasher = BLAKE2b()
        self.digest = None  # Populated once hash_stream() is exhausted

    def hash_stream(self, chunk_size=8192):
        """Hash data stream without loading into memory.

        Yields the number of bytes read per chunk (progress indication).
        A generator's ``return`` value is only visible to callers as
        ``StopIteration.value``, so the final digest is additionally
        stored on ``self.digest`` for convenient access afterwards.
        """
        while data := self.stream.read(chunk_size):
            self.hasher.update(data)
            yield len(data)  # Progress indication
        self.digest = self.hasher.finalize()
        return self.digest
class BlockHeader:
    """Blockchain block header hashed with BLAKE2b-256."""

    def __init__(self, version, prev_hash, merkle_root, timestamp, nonce):
        self.version = version
        self.prev_hash = prev_hash
        self.merkle_root = merkle_root
        self.timestamp = timestamp
        self.nonce = nonce

    def hash(self):
        """Compute block hash using BLAKE2b"""
        hasher = BLAKE2b(digest_size=32)
        # Serialize header fields in a fixed little-endian layout.
        for encoded in (
            self.version.to_bytes(4, 'little'),
            self.prev_hash,
            self.merkle_root,
            self.timestamp.to_bytes(8, 'little'),
            self.nonce.to_bytes(8, 'little'),
        ):
            hasher.update(encoded)
        return hasher.finalize()
class MerkleTree:
    """Binary Merkle tree over BLAKE2b-256 with a personalized hasher."""

    def __init__(self):
        self.hasher = BLAKE2b(digest_size=32, personalization=b"MerkleTree2024")

    def hash_pair(self, left: bytes, right: bytes) -> bytes:
        """Hash two nodes to create parent"""
        return self.hasher.hash(left + right)

    def compute_root(self, leaves: list) -> bytes:
        """Compute Merkle root from leaves"""
        if not leaves:
            # Empty tree: root is the hash of the empty string.
            return self.hasher.hash(b"")
        level = list(leaves)
        while len(level) > 1:
            parents = [
                self.hash_pair(left, right)
                for left, right in zip(level[0::2], level[1::2])
            ]
            if len(level) % 2:
                parents.append(level[-1])  # Odd node carries forward
            level = parents
        return level[0]
class FileIntegrityChecker:
    """Builds and verifies a hash database for a directory tree."""

    # Domain separation for all file-integrity hashes.
    PERSONALIZATION = b"FileIntegrity01"

    def __init__(self, db_path):
        self.db_path = db_path
        self.hasher = BLAKE2b(personalization=self.PERSONALIZATION)

    def scan_directory(self, directory):
        """Scan directory and store file hashes.

        Returns:
            Dict mapping filepath -> {'hash', 'size', 'mtime'}.
        """
        integrity_db = {}
        for root, dirs, files in os.walk(directory):
            for file in files:
                filepath = os.path.join(root, file)
                file_hash = self.hash_file(filepath)
                integrity_db[filepath] = {
                    'hash': file_hash,
                    'size': os.path.getsize(filepath),
                    'mtime': os.path.getmtime(filepath)
                }
        return integrity_db

    def hash_file(self, filepath):
        """Hash file contents plus size/mode metadata.

        Uses the same personalization configured in ``__init__`` — the
        original built a plain ``BLAKE2b()`` here, silently bypassing
        the instance's domain separation.
        """
        hasher = BLAKE2b(personalization=self.PERSONALIZATION)
        # Include file metadata in hash
        stat = os.stat(filepath)
        hasher.update(f"size:{stat.st_size}\n".encode())
        hasher.update(f"mode:{stat.st_mode}\n".encode())
        # Hash file contents
        with open(filepath, 'rb') as f:
            while chunk := f.read(65536):
                hasher.update(chunk)
        return hasher.finalize()

    def verify_integrity(self, directory, stored_db):
        """Verify files haven't changed.

        Returns:
            List of change records ('modified' / 'added' / 'deleted').
        """
        changes = []
        current_db = self.scan_directory(directory)
        for filepath, current_info in current_db.items():
            if filepath in stored_db:
                if current_info['hash'] != stored_db[filepath]['hash']:
                    changes.append({
                        'file': filepath,
                        'type': 'modified',
                        'old_hash': stored_db[filepath]['hash'].hex(),
                        'new_hash': current_info['hash'].hex()
                    })
            else:
                changes.append({
                    'file': filepath,
                    'type': 'added',
                    'hash': current_info['hash'].hex()
                })
        for filepath in stored_db:
            if filepath not in current_db:
                changes.append({
                    'file': filepath,
                    'type': 'deleted',
                    'hash': stored_db[filepath]['hash'].hex()
                })
        return changes
class SecureChannel:
    """Counter-based authenticated channel using BLAKE2b for KDF and MAC.

    NOTE(review): ``encrypt``/``decrypt`` are not defined on this class —
    they must be supplied elsewhere (ideally a real AEAD cipher).
    """

    def __init__(self, shared_secret: bytes):
        self.shared_secret = shared_secret
        self.send_counter = 0
        self.recv_counter = 0

    def derive_key(self, purpose: bytes, counter: int) -> bytes:
        """Derive key using BLAKE2b as KDF"""
        hasher = BLAKE2b(
            key=self.shared_secret,
            personalization=b"SecureChannel001"
        )
        hasher.update(purpose)
        hasher.update(counter.to_bytes(8, 'big'))
        return hasher.finalize()[:32]  # 256-bit key

    def send_message(self, message: bytes) -> bytes:
        """Send authenticated encrypted message"""
        # Fresh per-message keys derived from the counter — no key reuse.
        enc_key = self.derive_key(b"encryption", self.send_counter)
        mac_key = self.derive_key(b"mac", self.send_counter)
        # Encrypt message (simplified - use real AEAD in practice)
        nonce = os.urandom(12)
        ciphertext = self.encrypt(message, enc_key, nonce)
        # MAC covers nonce, counter and ciphertext (encrypt-then-MAC).
        mac_hasher = BLAKE2b(key=mac_key, digest_size=16)
        mac_hasher.update(nonce)
        mac_hasher.update(self.send_counter.to_bytes(8, 'big'))
        mac_hasher.update(ciphertext)
        mac = mac_hasher.finalize()
        self.send_counter += 1
        return nonce + ciphertext + mac

    def receive_message(self, data: bytes) -> bytes:
        """Receive and verify message.

        Raises:
            ValueError: If MAC verification fails.
        """
        nonce = data[:12]
        ciphertext = data[12:-16]
        mac = data[-16:]
        # Derive keys
        enc_key = self.derive_key(b"encryption", self.recv_counter)
        mac_key = self.derive_key(b"mac", self.recv_counter)
        # Recompute and verify the MAC before touching the ciphertext.
        mac_hasher = BLAKE2b(key=mac_key, digest_size=16)
        mac_hasher.update(nonce)
        mac_hasher.update(self.recv_counter.to_bytes(8, 'big'))
        mac_hasher.update(ciphertext)
        expected_mac = mac_hasher.finalize()
        # Constant-time comparison: `!=` on the tags leaks match length
        # through timing, giving forgers a byte-by-byte oracle.
        if not hmac.compare_digest(mac, expected_mac):
            raise ValueError("MAC verification failed")
        # Decrypt message
        message = self.decrypt(ciphertext, enc_key, nonce)
        self.recv_counter += 1
        return message
class DeduplicationStore:
    """Content-addressed chunk store with fixed-size chunk deduplication."""

    def __init__(self, chunk_size=4096):
        self.chunk_size = chunk_size
        self.chunks = {}  # hash -> data
        self.files = {}   # filename -> list of hashes
        self.hasher = BLAKE2b(digest_size=32)

    def store_file(self, filename: str, data: bytes):
        """Store file using content-based chunking.

        Returns:
            Stats dict: chunk count, unique chunk count, dedup ratio.
        """
        chunks = []
        # Split into fixed-size chunks (the final chunk may be shorter).
        for i in range(0, len(data), self.chunk_size):
            chunk = data[i:i + self.chunk_size]
            chunk_hash = self.hasher.hash(chunk)
            # Store chunk if new
            if chunk_hash not in self.chunks:
                self.chunks[chunk_hash] = chunk
            chunks.append(chunk_hash)
        self.files[filename] = chunks
        # Use the real byte sizes — the original multiplied counts by
        # chunk_size, overcounting whenever a file's last chunk is short.
        unique_size = sum(len(c) for c in self.chunks.values())
        total_size = sum(
            len(self.chunks[h]) for hashes in self.files.values() for h in hashes
        )
        ratio = 1 - (unique_size / total_size) if total_size > 0 else 0
        return {
            'chunks': len(chunks),
            'unique_chunks': len(self.chunks),
            'dedup_ratio': ratio
        }

    def retrieve_file(self, filename: str) -> bytes:
        """Reconstruct file from chunks.

        Raises:
            KeyError: If the filename was never stored.
        """
        if filename not in self.files:
            # The original f-string had no placeholder and always said
            # "File (unknown) not found" regardless of the filename.
            raise KeyError(f"File {filename!r} not found")
        return b"".join(self.chunks[h] for h in self.files[filename])

    def verify_integrity(self):
        """Verify all chunks match their hashes"""
        errors = []
        for chunk_hash, chunk_data in self.chunks.items():
            computed_hash = self.hasher.hash(chunk_data)
            if computed_hash != chunk_hash:
                errors.append({
                    'stored_hash': chunk_hash.hex(),
                    'computed_hash': computed_hash.hex()
                })
        return errors
# Before: SHA-256 (``data`` is a placeholder for the bytes being hashed)
import hashlib
sha256_hash = hashlib.sha256(data).digest()
# After: BLAKE2b with 256-bit output (same digest length, faster)
from metamui_crypto import BLAKE2b
blake2b = BLAKE2b(digest_size=32)
blake2b_hash = blake2b.hash(data)
# Migration wrapper
class HashMigrator:
    """Switchable hasher for migrating from SHA-256 to BLAKE2b-256."""

    def __init__(self, use_blake2b=True):
        self.use_blake2b = use_blake2b

    def hash(self, data):
        # Both branches produce 32-byte digests, so storage is unchanged.
        if not self.use_blake2b:
            return hashlib.sha256(data).digest()
        return BLAKE2b(digest_size=32).hash(data)
# Before: HMAC-SHA256 (``key``/``data`` are placeholders)
import hmac
import hashlib
mac = hmac.new(key, data, hashlib.sha256).digest()
# After: BLAKE2b keyed mode (built-in MAC; no HMAC construction needed)
from metamui_crypto import BLAKE2b
blake2b_mac = BLAKE2b(key=key, digest_size=32)
mac = blake2b_mac.hash(data)
# Compatible wrapper
def compute_mac(key, data, algorithm='blake2b'):
    """Compute a 32-byte MAC with the selected algorithm.

    Args:
        key: Secret MAC key (bytes).
        data: Message to authenticate (bytes).
        algorithm: 'blake2b' (keyed BLAKE2b) or 'hmac-sha256'.

    Returns:
        32-byte MAC as bytes.

    Raises:
        ValueError: For an unsupported algorithm name — the original
        silently returned None, which callers could mistake for a MAC.
    """
    if algorithm == 'blake2b':
        return BLAKE2b(key=key, digest_size=32).hash(data)
    elif algorithm == 'hmac-sha256':
        return hmac.new(key, data, hashlib.sha256).digest()
    raise ValueError(f"Unsupported MAC algorithm: {algorithm!r}")
# Before: MD5 (insecure! collision resistance is broken)
import hashlib
md5_hash = hashlib.md5(data).digest() # 128-bit
# After: BLAKE2b with appropriate size
from metamui_crypto import BLAKE2b
# For non-cryptographic uses (e.g., checksums)
blake2b_128 = BLAKE2b(digest_size=16) # 128-bit like MD5
checksum = blake2b_128.hash(data)
# For cryptographic uses
blake2b_256 = BLAKE2b(digest_size=32) # 256-bit secure
secure_hash = blake2b_256.hash(data)
# Known-answer tests for the BLAKE2b implementation.
# Test Vector 1: Empty input (BLAKE2b-512)
blake2b = BLAKE2b()
assert blake2b.hash(b"").hex() == "786a02f742015903c6c6fd852552d272912f4740e15847618a86e217f71f5419d25e1031afee585313896444934eb04b903a685b1448b755d56f701afe9be2ce"
# Test Vector 2: "abc"
assert blake2b.hash(b"abc").hex() == "ba80a53f981c4d0d6a2797b69f12f6e94c212f14685ac4b74b12bb6fdbffa2d17d87c5392aab792dc252d5de4533cc9518d38aa8dbf1925ab92386edd4009923"
# Test Vector 3: One million 'a's
data = b"a" * 1000000
assert blake2b.hash(data).hex() == "98fb3efb7206fd19ebf69b6f312cf7b64e3b94dbe1a17107913975a793f177e1d077609d7fba363cbba00d05f7aa4e4fa8715d6428104c0a75643b0ff3fd3eaf"
# Test Vector 4: Keyed hashing (64-byte key of ascending bytes 0x00..0x3f)
key = bytes.fromhex("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f202122232425262728292a2b2c2d2e2f303132333435363738393a3b3c3d3e3f")
blake2b_keyed = BLAKE2b(key=key)
assert blake2b_keyed.hash(b"").hex() == "10ebb67700b1868efb4417987acf4690ae9d972fb7a590c2f02871799aaa4786b5e996e8f0f4eb981fc214b005f42d2ff4233499391653df7aefcbc13fc51568"
# Test Vector 5: BLAKE2b-256
blake2b_256 = BLAKE2b(digest_size=32)
# Empty input
assert blake2b_256.hash(b"").hex() == "0e5751c026e543b2e8ab2eb06099daa1d1e5df47778f7787faab45cdf12fe3a8"
# "The quick brown fox jumps over the lazy dog"
message = b"The quick brown fox jumps over the lazy dog"
assert blake2b_256.hash(message).hex() == "01718cec35cd3d796dd00020e0bfecb473ad23457d063b75eff29c0ffa2e58a9"
# Personalization test (16-byte personalization changes the digest)
blake2b_personal = BLAKE2b(digest_size=32, personalization=b"ZcashPrevoutHash")
assert blake2b_personal.hash(b"").hex() == "d53a633bbecf82fe9e9484d8a0e727c73bb9e68c96e72dec30144f6a84afa136"
# Test Vector 6: Incremental vs one-shot must produce the same digest
data = b"The quick brown fox jumps over the lazy dog"
# One-shot
blake2b = BLAKE2b()
one_shot = blake2b.hash(data)
# Incremental
hasher = BLAKE2b()
hasher.update(b"The quick brown ")
hasher.update(b"fox jumps over ")
hasher.update(b"the lazy dog")
incremental = hasher.finalize()
assert one_shot == incremental
## References

- *The BLAKE2 Cryptographic Hash and MAC* (RFC 7693) — official specification
- blake2.net — reference implementations and comprehensive documentation
- *BLAKE2: simpler, smaller, fast as MD5* — original academic specification and security analysis
- libsodium — popular cryptographic library with optimized BLAKE2b implementation