OpenWorkflow

Architecture

Deep dive into OpenWorkflow's execution model, state machine replication, and worker-driven architecture

Architecture

This document provides a deep dive into how OpenWorkflow works under the hood, covering the worker-driven architecture, deterministic replay model, and state management.

System Architecture Overview

OpenWorkflow uses a worker-driven model where the database is the central point of coordination. There is no separate orchestrator server.

Architecture Diagram

+---------------------------------+      +--------------------------------+
|                                 |      |                                |
|      Your Application Code      |      |      OpenWorkflow Worker       |
|      (e.g., a web server)       |      |      (Separate Process)        |
|                                 |      |                                |
|  +---------------------------+  |      |  +---------------------------+ |
|  |   OpenWorkflow Client     |  |      |  |   Workflow Definitions    | |
|  | (Creates Workflow Runs)   |  |      |  |                           | |
|  +---------------------------+  |      |  +---------------------------+ |
|               |                 |      |               |                |
+---------------+-----------------+      +---------------+----------------+
                |                              |
                |  +------------------------+  |
                +--|  Backend Interface     |--+
                   |  (e.g., Postgres)      |
                   +------------------------+
                              |
                              |
               +------------------------------+
               |                              |
               |       Backend Storage        |
               |                              |
               | - workflow_runs              |
               | - step_attempts              |
               +------------------------------+

Core Components

Client

The entry point for applications to interact with OpenWorkflow. Responsibilities:

  • Creating new workflow runs
  • Writing to the workflow_runs table
  • Providing the defineWorkflow() API
  • Creating workers

Worker

The execution engine that runs workflow code. Responsibilities:

  • Polling the backend for available workflows
  • Claiming workflows atomically
  • Executing workflow code using deterministic replay
  • Managing concurrency pools
  • Heartbeating to maintain leases
  • Handling errors and retries

Backend

The source of truth for all workflow state. Responsibilities:

  • Storing workflow runs and step attempts
  • Serving as the job queue (via availableAt timestamps)
  • Providing atomic claim operations
  • Enabling memoization through step history

The Execution Model: State Machine Replication

Key Insight

OpenWorkflow treats each workflow run as a state machine. The worker's job is to advance the state of that machine from its last known checkpoint to the next one.

The Replay Loop

When a worker claims a workflow run, it always executes the code from the beginning. This is the core of the deterministic replay model.

// A worker claims a workflow run.
// It loads the step history and begins execution.

const user = await step.run({ name: "fetch-user" }, async () => {
  // 1. The framework sees "fetch-user".
  // 2. It finds a completed result in the history.
  // 3. It returns the cached output immediately without executing the function.
  return await db.users.findOne({ id: 1 });
});

const welcomeEmail = await step.run({ name: "welcome-email" }, async () => {
  // 4. The framework sees "welcome-email".
  // 5. It is NOT in the history.
  // 6. It creates a step_attempt with status "running".
  // 7. It executes the function and saves the result.
  // 8. It updates the step_attempt to status "succeeded" and continues.
  return await email.send(user);
});

Why Replay?

The replay model provides several key benefits:

  1. Durability: Workflows can resume from any point by replaying from the start
  2. Simplicity: No complex state machine definitions needed
  3. Debuggability: Workflow code looks like normal async code
  4. Flexibility: Easy to add new steps or modify logic

Replay Guarantees

During replay:

  • Completed steps return their cached result instantly (no re-execution)
  • Failed steps can be retried with new attempts
  • New steps execute normally and are added to the history
  • Step order must remain consistent (determinism requirement)

Basic Execution Flow

Here's what happens when you run a workflow:

Workflow Registration

Workflows are defined in code. When a worker starts, it automatically discovers and registers workflows in an in-memory map:

const ow = new OpenWorkflow({ backend });

// This registers the workflow in the client's registry
const myWorkflow = ow.defineWorkflow({ name: "my-workflow" }, async ({ step }) => {
  // ...
});

