OpenWorkflow

Examples

Real-world OpenWorkflow examples and patterns

Examples

Real-world examples demonstrating common patterns and use cases with OpenWorkflow.

Basic Examples

Hello World

The simplest possible workflow:

import { BackendPostgres } from "@openworkflow/backend-postgres";
import { OpenWorkflow } from "openworkflow";

// Connect the Postgres backend and build the OpenWorkflow client on top of it.
const backend = await BackendPostgres.connect(process.env.DATABASE_URL!);
const ow = new OpenWorkflow({ backend });

// A single-step workflow: the "greet" step builds a greeting from `input.name`
// and the workflow returns it as `{ greeting }`.
const helloWorld = ow.defineWorkflow(
  { name: "hello-world" },
  async ({ input, step }) => {
    const greeting = await step.run(
      { name: "greet" },
      () => `Hello, ${input.name}!`,
    );

    return { greeting };
  }
);

// Create and start a worker.
const worker = ow.newWorker();
await worker.start();

// Start a run, then wait for its result.
const run = await helloWorld.run({ name: "World" });
const result = await run.result();
console.log(result.greeting); // "Hello, World!"

Send Welcome Email

A common use case: sending a welcome email to new users.

// Input for the welcome-email workflow.
interface SendWelcomeEmailInput {
  userId: string;
}

// Fetches the user, sends the welcome email, then flags the user record.
// Each of the three actions runs as its own named step.
const sendWelcomeEmail = ow.defineWorkflow<SendWelcomeEmailInput, { success: boolean }>(
  { name: "send-welcome-email" },
  async ({ input, step }) => {
    const { userId } = input;

    // Step 1: load the recipient's record.
    const recipient = await step.run({ name: "fetch-user" }, async () =>
      db.users.findOne({ id: userId }),
    );

    // Step 2: send the email using the "welcome" template.
    await step.run({ name: "send-email" }, async () =>
      emailService.send({
        to: recipient.email,
        subject: "Welcome to our platform!",
        template: "welcome",
        data: { name: recipient.name },
      }),
    );

    // Step 3: record on the user that the welcome email went out.
    await step.run({ name: "mark-sent" }, async () => {
      await db.users.update(userId, { welcomeEmailSent: true });
    });

    return { success: true };
  }
);

// Trigger the workflow from an API route.
app.post("/users", async (req, res) => {
  const created = await db.users.create(req.body);

  // Start the workflow; the handler does not wait for the run's result.
  await sendWelcomeEmail.run({ userId: created.id });

  res.json({ user: created });
});

E-commerce Examples

E-commerce Best Practices

These examples demonstrate critical patterns like parallel execution, compensating transactions, and idempotent payment processing.

Process Order

Complete order processing workflow with payment, inventory, and notifications.

// Input for the order-processing workflow.
interface ProcessOrderInput {
  orderId: string;
  userId: string;
  items: Array<{ sku: string; quantity: number; price: number }>;
}

// Result returned once the order has been confirmed.
interface ProcessOrderOutput {
  orderId: string;
  paymentId: string;
  shipmentId: string;
  total: number;
}

