Build robust webhook event processing with idempotency, async patterns, error handling, and production-ready best practices.
Critical: Webhooks may be delivered more than once. Your system must handle duplicate events gracefully.
Use eventId as the unique identifier:
// Redis client used as the shared idempotency store for the snippets below.
const redis = require('redis');
const client = redis.createClient();
// Truthy when this eventId has already been recorded in Redis.
async function isEventProcessed(eventId) {
  const key = `event:${eventId}`;
  return client.exists(key);
}
// Record the event as handled; the key expires after seven days.
async function markEventProcessed(eventId) {
  const SEVEN_DAYS_SECONDS = 7 * 24 * 60 * 60; // 604800
  await client.setex(`event:${eventId}`, SEVEN_DAYS_SECONDS, 'processed');
}
async function processWebhook(payload) {
const { eventId } = payload;
// Check if already processed
if (await isEventProcessed(eventId)) {
console.log(`Duplicate event ${eventId}, skipping`);
return { status: 'duplicate', eventId };
}
// Mark as processing (prevents race conditions)
const wasSet = await client.set(
`event:${eventId}`,
'processing',
'EX', 604800,
'NX' // Only set if not exists
);
if (!wasSet) {
console.log(`Event ${eventId} already being processed`);
return { status: 'duplicate', eventId };
}
try {
// Process the event
await handleEvent(payload);
// Update to processed
await client.set(`event:${eventId}`, 'processed', 'EX', 604800);
return { status: 'processed', eventId };
} catch (error) {
// Remove processing lock on failure
await client.del(`event:${eventId}`);
throw error;
}
}-- Create processed_events table
-- Idempotency ledger: one row per webhook event ever accepted.
CREATE TABLE processed_events (
event_id VARCHAR(255) PRIMARY KEY, -- unique webhook eventId; PK enforces idempotency
event_name VARCHAR(255) NOT NULL, -- e.g. 'workorder.status.published'
work_order_id INTEGER, -- optional link back to the affected work order
processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, -- consumed by the 7-day cleanup job
payload JSONB, -- full webhook body kept for auditing/replay
status VARCHAR(50) DEFAULT 'processed' -- 'processing' | 'processed' | 'failed'
);
-- Index for cleanup
CREATE INDEX idx_processed_at ON processed_events(processed_at);
const { Pool } = require('pg');
// Shared pg connection pool (configuration comes from the PG* env vars).
const pool = new Pool();
// True when a ledger row already exists for this eventId.
async function isEventProcessed(eventId) {
  const { rowCount } = await pool.query(
    'SELECT event_id FROM processed_events WHERE event_id = $1',
    [eventId]
  );
  return rowCount > 0;
}
/**
 * Insert the idempotency row for an event.
 *
 * BUG FIX: `ON CONFLICT (event_id) DO NOTHING` swallows duplicate inserts
 * WITHOUT raising error 23505, so the original function always returned
 * true and callers never detected duplicates. Detect the duplicate via
 * rowCount (0 rows inserted) instead; the 23505 catch is retained only as
 * a defensive fallback for schemas without the ON CONFLICT clause.
 *
 * @returns {Promise<boolean>} true if the row was newly inserted,
 *          false if the event was already recorded
 */
async function markEventProcessed(eventId, eventName, workOrderId, payload) {
  try {
    const result = await pool.query(
      `INSERT INTO processed_events (event_id, event_name, work_order_id, payload, status)
VALUES ($1, $2, $3, $4, 'processed')
ON CONFLICT (event_id) DO NOTHING`,
      [eventId, eventName, workOrderId, JSON.stringify(payload)]
    );
    return result.rowCount === 1;
  } catch (error) {
    // Unique constraint violation also means "already processed".
    if (error.code === '23505') {
      return false;
    }
    throw error;
  }
}
/**
 * Idempotent webhook entry point (PostgreSQL-backed).
 *
 * BUG FIX: on failure the original UPDATEd the row to status 'failed' and
 * left it in the table, so every subsequent retry of the same event hit
 * the duplicate check and the event could never be reprocessed. The claim
 * row is now deleted on failure (mirroring the Redis variant, which DELs
 * its lock), and the cleanup itself is guarded so a cleanup error cannot
 * mask the original processing error.
 *
 * @param {object} payload - must carry eventId, eventName, workOrderId
 * @returns {Promise<{status: string, eventId: string}>} 'processed' or 'duplicate'
 * @throws rethrows the handleEvent failure after releasing the claim
 */
