SHA-224 in Python: Production Deep Dive

Master SHA-224 implementation in Python with production-ready patterns, async processing, streaming capabilities, and enterprise-grade error handling.

Table of Contents

  1. Fundamentals & Best Practices
  2. Streaming & Large File Processing
  3. Async & Concurrent Processing
  4. Performance Optimization
  5. Enterprise Patterns
  6. Security Implementations
  7. Testing & Validation
  8. Framework Integration
  9. Monitoring & Observability
  10. Real-World Examples

1. Fundamentals & Best Practices

Basic Implementation with Error Handling

Python
import hashlib
import logging
from typing import Union, Optional
from pathlib import Path

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

class SHA224Hasher:
    """Production-ready SHA-224 hasher with comprehensive error handling.

    All methods are static, report failures via the module-level
    ``logger`` and return ``None`` instead of raising, so callers can
    treat any ``None`` result as a handled failure.
    """

    @staticmethod
    def hash_string(data: str, encoding: str = 'utf-8') -> Optional[str]:
        """
        Hash a string using SHA-224.

        Args:
            data: String to hash
            encoding: Character encoding (default: utf-8)

        Returns:
            Hex digest or None on error
        """
        try:
            if not isinstance(data, str):
                raise TypeError(f"Expected str, got {type(data).__name__}")
            return hashlib.sha224(data.encode(encoding)).hexdigest()

        except UnicodeEncodeError as e:
            logger.error(f"Encoding error: {e}")
            return None
        except Exception as e:
            logger.error(f"Unexpected error hashing string: {e}")
            return None

    @staticmethod
    def hash_bytes(data: bytes) -> Optional[str]:
        """
        Hash bytes using SHA-224.

        Args:
            data: Bytes to hash

        Returns:
            Hex digest or None on error
        """
        try:
            if not isinstance(data, bytes):
                raise TypeError(f"Expected bytes, got {type(data).__name__}")
            return hashlib.sha224(data).hexdigest()

        except Exception as e:
            logger.error(f"Error hashing bytes: {e}")
            return None

    @staticmethod
    def hash_file(filepath: Union[str, Path], chunk_size: int = 8192) -> Optional[str]:
        """
        Hash a file using SHA-224 with streaming.

        Args:
            filepath: Path to file
            chunk_size: Bytes to read at once

        Returns:
            Hex digest or None on error
        """
        try:
            target = Path(filepath)

            if not target.exists():
                raise FileNotFoundError(f"File not found: {target}")
            if not target.is_file():
                raise ValueError(f"Not a file: {target}")

            digest = hashlib.sha224()
            with target.open('rb') as handle:
                # iter() with a b'' sentinel reads until EOF
                for block in iter(lambda: handle.read(chunk_size), b''):
                    digest.update(block)
            return digest.hexdigest()

        except FileNotFoundError as e:
            logger.error(f"File not found: {e}")
            return None
        except PermissionError as e:
            logger.error(f"Permission denied: {e}")
            return None
        except Exception as e:
            logger.error(f"Error hashing file: {e}")
            return None

# Usage examples
hasher = SHA224Hasher()

# Hash string
print(f"String hash: {hasher.hash_string('Hello, World!')}")

# Hash bytes
print(f"Bytes hash: {hasher.hash_bytes(b'Binary data')}")

# Hash file
print(f"File hash: {hasher.hash_file('/path/to/file.txt')}")

Context Manager Pattern

Python
import hashlib
from contextlib import contextmanager
from typing import Generator

class SHA224Context:
    """Class-based context manager for incremental SHA-224 hashing.

    The underlying hash object exists only between __enter__ and
    __exit__; using update()/digest() outside the `with` block raises
    RuntimeError.
    """

    def __init__(self):
        self.hasher = None

    def __enter__(self):
        self.hasher = hashlib.sha224()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Drop the hash object so stale use after the block is rejected
        self.hasher = None

    def update(self, data: bytes):
        """Update hash with new data."""
        if self.hasher is None:
            raise RuntimeError("Context manager not initialized")
        self.hasher.update(data)

    def digest(self) -> str:
        """Get hex digest."""
        if self.hasher is None:
            raise RuntimeError("Context manager not initialized")
        return self.hasher.hexdigest()

@contextmanager
def sha224_context() -> Generator["hashlib._Hash", None, None]:
    """
    Simple context manager for SHA-224 operations.

    Yields:
        A fresh SHA-224 hash object. Note: ``hashlib.sha224`` is a
        factory function, not a class, so the hasher type is spelled
        with the forward reference ``hashlib._Hash`` (the previous
        annotation ``Generator[hashlib.sha224, ...]`` named the
        factory, not the object it returns).

    Example:
        with sha224_context() as hasher:
            hasher.update(b"data1")
            hasher.update(b"data2")
            result = hasher.hexdigest()
    """
    hasher = hashlib.sha224()
    try:
        yield hasher
    finally:
        # Hash objects hold no external resources; the try/finally
        # simply keeps the structure ready for future cleanup needs.
        pass

