Table of Contents
1. Fundamentals & Best Practices
Basic Implementation with Error Handling
Python
import hashlib
import logging
from typing import Union, Optional
from pathlib import Path
# Configure logging
logging.basicConfig(level=logging.INFO)  # NOTE(review): configures the root logger at import time; applications normally own this call
logger = logging.getLogger(__name__)  # module-level logger used by SHA224Hasher below
class SHA224Hasher:
    """Production-ready SHA-224 hasher with comprehensive error handling.

    All methods report failure by logging and returning None instead of
    raising, so callers can treat a missing digest as a soft error.
    """

    @staticmethod
    def hash_string(data: str, encoding: str = 'utf-8') -> Optional[str]:
        """
        Compute the SHA-224 hex digest of a text string.

        Args:
            data: String to hash.
            encoding: Character encoding used to convert the string to bytes
                (default: utf-8).

        Returns:
            Hex digest on success, None on any error.
        """
        try:
            if not isinstance(data, str):
                raise TypeError(f"Expected str, got {type(data).__name__}")
            # Encode and hash in a single expression; no intermediates needed.
            return hashlib.sha224(data.encode(encoding)).hexdigest()
        except UnicodeEncodeError as e:
            logger.error(f"Encoding error: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error hashing string: {e}")
            return None

    @staticmethod
    def hash_bytes(data: bytes) -> Optional[str]:
        """
        Compute the SHA-224 hex digest of a bytes object.

        Args:
            data: Bytes to hash.

        Returns:
            Hex digest on success, None on any error.
        """
        try:
            if not isinstance(data, bytes):
                raise TypeError(f"Expected bytes, got {type(data).__name__}")
            return hashlib.sha224(data).hexdigest()
        except Exception as e:
            logger.error(f"Error hashing bytes: {e}")
            return None

    @staticmethod
    def hash_file(filepath: Union[str, Path], chunk_size: int = 8192) -> Optional[str]:
        """
        Compute the SHA-224 hex digest of a file, reading it in chunks so
        arbitrarily large files never need to fit in memory.

        Args:
            filepath: Path to the file (str or Path).
            chunk_size: Number of bytes to read per iteration.

        Returns:
            Hex digest on success, None on any error.
        """
        try:
            target = Path(filepath)
            if not target.exists():
                raise FileNotFoundError(f"File not found: {target}")
            if not target.is_file():
                raise ValueError(f"Not a file: {target}")
            digest = hashlib.sha224()
            with target.open('rb') as stream:
                # iter() with a b'' sentinel stops cleanly at end-of-file.
                for block in iter(lambda: stream.read(chunk_size), b''):
                    digest.update(block)
            return digest.hexdigest()
        except FileNotFoundError as e:
            logger.error(f"File not found: {e}")
            return None
        except PermissionError as e:
            logger.error(f"Permission denied: {e}")
            return None
        except Exception as e:
            logger.error(f"Error hashing file: {e}")
            return None
# Usage examples
hasher = SHA224Hasher()

# Hash string
string_digest = hasher.hash_string("Hello, World!")
print(f"String hash: {string_digest}")

# Hash bytes
bytes_digest = hasher.hash_bytes(b"Binary data")
print(f"Bytes hash: {bytes_digest}")

# Hash file (prints None unless the path exists)
file_digest = hasher.hash_file("/path/to/file.txt")
print(f"File hash: {file_digest}")
Context Manager Pattern
Python
import hashlib
from contextlib import contextmanager
from typing import Generator
class SHA224Context:
    """Context manager for SHA-224 operations with automatic cleanup.

    The underlying hash object only exists between __enter__ and __exit__;
    using update()/digest() outside the with-block raises RuntimeError.
    """

    def __init__(self):
        # Created lazily in __enter__, discarded in __exit__.
        self.hasher = None

    def __enter__(self):
        self.hasher = hashlib.sha224()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop the reference so the object cannot be reused after the block.
        self.hasher = None

    def update(self, data: bytes):
        """Feed more bytes into the running hash."""
        if self.hasher is None:
            raise RuntimeError("Context manager not initialized")
        self.hasher.update(data)

    def digest(self) -> str:
        """Return the hex digest of everything fed in so far."""
        if self.hasher is None:
            raise RuntimeError("Context manager not initialized")
        return self.hasher.hexdigest()
@contextmanager
def sha224_context() -> Generator["hashlib._Hash", None, None]:
    """
    Simple context manager yielding a fresh SHA-224 hash object.

    Fix: the original annotation was ``Generator[hashlib.sha224, None, None]``,
    but ``hashlib.sha224`` is a factory *function*, not a type; the opaque
    hash-object type is conventionally written as ``hashlib._Hash``.

    Example:
        with sha224_context() as hasher:
            hasher.update(b"data1")
            hasher.update(b"data2")
            result = hasher.hexdigest()

    Yields:
        A hashlib SHA-224 hash object.
    """
    hasher = hashlib.sha224()
    try:
        yield hasher
    finally:
        # hashlib objects need no explicit cleanup; the try/finally is kept
        # so teardown logic can be added later without changing callers.
        pass
# Usage examples
# Using class-based context manager
with SHA224Context() as session:
    session.update(b"Part 1")
    session.update(b"Part 2")
    result = session.digest()
print(f"Class context result: {result}")

