Ready-to-use recipes for every SHA-224 use case
While SHA-224 alone shouldn't be used for passwords, this recipe shows how to properly implement password hashing with salt and key stretching for legacy systems that require SHA-224.
import hashlib
import secrets
import hmac
from typing import Tuple
class SHA224PasswordManager:
    """Salted, iterated SHA-224 password hashing for legacy systems.

    Every password gets its own random salt and is run through
    ``iterations`` rounds of SHA-224. The stretching scheme is specific to
    this class (it is not PBKDF2), so hashes can only be verified by an
    implementation that reproduces ``_stretch_key`` exactly, using the same
    iteration count.
    """

    def __init__(self, iterations: int = 100000, salt_length: int = 32):
        """Store the work factor and salt size.

        Args:
            iterations: Number of stretching rounds applied after the
                initial salted hash.
            salt_length: Number of random salt bytes generated per password.
        """
        self.iterations = iterations
        self.salt_length = salt_length

    def hash_password(self, password: str) -> Tuple[str, str]:
        """Hash ``password`` with a fresh random salt.

        Returns:
            ``(salt, hashed_password)``, both as lowercase hex strings.
        """
        salt = secrets.token_bytes(self.salt_length)
        hashed = self._stretch_key(password.encode(), salt)
        return salt.hex(), hashed.hex()

    def verify_password(self, password: str, salt_hex: str, hash_hex: str) -> bool:
        """Check ``password`` against a stored salt and hash.

        Args:
            password: Candidate password.
            salt_hex: Stored salt as a hex string.
            hash_hex: Stored hash as a hex string.

        Returns:
            True when the recomputed hash equals the stored one.

        Raises:
            ValueError: If ``salt_hex`` or ``hash_hex`` is not valid hex.
        """
        salt = bytes.fromhex(salt_hex)
        expected_hash = bytes.fromhex(hash_hex)
        computed_hash = self._stretch_key(password.encode(), salt)
        # compare_digest avoids leaking, through timing, how many leading
        # bytes of the two digests agree.
        return hmac.compare_digest(computed_hash, expected_hash)

    def _stretch_key(self, password: bytes, salt: bytes) -> bytes:
        """Return the iterated SHA-224 digest of ``salt + password``.

        Each round hashes the previous digest together with the salt and the
        round index encoded as a 4-byte big-endian integer.
        """
        current = hashlib.sha224(salt + password).digest()
        for i in range(self.iterations):
            current = hashlib.sha224(
                current + salt + i.to_bytes(4, 'big')
            ).digest()
        return current

    def migrate_from_plain_sha224(self, password: str, old_hash: str) -> Tuple[str, str]:
        """Upgrade an unsalted SHA-224 hash to the salted, stretched form.

        Args:
            password: Plaintext password supplied by the user.
            old_hash: Legacy hex digest of ``sha224(password)``; upper- and
                lowercase hex are both accepted.

        Returns:
            ``(salt, hashed_password)`` as produced by ``hash_password``.

        Raises:
            ValueError: If ``password`` does not match ``old_hash``.
        """
        computed = hashlib.sha224(password.encode()).hexdigest()
        # Hex is case-insensitive, so normalise the stored value, and compare
        # in constant time like verify_password does. Encoding to bytes keeps
        # compare_digest from raising on non-ASCII input.
        if not hmac.compare_digest(computed.encode(), old_hash.lower().encode()):
            raise ValueError("Password does not match existing hash")
        return self.hash_password(password)
# Usage Example
def main():
    """Walk through hashing, verification and legacy-hash migration."""
    pm = SHA224PasswordManager(iterations=100000)

    # Hash a new password and show what would be stored.
    password = "SecurePassword123!"
    stored = pm.hash_password(password)
    salt, hash_value = stored
    print(f"Salt: {salt}")
    print(f"Hash: {hash_value}")

    # The correct password verifies...
    is_valid = pm.verify_password("SecurePassword123!", salt, hash_value)
    print(f"Password valid: {is_valid}")

    # ...and a wrong one does not.
    is_valid = pm.verify_password("WrongPassword", salt, hash_value)
    print(f"Wrong password valid: {is_valid}")

    # Upgrade an unsalted legacy hash once the user has supplied the password.
    legacy_hash = hashlib.sha224(b"LegacyPassword").hexdigest()
    try:
        new_salt, new_hash = pm.migrate_from_plain_sha224(
            "LegacyPassword", legacy_hash
        )
    except ValueError as e:
        print(f"Migration failed: {e}")
    else:
        print(f"Migration successful - New salt: {new_salt[:16]}...")


if __name__ == "__main__":
    main()
const crypto = require('crypto');
/**
 * Salted, iterated SHA-224 password hashing for legacy systems.
 *
 * The stretching scheme is specific to this class (it is not PBKDF2): the
 * salted password is hashed once, then re-hashed `iterations` times together
 * with the salt and a 4-byte big-endian round counter.
 */
class SHA224PasswordManager {
  /**
   * @param {number} iterations Number of stretching rounds.
   * @param {number} saltLength Number of random salt bytes per password.
   */
  constructor(iterations = 100000, saltLength = 32) {
    this.iterations = iterations;
    this.saltLength = saltLength;
  }

