API Reference
Complete API documentation for OpenWorkflow classes, methods, and types
API Reference
Complete API documentation for all OpenWorkflow components.
OpenWorkflow Client
The main client class for defining workflows and creating workers.
Constructor
constructor(options: OpenWorkflowOptions)Creates a new OpenWorkflow client instance.
Parameters:
options.backend(Backend) - The backend implementation to use (e.g.,BackendPostgres)
Example:
import { OpenWorkflow } from "openworkflow";
import { BackendPostgres } from "@openworkflow/backend-postgres";
const backend = await BackendPostgres.connect(process.env.DATABASE_URL);
const ow = new OpenWorkflow({ backend });Methods
defineWorkflow()
defineWorkflow<Input, Output>(
config: WorkflowDefinitionConfig,
fn: WorkflowFunction<Input, Output>
): WorkflowDefinition<Input, Output>Defines and registers a new workflow.
Type Parameters:
Input- The type of the workflow inputOutput- The type of the workflow output
Parameters:
config.name(string) - The unique name of the workflowfn- The workflow function that defines the workflow's logic
Returns:
A WorkflowDefinition instance that can be used to start workflow runs.
Example:
const myWorkflow = ow.defineWorkflow(
{ name: "my-workflow" },
async ({ input, step }) => {
return { result: "done" };
}
);newWorker()
newWorker(options?: WorkerOptions): WorkerCreates a new worker with this client's backend and registered workflows.
Parameters:
options.concurrency(number, optional) - Maximum number of concurrent workflow executions (default: 1)
Returns:
A Worker instance.
Example:
const worker = ow.newWorker({ concurrency: 10 });
await worker.start();WorkflowDefinition
Represents a defined workflow that can be started.
Properties
name(string, readonly) - The workflow's namefn(WorkflowFunction<Input, Output>, readonly) - The workflow function
Methods
run()
run(
input?: Input,
options?: WorkflowRunOptions
): Promise<WorkflowRunHandle<Output>>Starts a new workflow run.
Parameters:
input(Input, optional) - The input data for the workflowoptions(WorkflowRunOptions, optional) - Configuration optionsdeadlineAt(Date, optional) - Deadline for workflow completion
Returns:
A WorkflowRunHandle that can be used to wait for the result.
Example:
// Fire and forget
const runHandle = await myWorkflow.run({ userId: "123" });
// With deadline
const runHandle = await myWorkflow.run(
{ userId: "123" },
{ deadlineAt: new Date(Date.now() + 3600000) } // 1 hour
);
// Wait for result
const result = await runHandle.result();WorkflowRunHandle
Represents a started workflow run. Returned from workflowDef.run().
Properties
workflowRun(WorkflowRun, readonly) - The workflow run record from the backend
Methods
result()
result(): Promise<Output>Waits for the workflow run to complete and returns its result. Polls the database until the workflow reaches a terminal state (succeeded or failed).
Returns:
The workflow's output value.
Throws:
Errorif the workflow failsErrorif the result times out (default: 5 minutes)
Example:
const runHandle = await myWorkflow.run({ userId: "123" });
const result = await runHandle.result();
console.log(result); // { userId: "123", ... }WorkflowFunction
The function signature for defining workflow logic.
Type
type WorkflowFunction<Input, Output> = (
params: WorkflowFunctionParams<Input>
) => Promise<Output> | OutputParameters:
params.input(Input) - The input data passed to the workflowparams.step(StepApi) - The step API for creating checkpoints
Returns:
The workflow's output (can be synchronous or asynchronous).
Example:
const workflow = ow.defineWorkflow<MyInput, MyOutput>(
{ name: "my-workflow" },
async ({ input, step }) => {
// Use input
const userId = input.userId;
// Create steps
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: userId });
});
return { user };
}
);StepApi
The API for defining steps within a workflow.
Methods
run()
run<Output>(
config: StepFunctionConfig,
fn: StepFunction<Output>
): Promise<Output>Executes a step with memoization. If the step has already been completed in a previous execution, returns the cached result without re-executing the function.
Type Parameters:
Output- The type of the step's return value
Parameters:
config.name(string) - The unique name of the step within the workflowfn(StepFunction<Output>) - The function to execute
Returns:
The step's output value.
Example:
const result = await step.run({ name: "fetch-data" }, async () => {
return await api.getData();
});Important Rules:
- Step names must be unique within a workflow
- Step names must be deterministic (no
step-${Date.now()}) - Step functions should be idempotent when possible
- Steps execute synchronously in the worker
StepFunction
The function signature for step logic.
Type
type StepFunction<Output> = () =>
| Promise<Output | undefined>
| Output
| undefinedReturns:
The step's output (can be synchronous or asynchronous, can be undefined).
Example:
// Async step
await step.run({ name: "async-step" }, async () => {
return await someAsyncWork();
});
// Sync step
await step.run({ name: "sync-step" }, () => {
return someValue;
});
// Void step
await step.run({ name: "void-step" }, async () => {
await doSomething();
// returns undefined (converted to null)
});Worker
The execution engine that polls for and executes workflows.
Constructor
constructor(options: WorkerOptions)Parameters:
options.backend(Backend) - The backend to polloptions.workflows(WorkflowDefinition[]) - The workflows to executeoptions.concurrency(number, optional) - Maximum concurrent executions (default: 1)
Note: Typically created using ow.newWorker() instead of directly constructing.
Methods
start()
start(): Promise<void>Starts the worker. It begins polling for and executing workflows.
Example:
await worker.start();
console.log("Worker is running");stop()
stop(): Promise<void>Stops the worker gracefully. Waits for all active workflow runs to complete before returning.
Example:
await worker.stop();
console.log("Worker stopped");Graceful Shutdown Pattern:
process.on("SIGTERM", async () => {
await worker.stop();
await backend.stop();
process.exit(0);
});tick()
tick(): Promise<number>Processes one round of work claims and execution. Primarily used for testing.
Returns:
The number of workflow runs claimed.
Backend Interface
The interface that all backend implementations must implement.
Methods
Workflow Run Methods
createWorkflowRun(params: CreateWorkflowRunParams): Promise<WorkflowRun>
getWorkflowRun(params: GetWorkflowRunParams): Promise<WorkflowRun | null>
claimWorkflowRun(params: ClaimWorkflowRunParams): Promise<WorkflowRun | null>
heartbeatWorkflowRun(params: HeartbeatWorkflowRunParams): Promise<void>
markWorkflowRunSucceeded(params: MarkWorkflowRunSucceededParams): Promise<void>
markWorkflowRunFailed(params: MarkWorkflowRunFailedParams): Promise<void>Step Attempt Methods
listStepAttempts(params: ListStepAttemptsParams): Promise<StepAttempt[]>
createStepAttempt(params: CreateStepAttemptParams): Promise<StepAttempt>
getStepAttempt(params: GetStepAttemptParams): Promise<StepAttempt | null>
markStepAttemptSucceeded(params: MarkStepAttemptSucceededParams): Promise<void>
markStepAttemptFailed(params: MarkStepAttemptFailedParams): Promise<void>BackendPostgres
PostgreSQL implementation of the Backend interface.
Static Methods
connect()
static connect(
connectionString: string,
options?: BackendPostgresOptions
): Promise<BackendPostgres>Connects to a PostgreSQL database and creates the necessary tables if they don't exist.
Parameters:
connectionString(string) - PostgreSQL connection URLoptions.namespaceId(string, optional) - Namespace for isolating workflows (default: "default")
Returns:
A connected BackendPostgres instance.
Example:
import { BackendPostgres } from "@openworkflow/backend-postgres";
const backend = await BackendPostgres.connect(
"postgresql://user:pass@localhost:5432/db",
{ namespaceId: "production" }
);Methods
stop()
stop(): Promise<void>Closes the database connection pool.
Example:
await backend.stop();Types
WorkflowRun
Represents a workflow run record from the backend.
interface WorkflowRun {
id: string;
workflowName: string;
version: string | null;
status: "pending" | "running" | "succeeded" | "failed";
workerId: string | null;
config: JsonValue;
context: JsonValue | null;
input: JsonValue | null;
output: JsonValue | null;
error: JsonValue | null;
availableAt: Date;
deadlineAt: Date | null;
createdAt: Date;
completedAt: Date | null;
}StepAttempt
Represents a step attempt record from the backend.
interface StepAttempt {
id: string;
workflowRunId: string;
stepName: string;
kind: StepKind;
status: "running" | "succeeded" | "failed";
config: JsonValue;
context: JsonValue | null;
output: JsonValue | null;
error: JsonValue | null;
createdAt: Date;
completedAt: Date | null;
}StepKind
The type of step.
type StepKind = "run"Currently only "run" is supported (via step.run()). Future versions may add other step types.
JsonValue
Type for JSON-serializable values.
type JsonPrimitive = string | number | boolean | null;
type JsonValue =
| JsonPrimitive
| JsonValue[]
| { [key: string]: JsonValue };All workflow inputs, outputs, and step results must be JSON-serializable.
Usage Examples
Basic Workflow
import { OpenWorkflow } from "openworkflow";
import { BackendPostgres } from "@openworkflow/backend-postgres";
const backend = await BackendPostgres.connect(process.env.DATABASE_URL);
const ow = new OpenWorkflow({ backend });
const greet = ow.defineWorkflow(
{ name: "greet" },
async ({ input, step }) => {
const greeting = await step.run({ name: "generate" }, () => {
return `Hello, ${input.name}!`;
});
return { greeting };
}
);
const worker = ow.newWorker();
await worker.start();
const result = await greet.run({ name: "Alice" });
console.log(await result.result()); // { greeting: "Hello, Alice!" }Typed Workflow
interface SendEmailInput {
userId: string;
subject: string;
}
interface SendEmailOutput {
emailId: string;
sentAt: Date;
}
const sendEmail = ow.defineWorkflow<SendEmailInput, SendEmailOutput>(
{ name: "send-email" },
async ({ input, step }) => {
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: input.userId });
});
const emailId = await step.run({ name: "send" }, async () => {
return await emailService.send({
to: user.email,
subject: input.subject,
});
});
return { emailId, sentAt: new Date() };
}
);Parallel Steps
const workflow = ow.defineWorkflow(
{ name: "parallel-example" },
async ({ input, step }) => {
const [user, settings, subscription] = await Promise.all([
step.run({ name: "user" }, () => db.users.findOne()),
step.run({ name: "settings" }, () => db.settings.findOne()),
step.run({ name: "subscription" }, () => stripe.subscriptions.retrieve()),
]);
return { user, settings, subscription };
}
);Error Handling
const workflow = ow.defineWorkflow(
{ name: "with-error-handling" },
async ({ input, step }) => {
try {
const result = await step.run({ name: "risky-operation" }, async () => {
return await riskyAPI.call();
});
return { success: true, result };
} catch (error) {
// Handle the error
await step.run({ name: "log-error" }, async () => {
await logger.error(error);
});
throw error; // Re-throw to mark workflow as failed
}
}
);Constants
Default Values
// Client
DEFAULT_RESULT_POLL_INTERVAL_MS = 1000 // 1 second
DEFAULT_RESULT_TIMEOUT_MS = 300000 // 5 minutes
// Worker
DEFAULT_LEASE_DURATION_MS = 30000 // 30 seconds
DEFAULT_POLL_INTERVAL_MS = 100 // 100ms
DEFAULT_CONCURRENCY = 1
// Backend
DEFAULT_NAMESPACE_ID = "default"