async function processWebhook(payload) {
  const { eventId, eventName, workOrderId } = payload;

  // Atomic claim: the INSERT either creates the row or reports a duplicate.
  const wasInserted = await markEventProcessed(
    eventId,
    eventName,
    workOrderId,
    { status: 'processing' }
  );
  if (!wasInserted) {
    console.log(`Duplicate event ${eventId}`);
    return { status: 'duplicate', eventId };
  }

  try {
    // Process the event
    await handleEvent(payload);
    // Persist the final payload and flip the status.
    await pool.query(
      'UPDATE processed_events SET status = $1, payload = $2 WHERE event_id = $3',
      ['processed', JSON.stringify(payload), eventId]
    );
    return { status: 'processed', eventId };
  } catch (error) {
    // Release the claim so the queue's retry is not treated as a duplicate.
    try {
      await pool.query(
        'DELETE FROM processed_events WHERE event_id = $1',
        [eventId]
      );
    } catch (cleanupError) {
      // Never let cleanup failure hide the original error.
      console.error(`Failed to release claim for ${eventId}:`, cleanupError);
    }
    throw error;
  }
}
// Purge idempotency rows older than the 7-day retention window (run daily).
async function cleanupOldEvents() {
  const retentionQuery =
    'DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL \'7 days\'';
  await pool.query(retentionQuery);
}
// WARNING: Only for development - data lost on restart
// Dev-only idempotency store: lives in process memory, wiped on restart.
const processedEvents = new Set();

// Membership test against the in-memory store.
const isEventProcessed = (eventId) => processedEvents.has(eventId);

// Remember an event so later deliveries are treated as duplicates.
const markEventProcessed = (eventId) => {
  processedEvents.add(eventId);
};
async function processWebhook(payload) {
const { eventId } = payload;
if (isEventProcessed(eventId)) {
console.log(`Duplicate event ${eventId}`);
return { status: 'duplicate', eventId };
}
markEventProcessed(eventId);
try {
await handleEvent(payload);
return { status: 'processed', eventId };
} catch (error) {
// Remove on failure to allow retry
processedEvents.delete(eventId);
throw error;
}
}Important: Store idempotency keys for at least 7 days to handle late retries and manual replays.
Critical: Respond to webhooks within 5 seconds, then process asynchronously.
const express = require('express');
// FIX: bull exports the Queue class directly (module.exports = Queue), so
// destructuring `{ Queue }` yields undefined and `new Queue(...)` throws.
const Queue = require('bull');
const app = express();
const webhookQueue = new Queue('webhooks', 'redis://localhost:6379');
// Webhook endpoint: verify, dedupe, enqueue — then ACK immediately so the
// sender gets a fast 2xx and does not redeliver.
app.post('/webhooks/fieldnation', async (req, res) => {
  try {
    // Reject payloads whose signature does not match.
    if (!verifySignature(req.body, req.headers['x-fn-signature'])) {
      res.status(401).send('Unauthorized');
      return;
    }

    const payload = JSON.parse(req.body.toString());

    // Drop duplicates before they ever reach the queue.
    const duplicate = await isEventProcessed(payload.eventId);
    if (duplicate) {
      console.log(`Duplicate: ${payload.eventId}`);
      res.status(200).send('Already processed');
      return;
    }

    // Hand off to the worker with exponential-backoff retries.
    const jobOptions = {
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 2000
      },
      removeOnComplete: true,
      removeOnFail: false
    };
    await webhookQueue.add('process', payload, jobOptions);

    // ACK right away; real work happens in the worker.
    res.status(200).send('OK');
  } catch (error) {
    console.error('Webhook error:', error);
    res.status(500).send('Internal error');
  }
});
// Worker - process async
// Pulls queued payloads and runs them outside the HTTP request cycle.
// Throwing from this handler makes Bull retry the job according to the
// `attempts`/`backoff` options set when the job was added.
webhookQueue.process('process', async (job) => {
const payload = job.data;
try {
// Mark as processing
// NOTE(review): markEventProcessing is not defined anywhere in this file
// (only markEventProcessed is) — confirm it exists in the real codebase.
await markEventProcessing(payload.eventId);
// Process the event
await processEvent(payload);
// Mark as complete
await markEventProcessed(payload.eventId);
return { status: 'success', eventId: payload.eventId };
} catch (error) {
console.error(`Failed to process ${payload.eventId}:`, error);
throw error; // Will trigger retry
}
});
async function processEvent(payload) {
const { eventName, data } = payload;
switch (eventName) {
case 'workorder.status.published':
await handlePublished(data);
break;
case 'workorder.status.assigned':
await handleAssigned(data);
break;
case 'workorder.status.work_done':
await handleWorkDone(data);
break;
default:
console.log(`Unhandled event: ${eventName}`);
}
}Separate queues by priority or event type:
// One Bull queue per priority tier, all backed by the same Redis instance.
const highPriorityQueue = new Queue('webhooks-high', 'redis://localhost:6379');
const normalQueue = new Queue('webhooks-normal', 'redis://localhost:6379');
const lowPriorityQueue = new Queue('webhooks-low', 'redis://localhost:6379');

