OpenWorkflow

API Reference

Complete API documentation for OpenWorkflow classes, methods, and types

API Reference

Complete API documentation for all OpenWorkflow components.

OpenWorkflow Client

The main client class for defining workflows and creating workers.

Constructor

constructor(options: OpenWorkflowOptions)

Creates a new OpenWorkflow client instance.

Parameters:

  • options.backend (Backend) - The backend implementation to use (e.g., BackendPostgres)

Example:

import { OpenWorkflow } from "openworkflow";
import { BackendPostgres } from "@openworkflow/backend-postgres";

const backend = await BackendPostgres.connect(process.env.DATABASE_URL);
const ow = new OpenWorkflow({ backend });

Methods

defineWorkflow()

defineWorkflow<Input, Output>(
  config: WorkflowDefinitionConfig,
  fn: WorkflowFunction<Input, Output>
): WorkflowDefinition<Input, Output>

Defines and registers a new workflow.

Type Parameters:

  • Input - The type of the workflow input
  • Output - The type of the workflow output

Parameters:

  • config.name (string) - The unique name of the workflow
  • fn - The workflow function that defines the workflow's logic

Returns:

A WorkflowDefinition instance that can be used to start workflow runs.

Example:

const myWorkflow = ow.defineWorkflow(
  { name: "my-workflow" },
  async ({ input, step }) => {
    return { result: "done" };
  }
);

newWorker()

newWorker(options?: WorkerOptions): Worker

Creates a new worker with this client's backend and registered workflows.

Parameters:

  • options.concurrency (number, optional) - Maximum number of concurrent workflow executions (default: 1)

Returns:

A Worker instance.

Example:

const worker = ow.newWorker({ concurrency: 10 });
await worker.start();

WorkflowDefinition

Represents a defined workflow that can be started.

Properties

  • name (string, readonly) - The workflow's name
  • fn (WorkflowFunction<Input, Output>, readonly) - The workflow function

Methods

run()

run(
  input?: Input,
  options?: WorkflowRunOptions
): Promise<WorkflowRunHandle<Output>>

Starts a new workflow run.

Parameters:

  • input (Input, optional) - The input data for the workflow
  • options (WorkflowRunOptions, optional) - Configuration options
    • deadlineAt (Date, optional) - Deadline for workflow completion

Returns:

A WorkflowRunHandle that can be used to wait for the result.

Example:

// Fire and forget
const runHandle = await myWorkflow.run({ userId: "123" });

// With deadline
const runHandle = await myWorkflow.run(
  { userId: "123" },
  { deadlineAt: new Date(Date.now() + 3600000) } // 1 hour
);

// Wait for result
const result = await runHandle.result();

WorkflowRunHandle

Represents a started workflow run. Returned from workflowDef.run().

Properties

  • workflowRun (WorkflowRun, readonly) - The workflow run record from the backend

Methods

result()

result(): Promise<Output>

Waits for the workflow run to complete and returns its result. Polls the database until the workflow reaches a terminal state (succeeded or failed).

Returns:

The workflow's output value.

Throws:

  • Error if the workflow fails
  • Error if the result times out (default: 5 minutes)

Example:

const runHandle = await myWorkflow.run({ userId: "123" });
const result = await runHandle.result();
console.log(result); // { userId: "123", ... }

WorkflowFunction

The function signature for defining workflow logic.

Type

type WorkflowFunction<Input, Output> = (
  params: WorkflowFunctionParams<Input>
) => Promise<Output> | Output

Parameters:

  • params.input (Input) - The input data passed to the workflow
  • params.step (StepApi) - The step API for creating checkpoints

Returns:

The workflow's output (can be synchronous or asynchronous).

Example:

const workflow = ow.defineWorkflow<MyInput, MyOutput>(
  { name: "my-workflow" },
  async ({ input, step }) => {
    // Use input
    const userId = input.userId;

    // Create steps
    const user = await step.run({ name: "fetch-user" }, async () => {
      return await db.users.findOne({ id: userId });
    });

    return { user };
  }
);

StepApi

The API for defining steps within a workflow.

Methods

run()

