PostgreSQL Backend
Learn about the PostgreSQL backend for OpenWorkflow
The PostgreSQL backend is the default persistence layer for OpenWorkflow, providing durable storage for workflow runs and step attempts.
Overview
The `@openworkflow/backend-postgres` package implements the OpenWorkflow `Backend` interface using PostgreSQL as the underlying database.
Features
- Durable Storage: All workflow state persisted to PostgreSQL
- Atomic Operations: Uses `FOR UPDATE SKIP LOCKED` for race-free workflow claiming (see the sketch after this list)
- Automatic Migrations: Creates tables automatically on first connection
- Namespace Support: Isolate workflows by environment (dev, staging, prod)
- Connection Pooling: Built-in connection pool management
- Type-Safe: Full TypeScript support
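To illustrate the claiming pattern, here is a sketch of how a worker can atomically claim one pending run with `FOR UPDATE SKIP LOCKED`, using the pg client. This mirrors the spirit of the backend's internals, not its actual code:

```ts
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Illustrative claim query, not the package's real implementation:
// each worker locks one pending run; SKIP LOCKED makes concurrent
// workers skip rows already claimed instead of blocking on them.
async function claimNextRun(workerId: string) {
  const { rows } = await pool.query(
    `UPDATE workflow_runs
        SET status = 'running', worker_id = $1
      WHERE id = (
        SELECT id FROM workflow_runs
         WHERE status = 'pending' AND available_at <= NOW()
         ORDER BY available_at ASC
         LIMIT 1
         FOR UPDATE SKIP LOCKED
      )
      RETURNING *`,
    [workerId]
  );
  return rows[0] ?? null; // null when no pending run is available
}
```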
Installation
```bash
npm install @openworkflow/backend-postgres
```
The backend requires PostgreSQL 12 or higher.
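To confirm the server meets that requirement before wiring up the backend, a quick check with the pg client (this step is optional and assumes pg is installed):

```ts
import { Client } from "pg";

// Sanity check that the server meets the PostgreSQL 12+ requirement.
const client = new Client({ connectionString: process.env.DATABASE_URL });
await client.connect();
const { rows } = await client.query("SHOW server_version_num");
const major = Math.floor(Number(rows[0].server_version_num) / 10000);
if (major < 12) {
  throw new Error(`PostgreSQL 12+ required, found major version ${major}`);
}
await client.end();
```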
Connection
Basic Connection
```ts
import { BackendPostgres } from "@openworkflow/backend-postgres";

const backend = await BackendPostgres.connect(
  "postgresql://username:password@localhost:5432/database"
);
```
With Options
```ts
const backend = await BackendPostgres.connect(
  process.env.DATABASE_URL!,
  {
    namespaceId: "production", // Isolate workflows by namespace
  }
);
```
Connection String Format
```
postgresql://[username]:[password]@[hostname]:[port]/[database]?[options]
```
Examples:
```
postgresql://postgres:postgres@localhost:5432/postgres
postgresql://user:pass@db.example.com:5432/myapp
postgresql://user:pass@localhost:5432/db?sslmode=require
```
Environment Variables
Store connection strings in environment variables:
```bash
export DATABASE_URL="postgresql://postgres:postgres@localhost:5432/postgres"
```
```ts
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
```
Database Schema
The backend automatically creates two tables on first connection.
workflow_runs Table
Stores workflow execution state and serves as the job queue.
```sql
CREATE TABLE workflow_runs (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  namespace_id TEXT NOT NULL DEFAULT 'default',
  workflow_name TEXT NOT NULL,
  version TEXT,
  status TEXT NOT NULL,
  worker_id TEXT,
  config JSONB NOT NULL DEFAULT '{}',
  context JSONB,
  input JSONB,
  output JSONB,
  error JSONB,
  available_at TIMESTAMPTZ NOT NULL,
  deadline_at TIMESTAMPTZ,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  completed_at TIMESTAMPTZ
);

-- Index for efficient polling
CREATE INDEX idx_workflow_runs_available
  ON workflow_runs (namespace_id, available_at)
  WHERE status = 'pending';

-- Index for namespace lookups
CREATE INDEX idx_workflow_runs_namespace
  ON workflow_runs (namespace_id, workflow_name, created_at DESC);
```
Columns:
- `id`: Unique workflow run identifier (UUID)
- `namespace_id`: Namespace for isolation (default: `"default"`)
- `workflow_name`: Name of the workflow definition
- `version`: Workflow version (for versioning support)
- `status`: Current status (`pending`, `running`, `succeeded`, `failed`)
- `worker_id`: ID of the worker currently executing the workflow
- `config`: Workflow configuration (JSONB)
- `context`: Additional context data (JSONB)
- `input`: Workflow input data (JSONB)
- `output`: Workflow result (JSONB, null until completed)
- `error`: Error information if failed (JSONB)
- `available_at`: Timestamp when workflow becomes available to workers
- `deadline_at`: Optional deadline for workflow completion
- `created_at`: When the workflow run was created
- `completed_at`: When the workflow finished
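As a reading aid, a `workflow_runs` row maps onto a TypeScript shape like this (illustrative only; the package does not export this type):

```ts
// Illustrative shape of a workflow_runs row; not part of the public API.
interface WorkflowRunRow {
  id: string;                       // UUID
  namespace_id: string;             // defaults to "default"
  workflow_name: string;
  version: string | null;
  status: "pending" | "running" | "succeeded" | "failed";
  worker_id: string | null;
  config: Record<string, unknown>;
  context: Record<string, unknown> | null;
  input: unknown;
  output: unknown | null;           // null until the run completes
  error: unknown | null;
  available_at: Date;
  deadline_at: Date | null;
  created_at: Date;
  completed_at: Date | null;
}
```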
step_attempts Table
Stores individual step execution results, enabling memoization.
```sql
CREATE TABLE step_attempts (
  id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
  workflow_run_id UUID NOT NULL REFERENCES workflow_runs(id) ON DELETE CASCADE,
  step_name TEXT NOT NULL,
  kind TEXT NOT NULL,
  status TEXT NOT NULL,
  config JSONB NOT NULL DEFAULT '{}',
  context JSONB,
  output JSONB,
  error JSONB,
  created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
  completed_at TIMESTAMPTZ
);

-- Index for loading step history
CREATE INDEX idx_step_attempts_workflow_run
  ON step_attempts (workflow_run_id, created_at);
```
Columns:
- `id`: Unique step attempt identifier (UUID)
- `workflow_run_id`: Foreign key to `workflow_runs`
- `step_name`: Name of the step (from `step.run({ name: "..." })`)
- `kind`: Type of step (currently only `"run"`)
- `status`: Current status (`running`, `succeeded`, `failed`)
- `config`: Step configuration (JSONB)
- `context`: Additional context data (JSONB)
- `output`: Step result (JSONB, null until completed)
- `error`: Error information if failed (JSONB)
- `created_at`: When the step started
- `completed_at`: When the step finished
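To see how these rows enable memoization, consider a two-step workflow: if the process crashes after the first step succeeds, the retry re-executes the workflow function, but the first `step.run` resolves from its stored `step_attempts` output instead of running again. The sketch below assumes a `defineWorkflow` helper and a callback argument to `step.run`, plus stub helpers; none of these are confirmed API:

```ts
// Hypothetical helpers for illustration only.
async function fetchUser(userId: string): Promise<{ email: string }> {
  return { email: `user-${userId}@example.com` };
}
async function sendEmail(to: string): Promise<void> {
  console.log(`sending welcome email to ${to}`);
}

// Hypothetical workflow definition; the exact OpenWorkflow API may differ.
// Assumes an `ow` instance constructed as shown elsewhere on this page.
const sendWelcomeEmail = ow.defineWorkflow(
  "send-welcome-email",
  async (input: { userId: string }, step) => {
    // First execution persists a step_attempts row; a replay after a
    // crash returns the stored output instead of re-running the callback.
    const user = await step.run({ name: "fetch-user" }, () => fetchUser(input.userId));
    await step.run({ name: "send-email" }, () => sendEmail(user.email));
  }
);
```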
Namespaces
Namespaces allow you to isolate workflows by environment or tenant.
Use Cases
- Environment Isolation: Separate dev, staging, and production workflows
- Multi-Tenancy: Isolate workflows per customer/organization
- Testing: Create isolated namespaces for tests
Creating Namespaced Backends
```ts
// Production
const prodBackend = await BackendPostgres.connect(
  process.env.DATABASE_URL!,
  { namespaceId: "production" }
);

// Staging
const stagingBackend = await BackendPostgres.connect(
  process.env.DATABASE_URL!,
  { namespaceId: "staging" }
);

// Development
const devBackend = await BackendPostgres.connect(
  process.env.DATABASE_URL!,
  { namespaceId: "development" }
);
```
Each namespace operates independently:
- Workers only see workflows in their namespace
- Workflow runs don't interfere across namespaces
- Same database, logical separation
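In tests, for example, a throwaway namespace per run keeps parallel suites from seeing each other's workflows; the namespace id here is arbitrary:

```ts
import { randomUUID } from "node:crypto";

// Each test run gets its own namespace, so parallel CI jobs stay isolated.
const testBackend = await BackendPostgres.connect(process.env.DATABASE_URL!, {
  namespaceId: `test-${randomUUID()}`,
});
```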
Default Namespace
If you don't specify a namespace, the default is "default":
```ts
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
// Equivalent to: { namespaceId: "default" }
```
Connection Management
Connection Pooling
The backend uses connection pooling automatically:
```ts
const backend = await BackendPostgres.connect(connectionString);
// The backend maintains a pool of connections
// No manual pool configuration needed
```
Closing Connections
Always close the backend when your application shuts down:
```ts
await backend.stop();
```
Example with Graceful Shutdown:
```ts
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
const ow = new OpenWorkflow({ backend });

const worker = ow.newWorker();
await worker.start();

process.on("SIGTERM", async () => {
  console.log("Shutting down gracefully...");
  await worker.stop();
  await backend.stop();
  process.exit(0);
});
```
Querying Workflows
You can query workflow runs directly using SQL:
Get All Pending Workflows
```sql
SELECT * FROM workflow_runs
WHERE status = 'pending'
ORDER BY created_at DESC;
```
Get Workflows by Name
```sql
SELECT * FROM workflow_runs
WHERE workflow_name = 'send-welcome-email'
ORDER BY created_at DESC
LIMIT 100;
```
Get Failed Workflows
```sql
SELECT * FROM workflow_runs
WHERE status = 'failed'
ORDER BY completed_at DESC;
```
Get Workflow with Step History
```sql
SELECT
  wr.*,
  json_agg(
    json_build_object(
      'stepName', sa.step_name,
      'status', sa.status,
      'output', sa.output,
      'createdAt', sa.created_at
    ) ORDER BY sa.created_at
  ) AS steps
FROM workflow_runs wr
LEFT JOIN step_attempts sa ON sa.workflow_run_id = wr.id
WHERE wr.id = 'your-workflow-run-id'
GROUP BY wr.id;
```
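These queries can just as easily run from application code. A sketch with the pg client, listing recent failures in one namespace:

```ts
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// List the most recent failed runs in a given namespace.
const { rows } = await pool.query(
  `SELECT id, workflow_name, error, completed_at
     FROM workflow_runs
    WHERE namespace_id = $1 AND status = 'failed'
    ORDER BY completed_at DESC
    LIMIT 20`,
  ["production"]
);
for (const run of rows) {
  console.log(run.id, run.workflow_name, run.completed_at);
}
```
Performance Considerations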
Indexes
The default schema includes indexes for common queries:
- `idx_workflow_runs_available`: Fast polling for pending workflows
- `idx_workflow_runs_namespace`: Namespace and workflow name lookups
- `idx_step_attempts_workflow_run`: Loading step history
Additional Indexes
For high-volume production use, consider adding:
```sql
-- Index for status filtering
CREATE INDEX idx_workflow_runs_status
  ON workflow_runs (namespace_id, status, created_at DESC);

-- Index for worker queries
CREATE INDEX idx_workflow_runs_worker
  ON workflow_runs (worker_id, status);
```
Connection Pool Sizing
For production, use a database connection pool:
```
postgresql://user:pass@localhost:5432/db?pool_size=20
```
Database Sizing
Estimate storage requirements:
- Workflow runs: ~1-5 KB per run (depending on input/output size)
- Step attempts: ~500 bytes - 2 KB per step
For 1 million workflow runs with 10 steps each:
- Workflows: ~5 GB
- Steps: ~20 GB
- Total: ~25 GB
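The same estimate as a quick calculation (the per-row sizes are the rough upper bounds above, not measurements):

```ts
// Back-of-the-envelope storage estimate using the figures above.
const runs = 1_000_000;
const stepsPerRun = 10;
const bytesPerRun = 5 * 1024;   // upper end of ~1-5 KB
const bytesPerStep = 2 * 1024;  // upper end of ~0.5-2 KB

const totalGB =
  (runs * bytesPerRun + runs * stepsPerRun * bytesPerStep) / 1024 ** 3;
console.log(`~${totalGB.toFixed(1)} GB`); // ~23.8 GB, in line with the ~25 GB ballpark
```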
Cleanup
Implement a cleanup strategy for old workflow runs:
```sql
-- Delete succeeded workflows older than 30 days
DELETE FROM workflow_runs
WHERE status = 'succeeded'
  AND completed_at < NOW() - INTERVAL '30 days';

-- Step attempts are automatically deleted via CASCADE
```
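To run the purge on a schedule from the application itself, a minimal sketch using pg and a timer (pg_cron or an external cron job would work just as well):

```ts
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Purge succeeded runs older than 30 days; their step_attempts
// rows are removed automatically via ON DELETE CASCADE.
async function cleanupOldRuns(): Promise<number> {
  const result = await pool.query(
    `DELETE FROM workflow_runs
      WHERE status = 'succeeded'
        AND completed_at < NOW() - INTERVAL '30 days'`
  );
  return result.rowCount ?? 0;
}

// Run once a day.
setInterval(() => {
  cleanupOldRuns().catch((err) => console.error("cleanup failed", err));
}, 24 * 60 * 60 * 1000);
```
Troubleshooting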
Connection Errors
Issue: `ECONNREFUSED` or connection timeout
Solutions:
- Verify PostgreSQL is running: `psql -U postgres -h localhost`
- Check connection string format
- Verify firewall rules
- Check PostgreSQL `pg_hba.conf` for authentication settings
Migration Errors
Issue: Tables already exist
The backend creates tables automatically. If you see errors:
- Ensure the database user has `CREATE TABLE` permissions
- Drop existing tables if the schema changed:
```sql
DROP TABLE step_attempts;
DROP TABLE workflow_runs;
```
Performance Issues
Issue: Slow workflow polling
Solutions:
- Verify indexes exist: `\d workflow_runs`
- Analyze query performance:
```sql
EXPLAIN ANALYZE
SELECT * FROM workflow_runs
WHERE status = 'pending' AND available_at <= NOW()
ORDER BY available_at ASC
LIMIT 1;
```
- Reduce polling pressure (the poll loop itself is internal, so lower worker concurrency instead):
```ts
const worker = ow.newWorker({ concurrency: 5 }); // Lower concurrency
```
Lock Contention
Issue: Workers blocking each other
PostgreSQL's `SKIP LOCKED` prevents this, but if you see issues:
- Ensure you're using PostgreSQL 9.5+ (where `SKIP LOCKED` was introduced; this backend requires 12+ anyway)
- Check for long-running transactions
- Monitor with:
```sql
SELECT * FROM pg_stat_activity WHERE state = 'active';
```
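The same check from application code, flagging backends that have been active for a while (the one-minute threshold is arbitrary):

```ts
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

// Flag active backends that have been running the same query for over a minute.
const { rows } = await pool.query(
  `SELECT pid, EXTRACT(EPOCH FROM NOW() - query_start) AS seconds, query
     FROM pg_stat_activity
    WHERE state = 'active'
      AND NOW() - query_start > INTERVAL '1 minute'`
);
for (const row of rows) {
  console.warn(`pid ${row.pid} active for ${Math.round(row.seconds)}s: ${row.query}`);
}
```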
Docker Setup
Development
Use Docker for local development:
```bash
docker run -d \
  --name openworkflow-postgres \
  -e POSTGRES_PASSWORD=postgres \
  -e POSTGRES_DB=openworkflow \
  -p 5432:5432 \
  postgres:16
```
Connect with:
```ts
const backend = await BackendPostgres.connect(
  "postgresql://postgres:postgres@localhost:5432/openworkflow"
);
```
Production
Use Docker Compose for production-like setup:
```yaml
version: "3.8"
services:
  postgres:
    image: postgres:16
    environment:
      POSTGRES_PASSWORD: ${POSTGRES_PASSWORD}
      POSTGRES_DB: openworkflow
    volumes:
      - postgres-data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
volumes:
  postgres-data:
```
Best Practices
✅ Do
- Use environment variables for connection strings
- Close connections on shutdown (`await backend.stop()`)
- Use namespaces for environment isolation
- Monitor database size and implement cleanup
- Use connection pooling in production
- Set appropriate `pg_hba.conf` rules for security
❌ Don't
- Don't hardcode connection strings
- Don't share backends across isolated services
- Don't manually modify `workflow_runs` or `step_attempts` unless necessary
- Don't forget to clean up old workflow data