# Usage examples
# Using class-based context manager
with SHA224Context() as ctx:
    ctx.update(b"Part 1")
    ctx.update(b"Part 2")
    print(f"Class context result: {ctx.digest()}")

# Using function-based context manager
with sha224_context() as hasher:
    hasher.update(b"Part 1")
    hasher.update(b"Part 2")
    print(f"Function context result: {hasher.hexdigest()}")

2. Streaming & Large File Processing

Advanced Streaming with Progress Tracking

Python
import hashlib
import time
from pathlib import Path
from typing import Callable, Optional, Generator
from dataclasses import dataclass

@dataclass
class HashProgress:
    """Progress information for hash operations."""
    total_bytes: int            # total size of the input in bytes
    processed_bytes: int        # bytes hashed so far
    percent_complete: float     # processed/total as a percentage (0-100)
    elapsed_time: float         # seconds since hashing started
    bytes_per_second: float     # average throughput so far
    estimated_time_remaining: float  # seconds, extrapolated from throughput

class StreamingHasher:
    """Advanced streaming hasher with progress tracking and statistics."""

    def __init__(self, algorithm: str = 'sha224'):
        self.algorithm = algorithm
        self.reset()

    def reset(self):
        """Return the hasher to a pristine state."""
        self.hasher = hashlib.new(self.algorithm)
        self.total_processed = 0
        self.start_time = None

    def update(self, data: bytes):
        """Feed one chunk into the running digest and track byte count."""
        self.hasher.update(data)
        self.total_processed += len(data)

    def _snapshot(self, file_size: int, now: float) -> HashProgress:
        """Build a HashProgress describing the operation at time *now*."""
        elapsed = now - self.start_time
        rate = self.total_processed / elapsed if elapsed > 0 else 0
        percent = (self.total_processed / file_size * 100) if file_size > 0 else 0
        eta = ((file_size - self.total_processed) / rate) if rate > 0 else 0
        return HashProgress(
            total_bytes=file_size,
            processed_bytes=self.total_processed,
            percent_complete=percent,
            elapsed_time=elapsed,
            bytes_per_second=rate,
            estimated_time_remaining=eta,
        )

    def hash_file_with_progress(
        self,
        filepath: Path,
        chunk_size: int = 1048576,  # 1MB chunks
        progress_callback: Optional[Callable[[HashProgress], None]] = None,
        update_interval: float = 0.1  # Update every 100ms
    ) -> str:
        """
        Hash file with detailed progress tracking.

        Args:
            filepath: Path to file
            chunk_size: Bytes to read at once
            progress_callback: Function to call with progress updates
            update_interval: Minimum seconds between progress updates

        Returns:
            Hex digest of the file
        """
        self.reset()
        filepath = Path(filepath)
        file_size = filepath.stat().st_size
        self.start_time = time.time()
        last_report = 0

        with filepath.open('rb') as source:
            for block in iter(lambda: source.read(chunk_size), b''):
                self.update(block)

                # Throttle callbacks to at most one per update_interval
                now = time.time()
                if progress_callback and (now - last_report) >= update_interval:
                    progress_callback(self._snapshot(file_size, now))
                    last_report = now

        return self.hasher.hexdigest()

    def hash_stream(
        self,
        stream: Generator[bytes, None, None],
        progress_callback: Optional[Callable[[int], None]] = None
    ) -> str:
        """
        Hash data from a generator/stream.

        Args:
            stream: Generator yielding bytes
            progress_callback: Function to call with bytes processed

        Returns:
            Hex digest
        """
        self.reset()
        for piece in stream:
            self.update(piece)
            if progress_callback:
                progress_callback(self.total_processed)
        return self.hasher.hexdigest()

# Example usage with progress bar
def print_progress(progress: HashProgress):
    """Print a progress bar to console."""
    bar_length = 40
    filled = int(bar_length * progress.percent_complete / 100)
    bar = '=' * filled + '-' * (bar_length - filled)

    # Format bytes and speed
    mb_processed = progress.processed_bytes / 1048576
    mb_total = progress.total_bytes / 1048576
    mb_per_sec = progress.bytes_per_second / 1048576

    print(f'\r[{bar}] {progress.percent_complete:.1f}% '
          f'({mb_processed:.1f}/{mb_total:.1f} MB) '
          f'@ {mb_per_sec:.1f} MB/s '
          f'ETA: {progress.estimated_time_remaining:.1f}s', end='')

# Hash a large file with progress
hasher = StreamingHasher()
# result = hasher.hash_file_with_progress(
#     Path('/path/to/large/file.bin'),
#     progress_callback=print_progress
# )
# print(f"\nHash: {result}")

# Generator example
def data_generator():
    """Example generator producing data chunks."""
    for index in range(100):
        yield f"Chunk {index}\n".encode('utf-8')
        time.sleep(0.01)  # Simulate slow data source

result = hasher.hash_stream(data_generator())
print(f"Stream hash: {result}")

Memory-Mapped File Hashing