// The worker inherits the registered workflows
const worker = ow.newWorker();

There is no sync process with an external server.

Workflow Invocation

Your application code uses the Client to start a new workflow run:

const runHandle = await myWorkflow.run({ userId: "123" });

This creates a new entry in the workflow_runs table:

INSERT INTO workflow_runs (
  id,
  workflow_name,
  status,
  available_at,
  input
) VALUES (
  'run_abc123',
  'my-workflow',
  'pending',
  NOW(),
  '{"userId":"123"}'
);

Job Polling

A Worker process polls the workflow_runs table, looking for jobs with:

  • status = 'pending'
  • available_at <= NOW()

It uses an atomic query to claim a single workflow run:

SELECT * FROM workflow_runs
WHERE status = 'pending'
  AND available_at <= NOW()
ORDER BY available_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;

The worker then updates the claimed run:

UPDATE workflow_runs
SET status = 'running',
    worker_id = 'worker_xyz',
    available_at = NOW() + INTERVAL '30 seconds'
WHERE id = 'run_abc123';

Code Execution (Replay Loop)

The worker loads the history of completed step_attempts for the claimed workflow run:

SELECT * FROM step_attempts
WHERE workflow_run_id = 'run_abc123'
ORDER BY created_at ASC;

It then executes the workflow code from the beginning, using the history to memoize results:

async function executeWorkflow(workflowRun, stepHistory) {
  // Create a step API that checks the history
  const step = {
    run: async (config, fn) => {
      const historyEntry = stepHistory.find(s => s.step_name === config.name);

      if (historyEntry && historyEntry.status === 'succeeded') {
        // Return cached result
        return historyEntry.output;
      }

      // Execute the step
      const stepAttempt = await backend.createStepAttempt({
        workflowRunId: workflowRun.id,
        stepName: config.name,
        status: 'running',
      });

      const result = await fn();

      await backend.markStepAttemptSucceeded({
        stepAttemptId: stepAttempt.id,
        output: result,
      });

      return result;
    },
  };

  // Execute the workflow function
  return await workflowDefinition.fn({ input: workflowRun.input, step });
}

Step Processing

When the worker encounters a new step (not in history):

  1. Create a step_attempt record with status running
  2. Execute the step function inline
  3. Update the step_attempt to succeeded or failed

All steps execute synchronously within the worker. There is no async queueing of steps.

State Update

After each step, the worker updates the backend. When the workflow completes:

UPDATE workflow_runs
SET status = 'succeeded',
    output = '{"result":"done"}',
    completed_at = NOW()
WHERE id = 'run_abc123';

The availableAt Mechanism

Core Fault Tolerance Mechanism

The availableAt timestamp is the core of OpenWorkflow's fault tolerance. It enables scheduling, heartbeating, crash recovery, and retry backoff—all with a single database field.

It serves multiple purposes:

Scheduling

Set availableAt to a future time to schedule a workflow:

await myWorkflow.run(
  { userId: "123" },
  { availableAt: new Date(Date.now() + 60000) } // Run in 1 minute
);

Heartbeating

When a worker claims a workflow, it sets:

availableAt = NOW() + leaseDuration (e.g., 30 seconds)

The worker must periodically update this timestamp to maintain its lease:

UPDATE workflow_runs
SET available_at = NOW() + INTERVAL '30 seconds'
WHERE id = 'run_abc123'
  AND worker_id = 'worker_xyz';

Crash Recovery

If a worker crashes:

  1. Heartbeats stop
  2. The availableAt timestamp expires
  3. The workflow becomes visible to other workers' polling queries
  4. Another worker claims it and resumes from the last completed step

This provides automatic fault tolerance without manual intervention or external monitoring.

Retry Backoff

Failed workflows can be retried with exponential backoff:

UPDATE workflow_runs
SET status = 'pending',
    available_at = NOW() + INTERVAL '2 seconds'  -- Exponential backoff