# Using function-based context manager
with sha224_context() as digest_obj:
    digest_obj.update(b"Part 1")
    digest_obj.update(b"Part 2")
    result = digest_obj.hexdigest()
print(f"Function context result: {result}")
2. Streaming & Large File Processing
Advanced Streaming with Progress Tracking
Python
import hashlib
import time
from pathlib import Path
from typing import Callable, Optional, Generator
from dataclasses import dataclass
@dataclass
class HashProgress:
    """Progress information for hash operations."""
    # Total size of the input in bytes.
    total_bytes: int
    # Bytes hashed so far.
    processed_bytes: int
    # processed_bytes as a percentage of total_bytes (0-100; 0 if size unknown).
    percent_complete: float
    # Seconds elapsed since the operation started.
    elapsed_time: float
    # Average throughput since the start.
    bytes_per_second: float
    # Projected seconds remaining at the current throughput (0 when unknown).
    estimated_time_remaining: float
class StreamingHasher:
    """Advanced streaming hasher with progress tracking and statistics.

    Stateful: one hash operation at a time per instance; reset() (called at
    the start of each hash_* method) clears previous state.
    """

    def __init__(self, algorithm: str = 'sha224'):
        # Passed to hashlib.new(), so any algorithm name hashlib supports works.
        self.algorithm = algorithm
        self.reset()

    def reset(self):
        """Reset the hasher to initial state."""
        self.hasher = hashlib.new(self.algorithm)
        self.total_processed = 0  # bytes fed in since the last reset
        self.start_time = None  # set when a timed operation begins

    def update(self, data: bytes):
        """Update hash with new data."""
        self.hasher.update(data)
        self.total_processed += len(data)

    def hash_file_with_progress(
        self,
        filepath: Path,
        chunk_size: int = 1048576,  # 1MB chunks
        progress_callback: Optional[Callable[[HashProgress], None]] = None,
        update_interval: float = 0.1  # Update every 100ms
    ) -> str:
        """
        Hash file with detailed progress tracking.

        Args:
            filepath: Path to file
            chunk_size: Bytes to read at once
            progress_callback: Function to call with progress updates
            update_interval: Minimum seconds between progress updates

        Returns:
            Hex digest of the file
        """
        self.reset()
        filepath = Path(filepath)
        file_size = filepath.stat().st_size
        self.start_time = time.time()
        last_update = 0  # epoch of the last callback; 0 guarantees the first chunk reports
        with filepath.open('rb') as f:
            while chunk := f.read(chunk_size):
                self.update(chunk)
                # Calculate progress
                current_time = time.time()
                # Throttle: at most one callback per update_interval seconds.
                if progress_callback and (current_time - last_update) >= update_interval:
                    elapsed = current_time - self.start_time
                    bytes_per_second = self.total_processed / elapsed if elapsed > 0 else 0
                    percent = (self.total_processed / file_size * 100) if file_size > 0 else 0
                    remaining = ((file_size - self.total_processed) / bytes_per_second
                                 if bytes_per_second > 0 else 0)
                    progress = HashProgress(
                        total_bytes=file_size,
                        processed_bytes=self.total_processed,
                        percent_complete=percent,
                        elapsed_time=elapsed,
                        bytes_per_second=bytes_per_second,
                        estimated_time_remaining=remaining
                    )
                    progress_callback(progress)
                    last_update = current_time
        return self.hasher.hexdigest()

    def hash_stream(
        self,
        stream: Generator[bytes, None, None],
        progress_callback: Optional[Callable[[int], None]] = None
    ) -> str:
        """
        Hash data from a generator/stream.

        Args:
            stream: Generator yielding bytes
            progress_callback: Function to call with bytes processed

        Returns:
            Hex digest
        """
        self.reset()
        for chunk in stream:
            self.update(chunk)
            if progress_callback:
                # Invoked once per chunk with the cumulative byte count.
                progress_callback(self.total_processed)
        return self.hasher.hexdigest()
# Example usage with progress bar
def print_progress(progress: HashProgress):
    """Render an in-place console progress bar for a hash operation."""
    bar_length = 40
    done = int(bar_length * progress.percent_complete / 100)
    bar = '=' * done + '-' * (bar_length - done)
    # Convert byte counts and speed to megabytes for display.
    mb_processed = progress.processed_bytes / 1048576
    mb_total = progress.total_bytes / 1048576
    mb_per_sec = progress.bytes_per_second / 1048576
    print(f'\r[{bar}] {progress.percent_complete:.1f}% '
          f'({mb_processed:.1f}/{mb_total:.1f} MB) '
          f'@ {mb_per_sec:.1f} MB/s '
          f'ETA: {progress.estimated_time_remaining:.1f}s', end='')
# Hash a large file with progress
hasher = StreamingHasher()
# result = hasher.hash_file_with_progress(
#     Path('/path/to/large/file.bin'),
#     progress_callback=print_progress
# )
# print(f"\nHash: {result}")

# Generator example
def data_generator():
    """Example generator producing data chunks."""
    for index in range(100):
        yield f"Chunk {index}\n".encode('utf-8')
        time.sleep(0.01)  # Simulate slow data source