run<Output>(
  config: StepFunctionConfig,
  fn: StepFunction<Output>
): Promise<Output>

Executes a step with memoization. If the step has already been completed in a previous execution, returns the cached result without re-executing the function.

Type Parameters:

  • Output - The type of the step's return value

Parameters:

  • config.name (string) - The unique name of the step within the workflow
  • fn (StepFunction<Output>) - The function to execute

Returns:

The step's output value.

Example:

const result = await step.run({ name: "fetch-data" }, async () => {
  return await api.getData();
});

Important Rules:

  • Step names must be unique within a workflow
  • Step names must be deterministic (no step-${Date.now()})
  • Step functions should be idempotent when possible
  • Steps execute synchronously in the worker

StepFunction

The function signature for step logic.

Type

type StepFunction<Output> = () =>
  | Promise<Output | undefined>
  | Output
  | undefined

Returns:

The step's output (can be synchronous or asynchronous, can be undefined).

Example:

// Async step
await step.run({ name: "async-step" }, async () => {
  return await someAsyncWork();
});

// Sync step
await step.run({ name: "sync-step" }, () => {
  return someValue;
});

// Void step
await step.run({ name: "void-step" }, async () => {
  await doSomething();
  // returns undefined (converted to null)
});

Worker

The execution engine that polls for and executes workflows.

Constructor

constructor(options: WorkerOptions)

Parameters:

  • options.backend (Backend) - The backend to poll
  • options.workflows (WorkflowDefinition[]) - The workflows to execute
  • options.concurrency (number, optional) - Maximum concurrent executions (default: 1)

Note: Typically created using ow.newWorker() instead of directly constructing.

Methods

start()

start(): Promise<void>

Starts the worker. It begins polling for and executing workflows.

Example:

await worker.start();
console.log("Worker is running");

stop()

stop(): Promise<void>

Stops the worker gracefully. Waits for all active workflow runs to complete before returning.

Example:

await worker.stop();
console.log("Worker stopped");

Graceful Shutdown Pattern:

process.on("SIGTERM", async () => {
  await worker.stop();
  await backend.stop();
  process.exit(0);
});

tick()

tick(): Promise<number>

Processes one round of work claims and execution. Primarily used for testing.

Returns:

The number of workflow runs claimed.

Backend Interface

The interface that all backend implementations must implement.

Methods

Workflow Run Methods

createWorkflowRun(params: CreateWorkflowRunParams): Promise<WorkflowRun>
getWorkflowRun(params: GetWorkflowRunParams): Promise<WorkflowRun | null>
claimWorkflowRun(params: ClaimWorkflowRunParams): Promise<WorkflowRun | null>
heartbeatWorkflowRun(params: HeartbeatWorkflowRunParams): Promise<void>
markWorkflowRunSucceeded(params: MarkWorkflowRunSucceededParams): Promise<void>
markWorkflowRunFailed(params: MarkWorkflowRunFailedParams): Promise<void>

Step Attempt Methods

listStepAttempts(params: ListStepAttemptsParams): Promise<StepAttempt[]>
createStepAttempt(params: CreateStepAttemptParams): Promise<StepAttempt>
getStepAttempt(params: GetStepAttemptParams): Promise<StepAttempt | null>
markStepAttemptSucceeded(params: MarkStepAttemptSucceededParams): Promise<void>
markStepAttemptFailed(params: MarkStepAttemptFailedParams): Promise<void>

BackendPostgres

PostgreSQL implementation of the Backend interface.

Static Methods

connect()

static connect(
  connectionString: string,
  options?: BackendPostgresOptions
): Promise<BackendPostgres>

Connects to a PostgreSQL database and creates the necessary tables if they don't exist.

Parameters:

  • connectionString (string) - PostgreSQL connection URL
  • options.namespaceId (string, optional) - Namespace for isolating workflows (default: "default")

Returns:

A connected BackendPostgres instance.

Example:

import { BackendPostgres } from "@openworkflow/backend-postgres";

const backend = await BackendPostgres.connect(
  "postgresql://user:pass@localhost:5432/db",
  { namespaceId: "production" }
);

Methods

stop()

stop(): Promise<void>

