Field NationDeveloper Platform
Field NationDeveloper Platform
IntroductionQuickstartAPI Playground

Documentation

Creating WebhooksSecurity Best Practices

Implementation

Handling EventsMonitoring WebhooksTesting Webhooks

Advanced

Payload Customization

Support

Migration Guide
Guides

Handling Events

Build robust webhook event processing with idempotency, async patterns, error handling, and production-ready best practices.


Event Processing Architecture

Loading diagram...

1. Idempotency

Critical: Webhooks may be delivered more than once. Your system must handle duplicate events gracefully.

Why Idempotency Matters

  • Retry logic: Failed deliveries are retried
  • Network issues: Timeout may occur after processing
  • Manual retries: Operators can manually retry events
  • At-least-once delivery: Field Nation guarantees at-least-once, not exactly-once

Implementing Idempotency

Use eventId as the unique identifier:

const redis = require('redis');
const client = redis.createClient();

async function isEventProcessed(eventId) {
  return await client.exists(`event:${eventId}`);
}

async function markEventProcessed(eventId) {
  // Store for 7 days (604800 seconds)
  await client.setex(`event:${eventId}`, 604800, 'processed');
}

async function processWebhook(payload) {
  const { eventId } = payload;

  // Check if already processed
  if (await isEventProcessed(eventId)) {
    console.log(`Duplicate event ${eventId}, skipping`);
    return { status: 'duplicate', eventId };
  }

  // Mark as processing (prevents race conditions)
  const wasSet = await client.set(
    `event:${eventId}`,
    'processing',
    'EX', 604800,
    'NX'  // Only set if not exists
  );

  if (!wasSet) {
    console.log(`Event ${eventId} already being processed`);
    return { status: 'duplicate', eventId };
  }

  try {
    // Process the event
    await handleEvent(payload);

    // Update to processed
    await client.set(`event:${eventId}`, 'processed', 'EX', 604800);

    return { status: 'processed', eventId };
  } catch (error) {
    // Remove processing lock on failure
    await client.del(`event:${eventId}`);
    throw error;
  }
}
-- Create processed_events table
CREATE TABLE processed_events (
  event_id VARCHAR(255) PRIMARY KEY,
  event_name VARCHAR(255) NOT NULL,
  work_order_id INTEGER,
  processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  payload JSONB,
  status VARCHAR(50) DEFAULT 'processed'
);

-- Index for cleanup
CREATE INDEX idx_processed_at ON processed_events(processed_at);
const { Pool } = require('pg');
const pool = new Pool();

async function isEventProcessed(eventId) {
  const result = await pool.query(
    'SELECT event_id FROM processed_events WHERE event_id = $1',
    [eventId]
  );
  return result.rows.length > 0;
}

async function markEventProcessed(eventId, eventName, workOrderId, payload) {
  try {
    await pool.query(
      `INSERT INTO processed_events (event_id, event_name, work_order_id, payload, status)
       VALUES ($1, $2, $3, $4, 'processed')
       ON CONFLICT (event_id) DO NOTHING`,
      [eventId, eventName, workOrderId, JSON.stringify(payload)]
    );
  } catch (error) {
    // Unique constraint violation means already processed
    if (error.code === '23505') {
      return false;
    }
    throw error;
  }
  return true;
}

async function processWebhook(payload) {
  const { eventId, eventName, workOrderId } = payload;

  // Try to insert event (atomic operation)
  const wasInserted = await markEventProcessed(
    eventId,
    eventName,
    workOrderId,
    { status: 'processing' }
  );

  if (!wasInserted) {
    console.log(`Duplicate event ${eventId}`);
    return { status: 'duplicate', eventId };
  }

  try {
    // Process the event
    await handleEvent(payload);

    // Update status
    await pool.query(
      'UPDATE processed_events SET status = $1, payload = $2 WHERE event_id = $3',
      ['processed', JSON.stringify(payload), eventId]
    );

    return { status: 'processed', eventId };
  } catch (error) {
    // Mark as failed
    await pool.query(
      'UPDATE processed_events SET status = $1 WHERE event_id = $2',
      ['failed', eventId]
    );
    throw error;
  }
}

// Cleanup old events (run daily)
async function cleanupOldEvents() {
  await pool.query(
    'DELETE FROM processed_events WHERE processed_at < NOW() - INTERVAL \'7 days\''
  );
}
// WARNING: Only for development - data lost on restart
const processedEvents = new Set();

