Core Concepts
Understanding workflows, steps, workers, and the execution model in OpenWorkflow
This guide explains the fundamental concepts that power OpenWorkflow: workflows, steps, workers, backends, and the execution model.
Workflows
A workflow is a durable function that orchestrates multiple steps. Workflows can:
- Make external API calls
- Query databases
- Perform complex business logic
- Pause and resume across process restarts
- Run for seconds, minutes, hours, or even days
Defining a Workflow
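// ow is the OpenWorkflow client (see "Creating a Client" below)
const myWorkflow = ow.defineWorkflow(
{ name: "my-workflow" },
async ({ input, step }) => {
// Your workflow logic here (for example, calls to step.run())
const result = { done: true };
return result;
},
);
Workflow Properties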
- Durable: Survives crashes, restarts, and deploys
- Deterministic: Must follow the same sequence of steps given the same input and step results
- Resumable: Continues from the last completed step after interruption
- Versioned: Can be versioned to handle code changes safely
The Workflow Function
The workflow function receives a context object with:
- input: The input data passed when starting the workflow
- step: The step API for creating checkpoints
- version: The workflow version (for versioning support)
async ({ input, step, version }) => {
// Access input
const userId = input.userId;
// Create steps
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: userId });
});
return { user };
}
Steps
Steps are the building blocks of workflows. Each step represents a checkpoint in your workflow's execution.
Why Steps Matter
Dangerous Without Steps
Without steps, crashes can cause duplicate charges, double emails, or data inconsistencies.
Imagine this scenario without steps:
// ⚠️ Without steps - dangerous!
const workflow = async ({ input }) => {
await stripe.charges.create({ amount: 1000 }); // Charge customer
await email.send({ to: input.email }); // Send confirmation
};
If the worker crashes after charging the customer, the workflow retries from the beginning and charges them twice!
Safe With Steps
Steps provide automatic memoization: once a step completes, its result is stored and reused on replay, so the completed operation is not executed again.
With steps:
// ✅ With steps - safe!
const workflow = async ({ input, step }) => {
await step.run({ name: "charge" }, async () => {
return await stripe.charges.create({ amount: 1000 });
});
await step.run({ name: "send-email" }, async () => {
return await email.send({ to: input.email });
});
};
Now if the worker crashes after the charge step has completed, the retry skips the charge step (its result is cached) and goes straight to sending the email.
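A self-contained way to see the difference is to simulate a crash between the two operations and then retry. The sketch below is an in-memory model, not OpenWorkflow's implementation: `runStep`, `StepHistory`, `attempt`, and `simulate` are hypothetical names, and the stored results live in a `Map` instead of the database.

```typescript
// Hypothetical in-memory model of step memoization; not OpenWorkflow's code.
type StepHistory = Map<string, unknown>;

let charges = 0; // how many times the customer was charged
let emails = 0; // how many confirmation emails were sent

// Runs fn only if no result is stored under `name`; otherwise returns the stored result.
function runStep<T>(history: StepHistory, name: string, fn: () => T): T {
  if (history.has(name)) return history.get(name) as T;
  const result = fn();
  history.set(name, result); // persist before moving on
  return result;
}

// One attempt of the workflow; throws to simulate a worker crash after the charge.
function attempt(history: StepHistory | null, crashAfterCharge: boolean): void {
  const charge = () => { charges += 1; return "ch_1"; };
  const send = () => { emails += 1; return "sent"; };
  if (history) runStep(history, "charge", charge); else charge();
  if (crashAfterCharge) throw new Error("worker crashed");
  if (history) runStep(history, "send-email", send); else send();
}

// A crashing attempt followed by a retry from the beginning.
function simulate(useSteps: boolean): { charges: number; emails: number } {
  charges = 0;
  emails = 0;
  const history: StepHistory | null = useSteps ? new Map() : null;
  try {
    attempt(history, true);
  } catch {
    // The retry re-runs the workflow function from the top.
  }
  attempt(history, false);
  return { charges, emails };
}

console.log(simulate(false)); // { charges: 2, emails: 1 }
console.log(simulate(true)); // { charges: 1, emails: 1 }
```

Without a history the retry repeats the charge; with one, the charge step's stored result is reused and only the email step runs.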
Step Execution
Each step:
- Creates a record in the database with status running
- Executes the function synchronously
- Stores the result in the database
- Updates status to succeeded or failed
Step Memoization
When a workflow replays (after a crash or restart):
- Completed steps return their cached result instantly without re-execution
- New steps execute normally and are added to the history
- Failed steps re-execute with retry logic
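These memoization rules can be modeled with a small in-memory list of step attempts. This is a hypothetical sketch, not OpenWorkflow's schema: `StepAttempt`, `executeStep`, and the `flaky` operation are illustrative names, and real step attempts are persisted by the backend rather than kept in an array. Each attempt moves from running to succeeded or failed, mirroring the Step Execution list.

```typescript
// Hypothetical in-memory step attempts; field names are illustrative only.
type StepStatus = "running" | "succeeded" | "failed";

interface StepAttempt {
  name: string;
  status: StepStatus;
  output?: unknown;
  error?: string;
}

const attempts: StepAttempt[] = [];

function executeStep<T>(name: string, fn: () => T): T {
  // Completed step: return the cached result without re-executing.
  const done = attempts.find((a) => a.name === name && a.status === "succeeded");
  if (done) return done.output as T;

  // New or previously failed step: record a fresh attempt as running.
  const attempt: StepAttempt = { name, status: "running" };
  attempts.push(attempt);
  try {
    const output = fn();
    attempt.status = "succeeded";
    attempt.output = output;
    return output;
  } catch (err) {
    attempt.status = "failed";
    attempt.error = String(err);
    throw err;
  }
}

// An operation that fails on its first call and succeeds afterwards.
let calls = 0;
const flaky = (): string => {
  calls += 1;
  if (calls === 1) throw new Error("timeout");
  return "ok";
};

try { executeStep("call-api", flaky); } catch { /* first execution fails */ }
executeStep("call-api", flaky); // failed step re-executes and succeeds
executeStep("call-api", flaky); // completed step: cached, flaky is not called

console.log(attempts.map((a) => a.status)); // [ 'failed', 'succeeded' ]
console.log(calls); // 2
```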
The step.run() API
const result = await step.run(
{ name: "step-name" },
async () => {
// Runs when the workflow reaches this step and no successful result is stored yet
// On replay after this step has succeeded, step.run() returns the stored result instead
return await someWork();
}
);
Important Rules
- Step names must be unique within a workflow
- Step names must be deterministic (no dynamic names like step-${Date.now()})
- Step functions should be idempotent when possible
- Steps execute synchronously in the worker
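Because completed steps are looked up by name, the naming rules above decide whether a replay finds the right result. The hypothetical `StepRunner` below keys results by step name; its duplicate-name guard is an assumption added for illustration, since this page does not say how OpenWorkflow reacts to a reused name.

```typescript
// Hypothetical name-keyed step cache, used only to illustrate the naming rules.
class StepRunner {
  private results = new Map<string, unknown>(); // persisted across replays
  private seen = new Set<string>(); // names used in the current execution

  startExecution(): void {
    this.seen.clear(); // a replay starts the workflow function from the top
  }

  run<T>(name: string, fn: () => T): T {
    if (this.seen.has(name)) {
      throw new Error(`Duplicate step name "${name}" in one execution`);
    }
    this.seen.add(name);
    if (this.results.has(name)) return this.results.get(name) as T;
    const value = fn();
    this.results.set(name, value);
    return value;
  }
}

// Reusing a name: without the guard, the second call would silently get the first result.
const runner = new StepRunner();
runner.startExecution();
const user = runner.run("fetch-user", () => ({ id: "u1" }));
let duplicateError = "";
try {
  runner.run("fetch-user", () => ({ id: "u2" }));
} catch (e) {
  duplicateError = (e as Error).message;
}
console.log(user.id, duplicateError); // u1 Duplicate step name "fetch-user" in one execution

// Dynamic names: the key differs on each replay, so the "cached" step runs again.
let executions = 0;
const dynamicRunner = new StepRunner();
for (let replay = 0; replay < 2; replay++) {
  dynamicRunner.startExecution();
  dynamicRunner.run(`step-${replay}`, () => ++executions); // behaves like step-${Date.now()}
}
console.log(executions); // 2
```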
Workers
Workers are long-running processes that execute workflows. They are the execution engine of OpenWorkflow.
Worker Responsibilities
Workers are responsible for:
- Polling the backend for pending workflows
- Claiming workflow runs using atomic database operations
- Executing workflow code using deterministic replay
- Heartbeating to maintain their claim on active runs
- Managing concurrency to process multiple workflows simultaneously
- Handling errors and implementing retry logic
Creating and Starting a Worker
const worker = ow.newWorker({ concurrency: 10 });
await worker.start();
Worker Concurrency
The concurrency option controls how many workflows a worker can execute simultaneously:
const worker = ow.newWorker({ concurrency: 20 });
- Each workflow occupies one concurrency slot
- Higher concurrency = more workflows processed in parallel
- Start with 10 and adjust based on your workload
- Multiple workers can run simultaneously for scale
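One way to picture concurrency slots is a counting limiter: a task may start only while fewer than `limit` tasks are running. The `Slots` class below is a hypothetical sketch of that idea, not how OpenWorkflow schedules work internally; the 20 ms tasks stand in for workflow executions.

```typescript
// Hypothetical slot limiter: at most `limit` tasks run at once, like concurrency slots.
class Slots {
  private active = 0;
  private waiters: Array<() => void> = [];
  maxObserved = 0; // highest number of tasks seen running at the same time

  constructor(private readonly limit: number) {}

  async run<T>(task: () => Promise<T>): Promise<T> {
    while (this.active >= this.limit) {
      // All slots are busy: wait until a running task releases one.
      await new Promise<void>((resolve) => this.waiters.push(() => resolve()));
    }
    this.active += 1;
    this.maxObserved = Math.max(this.maxObserved, this.active);
    try {
      return await task();
    } finally {
      this.active -= 1;
      const next = this.waiters.shift();
      if (next) next(); // wake one waiting task, if any
    }
  }
}

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(() => resolve(), ms));

// Ten simulated workflows of about 20 ms each, sharing 3 slots.
async function demo(): Promise<number> {
  const slots = new Slots(3);
  const jobs = Array.from({ length: 10 }, (_, i) =>
    slots.run(async () => {
      await sleep(20);
      return i;
    }),
  );
  await Promise.all(jobs);
  return slots.maxObserved;
}

demo().then((max) => console.log(`max in flight: ${max}`)); // max in flight: 3
```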
Worker Lifecycle
// Start the worker
await worker.start();
console.log("Worker is running");
// Stop gracefully (waits for active workflows to complete)
await worker.stop();
console.log("Worker stopped");
Graceful Shutdown
Workers support graceful shutdown to prevent data loss during deploys:
process.on("SIGTERM", async () => {
console.log("Received SIGTERM, shutting down gracefully...");
await worker.stop(); // Waits for in-flight workflows
await backend.stop();
process.exit(0);
});
When stop() is called:
- Worker stops polling for new workflows
- Worker waits for all active workflows to complete
- Worker exits cleanly
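The two-phase stop described above (stop polling, then wait for in-flight work) can be sketched as a small polling loop. `SketchWorker`, `Job`, and the 5 ms poll interval are hypothetical; OpenWorkflow's worker also claims runs from the backend and heartbeats them, which this sketch omits.

```typescript
// Hypothetical worker loop illustrating graceful stop; not OpenWorkflow's worker.
type Job = () => Promise<void>;

const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(() => resolve(), ms));

class SketchWorker {
  private polling = false;
  private inFlight = new Set<Promise<void>>();
  private loop: Promise<void> = Promise.resolve();

  constructor(private readonly queue: Job[], private readonly pollMs = 5) {}

  start(): void {
    this.polling = true;
    this.loop = this.poll();
  }

  private async poll(): Promise<void> {
    while (this.polling) {
      const job = this.queue.shift();
      if (!job) {
        await sleep(this.pollMs); // nothing to claim yet; poll again shortly
        continue;
      }
      const p: Promise<void> = job()
        .catch(() => undefined) // in this sketch a failed job must not break the loop
        .then(() => {
          this.inFlight.delete(p);
        });
      this.inFlight.add(p);
    }
  }

  async stop(): Promise<void> {
    this.polling = false; // 1. stop polling for new work
    await this.loop; // let the poll loop exit
    await Promise.all(Array.from(this.inFlight)); // 2. wait for in-flight jobs
  }
}

async function demo(): Promise<{ completed: string[]; leftInQueue: number }> {
  const completed: string[] = [];
  const queue: Job[] = [
    async () => { await sleep(30); completed.push("a"); },
    async () => { await sleep(30); completed.push("b"); },
  ];
  const worker = new SketchWorker(queue);
  worker.start();
  await sleep(10); // both jobs are in flight by now
  await worker.stop(); // resolves only after "a" and "b" finish
  queue.push(async () => { completed.push("late"); });
  await sleep(20); // the stopped worker never picks up the late job
  return { completed, leftInQueue: queue.length };
}

demo().then((r) => console.log(r)); // { completed: [ 'a', 'b' ], leftInQueue: 1 }
```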
Heartbeats and Fault Tolerance
Workers maintain their claim on workflows through heartbeats:
- When a worker claims a workflow, it sets availableAt = NOW() + leaseDuration
- The worker periodically updates this timestamp (heartbeat)
- If a worker crashes, heartbeats stop
- The availableAt timestamp expires
- Another worker can now claim the workflow and resume it
This mechanism provides automatic crash recovery without manual intervention.
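The lease sequence above can be modeled with a manual clock. This is a hypothetical sketch: the `Run` shape, `claim`, `heartbeat`, and the 30-second `LEASE_MS` are assumptions for illustration, and a real backend performs these updates atomically in the database.

```typescript
// Hypothetical lease model with a manual clock; names and durations are illustrative.
interface Run {
  id: string;
  workerId: string | null;
  availableAt: number; // ms timestamp; claimable once now >= availableAt
}

const LEASE_MS = 30_000;

function claim(run: Run, workerId: string, now: number): boolean {
  if (now < run.availableAt) return false; // another worker holds a live lease
  run.workerId = workerId;
  run.availableAt = now + LEASE_MS; // availableAt = NOW() + leaseDuration
  return true;
}

function heartbeat(run: Run, workerId: string, now: number): boolean {
  if (run.workerId !== workerId) return false; // the lease was lost to another worker
  run.availableAt = now + LEASE_MS; // extend the lease
  return true;
}

const run: Run = { id: "run_1", workerId: null, availableAt: 0 };

claim(run, "worker-a", 0); // A claims at t=0s (lease until 30s)
heartbeat(run, "worker-a", 20_000); // A heartbeats at t=20s (lease until 50s)
const stolenEarly = claim(run, "worker-b", 40_000); // lease still valid
// Worker A crashes around t=40s, so no further heartbeats arrive.
const recovered = claim(run, "worker-b", 50_000); // lease expired: B takes over
const staleBeat = heartbeat(run, "worker-a", 51_000); // A no longer owns the run

console.log(stolenEarly, recovered, run.workerId, staleBeat); // false true worker-b false
```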
Backend
The backend is the persistence layer that stores all workflow state. It serves two purposes:
- Job queue: Workers poll the backend for pending workflows
- State store: All workflow runs and step attempts are persisted
Backend Interface
OpenWorkflow defines a Backend interface that can be implemented for different databases:
interface Backend {
// Workflow Runs
createWorkflowRun(params: CreateWorkflowRunParams): Promise<WorkflowRun>;
getWorkflowRun(params: GetWorkflowRunParams): Promise<WorkflowRun | null>;
claimWorkflowRun(params: ClaimWorkflowRunParams): Promise<WorkflowRun | null>;
heartbeatWorkflowRun(params: HeartbeatWorkflowRunParams): Promise<void>;
markWorkflowRunSucceeded(params: MarkWorkflowRunSucceededParams): Promise<void>;
markWorkflowRunFailed(params: MarkWorkflowRunFailedParams): Promise<void>;
// Step Attempts
listStepAttempts(params: ListStepAttemptsParams): Promise<StepAttempt[]>;
createStepAttempt(params: CreateStepAttemptParams): Promise<StepAttempt>;
markStepAttemptSucceeded(params: MarkStepAttemptSucceededParams): Promise<void>;
markStepAttemptFailed(params: MarkStepAttemptFailedParams): Promise<void>;
}
PostgreSQL Backend
Currently, OpenWorkflow supports PostgreSQL:
import { BackendPostgres } from "@openworkflow/backend-postgres";
const backend = await BackendPostgres.connect(
"postgresql://user:pass@localhost:5432/db"
);
The backend automatically creates two tables:
- workflow_runs: Stores workflow execution state
- step_attempts: Stores individual step results
Database as Queue
The database serves as a distributed queue using atomic operations:
-- Workers claim workflows atomically
SELECT * FROM workflow_runs
WHERE status IN ('pending', 'running') -- running rows are claimable again once their lease expires
AND available_at <= NOW()
ORDER BY available_at ASC
LIMIT 1
FOR UPDATE SKIP LOCKED;
This ensures:
- No two workers claim the same workflow
- Workers can poll concurrently without conflicts
- Failed workers release their claims automatically (via heartbeat timeout)
Client
The OpenWorkflow client is the entry point for defining workflows and creating workflow runs.
Creating a Client
import { OpenWorkflow } from "openworkflow";
import { BackendPostgres } from "@openworkflow/backend-postgres";
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
const ow = new OpenWorkflow({ backend });
Defining Workflows
const myWorkflow = ow.defineWorkflow(
{ name: "my-workflow" },
async ({ input, step }) => {
return { result: "done" };
},
);
Running Workflows
// Start a workflow run
const runHandle = await myWorkflow.run({ userId: "123" });
// Get the workflow run ID
console.log(runHandle.workflowRun.id);
// Wait for the result (optional)
const result = await runHandle.result();
Creating Workers
The client can create workers that have access to all registered workflows:
const worker = ow.newWorker({ concurrency: 10 });
await worker.start();
Workflow Run Lifecycle
A workflow run progresses through several states:
Statuses
| Status | Description |
|---|---|
| pending | Workflow created, waiting for a worker to claim it |
| running | Worker is actively executing the workflow |
| succeeded | Workflow completed successfully |
| failed | Workflow failed and all retries exhausted |
Lifecycle Flow
pending → running → succeeded
                  ↘ failed
Transitions
- Create: Your app creates a workflow run → status: pending
- Claim: A worker claims the run → status: running
- Execute: Worker executes steps
- Complete:
  - Success → status: succeeded
  - Failure → status: failed
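The run statuses and transitions above can be encoded as a small transition table that rejects illegal moves. This is a hypothetical TypeScript sketch (`allowed` and `transition` are illustrative names); it covers only the transitions in the diagram, so retries, which go through the availableAt mechanism, are not modeled.

```typescript
// Hypothetical transition table for the documented run statuses.
type RunStatus = "pending" | "running" | "succeeded" | "failed";

const allowed: Record<RunStatus, RunStatus[]> = {
  pending: ["running"], // Claim
  running: ["succeeded", "failed"], // Complete
  succeeded: [], // terminal
  failed: [], // terminal
};

function transition(from: RunStatus, to: RunStatus): RunStatus {
  if (allowed[from].indexOf(to) === -1) {
    throw new Error(`Invalid transition: ${from} -> ${to}`);
  }
  return to;
}

let current: RunStatus = "pending";
current = transition(current, "running"); // a worker claims the run
current = transition(current, "succeeded"); // the workflow function returns

let rejected = false;
try {
  transition(current, "running"); // a finished run cannot be claimed again
} catch {
  rejected = true;
}
console.log(current, rejected); // succeeded true
```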
The availableAt Timestamp
The availableAt field controls when a workflow is visible to workers:
- Immediate execution: availableAt = NOW()
- Delayed execution: availableAt = NOW() + delay
- Retry backoff: availableAt = NOW() + exponentialBackoff
- Heartbeat: availableAt = NOW() + leaseDuration
This single field enables:
- Scheduling
- Retries with backoff
- Heartbeat mechanism
- Crash recovery
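Each availableAt rule above is a single timestamp calculation, and visibility to workers is one comparison. The helpers below are a hypothetical sketch: the 1-second base delay, doubling factor, 5-minute cap, and 30-second lease are assumed values, not OpenWorkflow defaults.

```typescript
// Hypothetical helpers; BASE_DELAY_MS, MAX_DELAY_MS, and LEASE_MS are assumed values.
const BASE_DELAY_MS = 1_000;
const MAX_DELAY_MS = 5 * 60_000;
const LEASE_MS = 30_000;

const immediate = (now: number): number => now; // run right away
const delayed = (now: number, delayMs: number): number => now + delayMs; // scheduling
const heartbeatAt = (now: number): number => now + LEASE_MS; // lease extension

// Exponential backoff: 1s, 2s, 4s, 8s, ... capped at MAX_DELAY_MS.
function retryAt(now: number, attempt: number): number {
  const backoff = Math.min(BASE_DELAY_MS * 2 ** (attempt - 1), MAX_DELAY_MS);
  return now + backoff;
}

// Workers only see runs whose availableAt is not in the future.
const isVisible = (availableAt: number, now: number): boolean => availableAt <= now;

const now = 1_000_000;
console.log([1, 2, 3, 10].map((n) => retryAt(now, n) - now)); // [ 1000, 2000, 4000, 300000 ]
console.log(isVisible(immediate(now), now), isVisible(delayed(now, 5_000), now)); // true false
console.log(isVisible(heartbeatAt(now), now + LEASE_MS + 1)); // true (the lease has expired)
```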
Step Attempt Lifecycle
Step attempts have their own lifecycle:
Statuses
| Status | Description |
|---|---|
| running | Step is currently executing |
| succeeded | Step completed successfully |
| failed | Step execution failed |
Lifecycle Flow
running → succeeded
        ↘ failed
Deterministic Replay
OpenWorkflow uses deterministic replay to achieve durability:
The Replay Model
When a worker claims a workflow:
- Load history: Fetch all completed step attempts
- Execute from start: Run the workflow function from the beginning
- Memoize steps: Return cached results for completed steps
- Execute new steps: Run new steps and persist results
- Continue: Repeat until workflow completes
Example
const workflow = async ({ input, step }) => {
// First execution: runs the query and stores the result.
// Replay after a crash: returns the stored result instantly, without querying.
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: input.userId });
});
// [Worker crashes here during the first execution]
// New step on replay: executes normally
await step.run({ name: "send-email" }, async () => {
return await email.send({ to: user.email });
});
};
Why Deterministic?
Workflows must be deterministic because:
- The same input must produce the same execution path
- Step names must appear in the same order on replay
- Side effects must only occur in steps (not in workflow code)
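The replay model and these determinism requirements can be combined in one sketch: a context that walks the recorded history by position, returns stored results, executes only new steps, and fails if a replay reaches a different step than the one recorded. `ReplayContext`, `RecordedStep`, `notifyWorkflow`, and the error message are hypothetical; this page does not specify how OpenWorkflow itself reports a mismatch.

```typescript
// Hypothetical replay context; the error message format is illustrative only.
interface RecordedStep {
  name: string;
  output: unknown;
}

class ReplayContext {
  private index = 0; // position of the next step in the recorded history

  constructor(private readonly log: RecordedStep[]) {}

  run<T>(name: string, fn: () => T): T {
    const recorded: RecordedStep | undefined = this.log[this.index];
    this.index += 1;
    if (recorded !== undefined) {
      // Replay: the Nth step must be the same step that ran Nth last time.
      if (recorded.name !== name) {
        throw new Error(`Non-deterministic replay: expected "${recorded.name}", got "${name}"`);
      }
      return recorded.output as T; // memoized result
    }
    const output = fn(); // new step: execute it
    this.log.push({ name, output }); // and persist the result
    return output;
  }
}

// A workflow that branches on a flag read outside any step (external state).
function notifyWorkflow(ctx: ReplayContext, smsEnabled: boolean): string {
  if (smsEnabled) ctx.run("send-sms", () => "sms sent");
  return ctx.run("send-email", () => "email sent");
}

const stepLog: RecordedStep[] = [];
notifyWorkflow(new ReplayContext(stepLog), true); // first execution records two steps

let message = "";
try {
  notifyWorkflow(new ReplayContext(stepLog), false); // flag changed before the replay
} catch (e) {
  message = (e as Error).message;
}
console.log(message); // Non-deterministic replay: expected "send-sms", got "send-email"
```

Replaying with the original flag value succeeds and returns the stored results, because the steps appear in the recorded order.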
Rules for Deterministic Workflows
✅ Do
- Put all side effects inside step.run()
- Use deterministic step names
- Use the input to make decisions
- Use step results to make decisions
❌ Don't
- Use Date.now() or Math.random() outside of steps
- Make API calls outside of steps
- Use dynamic step names like step-${Date.now()}
- Change step order based on external state
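The first Don't rule is easiest to see with a clock. In the hypothetical sketch below, `step` is a name-keyed stand-in for step.run() and `fakeNow` replaces Date.now() so the output is predictable; a value read outside a step changes on every replay, while the same read inside a step is recorded once and replayed.

```typescript
// Hypothetical memo keyed by step name, standing in for persisted step results.
const stored = new Map<string, number>();

function step(name: string, fn: () => number): number {
  if (!stored.has(name)) stored.set(name, fn()); // first execution: run and record
  return stored.get(name)!; // every execution: return the record
}

let tick = 0;
const fakeNow = (): number => ++tick; // stand-in for Date.now(): new value on every call

function execute(): { outside: number; inside: number } {
  const outside = fakeNow(); // ❌ re-evaluated on every replay
  const inside = step("capture-time", () => fakeNow()); // ✅ recorded once, then replayed
  return { outside, inside };
}

const firstRun = execute();
const replayed = execute();
console.log(firstRun, replayed); // { outside: 1, inside: 2 } { outside: 3, inside: 2 }
```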
Workflow Organization
Organize your workflows in separate files for better maintainability.
Putting It All Together
Here's how all the concepts work together:
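import { OpenWorkflow } from "openworkflow";
import { BackendPostgres } from "@openworkflow/backend-postgres";
// 1. Set up the client with a backend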
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
const ow = new OpenWorkflow({ backend });
// 2. Define a workflow with steps
const processOrder = ow.defineWorkflow(
{ name: "process-order" },
async ({ input, step }) => {
// Each step is a checkpoint
const payment = await step.run({ name: "charge-card" }, async () => {
return await stripe.charges.create({ amount: input.amount });
});
const shipment = await step.run({ name: "ship-order" }, async () => {
return await shipping.create({ orderId: input.orderId });
});
return { payment, shipment };
},
);
// 3. Start a worker to execute workflows
const worker = ow.newWorker({ concurrency: 10 });
await worker.start();
// 4. Run the workflow from your app
const run = await processOrder.run({ orderId: "123", amount: 5000 });
// 5. Optionally wait for the result
const result = await run.result();
console.log(result); // { payment: {...}, shipment: {...} }