Workflow API
Create and deploy robust, scalable workflows using queued functions and state management. Build reliable backend systems with automatic retry, state persistence, and distributed processing.
The Workflow API is an early access release. We appreciate any feedback from developers using it.
The Workflow API is available from codehooks-js version 1.3.12.
Overview
Codehooks workflows combine persistent queues with state management to create reliable, scalable workflows. Each step in a workflow can modify the process state and control the flow using the goto function, enabling complex workflows with branching, loops, and conditional execution. The system automatically handles state persistence, distributed processing, and error recovery.
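To make the goto control flow concrete, the sketch below runs a small step map in memory. Note that runLocally is a hypothetical helper written for this illustration and is not part of codehooks-js. It has no queues or persistence, and it assumes that the state passed to goto is merged into the current state. It only shows how a loop and a final transition are expressed.

```javascript
// Toy in-memory runner (hypothetical, for illustration only).
// The real engine queues each step and persists state between steps.
async function runLocally(steps, firstStep, initialState = {}) {
  let state = { ...initialState };
  let next = firstStep;
  const visited = [];
  while (next !== null) {
    const current = next;
    next = null; // a step that never calls goto ends this toy run
    visited.push(current);
    await steps[current](state, (stepName, newState) => {
      next = stepName;
      state = { ...state, ...newState }; // assumption: state is merged
    });
  }
  return { state, visited };
}

const steps = {
  // Loop: revisit this step until the counter reaches 3
  count: async (state, goto) => {
    const n = (state.n || 0) + 1;
    goto(n < 3 ? 'count' : 'done', { n });
  },
  // goto(null, ...) completes the workflow
  done: async (state, goto) => goto(null, { finished: true }),
};

runLocally(steps, 'count').then(({ state, visited }) => {
  console.log(visited.join(' -> '), state);
});
```

In a deployed workflow the same step map would be passed to app.createWorkflow. Keep in mind that the maxStepCount setting (default 3) limits how many times a single step may execute, so loops that need more iterations require a higher value.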
Key Features
- State Management
  - Persistent state storage
  - Atomic state updates
  - Complete execution history
  - State inspection and debugging
- Queue-Based Processing
  - Queued (durable) functions
  - Automatic retry on failures
  - Load balancing across workers
  - No lost messages or duplicate processing
  - Parallel execution of workflow steps
- Scalability & Reliability
  - Distributed processing
  - Automatic failover
  - State recovery
  - Error handling and monitoring
Why Use Workflows?
Workflows help you build reliable backend systems and integration logic. Because they run on queued functions, they handle system failures and network issues by automatically resuming from the last known state after crashes or connection losses. This makes them ideal for critical processes like order processing and payment flows. The system distributes work across multiple workers and processes long-running tasks asynchronously. As a developer, you can focus on implementing business logic while the framework handles state persistence, retries, and scaling.
A simple workflow example
Let's create a simple workflow app that demonstrates branching based on a random number. The workflow will process even and odd numbers differently.
The full source code for the workflow steps is shown below.
import { app } from 'codehooks-js';
// Create a workflow definition
const workflow = app.createWorkflow('simpleTask', 'Basic workflow example', {
// Step 1: Initialize the workflow
begin: async function (state, goto) {
state = {
message: 'Starting workflow',
// Add a random number to demonstrate branching
value: Math.floor(Math.random() * 10)
};
goto('decide', state);
},
// Step 2: Decide which path to take
decide: async function (state, goto) {
// Branch based on whether the value is even or odd
if (state.value % 2 === 0) {
goto('evenPath', state);
} else {
goto('oddPath', state);
}
},
// Step 3a: Handle even numbers
evenPath: async function (state, goto) {
state = {
message: 'Processing even number',
path: 'even',
processed: true
};
goto('end', state);
},
// Step 3b: Handle odd numbers
oddPath: async function (state, goto) {
state = {
message: 'Processing odd number',
path: 'odd',
processed: true
};
goto('end', state);
},
// Step 4: End the workflow
end: function (state, goto) {
state = {
final_message: `Workflow completed! Processed ${state.path} number: ${state.value}`
};
goto(null, state); // workflow complete
}
});
// emitted event when a workflow completes
workflow.on('completed', (data) => {
console.log('Workflow completed:', data);
});
// REST API to start a new workflow instance
app.post('/start', async (req, res) => {
const result = await workflow.start({"foo": "bar"});
res.json(result);
});
// export app interface to serverless execution
export default app.init();
You can deploy the workflow in the Studio app directly or with the CLI command coho deploy.
See more examples at the end of this page.
Workflow State Storage and Inspection
All workflow instances and their states are automatically persisted in a collection (default name: workflowdata). This collection stores the complete state of every workflow instance, making it easy to inspect and debug workflows at any time.
Each document in the collection contains:
- _id: Unique identifier for the workflow instance
- nextStep: The next step to be executed (null when workflow is complete)
- createdAt: Timestamp when the workflow was created
- updatedAt: Timestamp of the last state update
- workflowName: Name of the workflow definition
- previousStep: Name of the last executed step
- stepCount: Object containing execution statistics for each step
  - visits: Number of times the step was executed
  - startTime: When the step started execution
  - finishTime: When the step finished execution
  - totalTime: Total execution time in milliseconds
- totalTime: Total workflow execution time in milliseconds
- ...state: Your custom state properties for the workflow
An example state object for a completed simpleTask workflow is shown below:
{
"foo": "bar",
"nextStep": null,
"createdAt": "2025-06-09T15:35:41.318Z",
"workflowName": "simpleTask",
"stepCount": {
"begin": {
"visits": 1,
"startTime": "2025-06-09T15:35:41.529Z",
"totalTime": 5,
"finishTime": "2025-06-09T15:35:41.534Z"
},
"decide": {
"visits": 1,
"startTime": "2025-06-09T15:35:41.700Z",
"totalTime": 7,
"finishTime": "2025-06-09T15:35:41.707Z"
},
"evenPath": {
"visits": 1,
"startTime": "2025-06-09T15:35:41.757Z",
"totalTime": 5,
"finishTime": "2025-06-09T15:35:41.762Z"
},
"end": {
"visits": 1,
"startTime": "2025-06-09T15:35:42.578Z",
"totalTime": 8,
"finishTime": "2025-06-09T15:35:42.586Z"
}
},
"_id": "6846ff4d5a5945501a11d691",
"updatedAt": "2025-06-09T15:35:42.586Z",
"message": "Processing even number",
"value": 6,
"previousStep": "end",
"path": "even",
"processed": true,
"final_message": "Workflow completed! Processed even number: 6",
"totalTime": 1260
}
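As a rough sketch of how the stepCount statistics can be used, the function below summarizes a workflow instance document. The field names follow the example document above. The helper name summarizeSteps and the interpretation of the remaining time as overhead outside the step functions (presumably queueing and scheduling) are assumptions made for this illustration.

```javascript
// Summarize per-step timings from a workflow instance document.
// Field names (stepCount, visits, totalTime, nextStep) follow the example document.
function summarizeSteps(instance) {
  const rows = Object.entries(instance.stepCount || {}).map(([step, s]) => ({
    step,
    visits: s.visits,
    totalTime: s.totalTime,
  }));
  // Time spent inside step functions, in milliseconds
  const stepTime = rows.reduce((sum, r) => sum + r.totalTime, 0);
  return {
    rows,
    stepTime,
    // Remainder of the workflow's totalTime (assumed to be time between steps)
    overhead: instance.totalTime - stepTime,
    completed: instance.nextStep === null,
  };
}

// Trimmed copy of the example document
const example = {
  nextStep: null,
  totalTime: 1260,
  stepCount: {
    begin: { visits: 1, totalTime: 5 },
    decide: { visits: 1, totalTime: 7 },
    evenPath: { visits: 1, totalTime: 5 },
    end: { visits: 1, totalTime: 8 },
  },
};
console.log(summarizeSteps(example));
```

For the example document, the four steps account for 25 ms of the 1260 ms total.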
You can query this collection directly to inspect all workflow states:
// Example REST API to get all completed instances of a workflow
// (Datastore is imported from 'codehooks-js' together with app)
app.get('/some', async (req, res) => {
  const db = await Datastore.open();
  // nextStep is null when a workflow instance is complete
  const completedInstances = await db.getMany('workflowdata', {
    workflowName: 'simpleTask',
    nextStep: null
  }).toArray();
  res.json(completedInstances);
});
Or you can query the collection for a specific workflow instance:
// Example REST API to get a specific workflow instance
app.get('/one/:id', async (req, res) => {
const db = await Datastore.open();
// Example: Get state of a specific workflow instance
const instance = await db.getOne('workflowdata', {
_id: req.params.id
});
res.json(instance);
});
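The query objects used by endpoints like the ones above can be kept in small helpers. The sketch below is one possible shape. The function names are hypothetical, and runningQuery assumes that every instance with a non-null nextStep is still in progress, which this page does not state explicitly (cancelled instances, for example, may need separate handling).

```javascript
// Build query objects for the workflow collection.
// nextStep === null marks a completed instance (see the field list above).
function completedQuery(workflowName) {
  return { workflowName, nextStep: null };
}

// Assumption: a non-null nextStep means the instance has not finished yet.
function runningQuery(workflowName) {
  // $ne is the "not equal" query operator, also used elsewhere on this page
  return { workflowName, nextStep: { $ne: null } };
}

console.log(completedQuery('simpleTask'));
console.log(runningQuery('simpleTask'));
// Usage inside a route handler:
//   const db = await Datastore.open();
//   const running = await db.getMany('workflowdata', runningQuery('simpleTask')).toArray();
```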
To use a different collection name, configure it when creating the workflow:
const workflow = app.createWorkflow('myWorkflow', 'description', steps);
workflow.configure({ collectionName: 'my_custom_collection' });
Here's how the workflow data appears in Codehooks Studio for an example workflow:
The screenshot shows the state of a workflow instance that:
- Started at 17:21:00
- Branched through the even path
- Is completed because nextStep is null
- Contains the state data with the message properties
Core API Overview
Methods
The workflow API provides these core methods for managing persistent workflows:
createWorkflow(name, description, steps)
Creates a new workflow of persistent steps.
const workflow = app.createWorkflow('workflow', 'description', {
START: async (state, goto) => {
// step implementation
},
});
Parameters:
- name (string): Unique identifier for the workflow
- description (string): Human-readable description of the workflow
- steps (WorkflowDefinition): Object containing step definitions
Returns: Workflow instance for managing the workflow
start(initialState)
Starts a new Workflow instance.
const result = await workflow.start({ data: 'value' });
// Returns state, e.g. {_id: '682ec2b3dab9887c1da726e9', ...state}
Parameters:
- initialState (any): Initial state for the workflow instance
Returns: Promise with the workflow instance
updateState(instanceId, state, options?)
Updates the state of a workflow instance.
await workflow.updateState('682ec2b3dab9887c1da726e9', { count: 1 }, { continue: true });
Parameters:
- instanceId (string): ID of the workflow instance
- state (any): New state to update with
- options (UpdateOptions, optional): Options for the update; pass { continue: false } to avoid continuing the step
Returns: Promise with the updated state
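The two modes of updateState can be sketched as follows. The workflow argument is any object exposing the updateState method described above, which keeps the helpers easy to test. The helper names and the state properties (approved, approvedAt, note) are hypothetical.

```javascript
// Update the state and let the workflow continue.
// `workflow` is expected to expose the documented updateState method.
async function approveInstance(workflow, instanceId) {
  return workflow.updateState(
    instanceId,
    { approved: true, approvedAt: new Date().toISOString() },
    { continue: true } // continue the workflow after the update
  );
}

// Update the state only; the workflow stays where it is.
async function addNote(workflow, instanceId, note) {
  return workflow.updateState(instanceId, { note }, { continue: false });
}
```

A typical caller would be a REST route or webhook handler that receives the instance ID and passes the workflow object created by app.createWorkflow.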
setState(instanceId, stateData)
Set the complete state of a workflow instance.
await workflow.setState('682ec2b3dab9887c1da726e9', {
_id: '682ec2b3dab9887c1da726e9',
state: { count: 1, status: 'active' }
});
Parameters:
- instanceId (string): ID of the workflow instance
- stateData (object): Object containing _id and state
Returns: Promise that resolves to void
continue(instanceId, reset?)
Continue a paused workflow instance.
const result = await workflow.continue('682ec2b3dab9887c1da726e9', false);
// Returns: { instanceId: string }
Parameters:
- instanceId (string): ID of the workflow instance
- reset (boolean, optional): Whether to reset all step counts (true) or just the current step (false)
Returns: Promise with the queue ID for the continued step
getWorkflowStatus(id)
Get the status of a workflow instance.
const status = await workflow.getWorkflowStatus('682ec2b3dab9887c1da726e9');
Parameters:
- id (string): ID of the workflow instance
Returns: Promise with the workflow status
getInstances(filter)
Lists workflow instances matching a filter.
const instances = await workflow.getInstances({ status: 'running' });
Parameters:
- filter (any): Filter criteria for workflows
Returns: Promise with list of workflow instances
cancelWorkflow(id)
Cancel a workflow instance.
const result = await workflow.cancelWorkflow('682ec2b3dab9887c1da726e9');
Parameters:
- id (string): ID of the workflow instance to cancel
Returns: Promise with the cancellation result
continueAllTimedOut()
Continues all timed out step instances in the workflow. This is useful for recovering workflows that have timed out due to system issues or long-running operations.
Returns: Promise with array of results containing instance IDs for continued workflows
Example:
// Create workflow with 5 minute timeout
const workflow = app.createWorkflow('simpleTask', 'Basic workflow example', {/* workflow steps */})
workflow.configure({
timeout: 1000*60*5 // 5 minute timeout threshold
})
// Schedule job to continue timed out workflows every 5 minutes
app.job('*/5 * * * *', async (req, res) => {
const result = await workflow.continueAllTimedOut()
if (result.length > 0) {
console.log('Continued these dangling workflows:', result);
}
res.end()
})
findTimedOutSteps(filter?)
Find all workflow instances with timed out steps.
const timedOutWorkflows = await workflow.findTimedOutSteps();
// Returns array of workflows with timeout details:
// [{
// workflowId: string,
// workflowName: string,
// isTimedOut: true,
// executionTime?: number,
// runningTime?: number,
// timeout: number,
// step: string,
// startTime: string,
// finishTime?: string,
// currentTime?: string,
// timeoutSource: string
// }]
Parameters:
- filter (any, optional): Optional filter criteria for workflows
Returns: Promise with array of workflow instances that have timed out steps
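The array returned by findTimedOutSteps can be aggregated to see which steps time out most often. The sketch below is a plain helper over the result shape shown above. The name countTimeoutsByStep is hypothetical and only the step property is used.

```javascript
// Count timed-out workflow instances per step name.
// Expects the array shape documented for findTimedOutSteps().
function countTimeoutsByStep(timedOut) {
  const counts = {};
  for (const item of timedOut) {
    counts[item.step] = (counts[item.step] || 0) + 1;
  }
  return counts;
}

// Usage (inside an async function or job):
//   const timedOut = await workflow.findTimedOutSteps();
//   console.log(countTimeoutsByStep(timedOut));
```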
on(event, listener)
Registers an event listener.
workflow.on('stepStarted', ({ workflowName, step, state, instanceId }) => {
console.log(`Step ${step} started in workflow ${workflowName}`);
});
Parameters:
- event (WorkflowEvent): Name of the event to listen for
- listener (function): Callback function to handle the event
Returns: workflow instance
once(event, listener)
Register a one-time event listener.
workflow.once('completed', (data) => console.log('Workflow completed once:', data));
Parameters:
- event (WorkflowEvent): Name of the event to listen for
- listener (function): Callback function to handle the event
Returns: workflow instance
off(event, listener)
Remove an event listener.
workflow.off('stepStarted', myListener);
Parameters:
- event (WorkflowEvent): Name of the event
- listener (function): Callback function to remove
Returns: workflow instance
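Because off needs the same function reference that was passed to on, it helps to keep listeners in named constants. The sketch below shows one way to do that. The helper attachAuditLog and its log parameter are hypothetical, and the event payload fields are the ones documented for stepStarted and error.

```javascript
// Attach listeners and return a function that removes them again.
// off() is called with the same function references that were given to on().
function attachAuditLog(workflow, log = console.log) {
  const onStep = ({ workflowName, step, instanceId }) =>
    log(`[${workflowName}] ${instanceId} started step ${step}`);
  const onError = ({ error }) => log(`workflow error: ${error.message}`);

  workflow.on('stepStarted', onStep);
  workflow.on('error', onError);

  return function detach() {
    workflow.off('stepStarted', onStep);
    workflow.off('error', onError);
  };
}

// Usage:
//   const detach = attachAuditLog(workflow);
//   ...later, when logging is no longer needed:
//   detach();
```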
emit(event, data)
Emit an event.
workflow.emit('customEvent', { message: 'Custom event data' });
Parameters:
- event (WorkflowEvent): Name of the event to emit
- data (WorkflowEventData): Event data
Returns: boolean indicating if the event was handled
configure(options)
Configure the workflow engine.
import { app } from 'codehooks-js';
const workflow = app.createWorkflow('myWorkflow', 'My workflow description', {
// ... workflow steps ...
});
// Configure workflow settings
workflow.configure({
collectionName: 'workflows', // Set storage collection name
queuePrefix: 'workflow', // Set queue prefix name
timeout: 30000, // Global timeout in milliseconds
maxStepCount: 3, // Maximum step execution count
steps: { // Step-specific configuration
stepName: {
timeout: 3000, // Step-specific timeout
maxRetries: 3 // Step-specific retry count
}
}
});
The configuration object supports these options:
- collectionName (string, optional): Collection name for storing workflow data. Default: 'workflowdata'
- queuePrefix (string, optional): Queue prefix for workflow jobs. Default: 'workflowqueue'
- timeout (number, optional): Global timeout in milliseconds for workflow steps. Default: 30000
- maxStepCount (number, optional): Maximum number of times a step can be executed. Default: 3
- steps (object, optional): Step-specific configuration options
  - timeout (number, optional): Timeout in milliseconds for this specific step
  - maxRetries (number, optional): Maximum number of retries for this step
Returns: void
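When reviewing a configuration object it can help to compute which timeout would apply to a given step. The sketch below assumes that a step-specific timeout takes precedence over the global one, and that the documented default of 30000 ms applies when neither is set. The option names suggest this order, but this page does not state it, so treat the helper (effectiveTimeout is a hypothetical name) as a reading aid rather than a description of the engine.

```javascript
const DEFAULT_TIMEOUT_MS = 30000; // documented default for `timeout`

// Assumed precedence: step-specific timeout, then global timeout, then default.
function effectiveTimeout(config, stepName) {
  const stepConfig = (config.steps && config.steps[stepName]) || {};
  if (typeof stepConfig.timeout === 'number') return stepConfig.timeout;
  if (typeof config.timeout === 'number') return config.timeout;
  return DEFAULT_TIMEOUT_MS;
}

const config = {
  timeout: 60000,
  steps: { slowStep: { timeout: 3000 } },
};
console.log(effectiveTimeout(config, 'slowStep'));
console.log(effectiveTimeout(config, 'otherStep'));
```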
Events
The workflow API emits these events:
workflowCreated
Emitted when a new workflow is registered.
workflow.on('workflowCreated', ({ name, description }) => {
console.log(`Workflow "${name}" registered: ${description}`);
});
Event Data:
- name (string): Workflow name
- description (string): Workflow description
workflowStarted
Emitted when a new workflow instance is started.
workflow.on('workflowStarted', ({ name, initialState }) => {
console.log(`Workflow "${name}" started with initial state:`, initialState);
});
Event Data:
- name (string): Workflow name
- initialState (any): Initial state data
stepStarted
Emitted when a step begins execution.
workflow.on('stepStarted', ({ workflowName, step, state, instanceId }) => {
console.log(`Step "${step}" started in workflow "${workflowName}"`);
});
Event Data:
- workflowName (string): Name of the workflow
- step (string): Name of the step
- state (any): Current workflow state
- instanceId (string): Workflow instance ID
stateUpdated
Emitted when a step's state is updated.
workflow.on('stateUpdated', ({ workflowName, state, instanceId }) => {
console.log(`State updated in workflow "${workflowName}":`, state);
});
Event Data:
- workflowName (string): Name of the workflow
- state (any): Updated state data
- instanceId (string): Workflow instance ID
stepWaiting
Emitted when a step is waiting for input.
workflow.on('stepWaiting', ({ workflowName, step, instanceId }) => {
console.log(`Step "${step}" waiting for input in workflow "${workflowName}"`);
});
Event Data:
- workflowName (string): Name of the workflow
- step (string): Name of the step
- instanceId (string): Workflow instance ID
workflowContinued
Emitted when a workflow instance is continued after waiting.
workflow.on('workflowContinued', ({ workflowName, step, instanceId }) => {
console.log(`Workflow "${workflowName}" continued from step "${step}"`);
});
Event Data:
- workflowName (string): Name of the workflow
- step (string): Name of the step
- instanceId (string): Workflow instance ID
stepEnqueued
Emitted when a step function is added to queue.
workflow.on('stepEnqueued', ({ workflowName, step, instanceId }) => {
console.log(`Step "${step}" enqueued for workflow "${workflowName}"`);
});
Event Data:
- workflowName (string): Name of the workflow
- step (string): Name of the step
- instanceId (string): Workflow instance ID
completed
Emitted when a workflow instance is completed.
workflow.on('completed', ({ message, state }) => {
console.log('Workflow completed:', message, state);
});
Event Data:
- message (string): Completion message
- state (any): Final workflow state
cancelled
Emitted when a workflow instance is cancelled.
workflow.on('cancelled', ({ id }) => {
console.log(`Workflow instance "${id}" was cancelled`);
});
Event Data:
- id (string): Workflow instance ID
error
Emitted when an error occurs.
workflow.on('error', ({ error }) => {
console.error('Workflow error:', error);
});
Event Data:
- error (Error): Error object with details
Step Function
Each step function receives three parameters:
Parameters
- state (any): Current workflow state object that can be modified and passed to the next step
- goto (function): Function to transition between steps. It supports:
  - Name of next step to transfer to: goto('stepName', state)
  - Parallel execution of multiple steps: goto(['p1', 'p2'], state)
    - All parallel steps must join in the same next step, i.e. goto('samestep', state).
    - The process waits for all parallel steps to finish before continuing to samestep.
  - If the name of the next step is null, the workflow is completed: goto(null, state)
  - Optional third parameter for additional options: goto('nextStep', state, options)
- wait (function, optional): Function to wait for an external state change
  - Accepts an optional state update before going into a wait state, e.g. wait() or wait({val: 42})
  - Call updateState to continue the workflow
Examples
Basic step with single transition:
async function step(state, goto) {
// Update state
state = { count: state.count + 1 };
// Transition to next step
goto('nextStep', state);
}
Step with conditional transitions:
async function conditionalStep(state, goto) {
if (state.count >= 3) {
// End workflow when condition met
goto(null, state);
} else if (state.needsValidation) {
// Go to validation step
goto('validate', state);
} else {
// Continue to next step
goto('process', state);
}
}
Step with parallel execution:
async function parallelStep(state, goto) {
// Execute multiple steps in parallel
goto(['processA', 'processB'], state);
}
// Both processA and processB must eventually call:
// goto('joinStep', state);
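A fuller sketch of the fan-out and join pattern is shown below as a plain step map that could be passed to app.createWorkflow. The step names and the resultA, resultB, and joined properties are hypothetical. Each branch passes only its own result to goto, on the assumption that the engine merges it into the stored state (as the simpleTask example suggests), so the branches do not overwrite each other's data.

```javascript
// Fan-out / join sketch. Step names and state properties are hypothetical.
const parallelSteps = {
  start: async (state, goto) => {
    // Run processA and processB in parallel
    goto(['processA', 'processB'], state);
  },
  processA: async (state, goto) => {
    // Pass only this branch's result (assumes state is merged by the engine)
    goto('joinStep', { resultA: 'A done' });
  },
  processB: async (state, goto) => {
    goto('joinStep', { resultB: 'B done' });
  },
  // Both branches join here; the engine waits for all parallel steps first
  joinStep: async (state, goto) => {
    goto(null, { ...state, joined: true });
  },
};
```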
Step with a wait state:
async function stepWaiting(state, goto, wait) {
// goto bar or wait
if (state.foo === 'bar') {
goto('bar', state)
} else {
wait({"status": "waiting for bar"});
}
}
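Since a step is an ordinary function of state, goto, and wait, it can be exercised locally with stand-in callbacks before deployment. The sketch below records what a step asks the engine to do. runStep is a hypothetical test helper and does not involve codehooks-js at all.

```javascript
// Run one step function in isolation and record its goto/wait calls.
async function runStep(stepFn, state) {
  const calls = [];
  const goto = (next, newState, options) =>
    calls.push({ type: 'goto', next, state: newState, options });
  const wait = (newState) => calls.push({ type: 'wait', state: newState });
  await stepFn(state, goto, wait);
  return calls;
}

// The wait-state step from the example above
async function stepWaiting(state, goto, wait) {
  if (state.foo === 'bar') {
    goto('bar', state);
  } else {
    wait({ status: 'waiting for bar' });
  }
}

runStep(stepWaiting, { foo: 'baz' }).then((calls) => console.log(calls));
```

With foo set to 'baz' the step records a single wait call, and with foo set to 'bar' it records a goto to the bar step.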
Error handling
Workflow errors are automatically captured and logged. You can monitor errors in several ways:
Logging and Monitoring
- Studio UI: View workflow logs in real-time using the Studio interface
- CLI: Stream logs with coho log -f to see workflow execution and errors
- Database: Errors are stored in the workflow instance as the lastError property
Error Inspection
To find workflow instances with errors, use the Studio collection (workflowdata) query tool with these example queries:
// Find all workflow instances with errors
{ lastError: { $exists: true } }
// Find all errors in a specific workflow
{ workflowName: 'simpleTask', lastError: { $exists: true } }
Error Properties
The lastError object contains:
- message: Error message
- updatedAt: When the error occurred
Example Error Object
{
lastError: {
message: "Failed to process payment",
stack: "Error: Payment service unavailable\n at processPayment...",
step: "paymentStep",
timestamp: "2024-03-14T12:34:56.789Z"
}
}
Best Practices
- State Management
  - Keep state objects simple and serializable
  - Use atomic updates for state changes
  - Validate state at each step
  - Monitor state size and complexity
- Error Handling
  - Monitor errors in logs and database
  - Use try/catch in steps that might fail
  - Set appropriate timeouts
  - Implement retry logic for transient failures
- Scalability
  - Design stateless step functions
  - Use appropriate queue configurations
  - Monitor worker performance
  - Implement proper error recovery
- Monitoring
  - Track workflow completion rates
  - Monitor error frequencies
  - Watch queue lengths
  - Set up alerts for critical failures
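The advice to implement retry logic for transient failures can be sketched as a small helper used inside a step, for example around a call to an external service. This is separate from the retries the queue performs on its own. The name withRetry and its attempts and delayMs options are hypothetical, and the linear backoff is just one simple choice.

```javascript
// Retry an async operation a few times before letting the error escape.
async function withRetry(operation, { attempts = 3, delayMs = 200 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await operation(attempt);
    } catch (error) {
      lastError = error;
      if (attempt < attempts) {
        // Linear backoff: wait longer after each failed attempt
        await new Promise((resolve) => setTimeout(resolve, delayMs * attempt));
      }
    }
  }
  throw lastError;
}

// Usage inside a step function (callPaymentService is a hypothetical function):
//   const receipt = await withRetry(() => callPaymentService(state.orderId));
//   goto('nextStep', { ...state, receipt });
```

If all attempts fail, the last error is thrown, so a surrounding try/catch in the step can still route the workflow to an error step.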
Example Workflows
Here are some example workflows demonstrating common patterns and use cases. Each example includes a visualization, explanation of the steps, and complete source code.
Minimal workflow example
This minimal example demonstrates the basic structure of a workflow with just two steps. It shows how to:
- Create a simple workflow definition
- Set and modify workflow state
- Use goto to control flow
- Expose a REST endpoint to start the workflow
The code below implements this workflow, handling the state and transitions:
import { app } from 'codehooks-js'
// The workflow definition
const workflow = app.createWorkflow('minimal', 'Minimal workflow example', {
start: (state, goto) => { goto('end', {...state, message: 'Hello World'}) },
end: (state, goto) => goto(null, state) // null completes the workflow
})
// A REST API to start a new workflow instance
app.post('/start', async (req, res) => res.json(await workflow.start({prop: 'some value'})))
export default app.init()
Customer onboarding
Here's a visualization of the customer onboarding workflow:
The workflow shows a process that:
- Starts with customer email
- Sends welcome email and enters validation state
- Stays in validation state (no goto) until email is confirmed
- Completes when email is validated (goto null = finish)
Here's a practical example of a customer onboarding workflow:
import { app } from 'codehooks-js';
const workflow = app.createWorkflow(
'customerOnboarding',
'Customer signup and email validation',
{
START: async function (state, goto) {
// Initialize onboarding state
state = {
...state,
status: 'started',
email: state.email,
validated: false,
startedAt: new Date().toISOString(),
};
// Send welcome email
await sendWelcomeEmail(state.email);
// Move to validation step
goto('waitForValidation', state);
},
waitForValidation: async function (state, goto, wait) {
// Check if email was validated
if (state.validated) {
state.status = 'email_validated';
goto('complete', state);
} else {
// wait for validated = true
wait({status: "waiting_for_validation"});
}
},
complete: function (state, goto) {
// Finalize onboarding
state.status = 'completed';
state.completedAt = new Date().toISOString();
goto(null, state); // null === finish
},
}
);
// Event listeners for monitoring
workflow.on('completed', (data) => {
console.log('Onboarding completed for:', data);
});
workflow.on('stepStarted', (data) => {
console.log('Step started:', data.step, 'for customer:', data);
});
// API endpoints
app.post('/start-onboarding', async (req, res) => {
const { email } = req.body;
const result = await workflow.start({ email });
res.send(result);
});
// Endpoint to validate email (called by email webhook)
app.post('/validate-email', async (req, res) => {
const { email, token } = req.body;
// Find the onboarding instance for this email
const instances = await workflow.getInstances({
email: email,
validated: false,
});
if (instances.length > 0) {
const instance = instances[0];
// Update state with validation
const result = await workflow.updateState(
instance._id,
{
validated: true,
validatedAt: new Date().toISOString(),
validationToken: token,
},
{ continue: true } // this Step is complete
);
res.send(result);
} else {
res.status(404).send({ error: 'No pending onboarding found' });
}
});
// Helper function to send welcome email
async function sendWelcomeEmail(email) {
// Implementation of email sending
console.log('Sending welcome email to:', email);
// In a real app, you would use your email service here
}
export default app.init();
Batch Example
This example demonstrates a batch process for managing long-running data operations. It's useful for scenarios like:
- Processing large datasets with multiple validation steps
- Orchestrating complex data transformations across systems
- Managing stateful operations that require error recovery
- Coordinating multi-step data synchronization tasks
- Implementing audit trails for data operations
- Handling distributed transactions with rollback capabilities
- Automating periodic data maintenance and cleanup tasks
- Building resilient ETL (Extract, Transform, Load) pipelines
The flow diagram below shows a three-step process that validates data against an external API, updates a database record, and transfers the result to an archive system. Each step includes error handling and state tracking.
import { app, Datastore } from 'codehooks-js';
const steps = app.createWorkflow(
'customerCleanup',
'Batch process customer data cleanup',
{
START: async function (state, goto) {
// Initialize batch state
state = {
...state,
customerId: state.customerId,
status: 'started',
startedAt: new Date().toISOString(),
results: {
checkStatus: null,
dataTransfer: null,
},
};
// Move to first step
goto('checkCustomerStatus', state);
},
checkCustomerStatus: async function (state, goto) {
try {
// Call external API to check customer status
const response = await fetch(
'https://api.example.com/customer-status',
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ customerId: state.customerId }),
}
);
const data = await response.json();
// Update state with check results
state.results.checkStatus = {
status: data.status,
lastActive: data.lastActive,
checkedAt: new Date().toISOString(),
};
// Move to next step
goto('updateStatus', state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},
updateStatus: async function (state, goto) {
try {
// Update customer status in database
const newStatus =
state.results.checkStatus.lastActive < '2024-01-01'
? 'inactive'
: 'active';
// Open the datastore the same way as in the query examples above
const db = await Datastore.open();
await db.updateOne(
'customers',
{ customerId: state.customerId },
{ $set: { status: newStatus, updatedAt: new Date() } }
);
state.results.statusUpdated = {
newStatus,
updatedAt: new Date().toISOString(),
};
// Move to final step
goto('transferData', state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},
transferData: async function (state, goto) {
try {
// Transfer customer data to archive system
const response = await fetch('https://archive.example.com/transfer', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
customerId: state.customerId,
status: state.results.statusUpdated.newStatus,
lastActive: state.results.checkStatus.lastActive,
}),
});
state.results.dataTransfer = {
success: response.ok,
transferredAt: new Date().toISOString(),
};
// Complete the process
goto(null, state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},
error: function (state, goto) {
// Log error and end process
console.error('Batch process failed:', state.error);
state.status = 'failed';
goto(null, state);
},
}
);
// API endpoint to start batch process
app.post('/cleanup-customer', async (req, res) => {
const { customerId } = req.body;
const result = await steps.start({ customerId });
res.send(result);
});
// Helper endpoint to check batch status
app.get('/cleanup-status/:customerId', async (req, res) => {
const instances = await steps.getInstances({
customerId: req.params.customerId,
status: { $ne: 'failed' },
});
if (instances.length > 0) {
res.send(instances[0]);
} else {
res.status(404).send({ error: 'No cleanup process found' });
}
});
export default app.init();
This example demonstrates a straight-through batch process that:
- Checks Customer Status
  - Calls external API to verify customer status
  - Records last active date
  - Handles API errors
- Updates Database
  - Sets new status based on activity
  - Updates customer record
  - Tracks update timestamp
- Transfers Data
  - Sends data to archive system
  - Records transfer success
  - Handles transfer errors
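One detail worth noting in the batch example is that fetch only rejects on network errors, so an HTTP error status such as 500 does not reach the catch block by itself. A possible way to handle this is a small wrapper that throws on non-2xx responses. The helper postJson is hypothetical, and its fetchImpl parameter exists only so the function can be tested without a network.

```javascript
// POST JSON and throw on non-2xx responses so the step's catch block runs.
// fetchImpl defaults to the global fetch and can be replaced in tests.
async function postJson(url, payload, fetchImpl = fetch) {
  const response = await fetchImpl(url, {
    method: 'POST',
    headers: { 'Content-Type': 'application/json' },
    body: JSON.stringify(payload),
  });
  if (!response.ok) {
    throw new Error(`Request to ${url} failed with status ${response.status}`);
  }
  return response.json();
}

// Usage inside a step such as checkCustomerStatus:
//   const data = await postJson('https://api.example.com/customer-status', {
//     customerId: state.customerId,
//   });
```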
Sub-workflows
Sub-workflows allow you to compose complex workflows by breaking them into smaller, reusable components. A main workflow can start a sub-workflow and wait for it to complete before continuing. This pattern is useful for:
- Modular Design: Break complex workflows into smaller, manageable pieces
- Reusability: Create sub-workflows that can be used by multiple parent workflows
- Separation of Concerns: Keep different business logic in separate workflow definitions
- Parallel Processing: Run multiple sub-workflows simultaneously
- Error Isolation: Contain errors within sub-workflows without affecting the parent
The example below shows a main workflow that processes a number and then waits for a sub-workflow to complete before finishing. The sub-workflow updates the parent's state to signal completion.
The diagram above shows the flow where the main workflow processes a number (even/odd branching), then waits for a sub-workflow to complete. The sub-workflow runs independently but communicates back to the parent through state updates. In the implementation below, you'll see how:
- The main workflow starts a sub-workflow and passes its own ID as parentId
- The sub-workflow completes its task and updates the parent's state
- The parent workflow continues once it detects the sub-workflow has finished
- Both workflows use the same state management system for coordination
- The main workflow uses the wait() function to pause execution until the sub-workflow signals completion
import { app } from 'codehooks-js';
// Create a main workflow definition
const workflow = app.createWorkflow('simpleTask', 'Basic workflow example', {
// Step 1: Initialize the workflow
begin: async function (state, goto) {
state = {
message: 'Starting workflow',
// Add a random number to demonstrate branching
value: Math.floor(Math.random() * 10)
};
goto('decide', state);
},
// Step 2: Decide which path to take
decide: async function (state, goto) {
// Branch based on whether the value is even or odd
if (state.value % 2 === 0) {
goto('evenPath', state);
} else {
goto('oddPath', state);
}
},
// Step 3a: Handle even numbers
evenPath: async function (state, goto) {
state = {
message: 'Processing even number',
path: 'even',
processed: true
};
goto('end', state);
},
// Step 3b: Handle odd numbers
oddPath: async function (state, goto) {
state = {
message: 'Processing odd number',
path: 'odd',
processed: true
};
goto('end', state);
},
// Step 4: End the workflow
end: async function (state, goto, wait) {
if (state.subflowCompleted) {
state = {
final_message: `Workflow completed! Processed ${state.path} number: ${state.value}`
};
goto(null, state); // workflow complete
} else {
// Start the subflow and wait for it to complete, set the parentId in the subflow state
await subflow.start({"parentId": state._id});
wait({wait_message: 'Waiting for subflow to complete'});
}
}
});
// Create a subflow definition
const subflow = app.createWorkflow('subflow', 'Subflow', {
begin: async (state, goto) => {
state.message = 'Hello from subflow'
goto('end', state)
},
end: async (state, goto) => {
// Update the main workflow state to indicate that the subflow has completed
// await the update so a failure surfaces in this step instead of being lost
await workflow.updateState(state.parentId, {
subflowCompleted: true
}, {continue: true});
goto(null, state)
}
})
// emitted event when a workflow completes
workflow.on('completed', (data) => {
console.log('Workflow completed:', data);
});
// REST API to start a new workflow instance
app.post('/start', async (req, res) => {
const result = await workflow.start({"foo": "bar"});
res.json(result);
});
// export app interface to serverless execution
export default app.init();
Related Topics
- Queue API - Learn about message queues
- Job Hooks - Schedule recurring tasks
- Data Operators - Work with data in your steps
Other popular Workflow Engines
- Temporal - Open source workflow engine
- Zeebe - Cloud-native workflow engine
- AWS Step Functions - Serverless workflow service
- Zapier - No-code workflow automation platform
- Pipedream - Developer-first workflow automation
- n8n - Open source workflow automation
- Airflow - Platform for data pipelines
- Prefect - Modern workflow orchestration
- Dagster - Data orchestration platform
- Make (Integromat) - Visual workflow automation