OpenWorkflow

Advanced Features

Parallel execution, type safety, error handling, and advanced workflow patterns

Advanced Features

Learn about advanced OpenWorkflow features including parallel execution, TypeScript type safety, error handling, and sophisticated workflow patterns.

Parallel Step Execution

Run multiple steps concurrently using JavaScript's native Promise.all:

Basic Parallel Execution

const workflow = ow.defineWorkflow(
  { name: "fetch-user-data" },
  async ({ input, step }) => {
    // Execute all three steps in parallel
    const [user, subscription, settings] = await Promise.all([
      step.run({ name: "fetch-user" }, async () => {
        return await db.users.findOne({ id: input.userId });
      }),
      step.run({ name: "fetch-subscription" }, async () => {
        return await stripe.subscriptions.retrieve(input.subId);
      }),
      step.run({ name: "fetch-settings" }, async () => {
        return await db.settings.findOne({ userId: input.userId });
      }),
    ]);

    return { user, subscription, settings };
  }
);

How It Works

Each step is still memoized individually:

  1. All three steps start executing concurrently
  2. Each step creates its own step_attempt record with status running
  3. The worker waits for all to complete
  4. If the workflow crashes mid-execution:
    • Completed steps return instantly on replay (cached)
    • In-flight steps re-execute

Nested Parallel Execution

You can combine sequential and parallel execution:

const workflow = ow.defineWorkflow(
  { name: "process-order" },
  async ({ input, step }) => {
    // Step 1: Validate (sequential)
    const validation = await step.run({ name: "validate" }, async () => {
      return await validateOrder(input.orderId);
    });

    // Step 2-4: Process in parallel
    const [payment, inventory, notification] = await Promise.all([
      step.run({ name: "charge-card" }, async () => {
        return await stripe.charges.create({ amount: validation.total });
      }),
      step.run({ name: "reserve-inventory" }, async () => {
        return await inventory.reserve(input.items);
      }),
      step.run({ name: "notify-warehouse" }, async () => {
        return await warehouse.notify({ orderId: input.orderId });
      }),
    ]);

    // Step 5: Finalize (sequential)
    const confirmation = await step.run({ name: "confirm" }, async () => {
      return await db.orders.update(input.orderId, { status: "confirmed" });
    });

    return { payment, inventory, notification, confirmation };
  }
);

Error Handling in Parallel Steps

If any step in Promise.all fails, the entire workflow fails:

try {
  const [result1, result2] = await Promise.all([
    step.run({ name: "step-1" }, async () => {
      // This might throw
      return await riskyOperation();
    }),
    step.run({ name: "step-2" }, async () => {
      return await safeOperation();
    }),
  ]);
} catch (error) {
  // Handle the error
  await step.run({ name: "cleanup" }, async () => {
    await performCleanup();
  });
  throw error;
}

Type Safety

OpenWorkflow provides full TypeScript support with generic type parameters.

Typed Workflows

Define input and output types for compile-time safety:

interface ProcessOrderInput {
  orderId: string;
  userId: string;
  items: Array<{ sku: string; quantity: number }>;
}

interface ProcessOrderOutput {
  orderId: string;
  paymentId: string;
  shipmentId: string;
  total: number;
}

const processOrder = ow.defineWorkflow<ProcessOrderInput, ProcessOrderOutput>(
  { name: "process-order" },
  async ({ input, step }) => {
    // input is typed as ProcessOrderInput
    const orderId = input.orderId; // ✅ Type-safe
    const invalid = input.invalidField; // ❌ TypeScript error

    // Return type must match ProcessOrderOutput
    return {
      orderId: input.orderId,
      paymentId: "pay_123",
      shipmentId: "ship_456",
      total: 5000,
      // extraField: "invalid" // ❌ TypeScript error
    };
  }
);

// Usage is also type-safe
const run = await processOrder.run({
  orderId: "order_123",
  userId: "user_456",
  items: [{ sku: "SKU-001", quantity: 2 }],
});

const result = await run.result();
// result is typed as ProcessOrderOutput
console.log(result.paymentId); // ✅ Type-safe

Typed Steps

Step outputs are automatically inferred:

const workflow = ow.defineWorkflow(
  { name: "typed-steps" },
  async ({ input, step }) => {
    // TypeScript infers user: { id: string; email: string }
    const user = await step.run({ name: "fetch-user" }, async () => {
      return { id: "123", email: "user@example.com" };
    });

    // Type-safe access
    console.log(user.email); // ✅
    console.log(user.invalidField); // ❌ TypeScript error
  }
);

Reusable Type Definitions

Create reusable types for common workflows:

// types.ts
export interface User {
  id: string;
  email: string;
  name: string;
}

export interface SendEmailInput {
  userId: string;
  template: "welcome" | "confirmation" | "reset";
}

export interface SendEmailOutput {
  emailId: string;
  sentAt: string;
}