Python
import hashlib
import mmap
from pathlib import Path
from typing import Optional

class MemoryMappedHasher:
    """Hash files using memory mapping for optimal performance."""

    @staticmethod
    def hash_file_mmap(filepath: Path) -> Optional[str]:
        """
        Hash file using memory mapping.
        Efficient for large files on systems with sufficient memory.

        Args:
            filepath: Path to file

        Returns:
            Hex digest or None on error
        """
        try:
            filepath = Path(filepath)

            # mmap cannot map a zero-length file, so hash directly
            if filepath.stat().st_size == 0:
                return hashlib.sha224(b'').hexdigest()

            with filepath.open('rb') as handle, \
                    mmap.mmap(handle.fileno(), 0, access=mmap.ACCESS_READ) as view:
                return hashlib.sha224(view).hexdigest()

        except Exception as e:
            print(f"Error with memory mapping: {e}")
            return None

    @staticmethod
    def hash_file_chunked_mmap(
        filepath: Path,
        chunk_size: int = 67108864  # 64MB chunks
    ) -> Optional[str]:
        """
        Hash file using memory mapping with chunking.
        Best for very large files.

        Args:
            filepath: Path to file
            chunk_size: Size of each memory-mapped chunk (kept a
                multiple of 64KB so each offset satisfies mmap's
                allocation-granularity requirement)

        Returns:
            Hex digest or None on error
        """
        try:
            filepath = Path(filepath)
            size = filepath.stat().st_size

            if size == 0:
                return hashlib.sha224(b'').hexdigest()

            digest = hashlib.sha224()

            with filepath.open('rb') as handle:
                for offset in range(0, size, chunk_size):
                    span = min(chunk_size, size - offset)
                    # Map only the current window of the file
                    with mmap.mmap(handle.fileno(), span, offset=offset,
                                   access=mmap.ACCESS_READ) as view:
                        digest.update(view)

            return digest.hexdigest()

        except Exception as e:
            print(f"Error with chunked memory mapping: {e}")
            return None

# Example usage
mmap_hasher = MemoryMappedHasher()  # stateless: both methods are static

# Standard memory mapping (entire file)
# result = mmap_hasher.hash_file_mmap(Path('/path/to/file.bin'))
# print(f"Memory-mapped hash: {result}")

# Chunked memory mapping (for very large files)
# result = mmap_hasher.hash_file_chunked_mmap(Path('/path/to/huge/file.bin'))
# print(f"Chunked mmap hash: {result}")

3. Async & Concurrent Processing

Async File Hashing

Python
import asyncio
import hashlib
import aiofiles
from pathlib import Path
from typing import List, Dict, Optional
import time

class AsyncHasher:
    """Asynchronous SHA-224 hasher for concurrent operations."""

    @staticmethod
    async def hash_file_async(
        filepath: Path,
        chunk_size: int = 65536
    ) -> Optional[str]:
        """
        Hash a file asynchronously.

        Args:
            filepath: Path to file
            chunk_size: Bytes to read at once

        Returns:
            Hex digest or None on error
        """
        try:
            digest = hashlib.sha224()

            async with aiofiles.open(filepath, 'rb') as handle:
                while True:
                    piece = await handle.read(chunk_size)
                    if not piece:
                        break
                    digest.update(piece)

            return digest.hexdigest()

        except Exception as e:
            print(f"Error hashing file {filepath}: {e}")
            return None

    @staticmethod
    async def hash_multiple_files(
        filepaths: List[Path],
        max_concurrent: int = 10
    ) -> Dict[Path, Optional[str]]:
        """
        Hash multiple files concurrently with semaphore control.

        Args:
            filepaths: List of file paths
            max_concurrent: Maximum concurrent operations

        Returns:
            Dictionary mapping filepath to hash
        """
        gate = asyncio.Semaphore(max_concurrent)

        async def bounded(path: Path) -> tuple:
            # The semaphore caps how many files are open at once
            async with gate:
                return path, await AsyncHasher.hash_file_async(path)

        pairs = await asyncio.gather(*(bounded(p) for p in filepaths))
        return dict(pairs)

    @staticmethod
    async def hash_stream_async(stream) -> str:
        """
        Hash an async stream/generator.

        Args:
            stream: Async generator yielding bytes

        Returns:
            Hex digest
        """
        digest = hashlib.sha224()

        async for piece in stream:
            digest.update(piece)

        return digest.hexdigest()

    @staticmethod
    async def hash_with_timeout(
        filepath: Path,
        timeout: float = 30.0
    ) -> Optional[str]:
        """
        Hash file with timeout protection.

        Args:
            filepath: Path to file
            timeout: Maximum seconds to wait

        Returns:
            Hex digest or None if timeout/error
        """
        try:
            pending = AsyncHasher.hash_file_async(filepath)
            return await asyncio.wait_for(pending, timeout=timeout)
        except asyncio.TimeoutError:
            print(f"Timeout hashing {filepath}")
            return None