WHERE id = 'run_abc123';

Step Execution Details

Synchronous Execution

All steps execute synchronously within the worker's event loop:

const result1 = await step.run({ name: "step-1" }, async () => {
  // This runs inline, blocking the workflow execution
  return await apiCall();
});

const result2 = await step.run({ name: "step-2" }, async () => {
  // This runs after step-1 completes
  return await anotherApiCall();
});

High Concurrency

Workers can be configured with high concurrency limits (e.g., 100) to handle many workflow runs simultaneously:

const worker = ow.newWorker({ concurrency: 100 });

Each workflow occupies a concurrency slot for its entire execution. This is acceptable because:

  • Workers have high concurrency capacity
  • Most workflows are I/O-bound
  • The model is simple to reason about

Parallel Steps

Use Promise.all for parallel execution:

const [user, settings, subscription] = await Promise.all([
  step.run({ name: "fetch-user" }, async () => await db.users.findOne()),
  step.run({ name: "fetch-settings" }, async () => await db.settings.findOne()),
  step.run({ name: "fetch-sub" }, async () => await stripe.subscriptions.retrieve()),
]);

The worker executes all steps concurrently and waits for all to complete. Each step is persisted individually as a step_attempt.

Error Handling & Retries

Step Failures

When a step throws an error:

  1. The step_attempt is marked as failed
  2. The error is stored in the step_attempt
  3. The error propagates up to the workflow level

Workflow Failures

When a workflow fails:

  1. The workflow_run is marked as failed (or retried)
  2. The error is stored in the workflow_run
  3. The workflow can be retried with exponential backoff

Retry Logic

Workflows can be retried automatically:

// Future API (not yet implemented)
const myWorkflow = ow.defineWorkflow(
  {
    name: "my-workflow",
    retry: { maxAttempts: 3, backoff: "exponential" }
  },
  async ({ input, step }) => {
    // ...
  }
);

On retry:

  1. The workflow status is reset to pending
  2. The availableAt is set to NOW() + backoff
  3. A new worker claims and executes the workflow
  4. Completed steps return cached results
  5. Failed steps are re-executed

Deadlines

Workflows can have optional deadlines:

await myWorkflow.run(
  { userId: "123" },
  { deadlineAt: new Date(Date.now() + 3600000) } // Must complete in 1 hour
);

If the deadline is reached:

  • New steps are skipped
  • The workflow is marked as failed
  • No further retries occur

Concurrency & Parallelism

Workflow Concurrency

Workers maintain a concurrency pool:

const worker = ow.newWorker({ concurrency: 10 });

The worker will:

  • Maintain up to 10 in-flight workflow runs simultaneously
  • Poll for new work only when slots are available
  • Execute workflows in parallel using async/await

Database Concurrency

Multiple workers can poll the same table concurrently:

FOR UPDATE SKIP LOCKED

This SQL feature ensures:

  • No two workers claim the same workflow
  • Workers don't block each other
  • Claims are atomic and race-free

Handling Crashes During Parallel Execution

If a worker crashes while executing parallel steps:

  1. Its heartbeat stops
  2. The availableAt for the workflow run expires
  3. Another worker claims it
  4. The new worker replays the workflow:
    • Completed steps return cached results
    • In-flight steps are re-executed

Versioning

The Challenge

If workflow code is changed while runs are in-flight, deterministic replay can break:

// Old code
await step.run({ name: "old-step-name" }, ...);

// New code (deployed while runs are in-flight)
await step.run({ name: "new-step-name" }, ...);

The replaying worker fails because old-step-name is in the history but new-step-name is in the code.

Code-Based Versioning

Workflows receive a version parameter for conditional logic:

const workflow = ow.defineWorkflow(
  { name: "versioned-workflow" },
  async ({ step, version }) => {
    if (version === "v1") {
      await step.run({ name: "old-step-name" }, ...);
    } else {
      await step.run({ name: "new-step-name" }, ...);
    }
  }
);