// workflow.ts
import { SendEmailInput, SendEmailOutput, User } from "./types";

const sendEmail = ow.defineWorkflow<SendEmailInput, SendEmailOutput>(
  { name: "send-email" },
  async ({ input, step }) => {
    const user = await step.run({ name: "fetch-user" }, async (): Promise<User> => {
      return await db.users.findOne({ id: input.userId });
    });

    const emailId = await step.run({ name: "send" }, async () => {
      return await emailService.send({
        to: user.email,
        template: input.template,
      });
    });

    return {
      emailId,
      sentAt: new Date().toISOString(),
    };
  }
);

Error Handling & Retries

Automatic Retries

When a step throws an error, the entire workflow can be retried automatically with exponential backoff.

const workflow = ow.defineWorkflow(
  { name: "resilient-api-call" },
  async ({ input, step }) => {
    const data = await step.run({ name: "fetch-external-api" }, async () => {
      // If this throws, the workflow will retry automatically
      return await externalAPI.getData();
    });

    return data;
  }
);

How Retries Work:

  1. Step throws an error
  2. Workflow is marked with retry metadata
  3. Worker sets availableAt = NOW() + backoff
  4. Another worker picks it up after the backoff period
  5. Workflow replays from the beginning
  6. Completed steps return cached results
  7. Failed step is re-executed

Manual Error Handling

Handle errors explicitly in your workflow logic:

const workflow = ow.defineWorkflow(
  { name: "manual-error-handling" },
  async ({ input, step }) => {
    try {
      const result = await step.run({ name: "risky-operation" }, async () => {
        return await riskyAPI.call();
      });
      return { success: true, result };
    } catch (error) {
      // Log the error
      await step.run({ name: "log-error" }, async () => {
        await logger.error("Operation failed", { error });
      });

      // Attempt fallback
      const fallbackResult = await step.run({ name: "fallback" }, async () => {
        return await fallbackAPI.call();
      });

      return { success: true, result: fallbackResult };
    }
  }
);

Compensating Actions

Implement compensating actions for cleanup:

const workflow = ow.defineWorkflow(
  { name: "saga-pattern" },
  async ({ input, step }) => {
    // Step 1: Charge payment
    const payment = await step.run({ name: "charge" }, async () => {
      return await stripe.charges.create({ amount: 5000 });
    });

    try {
      // Step 2: Reserve inventory (might fail)
      const inventory = await step.run({ name: "reserve" }, async () => {
        return await inventory.reserve(input.items);
      });

      return { payment, inventory };
    } catch (error) {
      // Compensating action: Refund payment
      await step.run({ name: "refund" }, async () => {
        await stripe.refunds.create({ charge: payment.id });
      });

      throw error;
    }
  }
);

Partial Failure Recovery

Handle partial failures gracefully:

const workflow = ow.defineWorkflow(
  { name: "bulk-email" },
  async ({ input, step }) => {
    const results = [];

    for (const userId of input.userIds) {
      try {
        const result = await step.run(
          { name: `send-email-${userId}` },
          async () => {
            return await emailService.send({ userId });
          }
        );
        results.push({ userId, success: true, result });
      } catch (error) {
        // Don't fail the entire workflow if one email fails
        results.push({ userId, success: false, error });
      }
    }

    return { results };
  }
);

Workflow Versioning

Handle code changes safely while workflows are in-flight.

The Problem

Changing step names breaks deterministic replay:

// Old code (deployed with in-flight workflows)
await step.run({ name: "old-step-name" }, ...);

// New code (deployed while old workflows are running)
await step.run({ name: "new-step-name" }, ...);
// ❌ Replay fails: "old-step-name" in history, "new-step-name" in code

Solution: Version Parameter

Use the version parameter for conditional logic:

const workflow = ow.defineWorkflow(
  { name: "versioned-workflow" },
  async ({ input, step, version }) => {
    if (version === "v1") {
      // Old code path
      const data = await step.run({ name: "old-step-name" }, async () => {
        return await legacyAPI.getData();
      });
      return { data };
    } else {
      // New code path (v2)
      const data = await step.run({ name: "new-step-name" }, async () => {
        return await newAPI.getData();
      });
      return { data };
    }
  }
);

// Start workflows with specific versions
await workflow.run({ input: "..." }, { version: "v1" });
await workflow.run({ input: "..." }, { version: "v2" });

Best Practices

  1. Always version breaking changes: Step renames, order changes, removals
  2. Keep old versions until all in-flight workflows complete
  3. Use version constants:
const WORKFLOW_V1 = "v1";
const WORKFLOW_V2 = "v2";

if (version === WORKFLOW_V1) {
  // ...
}

Waiting for Results

Synchronous Wait

Wait for a workflow to complete:

const runHandle = await myWorkflow.run({ userId: "123" });

// Blocks until workflow completes (polls database)
const result = await runHandle.result();
console.log(result);

Async Fire-and-Forget