function isEventProcessed(eventId) {
  return processedEvents.has(eventId);
}

function markEventProcessed(eventId) {
  processedEvents.add(eventId);
}

async function processWebhook(payload) {
  const { eventId } = payload;

  if (isEventProcessed(eventId)) {
    console.log(`Duplicate event ${eventId}`);
    return { status: 'duplicate', eventId };
  }

  markEventProcessed(eventId);

  try {
    await handleEvent(payload);
    return { status: 'processed', eventId };
  } catch (error) {
    // Remove on failure to allow retry
    processedEvents.delete(eventId);
    throw error;
  }
}

Important: Store idempotency keys for at least 7 days to handle late retries and manual replays.


2. Async Processing Pattern

Critical: Respond to webhooks within 5 seconds, then process asynchronously.

Why Async Processing?

  • Fast response: Prevents timeouts and retries
  • Scalability: Handle high event volumes
  • Reliability: Failures don't block webhook delivery
  • Resource management: Control processing concurrency

Basic Async Pattern

const express = require('express');
const { Queue } = require('bull');

const app = express();
const webhookQueue = new Queue('webhooks', 'redis://localhost:6379');

// Webhook endpoint - respond immediately
app.post('/webhooks/fieldnation', async (req, res) => {
  try {
    // 1. Verify signature
    if (!verifySignature(req.body, req.headers['x-fn-signature'])) {
      return res.status(401).send('Unauthorized');
    }

    // 2. Parse payload
    const payload = JSON.parse(req.body.toString());

    // 3. Check idempotency
    if (await isEventProcessed(payload.eventId)) {
      console.log(`Duplicate: ${payload.eventId}`);
      return res.status(200).send('Already processed');
    }

    // 4. Queue for processing
    await webhookQueue.add('process', payload, {
      attempts: 5,
      backoff: {
        type: 'exponential',
        delay: 2000
      },
      removeOnComplete: true,
      removeOnFail: false
    });

    // 5. Respond immediately
    res.status(200).send('OK');

  } catch (error) {
    console.error('Webhook error:', error);
    res.status(500).send('Internal error');
  }
});

// Worker - process async
webhookQueue.process('process', async (job) => {
  const payload = job.data;

  try {
    // Mark as processing
    await markEventProcessing(payload.eventId);

    // Process the event
    await processEvent(payload);

    // Mark as complete
    await markEventProcessed(payload.eventId);

    return { status: 'success', eventId: payload.eventId };
  } catch (error) {
    console.error(`Failed to process ${payload.eventId}:`, error);
    throw error; // Will trigger retry
  }
});

async function processEvent(payload) {
  const { eventName, data } = payload;

  switch (eventName) {
    case 'workorder.status.published':
      await handlePublished(data);
      break;
    case 'workorder.status.assigned':
      await handleAssigned(data);
      break;
    case 'workorder.status.work_done':
      await handleWorkDone(data);
      break;
    default:
      console.log(`Unhandled event: ${eventName}`);
  }
}

Advanced: Multi-Queue Architecture

Separate queues by priority or event type:

const highPriorityQueue = new Queue('webhooks-high', 'redis://localhost:6379');
const normalQueue = new Queue('webhooks-normal', 'redis://localhost:6379');
const lowPriorityQueue = new Queue('webhooks-low', 'redis://localhost:6379');

function getQueueForEvent(eventName) {
  // High priority: payment, approval
  if (eventName.includes('approved') || eventName.includes('paid')) {
    return highPriorityQueue;
  }

  // Low priority: messages, uploads
  if (eventName.includes('message') || eventName.includes('upload')) {
    return lowPriorityQueue;
  }

  // Normal priority: everything else
  return normalQueue;
}

app.post('/webhooks/fieldnation', async (req, res) => {
  const payload = JSON.parse(req.body.toString());
  const queue = getQueueForEvent(payload.eventName);

  await queue.add('process', payload);
  res.status(200).send('OK');
});

// Process each queue with different concurrency
highPriorityQueue.process('process', 10, processEvent);
normalQueue.process('process', 5, processEvent);
lowPriorityQueue.process('process', 2, processEvent);

3. Error Handling

Robust error handling prevents data loss and ensures reliability.

Error Categories

Retry these errors - temporary issues that may resolve:

