Advanced Features
Parallel execution, type safety, error handling, and advanced workflow patterns
Advanced Features
Learn about advanced OpenWorkflow features including parallel execution, TypeScript type safety, error handling, and sophisticated workflow patterns.
Parallel Step Execution
Run multiple steps concurrently using JavaScript's native Promise.all:
Basic Parallel Execution
const workflow = ow.defineWorkflow(
{ name: "fetch-user-data" },
async ({ input, step }) => {
// Execute all three steps in parallel
const [user, subscription, settings] = await Promise.all([
step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: input.userId });
}),
step.run({ name: "fetch-subscription" }, async () => {
return await stripe.subscriptions.retrieve(input.subId);
}),
step.run({ name: "fetch-settings" }, async () => {
return await db.settings.findOne({ userId: input.userId });
}),
]);
return { user, subscription, settings };
}
);How It Works
Each step is still memoized individually:
- All three steps start executing concurrently
- Each step creates its own
step_attemptrecord with statusrunning - The worker waits for all to complete
- If the workflow crashes mid-execution:
- Completed steps return instantly on replay (cached)
- In-flight steps re-execute
Nested Parallel Execution
You can combine sequential and parallel execution:
const workflow = ow.defineWorkflow(
{ name: "process-order" },
async ({ input, step }) => {
// Step 1: Validate (sequential)
const validation = await step.run({ name: "validate" }, async () => {
return await validateOrder(input.orderId);
});
// Step 2-4: Process in parallel
const [payment, inventory, notification] = await Promise.all([
step.run({ name: "charge-card" }, async () => {
return await stripe.charges.create({ amount: validation.total });
}),
step.run({ name: "reserve-inventory" }, async () => {
return await inventory.reserve(input.items);
}),
step.run({ name: "notify-warehouse" }, async () => {
return await warehouse.notify({ orderId: input.orderId });
}),
]);
// Step 5: Finalize (sequential)
const confirmation = await step.run({ name: "confirm" }, async () => {
return await db.orders.update(input.orderId, { status: "confirmed" });
});
return { payment, inventory, notification, confirmation };
}
);Error Handling in Parallel Steps
If any step in Promise.all fails, the entire workflow fails:
try {
const [result1, result2] = await Promise.all([
step.run({ name: "step-1" }, async () => {
// This might throw
return await riskyOperation();
}),
step.run({ name: "step-2" }, async () => {
return await safeOperation();
}),
]);
} catch (error) {
// Handle the error
await step.run({ name: "cleanup" }, async () => {
await performCleanup();
});
throw error;
}Type Safety
OpenWorkflow provides full TypeScript support with generic type parameters.
Typed Workflows
Define input and output types for compile-time safety:
interface ProcessOrderInput {
orderId: string;
userId: string;
items: Array<{ sku: string; quantity: number }>;
}
interface ProcessOrderOutput {
orderId: string;
paymentId: string;
shipmentId: string;
total: number;
}
const processOrder = ow.defineWorkflow<ProcessOrderInput, ProcessOrderOutput>(
{ name: "process-order" },
async ({ input, step }) => {
// input is typed as ProcessOrderInput
const orderId = input.orderId; // ✅ Type-safe
const invalid = input.invalidField; // ❌ TypeScript error
// Return type must match ProcessOrderOutput
return {
orderId: input.orderId,
paymentId: "pay_123",
shipmentId: "ship_456",
total: 5000,
// extraField: "invalid" // ❌ TypeScript error
};
}
);
// Usage is also type-safe
const run = await processOrder.run({
orderId: "order_123",
userId: "user_456",
items: [{ sku: "SKU-001", quantity: 2 }],
});
const result = await run.result();
// result is typed as ProcessOrderOutput
console.log(result.paymentId); // ✅ Type-safeTyped Steps
Step outputs are automatically inferred:
const workflow = ow.defineWorkflow(
{ name: "typed-steps" },
async ({ input, step }) => {
// TypeScript infers user: { id: string; email: string }
const user = await step.run({ name: "fetch-user" }, async () => {
return { id: "123", email: "user@example.com" };
});
// Type-safe access
console.log(user.email); // ✅
console.log(user.invalidField); // ❌ TypeScript error
}
);Reusable Type Definitions
Create reusable types for common workflows:
// types.ts
export interface User {
id: string;
email: string;
name: string;
}
export interface SendEmailInput {
userId: string;
template: "welcome" | "confirmation" | "reset";
}
export interface SendEmailOutput {
emailId: string;
sentAt: string;
}
// workflow.ts
import { SendEmailInput, SendEmailOutput, User } from "./types";
const sendEmail = ow.defineWorkflow<SendEmailInput, SendEmailOutput>(
{ name: "send-email" },
async ({ input, step }) => {
const user = await step.run({ name: "fetch-user" }, async (): Promise<User> => {
return await db.users.findOne({ id: input.userId });
});
const emailId = await step.run({ name: "send" }, async () => {
return await emailService.send({
to: user.email,
template: input.template,
});
});
return {
emailId,
sentAt: new Date().toISOString(),
};
}
);Error Handling & Retries
Automatic Retries
When a step throws an error, the entire workflow can be retried automatically with exponential backoff.
const workflow = ow.defineWorkflow(
{ name: "resilient-api-call" },
async ({ input, step }) => {
const data = await step.run({ name: "fetch-external-api" }, async () => {
// If this throws, the workflow will retry automatically
return await externalAPI.getData();
});
return data;
}
);How Retries Work:
- Step throws an error
- Workflow is marked with retry metadata
- Worker sets
availableAt = NOW() + backoff - Another worker picks it up after the backoff period
- Workflow replays from the beginning
- Completed steps return cached results
- Failed step is re-executed
Manual Error Handling
Handle errors explicitly in your workflow logic:
const workflow = ow.defineWorkflow(
{ name: "manual-error-handling" },
async ({ input, step }) => {
try {
const result = await step.run({ name: "risky-operation" }, async () => {
return await riskyAPI.call();
});
return { success: true, result };
} catch (error) {
// Log the error
await step.run({ name: "log-error" }, async () => {
await logger.error("Operation failed", { error });
});
// Attempt fallback
const fallbackResult = await step.run({ name: "fallback" }, async () => {
return await fallbackAPI.call();
});
return { success: true, result: fallbackResult };
}
}
);Compensating Actions
Implement compensating actions for cleanup:
const workflow = ow.defineWorkflow(
{ name: "saga-pattern" },
async ({ input, step }) => {
// Step 1: Charge payment
const payment = await step.run({ name: "charge" }, async () => {
return await stripe.charges.create({ amount: 5000 });
});
try {
// Step 2: Reserve inventory (might fail)
const inventory = await step.run({ name: "reserve" }, async () => {
return await inventory.reserve(input.items);
});
return { payment, inventory };
} catch (error) {
// Compensating action: Refund payment
await step.run({ name: "refund" }, async () => {
await stripe.refunds.create({ charge: payment.id });
});
throw error;
}
}
);Partial Failure Recovery
Handle partial failures gracefully:
const workflow = ow.defineWorkflow(
{ name: "bulk-email" },
async ({ input, step }) => {
const results = [];
for (const userId of input.userIds) {
try {
const result = await step.run(
{ name: `send-email-${userId}` },
async () => {
return await emailService.send({ userId });
}
);
results.push({ userId, success: true, result });
} catch (error) {
// Don't fail the entire workflow if one email fails
results.push({ userId, success: false, error });
}
}
return { results };
}
);Workflow Versioning
Handle code changes safely while workflows are in-flight.
The Problem
Changing step names breaks deterministic replay:
// Old code (deployed with in-flight workflows)
await step.run({ name: "old-step-name" }, ...);
// New code (deployed while old workflows are running)
await step.run({ name: "new-step-name" }, ...);
// ❌ Replay fails: "old-step-name" in history, "new-step-name" in codeSolution: Version Parameter
Use the version parameter for conditional logic:
const workflow = ow.defineWorkflow(
{ name: "versioned-workflow" },
async ({ input, step, version }) => {
if (version === "v1") {
// Old code path
const data = await step.run({ name: "old-step-name" }, async () => {
return await legacyAPI.getData();
});
return { data };
} else {
// New code path (v2)
const data = await step.run({ name: "new-step-name" }, async () => {
return await newAPI.getData();
});
return { data };
}
}
);
// Start workflows with specific versions
await workflow.run({ input: "..." }, { version: "v1" });
await workflow.run({ input: "..." }, { version: "v2" });Best Practices
- Always version breaking changes: Step renames, order changes, removals
- Keep old versions until all in-flight workflows complete
- Use version constants:
const WORKFLOW_V1 = "v1";
const WORKFLOW_V2 = "v2";
if (version === WORKFLOW_V1) {
// ...
}Waiting for Results
Synchronous Wait
Wait for a workflow to complete:
const runHandle = await myWorkflow.run({ userId: "123" });
// Blocks until workflow completes (polls database)
const result = await runHandle.result();
console.log(result);Async Fire-and-Forget
Start a workflow without waiting:
app.post("/users/:id/welcome", async (req, res) => {
// Start the workflow
const runHandle = await sendWelcomeEmail.run({ userId: req.params.id });
// Return immediately
res.json({ workflowRunId: runHandle.workflowRun.id });
});Poll for Status
Check workflow status manually:
const runHandle = await myWorkflow.run({ userId: "123" });
// Poll every 5 seconds
const checkStatus = async () => {
const run = await backend.getWorkflowRun({
workflowRunId: runHandle.workflowRun.id,
});
if (run.status === "succeeded") {
console.log("Workflow completed:", run.output);
} else if (run.status === "failed") {
console.error("Workflow failed:", run.error);
} else {
setTimeout(checkStatus, 5000);
}
};
checkStatus();Workflow Deadlines
Set time limits for workflow completion:
const runHandle = await myWorkflow.run(
{ userId: "123" },
{
deadlineAt: new Date(Date.now() + 3600000), // Must complete in 1 hour
}
);If the deadline is reached:
- New steps are skipped
- The workflow is marked as
failed - No further retries occur
Use Cases:
- Time-sensitive operations (e.g., flash sales)
- SLA enforcement
- Resource cleanup
Dynamic Workflows
Conditional Steps
Create workflows with conditional logic:
const workflow = ow.defineWorkflow(
{ name: "conditional-workflow" },
async ({ input, step }) => {
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: input.userId });
});
if (user.isPremium) {
await step.run({ name: "send-premium-email" }, async () => {
return await email.sendPremium(user);
});
} else {
await step.run({ name: "send-standard-email" }, async () => {
return await email.sendStandard(user);
});
}
return { success: true };
}
);Important: Step names must be unique across all branches. Don't use the same step name in multiple branches.
Loops
Iterate over data:
const workflow = ow.defineWorkflow(
{ name: "bulk-process" },
async ({ input, step }) => {
const results = [];
for (let i = 0; i < input.items.length; i++) {
const item = input.items[i];
// Use unique step names
const result = await step.run({ name: `process-item-${i}` }, async () => {
return await processItem(item);
});
results.push(result);
}
return { results };
}
);Caution: The number of iterations must be deterministic (based on input, not external state).
Workflow Context
Share data across steps without explicit passing:
const workflow = ow.defineWorkflow(
{ name: "context-example" },
async ({ input, step }) => {
// Create a context object
const context = {
userId: input.userId,
startTime: new Date().toISOString(),
};
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: context.userId });
});
const email = await step.run({ name: "send-email" }, async () => {
return await emailService.send({
to: user.email,
metadata: { startTime: context.startTime },
});
});
return { user, email };
}
);Best Practices
✅ Do
- Use descriptive step names:
fetch-usernotstep1 - Keep steps idempotent: Safe to retry multiple times
- Handle errors gracefully: Use try-catch for critical operations
- Use TypeScript types: Catch errors at compile-time
- Test workflow logic: Unit test individual steps
❌ Don't
- Don't use dynamic step names:
step-${Date.now()}breaks replay - Don't make side effects outside steps: Database writes, API calls must be in steps
- Don't rely on external state: Workflows must be deterministic
- Don't use very large inputs/outputs: Keep data reasonable for database storage
Performance Tips
Minimize Step Count
Combine related operations:
// ❌ Too many steps
const user = await step.run({ name: "fetch-user" }, ...);
const email = await step.run({ name: "get-email" }, () => user.email);
const domain = await step.run({ name: "get-domain" }, () => email.split("@")[1]);
// ✅ One step
const userData = await step.run({ name: "fetch-user-data" }, async () => {
const user = await db.users.findOne();
return {
user,
email: user.email,
domain: user.email.split("@")[1],
};
});Use Parallel Execution
Maximize throughput with Promise.all:
// ❌ Sequential (slow)
const user = await step.run({ name: "user" }, ...);
const settings = await step.run({ name: "settings" }, ...);
const subscription = await step.run({ name: "subscription" }, ...);
// ✅ Parallel (fast)
const [user, settings, subscription] = await Promise.all([
step.run({ name: "user" }, ...),
step.run({ name: "settings" }, ...),
step.run({ name: "subscription" }, ...),
]);Optimize Worker Concurrency
Tune concurrency based on your workload:
// Low-latency, many short workflows
const worker = ow.newWorker({ concurrency: 100 });
// High-latency, few long workflows
const worker = ow.newWorker({ concurrency: 10 });