result = hasher.hash_stream(data_generator())
print(f"Stream hash: {result}")
Memory-Mapped File Hashing
Python
import hashlib
import mmap
from pathlib import Path
from typing import Optional
class MemoryMappedHasher:
    """Hash files using memory mapping for optimal performance."""

    @staticmethod
    def hash_file_mmap(filepath: Path) -> Optional[str]:
        """
        Hash file using memory mapping.
        Efficient for large files on systems with sufficient memory.

        Args:
            filepath: Path to file

        Returns:
            Hex digest or None on error
        """
        try:
            filepath = Path(filepath)
            # mmap cannot map a zero-length file, so handle empty files directly.
            if filepath.stat().st_size == 0:
                return hashlib.sha224(b'').hexdigest()
            with filepath.open('rb') as f:
                with mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) as mmapped:
                    # hashlib accepts any buffer-protocol object, so the map is
                    # consumed without an intermediate bytes copy.
                    return hashlib.sha224(mmapped).hexdigest()
        except Exception as e:
            print(f"Error with memory mapping: {e}")
            return None

    @staticmethod
    def hash_file_chunked_mmap(
        filepath: Path,
        chunk_size: int = 67108864  # 64MB chunks
    ) -> Optional[str]:
        """
        Hash file using memory mapping with chunking.
        Best for very large files.

        Args:
            filepath: Path to file
            chunk_size: Requested size of each memory-mapped chunk. Rounded up
                to a multiple of mmap.ALLOCATIONGRANULARITY (see below).

        Returns:
            Hex digest or None on error
        """
        try:
            filepath = Path(filepath)
            file_size = filepath.stat().st_size
            if file_size == 0:
                return hashlib.sha224(b'').hexdigest()
            # Fix: mmap's `offset` must be a multiple of ALLOCATIONGRANULARITY.
            # The previous code raised (and returned None) for any chunk_size
            # that was not already aligned, and looped forever for chunk_size
            # <= 0 (mmap length 0 maps the rest of the file and offset never
            # advanced). Round the chunk size up so every offset stays aligned.
            granularity = mmap.ALLOCATIONGRANULARITY
            if chunk_size <= 0:
                chunk_size = granularity
            remainder = chunk_size % granularity
            if remainder:
                chunk_size += granularity - remainder
            hasher = hashlib.sha224()
            with filepath.open('rb') as f:
                offset = 0
                while offset < file_size:
                    # Map at most chunk_size bytes, clipped to the file tail.
                    length = min(chunk_size, file_size - offset)
                    with mmap.mmap(f.fileno(), length, offset=offset,
                                   access=mmap.ACCESS_READ) as mmapped:
                        hasher.update(mmapped)
                    offset += length
            return hasher.hexdigest()
        except Exception as e:
            print(f"Error with chunked memory mapping: {e}")
            return None
# Example usage
mmap_hasher = MemoryMappedHasher()  # methods are static; instance kept for symmetry with the other examples

# Standard memory mapping (entire file)
# result = mmap_hasher.hash_file_mmap(Path('/path/to/file.bin'))
# print(f"Memory-mapped hash: {result}")

# Chunked memory mapping (for very large files)
# result = mmap_hasher.hash_file_chunked_mmap(Path('/path/to/huge/file.bin'))
# print(f"Chunked mmap hash: {result}")
3. Async & Concurrent Processing
Async File Hashing
Python
import asyncio
import hashlib
import aiofiles
from pathlib import Path
from typing import List, Dict, Optional
import time
class AsyncHasher:
    """Asynchronous SHA-224 hasher for concurrent operations.

    NOTE(review): file I/O relies on the third-party ``aiofiles`` package
    (imported at module level); the hashing itself is synchronous CPU work
    performed inline between awaited reads.
    """

    @staticmethod
    async def hash_file_async(
        filepath: Path,
        chunk_size: int = 65536
    ) -> Optional[str]:
        """
        Hash a file asynchronously.

        Args:
            filepath: Path to file
            chunk_size: Bytes to read at once

        Returns:
            Hex digest or None on error
        """
        try:
            hasher = hashlib.sha224()
            async with aiofiles.open(filepath, 'rb') as f:
                # Each read yields to the event loop; the loop ends on b''.
                while chunk := await f.read(chunk_size):
                    hasher.update(chunk)
            return hasher.hexdigest()
        except Exception as e:
            print(f"Error hashing file {filepath}: {e}")
            return None

    @staticmethod
    async def hash_multiple_files(
        filepaths: List[Path],
        max_concurrent: int = 10
    ) -> Dict[Path, Optional[str]]:
        """
        Hash multiple files concurrently with semaphore control.

        Args:
            filepaths: List of file paths
            max_concurrent: Maximum concurrent operations

        Returns:
            Dictionary mapping filepath to hash (None for failed files)
        """
        # The semaphore caps how many files are open/being hashed at once.
        semaphore = asyncio.Semaphore(max_concurrent)

        async def hash_with_semaphore(filepath: Path) -> tuple:
            async with semaphore:
                result = await AsyncHasher.hash_file_async(filepath)
                return filepath, result

        tasks = [hash_with_semaphore(fp) for fp in filepaths]
        # gather preserves input order, but results are keyed by path anyway.
        results = await asyncio.gather(*tasks)
        return dict(results)

    @staticmethod
    async def hash_stream_async(stream) -> str:
        """
        Hash an async stream/generator.

        Args:
            stream: Async generator yielding bytes

        Returns:
            Hex digest
        """
        hasher = hashlib.sha224()
        async for chunk in stream:
            hasher.update(chunk)
        return hasher.hexdigest()

    @staticmethod
    async def hash_with_timeout(
        filepath: Path,
        timeout: float = 30.0
    ) -> Optional[str]:
        """
        Hash file with timeout protection.

        Args:
            filepath: Path to file
            timeout: Maximum seconds to wait

        Returns:
            Hex digest or None if timeout/error
        """
        try:
            return await asyncio.wait_for(
                AsyncHasher.hash_file_async(filepath),
                timeout=timeout
            )
        except asyncio.TimeoutError:
            # I/O errors inside hash_file_async already surface as None;
            # only the timeout is handled here.
            print(f"Timeout hashing {filepath}")
            return None
