Examples
Real-world OpenWorkflow examples and patterns
Examples
Real-world examples demonstrating common patterns and use cases with OpenWorkflow.
Basic Examples
Hello World
The simplest possible workflow:
import { BackendPostgres } from "@openworkflow/backend-postgres";
import { OpenWorkflow } from "openworkflow";
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
const ow = new OpenWorkflow({ backend });
const helloWorld = ow.defineWorkflow(
{ name: "hello-world" },
async ({ input, step }) => {
const greeting = await step.run({ name: "greet" }, () => {
return `Hello, ${input.name}!`;
});
return { greeting };
}
);
// Start worker
const worker = ow.newWorker();
await worker.start();
// Run workflow
const run = await helloWorld.run({ name: "World" });
const result = await run.result();
console.log(result.greeting); // "Hello, World!"Send Welcome Email
A common use case: sending a welcome email to new users.
interface SendWelcomeEmailInput {
userId: string;
}
const sendWelcomeEmail = ow.defineWorkflow<SendWelcomeEmailInput, { success: boolean }>(
{ name: "send-welcome-email" },
async ({ input, step }) => {
// Step 1: Fetch user from database
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: input.userId });
});
// Step 2: Send email
await step.run({ name: "send-email" }, async () => {
return await emailService.send({
to: user.email,
subject: "Welcome to our platform!",
template: "welcome",
data: { name: user.name },
});
});
// Step 3: Mark email as sent
await step.run({ name: "mark-sent" }, async () => {
await db.users.update(input.userId, { welcomeEmailSent: true });
});
return { success: true };
}
);
// Trigger from API route
app.post("/users", async (req, res) => {
const user = await db.users.create(req.body);
// Start workflow asynchronously
await sendWelcomeEmail.run({ userId: user.id });
res.json({ user });
});E-commerce Examples
E-commerce Best Practices
These examples demonstrate critical patterns like parallel execution, compensating transactions, and idempotent payment processing.
Process Order
Complete order processing workflow with payment, inventory, and notifications.
interface ProcessOrderInput {
orderId: string;
userId: string;
items: Array<{ sku: string; quantity: number; price: number }>;
}
interface ProcessOrderOutput {
orderId: string;
paymentId: string;
shipmentId: string;
total: number;
}
const processOrder = ow.defineWorkflow<ProcessOrderInput, ProcessOrderOutput>(
{ name: "process-order" },
async ({ input, step }) => {
// Step 1: Calculate total
const total = await step.run({ name: "calculate-total" }, () => {
return input.items.reduce((sum, item) => sum + item.price * item.quantity, 0);
});
// Step 2: Charge payment
const payment = await step.run({ name: "charge-payment" }, async () => {
return await stripe.charges.create({
amount: total,
currency: "usd",
customer: input.userId,
});
});
// Step 3-5: Parallel operations
const [inventory, shipment, notification] = await Promise.all([
// Reserve inventory
step.run({ name: "reserve-inventory" }, async () => {
return await inventory.reserve(input.items);
}),
// Create shipment
step.run({ name: "create-shipment" }, async () => {
return await shipping.createShipment({
orderId: input.orderId,
items: input.items,
});
}),
// Send confirmation email
step.run({ name: "send-confirmation" }, async () => {
const user = await db.users.findOne({ id: input.userId });
return await email.send({
to: user.email,
template: "order-confirmation",
data: { orderId: input.orderId, total },
});
}),
]);
// Step 6: Update order status
await step.run({ name: "update-order" }, async () => {
await db.orders.update(input.orderId, {
status: "confirmed",
paymentId: payment.id,
shipmentId: shipment.id,
});
});
return {
orderId: input.orderId,
paymentId: payment.id,
shipmentId: shipment.id,
total,
};
}
);Abandoned Cart Recovery
Send reminder emails to users with abandoned carts.
interface AbandonedCartInput {
userId: string;
cartId: string;
}
const abandonedCartRecovery = ow.defineWorkflow<AbandonedCartInput, { emailSent: boolean }>(
{ name: "abandoned-cart-recovery" },
async ({ input, step }) => {
// Wait 1 hour (schedule workflow for future execution)
// This would require scheduling support (coming soon)
// Step 1: Check if cart still exists
const cart = await step.run({ name: "check-cart" }, async () => {
return await db.carts.findOne({ id: input.cartId });
});
if (!cart || cart.items.length === 0) {
return { emailSent: false };
}
// Step 2: Get user
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: input.userId });
});
// Step 3: Send reminder email
await step.run({ name: "send-reminder" }, async () => {
return await email.send({
to: user.email,
template: "abandoned-cart",
data: {
items: cart.items,
total: cart.total,
checkoutUrl: `https://example.com/checkout/${cart.id}`,
},
});
});
return { emailSent: true };
}
);API Integration Examples
Sync User Data
Synchronize user data with external services.
interface SyncUserDataInput {
userId: string;
}
const syncUserData = ow.defineWorkflow<SyncUserDataInput, { synced: string[] }>(
{ name: "sync-user-data" },
async ({ input, step }) => {
// Fetch user
const user = await step.run({ name: "fetch-user" }, async () => {
return await db.users.findOne({ id: input.userId });
});
// Sync to multiple services in parallel
const [hubspot, salesforce, segment] = await Promise.all([
step.run({ name: "sync-hubspot" }, async () => {
return await hubspot.contacts.create({
email: user.email,
firstName: user.firstName,
lastName: user.lastName,
});
}),
step.run({ name: "sync-salesforce" }, async () => {
return await salesforce.leads.create({
Email: user.email,
FirstName: user.firstName,
LastName: user.lastName,
});
}),
step.run({ name: "sync-segment" }, async () => {
return await segment.identify({
userId: user.id,
traits: {
email: user.email,
name: `${user.firstName} ${user.lastName}`,
},
});
}),
]);
return { synced: ["hubspot", "salesforce", "segment"] };
}
);Webhook Processing
Process incoming webhooks reliably.
interface ProcessWebhookInput {
webhookId: string;
provider: string;
payload: any;
}
const processWebhook = ow.defineWorkflow<ProcessWebhookInput, { processed: boolean }>(
{ name: "process-webhook" },
async ({ input, step }) => {
// Step 1: Verify webhook signature
const verified = await step.run({ name: "verify-signature" }, async () => {
return await webhookService.verify(input.provider, input.payload);
});
if (!verified) {
throw new Error("Invalid webhook signature");
}
// Step 2: Parse webhook data
const data = await step.run({ name: "parse-webhook" }, () => {
return webhookService.parse(input.provider, input.payload);
});
// Step 3: Update database
await step.run({ name: "update-database" }, async () => {
await db[data.entity].update(data.id, data.updates);
});
// Step 4: Trigger downstream actions
await step.run({ name: "trigger-actions" }, async () => {
if (data.event === "payment.succeeded") {
await sendReceiptEmail.run({ paymentId: data.id });
}
});
return { processed: true };
}
);
// API endpoint
app.post("/webhooks/:provider", async (req, res) => {
const webhookId = randomUUID();
// Start workflow
await processWebhook.run({
webhookId,
provider: req.params.provider,
payload: req.body,
});
res.status(200).json({ received: true });
});Data Processing Examples
ETL Pipeline
Extract, transform, and load data.
interface ETLInput {
sourceTable: string;
targetTable: string;
batchSize: number;
}
const etlPipeline = ow.defineWorkflow<ETLInput, { recordsProcessed: number }>(
{ name: "etl-pipeline" },
async ({ input, step }) => {
// Step 1: Extract data
const records = await step.run({ name: "extract" }, async () => {
return await sourceDB.query(
`SELECT * FROM ${input.sourceTable} WHERE processed = false LIMIT ${input.batchSize}`
);
});
if (records.length === 0) {
return { recordsProcessed: 0 };
}
// Step 2: Transform data
const transformed = await step.run({ name: "transform" }, () => {
return records.map(record => ({
...record,
fullName: `${record.firstName} ${record.lastName}`,
processedAt: new Date().toISOString(),
}));
});
// Step 3: Load data
await step.run({ name: "load" }, async () => {
await targetDB.batchInsert(input.targetTable, transformed);
});
// Step 4: Mark as processed
await step.run({ name: "mark-processed" }, async () => {
const ids = records.map(r => r.id);
await sourceDB.query(
`UPDATE ${input.sourceTable} SET processed = true WHERE id IN (${ids.join(",")})`
);
});
return { recordsProcessed: records.length };
}
);
// Run ETL job
const batchSize = 1000;
let totalProcessed = 0;
while (true) {
const run = await etlPipeline.run({
sourceTable: "users",
targetTable: "users_warehouse",
batchSize,
});
const result = await run.result();
totalProcessed += result.recordsProcessed;
if (result.recordsProcessed < batchSize) {
break; // All records processed
}
}
console.log(`Processed ${totalProcessed} records`);Report Generation
Generate and email reports.
interface GenerateReportInput {
reportType: "daily" | "weekly" | "monthly";
recipientEmail: string;
dateRange: { start: string; end: string };
}
const generateReport = ow.defineWorkflow<GenerateReportInput, { reportUrl: string }>(
{ name: "generate-report" },
async ({ input, step }) => {
// Step 1: Fetch data
const data = await step.run({ name: "fetch-data" }, async () => {
return await analytics.query({
type: input.reportType,
startDate: input.dateRange.start,
endDate: input.dateRange.end,
});
});
// Step 2: Generate PDF
const reportUrl = await step.run({ name: "generate-pdf" }, async () => {
const pdf = await pdfGenerator.create({
template: input.reportType,
data,
});
return await s3.upload(`reports/${Date.now()}.pdf`, pdf);
});
// Step 3: Send email
await step.run({ name: "send-email" }, async () => {
return await email.send({
to: input.recipientEmail,
subject: `${input.reportType} Report`,
template: "report",
attachments: [{ url: reportUrl }],
});
});
return { reportUrl };
}
);
// Schedule reports (using cron or scheduler)
import cron from "node-cron";
cron.schedule("0 9 * * *", async () => {
// Daily report at 9 AM
await generateReport.run({
reportType: "daily",
recipientEmail: "team@example.com",
dateRange: {
start: new Date(Date.now() - 86400000).toISOString(), // Yesterday
end: new Date().toISOString(), // Today
},
});
});Error Handling Examples
Error Handling Patterns
OpenWorkflow automatically retries failed steps with exponential backoff. Use try-catch blocks for custom error handling and compensating transactions.
Retry with Exponential Backoff
Handle transient failures with retries.
const resilientAPICall = ow.defineWorkflow(
{ name: "resilient-api-call" },
async ({ input, step }) => {
const result = await step.run({ name: "api-call" }, async () => {
// This will automatically retry with exponential backoff if it fails
return await externalAPI.getData(input.id);
});
return result;
}
);Compensating Transaction (Saga Pattern)
Saga Pattern
The saga pattern uses compensating transactions to rollback distributed operations when failures occur. This is critical for maintaining consistency across services.
Implement compensating actions for distributed transactions.
const bookTrip = ow.defineWorkflow(
{ name: "book-trip" },
async ({ input, step }) => {
let flightBooking, hotelBooking, carRental;
try {
// Step 1: Book flight
flightBooking = await step.run({ name: "book-flight" }, async () => {
return await flights.book({
from: input.origin,
to: input.destination,
date: input.date,
});
});
// Step 2: Book hotel
hotelBooking = await step.run({ name: "book-hotel" }, async () => {
return await hotels.book({
location: input.destination,
checkIn: input.date,
nights: input.nights,
});
});
// Step 3: Rent car (might fail)
carRental = await step.run({ name: "rent-car" }, async () => {
return await cars.rent({
location: input.destination,
date: input.date,
});
});
return { flightBooking, hotelBooking, carRental };
} catch (error) {
// Compensating actions (rollback)
if (hotelBooking) {
await step.run({ name: "cancel-hotel" }, async () => {
await hotels.cancel(hotelBooking.id);
});
}
if (flightBooking) {
await step.run({ name: "cancel-flight" }, async () => {
await flights.cancel(flightBooking.id);
});
}
throw error;
}
}
);Graceful Degradation
Fall back to alternative approaches when operations fail.
const fetchUserProfile = ow.defineWorkflow(
{ name: "fetch-user-profile" },
async ({ input, step }) => {
try {
// Try primary data source
const profile = await step.run({ name: "fetch-from-api" }, async () => {
return await primaryAPI.getUser(input.userId);
});
return { profile, source: "primary" };
} catch (error) {
// Fall back to cache
const cachedProfile = await step.run({ name: "fetch-from-cache" }, async () => {
return await cache.get(`user:${input.userId}`);
});
if (cachedProfile) {
return { profile: cachedProfile, source: "cache" };
}
// Fall back to database
const dbProfile = await step.run({ name: "fetch-from-db" }, async () => {
return await db.users.findOne({ id: input.userId });
});
return { profile: dbProfile, source: "database" };
}
}
);Testing Workflows
Testing Strategy
Test workflows at multiple levels: unit tests for individual steps, integration tests for complete workflows, and end-to-end tests for critical business flows.
Example Test Structure
Unit Testing Steps
Test individual workflow steps:
import { describe, it, expect } from "vitest";
describe("sendWelcomeEmail workflow", () => {
it("should fetch user", async () => {
const user = await db.users.findOne({ id: "test-user" });
expect(user).toBeDefined();
expect(user.email).toBe("test@example.com");
});
it("should send email", async () => {
const result = await emailService.send({
to: "test@example.com",
subject: "Welcome",
template: "welcome",
});
expect(result.success).toBe(true);
});
});Integration Testing
Test complete workflows:
import { describe, it, expect, beforeEach, afterEach } from "vitest";
describe("processOrder workflow", () => {
let backend, ow, worker;
beforeEach(async () => {
backend = await BackendPostgres.connect(process.env.TEST_DATABASE_URL!);
ow = new OpenWorkflow({ backend });
worker = ow.newWorker();
await worker.start();
});
afterEach(async () => {
await worker.stop();
await backend.stop();
});
it("should process order successfully", async () => {
const run = await processOrder.run({
orderId: "test-order",
userId: "test-user",
items: [{ sku: "SKU-001", quantity: 1, price: 1000 }],
});
const result = await run.result();
expect(result.paymentId).toBeDefined();
expect(result.shipmentId).toBeDefined();
expect(result.total).toBe(1000);
});
});