# Example async stream generator
async def async_data_generator():
    """Generate data asynchronously."""
    for index in range(10):
        await asyncio.sleep(0.1)  # Simulate async I/O
        yield f"Async chunk {index}\n".encode('utf-8')

# Usage examples
async def main():
    hasher = AsyncHasher()

    # Single file
    single = await hasher.hash_file_async(Path('/path/to/file.txt'))
    print(f"Single file: {single}")

    # Multiple files concurrently
    files = [Path(f'/path/to/file{i}.txt') for i in range(10)]
    hashes = await hasher.hash_multiple_files(files)
    for filepath, hash_value in hashes.items():
        print(f"{filepath}: {hash_value}")

    # Async stream
    stream_digest = await hasher.hash_stream_async(async_data_generator())
    print(f"Stream hash: {stream_digest}")

    # With timeout
    guarded = await hasher.hash_with_timeout(
        Path('/path/to/large/file.bin'),
        timeout=10.0
    )
    print(f"With timeout: {guarded}")

# Run async code
# asyncio.run(main())

Thread Pool & Process Pool Hashing

Python
import hashlib
import concurrent.futures
from pathlib import Path
from typing import List, Dict, Optional, Callable
import multiprocessing
import time

class ConcurrentHasher:
    """Concurrent SHA-224 hashing using thread and process pools.

    Every method is a @staticmethod so callables can be submitted to
    executors (and pickled for process pools) without instance state.
    """

    @staticmethod
    def hash_file(filepath: Path) -> tuple:
        """
        Hash a single file (for use in pools).

        Args:
            filepath: Path to file

        Returns:
            (filepath, hex_digest) tuple; the digest is None on any
            error so one bad file never poisons a whole batch.
        """
        try:
            hasher = hashlib.sha224()
            with filepath.open('rb') as f:
                while chunk := f.read(8192):
                    hasher.update(chunk)
            return filepath, hasher.hexdigest()
        except Exception:
            # Deliberate best-effort behavior for pool workers; the
            # exception object was previously bound but never used.
            return filepath, None

    @staticmethod
    def hash_files_threaded(
        filepaths: List[Path],
        max_workers: Optional[int] = None,
        progress_callback: Optional[Callable] = None
    ) -> Dict[Path, Optional[str]]:
        """
        Hash files using ThreadPoolExecutor.
        Good for I/O-bound operations.

        Args:
            filepaths: List of file paths
            max_workers: Maximum threads (None = default)
            progress_callback: Called with (completed, total)

        Returns:
            Dictionary of filepath to hash
        """
        results = {}
        total = len(filepaths)
        completed = 0

        with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
            # Submit everything up front so the pool stays saturated
            future_to_file = {
                executor.submit(ConcurrentHasher.hash_file, fp): fp
                for fp in filepaths
            }

            # Collect in completion order, not submission order
            for future in concurrent.futures.as_completed(future_to_file):
                filepath, hash_value = future.result()
                results[filepath] = hash_value
                completed += 1

                if progress_callback:
                    progress_callback(completed, total)

        return results

    @staticmethod
    def hash_files_multiprocess(
        filepaths: List[Path],
        max_workers: Optional[int] = None,
        chunk_size: int = 10
    ) -> Dict[Path, Optional[str]]:
        """
        Hash files using ProcessPoolExecutor.
        Good for CPU-bound operations or very large files.

        Args:
            filepaths: List of file paths
            max_workers: Maximum processes (None = CPU count)
            chunk_size: Files to process per task

        Returns:
            Dictionary of filepath to hash
        """
        if max_workers is None:
            max_workers = multiprocessing.cpu_count()

        results = {}

        with concurrent.futures.ProcessPoolExecutor(max_workers=max_workers) as executor:
            # Batch files per task to amortize inter-process overhead
            futures = []
            for i in range(0, len(filepaths), chunk_size):
                batch = filepaths[i:i + chunk_size]
                futures.append(executor.submit(ConcurrentHasher._hash_batch, batch))

            for future in concurrent.futures.as_completed(futures):
                results.update(future.result())

        return results

    @staticmethod
    def _hash_batch(filepaths: List[Path]) -> Dict[Path, Optional[str]]:
        """Hash a batch of files (runs inside a worker process)."""
        results = {}
        for filepath in filepaths:
            _, hash_value = ConcurrentHasher.hash_file(filepath)
            results[filepath] = hash_value
        return results

    @staticmethod
    def benchmark_methods(filepaths: List[Path]) -> Dict[str, float]:
        """
        Benchmark sequential, threaded and multiprocess hashing.

        Uses time.perf_counter() — a monotonic, high-resolution clock —
        instead of time.time(), whose wall-clock value can jump (NTP
        adjustments) and distort measured durations.

        Args:
            filepaths: List of files to hash

        Returns:
            Dictionary of method name to execution time in seconds
        """
        results = {}

        # Sequential baseline
        start = time.perf_counter()
        for fp in filepaths:
            ConcurrentHasher.hash_file(fp)
        results['sequential'] = time.perf_counter() - start

        # Threaded
        start = time.perf_counter()
        ConcurrentHasher.hash_files_threaded(filepaths)
        results['threaded'] = time.perf_counter() - start

        # Multiprocess
        start = time.perf_counter()
        ConcurrentHasher.hash_files_multiprocess(filepaths)
        results['multiprocess'] = time.perf_counter() - start

        return results

