Symmetric Encryption

🔒 ARIA-256 Korean Standard Cipher

Security Level 256-bit
Performance ⭐⭐⭐⭐ Very Good
Quantum Resistant ⚠️ Partial (128-bit)
Block Size 128 bits
Key Size 256 bits
Rounds 16 rounds

📖 Overview

ARIA-256 is the 256-bit-key variant of ARIA, a block cipher designed in 2003 by Korean cryptographers and adopted as a Korean national standard (KS X 1213:2004). The algorithm is also described in RFC 5794, which is an Informational RFC rather than an international standard. ARIA-256 operates on 128-bit blocks and uses a substitution-permutation network similar to AES.

✨ Key Features

🏛️

National Standard

Korean national standard (KS X 1213:2004) and RFC 5794

🔐

256-bit Security

Maximum security level with 256-bit key size

🔄

SPN Structure

Substitution-permutation network similar to AES

⚖️

Patent-Free

No licensing requirements for implementation

🔢

16 Rounds

Strong security margin with 16 encryption rounds

🌐

Multiple Modes

Supports ECB, CBC, CTR, GCM, and other modes

🎯 Common Use Cases

🏛️ Government & Financial

  • Korean Government: Official government communications
  • Financial Services: Banking and payment systems in Korea
  • Regulatory Compliance: Meeting Korean cryptographic standards
  • International Trade: Cross-border data protection

🔒 Security Applications

  • Secure Storage: Database and file encryption
  • Network Security: VPN and TLS implementations
  • Embedded Systems: Efficient on various platforms
  • Algorithm Diversity: Alternative to AES monoculture

🔧 Algorithm Parameters

📊 ARIA-256 Parameters

Parameter | Value | Description
--- | --- | ---
Block Size | 128 bits | Standard block size (16 bytes)
Key Size | 256 bits | Maximum security level (32 bytes)
Rounds | 16 | Encryption/decryption rounds
S-boxes | 4 types | Two S-boxes (S1, S2) and their inverses
Key Schedule | Feistel-based | Key expansion algorithm
Structure | SPN | Substitution-permutation network
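
The parameters above follow a simple rule across the ARIA family: the round count grows with the key size, and the cipher uses one more round key than it has rounds. The lookup below is a minimal sketch of that rule; `aria_parameters` is a hypothetical helper name, not part of any library.

```python
# Round counts for the ARIA family, as given in RFC 5794.
ARIA_ROUNDS = {128: 12, 192: 14, 256: 16}
BLOCK_SIZE_BITS = 128


def aria_parameters(key_bits: int) -> dict:
    """Return basic parameters for an ARIA key size (hypothetical helper)."""
    if key_bits not in ARIA_ROUNDS:
        raise ValueError(f"ARIA has no {key_bits}-bit key size")
    rounds = ARIA_ROUNDS[key_bits]
    return {
        "key_bytes": key_bits // 8,
        "block_bytes": BLOCK_SIZE_BITS // 8,
        "rounds": rounds,
        # One key per round plus a final whitening key
        "round_keys": rounds + 1,
    }


print(aria_parameters(256))
```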

🛡️ Security Properties

🔢

Key Space

2^256 possible keys (enormous keyspace)

🛡️

Attack Resistance

No known practical attacks against ARIA-256

🔒

Cryptanalysis Margin

Significant security margin against known attacks

S-box Design

Resistant to differential and linear cryptanalysis
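
To put the key-space figure in perspective, the arithmetic can be checked directly. This sketch uses only the standard library; the attacker rate of 10^18 keys per second is an assumption chosen for illustration, not a measured figure.

```python
import math

KEY_BITS = 256

# Classical exhaustive search: 2^256 candidate keys
classical_keys = 2 ** KEY_BITS

# Grover's algorithm needs on the order of sqrt(N) evaluations,
# which halves the effective security level measured in bits
grover_evaluations = math.isqrt(classical_keys)
grover_bits = grover_evaluations.bit_length() - 1

# Hypothetical attacker testing 10^18 keys per second (an assumption)
rate = 10 ** 18
seconds_per_year = 365 * 24 * 3600
years = classical_keys // (rate * seconds_per_year)

print(f"keyspace has {len(str(classical_keys))} decimal digits")
print(f"effective security under Grover: {grover_bits} bits")
print(f"classical search time has {len(str(years))} decimal digits, in years")
```

The halving under Grover is why the summary at the top lists quantum resistance as partial (128-bit).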

💻 Usage Examples

Basic Encryption/Decryption

from metamui_crypto import ARIA256, generate_random_bytes
import os

# Basic encryption/decryption
key = generate_random_bytes(32)  # 256-bit key
aria = ARIA256(key)

# Encrypt data
plaintext = b"Sensitive information that needs protection"
ciphertext = aria.encrypt(plaintext)
print(f"Ciphertext: {ciphertext.hex()}")

# Decrypt data
decrypted = aria.decrypt(ciphertext)
assert decrypted == plaintext

# Using different modes of operation
# CBC mode with IV
iv = generate_random_bytes(16)
aria_cbc = ARIA256(key, mode='CBC', iv=iv)
ciphertext_cbc = aria_cbc.encrypt(plaintext)

# CTR mode for streaming
nonce = generate_random_bytes(12)
aria_ctr = ARIA256(key, mode='CTR', nonce=nonce)
ciphertext_ctr = aria_ctr.encrypt(plaintext)

# GCM mode for authenticated encryption
aria_gcm = ARIA256(key, mode='GCM')
ciphertext_gcm, tag = aria_gcm.encrypt_and_authenticate(plaintext)

🔄 Modes of Operation

# Secure file encryption system
class ARIAFileEncryptor:
    def __init__(self):
        self.chunk_size = 64 * 1024  # 64KB chunks
    
    def encrypt_file(self, filepath: str, password: str) -> str:
        """Encrypt file with password-derived key"""
        # Derive key from password
        salt = generate_random_bytes(16)
        key = self.derive_key(password, salt)
        
        # Generate random IV
        iv = generate_random_bytes(16)
        
        # Initialize ARIA-256 in CBC mode
        cipher = ARIA256(key, mode='CBC', iv=iv)
        
        # Encrypt file
        encrypted_path = filepath + '.aria'
        with open(filepath, 'rb') as infile, \
             open(encrypted_path, 'wb') as outfile:
            
            # Write header: salt + iv
            outfile.write(salt)
            outfile.write(iv)
            
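            # Encrypt file contents. PKCS#7 always pads the final chunk,
            # adding a full block when the length is already a multiple
            # of 16, so that decrypt_file can always strip padding.
            chunk = infile.read(self.chunk_size)
            while True:
                next_chunk = infile.read(self.chunk_size)
                if not next_chunk:
                    outfile.write(cipher.encrypt(self.pad(chunk)))
                    break
                outfile.write(cipher.encrypt(chunk))
                chunk = next_chunk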
        
        return encrypted_path
    
    def decrypt_file(self, encrypted_path: str, password: str) -> str:
        """Decrypt file with password"""
        with open(encrypted_path, 'rb') as infile:
            # Read header
            salt = infile.read(16)
            iv = infile.read(16)
            
            # Derive key
            key = self.derive_key(password, salt)
            
            # Initialize cipher
            cipher = ARIA256(key, mode='CBC', iv=iv)
            
            # Decrypt file
            decrypted_path = encrypted_path.replace('.aria', '.decrypted')
            with open(decrypted_path, 'wb') as outfile:
                while chunk := infile.read(self.chunk_size):
                    decrypted_chunk = cipher.decrypt(chunk)
                    
                    # Remove padding from last chunk
                    if len(infile.peek(1)) == 0:
                        decrypted_chunk = self.unpad(decrypted_chunk)
                    
                    outfile.write(decrypted_chunk)
        
        return decrypted_path
    
    def derive_key(self, password: str, salt: bytes) -> bytes:
        """Derive key from password using PBKDF2"""
        from metamui_crypto import PBKDF2
        pbkdf2 = PBKDF2(iterations=100000, hash_function='SHA256')
        return pbkdf2.derive(password.encode(), salt, key_length=32)
    
    def pad(self, data: bytes) -> bytes:
        """PKCS#7 padding"""
        padding_length = 16 - (len(data) % 16)
        padding = bytes([padding_length] * padding_length)
        return data + padding
    
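    def unpad(self, data: bytes) -> bytes:
        """Remove PKCS#7 padding, rejecting malformed padding"""
        if not data or len(data) % 16 != 0:
            raise ValueError("Invalid padded length")
        padding_length = data[-1]
        if not 1 <= padding_length <= 16 or \
           data[-padding_length:] != bytes([padding_length]) * padding_length:
            raise ValueError("Invalid padding")
        return data[:-padding_length]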

# Database encryption
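class ARIADatabaseEncryption:
    def __init__(self, master_key: bytes):
        self.master_key = master_key
    
    @staticmethod
    def pad(data: bytes) -> bytes:
        """PKCS#7 padding to the 16-byte block size"""
        padding_length = 16 - (len(data) % 16)
        return data + bytes([padding_length]) * padding_length
    
    @staticmethod
    def unpad(data: bytes) -> bytes:
        """Remove PKCS#7 padding"""
        return data[:-data[-1]]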
    
    def encrypt_column(self, table: str, column: str, value: bytes) -> bytes:
        """Encrypt database column value"""
        # Derive column-specific key
        from metamui_crypto import HKDF, SHA256
        hkdf = HKDF(hash_function=SHA256())
        column_key = hkdf.derive(
            ikm=self.master_key,
            info=f"{table}.{column}".encode(),
            length=32
        )
        
        # Generate random IV
        iv = generate_random_bytes(16)
        
        # Encrypt value
        cipher = ARIA256(column_key, mode='CBC', iv=iv)
        ciphertext = cipher.encrypt(self.pad(value))
        
        # Return IV + ciphertext
        return iv + ciphertext
    
    def decrypt_column(self, table: str, column: str, encrypted: bytes) -> bytes:
        """Decrypt database column value"""
        # Extract IV and ciphertext
        iv = encrypted[:16]
        ciphertext = encrypted[16:]
        
        # Derive column-specific key
        from metamui_crypto import HKDF, SHA256
        hkdf = HKDF(hash_function=SHA256())
        column_key = hkdf.derive(
            ikm=self.master_key,
            info=f"{table}.{column}".encode(),
            length=32
        )
        
        # Decrypt value
        cipher = ARIA256(column_key, mode='CBC', iv=iv)
        plaintext = cipher.decrypt(ciphertext)
        
        return self.unpad(plaintext)

# Streaming encryption
class ARIAStreamCipher:
    def __init__(self, key: bytes):
        self.key = key
        self.counter = 0
    
    def encrypt_stream(self, stream):
        """Encrypt data stream using CTR mode"""
        # Generate random nonce
        nonce = generate_random_bytes(12)
        
        # Initialize CTR mode
        cipher = ARIA256(self.key, mode='CTR', nonce=nonce)
        
        # Yield nonce first
        yield nonce
        
        # Encrypt stream in chunks
        for chunk in stream:
            encrypted = cipher.encrypt(chunk)
            yield encrypted
    
    def decrypt_stream(self, encrypted_stream):
        """Decrypt data stream"""
        # First chunk is the nonce; iter() lets callers pass any iterable
        encrypted_stream = iter(encrypted_stream)
        nonce = next(encrypted_stream)
        
        # Initialize CTR mode
        cipher = ARIA256(self.key, mode='CTR', nonce=nonce)
        
        # Decrypt remaining chunks
        for chunk in encrypted_stream:
            decrypted = cipher.decrypt(chunk)
            yield decrypted

# Authenticated encryption
class ARIAAuthenticatedEncryption:
    def __init__(self, key: bytes):
        self.key = key
    
    def seal(self, plaintext: bytes, associated_data: bytes = b"") -> tuple:
        """Authenticated encryption with associated data"""
        # Generate random nonce
        nonce = generate_random_bytes(12)
        
        # Initialize GCM mode
        cipher = ARIA256(self.key, mode='GCM', nonce=nonce)
        
        # Add associated data
        if associated_data:
            cipher.update_aad(associated_data)
        
        # Encrypt and authenticate
        ciphertext = cipher.encrypt(plaintext)
        tag = cipher.finalize()
        
        return nonce + ciphertext + tag, associated_data
    
    def open(self, sealed_data: bytes, associated_data: bytes = b"") -> bytes:
        """Authenticated decryption"""
        # Extract components
        nonce = sealed_data[:12]
        ciphertext = sealed_data[12:-16]
        tag = sealed_data[-16:]
        
        # Initialize GCM mode
        cipher = ARIA256(self.key, mode='GCM', nonce=nonce)
        
        # Add associated data
        if associated_data:
            cipher.update_aad(associated_data)
        
        # Decrypt and verify
        plaintext = cipher.decrypt(ciphertext)
        
        # Verify tag in constant time before releasing the plaintext
        import hmac
        computed_tag = cipher.finalize()
        if not hmac.compare_digest(computed_tag, tag):
            raise ValueError("Authentication failed")
        
        return plaintext

Implementation Details

# ARIA round function (simplified)
class ARIARound:
    def __init__(self):
        # S-boxes (simplified representation)
        self.s1 = [...]  # 256 entries
        self.s2 = [...]  # 256 entries
        self.x1 = [...]  # Inverse of s1
        self.x2 = [...]  # Inverse of s2
    
    def substitute(self, state: bytes, round_type: int) -> bytes:
        """Apply S-box substitution"""
        if round_type == 1:
            # Type 1 layer: (S1, S2, S1^-1, S2^-1) repeated four times
            layer = (self.s1, self.s2, self.x1, self.x2)
        else:
            # Type 2 layer: (S1^-1, S2^-1, S1, S2) repeated four times
            layer = (self.x1, self.x2, self.s1, self.s2)
        
        # Byte i goes through the S-box at position i mod 4
        return bytes(layer[i % 4][b] for i, b in enumerate(state))
    
    def diffusion(self, state: bytes) -> bytes:
        """Apply diffusion layer"""
        # Multiplies the 16 state bytes by a fixed 16x16 binary matrix,
        # so each output byte is the XOR of seven input bytes.
        # Plays the role of AES MixColumns but mixes the whole state at once.
        raise NotImplementedError("Diffusion layer omitted in this simplified sketch")

# Key schedule
class ARIAKeySchedule:
    def __init__(self, master_key: bytes):
        self.master_key = master_key
        self.round_keys = self.expand_key()
    
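    def expand_key(self) -> list:
        """Generate round keys from master key"""
        # Split the 256-bit master key into two 128-bit halves (KL, KR)
        kl = self.master_key[:16]
        kr = self.master_key[16:]
        
        # A 3-round Feistel network over (KL, KR), using round constants
        # and the S-boxes, yields four 128-bit words W0..W3. Each round key
        # is the XOR of one word with a rotated copy of another.
        # ARIA-256 needs 17 round keys: one per round plus a final key.
        round_keys = []
        
        # Key expansion omitted in this simplified sketch
        
        return round_keys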
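
The diffusion layer left as a stub above can be written with byte XORs alone, because the matrix entries are all 0 or 1. The sketch below lists, for each output byte, the seven input bytes that feed it. The index table is transcribed from the byte equations in RFC 5794 and should be treated as an assumption: check it against the RFC before relying on it.

```python
# For each output byte, the indices of the seven input bytes XORed into it.
# Each tuple is one row of the 16x16 binary diffusion matrix.
DIFFUSION_ROWS = (
    (3, 4, 6, 8, 9, 13, 14),
    (2, 5, 7, 8, 9, 12, 15),
    (1, 4, 6, 10, 11, 12, 15),
    (0, 5, 7, 10, 11, 13, 14),
    (0, 2, 5, 8, 11, 14, 15),
    (1, 3, 4, 9, 10, 14, 15),
    (0, 2, 7, 9, 10, 12, 13),
    (1, 3, 6, 8, 11, 12, 13),
    (0, 1, 4, 7, 10, 13, 15),
    (0, 1, 5, 6, 11, 12, 14),
    (2, 3, 5, 6, 8, 13, 15),
    (2, 3, 4, 7, 9, 12, 14),
    (1, 2, 6, 7, 9, 11, 12),
    (0, 3, 6, 7, 8, 10, 13),
    (0, 3, 4, 5, 9, 11, 14),
    (1, 2, 4, 5, 8, 10, 15),
)


def diffusion(state: bytes) -> bytes:
    """Apply the binary diffusion matrix to a 16-byte state."""
    if len(state) != 16:
        raise ValueError("state must be exactly 16 bytes")
    out = bytearray(16)
    for i, row in enumerate(DIFFUSION_ROWS):
        for j in row:
            out[i] ^= state[j]
    return bytes(out)


state = bytes(range(16))
# The matrix is its own inverse, so applying it twice restores the state
assert diffusion(diffusion(state)) == state
```

Because the layer is an involution, the same routine serves both encryption and decryption.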

🔒 Security Considerations

Best Practices

  1. Key Management
    • Use cryptographically secure random keys
    • Never reuse keys across different contexts
    • Implement proper key rotation policies
    • Store keys in secure hardware when possible
  2. Mode Selection
    • Use GCM for authenticated encryption
    • Use CTR for parallel processing
    • Use CBC with random IVs for compatibility
    • Never use ECB mode
  3. IV/Nonce Handling
    • Always use random IVs for CBC mode
    • Never reuse nonce in CTR/GCM modes
    • Include IV/nonce with ciphertext
    • Verify IV uniqueness in protocols
  4. Padding
    • Use PKCS#7 padding for CBC mode
    • Verify padding on decryption
    • Consider padding oracle attacks
    • Use streaming modes to avoid padding
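
One way to follow the nonce rules above is to build each nonce from a fixed prefix and a counter instead of drawing it at random, which makes repeats impossible for a given key as long as the counter state is kept. This is a minimal standard-library sketch; `NonceSequence` is a hypothetical name, and the 4-byte prefix plus 8-byte counter split is one reasonable layout rather than a requirement.

```python
import os


class NonceSequence:
    """Yield unique 12-byte nonces: 4-byte fixed prefix + 8-byte counter."""

    def __init__(self, prefix: bytes = None):
        # A random prefix separates senders that share the same key
        self.prefix = prefix if prefix is not None else os.urandom(4)
        if len(self.prefix) != 4:
            raise ValueError("prefix must be 4 bytes")
        self.counter = 0

    def next(self) -> bytes:
        if self.counter >= 2 ** 64:
            raise OverflowError("nonce space exhausted; rotate the key")
        nonce = self.prefix + self.counter.to_bytes(8, "big")
        self.counter += 1
        return nonce


seq = NonceSequence()
first, second = seq.next(), seq.next()
assert first != second and len(first) == 12
```

The counter must survive restarts (or the key must change on restart); otherwise the sequence begins again at zero and nonces repeat.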

Common Pitfalls

# DON'T: Use ECB mode
aria_ecb = ARIA256(key, mode='ECB')
# WRONG: ECB mode reveals patterns
ciphertext = aria_ecb.encrypt(plaintext)

# DO: Use secure modes with IV/nonce
iv = generate_random_bytes(16)
aria_cbc = ARIA256(key, mode='CBC', iv=iv)
ciphertext = aria_cbc.encrypt(plaintext)

# DON'T: Reuse IV/nonce
iv = b'\x00' * 16  # Static IV
# WRONG: IV reuse breaks security
aria1 = ARIA256(key, mode='CBC', iv=iv)
aria2 = ARIA256(key, mode='CBC', iv=iv)

# DO: Generate random IV for each encryption
iv1 = generate_random_bytes(16)
iv2 = generate_random_bytes(16)
aria1 = ARIA256(key, mode='CBC', iv=iv1)
aria2 = ARIA256(key, mode='CBC', iv=iv2)

# DON'T: Use weak key derivation
import hashlib
key = hashlib.sha256(password.encode()).digest()
# WRONG: No salt and a single fast hash make password guessing cheap

# DO: Use proper key derivation
from metamui_crypto import PBKDF2
salt = generate_random_bytes(16)
pbkdf2 = PBKDF2(iterations=100000, hash_function='SHA256')
key = pbkdf2.derive(password.encode(), salt, key_length=32)
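
For readers without the library at hand, the same derivation can be sketched with Python's standard library. `hashlib.pbkdf2_hmac` is a real standard-library call; the `derive_key` wrapper and the iteration count (copied from the example above) are illustrative assumptions, and the count should be tuned to your hardware.

```python
import hashlib
import os

ITERATIONS = 100_000  # matches the example above; raise it if you can afford to


def derive_key(password: str, salt: bytes = None) -> tuple:
    """Derive a 32-byte key; returns (key, salt) so the salt can be stored."""
    if salt is None:
        salt = os.urandom(16)
    key = hashlib.pbkdf2_hmac(
        "sha256", password.encode(), salt, ITERATIONS, dklen=32
    )
    return key, salt


key, salt = derive_key("correct horse battery staple")
# The same password and salt always give the same key
key_again, _ = derive_key("correct horse battery staple", salt)
assert key == key_again and len(key) == 32
```

The salt is not secret. Store it next to the ciphertext so the key can be re-derived at decryption time.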

⚡ Performance Analysis

Benchmarks

Operation | Mode | Data Size | Throughput | CPU Usage
--- | --- | --- | --- | ---
Encrypt | ECB | 1 MB | 125 MB/s | 95%
Encrypt | CBC | 1 MB | 118 MB/s | 92%
Encrypt | CTR | 1 MB | 142 MB/s | 88%
Encrypt | GCM | 1 MB | 98 MB/s | 94%
Decrypt | ECB | 1 MB | 124 MB/s | 95%
Decrypt | CBC | 1 MB | 117 MB/s | 92%
Decrypt | CTR | 1 MB | 142 MB/s | 88%
Decrypt | GCM | 1 MB | 97 MB/s | 94%

Performance vs Other Ciphers

Cipher | Mode | Throughput | Relative Speed
--- | --- | --- | ---
ARIA-256 | CTR | 142 MB/s | 1.00x (baseline)
AES-256 | CTR | 198 MB/s | 1.39x
Camellia-256 | CTR | 134 MB/s | 0.94x
Serpent-256 | CTR | 87 MB/s | 0.61x
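
The relative-speed column is each cipher's throughput divided by the ARIA-256 baseline. The short calculation below reproduces it from the throughput figures in the comparison table and shows how to turn throughput into an estimated processing time; `seconds_for` is a hypothetical helper.

```python
# CTR-mode throughput in MB/s, taken from the comparison table
throughput_mb_s = {
    "ARIA-256": 142,
    "AES-256": 198,
    "Camellia-256": 134,
    "Serpent-256": 87,
}

baseline = throughput_mb_s["ARIA-256"]
relative = {name: round(mb_s / baseline, 2) for name, mb_s in throughput_mb_s.items()}


def seconds_for(size_mb: float, cipher: str) -> float:
    """Estimated time to process size_mb megabytes at the listed throughput."""
    return size_mb / throughput_mb_s[cipher]


for name, ratio in relative.items():
    print(f"{name}: {ratio:.2f}x, 1 GB in about {seconds_for(1024, name):.1f} s")
```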

Optimization Strategies

# Parallel encryption for large files
import multiprocessing

class ParallelARIA:
    def __init__(self, key: bytes, num_workers=4):
        self.key = key
        self.num_workers = num_workers
    
    def encrypt_parallel(self, data: bytes) -> bytes:
        """Encrypt data in parallel using CTR mode"""
        # One nonce for the whole message; workers differ only in counter
        nonce = generate_random_bytes(12)
        
        # Chunks must start on a 16-byte block boundary so that each
        # worker's starting counter equals its block index
        total_blocks = -(-len(data) // 16)  # ceiling division
        blocks_per_worker = max(1, -(-total_blocks // self.num_workers))
        chunk_size = blocks_per_worker * 16
        
        tasks = [
            (data[start:start + chunk_size], nonce, start // 16)
            for start in range(0, len(data), chunk_size)
        ]
        
        # Encrypt chunks in parallel
        with multiprocessing.Pool(self.num_workers) as pool:
            encrypted_chunks = pool.starmap(self.encrypt_chunk, tasks)
        
        # Prepend the nonce so the receiver can decrypt
        return nonce + b''.join(encrypted_chunks)
    
    def encrypt_chunk(self, chunk: bytes, nonce: bytes, counter_offset: int) -> bytes:
        """Encrypt a single chunk"""
        # Assumes ARIA256 accepts a nonce and an initial counter together
        cipher = ARIA256(self.key, mode='CTR', nonce=nonce, counter=counter_offset)
        return cipher.encrypt(chunk)

# Hardware acceleration support
class HardwareARIA:
    def __init__(self, key: bytes):
        self.key = key
        self.use_hardware = self.check_hardware_support()
    
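    def check_hardware_support(self) -> bool:
        """Check for hardware acceleration"""
        # Check for AES-NI or similar
        # ARIA can benefit from SIMD instructions
        # platform.processor() does not report feature flags such as 'aes',
        # so this best-effort check reads them from /proc/cpuinfo (Linux only)
        try:
            with open('/proc/cpuinfo') as f:
                for line in f:
                    if line.startswith('flags'):
                        flags = line.split(':', 1)[1].split()
                        return 'aes' in flags or 'avx2' in flags
        except OSError:
            pass
        return False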
    
    def encrypt(self, data: bytes) -> bytes:
        """Use hardware acceleration if available"""
        if self.use_hardware:
            # Use SIMD-optimized implementation
            return self.encrypt_simd(data)
        else:
            # Fall back to software implementation
            return self.encrypt_software(data)
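
Parallel CTR encryption is only correct when every chunk starts on a block boundary, so that a worker's starting counter equals the index of its first block. The standard-library sketch below computes such a partition; `partition_for_ctr` is a hypothetical helper and does no encryption itself.

```python
BLOCK = 16  # ARIA block size in bytes


def partition_for_ctr(length: int, workers: int) -> list:
    """Return (start, end, first_counter) tuples for block-aligned chunks."""
    if workers < 1:
        raise ValueError("workers must be at least 1")
    total_blocks = -(-length // BLOCK)  # ceiling division
    blocks_per_chunk = max(1, -(-total_blocks // workers))
    step = blocks_per_chunk * BLOCK
    return [
        (start, min(start + step, length), start // BLOCK)
        for start in range(0, length, step)
    ]


# 100 bytes over 4 workers: three 32-byte chunks and a 4-byte tail
for start, end, counter in partition_for_ctr(100, 4):
    print(f"bytes {start}..{end} start at counter {counter}")
```

Only the last chunk may be shorter than a whole number of blocks, which CTR mode handles by truncating the final keystream block.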

Use Cases

1. Secure Communication Protocol

import hmac
import struct

class SecureARIAProtocol:
    def __init__(self, is_initiator: bool):
        # Each direction gets its own key, so both peers can count from
        # zero without ever repeating a (key, nonce) pair under GCM
        self.is_initiator = is_initiator
        self.send_key = None
        self.recv_key = None
        self.send_counter = 0
        self.recv_counter = 0
    
    def establish_session(self, shared_secret: bytes) -> bytes:
        """Establish encrypted session"""
        # Derive one session key per direction
        from metamui_crypto import HKDF, SHA256
        hkdf = HKDF(hash_function=SHA256())
        
        initiator_key = hkdf.derive(
            ikm=shared_secret,
            info=b"ARIA session key initiator->responder",
            length=32
        )
        responder_key = hkdf.derive(
            ikm=shared_secret,
            info=b"ARIA session key responder->initiator",
            length=32
        )
        if self.is_initiator:
            self.send_key, self.recv_key = initiator_key, responder_key
        else:
            self.send_key, self.recv_key = responder_key, initiator_key
        
        # Generate session ID
        session_id = generate_random_bytes(16)
        return session_id
    
    def send_message(self, message: bytes) -> bytes:
        """Send encrypted message"""
        # Create packet header: 2-byte version + 4-byte counter (6 bytes)
        header = struct.pack('>HI', 
                           0x0100,  # Protocol version
                           self.send_counter)
        
        # Encrypt message with counter as nonce
        nonce = self.send_counter.to_bytes(12, 'big')
        cipher = ARIA256(self.send_key, mode='GCM', nonce=nonce)
        
        # Authenticate header
        cipher.update_aad(header)
        
        # Encrypt and get tag
        ciphertext = cipher.encrypt(message)
        tag = cipher.finalize()
        
        self.send_counter += 1
        
        return header + ciphertext + tag
    
    def receive_message(self, packet: bytes) -> bytes:
        """Receive and decrypt message"""
        # A valid packet holds at least the header and the tag
        if len(packet) < 6 + 16:
            raise ValueError("Packet too short")
        
        # Parse packet
        header = packet[:6]
        ciphertext = packet[6:-16]
        tag = packet[-16:]
        
        # Verify protocol version
        version, counter = struct.unpack('>HI', header)
        if version != 0x0100:
            raise ValueError("Invalid protocol version")
        
        # Verify counter (prevent replay)
        if counter != self.recv_counter:
            raise ValueError("Invalid message counter")
        
        # Decrypt message
        nonce = counter.to_bytes(12, 'big')
        cipher = ARIA256(self.recv_key, mode='GCM', nonce=nonce)
        
        # Authenticate header
        cipher.update_aad(header)
        
        # Decrypt, then verify the tag in constant time
        message = cipher.decrypt(ciphertext)
        computed_tag = cipher.finalize()
        
        if not hmac.compare_digest(computed_tag, tag):
            raise ValueError("Authentication failed")
        
        self.recv_counter += 1
        
        return message
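
The packet layout used by the protocol above (6-byte header, ciphertext, 16-byte tag) can be exercised without any cipher. The standard-library sketch below builds and parses such packets and applies the same version and counter checks; `build_packet` and `parse_packet` are hypothetical helper names.

```python
import struct

HEADER = struct.Struct(">HI")  # 2-byte version, 4-byte message counter
VERSION = 0x0100
TAG_LEN = 16


def build_packet(counter: int, ciphertext: bytes, tag: bytes) -> bytes:
    """Frame a message as header || ciphertext || tag."""
    if len(tag) != TAG_LEN:
        raise ValueError("tag must be 16 bytes")
    return HEADER.pack(VERSION, counter) + ciphertext + tag


def parse_packet(packet: bytes, expected_counter: int) -> tuple:
    """Split a packet and reject wrong versions or unexpected counters."""
    if len(packet) < HEADER.size + TAG_LEN:
        raise ValueError("packet too short")
    version, counter = HEADER.unpack_from(packet)
    if version != VERSION:
        raise ValueError("invalid protocol version")
    if counter != expected_counter:
        raise ValueError("invalid message counter")
    header = packet[:HEADER.size]
    return header, packet[HEADER.size:-TAG_LEN], packet[-TAG_LEN:]


packet = build_packet(7, b"ciphertext bytes", b"T" * 16)
header, body, tag = parse_packet(packet, expected_counter=7)
assert body == b"ciphertext bytes"
```

The 4-byte counter field caps a session at 2^32 messages, so a new session key is needed before the counter wraps.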

2. Encrypted Database Storage

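import base64

class ARIADatabaseEngine:
    def __init__(self, master_password: str, salt: bytes = None):
        # Derive master key from password. Reuse the stored salt when
        # reopening a database: a fresh salt derives a different master
        # key and makes existing rows unreadable.
        self.salt = salt if salt is not None else generate_random_bytes(32)
        from metamui_crypto import Argon2
        argon2 = Argon2(memory_cost=65536, time_cost=3)
        self.master_key = argon2.derive(
            master_password.encode(),
            self.salt
        )
        self.tables = {}  # table name -> encryption metadata
    
    def create_encrypted_table(self, table_name: str, columns: list):
        """Create table with encrypted columns"""
        # Store encryption metadata
        metadata = {
            'table_name': table_name,
            'encrypted_columns': columns,
            'algorithm': 'ARIA-256-GCM',
            'key_derivation': 'HKDF-SHA256'
        }
        self.tables[table_name] = metadata
        
        return metadata
    
    def is_encrypted_column(self, table_name: str, column: str) -> bool:
        """Check whether a column was registered as encrypted"""
        metadata = self.tables.get(table_name)
        return metadata is not None and column in metadata['encrypted_columns']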
    
    def encrypt_row(self, table_name: str, row_data: dict) -> dict:
        """Encrypt sensitive columns in row"""
        table_key = self.derive_table_key(table_name)
        encrypted_row = {}
        
        for column, value in row_data.items():
            if self.is_encrypted_column(table_name, column):
                # Encrypt value
                nonce = generate_random_bytes(12)
                cipher = ARIA256(table_key, mode='GCM', nonce=nonce)
                
                # Include column name in AAD
                cipher.update_aad(column.encode())
                
                # Encrypt value
                if isinstance(value, str):
                    value = value.encode()
                
                ciphertext = cipher.encrypt(value)
                tag = cipher.finalize()
                
                # Store as base64
                encrypted_value = base64.b64encode(
                    nonce + ciphertext + tag
                ).decode()
                
                encrypted_row[column] = encrypted_value
            else:
                encrypted_row[column] = value
        
        return encrypted_row
    
    def decrypt_row(self, table_name: str, encrypted_row: dict) -> dict:
        """Decrypt row data"""
        table_key = self.derive_table_key(table_name)
        decrypted_row = {}
        
        for column, value in encrypted_row.items():
            if self.is_encrypted_column(table_name, column):
                # Decode from base64
                encrypted_data = base64.b64decode(value)
                
                # Extract components
                nonce = encrypted_data[:12]
                ciphertext = encrypted_data[12:-16]
                tag = encrypted_data[-16:]
                
                # Decrypt
                cipher = ARIA256(table_key, mode='GCM', nonce=nonce)
                cipher.update_aad(column.encode())
                
                plaintext = cipher.decrypt(ciphertext)
                computed_tag = cipher.finalize()
                
                # Compare tags in constant time
                import hmac
                if not hmac.compare_digest(computed_tag, tag):
                    raise ValueError(f"Authentication failed for column {column}")
                
                # Convert back to string if needed
                try:
                    decrypted_row[column] = plaintext.decode()
                except UnicodeDecodeError:
                    decrypted_row[column] = plaintext
            else:
                decrypted_row[column] = value
        
        return decrypted_row
    
    def derive_table_key(self, table_name: str) -> bytes:
        """Derive table-specific encryption key"""
        from metamui_crypto import HKDF, SHA256
        hkdf = HKDF(hash_function=SHA256())
        return hkdf.derive(
            ikm=self.master_key,
            info=f"table:{table_name}".encode(),
            length=32
        )
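
The per-table key derivation above relies on HKDF to turn one master key into independent keys, one per `info` label. The construction is small enough to sketch with the standard library following RFC 5869; `hkdf_sha256` is a hypothetical helper written for illustration, not the library's implementation.

```python
import hashlib
import hmac


def hkdf_sha256(ikm: bytes, info: bytes, length: int = 32, salt: bytes = b"") -> bytes:
    """HKDF (RFC 5869) with HMAC-SHA256: extract, then expand."""
    if length > 255 * 32:
        raise ValueError("requested output too long")
    # Extract: an empty salt defaults to a block of zero bytes
    prk = hmac.new(salt or bytes(32), ikm, hashlib.sha256).digest()
    # Expand: chain HMAC blocks, each bound to info and a block counter
    okm, block, counter = b"", b"", 1
    while len(okm) < length:
        block = hmac.new(prk, block + info + bytes([counter]), hashlib.sha256).digest()
        okm += block
        counter += 1
    return okm[:length]


master_key = bytes(range(32))  # placeholder key, for demonstration only
users_key = hkdf_sha256(master_key, b"table:users")
orders_key = hkdf_sha256(master_key, b"table:orders")
assert users_key != orders_key and len(users_key) == 32
```

Because the label is mixed into every output block, learning one table's key reveals nothing about the master key or about other tables' keys.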

3. File System Encryption

class ARIAFileSystem:
    def __init__(self, mount_point: str, password: str):
        self.mount_point = mount_point
        self.master_key = self.setup_master_key(password)
        self.file_keys = {}
    
    def setup_master_key(self, password: str) -> bytes:
        """Setup master key for file system"""
        # Load or create salt
        salt_file = os.path.join(self.mount_point, '.aria_salt')
        if os.path.exists(salt_file):
            with open(salt_file, 'rb') as f:
                salt = f.read()
        else:
            salt = generate_random_bytes(32)
            with open(salt_file, 'wb') as f:
                f.write(salt)
        
        # Derive master key
        from metamui_crypto import Argon2
        argon2 = Argon2(memory_cost=131072, time_cost=4)
        return argon2.derive(password.encode(), salt)
    
    def encrypt_file(self, filepath: str) -> str:
        """Transparently encrypt file"""
        # Generate file-specific key
        file_key = generate_random_bytes(32)
        file_id = os.urandom(16)
        
        # Encrypt file key with master key, binding it to the file ID.
        # ARIAAuthenticatedEncryption is the seal/open wrapper defined earlier.
        key_cipher = ARIAAuthenticatedEncryption(self.master_key)
        encrypted_file_key, _ = key_cipher.seal(file_key, file_id)
        
        # Encrypt file contents
        encrypted_path = filepath + '.encrypted'
        
        with open(filepath, 'rb') as infile, \
             open(encrypted_path, 'wb') as outfile:
            
            # Write header
            outfile.write(file_id)
            outfile.write(encrypted_file_key)
            
            # Encrypt file in chunks with the nonce that is written to
            # the header, so decrypt_file can rebuild the same keystream
            nonce = generate_random_bytes(12)
            cipher = ARIA256(file_key, mode='CTR', nonce=nonce)
            outfile.write(nonce)
            
            while chunk := infile.read(1024 * 1024):  # 1MB chunks
                encrypted_chunk = cipher.encrypt(chunk)
                outfile.write(encrypted_chunk)
        
        # Store file key mapping
        self.file_keys[file_id] = file_key
        
        return encrypted_path
    
    def decrypt_file(self, encrypted_path: str) -> bytes:
        """Transparently decrypt file"""
        with open(encrypted_path, 'rb') as f:
            # Read header
            file_id = f.read(16)
            encrypted_file_key = f.read(60)  # 12 (nonce) + 32 (key) + 16 (tag)
            nonce = f.read(12)
            
            # Decrypt file key
            if file_id in self.file_keys:
                file_key = self.file_keys[file_id]
            else:
                key_cipher = ARIAAuthenticatedEncryption(self.master_key)
                file_key = key_cipher.open(encrypted_file_key, file_id)
                self.file_keys[file_id] = file_key
            
            # Decrypt file contents
            cipher = ARIA256(file_key, mode='CTR', nonce=nonce)
            
            decrypted_data = b""
            while chunk := f.read(1024 * 1024):
                decrypted_data += cipher.decrypt(chunk)
            
            return decrypted_data

4. Secure Messaging Application

import json
import struct
import time

class ARIASecureMessenger:
    def __init__(self, user_id: str):
        self.user_id = user_id
        self.contacts = {}
        self.message_keys = {}
    
    def add_contact(self, contact_id: str, shared_secret: bytes):
        """Add contact with shared secret"""
        # Derive contact-specific keys
        from metamui_crypto import HKDF, SHA256
        hkdf = HKDF(hash_function=SHA256())
        
        # Separate keys for each direction
        send_key = hkdf.derive(
            ikm=shared_secret,
            info=f"{self.user_id}->{contact_id}".encode(),
            length=32
        )
        recv_key = hkdf.derive(
            ikm=shared_secret,
            info=f"{contact_id}->{self.user_id}".encode(),
            length=32
        )
        
        self.contacts[contact_id] = {
            'send_key': send_key,
            'recv_key': recv_key,
            'send_counter': 0,
            'recv_counter': 0
        }
    
    def send_message(self, contact_id: str, message: str) -> bytes:
        """Send encrypted message to contact"""
        if contact_id not in self.contacts:
            raise ValueError("Unknown contact")
        
        contact = self.contacts[contact_id]
        
        # Create message structure
        msg_data = {
            'from': self.user_id,
            'to': contact_id,
            'timestamp': int(time.time()),
            'counter': contact['send_counter'],
            'message': message
        }
        
        # Serialize message
        msg_bytes = json.dumps(msg_data).encode()
        
        # Encrypt message
        nonce = contact['send_counter'].to_bytes(12, 'big')
        cipher = ARIA256(contact['send_key'], mode='GCM', nonce=nonce)
        
        # Add metadata to AAD
        aad = f"{self.user_id}:{contact_id}:{contact['send_counter']}".encode()
        cipher.update_aad(aad)
        
        # Encrypt and authenticate
        ciphertext = cipher.encrypt(msg_bytes)
        tag = cipher.finalize()
        
        # Update counter
        contact['send_counter'] += 1
        
        # Return encrypted message
        return struct.pack('>Q', contact['send_counter'] - 1) + ciphertext + tag
    
    def receive_message(self, contact_id: str, encrypted_msg: bytes) -> dict:
        """Receive and decrypt message"""
        if contact_id not in self.contacts:
            raise ValueError("Unknown contact")
        
        contact = self.contacts[contact_id]
        
        # Parse encrypted message
        counter = struct.unpack('>Q', encrypted_msg[:8])[0]
        ciphertext = encrypted_msg[8:-16]
        tag = encrypted_msg[-16:]
        
        # Verify counter
        if counter != contact['recv_counter']:
            raise ValueError("Invalid message counter")
        
        # Decrypt message
        nonce = counter.to_bytes(12, 'big')
        cipher = ARIA256(contact['recv_key'], mode='GCM', nonce=nonce)
        
        # Add metadata to AAD
        aad = f"{contact_id}:{self.user_id}:{counter}".encode()
        cipher.update_aad(aad)
        
        # Decrypt, then verify the tag in constant time
        import hmac
        msg_bytes = cipher.decrypt(ciphertext)
        computed_tag = cipher.finalize()
        
        if not hmac.compare_digest(computed_tag, tag):
            raise ValueError("Message authentication failed")
        
        # Update counter
        contact['recv_counter'] += 1
        
        # Parse message
        return json.loads(msg_bytes.decode())

Comparison with Other Ciphers

ARIA vs AES

Feature | ARIA-256 | AES-256
--- | --- | ---
Block Size | 128 bits | 128 bits
Key Size | 256 bits | 256 bits
Rounds | 16 | 14
Structure | SPN | SPN
S-boxes | 4 types | 1 type
Hardware Support | Limited | Extensive
Performance | Good | Excellent

ARIA vs Camellia

Feature | ARIA-256 | Camellia-256
--- | --- | ---
Origin | Korea | Japan
Structure | SPN | Feistel
Rounds | 16 | 24
S-boxes | 4 types | 4 types
Key Schedule | Complex | Complex
Standards | RFC 5794 | RFC 3713

When to Use ARIA

# Use ARIA for Korean compliance
if region == "Korea" or standard == "KS":
    cipher = ARIA256(key)

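# Use ARIA for algorithm diversity
if avoid_monoculture:
    # Don't rely solely on AES. Select the cipher by configuration rather
    # than at random, and record the choice alongside the ciphertext so
    # the decrypting side knows which algorithm to use.
    # configured_algorithm is a placeholder for your own setting.
    ciphers = {'aes-256': AES256, 'aria-256': ARIA256, 'camellia-256': Camellia256}
    cipher = ciphers[configured_algorithm](key)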

# Use ARIA when AES is not available
if not has_aes_hardware:
    # ARIA performs well in software
    cipher = ARIA256(key)

Migration Guide

From AES to ARIA

# Before: AES encryption
from Crypto.Cipher import AES
from Crypto.Util.Padding import pad
aes = AES.new(key, AES.MODE_CBC, iv)
ciphertext = aes.encrypt(pad(plaintext, 16))

# After: ARIA encryption
from metamui_crypto import ARIA256
aria = ARIA256(key, mode='CBC', iv=iv)
ciphertext = aria.encrypt(plaintext)  # Padding handled automatically

# Migration wrapper
class CipherMigrator:
    def __init__(self, algorithm='aria'):
        self.algorithm = algorithm
    
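    def create_cipher(self, key, mode='CBC', iv=None):
        if self.algorithm == 'aria':
            return ARIA256(key, mode=mode, iv=iv)
        elif self.algorithm == 'aes':
            return AES256(key, mode=mode, iv=iv)
        # Fail loudly instead of silently returning None
        raise ValueError(f"Unsupported algorithm: {self.algorithm}")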

From 3DES to ARIA

# Before: 3DES (deprecated)
from Crypto.Cipher import DES3
cipher = DES3.new(key_192bit, DES3.MODE_CBC, iv)

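# After: ARIA-256 (secure)
from metamui_crypto import ARIA256, HKDF, SHA256
# Expand key if needed. Data encrypted under 3DES must still be
# decrypted with the old key and re-encrypted under the new one.
if len(key_192bit) == 24:
    key_256bit = HKDF(hash_function=SHA256()).derive(
        ikm=key_192bit,
        info=b"3des-to-aria-256",
        length=32
    )
else:
    key_256bit = key_192bit  # Must already be 32 bytes

cipher = ARIA256(key_256bit, mode='CBC', iv=iv)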

Test Vectors

ARIA-256 ECB Test Vectors

# Test Vector 1: ARIA-256 ECB
key = bytes.fromhex("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f")
plaintext = bytes.fromhex("00112233445566778899aabbccddeeff")

aria = ARIA256(key, mode='ECB')
ciphertext = aria.encrypt(plaintext)
assert ciphertext.hex() == "f92bd7c79fb72e2f2b8f80c1972d24fc"

# Decryption
decrypted = aria.decrypt(ciphertext)
assert decrypted == plaintext

ARIA-256 CBC Test Vectors

# Test Vector 2: ARIA-256 CBC
key = bytes.fromhex("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f")
iv = bytes.fromhex("0f1e2d3c4b5a69788796a5b4c3d2e1f0")
plaintext = b"The quick brown fox jumps over the lazy dog" + b"\x05" * 5  # PKCS#7 padding: 43 bytes + 5 = 48

aria_cbc = ARIA256(key, mode='CBC', iv=iv)
ciphertext = aria_cbc.encrypt(plaintext)

# Verify decryption
aria_cbc_dec = ARIA256(key, mode='CBC', iv=iv)
decrypted = aria_cbc_dec.decrypt(ciphertext)
assert decrypted == plaintext

ARIA-256 CTR Test Vectors

# Test Vector 3: ARIA-256 CTR
key = bytes.fromhex("000102030405060708090a0b0c0d0e0f101112131415161718191a1b1c1d1e1f")
nonce = bytes.fromhex("0f1e2d3c4b5a69788796a5b4")
plaintext = b"Stream cipher mode test"

aria_ctr = ARIA256(key, mode='CTR', nonce=nonce)
ciphertext = aria_ctr.encrypt(plaintext)

# CTR mode: encryption = decryption
aria_ctr_dec = ARIA256(key, mode='CTR', nonce=nonce)
decrypted = aria_ctr_dec.encrypt(ciphertext)  # Note: encrypt for decryption in CTR
assert decrypted == plaintext

References