# Example async stream generator
async def async_data_generator():
    """Yield ten small byte chunks, pausing briefly before each one."""
    for chunk_index in range(10):
        await asyncio.sleep(0.1)  # Simulate async I/O
        yield f"Async chunk {chunk_index}\n".encode('utf-8')
# Usage examples
async def main():
    """Demonstrate each AsyncHasher entry point in turn."""
    hasher = AsyncHasher()

    # Single file
    single_digest = await hasher.hash_file_async(Path('/path/to/file.txt'))
    print(f"Single file: {single_digest}")

    # Multiple files concurrently
    paths = [Path(f'/path/to/file{i}.txt') for i in range(10)]
    digests = await hasher.hash_multiple_files(paths)
    for filepath, hash_value in digests.items():
        print(f"{filepath}: {hash_value}")

    # Async stream
    stream_digest = await hasher.hash_stream_async(async_data_generator())
    print(f"Stream hash: {stream_digest}")

    # With timeout
    timed_digest = await hasher.hash_with_timeout(
        Path('/path/to/large/file.bin'),
        timeout=10.0
    )
    print(f"With timeout: {timed_digest}")
# Run async code
# asyncio.run(main())
Thread Pool & Process Pool Hashing
Python
import hashlib
import concurrent.futures
from pathlib import Path
from typing import List, Dict, Optional, Callable
import multiprocessing
import time
class ConcurrentHasher:
    """Concurrent hashing using thread and process pools.

    NOTE(review): callables submitted to ProcessPoolExecutor must be
    picklable; static methods on this module-level class satisfy that as
    long as the module itself is importable in worker processes.
    """

    @staticmethod
    def hash_file(filepath: Path) -> tuple:
        """Hash a single file (for use in pools).

        Returns:
            (filepath, hex_digest) on success, (filepath, None) on any error.
            Failures are reported in-band so pool workers never raise.
        """
        try:
            hasher = hashlib.sha224()
            with filepath.open('rb') as f:
                while chunk := f.read(8192):
                    hasher.update(chunk)
            return filepath, hasher.hexdigest()
        except Exception as e:
            # Error detail is intentionally dropped; callers see only None.
            return filepath, None

    @staticmethod
    def hash_files_threaded(
        filepaths: List[Path],
        max_workers: Optional[int] = None,
        progress_callback: Optional[Callable] = None
    ) -> Dict[Path, Optional[str]]:
        """
        Hash files using ThreadPoolExecutor.
        Good for I/O-bound operations.

        Args:
            filepaths: List of file paths
            max_workers: Maximum threads (None = executor default)
            progress_callback: Called with (completed, total)

        Returns:
            Dictionary of filepath to hash
        """
        results = {}
        total = len(filepaths)
        completed = 0
        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit all tasks
            future_to_file = {
                executor.submit(ConcurrentHasher.hash_file, fp): fp
                for fp in filepaths
            }
            # Process futures in completion order (not submission order);
            # results are keyed by path so ordering does not matter.
            for future in concurrent.futures.as_completed(future_to_file):
                filepath, hash_value = future.result()
                results[filepath] = hash_value
                completed += 1
                if progress_callback:
                    progress_callback(completed, total)
        return results

    @staticmethod
    def hash_files_multiprocess(
        filepaths: List[Path],
        max_workers: Optional[int] = None,
        chunk_size: int = 10
    ) -> Dict[Path, Optional[str]]:
        """
        Hash files using ProcessPoolExecutor.
        Good for CPU-bound operations or very large files.

        Args:
            filepaths: List of file paths
            max_workers: Maximum processes (None = CPU count)
            chunk_size: Files to process per task

        Returns:
            Dictionary of filepath to hash
        """
        if max_workers is None:
            max_workers = multiprocessing.cpu_count()
        results = {}
        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Batch files per task to amortize per-submission IPC overhead.
            futures = []
            for i in range(0, len(filepaths), chunk_size):
                batch = filepaths[i:i + chunk_size]
                future = executor.submit(ConcurrentHasher._hash_batch, batch)
                futures.append(future)
            # Collect results
            for future in concurrent.futures.as_completed(futures):
                batch_results = future.result()
                results.update(batch_results)
        return results

    @staticmethod
    def _hash_batch(filepaths: List[Path]) -> Dict[Path, Optional[str]]:
        """Hash a batch of files (runs inside a pool worker process)."""
        results = {}
        for filepath in filepaths:
            _, hash_value = ConcurrentHasher.hash_file(filepath)
            results[filepath] = hash_value
        return results

    @staticmethod
    def benchmark_methods(filepaths: List[Path]) -> Dict[str, float]:
        """
        Benchmark different hashing methods.

        NOTE(review): wall-clock timings; results depend on disk-cache state
        and machine load (earlier passes warm the cache for later ones), so
        treat them as indicative only.

        Args:
            filepaths: List of files to hash

        Returns:
            Dictionary of method name to execution time
        """
        results = {}
        # Sequential
        start = time.time()
        for fp in filepaths:
            ConcurrentHasher.hash_file(fp)
        results['sequential'] = time.time() - start
        # Threaded
        start = time.time()
        ConcurrentHasher.hash_files_threaded(filepaths)
        results['threaded'] = time.time() - start
        # Multiprocess
        start = time.time()
        ConcurrentHasher.hash_files_multiprocess(filepaths)
        results['multiprocess'] = time.time() - start
        return results
