Architecture
Deep dive into OpenWorkflow's execution model, state machine replication, and worker-driven architecture
This document provides a deep dive into how OpenWorkflow works under the hood, covering the worker-driven architecture, deterministic replay model, and state management.
System Architecture Overview
OpenWorkflow uses a worker-driven model where the database is the central point of coordination. There is no separate orchestrator server.
Architecture Diagram
+---------------------------------+ +--------------------------------+
| | | |
| Your Application Code | | OpenWorkflow Worker |
| (e.g., a web server) | | (Separate Process) |
| | | |
| +---------------------------+ | | +---------------------------+ |
| | OpenWorkflow Client | | | | Workflow Definitions | |
| | (Creates Workflow Runs) | | | | | |
| +---------------------------+ | | +---------------------------+ |
| | | | | |
+---------------+-----------------+ +---------------+----------------+
| |
| +------------------------+ |
+--| Backend Interface |--+
| (e.g., Postgres) |
+------------------------+
|
|
+------------------------------+
| |
| Backend Storage |
| |
| - workflow_runs |
| - step_attempts |
+------------------------------+
Core Components
Client
The entry point for applications to interact with OpenWorkflow. Responsibilities:
- Creating new workflow runs
- Writing to the workflow_runs table
- Providing the defineWorkflow() API
- Creating workers
Worker
The execution engine that runs workflow code. Responsibilities:
- Polling the backend for available workflows
- Claiming workflows atomically
- Executing workflow code using deterministic replay
- Managing concurrency pools
- Heartbeating to maintain leases
- Handling errors and retries
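Putting these responsibilities together, a worker process is usually just a small entrypoint that wires up a backend, registers workflows, and starts polling. The sketch below is illustrative only: the package import path, the PostgresBackend class, and the worker.start() method are assumptions, not confirmed API.
// worker.ts -- minimal worker entrypoint (import path, PostgresBackend, and
// worker.start() are illustrative assumptions, not confirmed API).
import { OpenWorkflow, PostgresBackend } from "openworkflow";

const backend = new PostgresBackend({ connectionString: process.env.DATABASE_URL! });
const ow = new OpenWorkflow({ backend });

// Defining a workflow registers it in the client's in-memory registry,
// which the worker inherits (see Workflow Registration below).
ow.defineWorkflow({ name: "my-workflow" }, async ({ step }) => {
  await step.run({ name: "say-hello" }, async () => "hello");
});

const worker = ow.newWorker({ concurrency: 10 });
await worker.start(); // begin the polling loop described later in this document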
Backend
The source of truth for all workflow state. Responsibilities:
- Storing workflow runs and step attempts
- Serving as the job queue (via availableAt timestamps)
- Providing atomic claim operations
- Enabling memoization through step history
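These responsibilities map to a fairly small interface. The sketch below is illustrative only: claimWorkflowRun, createStepAttempt, and markStepAttemptSucceeded appear later in this document, while the remaining names and shapes are assumptions.
// Illustrative backend interface; the actual OpenWorkflow types may differ.
interface WorkflowRun {
  id: string;
  workflowName: string;
  status: "pending" | "running" | "succeeded" | "failed";
  input: unknown;
}

interface StepAttempt {
  id: string;
  stepName: string;
  status: "running" | "succeeded" | "failed";
  output?: unknown;
}

interface Backend {
  // Queue + claiming (names other than claimWorkflowRun are assumed)
  createWorkflowRun(params: { workflowName: string; input: unknown; availableAt: Date }): Promise<WorkflowRun>;
  claimWorkflowRun(params: { workerId: string; leaseDurationMs: number }): Promise<WorkflowRun | null>;
  heartbeatWorkflowRun(params: { workflowRunId: string; workerId: string; leaseDurationMs: number }): Promise<void>;

  // Step history for memoization
  listStepAttempts(params: { workflowRunId: string }): Promise<StepAttempt[]>;
  createStepAttempt(params: { workflowRunId: string; stepName: string; status: "running" }): Promise<StepAttempt>;
  markStepAttemptSucceeded(params: { stepAttemptId: string; output: unknown }): Promise<void>;
  markStepAttemptFailed(params: { stepAttemptId: string; error: unknown }): Promise<void>;

  // Terminal workflow state
  markWorkflowRunSucceeded(params: { workflowRunId: string; output: unknown }): Promise<void>;
  markWorkflowRunFailed(params: { workflowRunId: string; error: unknown }): Promise<void>;
}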
The Execution Model: State Machine Replication
Key Insight
OpenWorkflow treats each workflow run as a state machine. The worker's job is to advance the state of that machine from its last known checkpoint to the next one.
The Replay Loop
When a worker claims a workflow run, it always executes the code from the beginning. This is the core of the deterministic replay model.
// A worker claims a workflow run.
// It loads the step history and begins execution.
const user = await step.run({ name: "fetch-user" }, async () => {
// 1. The framework sees "fetch-user".
// 2. It finds a completed result in the history.
// 3. It returns the cached output immediately without executing the function.
return await db.users.findOne({ id: 1 });
});
const welcomeEmail = await step.run({ name: "welcome-email" }, async () => {
// 4. The framework sees "welcome-email".
// 5. It is NOT in the history.
// 6. It creates a step_attempt with status "running".
// 7. It executes the function and saves the result.
// 8. It updates the step_attempt to status "succeeded" and continues.
return await email.send(user);
});
Why Replay?
The replay model provides several key benefits:
- Durability: Workflows can resume from any point by replaying from the start
- Simplicity: No complex state machine definitions needed
- Debuggability: Workflow code looks like normal async code
- Flexibility: Easy to add new steps or modify logic
Replay Guarantees
During replay:
- Completed steps return their cached result instantly (no re-execution)
- Failed steps can be retried with new attempts
- New steps execute normally and are added to the history
- Step order must remain consistent (determinism requirement)
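The determinism requirement deserves a concrete example: anything that can change between executions (random values, the current time, direct I/O) must happen inside a step so its result is memoized. A sketch of the pitfall and the fix; doA and doB stand in for arbitrary side effects:
// BAD: the branch depends on a value that changes on every execution, so a
// replay may take the other branch and look for a step name that is not in the history.
if (Math.random() > 0.5) {
  await step.run({ name: "path-a" }, async () => doA());
} else {
  await step.run({ name: "path-b" }, async () => doB());
}

// GOOD: capture the non-deterministic value in its own step. On replay the
// memoized value is returned, so the same branch is always taken.
const coin = await step.run({ name: "flip-coin" }, async () => Math.random());
if (coin > 0.5) {
  await step.run({ name: "path-a" }, async () => doA());
} else {
  await step.run({ name: "path-b" }, async () => doB());
}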
Basic Execution Flow
Here's what happens when you run a workflow:
Workflow Registration
Workflows are defined in code. Calling defineWorkflow() registers each workflow in the client's in-memory registry, and workers created from that client inherit the registry:
const ow = new OpenWorkflow({ backend });
// This registers the workflow in the client's registry
const myWorkflow = ow.defineWorkflow({ name: "my-workflow" }, async ({ step }) => {
// ...
});
// The worker inherits the registered workflows
const worker = ow.newWorker();
There is no sync process with an external server.
Workflow Invocation
Your application code uses the Client to start a new workflow run:
const runHandle = await myWorkflow.run({ userId: "123" });
This creates a new entry in the workflow_runs table:
INSERT INTO workflow_runs (
id,
workflow_name,
status,
available_at,
input
) VALUES (
'run_abc123',
'my-workflow',
'pending',
NOW(),
'{"userId":"123"}'
);
Job Polling
A Worker process polls the workflow_runs table, looking for jobs with:
- status = 'pending'
- available_at <= NOW()
It uses an atomic query to claim a single workflow run:
SELECT * FROM workflow_runs
WHERE status = 'pending'
AND available_at <= NOW()
ORDER BY available_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
The worker then updates the claimed run:
UPDATE workflow_runs
SET status = 'running',
worker_id = 'worker_xyz',
available_at = NOW() + INTERVAL '30 seconds'
WHERE id = 'run_abc123';
Code Execution (Replay Loop)
The worker loads the history of completed step_attempts for the claimed workflow run:
SELECT * FROM step_attempts
WHERE workflow_run_id = 'run_abc123'
ORDER BY created_at ASC;
It then executes the workflow code from the beginning, using the history to memoize results:
async function executeWorkflow(workflowRun, stepHistory) {
// Create a step API that checks the history
const step = {
run: async (config, fn) => {
const historyEntry = stepHistory.find(s => s.step_name === config.name);
if (historyEntry && historyEntry.status === 'succeeded') {
// Return cached result
return historyEntry.output;
}
// Execute the step
const stepAttempt = await backend.createStepAttempt({
workflowRunId: workflowRun.id,
stepName: config.name,
status: 'running',
});
const result = await fn();
await backend.markStepAttemptSucceeded({
stepAttemptId: stepAttempt.id,
output: result,
});
return result;
},
};
// Execute the workflow function
return await workflowDefinition.fn({ input: workflowRun.input, step });
}
Step Processing
When the worker encounters a new step (not in history):
- Create a step_attempt record with status running
- Execute the step function inline
- Update the step_attempt to succeeded or failed
All steps execute synchronously within the worker. There is no async queueing of steps.
State Update
After each step, the worker updates the backend. When the workflow completes:
UPDATE workflow_runs
SET status = 'succeeded',
output = '{"result":"done"}',
completed_at = NOW()
WHERE id = 'run_abc123';
The availableAt Mechanism
Core Fault Tolerance Mechanism
The availableAt timestamp is the core of OpenWorkflow's fault tolerance. It enables scheduling, heartbeating, crash recovery, and retry backoff—all with a single database field.
It serves multiple purposes:
Scheduling
Set availableAt to a future time to schedule a workflow:
await myWorkflow.run(
{ userId: "123" },
{ availableAt: new Date(Date.now() + 60000) } // Run in 1 minute
);
Heartbeating
When a worker claims a workflow, it sets:
availableAt = NOW() + leaseDuration (e.g., 30 seconds)
The worker must periodically update this timestamp to maintain its lease:
UPDATE workflow_runs
SET available_at = NOW() + INTERVAL '30 seconds'
WHERE id = 'run_abc123'
AND worker_id = 'worker_xyz';
Crash Recovery
If a worker crashes:
- Heartbeats stop
- The availableAt timestamp expires
- The workflow becomes visible to other workers' polling queries
- Another worker claims it and resumes from the last completed step
This provides automatic fault tolerance without manual intervention or external monitoring.
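In practice the heartbeat is just a timer that starts when a run is claimed and is cleared when the run finishes; if the process dies, the timer dies with it and the lease lapses on its own. A simplified sketch, using the Backend interface sketched earlier; heartbeatWorkflowRun is an assumed method name:
// Simplified heartbeat; heartbeatWorkflowRun is an assumed backend method name.
function startHeartbeat(backend: Backend, workflowRunId: string, workerId: string): () => void {
  const leaseDurationMs = 30_000;
  // Renew well before the lease expires, e.g. every third of the lease duration.
  const timer = setInterval(() => {
    backend
      .heartbeatWorkflowRun({ workflowRunId, workerId, leaseDurationMs })
      .catch((err) => console.error("heartbeat failed", err));
  }, leaseDurationMs / 3);
  // The worker calls the returned function when the run completes or fails.
  return () => clearInterval(timer);
}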
Retry Backoff
Failed workflows can be retried with exponential backoff:
UPDATE workflow_runs
SET status = 'pending',
available_at = NOW() + INTERVAL '2 seconds' -- Exponential backoff
WHERE id = 'run_abc123';
Step Execution Details
Synchronous Execution
All steps execute inline within the worker; each step.run call is awaited before the workflow continues:
const result1 = await step.run({ name: "step-1" }, async () => {
// This runs inline, blocking the workflow execution
return await apiCall();
});
const result2 = await step.run({ name: "step-2" }, async () => {
// This runs after step-1 completes
return await anotherApiCall();
});
High Concurrency
Workers can be configured with high concurrency limits (e.g., 100) to handle many workflow runs simultaneously:
const worker = ow.newWorker({ concurrency: 100 });
Each workflow occupies a concurrency slot for its entire execution. This is acceptable because:
- Workers have high concurrency capacity
- Most workflows are I/O-bound
- The model is simple to reason about
Parallel Steps
Use Promise.all for parallel execution:
const [user, settings, subscription] = await Promise.all([
step.run({ name: "fetch-user" }, async () => await db.users.findOne()),
step.run({ name: "fetch-settings" }, async () => await db.settings.findOne()),
step.run({ name: "fetch-sub" }, async () => await stripe.subscriptions.retrieve()),
]);
The worker executes all steps concurrently and waits for all to complete. Each step is persisted individually as a step_attempt.
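With Promise.all, the first rejection propagates immediately, but sibling steps that have already succeeded keep their step_attempt rows, so a later replay only re-executes the failed step. If you prefer every parallel step to settle before the error surfaces, Promise.allSettled is one option; this is a sketch, not a prescribed pattern:
// Let all parallel steps settle, then surface the first failure so the
// workflow-level retry logic still applies.
const results = await Promise.allSettled([
  step.run({ name: "fetch-user" }, async () => await db.users.findOne()),
  step.run({ name: "fetch-settings" }, async () => await db.settings.findOne()),
  step.run({ name: "fetch-sub" }, async () => await stripe.subscriptions.retrieve()),
]);

const firstFailure = results.find((r) => r.status === "rejected");
if (firstFailure && firstFailure.status === "rejected") {
  throw firstFailure.reason;
}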
Error Handling & Retries
Step Failures
When a step throws an error:
- The step_attempt is marked as failed
- The error is stored in the step_attempt
- The error propagates up to the workflow level
Workflow Failures
When a workflow fails:
- The workflow_run is marked as failed (or retried)
- The error is stored in the workflow_run
- The workflow can be retried with exponential backoff
Retry Logic
Workflows can be retried automatically:
// Future API (not yet implemented)
const myWorkflow = ow.defineWorkflow(
{
name: "my-workflow",
retry: { maxAttempts: 3, backoff: "exponential" }
},
async ({ input, step }) => {
// ...
}
);
On retry:
- The workflow status is reset to pending
- The availableAt is set to NOW() + backoff
- A new worker claims and executes the workflow
- Completed steps return cached results
- Failed steps are re-executed
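The backoff itself is plain arithmetic on availableAt. A sketch of how an attempt number might map to the next availableAt; the base delay, cap, and jitter below are illustrative values, not the library's defaults:
// Illustrative exponential backoff with a cap and jitter; not the library's actual defaults.
function nextAvailableAt(attempt: number): Date {
  const baseMs = 1_000;                             // 1s, 2s, 4s, 8s, ...
  const capMs = 5 * 60_000;                         // never wait more than 5 minutes
  const backoffMs = Math.min(capMs, baseMs * 2 ** (attempt - 1));
  const jitterMs = Math.random() * backoffMs * 0.1; // up to +10% to avoid thundering herds
  return new Date(Date.now() + backoffMs + jitterMs);
}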
Deadlines
Workflows can have optional deadlines:
await myWorkflow.run(
{ userId: "123" },
{ deadlineAt: new Date(Date.now() + 3600000) } // Must complete in 1 hour
);
If the deadline is reached:
- New steps are skipped
- The workflow is marked as failed
- No further retries occur
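Conceptually the deadline is enforced at step boundaries: before creating a new step_attempt, the worker compares the run's deadline to the current time. A sketch of that guard; the field name and error handling are assumptions:
// Illustrative deadline guard run before each new step; names are assumptions.
function assertDeadlineNotExceeded(run: { deadlineAt?: Date | null }): void {
  if (run.deadlineAt && run.deadlineAt.getTime() <= Date.now()) {
    // The worker can then mark the run as failed without scheduling another retry.
    throw new Error(`Workflow deadline ${run.deadlineAt.toISOString()} exceeded`);
  }
}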
Concurrency & Parallelism
Workflow Concurrency
Workers maintain a concurrency pool:
const worker = ow.newWorker({ concurrency: 10 });
The worker will:
- Maintain up to 10 in-flight workflow runs simultaneously
- Poll for new work only when slots are available
- Execute workflows in parallel using async/await
Database Concurrency
Multiple workers can poll the same table concurrently:
FOR UPDATE SKIP LOCKED
This SQL feature ensures:
- No two workers claim the same workflow
- Workers don't block each other
- Claims are atomic and race-free
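Note that the SELECT ... FOR UPDATE SKIP LOCKED and the follow-up UPDATE shown earlier must run inside one transaction to stay race-free. On Postgres the claim can also be collapsed into a single statement; the sketch below uses node-postgres to illustrate the technique, and is not necessarily how OpenWorkflow's backend is implemented:
// Single-statement claim using node-postgres; an illustration of the SKIP LOCKED
// pattern, not necessarily OpenWorkflow's actual backend implementation.
import { Pool } from "pg";

const pool = new Pool({ connectionString: process.env.DATABASE_URL });

async function claimOneWorkflowRun(workerId: string, leaseMs = 30_000) {
  const { rows } = await pool.query(
    `UPDATE workflow_runs
     SET status = 'running',
         worker_id = $1,
         available_at = NOW() + ($2 * INTERVAL '1 millisecond')
     WHERE id = (
       SELECT id FROM workflow_runs
       WHERE status = 'pending' AND available_at <= NOW()
       ORDER BY available_at ASC
       LIMIT 1
       FOR UPDATE SKIP LOCKED
     )
     RETURNING *`,
    [workerId, leaseMs]
  );
  return rows[0] ?? null; // null when no work is available
}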
Handling Crashes During Parallel Execution
If a worker crashes while executing parallel steps:
- Its heartbeat stops
- The availableAt for the workflow run expires
- Another worker claims it
- The new worker replays the workflow:
- Completed steps return cached results
- In-flight steps are re-executed
Versioning
The Challenge
If workflow code is changed while runs are in-flight, deterministic replay can break:
// Old code
await step.run({ name: "old-step-name" }, ...);
// New code (deployed while runs are in-flight)
await step.run({ name: "new-step-name" }, ...);
The replaying worker fails because old-step-name is in the history but new-step-name is in the code.
Code-Based Versioning
Workflows receive a version parameter for conditional logic:
const workflow = ow.defineWorkflow(
{ name: "versioned-workflow" },
async ({ step, version }) => {
if (version === "v1") {
await step.run({ name: "old-step-name" }, ...);
} else {
await step.run({ name: "new-step-name" }, ...);
}
}
);
This enables:
- Zero-downtime deploys: Old runs replay on v1, new runs use v2
- Gradual rollout: Version can be set per run
- Rollback safety: Old code can still replay old runs
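A gradual rollout usually means pinning the version when the run is created, so old and new runs can coexist on the same worker fleet. The version run option below is an assumption based on the version field in the schema and the workflow signature; check the client API for the exact shape:
// Hypothetical: pin roughly 10% of new runs to v2 and the rest to v1.
// The `version` run option is an assumption for illustration.
const version = Math.random() < 0.1 ? "v2" : "v1";
await myWorkflow.run({ userId: "123" }, { version });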
Workers Deep Dive
Worker Responsibilities
- Polling: Continuously query the backend for available workflows
- Claiming: Atomically claim workflows using FOR UPDATE SKIP LOCKED
- Executing: Run workflow code using the deterministic replay model
- Heartbeating: Periodically update availableAt to maintain the lease
- Error Handling: Catch errors, mark workflows as failed, and implement retries
- Graceful Shutdown: Wait for active workflows to complete before exiting
The Polling Loop
async function runLoop() {
while (this.running) {
const availableSlots = this.concurrency - this.activeExecutions.size;
if (availableSlots > 0) {
// Claim and process workflows for each available slot
for (let i = 0; i < availableSlots; i++) {
const workflowRun = await this.backend.claimWorkflowRun({
workerId: this.workerId,
leaseDurationMs: 30000,
});
if (workflowRun) {
this.executeWorkflowInBackground(workflowRun);
}
}
}
await sleep(100); // Poll interval
}
}
Graceful Shutdown
When a worker receives SIGTERM:
- Stop polling: Set running = false
- Wait for active executions: while (activeExecutions.size > 0) await sleep(100)
- Exit: process.exit(0)
This ensures no work is lost during deploys.
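Wiring this up typically amounts to a couple of signal handlers in the worker entrypoint. A sketch; worker.stop() is an assumed method that sets running = false and resolves once active executions have drained:
// Illustrative shutdown wiring; worker.stop() is an assumed method name.
const shutdown = async (signal: string) => {
  console.log(`Received ${signal}, draining active workflows...`);
  await worker.stop(); // stop polling and wait for in-flight runs to finish
  process.exit(0);
};

process.on("SIGTERM", () => void shutdown("SIGTERM"));
process.on("SIGINT", () => void shutdown("SIGINT"));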
Database Schema
workflow_runs Table
CREATE TABLE workflow_runs (
id UUID PRIMARY KEY,
workflow_name TEXT NOT NULL,
version TEXT,
status TEXT NOT NULL, -- 'pending' | 'running' | 'succeeded' | 'failed'
worker_id TEXT,
input JSONB,
output JSONB,
config JSONB,
context JSONB,
available_at TIMESTAMP NOT NULL,
deadline_at TIMESTAMP,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP
);
CREATE INDEX idx_workflow_runs_available ON workflow_runs (available_at)
WHERE status = 'pending';
step_attempts Table
CREATE TABLE step_attempts (
id UUID PRIMARY KEY,
workflow_run_id UUID NOT NULL REFERENCES workflow_runs(id),
step_name TEXT NOT NULL,
kind TEXT NOT NULL,
status TEXT NOT NULL, -- 'running' | 'succeeded' | 'failed'
config JSONB,
context JSONB,
output JSONB,
error JSONB,
created_at TIMESTAMP NOT NULL DEFAULT NOW(),
completed_at TIMESTAMP
);
CREATE INDEX idx_step_attempts_workflow_run ON step_attempts (workflow_run_id);
Performance Characteristics
Latency
- Workflow start: 1 database write (~1-5ms)
- Worker claim: 1 database query + 1 update (~2-10ms)
- Step execution: 2 database writes per step (~2-10ms)
Throughput
- Single worker: 100-1000 workflows/sec (depending on workflow complexity)
- Multiple workers: Scales linearly with worker count
Scaling Strategies
- Increase worker concurrency: concurrency: 100
- Add more workers: Run multiple worker processes
- Optimize database: Add indexes, use connection pooling
- Reduce step count: Combine related operations
Comparison to Other Systems
vs. Temporal
- OpenWorkflow: Worker-driven, no separate server, database as queue
- Temporal: Server-driven, separate history service, gRPC communication
vs. Inngest
- OpenWorkflow: Self-hosted, PostgreSQL backend, open-source
- Inngest: Cloud-hosted, managed service, proprietary
vs. BullMQ
- OpenWorkflow: Workflow orchestration with steps and memoization
- BullMQ: Job queue with retries but no built-in workflow concepts