  /**
   * Hash a password with a fresh random salt.
   *
   * @param {string} password
   * @returns {Promise<{salt: string, hash: string}>} Hex-encoded salt and hash.
   */
  async hashPassword(password) {
    const salt = crypto.randomBytes(this.saltLength);
    const hashed = await this.stretchKey(password, salt);
    return {
      salt: salt.toString('hex'),
      hash: hashed.toString('hex')
    };
  }

  /**
   * Verify a password against a stored salt and hash.
   *
   * @param {string} password Candidate password.
   * @param {string} saltHex Stored salt (hex).
   * @param {string} hashHex Stored hash (hex).
   * @returns {Promise<boolean>} true when the recomputed hash matches.
   */
  async verifyPassword(password, saltHex, hashHex) {
    const salt = Buffer.from(saltHex, 'hex');
    const expectedHash = Buffer.from(hashHex, 'hex');
    const computedHash = await this.stretchKey(password, salt);

    // timingSafeEqual throws when the two inputs differ in length, which
    // happens for a truncated or malformed stored hash. Treat that as a
    // failed verification instead of an exception.
    if (computedHash.length !== expectedHash.length) {
      return false;
    }
    return crypto.timingSafeEqual(computedHash, expectedHash);
  }

  /**
   * Apply key stretching using iterated SHA-224.
   *
   * The loop is synchronous even though the method is async, so it occupies
   * the calling thread for all `iterations` rounds.
   *
   * @param {string|Buffer} password
   * @param {Buffer} salt
   * @returns {Promise<Buffer>} 28-byte digest.
   */
  async stretchKey(password, salt) {
    let current = crypto
      .createHash('sha224')
      .update(salt)
      .update(password)
      .digest();

    // One counter buffer is reused for every round; writeUInt32BE overwrites
    // all four bytes before each use.
    const iter = Buffer.allocUnsafe(4);
    for (let i = 0; i < this.iterations; i++) {
      iter.writeUInt32BE(i, 0);
      current = crypto
        .createHash('sha224')
        .update(current)
        .update(salt)
        .update(iter)
        .digest();
    }
    return current;
  }