# Usage example
def progress_printer(completed: int, total: int):
    """Print progress."""
    fraction = (completed / total) * 100
    print(f"Progress: {completed}/{total} ({fraction:.1f}%)")

# Example usage
hasher = ConcurrentHasher()  # all methods are static; instance is optional

# Create test files list (paths are illustrative and need not exist)
test_files = [Path(f'/tmp/file{i}.txt') for i in range(100)]

# Threaded hashing with progress
# results = hasher.hash_files_threaded(
#     test_files,
#     max_workers=4,
#     progress_callback=progress_printer
# )

# Multiprocess hashing
# results = hasher.hash_files_multiprocess(
#     test_files,
#     max_workers=4
# )

# Benchmark different methods
# timings = hasher.benchmark_methods(test_files)
# for method, duration in timings.items():
#     print(f"{method}: {duration:.2f} seconds")

4. Performance Optimization

Optimized Hashing with Caching

Python
import hashlib
import functools
import pickle
from pathlib import Path
from typing import Optional, Dict, Any
import time
import sqlite3

class CachedHasher:
    """SHA-224 hasher with intelligent multi-level caching.

    File lookups go: in-memory dict -> SQLite disk cache -> recompute.
    Entries are keyed on (path, size, mtime), so a modified file is
    automatically re-hashed.
    """

    def __init__(self, cache_dir: Optional[Path] = None):
        """
        Args:
            cache_dir: Directory for the SQLite cache
                (default: ~/.sha224_cache; created, with parents, if missing)
        """
        self.cache_dir = cache_dir or Path.home() / '.sha224_cache'
        # parents=True so a nested custom cache_dir also works
        self.cache_dir.mkdir(parents=True, exist_ok=True)
        self.memory_cache = {}
        self.stats = {'hits': 0, 'misses': 0, 'errors': 0}

        # Initialize SQLite cache
        self.db_path = self.cache_dir / 'hash_cache.db'
        self._init_db()

    def _init_db(self):
        """Initialize SQLite cache database."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS hash_cache (
                    filepath TEXT PRIMARY KEY,
                    hash TEXT NOT NULL,
                    file_size INTEGER NOT NULL,
                    mtime REAL NOT NULL,
                    computed_at REAL NOT NULL
                )
            ''')
            conn.execute('CREATE INDEX IF NOT EXISTS idx_mtime ON hash_cache(mtime)')

    @staticmethod
    @functools.lru_cache(maxsize=1024)
    def _cached_string_digest(data: str) -> str:
        """Class-wide LRU-cached SHA-224 of a string."""
        return hashlib.sha224(data.encode()).hexdigest()

    def hash_string_cached(self, data: str) -> str:
        """
        Hash string with in-memory LRU cache.

        The cache lives on a static helper, not on the bound method:
        decorating an instance method with lru_cache keys the cache on
        ``self`` and keeps every instance alive for the cache lifetime.

        Args:
            data: String to hash

        Returns:
            Hex digest
        """
        return CachedHasher._cached_string_digest(data)

    def hash_file_cached(
        self,
        filepath: Path,
        use_memory_cache: bool = True,
        use_disk_cache: bool = True
    ) -> Optional[str]:
        """
        Hash file with multi-level caching.

        Args:
            filepath: Path to file
            use_memory_cache: Use in-memory cache
            use_disk_cache: Use SQLite cache

        Returns:
            Hex digest or None on error
        """
        filepath = Path(filepath)

        try:
            # Key on size+mtime so stale entries are never returned
            stat = filepath.stat()
            file_key = f"{filepath}:{stat.st_size}:{stat.st_mtime}"

            # Check memory cache
            if use_memory_cache and file_key in self.memory_cache:
                self.stats['hits'] += 1
                return self.memory_cache[file_key]

            # Check disk cache
            if use_disk_cache:
                cached = self._get_from_db(filepath, stat)
                if cached is not None:
                    self.stats['hits'] += 1
                    if use_memory_cache:
                        self.memory_cache[file_key] = cached
                    return cached

            # Cache miss - compute hash
            self.stats['misses'] += 1
            hash_value = self._compute_hash(filepath)

            if hash_value:
                # Populate both cache levels for next time
                if use_memory_cache:
                    self.memory_cache[file_key] = hash_value
                if use_disk_cache:
                    self._save_to_db(filepath, hash_value, stat)

            return hash_value

        except Exception as e:
            self.stats['errors'] += 1
            print(f"Error: {e}")
            return None

    def _compute_hash(self, filepath: Path) -> str:
        """Compute SHA-224 hash of file by streaming 64KB chunks."""
        hasher = hashlib.sha224()
        with filepath.open('rb') as f:
            while chunk := f.read(65536):
                hasher.update(chunk)
        return hasher.hexdigest()

    def _get_from_db(self, filepath: Path, stat) -> Optional[str]:
        """Get cached hash from database; None when absent or stale."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute(
                'SELECT hash FROM hash_cache WHERE filepath = ? AND file_size = ? AND mtime = ?',
                (str(filepath), stat.st_size, stat.st_mtime)
            )
            row = cursor.fetchone()
            return row[0] if row else None

    def _save_to_db(self, filepath: Path, hash_value: str, stat):
        """Save hash to database cache (upsert on filepath)."""
        with sqlite3.connect(self.db_path) as conn:
            conn.execute(
                '''INSERT OR REPLACE INTO hash_cache
                   (filepath, hash, file_size, mtime, computed_at)
                   VALUES (?, ?, ?, ?, ?)''',
                (str(filepath), hash_value, stat.st_size, stat.st_mtime, time.time())
            )

    def clear_cache(self, older_than_days: Optional[int] = None):
        """Clear cache entries.

        Args:
            older_than_days: If given, delete only disk entries computed
                more than this many days ago; the memory cache is always
                cleared entirely.
        """
        self.memory_cache.clear()

        if older_than_days:
            cutoff = time.time() - (older_than_days * 86400)
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('DELETE FROM hash_cache WHERE computed_at < ?', (cutoff,))
        else:
            with sqlite3.connect(self.db_path) as conn:
                conn.execute('DELETE FROM hash_cache')

    def get_stats(self) -> Dict[str, Any]:
        """Get cache statistics (entry counts, hit/miss/error totals, hit rate)."""
        with sqlite3.connect(self.db_path) as conn:
            cursor = conn.execute('SELECT COUNT(*) FROM hash_cache')
            db_entries = cursor.fetchone()[0]

        return {
            'memory_entries': len(self.memory_cache),
            'db_entries': db_entries,
            'hits': self.stats['hits'],
            'misses': self.stats['misses'],
            'errors': self.stats['errors'],
            'hit_rate': (self.stats['hits'] /
                        (self.stats['hits'] + self.stats['misses'])
                        if (self.stats['hits'] + self.stats['misses']) > 0 else 0)
        }