// Charges the customer, then reserves inventory, creates the shipment and
// sends the confirmation email in parallel, and finally marks the order as
// confirmed.
const processOrder = ow.defineWorkflow<ProcessOrderInput, ProcessOrderOutput>(
  { name: "process-order" },
  async ({ input, step }) => {
    // Step 1: Calculate total
    const total = await step.run({ name: "calculate-total" }, () => {
      return input.items.reduce((sum, item) => sum + item.price * item.quantity, 0);
    });

    // Step 2: Charge payment
    // Failed steps are retried automatically, so the charge carries an
    // idempotency key derived from the order: a retry after a timeout must
    // not create a second charge.
    const payment = await step.run({ name: "charge-payment" }, async () => {
      return await stripe.charges.create(
        {
          amount: total,
          currency: "usd",
          customer: input.userId,
        },
        { idempotencyKey: `process-order:${input.orderId}:charge` },
      );
    });

    // Step 3-5: Parallel operations
    // Only the shipment result is needed afterwards. The results are
    // deliberately NOT bound to a local named `inventory`: that would shadow
    // the `inventory` service used inside the first step and make it throw a
    // "Cannot access 'inventory' before initialization" ReferenceError.
    const [, shipment] = await Promise.all([
      // Reserve inventory
      step.run({ name: "reserve-inventory" }, async () => {
        return await inventory.reserve(input.items);
      }),

      // Create shipment
      step.run({ name: "create-shipment" }, async () => {
        return await shipping.createShipment({
          orderId: input.orderId,
          items: input.items,
        });
      }),

      // Send confirmation email
      step.run({ name: "send-confirmation" }, async () => {
        const user = await db.users.findOne({ id: input.userId });
        return await email.send({
          to: user.email,
          template: "order-confirmation",
          data: { orderId: input.orderId, total },
        });
      }),
    ]);

    // Step 6: Update order status
    await step.run({ name: "update-order" }, async () => {
      await db.orders.update(input.orderId, {
        status: "confirmed",
        paymentId: payment.id,
        shipmentId: shipment.id,
      });
    });

    return {
      orderId: input.orderId,
      paymentId: payment.id,
      shipmentId: shipment.id,
      total,
    };
  }
);

Abandoned Cart Recovery

Send reminder emails to users with abandoned carts.

// Input for the abandoned-cart reminder workflow.
interface AbandonedCartInput {
  userId: string;
  cartId: string;
}

// Sends a reminder email if the cart still exists and has items in it.
// Returns `{ emailSent: false }` without emailing when the cart is gone or empty.
const abandonedCartRecovery = ow.defineWorkflow<AbandonedCartInput, { emailSent: boolean }>(
  { name: "abandoned-cart-recovery" },
  async ({ input, step }) => {
    // Wait 1 hour (schedule workflow for future execution)
    // This would require scheduling support (coming soon)

    // Step 1: look the cart up again.
    const pendingCart = await step.run({ name: "check-cart" }, async () =>
      db.carts.findOne({ id: input.cartId }),
    );

    const hasItems = Boolean(pendingCart) && pendingCart.items.length !== 0;
    if (!hasItems) {
      return { emailSent: false };
    }

    // Step 2: load the cart owner.
    const owner = await step.run({ name: "fetch-user" }, async () =>
      db.users.findOne({ id: input.userId }),
    );

    // Step 3: send the reminder with the cart contents and a checkout link.
    await step.run({ name: "send-reminder" }, async () =>
      email.send({
        to: owner.email,
        template: "abandoned-cart",
        data: {
          items: pendingCart.items,
          total: pendingCart.total,
          checkoutUrl: `https://example.com/checkout/${pendingCart.id}`,
        },
      }),
    );

    return { emailSent: true };
  }
);

API Integration Examples

Sync User Data

Synchronize user data with external services.

// Input for the user-sync workflow.
interface SyncUserDataInput {
  userId: string;
}

// Loads a user and pushes their details to HubSpot, Salesforce and Segment
// in parallel, one step per service. Returns the names of the synced services.
const syncUserData = ow.defineWorkflow<SyncUserDataInput, { synced: string[] }>(
  { name: "sync-user-data" },
  async ({ input, step }) => {
    // Fetch user
    const user = await step.run({ name: "fetch-user" }, async () => {
      return await db.users.findOne({ id: input.userId });
    });

    // Sync to multiple services in parallel.
    // The step results are not needed, and are deliberately NOT destructured
    // into locals named `hubspot` / `salesforce` / `segment`: those would
    // shadow the service clients used inside the steps and make each call
    // throw a "Cannot access ... before initialization" ReferenceError.
    await Promise.all([
      step.run({ name: "sync-hubspot" }, async () => {
        return await hubspot.contacts.create({
          email: user.email,
          firstName: user.firstName,
          lastName: user.lastName,
        });
      }),

      step.run({ name: "sync-salesforce" }, async () => {
        return await salesforce.leads.create({
          Email: user.email,
          FirstName: user.firstName,
          LastName: user.lastName,
        });
      }),

      step.run({ name: "sync-segment" }, async () => {
        return await segment.identify({
          userId: user.id,
          traits: {
            email: user.email,
            name: `${user.firstName} ${user.lastName}`,
          },
        });
      }),
    ]);

    return { synced: ["hubspot", "salesforce", "segment"] };
  }
);