// Pick the queue for an event by inspecting its name.
function getQueueForEvent(eventName) {
  const isHighPriority =
    eventName.includes('approved') || eventName.includes('paid');
  if (isHighPriority) {
    return highPriorityQueue; // payments and approvals jump the line
  }

  const isLowPriority =
    eventName.includes('message') || eventName.includes('upload');
  if (isLowPriority) {
    return lowPriorityQueue; // chatter and uploads can wait
  }

  return normalQueue; // everything else
}
// Enqueue-only endpoint for the priority-queue example.
// NOTE(review): unlike the full endpoint above, this version skips signature
// verification, idempotency, and error handling — illustrative only.
app.post('/webhooks/fieldnation', async (req, res) => {
const payload = JSON.parse(req.body.toString());
const queue = getQueueForEvent(payload.eventName);
await queue.add('process', payload);
res.status(200).send('OK');
});
// Process each queue with different concurrency
// Higher tiers get more concurrent workers (10 / 5 / 2).
// NOTE(review): Bull passes a Job object to the processor, but processEvent
// expects the raw payload — this likely needs `(job) => processEvent(job.data)`.
highPriorityQueue.process('process', 10, processEvent);
normalQueue.process('process', 5, processEvent);
lowPriorityQueue.process('process', 2, processEvent);Robust error handling prevents data loss and ensures reliability.
Retry these errors - temporary issues that may resolve:
// Transient failure signatures: connection-level errno codes plus HTTP
// statuses that are safe to retry.
const RETRIABLE_ERRORS = [
  'ECONNREFUSED', // Connection refused
  'ETIMEDOUT', // Connection timeout
  'ENOTFOUND', // DNS lookup failed
  'ENETUNREACH', // Network unreachable
  'ECONNRESET', // Connection reset
  '503', // Service Unavailable
  '504', // Gateway Timeout
  '429' // Too Many Requests
];

/**
 * True when the error matches a retriable errno code, message fragment, or
 * HTTP status code.
 *
 * BUG FIX: the original dereferenced `error.message.includes(...)`
 * unconditionally and threw a TypeError for errors without a `message`
 * (common for plain-object errors); now uses optional chaining. Also uses
 * Number.parseInt with an explicit radix.
 *
 * @param {Error|object} error - error with optional code/message/statusCode
 * @returns {boolean}
 */
