OpenWorkflow

Production Guide

Best practices for deploying OpenWorkflow to production

This guide covers best practices for deploying OpenWorkflow to production, including infrastructure setup, monitoring, scaling, and operational considerations.

Production Checklist

Before Going to Production

Before deploying to production, work through the checklist below. Missing any of these items can lead to data loss, downtime, or performance issues.

Ensure you have:

  • Production PostgreSQL database with backups
  • At least one worker process running
  • Graceful shutdown handlers for clean deploys
  • Monitoring and alerting for failed workflows
  • Connection pooling configured
  • Namespace isolation for environments
  • Cleanup strategy for old workflow data
  • Error tracking (e.g., Sentry, Datadog)
  • Performance tuning (concurrency, indexes)

Database Setup

Production PostgreSQL

Use a managed PostgreSQL service for production:

  • AWS RDS: Fully managed, automatic backups
  • Google Cloud SQL: High availability, point-in-time recovery
  • Azure Database: Built-in monitoring and scaling
  • Neon: Serverless Postgres with autoscaling
  • Supabase: Postgres with REST API

Configuration

const backend = await BackendPostgres.connect(
  process.env.DATABASE_URL!,
  {
    namespaceId: process.env.NODE_ENV || "production",
  }
);

Connection String Format

# Use SSL in production
export DATABASE_URL="postgresql://user:pass@db.example.com:5432/prod?sslmode=require"

Backups

Implement regular backups:

  • Automated backups: Use your cloud provider's backup service
  • Point-in-time recovery: Enable WAL archiving
  • Test restores: Regularly verify backup integrity
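
Beyond provider snapshots, you can schedule logical dumps as a secondary layer. The sketch below is a minimal example, not an official OpenWorkflow utility: it assumes pg_dump is available on the host, DATABASE_URL is set, and a writable /backups directory exists.

import { execFile } from "node:child_process";
import { promisify } from "node:util";
import cron from "node-cron";

const run = promisify(execFile);

// Nightly logical dump at 3 AM, in addition to provider-managed snapshots
cron.schedule("0 3 * * *", async () => {
  const file = `/backups/openworkflow-${new Date().toISOString().slice(0, 10)}.dump`;

  // pg_dump accepts a connection string in place of a database name
  await run("pg_dump", ["--format=custom", `--file=${file}`, process.env.DATABASE_URL!]);
  console.log(`Backup written to ${file}`);
});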

Security

  1. Use SSL/TLS: Add ?sslmode=require to connection string
  2. Restrict access: Whitelist only necessary IP addresses
  3. Rotate credentials: Use secrets management (AWS Secrets Manager, Vault)
  4. Least privilege: Grant only necessary database permissions

-- Create a dedicated user with limited permissions
CREATE USER openworkflow_app WITH PASSWORD 'secure_password';
GRANT SELECT, INSERT, UPDATE, DELETE ON workflow_runs TO openworkflow_app;
GRANT SELECT, INSERT, UPDATE, DELETE ON step_attempts TO openworkflow_app;

Worker Deployment

Production Project Structure

worker.ts
app.ts
config.ts
workflows/
  index.ts
docker-compose.yml
package.json
.env.production

Standalone Worker Process

Run workers as separate processes from your application:

worker.ts
import { BackendPostgres } from "@openworkflow/backend-postgres";
import { OpenWorkflow } from "openworkflow";

// Import all workflow definitions
import "./workflows/index.js";

const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
const ow = new OpenWorkflow({ backend });

const worker = ow.newWorker({
  concurrency: parseInt(process.env.WORKER_CONCURRENCY || "10", 10),
});

await worker.start();
console.log("Worker started");

// Graceful shutdown
process.on("SIGTERM", async () => {
  console.log("Received SIGTERM, shutting down...");
  await worker.stop();
  await backend.stop();
  process.exit(0);
});

process.on("SIGINT", async () => {
  console.log("Received SIGINT, shutting down...");
  await worker.stop();
  await backend.stop();
  process.exit(0);
});

Docker Deployment

Create a Dockerfile for your worker:

Dockerfile
FROM node:20-alpine

WORKDIR /app

COPY package*.json ./
RUN npm ci --omit=dev

# Copy the application; this assumes worker.ts has already been compiled
# to worker.js (for example via `npm run build` or `tsc` in CI)
COPY . .

CMD ["node", "worker.js"]

Docker Compose

docker-compose.yml
version: "3.8"

services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: openworkflow
    volumes:
      - postgres-data:/var/lib/postgresql/data
    restart: always

  worker:
    build: .
    environment:
      DATABASE_URL: postgresql://postgres:${POSTGRES_PASSWORD}@postgres:5432/openworkflow
      WORKER_CONCURRENCY: 20
      NODE_ENV: production
    depends_on:
      - postgres
    restart: always
    deploy:
      replicas: 3 # Run 3 worker instances

volumes:
  postgres-data:

Kubernetes Deployment

worker-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: openworkflow-worker
spec:
  replicas: 3
  selector:
    matchLabels:
      app: openworkflow-worker
  template:
    metadata:
      labels:
        app: openworkflow-worker
    spec:
      containers:
      - name: worker
        image: your-registry/openworkflow-worker:latest
        env:
        - name: DATABASE_URL
          valueFrom:
            secretKeyRef:
              name: openworkflow-secrets
              key: database-url
        - name: WORKER_CONCURRENCY
          value: "20"
        - name: NODE_ENV
          value: "production"
        resources:
          requests:
            memory: "256Mi"
            cpu: "250m"
          limits:
            memory: "512Mi"
            cpu: "500m"
        lifecycle:
          preStop:
            exec:
              command: ["/bin/sh", "-c", "sleep 10"] # Allow graceful shutdown

Concurrency Tuning

Choosing Concurrency

Start with these guidelines and adjust based on monitoring:

// Low-latency, many short workflows (< 1 second each)
const worker = ow.newWorker({ concurrency: 100 });

// Medium-latency workflows (1-10 seconds each)
const worker = ow.newWorker({ concurrency: 20 });

// High-latency workflows (> 10 seconds each)
const worker = ow.newWorker({ concurrency: 10 });

Monitoring Resource Usage

Monitor CPU, memory, and database connections:

# Check CPU usage
top

# Check memory usage
free -h

# Check database connections
psql -c "SELECT count(*) FROM pg_stat_activity;"

Scaling Workers

Scale horizontally by adding more worker processes:

# Docker Compose
docker-compose up --scale worker=5

# Kubernetes
kubectl scale deployment openworkflow-worker --replicas=5

Each worker operates independently. They coordinate through the database using atomic operations.

Graceful Shutdown

Importance

Graceful shutdown ensures:

  • No workflow data loss during deploys
  • In-flight workflows complete successfully
  • Database connections close cleanly

Implementation

const worker = ow.newWorker({ concurrency: 20 });
await worker.start();

let isShuttingDown = false;

async function shutdown(signal: string) {
  if (isShuttingDown) return;
  isShuttingDown = true;

  console.log(`Received ${signal}, starting graceful shutdown...`);

  try {
    // Stop accepting new work
    await worker.stop();
    console.log("Worker stopped");

    // Close database connection
    await backend.stop();
    console.log("Backend closed");

    console.log("Shutdown complete");
    process.exit(0);
  } catch (error) {
    console.error("Error during shutdown:", error);
    process.exit(1);
  }
}

process.on("SIGTERM", () => shutdown("SIGTERM"));
process.on("SIGINT", () => shutdown("SIGINT"));

Kubernetes

Configure Kubernetes for graceful shutdown:

spec:
  terminationGracePeriodSeconds: 60 # Wait up to 60s for graceful shutdown
  containers:
  - name: worker
    lifecycle:
      preStop:
        exec:
          command: ["/bin/sh", "-c", "sleep 10"]

Load Balancer Draining

If workers are behind a load balancer:

  1. Remove worker from load balancer
  2. Wait for in-flight requests to complete
  3. Stop worker
  4. Close database connection
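
A minimal sketch of that drain sequence, assuming the worker and backend objects from the earlier worker.ts example, an Express readiness endpoint polled by the load balancer, and a fixed 15-second drain window (tune it to your traffic):

import express from "express";

const app = express();
let ready = true;

// The load balancer polls this endpoint and stops routing here once it returns 503
app.get("/ready", (_req, res) => {
  res.status(ready ? 200 : 503).end();
});
app.listen(3000);

process.on("SIGTERM", async () => {
  ready = false;                                   // 1. Remove worker from the load balancer
  await new Promise((r) => setTimeout(r, 15_000)); // 2. Wait for in-flight requests to drain
  await worker.stop();                             // 3. Stop the worker
  await backend.stop();                            // 4. Close the database connection
  process.exit(0);
});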

Monitoring

Workflow Metrics

Track key metrics:

  • Workflow run count (total, by status)
  • Workflow duration (p50, p95, p99)
  • Step count per workflow
  • Failed workflow rate
  • Retry count

Database Metrics

Monitor PostgreSQL:

  • Connection count
  • Query duration
  • Table size (workflow_runs, step_attempts)
  • Index usage
  • Lock contention
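
Most of these can be sampled with plain SQL. The sketch below is one way to do it, assuming the backend.query helper used later in this guide returns an array of rows; swap in any PostgreSQL client you prefer.

// Sample PostgreSQL health metrics (sketch; assumes backend.query returns an array of rows)
async function sampleDatabaseMetrics() {
  const [connections] = await backend.query(
    "SELECT count(*)::int AS count FROM pg_stat_activity"
  );

  const tableSizes = await backend.query(`
    SELECT relname, pg_size_pretty(pg_total_relation_size(relid)) AS size
    FROM pg_catalog.pg_statio_user_tables
    WHERE relname IN ('workflow_runs', 'step_attempts')
  `);

  console.log({ connections: connections.count, tableSizes });
}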

Custom Monitoring

Query workflow stats:

-- Count workflows by status
SELECT status, COUNT(*) as count
FROM workflow_runs
GROUP BY status;

-- Average workflow duration
SELECT
  workflow_name,
  AVG(EXTRACT(EPOCH FROM (completed_at - created_at))) as avg_duration_seconds
FROM workflow_runs
WHERE status = 'succeeded'
GROUP BY workflow_name;

-- Failed workflows in last 24 hours
SELECT COUNT(*)
FROM workflow_runs
WHERE status = 'failed'
  AND created_at > NOW() - INTERVAL '24 hours';

Integration with Monitoring Tools

Prometheus

Expose metrics endpoint:

import express from "express";
import { register, Counter, Histogram } from "prom-client";

const workflowCounter = new Counter({
  name: "openworkflow_runs_total",
  help: "Total number of workflow runs",
  labelNames: ["workflow_name", "status"],
});

const workflowDuration = new Histogram({
  name: "openworkflow_duration_seconds",
  help: "Workflow execution duration",
  labelNames: ["workflow_name"],
});

// In your workflow code
workflowCounter.inc({ workflow_name: "my-workflow", status: "succeeded" });

// Expose the registry for Prometheus to scrape (port is up to you)
const app = express();
app.get("/metrics", async (_req, res) => {
  res.set("Content-Type", register.contentType);
  res.end(await register.metrics());
});
app.listen(9090);

Datadog

import { StatsD } from "node-dogstatsd";

const statsd = new StatsD();

// Track workflow completion
statsd.increment("openworkflow.runs", 1, ["workflow:my-workflow", "status:succeeded"]);

// Track workflow duration
statsd.timing("openworkflow.duration", duration, ["workflow:my-workflow"]);

Alerting

Set up alerts for critical issues:

Failed Workflows

-- Alert if more than 10 failures in last hour
SELECT COUNT(*) > 10 as alert
FROM workflow_runs
WHERE status = 'failed'
  AND created_at > NOW() - INTERVAL '1 hour';

Long-Running Workflows

-- Alert if workflows running > 1 hour
SELECT COUNT(*) as long_running
FROM workflow_runs
WHERE status = 'running'
  AND created_at < NOW() - INTERVAL '1 hour';

No Active Workers

Monitor worker heartbeats:

-- Alert if no workflows claimed in last 5 minutes
SELECT COUNT(*) = 0 as no_workers
FROM workflow_runs
WHERE worker_id IS NOT NULL
  AND created_at > NOW() - INTERVAL '5 minutes';
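
One way to act on these queries is a small scheduled check. The sketch below is illustrative only: it reuses the backend.query helper shown in the Data Cleanup section, schedules with node-cron, and uses a stub sendAlert function standing in for your PagerDuty, Slack, or webhook integration.

import cron from "node-cron";

// Stub notifier: replace with your PagerDuty, Slack, or webhook integration
async function sendAlert(message: string) {
  console.error(`[ALERT] ${message}`);
}

// Check the failed-workflow threshold every 5 minutes
cron.schedule("*/5 * * * *", async () => {
  const [row] = await backend.query(`
    SELECT COUNT(*)::int AS failures
    FROM workflow_runs
    WHERE status = 'failed'
      AND created_at > NOW() - INTERVAL '1 hour'
  `);

  if (row.failures > 10) {
    // A real setup would de-duplicate repeat alerts
    await sendAlert(`OpenWorkflow: ${row.failures} failed workflows in the last hour`);
  }
});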

Data Cleanup

Retention Policy

Define a retention policy for old data:

// Cleanup job (run daily)
import cron from "node-cron";

async function cleanupOldWorkflows() {
  const retentionDays = 30;

  await backend.query(`
    DELETE FROM workflow_runs
    WHERE status IN ('succeeded', 'failed')
      AND completed_at < NOW() - INTERVAL '${retentionDays} days'
  `);

  console.log(`Cleaned up workflows older than ${retentionDays} days`);
}

// Run cleanup daily at 2 AM
cron.schedule("0 2 * * *", cleanupOldWorkflows);

Archiving

Archive old workflows to cold storage:

async function archiveOldWorkflows() {
  // Export to S3, BigQuery, etc.
  const oldWorkflows = await backend.query(`
    SELECT * FROM workflow_runs
    WHERE status IN ('succeeded', 'failed')
      AND completed_at < NOW() - INTERVAL '30 days'
  `);

  if (oldWorkflows.length === 0) return;

  // `s3` stands in for your storage client of choice
  await s3.upload("workflow-archive", JSON.stringify(oldWorkflows));

  // Then delete from database (quote the ids, which may be UUIDs)
  await backend.query(`
    DELETE FROM workflow_runs
    WHERE id IN (${oldWorkflows.map((w) => `'${w.id}'`).join(",")})
  `);
}

Performance Optimization

Database Indexes

Ensure proper indexes exist:

-- Check existing indexes
\d workflow_runs

-- Add custom indexes if needed
CREATE INDEX idx_workflow_runs_created_at
ON workflow_runs (created_at DESC);

CREATE INDEX idx_workflow_runs_name_status
ON workflow_runs (workflow_name, status);

Connection Pooling

Configure connection pool size:

const backend = await BackendPostgres.connect(
  `postgresql://user:pass@host:5432/db?pool_size=20`
);

Match pool size to concurrency:

Pool Size ≈ (Worker Concurrency * Number of Workers) + 10
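
For example, 3 worker processes each running at a concurrency of 20 would call for a pool of roughly 20 × 3 + 10 = 70 connections.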

Query Optimization

Analyze slow queries:

# In postgresql.conf: log queries slower than 1 second
log_min_duration_statement = 1000

-- Analyze query performance
EXPLAIN ANALYZE
SELECT * FROM workflow_runs
WHERE status = 'pending'
  AND available_at <= NOW()
ORDER BY available_at ASC
LIMIT 1;

Error Handling

Sentry Integration

Track workflow errors:

import * as Sentry from "@sentry/node";

Sentry.init({
  dsn: process.env.SENTRY_DSN,
  environment: process.env.NODE_ENV,
});

const workflow = ow.defineWorkflow(
  { name: "monitored-workflow" },
  async ({ input, step }) => {
    try {
      const result = await step.run({ name: "risky-step" }, async () => {
        return await riskyOperation();
      });
      return result;
    } catch (error) {
      Sentry.captureException(error, {
        tags: { workflow: "monitored-workflow" },
        extra: { input },
      });
      throw error;
    }
  }
);

Custom Error Logging

const workflow = ow.defineWorkflow(
  { name: "logged-workflow" },
  async ({ input, step }) => {
    try {
      return await step.run({ name: "main" }, async () => {
        return await mainOperation();
      });
    } catch (error) {
      // Log to your logging service
      await logger.error("Workflow failed", {
        workflow: "logged-workflow",
        input,
        error: error.message,
        stack: error.stack,
      });
      throw error;
    }
  }
);

High Availability

Multiple Workers

Run multiple worker processes for redundancy:

# docker-compose.yml
worker:
  deploy:
    replicas: 3

If one worker crashes, others continue processing workflows.

Database Failover

Use managed PostgreSQL with automatic failover:

  • AWS RDS: Multi-AZ deployment
  • Google Cloud SQL: High availability configuration
  • Azure Database: Zone-redundant deployment

Health Checks

Implement health check endpoints:

import express from "express";

const app = express();

app.get("/health", (req, res) => {
  if (worker.isRunning && backend.isConnected) {
    res.status(200).json({ status: "healthy" });
  } else {
    res.status(503).json({ status: "unhealthy" });
  }
});

app.listen(3000);

Security

Critical: Never Commit Secrets

Never hardcode database passwords, API keys, or other secrets in your code. Always use environment variables and secrets management systems.

Secrets Management

// ❌ Don't do this
const backend = await BackendPostgres.connect(
  "postgresql://user:password123@host:5432/db"
);

// ✅ Do this
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);

Use secrets management:

  • AWS Secrets Manager
  • Google Cloud Secret Manager
  • Azure Key Vault
  • HashiCorp Vault
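
As an illustration, the sketch below resolves DATABASE_URL from AWS Secrets Manager at startup; it assumes the @aws-sdk/client-secrets-manager package and a secret named openworkflow/database-url (the other providers offer equivalent SDKs).

import {
  SecretsManagerClient,
  GetSecretValueCommand,
} from "@aws-sdk/client-secrets-manager";
import { BackendPostgres } from "@openworkflow/backend-postgres";

const secrets = new SecretsManagerClient({});

// Resolve the connection string at startup instead of baking it into the image
const { SecretString } = await secrets.send(
  new GetSecretValueCommand({ SecretId: "openworkflow/database-url" })
);

const backend = await BackendPostgres.connect(SecretString!);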

Network Security

  • Use VPC/private networks
  • Restrict database access to worker IPs only
  • Enable SSL/TLS for all connections
  • Use firewall rules

Troubleshooting

Common Production Issues

The subsections that follow cover symptoms, causes, and solutions for common production issues.

Next Steps