Webhook Processing

Process incoming webhooks reliably.

// Input for the webhook-processing workflow.
interface ProcessWebhookInput {
  webhookId: string;
  provider: string;
  payload: any;
}

// Verifies, parses and applies an incoming webhook, then triggers follow-up
// work. Throws when verification returns a falsy value.
const processWebhook = ow.defineWorkflow<ProcessWebhookInput, { processed: boolean }>(
  { name: "process-webhook" },
  async ({ input, step }) => {
    const { provider, payload } = input;

    // Step 1: verify the webhook for this provider.
    const isAuthentic = await step.run({ name: "verify-signature" }, async () =>
      webhookService.verify(provider, payload),
    );

    if (!isAuthentic) {
      throw new Error("Invalid webhook signature");
    }

    // Step 2: parse the payload into the event description used below.
    const parsed = await step.run({ name: "parse-webhook" }, () =>
      webhookService.parse(provider, payload),
    );

    // Step 3: apply the updates to the entity named by the event.
    await step.run({ name: "update-database" }, async () => {
      await db[parsed.entity].update(parsed.id, parsed.updates);
    });

    // Step 4: start downstream workflows for events that need them.
    await step.run({ name: "trigger-actions" }, async () => {
      if (parsed.event !== "payment.succeeded") {
        return;
      }
      await sendReceiptEmail.run({ paymentId: parsed.id });
    });

    return { processed: true };
  }
);

// API endpoint: start the workflow, then acknowledge receipt.
app.post("/webhooks/:provider", async (req, res) => {
  const { provider } = req.params;

  // A fresh id is generated for every incoming webhook.
  const webhookId = randomUUID();

  await processWebhook.run({ webhookId, provider, payload: req.body });

  res.status(200).json({ received: true });
});

Data Processing Examples

ETL Pipeline

Extract, transform, and load data.

// Input for the ETL workflow.
interface ETLInput {
  sourceTable: string;
  targetTable: string;
  batchSize: number;
}

// Plain (optionally schema-qualified) SQL identifier: letters, digits and
// underscores, each part not starting with a digit.
const SQL_IDENTIFIER = /^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)*$/;

// Returns `name` unchanged if it is a plain SQL identifier, otherwise throws.
// Table names cannot be bound as query parameters, so they are validated
// before being interpolated into SQL text.
function assertSqlIdentifier(name: string): string {
  if (!SQL_IDENTIFIER.test(name)) {
    throw new Error(`Invalid SQL identifier: ${name}`);
  }
  return name;
}

// Copies one batch of unprocessed rows from the source table to the target
// table and marks them as processed. Returns the number of rows handled
// (0 when nothing is left to process).
// Throws if `sourceTable` is not a plain identifier or `batchSize` is not a
// non-negative integer.
const etlPipeline = ow.defineWorkflow<ETLInput, { recordsProcessed: number }>(
  { name: "etl-pipeline" },
  async ({ input, step }) => {
    // Both values end up inside SQL text, so reject anything that is not a
    // plain identifier / non-negative integer before building a query.
    const sourceTable = assertSqlIdentifier(input.sourceTable);
    if (!Number.isSafeInteger(input.batchSize) || input.batchSize < 0) {
      throw new Error(`Invalid batch size: ${input.batchSize}`);
    }
    const batchSize = input.batchSize;

    // Step 1: Extract data
    const records = await step.run({ name: "extract" }, async () => {
      return await sourceDB.query(
        `SELECT * FROM ${sourceTable} WHERE processed = false LIMIT ${batchSize}`
      );
    });

    if (records.length === 0) {
      return { recordsProcessed: 0 };
    }

    // Step 2: Transform data
    const transformed = await step.run({ name: "transform" }, () => {
      return records.map(record => ({
        ...record,
        fullName: `${record.firstName} ${record.lastName}`,
        processedAt: new Date().toISOString(),
      }));
    });

    // Step 3: Load data
    await step.run({ name: "load" }, async () => {
      await targetDB.batchInsert(input.targetTable, transformed);
    });

    // Step 4: Mark as processed
    await step.run({ name: "mark-processed" }, async () => {
      const ids = records.map(r => r.id);
      // NOTE(review): ids are interpolated unquoted, which presumably relies
      // on numeric ids — confirm, and prefer bound parameters if the
      // sourceDB client supports them.
      await sourceDB.query(
        `UPDATE ${sourceTable} SET processed = true WHERE id IN (${ids.join(",")})`
      );
    });

    return { recordsProcessed: records.length };
  }
);