Closes the database connection pool.

Example:

await backend.stop();

Types

WorkflowRun

Represents a workflow run record from the backend.

interface WorkflowRun {
  id: string;
  workflowName: string;
  version: string | null;
  status: "pending" | "running" | "succeeded" | "failed";
  workerId: string | null;
  config: JsonValue;
  context: JsonValue | null;
  input: JsonValue | null;
  output: JsonValue | null;
  error: JsonValue | null;
  availableAt: Date;
  deadlineAt: Date | null;
  createdAt: Date;
  completedAt: Date | null;
}

StepAttempt

Represents a step attempt record from the backend.

interface StepAttempt {
  id: string;
  workflowRunId: string;
  stepName: string;
  kind: StepKind;
  status: "running" | "succeeded" | "failed";
  config: JsonValue;
  context: JsonValue | null;
  output: JsonValue | null;
  error: JsonValue | null;
  createdAt: Date;
  completedAt: Date | null;
}

StepKind

The type of step.

type StepKind = "run"

Currently only "run" is supported (via step.run()). Future versions may add other step types.

JsonValue

Type for JSON-serializable values.

type JsonPrimitive = string | number | boolean | null;
type JsonValue =
  | JsonPrimitive
  | JsonValue[]
  | { [key: string]: JsonValue };

All workflow inputs, outputs, and step results must be JSON-serializable.

Usage Examples

Basic Workflow

import { OpenWorkflow } from "openworkflow";
import { BackendPostgres } from "@openworkflow/backend-postgres";

const backend = await BackendPostgres.connect(process.env.DATABASE_URL);
const ow = new OpenWorkflow({ backend });

const greet = ow.defineWorkflow(
  { name: "greet" },
  async ({ input, step }) => {
    const greeting = await step.run({ name: "generate" }, () => {
      return `Hello, ${input.name}!`;
    });
    return { greeting };
  }
);

const worker = ow.newWorker();
await worker.start();

const result = await greet.run({ name: "Alice" });
console.log(await result.result()); // { greeting: "Hello, Alice!" }

Typed Workflow

interface SendEmailInput {
  userId: string;
  subject: string;
}

interface SendEmailOutput {
  emailId: string;
  sentAt: Date;
}

const sendEmail = ow.defineWorkflow<SendEmailInput, SendEmailOutput>(
  { name: "send-email" },
  async ({ input, step }) => {
    const user = await step.run({ name: "fetch-user" }, async () => {
      return await db.users.findOne({ id: input.userId });
    });

    const emailId = await step.run({ name: "send" }, async () => {
      return await emailService.send({
        to: user.email,
        subject: input.subject,
      });
    });

    return { emailId, sentAt: new Date() };
  }
);

Parallel Steps

const workflow = ow.defineWorkflow(
  { name: "parallel-example" },
  async ({ input, step }) => {
    const [user, settings, subscription] = await Promise.all([
      step.run({ name: "user" }, () => db.users.findOne()),
      step.run({ name: "settings" }, () => db.settings.findOne()),
      step.run({ name: "subscription" }, () => stripe.subscriptions.retrieve()),
    ]);

    return { user, settings, subscription };
  }
);

Error Handling

const workflow = ow.defineWorkflow(
  { name: "with-error-handling" },
  async ({ input, step }) => {
    try {
      const result = await step.run({ name: "risky-operation" }, async () => {
        return await riskyAPI.call();
      });
      return { success: true, result };
    } catch (error) {
      // Handle the error
      await step.run({ name: "log-error" }, async () => {
        await logger.error(error);
      });
      throw error; // Re-throw to mark workflow as failed
    }
  }
);

Constants

Default Values

// Client
DEFAULT_RESULT_POLL_INTERVAL_MS = 1000  // 1 second
DEFAULT_RESULT_TIMEOUT_MS = 300000      // 5 minutes

// Worker
DEFAULT_LEASE_DURATION_MS = 30000       // 30 seconds
DEFAULT_POLL_INTERVAL_MS = 100          // 100ms
DEFAULT_CONCURRENCY = 1

// Backend
DEFAULT_NAMESPACE_ID = "default"

Next Steps