Start a workflow without waiting:

app.post("/users/:id/welcome", async (req, res) => {
  // Start the workflow
  const runHandle = await sendWelcomeEmail.run({ userId: req.params.id });

  // Return immediately
  res.json({ workflowRunId: runHandle.workflowRun.id });
});

Poll for Status

Check workflow status manually:

const runHandle = await myWorkflow.run({ userId: "123" });

// Poll every 5 seconds
const checkStatus = async () => {
  const run = await backend.getWorkflowRun({
    workflowRunId: runHandle.workflowRun.id,
  });

  if (run.status === "succeeded") {
    console.log("Workflow completed:", run.output);
  } else if (run.status === "failed") {
    console.error("Workflow failed:", run.error);
  } else {
    setTimeout(checkStatus, 5000);
  }
};

checkStatus();

Workflow Deadlines

Set time limits for workflow completion:

const runHandle = await myWorkflow.run(
  { userId: "123" },
  {
    deadlineAt: new Date(Date.now() + 3600000), // Must complete in 1 hour
  }
);

If the deadline is reached:

  • New steps are skipped
  • The workflow is marked as failed
  • No further retries occur

Use Cases:

  • Time-sensitive operations (e.g., flash sales)
  • SLA enforcement
  • Resource cleanup

Dynamic Workflows

Conditional Steps

Create workflows with conditional logic:

const workflow = ow.defineWorkflow(
  { name: "conditional-workflow" },
  async ({ input, step }) => {
    const user = await step.run({ name: "fetch-user" }, async () => {
      return await db.users.findOne({ id: input.userId });
    });

    if (user.isPremium) {
      await step.run({ name: "send-premium-email" }, async () => {
        return await email.sendPremium(user);
      });
    } else {
      await step.run({ name: "send-standard-email" }, async () => {
        return await email.sendStandard(user);
      });
    }

    return { success: true };
  }
);

Important: Step names must be unique across all branches. Don't use the same step name in multiple branches.

Loops

Iterate over data:

const workflow = ow.defineWorkflow(
  { name: "bulk-process" },
  async ({ input, step }) => {
    const results = [];

    for (let i = 0; i < input.items.length; i++) {
      const item = input.items[i];

      // Use unique step names
      const result = await step.run({ name: `process-item-${i}` }, async () => {
        return await processItem(item);
      });

      results.push(result);
    }

    return { results };
  }
);

Caution: The number of iterations must be deterministic (based on input, not external state).

Workflow Context

Share data across steps without explicit passing:

const workflow = ow.defineWorkflow(
  { name: "context-example" },
  async ({ input, step }) => {
    // Create a context object
    const context = {
      userId: input.userId,
      startTime: new Date().toISOString(),
    };

    const user = await step.run({ name: "fetch-user" }, async () => {
      return await db.users.findOne({ id: context.userId });
    });

    const email = await step.run({ name: "send-email" }, async () => {
      return await emailService.send({
        to: user.email,
        metadata: { startTime: context.startTime },
      });
    });

    return { user, email };
  }
);

Best Practices

✅ Do

  • Use descriptive step names: fetch-user not step1
  • Keep steps idempotent: Safe to retry multiple times
  • Handle errors gracefully: Use try-catch for critical operations
  • Use TypeScript types: Catch errors at compile-time
  • Test workflow logic: Unit test individual steps

❌ Don't

  • Don't use dynamic step names: step-${Date.now()} breaks replay
  • Don't make side effects outside steps: Database writes, API calls must be in steps
  • Don't rely on external state: Workflows must be deterministic
  • Don't use very large inputs/outputs: Keep data reasonable for database storage

Performance Tips

Minimize Step Count

Combine related operations:

// ❌ Too many steps
const user = await step.run({ name: "fetch-user" }, ...);
const email = await step.run({ name: "get-email" }, () => user.email);
const domain = await step.run({ name: "get-domain" }, () => email.split("@")[1]);

// ✅ One step
const userData = await step.run({ name: "fetch-user-data" }, async () => {
  const user = await db.users.findOne();
  return {
    user,
    email: user.email,
    domain: user.email.split("@")[1],
  };
});

Use Parallel Execution

Maximize throughput with Promise.all:

// ❌ Sequential (slow)
const user = await step.run({ name: "user" }, ...);
const settings = await step.run({ name: "settings" }, ...);
const subscription = await step.run({ name: "subscription" }, ...);

// ✅ Parallel (fast)
const [user, settings, subscription] = await Promise.all([
  step.run({ name: "user" }, ...),
  step.run({ name: "settings" }, ...),
  step.run({ name: "subscription" }, ...),
]);

Optimize Worker Concurrency

Tune concurrency based on your workload:

// Low-latency, many short workflows
const worker = ow.newWorker({ concurrency: 100 });

// High-latency, few long workflows
const worker = ow.newWorker({ concurrency: 10 });

Next Steps