// Run the ETL job batch by batch until a batch comes back short.
const batchSize = 1000;
let totalProcessed = 0;

for (;;) {
  const batchRun = await etlPipeline.run({
    sourceTable: "users",
    targetTable: "users_warehouse",
    batchSize,
  });

  const { recordsProcessed } = await batchRun.result();
  totalProcessed += recordsProcessed;

  // A batch smaller than the limit means all records have been processed.
  if (recordsProcessed < batchSize) {
    break;
  }
}

console.log(`Processed ${totalProcessed} records`);

Report Generation

Generate and email reports.

// Input for the report-generation workflow.
interface GenerateReportInput {
  reportType: "daily" | "weekly" | "monthly";
  recipientEmail: string;
  dateRange: { start: string; end: string };
}

// Queries analytics for the date range, renders and uploads a PDF, then
// emails the recipient with the upload result attached as a URL.
const generateReport = ow.defineWorkflow<GenerateReportInput, { reportUrl: string }>(
  { name: "generate-report" },
  async ({ input, step }) => {
    const { reportType, recipientEmail, dateRange } = input;

    // Step 1: fetch the data for the requested range.
    const reportData = await step.run({ name: "fetch-data" }, async () =>
      analytics.query({
        type: reportType,
        startDate: dateRange.start,
        endDate: dateRange.end,
      }),
    );

    // Step 2: render the PDF and upload it under a timestamped key.
    const uploadedUrl = await step.run({ name: "generate-pdf" }, async () => {
      const document = await pdfGenerator.create({
        template: reportType,
        data: reportData,
      });

      return s3.upload(`reports/${Date.now()}.pdf`, document);
    });

    // Step 3: email the report.
    await step.run({ name: "send-email" }, async () =>
      email.send({
        to: recipientEmail,
        subject: `${reportType} Report`,
        template: "report",
        attachments: [{ url: uploadedUrl }],
      }),
    );

    return { reportUrl: uploadedUrl };
  }
);

// Schedule reports (using cron or scheduler)
import cron from "node-cron";

// Daily report at 9 AM, covering the previous 24 hours.
cron.schedule("0 9 * * *", async () => {
  const msPerDay = 24 * 60 * 60 * 1000;
  const start = new Date(Date.now() - msPerDay).toISOString(); // Yesterday
  const end = new Date().toISOString(); // Today

  await generateReport.run({
    reportType: "daily",
    recipientEmail: "team@example.com",
    dateRange: { start, end },
  });
});

Error Handling Examples

Error Handling Patterns

OpenWorkflow automatically retries failed steps with exponential backoff. Use try-catch blocks for custom error handling and compensating transactions.

Retry with Exponential Backoff

Handle transient failures with retries.

// Wraps a single external API call in a step and returns the step's result.
const resilientAPICall = ow.defineWorkflow(
  { name: "resilient-api-call" },
  async ({ input, step }) => {
    // A failing step is retried automatically with exponential backoff.
    const fetchData = async () => externalAPI.getData(input.id);

    return step.run({ name: "api-call" }, fetchData);
  }
);

Compensating Transaction (Saga Pattern)

Saga Pattern

The saga pattern uses compensating transactions to roll back distributed operations when failures occur. This is critical for maintaining consistency across services.

Implement compensating actions for distributed transactions.