function isRetriable(error) {
  return RETRIABLE_ERRORS.some((code) =>
    error.code === code ||
    (error.message?.includes(code) ?? false) ||
    error.statusCode === Number.parseInt(code, 10)
  );
}
async function processWithRetry(payload) {
try {
await syncToSalesforce(payload.data);
} catch (error) {
if (isRetriable(error)) {
console.log(`Retriable error for ${payload.eventId}:`, error.message);
throw error; // Bull will retry
}
// Non-retriable error - log and move to DLQ
console.error(`Non-retriable error for ${payload.eventId}:`, error);
await sendToDLQ(payload, error);
}
}Don't retry these - permanent failures:
const NON_RETRIABLE_ERRORS = [
'VALIDATION_ERROR', // Invalid data format
'AUTHENTICATION_ERROR', // Invalid credentials
'AUTHORIZATION_ERROR', // Insufficient permissions
'NOT_FOUND', // Resource doesn't exist
'400', // Bad Request
'401', // Unauthorized
'403', // Forbidden
'404', // Not Found
'422' // Unprocessable Entity
];
async function processEvent(payload) {
try {
// Validate payload
if (!validatePayload(payload)) {
throw new Error('VALIDATION_ERROR: Invalid payload format');
}
// Process event
await handleEvent(payload);
} catch (error) {
if (isNonRetriable(error)) {
// Log and move to DLQ immediately
console.error(`Non-retriable error for ${payload.eventId}:`, error);
await sendToDLQ(payload, error);
return; // Don't throw - prevents retry
}
throw error; // Retriable - let it retry
}
}Handle partial success - some operations succeed:
/**
 * Fans one event out to three downstream systems with a per-step failure
 * policy: the Salesforce sync is best-effort, the database write is
 * critical (its failure rethrows so the whole job retries), and the
 * notification is best-effort. Statement order is load-bearing — a retry
 * re-runs ALL steps, including ones that already succeeded, so downstream
 * calls should be idempotent.
 *
 * @param {object} payload - webhook payload carrying eventId and data
 * @returns {Promise<{salesforce: *, database: *, notification: *}>}
 *          per-step results; null where a best-effort step failed
 * @throws rethrows only database-step failures
 */
async function processEvent(payload) {
const results = {
salesforce: null,
database: null,
notification: null
};
try {
// Step 1: Update Salesforce
results.salesforce = await syncToSalesforce(payload.data);
} catch (error) {
console.error('Salesforce sync failed:', error);
// Continue - don't fail entire job
}
try {
// Step 2: Update database
results.database = await updateDatabase(payload.data);
} catch (error) {
console.error('Database update failed:', error);
throw error; // Critical failure - retry everything
}
try {
// Step 3: Send notification
results.notification = await sendNotification(payload.data);
} catch (error) {
console.error('Notification failed:', error);
// Log but don't fail - not critical
}
// Log results
await logProcessingResults(payload.eventId, results);
return results;
}Send permanently failed events to DLQ for manual review:
// Dead-letter queue for events that permanently failed processing.
const dlqQueue = new Queue('webhooks-dlq', 'redis://localhost:6379');