  /**
   * Hash a password using the Web Crypto API.
   *
   * NOTE(review): the digest algorithms defined by the Web Crypto
   * specification are SHA-1, SHA-256, SHA-384 and SHA-512. 'SHA-224' is not
   * among them, so `crypto.subtle.digest('SHA-224', ...)` is expected to
   * reject with NotSupportedError. Confirm against the target runtimes, or
   * supply a SHA-224 implementation, before relying on this method.
   *
   * @param {string} password
   * @returns {Promise<{salt: string, hash: string}>} Hex-encoded salt and hash.
   */
  async hashPasswordBrowser(password) {
    const encoder = new TextEncoder();
    const salt = crypto.getRandomValues(new Uint8Array(this.saltLength));
    let current = await crypto.subtle.digest(
      'SHA-224',
      new Uint8Array([...salt, ...encoder.encode(password)])
    );

    for (let i = 0; i < this.iterations; i++) {
      const iter = new ArrayBuffer(4);
      // false selects big-endian, matching stretchKey's writeUInt32BE.
      new DataView(iter).setUint32(0, i, false);
      const combined = new Uint8Array([
        ...new Uint8Array(current),
        ...salt,
        ...new Uint8Array(iter)
      ]);
      current = await crypto.subtle.digest('SHA-224', combined);
    }

    return {
      salt: Array.from(salt).map(b => b.toString(16).padStart(2, '0')).join(''),
      hash: Array.from(new Uint8Array(current))
        .map(b => b.toString(16).padStart(2, '0'))
        .join('')
    };
  }
}
// Express.js Integration Example
const express = require('express');
const app = express();
// The routes below read JSON fields from req.body, which Express only
// populates once a body parser is installed.
app.use(express.json());
const passwordManager = new SHA224PasswordManager();
// POST /register: hash the submitted password and store the new user.
app.post('/register', async (req, res) => {
  // req.body is undefined when no body parser ran or no body was sent.
  const { username, password } = req.body || {};

  // Reject missing or non-string fields as a client error. Without this a
  // malformed request fails inside the hash call and is reported as a 500.
  if (typeof username !== 'string' || username === '' ||
      typeof password !== 'string' || password === '') {
    return res.status(400).json({ error: 'Username and password are required' });
  }

  try {
    // Hash password
    const { salt, hash } = await passwordManager.hashPassword(password);

    // Store in database
    await db.users.create({
      username,
      password_salt: salt,
      password_hash: hash
    });

    res.json({ success: true });
  } catch (error) {
    res.status(500).json({ error: 'Registration failed' });
  }
});
// POST /login: verify the submitted credentials and issue a random token.
app.post('/login', async (req, res) => {
  // req.body is undefined when no body parser ran or no body was sent.
  const { username, password } = req.body || {};

  // Only plain strings are accepted. An object-valued username would
  // otherwise be passed straight into the database query below.
  if (typeof username !== 'string' || username === '' ||
      typeof password !== 'string' || password === '') {
    return res.status(400).json({ error: 'Username and password are required' });
  }

  try {
    // Retrieve user from database
    const user = await db.users.findOne({ username });
    if (!user) {
      return res.status(401).json({ error: 'Invalid credentials' });
    }

    // Verify password
    const isValid = await passwordManager.verifyPassword(
      password,
      user.password_salt,
      user.password_hash
    );

    if (isValid) {
      // Generate session token
      const token = crypto.randomBytes(32).toString('hex');
      res.json({ success: true, token });
    } else {
      // Same message as the unknown-user case, so the response body does
      // not reveal which part of the credentials was wrong.
      res.status(401).json({ error: 'Invalid credentials' });
    }
  } catch (error) {
    res.status(500).json({ error: 'Login failed' });
  }
});
import java.security.MessageDigest;
import java.security.SecureRandom;
import java.security.NoSuchAlgorithmException;
import java.util.Arrays;
import javax.xml.bind.DatatypeConverter;
public class SHA224PasswordManager {
private final int iterations;
private final int saltLength;
private final SecureRandom random;
public SHA224PasswordManager(int iterations, int saltLength) {
this.iterations = iterations;
this.saltLength = saltLength;
this.random = new SecureRandom();
}
public static class PasswordHash {
public final String salt;
public final String hash;
public PasswordHash(String salt, String hash) {
this.salt = salt;
this.hash = hash;
}
}
/**
* Hash a password with salt and key stretching
*/
public PasswordHash hashPassword(String password)
throws NoSuchAlgorithmException {
// Generate salt
byte[] salt = new byte[saltLength];
random.nextBytes(salt);
// Apply key stretching
byte[] hash = stretchKey(password.getBytes(), salt);
return new PasswordHash(
DatatypeConverter.printHexBinary(salt).toLowerCase(),
DatatypeConverter.printHexBinary(hash).toLowerCase()
);
}
/**
* Verify password against stored hash
*/
public boolean verifyPassword(String password, String saltHex, String hashHex)
throws NoSuchAlgorithmException {
byte[] salt = DatatypeConverter.parseHexBinary(saltHex.toUpperCase());
byte[] expectedHash = DatatypeConverter.parseHexBinary(hashHex.toUpperCase());
byte[] computedHash = stretchKey(password.getBytes(), salt);
// Constant-time comparison
return MessageDigest.isEqual(computedHash, expectedHash);
}
/**
* Apply key stretching using iterated SHA-224
*/
private byte[] stretchKey(byte[] password, byte[] salt)
throws NoSuchAlgorithmException {
MessageDigest md = MessageDigest.getInstance("SHA-224");
// Initial hash
md.update(salt);
md.update(password);
byte[] current = md.digest();
// Iterate
for (int i = 0; i < iterations; i++) {
md.reset();
md.update(current);
md.update(salt);
md.update(intToBytes(i));
current = md.digest();
}
return current;
}
private byte[] intToBytes(int value) {
return new byte[] {
(byte)(value >>> 24),
(byte)(value >>> 16),
(byte)(value >>> 8),
(byte)value
};
}
// Spring Boot Integration Example
@RestController
@RequestMapping("/api/auth")
public class AuthController {
private final SHA224PasswordManager passwordManager =
new SHA224PasswordManager(100000, 32);
@Autowired
private UserRepository userRepository;
@PostMapping("/register")
public ResponseEntity> register(@RequestBody RegisterRequest request) {
try {
// Check if user exists
if (userRepository.existsByUsername(request.getUsername())) {
return ResponseEntity.badRequest()
.body(new ErrorResponse("Username already exists"));
}
// Hash password
PasswordHash ph = passwordManager.hashPassword(request.getPassword());
// Create user
User user = new User();
user.setUsername(request.getUsername());
user.setPasswordSalt(ph.salt);
user.setPasswordHash(ph.hash);
userRepository.save(user);
return ResponseEntity.ok(new SuccessResponse("User registered"));
} catch (Exception e) {
return ResponseEntity.status(500)
.body(new ErrorResponse("Registration failed"));
}
}
@PostMapping("/login")
public ResponseEntity> login(@RequestBody LoginRequest request) {
try {
// Find user
User user = userRepository.findByUsername(request.getUsername())
.orElse(null);
if (user == null) {
return ResponseEntity.status(401)
.body(new ErrorResponse("Invalid credentials"));
}
// Verify password
boolean isValid = passwordManager.verifyPassword(
request.getPassword(),
user.getPasswordSalt(),
user.getPasswordHash()
);
if (isValid) {
// Generate JWT token
String token = jwtUtil.generateToken(user.getUsername());
return ResponseEntity.ok(new LoginResponse(token));
} else {
return ResponseEntity.status(401)
.body(new ErrorResponse("Invalid credentials"));
}
} catch (Exception e) {
return ResponseEntity.status(500)
.body(new ErrorResponse("Login failed"));
}
}
}
}
package main
import (
"crypto/rand"
"crypto/sha256"
"crypto/subtle"
"encoding/hex"
"encoding/binary"
"fmt"
)
type (
	// SHA224PasswordManager holds the parameters used to hash and verify
	// passwords with salted, iterated SHA-224.
	SHA224PasswordManager struct {
		iterations int // number of stretching rounds
		saltLength int // number of random salt bytes per password
	}

	// PasswordHash is a hex-encoded salt and hash pair.
	PasswordHash struct {
		Salt string `json:"salt"`
		Hash string `json:"hash"`
	}
)
// NewPasswordManager returns a manager that applies the given number of
// stretching rounds and generates salts of saltLength bytes.
func NewPasswordManager(iterations, saltLength int) *SHA224PasswordManager {
	pm := new(SHA224PasswordManager)
	pm.iterations = iterations
	pm.saltLength = saltLength
	return pm
}
// HashPassword generates a random salt, stretches the password with it and
// returns both hex-encoded. An error is returned only when reading random
// bytes for the salt fails.
func (pm *SHA224PasswordManager) HashPassword(password string) (*PasswordHash, error) {
	saltBytes := make([]byte, pm.saltLength)
	_, err := rand.Read(saltBytes)
	if err != nil {
		return nil, err
	}

	digest := pm.stretchKey([]byte(password), saltBytes)

	result := new(PasswordHash)
	result.Salt = hex.EncodeToString(saltBytes)
	result.Hash = hex.EncodeToString(digest)
	return result, nil
}
// VerifyPassword reports whether password matches the stored hex-encoded
// salt and hash. A non-nil error means saltHex or hashHex was not valid hex.
func (pm *SHA224PasswordManager) VerifyPassword(password, saltHex, hashHex string) (bool, error) {
	saltBytes, saltErr := hex.DecodeString(saltHex)
	if saltErr != nil {
		return false, saltErr
	}

	want, hashErr := hex.DecodeString(hashHex)
	if hashErr != nil {
		return false, hashErr
	}

	got := pm.stretchKey([]byte(password), saltBytes)

	// ConstantTimeCompare returns 1 only when both slices have the same
	// length and contents.
	matched := subtle.ConstantTimeCompare(got, want) == 1
	return matched, nil
}
// stretchKey returns the iterated SHA-224 digest of salt||password.
//
// After the initial hash, each of pm.iterations rounds hashes the previous
// digest, the salt and the round index as a 4-byte big-endian integer.
func (pm *SHA224PasswordManager) stretchKey(password, salt []byte) []byte {
	// sha256.New224 is the standard library's SHA-224 constructor.
	h := sha256.New224()
	h.Write(salt)
	h.Write(password)
	current := h.Sum(nil)

	// The counter buffer is allocated once; PutUint32 overwrites all four
	// bytes on every round.
	var counter [4]byte
	for i := 0; i < pm.iterations; i++ {
		h.Reset()
		h.Write(current)
		h.Write(salt)
		binary.BigEndian.PutUint32(counter[:], uint32(i))
		h.Write(counter[:])
		current = h.Sum(nil)
	}
	return current
}
// Gin Framework Integration Example
package main
import (
"github.com/gin-gonic/gin"
"net/http"
)
// AuthHandler groups the dependencies used by the Register and Login
// HTTP handlers.
type AuthHandler struct {
	passwordManager *SHA224PasswordManager // hashes and verifies passwords
	userService *UserService // user lookup and creation
}
// NewAuthHandler builds an AuthHandler with a password manager configured
// for 100000 iterations and 32-byte salts, and a new user service.
func NewAuthHandler() *AuthHandler {
	handler := new(AuthHandler)
	handler.passwordManager = NewPasswordManager(100000, 32)
	handler.userService = NewUserService()
	return handler
}
// Register handles a sign-up request: it binds the JSON body, rejects
// usernames that already exist, hashes the password and stores the user.
func (h *AuthHandler) Register(c *gin.Context) {
	var payload struct {
		Username string `json:"username" binding:"required"`
		Password string `json:"password" binding:"required,min=8"`
	}

	// Every failure path answers with the same {"error": ...} shape.
	respondError := func(status int, message string) {
		c.JSON(status, gin.H{"error": message})
	}

	if bindErr := c.ShouldBindJSON(&payload); bindErr != nil {
		respondError(http.StatusBadRequest, bindErr.Error())
		return
	}

	if h.userService.UserExists(payload.Username) {
		respondError(http.StatusConflict, "Username already exists")
		return
	}

	hashed, hashErr := h.passwordManager.HashPassword(payload.Password)
	if hashErr != nil {
		respondError(http.StatusInternalServerError, "Failed to hash password")
		return
	}

	newUser := &User{
		Username:     payload.Username,
		PasswordSalt: hashed.Salt,
		PasswordHash: hashed.Hash,
	}
	if createErr := h.userService.CreateUser(newUser); createErr != nil {
		respondError(http.StatusInternalServerError, "Failed to create user")
		return
	}

	c.JSON(http.StatusCreated, gin.H{"message": "User created successfully"})
}
// Login handles a sign-in request: it binds the JSON body, looks the user
// up, verifies the password and responds with a token on success.
func (h *AuthHandler) Login(c *gin.Context) {
	var payload struct {
		Username string `json:"username" binding:"required"`
		Password string `json:"password" binding:"required"`
	}

	// Every failure path answers with the same {"error": ...} shape.
	respondError := func(status int, message string) {
		c.JSON(status, gin.H{"error": message})
	}

	if bindErr := c.ShouldBindJSON(&payload); bindErr != nil {
		respondError(http.StatusBadRequest, bindErr.Error())
		return
	}

	account, lookupErr := h.userService.GetUserByUsername(payload.Username)
	if lookupErr != nil {
		respondError(http.StatusUnauthorized, "Invalid credentials")
		return
	}

	ok, verifyErr := h.passwordManager.VerifyPassword(
		payload.Password,
		account.PasswordSalt,
		account.PasswordHash,
	)
	// A verification error and a wrong password produce the same response
	// as an unknown user.
	if !(verifyErr == nil && ok) {
		respondError(http.StatusUnauthorized, "Invalid credentials")
		return
	}

	jwt, tokenErr := generateJWT(account.Username)
	if tokenErr != nil {
		respondError(http.StatusInternalServerError, "Failed to generate token")
		return
	}

	c.JSON(http.StatusOK, gin.H{"token": jwt})
}
Build a robust file integrity monitoring system using SHA-224 to detect unauthorized changes, corruption, or tampering in critical files and documents.
import hashlib
import json
import os
from pathlib import Path
from datetime import datetime
from typing import Dict, List, Optional
import sqlite3
class FileIntegrityMonitor:
    """Track SHA-224 checksums of files in SQLite and report changes.

    Two tables are kept in the database at ``db_path``: ``file_checksums``
    holds the latest known state per path, and ``integrity_logs`` records an
    "added" or "modified" event each time a scan stores a new hash.
    """

    def __init__(self, db_path: str = "integrity.db"):
        """Remember the database location and create the tables if needed."""
        self.db_path = db_path
        self._init_database()

    @staticmethod
    def _timestamp() -> str:
        """Return the current local time as ``YYYY-MM-DD HH:MM:SS.ffffff``.

        This is the text the sqlite3 default datetime adapter produced; that
        adapter is deprecated since Python 3.12, so the value is formatted
        explicitly to keep stored timestamps in the same format.
        """
        return datetime.now().isoformat(" ")

    def _init_database(self):
        """Create the checksum and log tables when they do not exist yet."""
        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS file_checksums (
                    file_path TEXT PRIMARY KEY,
                    sha224_hash TEXT NOT NULL,
                    file_size INTEGER,
                    last_modified REAL,
                    last_checked TIMESTAMP,
                    metadata TEXT
                )
            ''')
            cursor.execute('''
                CREATE TABLE IF NOT EXISTS integrity_logs (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    file_path TEXT,
                    event_type TEXT,
                    old_hash TEXT,
                    new_hash TEXT,
                    timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    details TEXT
                )
            ''')
            conn.commit()
        finally:
            # Close the connection even when a statement fails.
            conn.close()

    def calculate_file_hash(self, file_path: str, chunk_size: int = 8192) -> str:
        """Return the SHA-224 hex digest of a file, read in chunks.

        Args:
            file_path: Path of the file to hash.
            chunk_size: Number of bytes read per iteration.
        """
        sha224 = hashlib.sha224()
        with open(file_path, 'rb') as f:
            while chunk := f.read(chunk_size):
                sha224.update(chunk)
        return sha224.hexdigest()

    def scan_directory(self, directory: str, patterns: Optional[List[str]] = None) -> Dict:
        """Hash files under ``directory`` and record new or changed ones.

        Args:
            directory: Root directory, searched recursively.
            patterns: Optional glob patterns (for example ``["*.conf"]``).
                When omitted, every regular file is scanned.

        Returns:
            A dict with the counters ``scanned``, ``new``, ``changed``,
            ``unchanged`` and ``errors``, plus a ``files`` list describing
            each new, changed or failed file.
        """
        results = {
            'scanned': 0,
            'new': 0,
            'changed': 0,
            'unchanged': 0,
            'errors': 0,
            'files': []
        }
        path = Path(directory)

        if patterns:
            # A file can match several patterns and a glob can also match a
            # directory; dict.fromkeys removes duplicates while keeping the
            # discovery order, and is_file() drops non-files.
            matches = dict.fromkeys(
                match for pattern in patterns for match in path.rglob(pattern)
            )
            files = [f for f in matches if f.is_file()]
        else:
            files = [f for f in path.rglob('*') if f.is_file()]

        conn = sqlite3.connect(self.db_path)
        try:
            cursor = conn.cursor()
            for file_path in files:
                results['scanned'] += 1
                try:
                    current_hash = self.calculate_file_hash(str(file_path))
                    file_stat = file_path.stat()

                    cursor.execute(
                        'SELECT sha224_hash FROM file_checksums WHERE file_path = ?',
                        (str(file_path),)
                    )
                    row = cursor.fetchone()

                    if row is None:
                        # New file
                        results['new'] += 1
                        self._add_file(cursor, file_path, current_hash, file_stat)
                        results['files'].append({
                            'path': str(file_path),
                            'status': 'new',
                            'hash': current_hash
                        })
                    elif row[0] != current_hash:
                        # File changed
                        results['changed'] += 1
                        self._update_file(cursor, file_path, current_hash, file_stat, row[0])
                        results['files'].append({
                            'path': str(file_path),
                            'status': 'changed',
                            'old_hash': row[0],
                            'new_hash': current_hash
                        })
                    else:
                        # File unchanged
                        results['unchanged'] += 1
                        self._touch_file(cursor, file_path)
                except Exception as e:
                    # Best effort: one unreadable file is reported in the
                    # results and does not abort the scan.
                    results['errors'] += 1
                    results['files'].append({
                        'path': str(file_path),
                        'status': 'error',
                        'error': str(e)
                    })
            conn.commit()
        finally:
            conn.close()
        return results

    def verify_file(self, file_path: str) -> Dict:
        """Compare a file's current hash with the stored one.

        Returns:
            A dict whose ``status`` is ``not_monitored``, ``valid``,
            ``modified``, ``missing`` or ``error``, with details for that
            outcome.
        """
        conn = sqlite3.connect(self.db_path)
        try:
            row = conn.execute(
                'SELECT sha224_hash, file_size, last_modified FROM file_checksums WHERE file_path = ?',
                (file_path,)
            ).fetchone()
        finally:
            # The connection is only needed for the lookup; closing it here
            # covers every return path below, including not_monitored.
            conn.close()

        if row is None:
            return {'status': 'not_monitored', 'file': file_path}

        try:
            current_hash = self.calculate_file_hash(file_path)
            file_stat = os.stat(file_path)
        except FileNotFoundError:
            return {
                'status': 'missing',
                'file': file_path,
                'last_hash': row[0]
            }
        except Exception as e:
            return {
                'status': 'error',
                'file': file_path,
                'error': str(e)
            }

        if current_hash == row[0]:
            return {
                'status': 'valid',
                'file': file_path,
                'hash': current_hash,
                'size': file_stat.st_size
            }
        return {
            'status': 'modified',
            'file': file_path,
            'expected_hash': row[0],
            'actual_hash': current_hash,
            'old_size': row[1],
            'new_size': file_stat.st_size
        }

    def _add_file(self, cursor, file_path, hash_value, file_stat):
        """Insert a newly seen file and log an 'added' event."""
        cursor.execute('''
            INSERT INTO file_checksums (file_path, sha224_hash, file_size, last_modified, last_checked)
            VALUES (?, ?, ?, ?, ?)
        ''', (str(file_path), hash_value, file_stat.st_size, file_stat.st_mtime, self._timestamp()))
        cursor.execute('''
            INSERT INTO integrity_logs (file_path, event_type, new_hash)
            VALUES (?, ?, ?)
        ''', (str(file_path), 'added', hash_value))

    def _update_file(self, cursor, file_path, new_hash, file_stat, old_hash):
        """Store the new state of a changed file and log a 'modified' event."""
        cursor.execute('''
            UPDATE file_checksums
            SET sha224_hash = ?, file_size = ?, last_modified = ?, last_checked = ?
            WHERE file_path = ?
        ''', (new_hash, file_stat.st_size, file_stat.st_mtime, self._timestamp(), str(file_path)))
        cursor.execute('''
            INSERT INTO integrity_logs (file_path, event_type, old_hash, new_hash)
            VALUES (?, ?, ?, ?)
        ''', (str(file_path), 'modified', old_hash, new_hash))

    def _touch_file(self, cursor, file_path):
        """Update the last-checked time of an unchanged file."""
        cursor.execute('''
            UPDATE file_checksums SET last_checked = ? WHERE file_path = ?
        ''', (self._timestamp(), str(file_path)))

    def generate_manifest(self, directory: str, output_file: str):
        """Write a JSON manifest of every file's SHA-224 hash and size.

        Args:
            directory: Root directory, searched recursively.
            output_file: Path of the JSON file to write.

        Returns:
            The manifest dict that was written. Files that cannot be
            processed are reported on stdout and left out.
        """
        manifest = {
            'version': '1.0',
            'created': datetime.now().isoformat(),
            'algorithm': 'SHA-224',
            'files': {}
        }
        path = Path(directory)
        for file_path in path.rglob('*'):
            if file_path.is_file():
                try:
                    rel_path = file_path.relative_to(path)
                    manifest['files'][str(rel_path)] = {
                        'sha224': self.calculate_file_hash(str(file_path)),
                        'size': file_path.stat().st_size
                    }
                except Exception as e:
                    print(f"Error processing {file_path}: {e}")

        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(manifest, f, indent=2)
        return manifest
# Usage Example
if __name__ == "__main__":
    # Demonstration run against placeholder paths; uses the default database.
    fim = FileIntegrityMonitor()

    # Scan a directory, restricted to configuration and key files.
    scan_report = fim.scan_directory(
        "/path/to/important/files",
        patterns=["*.conf", "*.key"],
    )
    print(f"Scan Results: {json.dumps(scan_report, indent=2)}")

    # Re-check a single file against its stored hash.
    file_report = fim.verify_file("/path/to/critical/file.conf")
    print(f"Verification: {json.dumps(file_report, indent=2)}")

    # Write a standalone JSON manifest for a directory tree.
    written = fim.generate_manifest("/path/to/directory", "manifest.json")
    print(f"Manifest generated with {len(written['files'])} files")
Implement secure API request signing using HMAC-SHA224 to ensure authenticity and prevent tampering in REST APIs and webhook implementations.
const crypto = require('crypto');
const express = require('express');
const axios = require('axios');
/**
 * Signs and verifies API requests with HMAC-SHA224.
 *
 * The signed string is "METHOD\npath\ntimestamp\nbody", where body is the
 * JSON serialisation of the request body, or the empty string when there
 * is no body.
 */
class HMACSHA224Signer {
  /**
   * @param {string|Buffer} secretKey Shared HMAC key.
   */
  constructor(secretKey) {
    this.secretKey = secretKey;
  }

  /**
   * Serialise a request body for signing.
   *
   * A missing body and an empty object both map to ''. A JSON body parser
   * can present a body-less request as {}, while a client signs the same
   * request with no body at all; both sides must sign the same text.
   */
  serializeBody(body) {
    if (!body) {
      return '';
    }
    const isEmptyObject =
      typeof body === 'object' &&
      !Array.isArray(body) &&
      Object.keys(body).length === 0;
    return isEmptyObject ? '' : JSON.stringify(body);
  }

  /**
   * Generate the hex HMAC-SHA224 signature for a request.
   *
   * @param {string} method HTTP method (upper-cased before signing).
   * @param {string} path Request path.
   * @param {number|string} timestamp Value sent in X-Timestamp.
   * @param {*} body Request body, if any.
   * @returns {string} 56-character hex signature.
   */
  generateSignature(method, path, timestamp, body = '') {
    const canonicalRequest = [
      method.toUpperCase(),
      path,
      timestamp,
      this.serializeBody(body)
    ].join('\n');

    const hmac = crypto.createHmac('sha224', this.secretKey);
    hmac.update(canonicalRequest);
    return hmac.digest('hex');
  }

  /**
   * Verify an incoming request signature.
   *
   * @returns {boolean} true only when `signature` equals the expected value.
   */
  verifySignature(signature, method, path, timestamp, body = '') {
    if (typeof signature !== 'string') {
      return false;
    }
    const expectedSignature = this.generateSignature(method, path, timestamp, body);
    const provided = Buffer.from(signature);
    const expected = Buffer.from(expectedSignature);

    // timingSafeEqual throws when the inputs differ in length; a signature
    // of the wrong length is simply invalid.
    if (provided.length !== expected.length) {
      return false;
    }
    return crypto.timingSafeEqual(provided, expected);
  }

  /**
   * Add X-Signature and X-Timestamp headers to an outgoing request config.
   *
   * The signature covers `config.url` exactly as given.
   *
   * @param {object} config Request config with method, url, data, headers.
   * @returns {object} The same config object, mutated.
   */
  signRequest(config) {
    const timestamp = Date.now();
    const signature = this.generateSignature(
      config.method || 'GET',
      config.url,
      timestamp,
      config.data
    );

    config.headers = config.headers || {};
    config.headers['X-Signature'] = signature;
    config.headers['X-Timestamp'] = timestamp;
    return config;
  }
}
// Express Middleware for Signature Verification
/**
 * Build Express middleware that rejects requests lacking a valid
 * HMAC-SHA224 signature or carrying a timestamp outside a 5 minute window.
 *
 * @param {string|Buffer} secretKey Shared HMAC key.
 * @returns {Function} Express middleware (req, res, next).
 */
function createSignatureMiddleware(secretKey) {
  const signer = new HMACSHA224Signer(secretKey);
  const MAX_SKEW_MS = 5 * 60 * 1000;

  return (req, res, next) => {
    const signature = req.headers['x-signature'];
    const timestamp = req.headers['x-timestamp'];

    if (!signature || !timestamp) {
      return res.status(401).json({ error: 'Missing signature headers' });
    }

    // A non-numeric timestamp parses to NaN, and every comparison with NaN
    // is false, so it would slip past the age check below. Require digits.
    if (!/^\d+$/.test(timestamp)) {
      return res.status(401).json({ error: 'Invalid timestamp' });
    }

    // Check timestamp to prevent replay attacks (5 minute window)
    const requestTime = parseInt(timestamp, 10);
    const now = Date.now();
    if (Math.abs(now - requestTime) > MAX_SKEW_MS) {
      return res.status(401).json({ error: 'Request expired' });
    }

    // When the middleware is mounted under a prefix, req.path has that
    // prefix stripped, but the client signs the full path it requested.
    // req.baseUrl restores the prefix and is '' at the root.
    const signedPath = req.baseUrl + req.path;

    try {
      const isValid = signer.verifySignature(
        signature,
        req.method,
        signedPath,
        timestamp,
        req.body
      );

      if (!isValid) {
        return res.status(401).json({ error: 'Invalid signature' });
      }
      next();
    } catch (error) {
      return res.status(500).json({ error: 'Signature verification failed' });
    }
  };
}
// API Client with Automatic Signing
/**
 * HTTP client that signs every request with HMAC-SHA224 before sending it
 * through axios.
 */
class SignedAPIClient {
  /**
   * @param {string} baseURL Prefix prepended to every request path.
   * @param {string|Buffer} secretKey Shared HMAC key.
   */
  constructor(baseURL, secretKey) {
    this.baseURL = baseURL;
    this.signer = new HMACSHA224Signer(secretKey);
  }

  /**
   * Send a signed request and resolve with the response body.
   *
   * The signature covers `path` only, not `baseURL`.
   *
   * @throws {Error} "API Error: <status> - <message>" when the server
   *   answered with an error status; other failures are re-thrown as-is.
   */
  async request(method, path, data = null) {
    const timestamp = Date.now();
    const headers = {
      'X-Signature': this.signer.generateSignature(method, path, timestamp, data),
      'X-Timestamp': timestamp,
      'Content-Type': 'application/json'
    };
    const config = { method, url: `${this.baseURL}${path}`, headers };

    // Only attach a body when there is one.
    if (data) {
      config.data = data;
    }

    try {
      const { data: payload } = await axios(config);
      return payload;
    } catch (error) {
      // No response means the request never completed; pass that through.
      if (!error.response) {
        throw error;
      }
      throw new Error(`API Error: ${error.response.status} - ${error.response.data.error}`);
    }
  }

  /** Signed GET without a body. */
  get(path) {
    return this.request('GET', path);
  }

  /** Signed POST with a JSON body. */
  post(path, data) {
    return this.request('POST', path, data);
  }

  /** Signed PUT with a JSON body. */
  put(path, data) {
    return this.request('PUT', path, data);
  }

  /** Signed DELETE without a body. */
  delete(path) {
    return this.request('DELETE', path);
  }
}
// Webhook Signature Verification
/**
 * Generates and checks "sha224=<hex>" webhook signatures computed with
 * HMAC-SHA224 over the JSON serialisation of the payload.
 */
class WebhookVerifier {
  /**
   * @param {string|Buffer} secret Shared HMAC key.
   */
  constructor(secret) {
    this.secret = secret;
  }

  /**
   * Generate the signature header value for a payload.
   *
   * @param {*} payload Value that is serialised with JSON.stringify.
   * @returns {string} "sha224=" followed by the hex digest.
   */
  generateWebhookSignature(payload) {
    const hmac = crypto.createHmac('sha224', this.secret);
    hmac.update(JSON.stringify(payload));
    return 'sha224=' + hmac.digest('hex');
  }

  /**
   * Verify a webhook signature.
   *
   * The payload is re-serialised here, so verification only succeeds when
   * that serialisation reproduces the text the sender signed.
   *
   * @returns {boolean} true only when `signature` equals the expected value.
   */
  verifyWebhook(signature, payload) {
    if (typeof signature !== 'string') {
      return false;
    }
    const provided = Buffer.from(signature);
    const expected = Buffer.from(this.generateWebhookSignature(payload));

    // timingSafeEqual throws when the inputs differ in length; a signature
    // of the wrong length is simply invalid.
    if (provided.length !== expected.length) {
      return false;
    }
    return crypto.timingSafeEqual(provided, expected);
  }

  /**
   * Express middleware that rejects requests without a valid
   * X-Webhook-Signature header.
   */
  middleware() {
    return (req, res, next) => {
      const signature = req.headers['x-webhook-signature'];

      if (!signature) {
        return res.status(401).json({ error: 'Missing webhook signature' });
      }

      if (!this.verifyWebhook(signature, req.body)) {
        return res.status(401).json({ error: 'Invalid webhook signature' });
      }
      next();
    };
  }
}
// Example Express Server with Signed API
const app = express();
app.use(express.json());

// Initialize with secret key
const SECRET_KEY = process.env.API_SECRET_KEY || 'your-secret-key-here';
if (!process.env.API_SECRET_KEY) {
  // The fallback is a published placeholder, so anyone can forge signatures
  // against it. Make that visible instead of falling back silently.
  console.warn(
    'API_SECRET_KEY is not set; using the insecure placeholder secret. ' +
    'Set API_SECRET_KEY before exposing this server.'
  );
}

// Apply signature verification to protected routes
app.use('/api/protected', createSignatureMiddleware(SECRET_KEY));

// Protected endpoint: echoes the verified request body.
app.post('/api/protected/data', (req, res) => {
  res.json({
    message: 'Authenticated request successful',
    data: req.body
  });
});

// Webhook endpoint: the handler only runs after the signature check passes.
const webhookVerifier = new WebhookVerifier(SECRET_KEY);
app.post('/webhook', webhookVerifier.middleware(), (req, res) => {
  console.log('Webhook received:', req.body);
  res.json({ status: 'processed' });
});
// Client Usage Example
/**
 * Demonstrates a signed API call followed by a signed webhook delivery
 * against the local server. Failures are logged, not thrown.
 */
async function clientExample() {
  const apiClient = new SignedAPIClient('http://localhost:3000', SECRET_KEY);

  try {
    // Make authenticated requests
    const requestBody = {
      action: 'create',
      resource: 'user',
      data: { name: 'John Doe' }
    };
    const result = await apiClient.post('/api/protected/data', requestBody);
    console.log('Response:', result);

    // Send webhook
    const webhookPayload = {
      event: 'user.created',
      timestamp: Date.now(),
      data: { userId: '123' }
    };
    const verifier = new WebhookVerifier(SECRET_KEY);
    const headers = {
      'X-Webhook-Signature': verifier.generateWebhookSignature(webhookPayload)
    };
    await axios.post('http://localhost:3000/webhook', webhookPayload, { headers });
  } catch (error) {
    console.error('Error:', error.message);
  }
}
// Start server
// Listen on the port from the environment, defaulting to 3000.
const PORT = process.env.PORT || 3000;
app.listen(PORT, function onListening() {
  console.log(`Server running on port ${PORT}`);
});
Quick implementation snippets for common SHA-224 use cases