// Books a flight, a hotel and a rental car in sequence. If any step throws,
// the bookings made so far are cancelled (hotel first, then flight) and the
// original error is rethrown.
const bookTrip = ow.defineWorkflow(
  { name: "book-trip" },
  async ({ input, step }) => {
    // Declared outside the try block so the catch block can see which
    // bookings already succeeded.
    let flightBooking;
    let hotelBooking;
    let carRental;

    try {
      // Step 1: flight
      flightBooking = await step.run({ name: "book-flight" }, async () =>
        flights.book({
          from: input.origin,
          to: input.destination,
          date: input.date,
        }),
      );

      // Step 2: hotel
      hotelBooking = await step.run({ name: "book-hotel" }, async () =>
        hotels.book({
          location: input.destination,
          checkIn: input.date,
          nights: input.nights,
        }),
      );

      // Step 3: car (might fail)
      carRental = await step.run({ name: "rent-car" }, async () =>
        cars.rent({
          location: input.destination,
          date: input.date,
        }),
      );

      return { flightBooking, hotelBooking, carRental };
    } catch (failure) {
      // Compensating actions: undo completed bookings in reverse order.
      if (hotelBooking) {
        await step.run({ name: "cancel-hotel" }, async () => {
          await hotels.cancel(hotelBooking.id);
        });
      }

      if (flightBooking) {
        await step.run({ name: "cancel-flight" }, async () => {
          await flights.cancel(flightBooking.id);
        });
      }

      throw failure;
    }
  }
);

Graceful Degradation

Fall back to alternative approaches when operations fail.

// Fetches a user profile from the primary API. If that step throws, it tries
// the cache, and falls back to the database when the cache has no entry.
// The `source` field reports which of the three supplied the profile.
const fetchUserProfile = ow.defineWorkflow(
  { name: "fetch-user-profile" },
  async ({ input, step }) => {
    const { userId } = input;

    try {
      // Primary data source.
      const profile = await step.run({ name: "fetch-from-api" }, async () =>
        primaryAPI.getUser(userId),
      );
      return { profile, source: "primary" };
    } catch {
      // First fallback: cache.
      const fromCache = await step.run({ name: "fetch-from-cache" }, async () =>
        cache.get(`user:${userId}`),
      );

      if (fromCache) {
        return { profile: fromCache, source: "cache" };
      }

      // Second fallback: database.
      const fromDb = await step.run({ name: "fetch-from-db" }, async () =>
        db.users.findOne({ id: userId }),
      );

      return { profile: fromDb, source: "database" };
    }
  }
);

Testing Workflows

Testing Strategy

Test workflows at multiple levels: unit tests for individual steps, integration tests for complete workflows, and end-to-end tests for critical business flows.

Example Test Structure

Unit Testing Steps

Test individual workflow steps:

import { describe, it, expect } from "vitest";

// Exercises the operations that the sendWelcomeEmail steps perform, calling
// them directly rather than through the workflow.
describe("sendWelcomeEmail workflow", () => {
  it("should fetch user", async () => {
    const found = await db.users.findOne({ id: "test-user" });

    expect(found).toBeDefined();
    expect(found.email).toBe("test@example.com");
  });

  it("should send email", async () => {
    const message = {
      to: "test@example.com",
      subject: "Welcome",
      template: "welcome",
    };

    const outcome = await emailService.send(message);

    expect(outcome.success).toBe(true);
  });
});

Integration Testing

Test complete workflows:

import { describe, it, expect, beforeEach, afterEach } from "vitest";

// Runs the processOrder workflow end to end against the test database.
describe("processOrder workflow", () => {
  let backend;
  let ow;
  let worker;

  // Fresh backend connection and running worker for every test.
  beforeEach(async () => {
    backend = await BackendPostgres.connect(process.env.TEST_DATABASE_URL!);
    ow = new OpenWorkflow({ backend });
    worker = ow.newWorker();
    await worker.start();
  });

  // Stop the worker before stopping the backend.
  afterEach(async () => {
    await worker.stop();
    await backend.stop();
  });

  it("should process order successfully", async () => {
    const order = {
      orderId: "test-order",
      userId: "test-user",
      items: [{ sku: "SKU-001", quantity: 1, price: 1000 }],
    };

    const orderRun = await processOrder.run(order);
    const outcome = await orderRun.result();

    expect(outcome.paymentId).toBeDefined();
    expect(outcome.shipmentId).toBeDefined();
    expect(outcome.total).toBe(1000);
  });
});

Next Steps