# Usage example
def progress_printer(completed: int, total: int):
    """Print a one-line completed/total progress summary."""
    pct = completed / total * 100
    print(f"Progress: {completed}/{total} ({pct:.1f}%)")
# Example usage
hasher = ConcurrentHasher()

# Create test files list (illustrative paths; hash_file returns None for missing files)
test_files = [Path(f'/tmp/file{i}.txt') for i in range(100)]

# Threaded hashing with progress
# results = hasher.hash_files_threaded(
#     test_files,
#     max_workers=4,
#     progress_callback=progress_printer
# )

# Multiprocess hashing
# results = hasher.hash_files_multiprocess(
#     test_files,
#     max_workers=4
# )

# Benchmark different methods
# timings = hasher.benchmark_methods(test_files)
# for method, duration in timings.items():
#     print(f"{method}: {duration:.2f} seconds")
4. Performance Optimization
Optimized Hashing with Caching
Python
import hashlib
import functools
import pickle
from pathlib import Path
from typing import Optional, Dict, Any
import time
import sqlite3
# Module-level cached helper. Fix: the original put @functools.lru_cache
# directly on an instance method, which keys the cache on `self` and keeps
# every CachedHasher instance alive for the cache's lifetime (the classic
# B019 leak). A module-level function caches on the string alone.
@functools.lru_cache(maxsize=1024)
def _sha224_of_string(data: str) -> str:
    """Return the SHA-224 hex digest of a UTF-8 encoded string."""
    return hashlib.sha224(data.encode()).hexdigest()