const RETRIABLE_ERRORS = [
  'ECONNREFUSED',    // Connection refused
  'ETIMEDOUT',       // Connection timeout
  'ENOTFOUND',       // DNS lookup failed
  'ENETUNREACH',     // Network unreachable
  'ECONNRESET',      // Connection reset
  '503',             // Service Unavailable
  '504',             // Gateway Timeout
  '429'              // Too Many Requests
];

function isRetriable(error) {
  return RETRIABLE_ERRORS.some(code =>
    error.code === code ||
    error.message.includes(code) ||
    error.statusCode === parseInt(code)
  );
}

async function processWithRetry(payload) {
  try {
    await syncToSalesforce(payload.data);
  } catch (error) {
    if (isRetriable(error)) {
      console.log(`Retriable error for ${payload.eventId}:`, error.message);
      throw error; // Bull will retry
    }

    // Non-retriable error - log and move to DLQ
    console.error(`Non-retriable error for ${payload.eventId}:`, error);
    await sendToDLQ(payload, error);
  }
}

Don't retry these - permanent failures:

const NON_RETRIABLE_ERRORS = [
  'VALIDATION_ERROR',     // Invalid data format
  'AUTHENTICATION_ERROR', // Invalid credentials
  'AUTHORIZATION_ERROR',  // Insufficient permissions
  'NOT_FOUND',           // Resource doesn't exist
  '400',                 // Bad Request
  '401',                 // Unauthorized
  '403',                 // Forbidden
  '404',                 // Not Found
  '422'                  // Unprocessable Entity
];

async function processEvent(payload) {
  try {
    // Validate payload
    if (!validatePayload(payload)) {
      throw new Error('VALIDATION_ERROR: Invalid payload format');
    }

    // Process event
    await handleEvent(payload);

  } catch (error) {
    if (isNonRetriable(error)) {
      // Log and move to DLQ immediately
      console.error(`Non-retriable error for ${payload.eventId}:`, error);
      await sendToDLQ(payload, error);
      return; // Don't throw - prevents retry
    }

    throw error; // Retriable - let it retry
  }
}

Handle partial success - some operations succeed:

async function processEvent(payload) {
  const results = {
    salesforce: null,
    database: null,
    notification: null
  };

  try {
    // Step 1: Update Salesforce
    results.salesforce = await syncToSalesforce(payload.data);
  } catch (error) {
    console.error('Salesforce sync failed:', error);
    // Continue - don't fail entire job
  }

  try {
    // Step 2: Update database
    results.database = await updateDatabase(payload.data);
  } catch (error) {
    console.error('Database update failed:', error);
    throw error; // Critical failure - retry everything
  }

  try {
    // Step 3: Send notification
    results.notification = await sendNotification(payload.data);
  } catch (error) {
    console.error('Notification failed:', error);
    // Log but don't fail - not critical
  }

  // Log results
  await logProcessingResults(payload.eventId, results);

  return results;
}

Dead Letter Queue (DLQ)

Send permanently failed events to DLQ for manual review:

const dlqQueue = new Queue('webhooks-dlq', 'redis://localhost:6379');

async function sendToDLQ(payload, error) {
  await dlqQueue.add('failed', {
    payload,
    error: {
      message: error.message,
      stack: error.stack,
      code: error.code
    },
    failedAt: new Date().toISOString(),
    attempts: payload.attempts || 0
  }, {
    removeOnComplete: false, // Keep DLQ items
    removeOnFail: false
  });

  // Alert team
  await alertTeam({
    type: 'webhook_dlq',
    eventId: payload.eventId,
    eventName: payload.eventName,
    error: error.message
  });
}

// Monitor DLQ
dlqQueue.on('completed', async (job) => {
  console.log(`DLQ item resolved: ${job.data.payload.eventId}`);
});

4. Event-Specific Handlers

Organize your code by event type for clarity and maintainability:

// handlers/workorder.js
class WorkOrderHandlers {
  async handlePublished(data) {
    console.log(`Work order ${data.id} published`);

    // Notify provider network
    await this.notifyProviders(data);

    // Update dispatch board
    await this.updateDispatchBoard(data);

    // Sync to Salesforce
    await this.syncToSalesforce(data);
  }