This enables:

  • Zero-downtime deploys: Old runs replay on v1, new runs use v2
  • Gradual rollout: Version can be set per run
  • Rollback safety: Old code can still replay old runs

Workers Deep Dive

Worker Responsibilities

  1. Polling: Continuously query the backend for available workflows
  2. Claiming: Atomically claim workflows using FOR UPDATE SKIP LOCKED
  3. Executing: Run workflow code using the deterministic replay model
  4. Heartbeating: Periodically update availableAt to maintain the lease
  5. Error Handling: Catch errors, mark workflows as failed, and implement retries
  6. Graceful Shutdown: Wait for active workflows to complete before exiting

The Polling Loop

async function runLoop() {
  while (this.running) {
    const availableSlots = this.concurrency - this.activeExecutions.size;

    if (availableSlots > 0) {
      // Claim and process workflows for each available slot
      for (let i = 0; i < availableSlots; i++) {
        const workflowRun = await this.backend.claimWorkflowRun({
          workerId: this.workerId,
          leaseDurationMs: 30000,
        });

        if (workflowRun) {
          this.executeWorkflowInBackground(workflowRun);
        }
      }
    }

    await sleep(100); // Poll interval
  }
}

Graceful Shutdown

When a worker receives SIGTERM:

  1. Stop polling: Set running = false
  2. Wait for active executions: while (activeExecutions.size > 0) await sleep(100)
  3. Exit: process.exit(0)

This ensures no work is lost during deploys.

Database Schema

workflow_runs Table

CREATE TABLE workflow_runs (
  id UUID PRIMARY KEY,
  workflow_name TEXT NOT NULL,
  version TEXT,
  status TEXT NOT NULL, -- 'pending' | 'running' | 'succeeded' | 'failed'
  worker_id TEXT,
  input JSONB,
  output JSONB,
  config JSONB,
  context JSONB,
  available_at TIMESTAMP NOT NULL,
  deadline_at TIMESTAMP,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  completed_at TIMESTAMP
);

CREATE INDEX idx_workflow_runs_available ON workflow_runs (available_at)
  WHERE status = 'pending';

step_attempts Table

CREATE TABLE step_attempts (
  id UUID PRIMARY KEY,
  workflow_run_id UUID NOT NULL REFERENCES workflow_runs(id),
  step_name TEXT NOT NULL,
  kind TEXT NOT NULL,
  status TEXT NOT NULL, -- 'running' | 'succeeded' | 'failed'
  config JSONB,
  context JSONB,
  output JSONB,
  error JSONB,
  created_at TIMESTAMP NOT NULL DEFAULT NOW(),
  completed_at TIMESTAMP
);

CREATE INDEX idx_step_attempts_workflow_run ON step_attempts (workflow_run_id);

Performance Characteristics

Latency

  • Workflow start: 1 database write (~1-5ms)
  • Worker claim: 1 database query + 1 update (~2-10ms)
  • Step execution: 2 database writes per step (~2-10ms)

Throughput

  • Single worker: 100-1000 workflows/sec (depending on workflow complexity)
  • Multiple workers: Scales linearly with worker count

Scaling Strategies

  1. Increase worker concurrency: concurrency: 100
  2. Add more workers: Run multiple worker processes
  3. Optimize database: Add indexes, use connection pooling
  4. Reduce step count: Combine related operations

Comparison to Other Systems

vs. Temporal

  • OpenWorkflow: Worker-driven, no separate server, database as queue
  • Temporal: Server-driven, separate history service, gRPC communication

vs. Inngest

  • OpenWorkflow: Self-hosted, PostgreSQL backend, open-source
  • Inngest: Cloud-hosted, managed service, proprietary

vs. BullMQ

  • OpenWorkflow: Workflow orchestration with steps and memoization
  • BullMQ: Job queue with retries but no built-in workflow concepts

Next Steps