Workflow API

Create and deploy robust, scalable workflows using queued functions and state management. Build reliable backend systems with automatic retry, state persistence, and distributed processing.

Note: The Workflow API is an early access release, available from codehooks-js version 1.3.12. We appreciate any feedback from developers using it.

Overview

Codehooks workflows combine persistent queues with state management to create reliable, scalable workflows. Each step in a workflow can modify the process state and control the flow using the goto function, enabling complex workflows with branching, loops, and conditional execution. The system automatically handles state persistence, distributed processing, and error recovery.

Key Features

  • State Management

    • Persistent state storage
    • Atomic state updates
    • Complete execution history
    • State inspection and debugging
  • Queue-Based Processing

    • Queued (durable) functions
    • Automatic retry on failures
    • Load balancing across workers
    • No lost messages or duplicate processing
    • Parallel execution of workflow steps
  • Scalability & Reliability

    • Distributed processing
    • Automatic failover
    • State recovery
    • Error handling and monitoring

Why Use Workflows?

Workflows help you build reliable backend systems and integration logic. Because they run on queued functions, they handle system failures and network issues by automatically resuming from the last known state after crashes or connection losses. This makes them ideal for critical processes like order processing and payment flows. The system distributes work across multiple workers and processes long-running tasks asynchronously. As a developer, you can focus on implementing business logic while the framework handles state persistence, retries, and scaling.

A simple workflow example

Let's create a simple workflow app that demonstrates branching based on a random number. The workflow will process even and odd numbers differently.

The full source code for the workflow steps is shown below.

index.js
import { app } from 'codehooks-js';

// Create a workflow definition
const workflow = app.createWorkflow('simpleTask', 'Basic workflow example', {
// Step 1: Initialize the workflow
begin: async function (state, goto) {
state = {
message: 'Starting workflow',
// Add a random number to demonstrate branching
value: Math.floor(Math.random() * 10)
};
goto('decide', state);
},

// Step 2: Decide which path to take
decide: async function (state, goto) {
// Branch based on whether the value is even or odd
if (state.value % 2 === 0) {
goto('evenPath', state);
} else {
goto('oddPath', state);
}
},

// Step 3a: Handle even numbers
evenPath: async function (state, goto) {
state = {
message: 'Processing even number',
path: 'even',
processed: true
};
goto('end', state);
},

// Step 3b: Handle odd numbers
oddPath: async function (state, goto) {
state = {
message: 'Processing odd number',
path: 'odd',
processed: true
};
goto('end', state);
},

// Step 4: End the workflow
end: function (state, goto) {
state = {
final_message: `Workflow completed! Processed ${state.path} number: ${state.value}`
};
goto(null, state); // workflow complete
}
});

// emitted event when a workflow completes
workflow.on('completed', (data) => {
console.log('Workflow completed:', data);
});

// REST API to start a new workflow instance
app.post('/start', async (req, res) => {
const result = await workflow.start({"foo": "bar"});
res.json(result);
});

// export app interface to serverless execution
export default app.init();

You can deploy the workflow in the Studio app directly or with the CLI command coho deploy.

See more examples at the end of this page.

Workflow State Storage and Inspection

All workflow instances and their states are automatically persisted in a collection (default name: workflowdata). This collection stores the complete state of every workflow instance, making it easy to inspect and debug workflows at any time.

Each document in the collection contains:

  • _id: Unique identifier for the workflow instance
  • nextStep: The next step to be executed (null when workflow is complete)
  • createdAt: Timestamp when the workflow was created
  • updatedAt: Timestamp of the last state update
  • workflowName: Name of the workflow definition
  • previousStep: Name of the last executed step
  • stepCount: Object containing execution statistics for each step
    • visits: Number of times the step was executed
    • startTime: When the step started execution
    • finishTime: When the step finished execution
    • totalTime: Total execution time in milliseconds
  • totalTime: Total workflow execution time in milliseconds
  • ...state: Your custom state properties for the workflow

An example state object for a completed simpleTask workflow is shown below:

{
"foo": "bar",
"nextStep": null,
"createdAt": "2025-06-09T15:35:41.318Z",
"workflowName": "simpleTask",
"stepCount": {
"begin": {
"visits": 1,
"startTime": "2025-06-09T15:35:41.529Z",
"totalTime": 5,
"finishTime": "2025-06-09T15:35:41.534Z"
},
"decide": {
"visits": 1,
"startTime": "2025-06-09T15:35:41.700Z",
"totalTime": 7,
"finishTime": "2025-06-09T15:35:41.707Z"
},
"evenPath": {
"visits": 1,
"startTime": "2025-06-09T15:35:41.757Z",
"totalTime": 5,
"finishTime": "2025-06-09T15:35:41.762Z"
},
"end": {
"visits": 1,
"startTime": "2025-06-09T15:35:42.578Z",
"totalTime": 8,
"finishTime": "2025-06-09T15:35:42.586Z"
}
},
"_id": "6846ff4d5a5945501a11d691",
"updatedAt": "2025-06-09T15:35:42.586Z",
"message": "Processing even number",
"value": 6,
"previousStep": "end",
"path": "even",
"processed": true,
"final_message": "Workflow completed! Processed even number: 6",
"totalTime": 1260
}
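
The stepCount statistics make it easy to see where a workflow spent its time. The sketch below is a minimal example that summarizes one instance document; summarizeInstance is a hypothetical helper (not part of codehooks-js), and it assumes the field names shown in the example above.

```javascript
// Summarize a workflow instance document from the workflow collection.
// summarizeInstance is a hypothetical helper, not part of codehooks-js.
function summarizeInstance(doc) {
  const steps = Object.entries(doc.stepCount || {});
  // Time spent inside step functions, in milliseconds
  const stepTime = steps.reduce((sum, [, s]) => sum + (s.totalTime || 0), 0);
  // Step with the longest execution time, or null if no step has run
  const slowest = steps.reduce(
    (best, [name, s]) =>
      best === null || s.totalTime > best.totalTime
        ? { name, totalTime: s.totalTime }
        : best,
    null
  );
  return {
    completed: doc.nextStep === null,
    stepsExecuted: steps.length,
    stepTime,
    // Time outside step functions (presumably queueing and persistence)
    overhead: (doc.totalTime || 0) - stepTime,
    slowestStep: slowest ? slowest.name : null,
  };
}

// Values taken from the completed simpleTask example above
const example = {
  nextStep: null,
  totalTime: 1260,
  stepCount: {
    begin: { visits: 1, totalTime: 5 },
    decide: { visits: 1, totalTime: 7 },
    evenPath: { visits: 1, totalTime: 5 },
    end: { visits: 1, totalTime: 8 },
  },
};
console.log(summarizeInstance(example));
// stepTime is 25, overhead is 1235, slowestStep is 'end'
```

In this instance only 25 of the 1260 milliseconds were spent inside step functions; the rest passed between steps.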

You can query this collection directly to inspect all workflow states:

// Requires: import { app, Datastore } from 'codehooks-js';
// Example REST API to get all completed instances of a workflow
app.get('/some', async (req, res) => {
  const db = await Datastore.open();
  // Completed instances have nextStep set to null
  const completedInstances = await db.getMany('workflowdata', {
    workflowName: 'simpleTask',
    nextStep: null
  }).toArray();
  res.json(completedInstances);
});
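
Since nextStep is null only when a workflow is complete, the same field can separate finished from unfinished instances. The sketch below builds both query objects; instanceQuery is a hypothetical helper, and it assumes the collection supports the $ne query operator.

```javascript
// Build a query object for the workflow collection.
// instanceQuery is a hypothetical helper, not part of codehooks-js.
// Assumption: nextStep is null only for completed instances.
function instanceQuery(workflowName, { completed }) {
  return {
    workflowName,
    nextStep: completed ? null : { $ne: null },
  };
}

console.log(instanceQuery('simpleTask', { completed: false }));
// Pass the result as the query argument, e.g.
// db.getMany('workflowdata', instanceQuery('simpleTask', { completed: false }))
```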

Or you can query the collection for a specific workflow instance:

// Example REST API to get a specific workflow instance
app.get('/one/:id', async (req, res) => {
const db = await Datastore.open();
// Example: Get state of a specific workflow instance
const instance = await db.getOne('workflowdata', {
_id: req.params.id
});
res.json(instance);
});

To use a different collection name, configure it when creating the workflow:

const workflow = app.createWorkflow('myWorkflow', 'description', steps);
workflow.configure({ collectionName: 'my_custom_collection' });

Here's how the workflow data appears in Codehooks Studio for an example workflow:

[Screenshot: Workflow data in Codehooks Studio]

The screenshot shows the state of a workflow instance that:

  1. Started at 17:21:00
  2. Branched through the even path
  3. Is completed because nextStep is null
  4. Contains the state data with the message properties

Core API Overview

Methods

The workflow API provides these core methods for managing persistent workflows:

createWorkflow(name, description, steps)

Creates a new workflow of persistent steps.

const workflow = app.createWorkflow('workflow', 'description', {
START: async (state, goto) => {
// step implementation
},
});

Parameters:

  • name (string): Unique identifier for the workflow
  • description (string): Human-readable description of the workflow
  • steps (WorkflowDefinition): Object containing step definitions

Returns: Workflow instance for managing the workflow

start(initialState)

Starts a new Workflow instance.

const result = await workflow.start({ data: 'value' });
// Returns state, e.g. {_id: '682ec2b3dab9887c1da726e9', ...state}

Parameters:

  • initialState (any): Initial state for the workflow instance

Returns: Promise with the workflow instance

updateState(instanceId, state, options?)

Updates the state of a workflow instance.

await workflow.updateState('682ec2b3dab9887c1da726e9', { count: 1 }, { continue: true });

Parameters:

  • instanceId (string): ID of the workflow instance
  • state (any): New state to update with
  • options (UpdateOptions, optional): Options for the update. Pass { continue: false } to update the state without continuing the step

Returns: Promise with the updated state
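
A typical use of updateState is to record an external event on a waiting instance and then resume it. The sketch below wraps the call in two hypothetical helpers; the workflow argument is the object returned by app.createWorkflow, and the approved, approvedBy, and note properties are illustrative state fields, not part of the API.

```javascript
// Hypothetical helper: record an approval on a waiting instance and resume it.
// `workflow` is the object returned by app.createWorkflow(...).
async function approveInstance(workflow, instanceId, approvedBy) {
  return workflow.updateState(
    instanceId,
    { approved: true, approvedBy, approvedAt: new Date().toISOString() },
    { continue: true } // continue the step after the update
  );
}

// Hypothetical helper: attach a note but leave the instance as it is.
async function annotateInstance(workflow, instanceId, note) {
  return workflow.updateState(instanceId, { note }, { continue: false });
}
```

Passing the workflow in as an argument keeps the helpers easy to test with a stand-in object.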

setState(instanceId, stateData)

Set the complete state of a workflow instance.

await workflow.setState('682ec2b3dab9887c1da726e9', { 
_id: '682ec2b3dab9887c1da726e9',
state: { count: 1, status: 'active' }
});

Parameters:

  • instanceId (string): ID of the workflow instance
  • stateData (object): Object containing _id and state

Returns: Promise that resolves to void

continue(instanceId, reset?)

Continue a paused workflow instance.

const result = await workflow.continue('682ec2b3dab9887c1da726e9', false);
// Returns: { instanceId: string }

Parameters:

  • instanceId (string): ID of the workflow instance
  • reset (boolean, optional): Whether to reset all step counts (true) or just the current step (false)

Returns: Promise with the queue ID for the continued step

getWorkflowStatus(id)

Get the status of a workflow instance.

const status = await workflow.getWorkflowStatus('682ec2b3dab9887c1da726e9');

Parameters:

  • id (string): ID of the workflow instance

Returns: Promise with the workflow status

getInstances(filter)

Lists workflow instances matching a filter.

const instances = await workflow.getInstances({ status: 'running' });

Parameters:

  • filter (any): Filter criteria for workflows

Returns: Promise with list of workflow instances

cancelWorkflow(id)

Cancel a workflow instance.

const result = await workflow.cancelWorkflow('682ec2b3dab9887c1da726e9');

Parameters:

  • id (string): ID of the workflow instance to cancel

Returns: Promise with the cancellation result

continueAllTimedOut()

Continues all timed out step instances in the workflow. This is useful for recovering workflows that have timed out due to system issues or long-running operations.

Returns: Promise with array of results containing instance IDs for continued workflows

Example:

Continue timed out workflows
// Create workflow with 5 minute timeout
const workflow = app.createWorkflow('simpleTask', 'Basic workflow example', {/* workflow steps */})
workflow.configure({
timeout: 1000*60*5 // 5 minute timeout threshold
})

// Schedule job to continue timed out workflows every 5 minutes
app.job('*/5 * * * *', async (req, res) => {
const result = await workflow.continueAllTimedOut()
if (result.length > 0) {
console.log('Continued these dangling workflows:', result);
}
res.end()
})

findTimedOutSteps(filter?)

Find all workflow instances with timed out steps.

const timedOutWorkflows = await workflow.findTimedOutSteps();
// Returns array of workflows with timeout details:
// [{
// workflowId: string,
// workflowName: string,
// isTimedOut: true,
// executionTime?: number,
// runningTime?: number,
// timeout: number,
// step: string,
// startTime: string,
// finishTime?: string,
// currentTime?: string,
// timeoutSource: string
// }]

Parameters:

  • filter (any, optional): Optional filter criteria for workflows

Returns: Promise with array of workflow instances that have timed out steps
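
When many instances time out, grouping the result by step shows which step is the bottleneck. This is a minimal sketch that assumes each entry carries the step and workflowId fields listed above; groupTimedOutByStep is a hypothetical helper.

```javascript
// Group findTimedOutSteps() results by step name.
// groupTimedOutByStep is a hypothetical helper, not part of codehooks-js.
function groupTimedOutByStep(timedOut) {
  const byStep = {};
  for (const entry of timedOut) {
    if (!byStep[entry.step]) {
      byStep[entry.step] = [];
    }
    byStep[entry.step].push(entry.workflowId);
  }
  return byStep;
}

// Usage inside an async function:
// const report = groupTimedOutByStep(await workflow.findTimedOutSteps());
```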

on(event, listener)

Registers an event listener.

workflow.on('stepStarted', ({ workflowName, step, state, instanceId }) => {
console.log(`Step ${step} started in workflow ${workflowName}`);
});

Parameters:

  • event (WorkflowEvent): Name of the event to listen for
  • listener (function): Callback function to handle the event

Returns: workflow instance

once(event, listener)

Register a one-time event listener.

workflow.once('completed', (data) => console.log('Workflow completed once:', data));

Parameters:

  • event (WorkflowEvent): Name of the event to listen for
  • listener (function): Callback function to handle the event

Returns: workflow instance

off(event, listener)

Remove an event listener.

workflow.off('stepStarted', myListener);

Parameters:

  • event (WorkflowEvent): Name of the event
  • listener (function): Callback function to remove

Returns: workflow instance
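
To remove a listener later, keep a reference to the function you registered. The sketch below returns a detach function for that purpose; logStepStarts is a hypothetical helper, and it assumes off() matches listeners by reference, as Node's EventEmitter does.

```javascript
// Hypothetical helper: log step starts until the returned function is called.
function logStepStarts(workflow, log = console.log) {
  const listener = ({ workflowName, step, instanceId }) => {
    log(`[${workflowName}] ${instanceId} -> ${step}`);
  };
  workflow.on('stepStarted', listener);
  // Assumption: off() needs the same function reference that was passed to on()
  return () => workflow.off('stepStarted', listener);
}

// Usage:
// const stopLogging = logStepStarts(workflow);
// ...later
// stopLogging();
```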

emit(event, data)

Emit an event.

workflow.emit('customEvent', { message: 'Custom event data' });

Parameters:

  • event (WorkflowEvent): Name of the event to emit
  • data (WorkflowEventData): Event data

Returns: boolean indicating if the event was handled

configure(options)

Configure the workflow engine.

import { app } from 'codehooks-js';

const workflow = app.createWorkflow('myWorkflow', 'My workflow description', {
// ... workflow steps ...
});

// Configure workflow settings
workflow.configure({
collectionName: 'workflows', // Set storage collection name
queuePrefix: 'workflow', // Set queue prefix name
timeout: 30000, // Global timeout in milliseconds
maxStepCount: 3, // Maximum step execution count
steps: { // Step-specific configuration
stepName: {
timeout: 3000, // Step-specific timeout
maxRetries: 3 // Step-specific retry count
}
}
});

The configuration object supports these options:

  • collectionName (string, optional): Collection name for storing workflow data. Default: 'workflowdata'
  • queuePrefix (string, optional): Queue prefix for workflow jobs. Default: 'workflowqueue'
  • timeout (number, optional): Global timeout in milliseconds for workflow steps. Default: 30000
  • maxStepCount (number, optional): Maximum number of times a step can be executed. Default: 3
  • steps (object, optional): Step-specific configuration options
    • timeout (number, optional): Timeout in milliseconds for this specific step
    • maxRetries (number, optional): Maximum number of retries for this step

Returns: void
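
The sketch below shows one way to reason about which timeout applies to a step. The precedence (step-specific timeout first, then the global timeout, then the documented default of 30000 ms) is an assumption made for illustration, and effectiveTimeout is a hypothetical helper rather than part of the API.

```javascript
const DEFAULT_TIMEOUT_MS = 30000; // documented default for `timeout`

// Assumed precedence: step-specific timeout, then global timeout, then default.
function effectiveTimeout(config, stepName) {
  const stepConfig = (config.steps || {})[stepName] || {};
  return stepConfig.timeout ?? config.timeout ?? DEFAULT_TIMEOUT_MS;
}

const config = {
  timeout: 60000,
  steps: { paymentStep: { timeout: 3000, maxRetries: 3 } },
};
console.log(effectiveTimeout(config, 'paymentStep')); // 3000
console.log(effectiveTimeout(config, 'otherStep')); // 60000
console.log(effectiveTimeout({}, 'otherStep')); // 30000
```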

Events

The workflow API emits these events:

workflowCreated

Emitted when a new workflow is registered.

workflow.on('workflowCreated', ({ name, description }) => {
console.log(`Workflow "${name}" registered: ${description}`);
});

Event Data:

  • name (string): Workflow name
  • description (string): Workflow description

workflowStarted

Emitted when a new workflow instance is started.

workflow.on('workflowStarted', ({ name, initialState }) => {
console.log(`Workflow "${name}" started with initial state:`, initialState);
});

Event Data:

  • name (string): Workflow name
  • initialState (any): Initial state data

stepStarted

Emitted when a step begins execution.

workflow.on('stepStarted', ({ workflowName, step, state, instanceId }) => {
console.log(`Step "${step}" started in workflow "${workflowName}"`);
});

Event Data:

  • workflowName (string): Name of the workflow
  • step (string): Name of the step
  • state (any): Current workflow state
  • instanceId (string): Workflow instance ID

stateUpdated

Emitted when a step's state is updated.

workflow.on('stateUpdated', ({ workflowName, state, instanceId }) => {
console.log(`State updated in workflow "${workflowName}":`, state);
});

Event Data:

  • workflowName (string): Name of the workflow
  • state (any): Updated state data
  • instanceId (string): Workflow instance ID

stepWaiting

Emitted when a step is waiting for input.

workflow.on('stepWaiting', ({ workflowName, step, instanceId }) => {
console.log(`Step "${step}" waiting for input in workflow "${workflowName}"`);
});

Event Data:

  • workflowName (string): Name of the workflow
  • step (string): Name of the step
  • instanceId (string): Workflow instance ID

workflowContinued

Emitted when a workflow instance is continued after waiting.

workflow.on('workflowContinued', ({ workflowName, step, instanceId }) => {
console.log(`Workflow "${workflowName}" continued from step "${step}"`);
});

Event Data:

  • workflowName (string): Name of the workflow
  • step (string): Name of the step
  • instanceId (string): Workflow instance ID

stepEnqueued

Emitted when a step function is added to queue.

workflow.on('stepEnqueued', ({ workflowName, step, instanceId }) => {
console.log(`Step "${step}" enqueued for workflow "${workflowName}"`);
});

Event Data:

  • workflowName (string): Name of the workflow
  • step (string): Name of the step
  • instanceId (string): Workflow instance ID

completed

Emitted when a workflow instance is completed.

workflow.on('completed', ({ message, state }) => {
console.log('Workflow completed:', message, state);
});

Event Data:

  • message (string): Completion message
  • state (any): Final workflow state

cancelled

Emitted when a workflow instance is cancelled.

workflow.on('cancelled', ({ id }) => {
console.log(`Workflow instance "${id}" was cancelled`);
});

Event Data:

  • id (string): Workflow instance ID

error

Emitted when an error occurs.

workflow.on('error', ({ error }) => {
console.error('Workflow error:', error);
});

Event Data:

  • error (Error): Error object with details

Step Function

Each step function receives three parameters:

Parameters

  • state (any): Current workflow state object that can be modified and passed to next step
  • goto (function): Function to transition between steps, supports:
    • Name of next step to transfer to: goto('stepName', state)
    • Parallel execution of multiple steps: goto(['p1', 'p2'], state).
      • All parallel steps must join in the same next step, i.e. goto('samestep', state).
      • Process will wait for all parallel steps to finish before continuing to samestep.
    • If name of next step is null the workflow is completed: goto(null, state)
    • Optional third parameter for additional options: goto('nextStep', state, options)
  • wait (function, optional): Function that pauses the workflow until an external state change
    • Accepts an optional state update to apply before entering the wait state, e.g. wait() or wait({val: 42})
    • Call updateState to continue the workflow

Examples

Basic step with single transition:

async function step(state, goto) {
// Update state
state = { count: state.count + 1 };

// Transition to next step
goto('nextStep', state);
}

Step with conditional transitions:

async function conditionalStep(state, goto) {
if (state.count >= 3) {
// End workflow when condition met
goto(null, state);
} else if (state.needsValidation) {
// Go to validation step
goto('validate', state);
} else {
// Continue to next step
goto('process', state);
}
}

Step with parallel execution:

async function parallelStep(state, goto) {
// Execute multiple steps in parallel
goto(['processA', 'processB'], state);
}

// Both processA and processB must eventually call:
// goto('joinStep', state);
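
Put together, a fork/join workflow could look like the sketch below. The step and property names are illustrative. How the engine merges state written by parallel branches is not specified here, so each branch writes to its own property. The steps object is what you would pass as the third argument to app.createWorkflow.

```javascript
// Steps for a fork/join workflow (illustrative names)
const forkJoinSteps = {
  start: async (state, goto) => {
    // Fork: run processA and processB in parallel
    goto(['processA', 'processB'], state);
  },
  processA: async (state, goto) => {
    goto('joinStep', { ...state, resultA: 'A done' });
  },
  processB: async (state, goto) => {
    goto('joinStep', { ...state, resultB: 'B done' });
  },
  joinStep: async (state, goto) => {
    // Reached after both branches have called goto('joinStep', ...)
    goto(null, state);
  },
};

// const workflow = app.createWorkflow('forkJoin', 'Parallel example', forkJoinSteps);
```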

Step with a wait state:

async function stepWaiting(state, goto, wait) {
// goto bar or wait
if (state.foo === 'bar') {
goto('bar', state)
} else {
wait({"status": "waiting for bar"});
}
}
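
Because step functions only interact with the engine through goto and wait, their logic can be checked in isolation with stub callbacks. The sketch below is one way to do that; runStep is a hypothetical test helper and does not involve the workflow engine.

```javascript
// Run one step function with stub goto/wait callbacks and report what it did.
async function runStep(step, state) {
  const calls = [];
  const goto = (next, newState) => calls.push({ type: 'goto', next, state: newState });
  const wait = (newState) => calls.push({ type: 'wait', state: newState });
  await step(state, goto, wait);
  return calls;
}

// The wait-state step from above
async function stepWaiting(state, goto, wait) {
  if (state.foo === 'bar') {
    goto('bar', state);
  } else {
    wait({ status: 'waiting for bar' });
  }
}

runStep(stepWaiting, { foo: 'baz' }).then((calls) => console.log(calls));
// logs a single call of type 'wait'
```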

Error handling

Workflow errors are automatically captured and logged. You can monitor errors in several ways:

Logging and Monitoring

  • Studio UI: View workflow logs in real-time using the Studio interface
  • CLI: Stream logs with coho log -f to see workflow execution and errors
  • Database: Errors are stored in the workflow instance as lastError property

Error Inspection

To find workflow instances with errors, use the Studio collection (workflowdata) query tool with these example queries:

// Find all workflow instances with errors
{ lastError: { $exists: true } }

// Find all errors in a specific workflow
{ workflowName: 'simpleTask', lastError: { $exists: true } }

Error Properties

The lastError object contains:

  • message: Error message
  • updatedAt: When the error occurred

Example Error Object

{
lastError: {
message: "Failed to process payment",
stack: "Error: Payment service unavailable\n at processPayment...",
step: "paymentStep",
timestamp: "2024-03-14T12:34:56.789Z"
}
}

Best Practices

  1. State Management

    • Keep state objects simple and serializable
    • Use atomic updates for state changes
    • Validate state at each step
    • Monitor state size and complexity
  2. Error Handling

    • Monitor errors in logs and database
    • Use try/catch in steps that might fail
    • Set appropriate timeouts
    • Implement retry logic for transient failures
  3. Scalability

    • Design stateless step functions
    • Use appropriate queue configurations
    • Monitor worker performance
    • Implement proper error recovery
  4. Monitoring

    • Track workflow completion rates
    • Monitor error frequencies
    • Watch queue lengths
    • Set up alerts for critical failures
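
For transient failures inside a single step, such as a flaky HTTP call, a small retry wrapper is often enough. The sketch below retries with exponential backoff; withRetry and its options are hypothetical and not part of codehooks-js. Keep the total waiting time below the step timeout you have configured.

```javascript
// Retry an async operation with exponential backoff.
// withRetry and its options are hypothetical, not part of codehooks-js.
async function withRetry(operation, { retries = 3, baseDelayMs = 200 } = {}) {
  let lastError;
  for (let attempt = 0; attempt <= retries; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < retries) {
        // Delay doubles after each failed attempt: base, 2x base, 4x base, ...
        const delay = baseDelayMs * 2 ** attempt;
        await new Promise((resolve) => setTimeout(resolve, delay));
      }
    }
  }
  // All attempts failed: surface the last error to the caller
  throw lastError;
}

// Usage inside a step function:
// const data = await withRetry(() => fetch(url).then((r) => r.json()));
```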

Example Workflows

Here are some example workflows demonstrating common patterns and use cases. Each example includes an explanation of the steps and complete source code.

Minimal workflow example

This minimal example demonstrates the basic structure of a workflow with just two steps. It shows how to:

  • Create a simple workflow definition
  • Set and modify workflow state
  • Use goto to control flow
  • Expose a REST endpoint to start the workflow

The code below implements this workflow, handling the state and transitions:

import { app } from 'codehooks-js'
// The workflow definition
const workflow = app.createWorkflow('minimal', 'Minimal workflow example', {
start: (state, goto) => { goto('end', {...state, message: 'Hello World'}) },
end: (state, goto) => goto(null, state) // null completes the workflow
})
// A REST API to start a new workflow instance
app.post('/start', async (req, res) => res.json(await workflow.start({prop: 'some value'})))

export default app.init()

Customer onboarding

The customer onboarding workflow is a process that:

  1. Starts with customer email
  2. Sends welcome email and enters validation state
  3. Stays in validation state (calls wait() instead of goto) until the email is confirmed
  4. Completes when email is validated (goto null = finish)

Here's a practical example of a customer onboarding workflow:

index.js
import { app } from 'codehooks-js';

const workflow = app.createWorkflow(
'customerOnboarding',
'Customer signup and email validation',
{
START: async function (state, goto) {
// Initialize onboarding state
state = {
...state,
status: 'started',
email: state.email,
validated: false,
startedAt: new Date().toISOString(),
};
// Send welcome email
await sendWelcomeEmail(state.email);
// Move to validation step
goto('waitForValidation', state);
},

waitForValidation: async function (state, goto, wait) {
// Check if email was validated
if (state.validated) {
state.status = 'email_validated';
goto('complete', state);
} else {
// wait for validated = true
wait({status: "waiting_for_validation"});
}
},

complete: function (state, goto) {
// Finalize onboarding
state.status = 'completed';
state.completedAt = new Date().toISOString();
goto(null, state); // null === finish
},
}
);

// Event listeners for monitoring
workflow.on('completed', (data) => {
console.log('Onboarding completed for:', data);
});

workflow.on('stepStarted', (data) => {
console.log('Step started:', data.step, 'for customer:', data);
});

// API endpoints
app.post('/start-onboarding', async (req, res) => {
const { email } = req.body;
const result = await workflow.start({ email });
res.send(result);
});

// Endpoint to validate email (called by email webhook)
app.post('/validate-email', async (req, res) => {
const { email, token } = req.body;
// Find the onboarding instance for this email
const instances = await workflow.getInstances({
email: email,
validated: false,
});

if (instances.length > 0) {
const instance = instances[0];
// Update state with validation
const result = await workflow.updateState(
instance._id,
{
validated: true,
validatedAt: new Date().toISOString(),
validationToken: token,
},
{ continue: true } // this Step is complete
);
res.send(result);
} else {
res.status(404).send({ error: 'No pending onboarding found' });
}
});

// Helper function to send welcome email
async function sendWelcomeEmail(email) {
// Implementation of email sending
console.log('Sending welcome email to:', email);
// In a real app, you would use your email service here
}

export default app.init();

Batch Example

This example demonstrates a batch process for managing long-running data operations. It's useful for scenarios like:

  • Processing large datasets with multiple validation steps
  • Orchestrating complex data transformations across systems
  • Managing stateful operations that require error recovery
  • Coordinating multi-step data synchronization tasks
  • Implementing audit trails for data operations
  • Handling distributed transactions with rollback capabilities
  • Automating periodic data maintenance and cleanup tasks
  • Building resilient ETL (Extract, Transform, Load) pipelines

This three-step process validates data against an external API, updates a database record, and transfers the result to an archive system. Each step includes error handling and state tracking.

batch.js
import { app } from 'codehooks-js';

const steps = app.createWorkflow(
'customerCleanup',
'Batch process customer data cleanup',
{
START: async function (state, goto) {
// Initialize batch state
state = {
...state,
customerId: state.customerId,
status: 'started',
startedAt: new Date().toISOString(),
results: {
checkStatus: null,
dataTransfer: null,
},
};
// Move to first step
goto('checkCustomerStatus', state);
},

checkCustomerStatus: async function (state, goto) {
try {
// Call external API to check customer status
const response = await fetch(
'https://api.example.com/customer-status',
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ customerId: state.customerId }),
}
);
const data = await response.json();

// Update state with check results
state.results.checkStatus = {
status: data.status,
lastActive: data.lastActive,
checkedAt: new Date().toISOString(),
};

// Move to next step
goto('updateStatus', state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},

updateStatus: async function (state, goto) {
try {
// Update customer status in database
const newStatus =
state.results.checkStatus.lastActive < '2024-01-01'
? 'inactive'
: 'active';

await app.datastore.updateOne(
'customers',
{ customerId: state.customerId },
{ $set: { status: newStatus, updatedAt: new Date() } }
);

state.results.statusUpdated = {
newStatus,
updatedAt: new Date().toISOString(),
};

// Move to final step
goto('transferData', state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},

transferData: async function (state, goto) {
try {
// Transfer customer data to archive system
const response = await fetch('https://archive.example.com/transfer', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
customerId: state.customerId,
status: state.results.statusUpdated.newStatus,
lastActive: state.results.checkStatus.lastActive,
}),
});

state.results.dataTransfer = {
success: response.ok,
transferredAt: new Date().toISOString(),
};

// Complete the process
goto(null, state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},

error: function (state, goto) {
// Log error and end process
console.error('Batch process failed:', state.error);
state.status = 'failed';
goto(null, state);
},
}
);

// API endpoint to start batch process
app.post('/cleanup-customer', async (req, res) => {
const { customerId } = req.body;
const result = await steps.start({ customerId });
res.send(result);
});

// Helper endpoint to check batch status
app.get('/cleanup-status/:customerId', async (req, res) => {
const instances = await steps.getInstances({
customerId: req.params.customerId,
status: { $ne: 'failed' },
});

if (instances.length > 0) {
res.send(instances[0]);
} else {
res.status(404).send({ error: 'No cleanup process found' });
}
});

export default app.init();

This example demonstrates a straight-through batch process that:

  1. Checks Customer Status

    • Calls external API to verify customer status
    • Records last active date
    • Handles API errors
  2. Updates Database

    • Sets new status based on activity
    • Updates customer record
    • Tracks update timestamp
  3. Transfers Data

    • Sends data to archive system
    • Records transfer success
    • Handles transfer errors
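
The updateStatus step compares lastActive with a date string directly. That works because ISO 8601 timestamps in the same format and time zone sort alphabetically in date order. The sketch below isolates that decision; statusFromLastActive and the 'unknown' status are hypothetical additions for illustration. In the step above, a missing lastActive would fall through to 'active'.

```javascript
// Decide a customer's status from the last-active timestamp.
// statusFromLastActive and the 'unknown' status are hypothetical.
function statusFromLastActive(lastActive, cutoff = '2024-01-01') {
  if (typeof lastActive !== 'string') {
    return 'unknown'; // no usable timestamp from the status API
  }
  // Plain string comparison: ISO 8601 strings sort in date order
  return lastActive < cutoff ? 'inactive' : 'active';
}

console.log(statusFromLastActive('2023-12-31T23:59:59.000Z')); // inactive
console.log(statusFromLastActive('2024-01-01T00:00:00.000Z')); // active
console.log(statusFromLastActive(undefined)); // unknown
```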

Sub-workflows

Sub-workflows allow you to compose complex workflows by breaking them into smaller, reusable components. A main workflow can start a sub-workflow and wait for it to complete before continuing. This pattern is useful for:

  • Modular Design: Break complex workflows into smaller, manageable pieces
  • Reusability: Create sub-workflows that can be used by multiple parent workflows
  • Separation of Concerns: Keep different business logic in separate workflow definitions
  • Parallel Processing: Run multiple sub-workflows simultaneously
  • Error Isolation: Contain errors within sub-workflows without affecting the parent

The example below shows a main workflow that processes a number and then waits for a sub-workflow to complete before finishing. The sub-workflow updates the parent's state to signal completion.

In this flow, the main workflow processes a number (even/odd branching), then waits for a sub-workflow to complete. The sub-workflow runs independently but communicates back to the parent through state updates. In the implementation below, you'll see how:

  • The main workflow starts a sub-workflow and passes its own ID as parentId
  • The sub-workflow completes its task and updates the parent's state
  • The parent workflow continues once it detects the sub-workflow has finished
  • Both workflows use the same state management system for coordination
  • The main workflow uses the wait() function to pause execution until the sub-workflow signals completion

subflow.js
import { app } from 'codehooks-js';

// Create a main workflow definition
const workflow = app.createWorkflow('simpleTask', 'Basic workflow example', {
// Step 1: Initialize the workflow
begin: async function (state, goto) {
state = {
message: 'Starting workflow',
// Add a random number to demonstrate branching
value: Math.floor(Math.random() * 10)
};
goto('decide', state);
},

// Step 2: Decide which path to take
decide: async function (state, goto) {
// Branch based on whether the value is even or odd
if (state.value % 2 === 0) {
goto('evenPath', state);
} else {
goto('oddPath', state);
}
},

// Step 3a: Handle even numbers
evenPath: async function (state, goto) {
state = {
message: 'Processing even number',
path: 'even',
processed: true
};
goto('end', state);
},

// Step 3b: Handle odd numbers
oddPath: async function (state, goto) {
state = {
message: 'Processing odd number',
path: 'odd',
processed: true
};
goto('end', state);
},

// Step 4: End the workflow
end: async function (state, goto, wait) {
if (state.subflowCompleted) {
state = {
final_message: `Workflow completed! Processed ${state.path} number: ${state.value}`
};
goto(null, state); // workflow complete
} else {
// Start the subflow and wait for it to complete, set the parentId in the subflow state
await subflow.start({"parentId": state._id});
wait({wait_message: 'Waiting for subflow to complete'});
}
}
});

// Create a subflow definition
const subflow = app.createWorkflow('subflow', 'Subflow', {
begin: async (state, goto) => {
state.message = 'Hello from subflow'
goto('end', state)
},
end: async (state, goto) => {
// Update the main workflow state to indicate that the subflow has completed
// Await the update so a failure surfaces in this step instead of being lost
await workflow.updateState(state.parentId, {
subflowCompleted: true
}, {continue: true});
goto(null, state)
}
})

// emitted event when a workflow completes
workflow.on('completed', (data) => {
console.log('Workflow completed:', data);
});

// REST API to start a new workflow instance
app.post('/start', async (req, res) => {
const result = await workflow.start({"foo": "bar"});
res.json(result);
});

// export app interface to serverless execution
export default app.init();