class CachedHasher:
    """SHA-224 hasher with intelligent caching.

    Three cache layers:
      * an LRU string cache (module-level ``_sha224_of_string``),
      * an in-memory dict keyed by (path, size, mtime) for file hashes,
      * a SQLite table persisted under ``cache_dir``.

    File cache entries are invalidated implicitly: a changed size or mtime
    produces a different key / fails the DB lookup.
    """

    def __init__(self, cache_dir: Optional[Path] = None):
        """
        Args:
            cache_dir: Directory holding the SQLite cache
                (default: ~/.sha224_cache; created, with parents, if missing).
        """
        self.cache_dir = cache_dir or Path.home() / '.sha224_cache'
        # parents=True so a missing parent directory does not abort setup.
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.memory_cache = {}
        self.stats = {'hits': 0, 'misses': 0, 'errors': 0}
        # Initialize SQLite cache
        self.db_path = self.cache_dir / 'hash_cache.db'
        self._init_db()

    def _init_db(self):
        """Create the cache table and mtime index if they do not exist."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS hash_cache (
                    filepath TEXT PRIMARY KEY,
                    hash TEXT NOT NULL,
                    file_size INTEGER NOT NULL,
                    mtime REAL NOT NULL,
                    computed_at REAL NOT NULL
                )
            ''')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_mtime ON hash_cache(mtime)')

    def hash_string_cached(self, data: str) -> str:
        """
        Hash string with an in-memory LRU cache.

        Delegates to a module-level cached function instead of decorating
        this method directly (see _sha224_of_string for why). Signature and
        results are unchanged.

        Args:
            data: String to hash

        Returns:
            Hex digest
        """
        return _sha224_of_string(data)

    def hash_file_cached(
        self,
        filepath: Path,
        use_memory_cache: bool = True,
        use_disk_cache: bool = True
    ) -> Optional[str]:
        """
        Hash file with multi-level caching.

        Args:
            filepath: Path to file
            use_memory_cache: Use in-memory cache
            use_disk_cache: Use SQLite cache

        Returns:
            Hex digest or None on error
        """
        filepath = Path(filepath)
        try:
            # Key includes size and mtime so stale entries self-invalidate.
            stat = filepath.stat()
            file_key = f"{filepath}:{stat.st_size}:{stat.st_mtime}"
            # Check memory cache
            if use_memory_cache and file_key in self.memory_cache:
                self.stats['hits'] += 1
                return self.memory_cache[file_key]
            # Check disk cache (and promote a hit into the memory cache)
            if use_disk_cache:
                cached = self._get_from_db(filepath, stat)
                if cached:
                    self.stats['hits'] += 1
                    if use_memory_cache:
                        self.memory_cache[file_key] = cached
                    return cached
            # Cache miss - compute hash
            self.stats['misses'] += 1
            hash_value = self._compute_hash(filepath)
            if hash_value:
                # Update caches
                if use_memory_cache:
                    self.memory_cache[file_key] = hash_value
                if use_disk_cache:
                    self._save_to_db(filepath, hash_value, stat)
            return hash_value
        except Exception as e:
            self.stats['errors'] += 1
            print(f"Error: {e}")
            return None

    def _compute_hash(self, filepath: Path) -> str:
        """Compute SHA-224 hash of file, streaming in 64KB chunks."""
        hasher = hashlib.sha224()
        with filepath.open('rb') as f:
            while chunk := f.read(65536):
                hasher.update(chunk)
        return hasher.hexdigest()

    def _get_from_db(self, filepath: Path, stat) -> Optional[str]:
        """Get cached hash from database; None when no fresh entry matches."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                'SELECT hash FROM hash_cache WHERE filepath = ? AND file_size = ? AND mtime = ?',
                (str(filepath), stat.st_size, stat.st_mtime)
            )
            row = cursor.fetchone()
            return row[0] if row else None

    def _save_to_db(self, filepath: Path, hash_value: str, stat):
        """Save hash to database cache (upsert on filepath)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                '''INSERT OR REPLACE INTO hash_cache
                   (filepath, hash, file_size, mtime, computed_at)
                   VALUES (?, ?, ?, ?, ?)''',
                (str(filepath), hash_value, stat.st_size, stat.st_mtime, time.time())
            )

    def clear_cache(self, older_than_days: Optional[int] = None):
        """Clear cache entries.

        Args:
            older_than_days: When given (and truthy), only DB rows computed
                more than this many days ago are removed; otherwise the whole
                DB cache is dropped. The memory cache is always cleared.
        """
        self.memory_cache.clear()
        if older_than_days:
            cutoff = time.time() - (older_than_days * 86400)
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('DELETE FROM hash_cache WHERE computed_at < ?', (cutoff,))
        else:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('DELETE FROM hash_cache')

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics (entry counts, hit/miss/error totals, hit rate)."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute('SELECT COUNT(*) FROM hash_cache')
            db_entries = cursor.fetchone()[0]
        return {
            'memory_entries': len(self.memory_cache),
            'db_entries': db_entries,
            'hits': self.stats['hits'],
            'misses': self.stats['misses'],
            'errors': self.stats['errors'],
            'hit_rate': (self.stats['hits'] /
                         (self.stats['hits'] + self.stats['misses'])
                         if (self.stats['hits'] + self.stats['misses']) > 0 else 0)
        }
# Usage example
cached_hasher = CachedHasher()  # NOTE(review): creates ~/.sha224_cache and a SQLite DB at construction time

# Hash with caching
result = cached_hasher.hash_file_cached(Path('/path/to/file.txt'))
print(f"Hash: {result}")

# Check stats
stats = cached_hasher.get_stats()
print(f"Cache stats: {stats}")

# Clear old cache entries
cached_hasher.clear_cache(older_than_days=30)
5. Enterprise Patterns
Enterprise Hash Service
Python
import hashlib
import logging
import json
from datetime import datetime
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, asdict
from enum import Enum
import traceback
class HashAlgorithm(Enum):
    """Supported hash algorithms (values are hashlib algorithm names)."""
    SHA224 = "sha224"
    SHA256 = "sha256"
    SHA384 = "sha384"
    SHA512 = "sha512"


@dataclass
class HashRequest:
    """Hash operation request."""
    # Caller-supplied correlation ID; must be non-empty.
    id: str
    # Raw payload to hash; must be non-empty and within the configured size cap.
    data: bytes
    algorithm: HashAlgorithm
    # Free-form context (client, version, ...) echoed into the audit log.
    metadata: Dict[str, Any]
    timestamp: datetime


@dataclass
class HashResult:
    """Hash operation result."""
    request_id: str
    success: bool
    # Hex digest on success, None on failure.
    hash_value: Optional[str]
    algorithm: HashAlgorithm
    # Human-readable failure reason, None on success.
    error: Optional[str]
    execution_time_ms: float
    timestamp: datetime


class EnterpriseHashService:
    """
    Enterprise-grade hash service with comprehensive features:
    request validation, metrics, audit logging, and health checks.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """
        Args:
            config: Optional settings; only 'max_data_size' (bytes) is read,
                defaulting to 100MB.
        """
        self.config = config or {}
        self.logger = self._setup_logger()
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_bytes_processed': 0
        }
        # In-memory audit trail; entries are also appended to audit_log.jsonl.
        self.audit_log = []

    def _setup_logger(self) -> logging.Logger:
        """Configure enterprise logging (console + file).

        Fix: handlers are attached only when the named logger has none yet.
        The original added a fresh console and file handler on every service
        instantiation, duplicating every subsequent log line.
        """
        logger = logging.getLogger('EnterpriseHashService')
        logger.setLevel(logging.INFO)
        if not logger.handlers:
            # Console handler
            console_handler = logging.StreamHandler()
            console_formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
            )
            console_handler.setFormatter(console_formatter)
            logger.addHandler(console_handler)
            # File handler (path is relative to the current working directory)
            file_handler = logging.FileHandler('hash_service.log')
            file_formatter = logging.Formatter(
                '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
            )
            file_handler.setFormatter(file_formatter)
            logger.addHandler(file_handler)
        return logger

    def process_request(self, request: HashRequest) -> HashResult:
        """
        Process a hash request with full error handling and auditing.

        Never raises: validation or hashing failures are captured in the
        returned result (success=False) and counted in the metrics.

        Args:
            request: Hash request object

        Returns:
            Hash result object
        """
        start_time = datetime.now()
        result = None
        try:
            self.logger.info(f"Processing request {request.id} with {request.algorithm.value}")
            self.metrics['total_requests'] += 1
            # Validate request
            self._validate_request(request)
            # Compute hash
            hash_value = self._compute_hash(request.data, request.algorithm)
            # Update metrics
            self.metrics['successful_requests'] += 1
            self.metrics['total_bytes_processed'] += len(request.data)
            # Create result
            execution_time = (datetime.now() - start_time).total_seconds() * 1000
            result = HashResult(
                request_id=request.id,
                success=True,
                hash_value=hash_value,
                algorithm=request.algorithm,
                error=None,
                execution_time_ms=execution_time,
                timestamp=datetime.now()
            )
            self.logger.info(f"Request {request.id} completed successfully")
        except Exception as e:
            self.metrics['failed_requests'] += 1
            error_msg = str(e)
            self.logger.error(f"Request {request.id} failed: {error_msg}")
            self.logger.debug(traceback.format_exc())
            execution_time = (datetime.now() - start_time).total_seconds() * 1000
            result = HashResult(
                request_id=request.id,
                success=False,
                hash_value=None,
                algorithm=request.algorithm,
                error=error_msg,
                execution_time_ms=execution_time,
                timestamp=datetime.now()
            )
        finally:
            # Audit logging (runs for both success and failure paths)
            self._audit_log(request, result)
        return result

    def _validate_request(self, request: HashRequest):
        """Validate hash request; raises ValueError on any violation."""
        if not request.id:
            raise ValueError("Request ID is required")
        if not request.data:
            raise ValueError("Data is required")
        if len(request.data) > self.config.get('max_data_size', 104857600):  # 100MB default
            raise ValueError("Data exceeds maximum size limit")
        if request.algorithm not in HashAlgorithm:
            raise ValueError(f"Unsupported algorithm: {request.algorithm}")

    def _compute_hash(self, data: bytes, algorithm: HashAlgorithm) -> str:
        """Compute hash using the specified algorithm's hashlib name."""
        hasher = hashlib.new(algorithm.value)
        hasher.update(data)
        return hasher.hexdigest()

    def _audit_log(self, request: HashRequest, result: HashResult):
        """Append an audit entry in memory and to audit_log.jsonl."""
        audit_entry = {
            'request_id': request.id,
            'timestamp': datetime.now().isoformat(),
            'algorithm': request.algorithm.value,
            'data_size': len(request.data),
            'success': result.success,
            'execution_time_ms': result.execution_time_ms,
            'error': result.error,
            'metadata': request.metadata
        }
        self.audit_log.append(audit_entry)
        # Persist to file (JSON Lines, one entry per line)
        with open('audit_log.jsonl', 'a') as f:
            f.write(json.dumps(audit_entry) + '\n')

    def batch_process(self, requests: List[HashRequest]) -> List[HashResult]:
        """
        Process multiple hash requests sequentially.

        Args:
            requests: List of hash requests

        Returns:
            List of hash results, in input order
        """
        results = []
        for request in requests:
            result = self.process_request(request)
            results.append(result)
        return results

    def get_metrics(self) -> Dict[str, Any]:
        """Get service metrics plus derived success-rate and average size."""
        success_rate = (
            self.metrics['successful_requests'] / self.metrics['total_requests']
            if self.metrics['total_requests'] > 0 else 0
        )
        return {
            **self.metrics,
            'success_rate': success_rate,
            'average_bytes': (
                self.metrics['total_bytes_processed'] / self.metrics['successful_requests']
                if self.metrics['successful_requests'] > 0 else 0
            )
        }

    def health_check(self) -> Dict[str, Any]:
        """Service health check.

        Fix: verify against the published SHA-224 known-answer vector for
        b"abc" (FIPS 180-4 / RFC 3874) — the same vector used by the test
        suite. The previous hard-coded digest for b"health_check" was
        unverifiable, and a wrong constant would permanently report a
        correct implementation as unhealthy.
        """
        try:
            test_hash = hashlib.sha224(b"abc").hexdigest()
            healthy = test_hash == "23097d223405d8228642a477bda255b32aadbce4bda0b3f7e36c9da7"
            return {
                'healthy': healthy,
                'timestamp': datetime.now().isoformat(),
                'metrics': self.get_metrics()
            }
        except Exception as e:
            return {
                'healthy': False,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }
# Usage example
service = EnterpriseHashService(config={'max_data_size': 52428800})  # 50MB limit

# Create request
request = HashRequest(
    id="REQ-001",
    data=b"Enterprise data to hash",
    algorithm=HashAlgorithm.SHA224,
    metadata={'client': 'webapp', 'version': '1.0'},
    timestamp=datetime.now()
)

# Process request (never raises; failures are reported in the result object)
result = service.process_request(request)
print(f"Result: {asdict(result)}")

# Get metrics
metrics = service.get_metrics()
print(f"Metrics: {metrics}")

# Health check
health = service.health_check()
print(f"Health: {health}")
6. Testing & Validation
Comprehensive Test Suite
Python
import unittest
import hashlib
import tempfile
from pathlib import Path
import time
class SHA224TestSuite(unittest.TestCase):
    """Comprehensive test suite for SHA-224 implementation."""

    def setUp(self):
        """Set up test fixtures."""
        # Known-answer vectors: empty string and "abc" are the FIPS 180-4 /
        # RFC 3874 values; the fox sentence is the widely published example.
        self.test_vectors = [
            # (input, expected_hash)
            (b"", "d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f"),
            (b"abc", "23097d223405d8228642a477bda255b32aadbce4bda0b3f7e36c9da7"),
            (b"The quick brown fox jumps over the lazy dog",
             "730e109bd7a8a32b1cb9d9a09aa2325d2430587ddbc0c38bad911525")
        ]

    def test_empty_string(self):
        """Test hash of empty string."""
        result = hashlib.sha224(b"").hexdigest()
        expected = "d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f"
        self.assertEqual(result, expected)

    def test_known_vectors(self):
        """Test against known test vectors."""
        for input_data, expected in self.test_vectors:
            with self.subTest(input=input_data):
                result = hashlib.sha224(input_data).hexdigest()
                self.assertEqual(result, expected)

    def test_incremental_update(self):
        """Incremental updates must equal hashing the concatenation."""
        hasher1 = hashlib.sha224()
        hasher1.update(b"Hello")
        hasher1.update(b" ")
        hasher1.update(b"World")
        hasher2 = hashlib.sha224()
        hasher2.update(b"Hello World")
        self.assertEqual(hasher1.hexdigest(), hasher2.hexdigest())

    def test_file_hashing(self):
        """Test file hashing."""
        # Create temporary file (delete=False so it can be reopened by path)
        with tempfile.NamedTemporaryFile(delete=False) as f:
            f.write(b"Test file content for SHA-224 hashing")
            temp_path = Path(f.name)
        try:
            # Hash file
            hasher = hashlib.sha224()
            with temp_path.open('rb') as f:
                hasher.update(f.read())
            file_hash = hasher.hexdigest()
            # Verify against hashing the same bytes directly
            direct_hash = hashlib.sha224(b"Test file content for SHA-224 hashing").hexdigest()
            self.assertEqual(file_hash, direct_hash)
        finally:
            temp_path.unlink()

    def test_large_data(self):
        """Test hashing large data."""
        # 10MB of data
        large_data = b"x" * (10 * 1024 * 1024)
        start = time.time()
        result = hashlib.sha224(large_data).hexdigest()
        duration = time.time() - start
        self.assertIsNotNone(result)
        self.assertEqual(len(result), 56)  # SHA-224 produces 56 hex chars (224 bits / 4)
        # NOTE(review): wall-clock bound may be flaky on very slow machines.
        self.assertLess(duration, 5.0)  # Should complete within 5 seconds

    def test_unicode_handling(self):
        """Test Unicode string handling."""
        unicode_strings = [
            "Hello World",
            "Héllo Wörld",
            "你好世界",
            "🚀 Emoji test"
        ]
        for text in unicode_strings:
            with self.subTest(text=text):
                # Should work with UTF-8 encoding
                result = hashlib.sha224(text.encode('utf-8')).hexdigest()
                self.assertEqual(len(result), 56)

    def test_consistency(self):
        """Test hash consistency."""
        data = b"Consistency test data"
        # Hash same data multiple times
        hashes = [hashlib.sha224(data).hexdigest() for _ in range(100)]
        # All should be identical
        self.assertEqual(len(set(hashes)), 1)

    def test_error_handling(self):
        """Test error handling."""
        # hashlib requires bytes-like input; a bare str raises TypeError.
        with self.assertRaises(TypeError):
            hashlib.sha224("string without encoding")  # Should fail

    def test_performance(self):
        """Benchmark performance."""
        sizes = [1024, 10240, 102400, 1048576]  # 1KB, 10KB, 100KB, 1MB
        results = {}
        for size in sizes:
            data = b"x" * size
            start = time.time()
            for _ in range(100):
                hashlib.sha224(data).hexdigest()
            duration = time.time() - start
            results[size] = duration
        # NOTE(review): absolute-throughput assertion is environment-dependent.
        for size, duration in results.items():
            throughput = (size * 100) / duration / 1048576  # MB/s
            self.assertGreater(throughput, 10)  # At least 10 MB/s


# Run tests
if __name__ == '__main__':
    unittest.main()
Python SHA-224 Best Practices
✅ DO:
- Always handle encoding explicitly (UTF-8 is recommended)
- Use streaming for files larger than available memory
- Implement proper error handling and logging
- Cache results when appropriate
- Use async/concurrent processing for multiple files
- Validate input data before processing
- Use context managers for resource cleanup
- Test with known test vectors
❌ DON'T:
- Load entire large files into memory
- Ignore encoding issues
- Use SHA-224 (or any fast general-purpose hash) for password storage, even with a salt — use a dedicated password-hashing function such as bcrypt, scrypt, or Argon2 instead
- Assume consistent performance across platforms
- Skip error handling in production code
- Mix bytes and strings without explicit conversion
Performance Tips:
- Optimal chunk size is typically 64KB-1MB
- Use memory mapping for very large files
- Consider process pools for CPU-bound operations
- Implement caching for frequently accessed data
- Profile your code to find bottlenecks