  async handleAssigned(data) {
    console.log(`Work order ${data.id} assigned to provider ${data.provider.id}`);

    // Notify provider
    await this.notifyProvider(data.provider, data);

    // Update scheduling system
    await this.updateSchedule(data);

    // Log assignment
    await this.logAssignment(data);
  }

  async handleWorkDone(data) {
    console.log(`Work order ${data.id} work completed`);

    // Trigger approval workflow
    await this.triggerApprovalWorkflow(data);

    // Notify buyer
    await this.notifyBuyer(data.buyer, data);

    // Update analytics
    await this.updateMetrics(data);
  }

  async handleApproved(data) {
    console.log(`Work order ${data.id} approved`);

    // Generate invoice
    await this.generateInvoice(data);

    // Update accounting system
    await this.updateAccounting(data);

    // Archive work order
    await this.archiveWorkOrder(data);
  }
}

// handlers/index.js
const workOrderHandlers = new WorkOrderHandlers();

async function processEvent(payload) {
  const { eventName, data } = payload;

  // Route to appropriate handler
  switch (eventName) {
    case 'workorder.status.published':
      return await workOrderHandlers.handlePublished(data);

    case 'workorder.status.assigned':
      return await workOrderHandlers.handleAssigned(data);

    case 'workorder.status.work_done':
      return await workOrderHandlers.handleWorkDone(data);

    case 'workorder.status.approved':
      return await workOrderHandlers.handleApproved(data);

    default:
      console.log(`No handler for event: ${eventName}`);
  }
}

5. Circuit Breaker Pattern

Prevent cascading failures when downstream services are unavailable:

const CircuitBreaker = require('opossum');

// Configure circuit breaker
const salesforceBreaker = new CircuitBreaker(async (data) => {
  return await syncToSalesforce(data);
}, {
  timeout: 5000,                    // 5 second timeout
  errorThresholdPercentage: 50,     // Open after 50% failures
  resetTimeout: 30000,              // Try again after 30 seconds
  rollingCountTimeout: 10000,       // Track errors over 10 seconds
  rollingCountBuckets: 10,          // 10 buckets
  name: 'salesforce'
});

// Monitor circuit breaker state
salesforceBreaker.on('open', () => {
  console.error('Circuit breaker OPEN - Salesforce unavailable');
  alertTeam({ service: 'salesforce', status: 'circuit_open' });
});

salesforceBreaker.on('halfOpen', () => {
  console.log('Circuit breaker HALF-OPEN - Testing Salesforce');
});

salesforceBreaker.on('close', () => {
  console.log('Circuit breaker CLOSED - Salesforce recovered');
});

// Use in event handler
async function handlePublished(data) {
  try {
    await salesforceBreaker.fire(data);
  } catch (error) {
    if (salesforceBreaker.opened) {
      console.log('Salesforce circuit open, queuing for later');
      await queueForLater(data);
    } else {
      throw error;
    }
  }
}

6. Rate Limiting Downstream Services

Protect your downstream services from overload:

const Bottleneck = require('bottleneck');

// Rate limiter: max 10 requests per second
const salesforceLimiter = new Bottleneck({
  maxConcurrent: 5,        // Max 5 concurrent requests
  minTime: 100,            // Min 100ms between requests
  reservoir: 10,           // 10 requests
  reservoirRefreshAmount: 10,
  reservoirRefreshInterval: 1000 // per second
});

async function syncToSalesforce(data) {
  return await salesforceLimiter.schedule(async () => {
    const response = await fetch('https://salesforce.com/api/workorders', {
      method: 'POST',
      headers: { 'Authorization': `Bearer ${token}` },
      body: JSON.stringify(data)
    });
    return await response.json();
  });
}

7. Monitoring & Observability

Track webhook processing health:

const prometheus = require('prom-client');

// Metrics
const webhookCounter = new prometheus.Counter({
  name: 'webhooks_received_total',
  help: 'Total webhooks received',
  labelNames: ['event_name', 'status']
});

const webhookDuration = new prometheus.Histogram({
  name: 'webhook_processing_duration_seconds',
  help: 'Time to process webhook',
  labelNames: ['event_name'],
  buckets: [0.1, 0.5, 1, 2, 5, 10]
});

const webhookErrors = new prometheus.Counter({
  name: 'webhook_errors_total',
  help: 'Total webhook processing errors',
  labelNames: ['event_name', 'error_type']
});