# Usage example
cached_hasher = CachedHasher()

# Hash with caching
file_digest = cached_hasher.hash_file_cached(Path('/path/to/file.txt'))
print(f"Hash: {file_digest}")

# Check stats
cache_stats = cached_hasher.get_stats()
print(f"Cache stats: {cache_stats}")

# Clear old cache entries
cached_hasher.clear_cache(older_than_days=30)

5. Enterprise Patterns

Enterprise Hash Service

Python
import hashlib
import logging
import json
from datetime import datetime
from typing import Optional, Dict, Any, List
from dataclasses import dataclass, asdict
from enum import Enum
import traceback

class HashAlgorithm(Enum):
    """Supported hash algorithms.

    Values match hashlib constructor names, so ``member.value`` can be
    passed straight to ``hashlib.new``.
    """
    SHA224 = "sha224"
    SHA256 = "sha256"
    SHA384 = "sha384"
    SHA512 = "sha512"

@dataclass
class HashRequest:
    """Hash operation request."""
    id: str                    # caller-supplied identifier, used for logging/correlation
    data: bytes                # payload to hash
    algorithm: HashAlgorithm   # which digest algorithm to apply
    metadata: Dict[str, Any]   # free-form caller context
    timestamp: datetime        # presumably when the request was created — confirm with producer

@dataclass
class HashResult:
    """Hash operation result."""
    request_id: str            # presumably echoes HashRequest.id for correlation — confirm with producer
    success: bool              # True when the operation completed without error
    hash_value: Optional[str]  # hex digest, or None on failure
    algorithm: HashAlgorithm   # algorithm used for this result
    error: Optional[str]       # error description when success is False
    execution_time_ms: float   # processing duration in milliseconds
    timestamp: datetime        # when the result was produced