// Park a failed payload (plus error details) in the DLQ and page the team.
async function sendToDLQ(payload, error) {
  const errorDetails = {
    message: error.message,
    stack: error.stack,
    code: error.code
  };
  const dlqEntry = {
    payload,
    error: errorDetails,
    failedAt: new Date().toISOString(),
    attempts: payload.attempts || 0
  };

  await dlqQueue.add('failed', dlqEntry, {
    removeOnComplete: false, // Keep DLQ items
    removeOnFail: false
  });

  // Alert team
  await alertTeam({
    type: 'webhook_dlq',
    eventId: payload.eventId,
    eventName: payload.eventName,
    error: error.message
  });
}
// Monitor DLQ
// Fires when an operator re-runs a DLQ job and it finally succeeds.
dlqQueue.on('completed', async (job) => {
console.log(`DLQ item resolved: ${job.data.payload.eventId}`);
});Organize your code by event type for clarity and maintainability:
// handlers/workorder.js
// Groups all work-order lifecycle handlers in one class so each webhook
// event maps to exactly one method.
// NOTE(review): the helper methods these handlers call (notifyProviders,
// updateDispatchBoard, syncToSalesforce, etc.) are not defined on the class
// in this excerpt — confirm they exist in the real module.
class WorkOrderHandlers {
// A work order became visible to the provider network.
async handlePublished(data) {
console.log(`Work order ${data.id} published`);
// Notify provider network
await this.notifyProviders(data);
// Update dispatch board
await this.updateDispatchBoard(data);
// Sync to Salesforce
await this.syncToSalesforce(data);
}
// A provider was assigned to the work order.
async handleAssigned(data) {
console.log(`Work order ${data.id} assigned to provider ${data.provider.id}`);
// Notify provider
await this.notifyProvider(data.provider, data);
// Update scheduling system
await this.updateSchedule(data);
// Log assignment
await this.logAssignment(data);
}
// The provider marked the work as complete; kick off approval.
async handleWorkDone(data) {
console.log(`Work order ${data.id} work completed`);
// Trigger approval workflow
await this.triggerApprovalWorkflow(data);
// Notify buyer
await this.notifyBuyer(data.buyer, data);
// Update analytics
await this.updateMetrics(data);
}
// The buyer approved the completed work; invoice and archive.
async handleApproved(data) {
console.log(`Work order ${data.id} approved`);
// Generate invoice
await this.generateInvoice(data);
// Update accounting system
await this.updateAccounting(data);
// Archive work order
await this.archiveWorkOrder(data);
}
}
// handlers/index.js
const workOrderHandlers = new WorkOrderHandlers();
async function processEvent(payload) {
const { eventName, data } = payload;
// Route to appropriate handler
switch (eventName) {
case 'workorder.status.published':
return await workOrderHandlers.handlePublished(data);
case 'workorder.status.assigned':
return await workOrderHandlers.handleAssigned(data);
case 'workorder.status.work_done':
return await workOrderHandlers.handleWorkDone(data);
case 'workorder.status.approved':
return await workOrderHandlers.handleApproved(data);
default:
console.log(`No handler for event: ${eventName}`);
}
}Prevent cascading failures when downstream services are unavailable:
const CircuitBreaker = require('opossum');
// Configure circuit breaker
// Wraps the Salesforce sync so sustained failures trip the breaker and
// stop hammering a downed service.
const salesforceBreaker = new CircuitBreaker(async (data) => {
return await syncToSalesforce(data);
}, {
timeout: 5000, // 5 second timeout
errorThresholdPercentage: 50, // Open after 50% failures
resetTimeout: 30000, // Try again after 30 seconds
rollingCountTimeout: 10000, // Track errors over 10 seconds
rollingCountBuckets: 10, // 10 buckets
name: 'salesforce'
});
// Monitor circuit breaker state
// open = failing fast, halfOpen = probing, close = healthy again.
salesforceBreaker.on('open', () => {
console.error('Circuit breaker OPEN - Salesforce unavailable');
alertTeam({ service: 'salesforce', status: 'circuit_open' });
});
salesforceBreaker.on('halfOpen', () => {
console.log('Circuit breaker HALF-OPEN - Testing Salesforce');
});
salesforceBreaker.on('close', () => {
console.log('Circuit breaker CLOSED - Salesforce recovered');
});
// Use in event handler
async function handlePublished(data) {
try {
await salesforceBreaker.fire(data);
} catch (error) {
if (salesforceBreaker.opened) {
console.log('Salesforce circuit open, queuing for later');
await queueForLater(data);
} else {
throw error;
}
}
}Protect your downstream services from overload:
const Bottleneck = require('bottleneck');
// Rate limiter: max 10 requests per second
// Combines a concurrency cap, minimum spacing, and a refilling reservoir.
const salesforceLimiter = new Bottleneck({
maxConcurrent: 5, // Max 5 concurrent requests
minTime: 100, // Min 100ms between requests
reservoir: 10, // 10 requests
reservoirRefreshAmount: 10,
reservoirRefreshInterval: 1000 // per second
});
async function syncToSalesforce(data) {
return await salesforceLimiter.schedule(async () => {
const response = await fetch('https://salesforce.com/api/workorders', {
method: 'POST',
headers: { 'Authorization': `Bearer ${token}` },
body: JSON.stringify(data)
});
return await response.json();
});
}Track webhook processing health:
const prometheus = require('prom-client');
// Metrics
// Counter: total deliveries, labeled by event name and success/error.
const webhookCounter = new prometheus.Counter({
name: 'webhooks_received_total',
help: 'Total webhooks received',
labelNames: ['event_name', 'status']
});
// Histogram: end-to-end processing latency per event name.
const webhookDuration = new prometheus.Histogram({
name: 'webhook_processing_duration_seconds',
help: 'Time to process webhook',
labelNames: ['event_name'],
buckets: [0.1, 0.5, 1, 2, 5, 10]
});
// Counter: failures broken down by error code.
const webhookErrors = new prometheus.Counter({
name: 'webhook_errors_total',
help: 'Total webhook processing errors',
labelNames: ['event_name', 'error_type']
});
// Instrumented handler
async function processEvent(payload) {
const timer = webhookDuration.startTimer({ event_name: payload.eventName });
try {
await handleEvent(payload);
webhookCounter.inc({ event_name: payload.eventName, status: 'success' });
timer();
} catch (error) {
webhookCounter.inc({ event_name: payload.eventName, status: 'error' });
webhookErrors.inc({
event_name: payload.eventName,
error_type: error.code || 'unknown'
});
timer();
throw error;
}
}Here's a full production-ready webhook handler:
const express = require('express');
// FIX: bull exports the Queue class directly (module.exports = Queue), so
// destructuring `{ Queue }` yields undefined and `new Queue(...)` throws.
const Queue = require('bull');
const CircuitBreaker = require('opossum');
const redis = require('redis');
const app = express();
// NOTE(review): with node-redis v4+ the client must also be connected
// (`await redisClient.connect()`) before use — confirm the client version.
const redisClient = redis.createClient();
const webhookQueue = new Queue('webhooks', 'redis://localhost:6379');
// Circuit breakers
const salesforceBreaker = new CircuitBreaker(syncToSalesforce, {
  timeout: 5000,
  errorThresholdPercentage: 50,
  resetTimeout: 30000
});
// Webhook endpoint
// Keep the raw body so the HMAC signature is verified over the exact bytes.
app.use(express.raw({ type: 'application/json' }));
// Verify → dedupe → enqueue → ACK, all within the sender's timeout budget.
app.post('/webhooks/fieldnation', async (req, res) => {
  try {
    // Signature check over the raw body bytes.
    const signature = req.headers['x-fn-signature'];
    if (!verifySignature(req.body, signature, process.env.WEBHOOK_SECRET)) {
      res.status(401).send('Unauthorized');
      return;
    }

    const payload = JSON.parse(req.body.toString());

    // Fast-path duplicates straight to a 200 so the sender stops retrying.
    const alreadySeen = await redisClient.exists(`event:${payload.eventId}`);
    if (alreadySeen) {
      res.status(200).send('Already processed');
      return;
    }

    // Defer the real work to the queue with exponential-backoff retries.
    await webhookQueue.add('process', payload, {
      attempts: 5,
      backoff: { type: 'exponential', delay: 2000 },
      removeOnComplete: true
    });

    res.status(200).send('OK');
  } catch (error) {
    console.error('Webhook endpoint error:', error);
    res.status(500).send('Internal error');
  }
});
// Worker
// Processes up to 5 jobs concurrently; throwing triggers Bull's retry,
// while returning after sendToDLQ suppresses it.
webhookQueue.process('process', 5, async (job) => {
const payload = job.data;
try {
// Mark as processing
// NOTE(review): the NX set's return value is ignored here, so a concurrent
// claim is not detected at this point — dedup relies on the endpoint-side
// exists() check. Confirm this is intentional.
await redisClient.set(`event:${payload.eventId}`, 'processing', 'EX', 604800, 'NX');
// Process event
await processEvent(payload);
// Mark complete
await redisClient.set(`event:${payload.eventId}`, 'processed', 'EX', 604800);
} catch (error) {
console.error(`Processing failed for ${payload.eventId}:`, error);
if (isNonRetriable(error)) {
await sendToDLQ(payload, error);
return; // Don't throw - prevents retry
}
throw error; // Retriable
}
});
// Dispatch one queued event to its workflow; the published case goes
// through the Salesforce circuit breaker.
async function processEvent(payload) {
  const { eventName, data } = payload;
  if (eventName === 'workorder.status.published') {
    await salesforceBreaker.fire(data);
  } else if (eventName === 'workorder.status.assigned') {
    await handleAssigned(data);
  } else if (eventName === 'workorder.status.work_done') {
    await handleWorkDone(data);
  } else {
    console.log(`Unhandled event: ${eventName}`);
  }
}
app.listen(3000);