// Instrumented handler
async function processEvent(payload) {
  const timer = webhookDuration.startTimer({ event_name: payload.eventName });

  try {
    await handleEvent(payload);

    webhookCounter.inc({ event_name: payload.eventName, status: 'success' });
    timer();

  } catch (error) {
    webhookCounter.inc({ event_name: payload.eventName, status: 'error' });
    webhookErrors.inc({
      event_name: payload.eventName,
      error_type: error.code || 'unknown'
    });
    timer();
    throw error;
  }
}

Complete Production Example

Here's a full production-ready webhook handler:

webhook-handler.js
const express = require('express');
const { Queue } = require('bull');
const CircuitBreaker = require('opossum');
const redis = require('redis');

const app = express();
const redisClient = redis.createClient();
const webhookQueue = new Queue('webhooks', 'redis://localhost:6379');

// Circuit breakers
const salesforceBreaker = new CircuitBreaker(syncToSalesforce, {
  timeout: 5000,
  errorThresholdPercentage: 50,
  resetTimeout: 30000
});

// Webhook endpoint
app.use(express.raw({ type: 'application/json' }));

app.post('/webhooks/fieldnation', async (req, res) => {
  try {
    // 1. Verify signature
    if (!verifySignature(req.body, req.headers['x-fn-signature'], process.env.WEBHOOK_SECRET)) {
      return res.status(401).send('Unauthorized');
    }

    // 2. Parse payload
    const payload = JSON.parse(req.body.toString());

    // 3. Check idempotency
    const alreadyProcessed = await redisClient.exists(`event:${payload.eventId}`);
    if (alreadyProcessed) {
      return res.status(200).send('Already processed');
    }

    // 4. Queue for async processing
    await webhookQueue.add('process', payload, {
      attempts: 5,
      backoff: { type: 'exponential', delay: 2000 },
      removeOnComplete: true
    });

    // 5. Respond immediately
    res.status(200).send('OK');

  } catch (error) {
    console.error('Webhook endpoint error:', error);
    res.status(500).send('Internal error');
  }
});

// Worker
webhookQueue.process('process', 5, async (job) => {
  const payload = job.data;

  try {
    // Mark as processing
    await redisClient.set(`event:${payload.eventId}`, 'processing', 'EX', 604800, 'NX');

    // Process event
    await processEvent(payload);

    // Mark complete
    await redisClient.set(`event:${payload.eventId}`, 'processed', 'EX', 604800);

  } catch (error) {
    console.error(`Processing failed for ${payload.eventId}:`, error);

    if (isNonRetriable(error)) {
      await sendToDLQ(payload, error);
      return; // Don't throw - prevents retry
    }

    throw error; // Retriable
  }
});

async function processEvent(payload) {
  const { eventName, data } = payload;

  switch (eventName) {
    case 'workorder.status.published':
      await salesforceBreaker.fire(data);
      break;

    case 'workorder.status.assigned':
      await handleAssigned(data);
      break;

    case 'workorder.status.work_done':
      await handleWorkDone(data);
      break;

    default:
      console.log(`Unhandled event: ${eventName}`);
  }
}

app.listen(3000);

Best Practices Checklist

  • ✅ Implement idempotency with eventId
  • ✅ Respond to webhooks within 5 seconds
  • ✅ Process events asynchronously
  • ✅ Handle retriable vs non-retriable errors differently
  • ✅ Use circuit breakers for downstream services
  • ✅ Rate limit calls to external APIs
  • ✅ Implement Dead Letter Queue for failed events
  • ✅ Monitor processing metrics
  • ✅ Log errors with context
  • ✅ Set up alerts for anomalies

Last updated on

Security Best Practices

Secure your webhook endpoints with HMAC-SHA256 signature verification, IP whitelisting, HTTPS, and custom authentication headers.

Monitoring Webhooks

Set up monitoring, alerts, and dashboards to track webhook delivery health, processing metrics, and system reliability.

On this page

Event Processing Architecture
1. Idempotency
Why Idempotency Matters
Implementing Idempotency
2. Async Processing Pattern
Why Async Processing?
Basic Async Pattern
Advanced: Multi-Queue Architecture
3. Error Handling
Error Categories
Dead Letter Queue (DLQ)
4. Event-Specific Handlers
5. Circuit Breaker Pattern
6. Rate Limiting Downstream Services
7. Monitoring & Observability
Complete Production Example
Best Practices Checklist