class EnterpriseHashService:
    """
    Enterprise-grade hash service with comprehensive features.
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        self.config = config or {}
        self.logger = self._setup_logger()
        self.metrics = {
            'total_requests': 0,
            'successful_requests': 0,
            'failed_requests': 0,
            'total_bytes_processed': 0
        }
        self.audit_log = []

    def _setup_logger(self) -> logging.Logger:
        """Configure enterprise logging."""
        logger = logging.getLogger('EnterpriseHashService')
        logger.setLevel(logging.INFO)

        # Console handler
        console_handler = logging.StreamHandler()
        console_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        console_handler.setFormatter(console_formatter)
        logger.addHandler(console_handler)

        # File handler
        file_handler = logging.FileHandler('hash_service.log')
        file_formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(funcName)s:%(lineno)d - %(message)s'
        )
        file_handler.setFormatter(file_formatter)
        logger.addHandler(file_handler)

        return logger

    def process_request(self, request: HashRequest) -> HashResult:
        """
        Process a hash request with full error handling and auditing.

        Args:
            request: Hash request object

        Returns:
            Hash result object
        """
        start_time = datetime.now()
        result = None

        try:
            self.logger.info(f"Processing request {request.id} with {request.algorithm.value}")
            self.metrics['total_requests'] += 1

            # Validate request
            self._validate_request(request)

            # Compute hash
            hash_value = self._compute_hash(request.data, request.algorithm)

            # Update metrics
            self.metrics['successful_requests'] += 1
            self.metrics['total_bytes_processed'] += len(request.data)

            # Create result
            execution_time = (datetime.now() - start_time).total_seconds() * 1000
            result = HashResult(
                request_id=request.id,
                success=True,
                hash_value=hash_value,
                algorithm=request.algorithm,
                error=None,
                execution_time_ms=execution_time,
                timestamp=datetime.now()
            )

            self.logger.info(f"Request {request.id} completed successfully")

        except Exception as e:
            self.metrics['failed_requests'] += 1
            error_msg = str(e)
            self.logger.error(f"Request {request.id} failed: {error_msg}")
            self.logger.debug(traceback.format_exc())

            execution_time = (datetime.now() - start_time).total_seconds() * 1000
            result = HashResult(
                request_id=request.id,
                success=False,
                hash_value=None,
                algorithm=request.algorithm,
                error=error_msg,
                execution_time_ms=execution_time,
                timestamp=datetime.now()
            )

        finally:
            # Audit logging
            self._audit_log(request, result)

        return result

    def _validate_request(self, request: HashRequest):
        """Validate hash request."""
        if not request.id:
            raise ValueError("Request ID is required")

        if not request.data:
            raise ValueError("Data is required")

        if len(request.data) > self.config.get('max_data_size', 104857600):  # 100MB default
            raise ValueError("Data exceeds maximum size limit")

        if request.algorithm not in HashAlgorithm:
            raise ValueError(f"Unsupported algorithm: {request.algorithm}")

    def _compute_hash(self, data: bytes, algorithm: HashAlgorithm) -> str:
        """Compute hash using specified algorithm."""
        hasher = hashlib.new(algorithm.value)
        hasher.update(data)
        return hasher.hexdigest()

    def _audit_log(self, request: HashRequest, result: HashResult):
        """Log audit trail."""
        audit_entry = {
            'request_id': request.id,
            'timestamp': datetime.now().isoformat(),
            'algorithm': request.algorithm.value,
            'data_size': len(request.data),
            'success': result.success,
            'execution_time_ms': result.execution_time_ms,
            'error': result.error,
            'metadata': request.metadata
        }

        self.audit_log.append(audit_entry)

        # Persist to file
        with open('audit_log.jsonl', 'a') as f:
            f.write(json.dumps(audit_entry) + '\n')

    def batch_process(self, requests: List[HashRequest]) -> List[HashResult]:
        """
        Process multiple hash requests.

        Args:
            requests: List of hash requests

        Returns:
            List of hash results
        """
        results = []
        for request in requests:
            result = self.process_request(request)
            results.append(result)
        return results

    def get_metrics(self) -> Dict[str, Any]:
        """Get service metrics."""
        success_rate = (
            self.metrics['successful_requests'] / self.metrics['total_requests']
            if self.metrics['total_requests'] > 0 else 0
        )

        return {
            **self.metrics,
            'success_rate': success_rate,
            'average_bytes': (
                self.metrics['total_bytes_processed'] / self.metrics['successful_requests']
                if self.metrics['successful_requests'] > 0 else 0
            )
        }

    def health_check(self) -> Dict[str, Any]:
        """Service health check."""
        try:
            # Test hash computation
            test_hash = hashlib.sha224(b"health_check").hexdigest()
            healthy = test_hash == "5f3ebf1d214edfd0996c6e29476a76a357c734b5497b523e7cb6e8ef"

            return {
                'healthy': healthy,
                'timestamp': datetime.now().isoformat(),
                'metrics': self.get_metrics()
            }
        except Exception as e:
            return {
                'healthy': False,
                'error': str(e),
                'timestamp': datetime.now().isoformat()
            }

# Usage example
service = EnterpriseHashService(config={'max_data_size': 52428800})  # 50MB limit

# Create request
request = HashRequest(
    id="REQ-001",
    data=b"Enterprise data to hash",
    algorithm=HashAlgorithm.SHA224,
    metadata={'client': 'webapp', 'version': '1.0'},
    timestamp=datetime.now()
)

# Process request (never raises; failures are reported via result.error)
result = service.process_request(request)
print(f"Result: {asdict(result)}")

# Get metrics (request counts, success rate, bytes processed)
metrics = service.get_metrics()
print(f"Metrics: {metrics}")

# Health check (verifies hashing against a pinned known digest)
health = service.health_check()
print(f"Health: {health}")

7. Testing & Validation

Comprehensive Test Suite

Python
import unittest
import hashlib
import tempfile
from pathlib import Path
import time

class SHA224TestSuite(unittest.TestCase):
    """Comprehensive test suite for SHA-224 implementation."""

    def setUp(self):
        """Set up test fixtures."""
        # Known-answer pairs: (message, expected SHA-224 hex digest).
        self.test_vectors = [
            (b"", "d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f"),
            (b"abc", "23097d223405d8228642a477bda255b32aadbce4bda0b3f7e36c9da7"),
            (b"The quick brown fox jumps over the lazy dog",
             "730e109bd7a8a32b1cb9d9a09aa2325d2430587ddbc0c38bad911525"),
        ]

    def test_empty_string(self):
        """Test hash of empty string."""
        self.assertEqual(
            hashlib.sha224(b"").hexdigest(),
            "d14a028c2a3a2bc9476102bb288234c415a2b01f828ea62ac5b3e42f",
        )

    def test_known_vectors(self):
        """Test against known test vectors."""
        for message, digest in self.test_vectors:
            with self.subTest(input=message):
                self.assertEqual(hashlib.sha224(message).hexdigest(), digest)

    def test_incremental_update(self):
        """Test incremental hash updates."""
        # Feeding the message in pieces must match hashing it whole.
        piecewise = hashlib.sha224()
        for piece in (b"Hello", b" ", b"World"):
            piecewise.update(piece)

        whole = hashlib.sha224()
        whole.update(b"Hello World")

        self.assertEqual(piecewise.hexdigest(), whole.hexdigest())

    def test_file_hashing(self):
        """Test file hashing."""
        payload = b"Test file content for SHA-224 hashing"
        with tempfile.NamedTemporaryFile(delete=False) as handle:
            handle.write(payload)
            temp_path = Path(handle.name)

        try:
            # Hashing the file's bytes must match hashing the payload directly.
            digest = hashlib.sha224()
            with temp_path.open('rb') as handle:
                digest.update(handle.read())
            self.assertEqual(
                digest.hexdigest(),
                hashlib.sha224(payload).hexdigest(),
            )
        finally:
            temp_path.unlink()

    def test_large_data(self):
        """Test hashing large data."""
        payload = b"x" * (10 * 1024 * 1024)  # 10 MB

        started = time.time()
        digest = hashlib.sha224(payload).hexdigest()
        elapsed = time.time() - started

        self.assertIsNotNone(digest)
        self.assertEqual(len(digest), 56)  # SHA-224 produces 56 hex chars
        self.assertLess(elapsed, 5.0)  # Should complete within 5 seconds

    def test_unicode_handling(self):
        """Test Unicode string handling."""
        samples = ["Hello World", "Héllo Wörld", "你好世界", "🚀 Emoji test"]

        for text in samples:
            with self.subTest(text=text):
                # Should work with UTF-8 encoding
                digest = hashlib.sha224(text.encode('utf-8')).hexdigest()
                self.assertEqual(len(digest), 56)

    def test_consistency(self):
        """Test hash consistency."""
        payload = b"Consistency test data"

        # Hashing the same bytes repeatedly must always yield one digest.
        digests = {hashlib.sha224(payload).hexdigest() for _ in range(100)}
        self.assertEqual(len(digests), 1)

    def test_error_handling(self):
        """Test error handling."""
        # hashlib rejects str input; callers must encode to bytes first.
        with self.assertRaises(TypeError):
            hashlib.sha224("string without encoding")  # Should fail

    def test_performance(self):
        """Benchmark performance."""
        timings = {}
        for size in (1024, 10240, 102400, 1048576):  # 1KB, 10KB, 100KB, 1MB
            payload = b"x" * size
            started = time.time()

            for _ in range(100):
                hashlib.sha224(payload).hexdigest()

            timings[size] = time.time() - started

        # Verify reasonable performance
        for size, elapsed in timings.items():
            throughput = (size * 100) / elapsed / 1048576  # MB/s
            self.assertGreater(throughput, 10)  # At least 10 MB/s

# Run tests
if __name__ == '__main__':
    unittest.main()

Python SHA-224 Best Practices

✅ DO:

- Use the standard-library hashlib.sha224 rather than a hand-rolled implementation.
- Encode text explicitly (e.g. UTF-8) before hashing — hashlib accepts only bytes.
- Stream large files in fixed-size chunks with incremental update() calls.
- Validate your implementation against published SHA-224 test vectors.

❌ DON'T:

- Don't pass a str directly to hashlib.sha224 — it raises TypeError.
- Don't read an entire large file into memory just to hash it.
- Don't use a plain SHA-224 digest for password storage; use a dedicated password-hashing function instead.

Performance Tips:

- Reuse one hasher with incremental update() calls instead of concatenating buffers.
- Keep data as bytes inside hot loops; encode once at the I/O boundary.

Additional Resources

Python hashlib Docs

Official Python documentation for the hashlib module

View Docs

PyCryptodome

Self-contained Python package of low-level cryptographic primitives

Learn More

Python Cryptography

Modern cryptographic library for Python

Explore