OpenWorkflow

Core Concepts

Understanding workflows, steps, workers, and the execution model in OpenWorkflow

This guide explains the fundamental concepts that power OpenWorkflow: workflows, steps, workers, backends, and the execution model.

Workflows

A workflow is a durable function that orchestrates multiple steps. Workflows can:

  • Make external API calls
  • Query databases
  • Perform complex business logic
  • Pause and resume across process restarts
  • Run for seconds, minutes, hours, or even days

Defining a Workflow

const myWorkflow = ow.defineWorkflow(
  { name: "my-workflow" },
  async ({ input, step }) => {
    // Your workflow logic here
    return { result: "done" };
  },
);

Workflow Properties

  • Durable: Survives crashes, restarts, and deploys
  • Deterministic: Must produce the same result given the same input
  • Resumable: Continues from the last completed step after interruption
  • Versioned: Can be versioned to handle code changes safely

The Workflow Function

The workflow function receives a context object with:

  • input: The input data passed when starting the workflow
  • step: The step API for creating checkpoints
  • version: The workflow version (for versioning support)

async ({ input, step, version }) => {
  // Access input
  const userId = input.userId;

  // Create steps
  const user = await step.run({ name: "fetch-user" }, async () => {
    return await db.users.findOne({ id: userId });
  });

  return { user };
}

Steps

Steps are the building blocks of workflows. Each step represents a checkpoint in your workflow's execution.

Why Steps Matter

Dangerous Without Steps

Without steps, crashes can cause duplicate charges, double emails, or data inconsistencies.

Imagine this scenario without steps:

// ⚠️ Without steps - dangerous!
const workflow = async ({ input }) => {
  await stripe.charges.create({ amount: 1000 }); // Charge customer
  await email.send({ to: input.email });         // Send confirmation
};

If the worker crashes after charging the customer, the workflow retries from the beginning and charges them twice!

Safe With Steps

Steps provide automatic memoization: once a step completes, its result is stored and the step is not executed again on replay.

With steps:

// ✅ With steps - safe!
const workflow = async ({ input, step }) => {
  await step.run({ name: "charge" }, async () => {
    return await stripe.charges.create({ amount: 1000 });
  });

  await step.run({ name: "send-email" }, async () => {
    return await email.send({ to: input.email });
  });
};

Now if the worker crashes after the charge, the retry skips the charge step (cached) and goes straight to sending the email.

Step Execution

Each step:

  1. Creates a record in the database with status running
  2. Executes the step function in the worker and awaits its result
  3. Stores the result in the database
  4. Updates status to succeeded or failed

Step Memoization

When a workflow replays (after a crash or restart):

  • Completed steps return their cached result instantly without re-execution
  • New steps execute normally and are added to the history
  • Failed steps re-execute with retry logic
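
The three cases above can be illustrated with a small in-memory sketch. This is not OpenWorkflow's implementation: `StepAttempt`, `createStepApi`, and the `Map` standing in for the persisted step history are all hypothetical names, and retry backoff is left out.

```typescript
// Hypothetical in-memory model of step memoization (not the library's code).
type StepAttempt =
  | { status: "succeeded"; result: unknown }
  | { status: "failed"; error: string };

// `attempts` stands in for the persisted step history of one workflow run.
function createStepApi(attempts: Map<string, StepAttempt>) {
  return {
    async run<T>(config: { name: string }, fn: () => Promise<T>): Promise<T> {
      const previous = attempts.get(config.name);
      if (previous !== undefined && previous.status === "succeeded") {
        // Completed step: return the stored result without calling fn.
        return previous.result as T;
      }
      // New or previously failed step: execute and record the outcome.
      try {
        const result = await fn();
        attempts.set(config.name, { status: "succeeded", result });
        return result;
      } catch (err) {
        attempts.set(config.name, { status: "failed", error: String(err) });
        throw err;
      }
    },
  };
}
```

Calling `run` twice with the same name against the same `attempts` map invokes the function only once if the first call succeeded, and twice if the first call threw.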

The step.run() API

const result = await step.run(
  { name: "step-name" },
  async () => {
    // This function runs once
    // If the workflow restarts, this returns the cached result
    return await someWork();
  }
);

Important Rules

  • Step names must be unique within a workflow
  • Step names must be deterministic (no dynamic names like step-${Date.now()})
  • Step functions should be idempotent when possible
  • Steps execute inline in the worker process that runs the workflow
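
When steps are created in a loop, one way to satisfy the first two rules is to derive each name from a stable identifier in the input rather than from a timestamp or random value. The helpers below (`stepNameFor`, `createNameRegistry`) are hypothetical and only illustrate the idea; OpenWorkflow may or may not perform such a check itself.

```typescript
// Hypothetical helper: build a step name from a stable identifier.
function stepNameFor(prefix: string, id: string): string {
  return `${prefix}-${id}`;
}

// Hypothetical guard: throws if the same name is registered twice in one run.
function createNameRegistry(): (name: string) => string {
  const seen = new Set<string>();
  return (name: string): string => {
    if (seen.has(name)) {
      throw new Error(`Duplicate step name: ${name}`);
    }
    seen.add(name);
    return name;
  };
}

// The same input always yields the same names, in the same order.
const orderIds = ["a1", "b2", "c3"];
const register = createNameRegistry();
const stepNames = orderIds.map((id) => register(stepNameFor("ship-order", id)));
```

Because the names depend only on `orderIds`, a replay with the same input produces the same names and can match them against the stored history.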

Workers

Workers are long-running processes that execute workflows. They are the execution engine of OpenWorkflow.

Worker Responsibilities

Workers are responsible for:

  • Polling the backend for pending workflows
  • Claiming workflow runs using atomic database operations
  • Executing workflow code using deterministic replay
  • Heartbeating to maintain their claim on active runs
  • Managing concurrency to process multiple workflows simultaneously
  • Handling errors and implementing retry logic

Creating and Starting a Worker

const worker = ow.newWorker({ concurrency: 10 });
await worker.start();

Worker Concurrency

The concurrency option controls how many workflows a worker can execute simultaneously:

const worker = ow.newWorker({ concurrency: 20 });

  • Each workflow occupies one concurrency slot
  • Higher concurrency = more workflows processed in parallel
  • Start with 10 and adjust based on your workload
  • Multiple workers can run simultaneously for scale
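
To make the idea of concurrency slots concrete, here is a minimal sketch of a limiter that keeps at most `concurrency` tasks in flight. `runWithConcurrency` is a hypothetical function written for illustration; it does not describe how the OpenWorkflow worker is implemented.

```typescript
// Hypothetical limiter: runs tasks with at most `concurrency` in flight.
async function runWithConcurrency<T>(
  tasks: Array<() => Promise<T>>,
  concurrency: number,
): Promise<T[]> {
  if (concurrency < 1) {
    throw new RangeError("concurrency must be at least 1");
  }
  const results: T[] = new Array(tasks.length);
  let next = 0;

  // Each slot pulls the next unclaimed task until none remain.
  async function slot(): Promise<void> {
    while (next < tasks.length) {
      const index = next++;
      results[index] = await tasks[index]();
    }
  }

  const slotCount = Math.min(concurrency, tasks.length);
  const slots = Array.from({ length: slotCount }, () => slot());
  await Promise.all(slots);
  return results;
}
```

A slot only picks up new work after its current task settles, which is why a long-running workflow holds its slot for its whole duration.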

Worker Lifecycle

// Start the worker
await worker.start();
console.log("Worker is running");

// Stop gracefully (waits for active workflows to complete)
await worker.stop();
console.log("Worker stopped");

Graceful Shutdown

Workers support graceful shutdown to prevent data loss during deploys:

process.on("SIGTERM", async () => {
  console.log("Received SIGTERM, shutting down gracefully...");
  await worker.stop(); // Waits for in-flight workflows
  await backend.stop();
  process.exit(0);
});

When stop() is called:

  1. Worker stops polling for new workflows
  2. Worker waits for all active workflows to complete
  3. Worker exits cleanly
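
The shutdown sequence above can be sketched as a small class that tracks in-flight work. `SketchWorker` and its `submit` method are hypothetical and exist only to show the ordering; error handling is deliberately omitted.

```typescript
// Hypothetical worker model: tracks in-flight work so stop() can wait for it.
class SketchWorker {
  private polling = false;
  private readonly active = new Set<Promise<void>>();

  start(): void {
    this.polling = true;
  }

  // Accepts work only while polling; returns false once stop() has begun.
  submit(work: () => Promise<void>): boolean {
    if (!this.polling) return false;
    const tracked: Promise<void> = work().then(
      () => {
        this.active.delete(tracked);
      },
      () => {
        // A real worker would record the failure; this sketch only untracks it.
        this.active.delete(tracked);
      },
    );
    this.active.add(tracked);
    return true;
  }

  async stop(): Promise<void> {
    this.polling = false; // 1. stop taking new work
    await Promise.all(Array.from(this.active)); // 2. wait for in-flight work
    // 3. the returned promise resolves, so the caller can exit cleanly
  }
}
```

Setting `polling` to false before awaiting is the important part: any work offered after `stop()` is called is rejected, while work already accepted is allowed to finish.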

Heartbeats and Fault Tolerance

Workers maintain their claim on workflows through heartbeats:

  1. When a worker claims a workflow, it sets availableAt = NOW() + leaseDuration
  2. The worker periodically updates this timestamp (heartbeat)
  3. If a worker crashes, heartbeats stop
  4. The availableAt timestamp expires
  5. Another worker can now claim the workflow and resume it

This mechanism provides automatic crash recovery without manual intervention.
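
As an illustration, the lease logic can be modeled with plain timestamps. The `RunLease` shape and the `tryClaim` and `heartbeat` functions are hypothetical; in OpenWorkflow this state lives in the database, and the lease duration used in the usage note is an arbitrary example value.

```typescript
// Hypothetical lease record for one workflow run (times in milliseconds).
interface RunLease {
  workerId: string | null;
  availableAt: number;
}

// A claim succeeds only once the current lease has expired.
function tryClaim(
  lease: RunLease,
  workerId: string,
  now: number,
  leaseMs: number,
): boolean {
  if (lease.availableAt > now) return false;
  lease.workerId = workerId;
  lease.availableAt = now + leaseMs;
  return true;
}

// A heartbeat extends the lease, but only for the worker that holds it.
function heartbeat(
  lease: RunLease,
  workerId: string,
  now: number,
  leaseMs: number,
): boolean {
  if (lease.workerId !== workerId) return false;
  lease.availableAt = now + leaseMs;
  return true;
}
```

With a 30-second lease, a worker that claims at t = 1 s holds the run until t = 31 s. Each heartbeat pushes that deadline forward; if the worker crashes and heartbeats stop, `tryClaim` from another worker succeeds as soon as `now` reaches the last `availableAt`.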

Backend

The backend is the persistence layer that stores all workflow state. It serves two purposes:

  1. Job queue: Workers poll the backend for pending workflows
  2. State store: All workflow runs and step attempts are persisted

Backend Interface

OpenWorkflow defines a Backend interface that can be implemented for different databases:

interface Backend {
  // Workflow Runs
  createWorkflowRun(params: CreateWorkflowRunParams): Promise<WorkflowRun>;
  getWorkflowRun(params: GetWorkflowRunParams): Promise<WorkflowRun | null>;
  claimWorkflowRun(params: ClaimWorkflowRunParams): Promise<WorkflowRun | null>;
  heartbeatWorkflowRun(params: HeartbeatWorkflowRunParams): Promise<void>;
  markWorkflowRunSucceeded(params: MarkWorkflowRunSucceededParams): Promise<void>;
  markWorkflowRunFailed(params: MarkWorkflowRunFailedParams): Promise<void>;

  // Step Attempts
  listStepAttempts(params: ListStepAttemptsParams): Promise<StepAttempt[]>;
  createStepAttempt(params: CreateStepAttemptParams): Promise<StepAttempt>;
  markStepAttemptSucceeded(params: MarkStepAttemptSucceededParams): Promise<void>;
  markStepAttemptFailed(params: MarkStepAttemptFailedParams): Promise<void>;
}

PostgreSQL Backend

Currently, OpenWorkflow supports PostgreSQL:

import { BackendPostgres } from "@openworkflow/backend-postgres";

const backend = await BackendPostgres.connect(
  "postgresql://user:pass@localhost:5432/db"
);

The backend automatically creates two tables:

  • workflow_runs: Stores workflow execution state
  • step_attempts: Stores individual step results

Database as Queue

The database serves as a distributed queue using atomic operations:

-- Workers claim workflows atomically
SELECT * FROM workflow_runs
WHERE status = 'pending'
  AND available_at <= NOW()
ORDER BY available_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;

This ensures:

  • No two workers claim the same workflow
  • Workers can poll concurrently without conflicts
  • Failed workers release their claims automatically (via heartbeat timeout)
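
The selection logic of the query (pending, due, oldest first, one row) can be mirrored in memory to see which run a worker would pick. `QueuedRun` and `claimNext` are hypothetical names; the row locking that `FOR UPDATE SKIP LOCKED` provides in PostgreSQL has no equivalent here, because this sketch runs on a single thread.

```typescript
// Hypothetical in-memory stand-in for rows of the workflow_runs table.
interface QueuedRun {
  id: string;
  status: "pending" | "running";
  availableAt: number;
}

// Picks the oldest pending run that is due and marks it as running.
function claimNext(runs: QueuedRun[], now: number): QueuedRun | null {
  const due = runs
    .filter((run) => run.status === "pending" && run.availableAt <= now)
    .sort((a, b) => a.availableAt - b.availableAt);
  const claimed = due[0];
  if (claimed === undefined) return null;
  claimed.status = "running";
  return claimed;
}
```

Because a claimed run is no longer `pending`, a second call skips it and returns the next due run, or `null` when nothing is due.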

Client

The OpenWorkflow client is the entry point for defining workflows and creating workflow runs.

Creating a Client

import { OpenWorkflow } from "openworkflow";
import { BackendPostgres } from "@openworkflow/backend-postgres";

const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
const ow = new OpenWorkflow({ backend });

Defining Workflows

const myWorkflow = ow.defineWorkflow(
  { name: "my-workflow" },
  async ({ input, step }) => {
    return { result: "done" };
  },
);

Running Workflows

// Start a workflow run
const runHandle = await myWorkflow.run({ userId: "123" });

// Get the workflow run ID
console.log(runHandle.workflowRun.id);

// Wait for the result (optional)
const result = await runHandle.result();

Creating Workers

The client can create workers that have access to all registered workflows:

const worker = ow.newWorker({ concurrency: 10 });
await worker.start();

Workflow Run Lifecycle

A workflow run progresses through several states:

Statuses

Status      Description
pending     Workflow created, waiting for a worker to claim it
running     Worker is actively executing the workflow
succeeded   Workflow completed successfully
failed      Workflow failed and all retries exhausted

Lifecycle Flow

pending → running → succeeded
                  ↘ failed

Transitions

  1. Create: Your app creates a workflow run → status: pending
  2. Claim: A worker claims the run → status: running
  3. Execute: Worker executes steps
  4. Complete:
    • Success → status: succeeded
    • Failure → status: failed
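
These transitions can be written down as a small state machine. The sketch below encodes only the transitions listed in this section; `RunStatus`, `allowedTransitions`, and `transition` are hypothetical names, and any additional paths the library may use internally (for example around retries) are not modeled.

```typescript
type RunStatus = "pending" | "running" | "succeeded" | "failed";

// Only the transitions described above; terminal states have no successors.
const allowedTransitions: Record<RunStatus, RunStatus[]> = {
  pending: ["running"],
  running: ["succeeded", "failed"],
  succeeded: [],
  failed: [],
};

// Returns the new status, or throws if the move is not allowed.
function transition(from: RunStatus, to: RunStatus): RunStatus {
  if (allowedTransitions[from].indexOf(to) === -1) {
    throw new Error(`Invalid transition: ${from} -> ${to}`);
  }
  return to;
}
```

For example, `transition("pending", "running")` returns `"running"`, while `transition("succeeded", "running")` throws.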

The availableAt Timestamp

The availableAt field controls when a workflow is visible to workers:

  • Immediate execution: availableAt = NOW()
  • Delayed execution: availableAt = NOW() + delay
  • Retry backoff: availableAt = NOW() + exponentialBackoff
  • Heartbeat: availableAt = NOW() + leaseDuration

This single field enables:

  • Scheduling
  • Retries with backoff
  • Heartbeat mechanism
  • Crash recovery
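
A rough sketch of how one timestamp can serve all four cases follows. The `Schedule` type and `nextAvailableAt` function are hypothetical, and the backoff formula (doubling from a base delay up to a cap) is an assumption chosen for illustration, not OpenWorkflow's documented policy.

```typescript
// Hypothetical description of why availableAt is being set (times in ms).
type Schedule =
  | { kind: "immediate" }
  | { kind: "delayed"; delayMs: number }
  | { kind: "retry"; attempt: number; baseMs: number; maxMs: number }
  | { kind: "heartbeat"; leaseMs: number };

function nextAvailableAt(now: number, schedule: Schedule): number {
  switch (schedule.kind) {
    case "immediate":
      return now;
    case "delayed":
      return now + schedule.delayMs;
    case "retry": {
      // Assumed policy: base * 2^(attempt - 1), capped at maxMs.
      const backoff = schedule.baseMs * Math.pow(2, schedule.attempt - 1);
      return now + Math.min(schedule.maxMs, backoff);
    }
    case "heartbeat":
      return now + schedule.leaseMs;
  }
}
```

Under this assumed policy with a 1-second base, the third attempt is delayed by 4 seconds: `nextAvailableAt(1000, { kind: "retry", attempt: 3, baseMs: 1000, maxMs: 60000 })` returns `5000`.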

Step Attempt Lifecycle

Step attempts have their own lifecycle:

Statuses

Status      Description
running     Step is currently executing
succeeded   Step completed successfully
failed      Step execution failed

Lifecycle Flow

running → succeeded
        ↘ failed

Deterministic Replay

OpenWorkflow uses deterministic replay to achieve durability:

The Replay Model

When a worker claims a workflow:

  1. Load history: Fetch all completed step attempts
  2. Execute from start: Run the workflow function from the beginning
  3. Memoize steps: Return cached results for completed steps
  4. Execute new steps: Run new steps and persist results
  5. Continue: Repeat until workflow completes
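
The replay loop can be simulated end to end in memory. Everything in this sketch is hypothetical: `replay`, `StepApi`, `WorkflowFn`, the `Map` used as history, and the `crashAfterStep` parameter, which simulates a worker dying right after a step's result has been stored.

```typescript
interface StepApi {
  run<T>(config: { name: string }, fn: () => Promise<T>): Promise<T>;
}
type WorkflowFn<I, O> = (ctx: { input: I; step: StepApi }) => Promise<O>;

// Runs the workflow from the top, reusing results found in `completed`.
async function replay<I, O>(
  workflow: WorkflowFn<I, O>,
  input: I,
  completed: Map<string, unknown>,
  crashAfterStep?: string,
): Promise<O> {
  const step: StepApi = {
    async run<T>(config: { name: string }, fn: () => Promise<T>): Promise<T> {
      if (completed.has(config.name)) {
        return completed.get(config.name) as T; // memoized: fn is not called
      }
      const result = await fn();
      completed.set(config.name, result); // persist before continuing
      if (config.name === crashAfterStep) {
        throw new Error("simulated worker crash");
      }
      return result;
    },
  };
  return workflow({ input, step });
}

// Records which step bodies actually ran across all executions.
const executed: string[] = [];

const sampleWorkflow: WorkflowFn<{ userId: string }, string> = async ({ input, step }) => {
  const user = await step.run({ name: "fetch-user" }, async () => {
    executed.push("fetch-user");
    return { id: input.userId, email: "user@example.com" };
  });
  return step.run({ name: "send-email" }, async () => {
    executed.push("send-email");
    return `sent to ${user.email}`;
  });
};
```

Running `replay` once with `crashAfterStep` set to `"fetch-user"` and then again without it, against the same `completed` map, leaves `executed` with one entry per step: the second run starts from the top but the `fetch-user` body is skipped.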

Example

const workflow = async ({ input, step }) => {
  // First execution: runs the function and caches the result.
  // Replay after a crash: returns the cached result instantly.
  const user = await step.run({ name: "fetch-user" }, async () => {
    return await db.users.findOne({ id: input.userId });
  });

  // [Worker crashes here during the first execution]

  // The first execution never reaches this step.
  // On replay it is a new step, so it executes normally.
  await step.run({ name: "send-email" }, async () => {
    return await email.send({ to: user.email });
  });
};

Why Deterministic?

Workflows must be deterministic because:

  • The same input must produce the same execution path
  • Step names must appear in the same order on replay
  • Side effects must only occur in steps (not in workflow code)

Rules for Deterministic Workflows

✅ Do

  • Put all side effects inside step.run()
  • Use deterministic step names
  • Use the input to make decisions
  • Use step results to make decisions

❌ Don't

  • Use Date.now() or Math.random() outside of steps
  • Make API calls outside of steps
  • Use dynamic step names like step-${Date.now()}
  • Change step order based on external state

Workflow Organization

Organize your workflows in separate files for better maintainability:

send-welcome-email.ts
process-order.ts
sync-user-data.ts
index.ts
worker.ts
client.ts

Putting It All Together

Here's how all the concepts work together:

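import { OpenWorkflow } from "openworkflow";
import { BackendPostgres } from "@openworkflow/backend-postgres";

// `stripe` and `shipping` below stand in for your own service clients.

// 1. Set up the client with a backend
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
const ow = new OpenWorkflow({ backend });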

// 2. Define a workflow with steps
const processOrder = ow.defineWorkflow(
  { name: "process-order" },
  async ({ input, step }) => {
    // Each step is a checkpoint
    const payment = await step.run({ name: "charge-card" }, async () => {
      return await stripe.charges.create({ amount: input.amount });
    });

    const shipment = await step.run({ name: "ship-order" }, async () => {
      return await shipping.create({ orderId: input.orderId });
    });

    return { payment, shipment };
  },
);

// 3. Start a worker to execute workflows
const worker = ow.newWorker({ concurrency: 10 });
await worker.start();

// 4. Run the workflow from your app
const run = await processOrder.run({ orderId: "123", amount: 5000 });

// 5. Optionally wait for the result
const result = await run.result();
console.log(result); // { payment: {...}, shipment: {...} }
