Workflow API
Create and deploy robust, scalable workflows using persistent queues and state management. Build reliable backend systems with automatic retry, state persistence, and distributed processing.
The Workflow API is an early access release. We appreciate any feedback from developers using it.
The Workflow API is available from codehooks-js version 1.3.6.
API quick overview
Overview
Codehooks workflows combine persistent queues with state management to create reliable, scalable workflows. Each step in a workflow can modify the process state and control the flow using the goto function, enabling complex workflows with branching, loops, and conditional execution. The system automatically handles state persistence, distributed processing, and error recovery.
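To make the step-and-goto model concrete, here is a minimal in-memory sketch of a driver loop. It is a hypothetical illustration, not the codehooks-js engine: steps are synchronous, there is no persistence or queue, and the names `runWorkflow` and `maxTransitions` are assumptions. Each step calls `goto(nextStep, state)` once, `goto(null, state)` ends the run, and a step that never calls `goto` leaves the instance waiting in that step.

```javascript
// Hypothetical in-memory driver for the step/goto model (not the codehooks-js engine).
// Steps are synchronous here; persistence, queues and workers are left out.
function runWorkflow(steps, startStep, initialState, maxTransitions = 100) {
  let next = startStep;
  let state = initialState;
  const trace = []; // names of the steps that ran, in order

  while (next !== null) {
    if (trace.length >= maxTransitions) {
      throw new Error('Too many transitions, possible infinite loop');
    }
    const step = steps[next];
    if (typeof step !== 'function') {
      throw new Error(`Unknown step: ${next}`);
    }
    trace.push(next);

    let called = false;
    step(state, (nextStep, newState) => {
      called = true;
      next = nextStep; // null means the workflow is complete
      state = newState;
    });

    if (!called) {
      // No goto call: the instance stays parked in the current step.
      return { state, nextStep: next, waiting: true, trace };
    }
  }
  return { state, nextStep: null, waiting: false, trace };
}

// Usage: the even/odd branching of the simple workflow example, with a fixed value.
const demoSteps = {
  begin: (state, goto) => goto('decide', { ...state, value: 4 }),
  decide: (state, goto) =>
    goto(state.value % 2 === 0 ? 'evenPath' : 'oddPath', state),
  evenPath: (state, goto) => goto(null, { ...state, path: 'even' }),
  oddPath: (state, goto) => goto(null, { ...state, path: 'odd' }),
};
const result = runWorkflow(demoSteps, 'begin', { foo: 'bar' });
console.log(result.trace); // [ 'begin', 'decide', 'evenPath' ]
```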
Key Features
- State Management
  - Persistent state storage
  - Atomic state updates
  - Complete execution history
  - State inspection and debugging
- Queue-Based Processing
  - Reliable message delivery
  - Automatic retry on failures
  - Load balancing across workers
  - No lost messages or duplicate processing
  - Parallel execution of workflow steps
- Scalability & Reliability
  - Distributed processing
  - Automatic failover
  - State recovery
  - Error handling and monitoring
Why Use Workflows?
Workflows help you build reliable backend systems and integration logic. They handle system failures and network issues by automatically resuming from the last known state after crashes or connection losses. This makes them ideal for critical processes like order processing and payment flows. The system distributes work across multiple workers and processes long-running tasks asynchronously. As a developer, you can focus on implementing business logic while the framework handles state persistence, retries, and scaling.
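The resume-after-crash behavior can be pictured as checkpointing: after each completed step, the next step name and the new state are stored, so a restart continues from the stored step instead of starting over. The sketch below is a hypothetical in-memory illustration of that idea; the `Map` stands in for the workflow collection, and `store`, `executeFrom` and the simulated crash are assumptions, not codehooks-js APIs.

```javascript
// Hypothetical checkpoint-and-resume sketch; the Map stands in for the persisted collection.
const store = new Map(); // instanceId -> { nextStep, state }

function executeFrom(instanceId, steps) {
  let doc = store.get(instanceId);
  while (doc.nextStep !== null) {
    let transition = null;
    steps[doc.nextStep](doc.state, (nextStep, state) => {
      transition = { nextStep, state };
    });
    if (transition === null) {
      return doc; // waiting for external input
    }
    // Checkpoint after every completed step: a crash loses at most the step in progress.
    doc = transition;
    store.set(instanceId, doc);
  }
  return doc;
}

// Usage: the 'charge' step crashes on its first attempt.
let reserveRuns = 0;
let chargeRuns = 0;
const orderSteps = {
  reserve: (state, goto) => {
    reserveRuns += 1;
    goto('charge', { ...state, reserved: true });
  },
  charge: (state, goto) => {
    chargeRuns += 1;
    if (chargeRuns === 1) throw new Error('simulated crash');
    goto(null, { ...state, charged: true });
  },
};

store.set('order-1', { nextStep: 'reserve', state: { orderId: 1 } });
try {
  executeFrom('order-1', orderSteps);
} catch (err) {
  // The stored checkpoint still points at 'charge'.
}
const finalDoc = executeFrom('order-1', orderSteps); // resumes at 'charge', not 'reserve'
```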
A simple workflow example
Let's create a simple workflow app that demonstrates branching based on a random number. The workflow will process even and odd numbers differently.
The full source code for the workflow steps is shown below.
import { app } from 'codehooks-js';
// Create a workflow definition
const workflow = app.createWorkflow('simpleTask', 'Basic workflow example', {
// Step 1: Initialize the workflow
begin: async function (state, goto) {
state = {
message: 'Starting workflow',
// Add a random number to demonstrate branching
value: Math.floor(Math.random() * 10)
};
goto('decide', state);
},
// Step 2: Decide which path to take
decide: async function (state, goto) {
// Branch based on whether the value is even or odd
if (state.value % 2 === 0) {
goto('evenPath', state);
} else {
goto('oddPath', state);
}
},
// Step 3a: Handle even numbers
evenPath: async function (state, goto) {
state = {
...state, // keep 'value' from the begin step
message: 'Processing even number',
path: 'even',
processed: true
};
goto('end', state);
},
// Step 3b: Handle odd numbers
oddPath: async function (state, goto) {
state = {
...state, // keep 'value' from the begin step
message: 'Processing odd number',
path: 'odd',
processed: true
};
goto('end', state);
},
// Step 4: End the workflow
end: function (state, goto) {
state = {
final_message: `Workflow completed! Processed ${state.path} number: ${state.value}`
};
goto(null, state); // workflow complete
}
});
// emitted event when a workflow completes
workflow.on('completed', (data) => {
console.log('Workflow completed:', data);
});
// REST API to start a new workflow instance
app.post('/start', async (req, res) => {
const result = await workflow.start('simpleTask', {"foo": "bar"});
res.json(result);
});
// export app interface to serverless execution
export default app.init();
See more examples at the end of this page.
Workflow State Storage and Inspection
All workflow instances and their states are automatically persisted in a collection (default name: workflowdata). This collection stores the complete state of every workflow instance, making it easy to inspect and debug workflows at any time.
Each document in the collection contains:
- _id: Unique identifier for the workflow instance
- nextStep: The next step to be executed
- createdAt: Timestamp when the workflow was created
- updatedAt: Timestamp of the last state update
- workflowName: Name of the workflow definition
- ...state: Your custom state properties for the workflow
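Because custom state properties sit next to the metadata fields, a filter such as `{ workflowName, email }` can match on both at once. The following hypothetical sketch builds a document with the fields listed above and applies an equality-only filter in memory; `makeInstanceDoc`, `matchesFilter` and the generated `_id` format are assumptions, and real datastore queries support more operators than plain equality.

```javascript
// Hypothetical shape of a stored workflow instance: metadata plus the custom state,
// spread last (so, in this sketch, a state key named like a metadata field would override it).
let idCounter = 0;

function makeInstanceDoc(workflowName, nextStep, state, now = new Date()) {
  idCounter += 1;
  const timestamp = now.toISOString();
  return {
    _id: `instance-${idCounter}`, // illustrative id format only
    nextStep,
    createdAt: timestamp,
    updatedAt: timestamp,
    workflowName,
    ...state,
  };
}

// Equality-only matching: every key in the filter must equal the document's value.
function matchesFilter(doc, filter) {
  return Object.entries(filter).every(([key, value]) => doc[key] === value);
}

// Usage
const docs = [
  makeInstanceDoc('customerOnboarding', 'waitForValidation', {
    email: 'a@example.com',
    validated: false,
  }),
  makeInstanceDoc('customerOnboarding', null, {
    email: 'b@example.com',
    validated: true,
  }),
];
const pending = docs.filter((doc) =>
  matchesFilter(doc, { workflowName: 'customerOnboarding', validated: false })
);
console.log(pending.map((doc) => doc.email)); // [ 'a@example.com' ]
```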
You can query this collection directly to inspect all workflow states:
// Example: Find instances of a workflow whose custom status property is 'verify_email'
const runningInstances = await app.datastore.find('workflowdata', {
workflowName: 'myWorkflow',
status: 'verify_email'
});
// Example: Get state of a specific workflow instance
const instance = await app.datastore.getOne('workflowdata', {
_id: 'instanceId'
});
To use a different collection name, configure it when creating the workflow:
const workflow = app.createWorkflow('myWorkflow', 'description', steps);
workflow.configure({ collectionName: 'my_custom_collection' });
Here's how the workflow data looks in Codehooks Studio for our simple example workflow:
The screenshot shows the state of a workflow instance that:
- Started at 17:21:00
- Branched through the even path
- Is completed because nextStep is null
- Contains the state data with the message properties
You can see all the metadata fields (_id, nextStep, timestamps) and the current state object in the document view.
Core API Overview
Methods
The workflow API provides these core methods for managing persistent workflows:
createWorkflow(name, description, steps)
Creates a new workflow of persistent steps.
const workflow = app.createWorkflow('workflow', 'description', {
START: async (state, goto) => {
// step implementation
},
});
Parameters:
- name (string): Workflow identifier
- description (string): Workflow description
- steps (object): Workflow step definitions, mapping each step name to its step function
Returns: Workflow instance
start(name, initialState)
Starts a new Workflow instance.
const result = await workflow.start('workflow', { data: 'value' });
// Returns: { _id, nextStep, createdAt, ...state }
Parameters:
- name (string): Workflow name
- initialState (any): Initial state data
Returns: Instance object with _id, nextStep, createdAt, and state data
updateState(name, instanceId, state, options)
Updates the state of a workflow instance.
await workflow.updateState('workflow', '682ec2b3dab9887c1da726e9', { count: 1 });
Parameters:
- name (string): Workflow name
- instanceId (string): Instance identifier
- state (any): New state data
- options (object, optional): Update options. Use {continue: true} to continue the workflow, or {continue: false} to keep waiting for further state changes.
Returns: Updated state
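The effect of the `continue` option can be modeled in memory. This is a hypothetical sketch, not the library's implementation: it assumes the passed state is merged into the stored state, keeps state in a nested `state` field for readability, and re-runs the waiting step only when `continue` is true. `updateStateModel` is an invented name.

```javascript
// Hypothetical model of updateState(name, instanceId, state, { continue }) semantics.
// Assumption: the partial state is merged into the stored state.
function updateStateModel(instance, steps, partialState, options = {}) {
  const merged = { ...instance, state: { ...instance.state, ...partialState } };
  if (!options.continue) {
    return merged; // stays parked in the same step, waiting for more updates
  }
  // continue: true re-runs the step the instance is waiting in.
  let result = merged;
  steps[merged.nextStep](merged.state, (nextStep, state) => {
    result = { ...merged, nextStep, state };
  });
  return result;
}

// Usage, mirroring the waitForValidation step of the onboarding example
const onboardingSteps = {
  waitForValidation: (state, goto) => {
    if (state.validated) {
      goto('complete', { ...state, status: 'email_validated' });
    }
    // otherwise: no goto, keep waiting
  },
};

let instance = {
  _id: 'abc',
  nextStep: 'waitForValidation',
  state: { email: 'a@example.com', validated: false },
};
instance = updateStateModel(instance, onboardingSteps, { note: 'reminder sent' });
const stillWaiting = instance.nextStep; // 'waitForValidation'
instance = updateStateModel(instance, onboardingSteps, { validated: true }, { continue: true });
console.log(instance.nextStep); // complete
```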
getInstances(filter)
Lists workflow instances matching a filter.
const instances = await workflow.getInstances({ status: 'running' });
Parameters:
- filter (object): Filter criteria
Returns: Array of instances
continueAllTimedOut()
Continues all timed out step instances in the workflow. This is useful for recovering workflows that have timed out due to system issues or long-running operations.
Returns: Promise with array of results containing queue IDs for continued workflows
Example:
// Create workflow with 5 minute timeout
const workflow = app.createWorkflow('simpleTask', 'Basic workflow example', {/* workflow steps */})
workflow.configure({
timeout: 1000*60*5 // 5 minute timeout threshold
})
// Schedule job to continue timed out workflows every 5 minutes
app.job('*/5 * * * *', async (req, res) => {
const result = await workflow.continueAllTimedOut()
if (result.length > 0) {
console.log('Continued these dangling workflows:', result);
}
res.end()
})
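Which instances count as timed out is not spelled out here. A plausible criterion, sketched below as a hypothetical in-memory filter, is an instance that has not finished (`nextStep` is not null) and whose `updatedAt` is older than the configured `timeout`. `findTimedOut` is an invented name, and the real `continueAllTimedOut()` may use different rules (for example for steps that are deliberately waiting).

```javascript
// Hypothetical timed-out criterion: unfinished and not updated within timeoutMs.
function findTimedOut(instances, timeoutMs, now = Date.now()) {
  return instances.filter(
    (doc) => doc.nextStep !== null && now - Date.parse(doc.updatedAt) > timeoutMs
  );
}

// Usage with the 5 minute timeout from the example above
const now = Date.parse('2024-03-14T12:00:00Z');
const sample = [
  { _id: 'a', nextStep: 'charge', updatedAt: '2024-03-14T11:50:00Z' }, // 10 minutes old
  { _id: 'b', nextStep: 'charge', updatedAt: '2024-03-14T11:58:00Z' }, // 2 minutes old
  { _id: 'c', nextStep: null, updatedAt: '2024-03-14T11:00:00Z' }, // already completed
];
const timedOut = findTimedOut(sample, 1000 * 60 * 5, now).map((doc) => doc._id);
console.log(timedOut); // [ 'a' ]
```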
on(event, listener)
Registers an event listener.
workflow.on('completed', (data) => console.log(data));
Parameters:
- event (string): Event name
- listener (function): Event handler
Returns: workflow instance
Events
The workflow API emits these events:
- workflowCreated - Workflow registered
- workflowStarted - Instance started
- stepStarted - Step execution began
- stateUpdated - State was updated
- stepWaiting - Step waiting for input
- stepContinued - Step continued after wait
- stepEnqueued - Step function is added to queue
- completed - Workflow completed
- cancelled - Workflow cancelled
- error - Workflow error occurred
Step Function
Each step function receives two parameters:
Parameters
- state: Current workflow state object that can be modified and passed to the next step
- goto: Function to transition between steps. It supports:
  - Name of next step to transfer to: goto('stepName', state)
  - Parallel execution of multiple steps: goto(['p1', 'p2'], state)
    - All parallel steps must join in the same next step, i.e. goto('samestep', state).
    - The process waits for all parallel steps to finish before continuing to samestep.
  - If the name of the next step is null, the workflow is completed: goto(null, state)
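The join rule for parallel steps can be illustrated with a small in-memory sketch: both branches run, each calls `goto('samestep', state)`, and `samestep` runs exactly once after the last branch arrives. `runParallel` is a hypothetical helper, branches run sequentially rather than on separate workers, and merging branch states with `Object.assign` in arrival order is an assumption; this page does not say how branch states are combined.

```javascript
// Hypothetical fan-out/join sketch for goto(['p1', 'p2'], state); branches run
// one after another here instead of in parallel on separate workers.
function runParallel(steps, branches, initialState) {
  const arrived = []; // states handed to goto by finished branches
  let joinStep = null;

  for (const name of branches) {
    steps[name]({ ...initialState }, (nextStep, state) => {
      if (joinStep !== null && nextStep !== joinStep) {
        throw new Error('All parallel steps must join in the same next step');
      }
      joinStep = nextStep;
      arrived.push(state);
    });
  }

  if (arrived.length < branches.length) {
    return { waiting: true, arrived: arrived.length }; // not every branch has finished
  }

  // Assumption: branch states are merged in arrival order.
  const merged = Object.assign({}, ...arrived);
  let result = null;
  steps[joinStep](merged, (nextStep, state) => {
    result = { nextStep, state };
  });
  return result;
}

// Usage
let joinRuns = 0;
const parallelSteps = {
  p1: (state, goto) => goto('samestep', { ...state, p1: 'done' }),
  p2: (state, goto) => goto('samestep', { ...state, p2: 'done' }),
  samestep: (state, goto) => {
    joinRuns += 1; // should run exactly once
    goto(null, state);
  },
};
const joined = runParallel(parallelSteps, ['p1', 'p2'], { orderId: 7 });
console.log(joined.state); // { orderId: 7, p1: 'done', p2: 'done' }
```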
Examples
Basic step with single transition:
async function step(state, goto) {
// Update state
state = { ...state, count: state.count + 1 }; // keep the other state fields
// Transition to next step
goto('nextStep', state);
}
Step with conditional transitions:
async function conditionalStep(state, goto) {
if (state.count >= 3) {
// End workflow when condition met
goto(null, state);
} else if (state.needsValidation) {
// Go to validation step
goto('validate', state);
} else {
// Continue to next step
goto('process', state);
}
}
Configuration
The Workflow API can be configured using the configure method:
import { app } from 'codehooks-js';
const workflow = app.createWorkflow('myWorkflow', 'My workflow description', {
// ... workflow steps ...
});
// Configure workflow settings
const workflowConfig = {
collectionName: 'workflowdata', // Set storage collection name
queuePrefix: 'workflowqueue', // Set queue prefix name
timeout: 1000*60*5, // threshold for marking workflow as dead
maxStepCount: 3 // prevent infinite loops
};
workflow.configure(workflowConfig);
The configuration object supports these options:
- collectionName (string): Set the storage collection name for the workflow. Default value 'workflowdata'.
- queuePrefix (string): Set the queue prefix name for the workflow steps. Default value 'workflowqueue'.
- timeout (integer): Set the timeout threshold for the workflow, in milliseconds. Default value 30000.
- maxStepCount (integer): Set the maximum number of times a step can be called in a workflow instance. Default value 3.
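The option list above can be read as defaults that `configure` overrides, and `maxStepCount` as a per-instance guard against loops. The sketch below models both in memory; `resolveConfig` and `createStepCounter` are hypothetical names, and counting per step name within a single instance is an assumption based on the description "max times a step can be called in a workflow instance".

```javascript
// Defaults as documented above; resolveConfig and createStepCounter are hypothetical helpers.
const DEFAULT_WORKFLOW_CONFIG = {
  collectionName: 'workflowdata',
  queuePrefix: 'workflowqueue',
  timeout: 30000, // milliseconds
  maxStepCount: 3,
};

// User settings override the defaults key by key.
function resolveConfig(userConfig = {}) {
  return { ...DEFAULT_WORKFLOW_CONFIG, ...userConfig };
}

// Counts how often each step runs in one instance and throws once a step
// exceeds maxStepCount, cutting off loops such as a -> b -> a -> b ...
function createStepCounter(config) {
  const counts = {};
  return function recordStep(stepName) {
    counts[stepName] = (counts[stepName] || 0) + 1;
    if (counts[stepName] > config.maxStepCount) {
      throw new Error(
        `Step "${stepName}" exceeded maxStepCount (${config.maxStepCount})`
      );
    }
    return counts[stepName];
  };
}

// Usage
const config = resolveConfig({ timeout: 1000 * 60 * 5 });
console.log(config.collectionName, config.timeout); // workflowdata 300000
const recordStep = createStepCounter(config);
recordStep('retry');
recordStep('retry');
recordStep('retry'); // third call is still allowed
```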
Error handling
Workflow errors are automatically captured and logged. You can monitor errors in several ways:
Logging and Monitoring
- Studio UI: View workflow logs in real-time using the Studio interface
- CLI: Stream logs with coho log -f to see workflow execution and errors
- Database: Errors are stored in the workflow instance as the lastError property
Error Inspection
To find workflow instances with errors, use the Studio collection (workflowdata) query tool with these example queries:
// Find all workflow instances with errors
{ lastError: { $exists: true } }
// Find all errors in a specific workflow
{ workflowName: 'simpleTask', lastError: { $exists: true } }
Error Properties
The lastError object contains:
- message: Error message
- updatedAt: When the error occurred
Example Error Object
{
lastError: {
message: "Failed to process payment",
stack: "Error: Payment service unavailable\n at processPayment...",
step: "paymentStep",
timestamp: "2024-03-14T12:34:56.789Z"
}
}
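The error capture described above can be sketched in memory: run a step, and if it throws, record a `lastError` object with `message` and `updatedAt` (the properties listed under Error Properties) while leaving `nextStep` unchanged. A small helper then mimics the `{ lastError: { $exists: true } }` query. `runStepCapturingErrors` and `fieldExists` are hypothetical names, and keeping `nextStep` unchanged after a failure is an assumption of this sketch.

```javascript
// Hypothetical sketch of capturing a step error on the instance document.
function runStepCapturingErrors(instance, steps) {
  try {
    let next = instance; // unchanged if the step does not call goto
    steps[instance.nextStep](instance.state, (nextStep, state) => {
      next = { ...instance, nextStep, state };
    });
    return next;
  } catch (err) {
    // Keep nextStep as-is so the failed step could be retried later.
    return {
      ...instance,
      lastError: { message: err.message, updatedAt: new Date().toISOString() },
    };
  }
}

// In-memory stand-in for { field: { $exists: true | false } }.
function fieldExists(doc, field, exists = true) {
  return (doc[field] !== undefined) === exists;
}

// Usage
const failingSteps = {
  paymentStep: () => {
    throw new Error('Failed to process payment');
  },
  shipStep: (state, goto) => goto(null, state),
};
const instances = [
  runStepCapturingErrors(
    { _id: '1', workflowName: 'simpleTask', nextStep: 'paymentStep', state: {} },
    failingSteps
  ),
  runStepCapturingErrors(
    { _id: '2', workflowName: 'simpleTask', nextStep: 'shipStep', state: {} },
    failingSteps
  ),
];
const withErrors = instances.filter(
  (doc) => doc.workflowName === 'simpleTask' && fieldExists(doc, 'lastError')
);
console.log(withErrors.map((doc) => doc._id)); // [ '1' ]
```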
Best Practices
- State Management
  - Keep state objects simple and serializable
  - Use atomic updates for state changes
  - Validate state at each step
  - Monitor state size and complexity
- Error Handling
  - Monitor errors in logs and database
  - Use try/catch in steps that might fail
  - Set appropriate timeouts
  - Implement retry logic for transient failures
- Scalability
  - Design stateless step functions
  - Use appropriate queue configurations
  - Monitor worker performance
  - Implement proper error recovery
- Monitoring
  - Track workflow completion rates
  - Monitor error frequencies
  - Watch queue lengths
  - Set up alerts for critical failures
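For the "implement retry logic for transient failures" practice, one common approach is exponential backoff around the failing call inside a step, before routing to an error step. This is a generic, hypothetical helper (`withRetry`, `backoffDelay` and their defaults are assumptions, not codehooks-js APIs); it seems prudent to keep the total retry time well below the workflow `timeout`.

```javascript
// Hypothetical retry helper for transient failures inside a step (not a codehooks-js API).
// backoffDelay: attempt is 1-based, giving baseMs, 2*baseMs, 4*baseMs, ... capped at maxMs.
function backoffDelay(attempt, baseMs = 200, maxMs = 5000) {
  return Math.min(maxMs, baseMs * 2 ** (attempt - 1));
}

async function withRetry(fn, { attempts = 3, baseMs = 200, maxMs = 5000 } = {}) {
  let lastError;
  for (let attempt = 1; attempt <= attempts; attempt++) {
    try {
      return await fn(attempt);
    } catch (err) {
      lastError = err;
      if (attempt < attempts) {
        // Wait before the next attempt; no wait after the final failure.
        await new Promise((resolve) =>
          setTimeout(resolve, backoffDelay(attempt, baseMs, maxMs))
        );
      }
    }
  }
  throw lastError; // all attempts failed; let the step's catch route to an error step
}

// Usage: an operation that fails twice, then succeeds
let calls = 0;
withRetry(
  async () => {
    calls += 1;
    if (calls < 3) throw new Error('transient');
    return 'ok';
  },
  { baseMs: 10 }
).then((value) => console.log(value, calls)); // ok 3
```

Inside a step, `await withRetry(() => fetch(url))` could stand in for a bare `fetch` call, with the step's existing `catch` block still routing to an error step. Note that `fetch` only rejects on network failures, so HTTP error statuses would need an explicit `response.ok` check to be retried.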
Example Workflows
Customer onboarding
Here's a visualization of the customer onboarding workflow:
The workflow shows a process that:
- Starts with customer email
- Sends welcome email and enters validation state
- Stays in validation state (no goto) until email is confirmed
- Completes when email is validated (goto null = finish)
Here's a practical example of a customer onboarding workflow:
import { app } from 'codehooks-js';
const steps = app.createWorkflow(
'customerOnboarding',
'Customer signup and email validation',
{
START: async function (state, goto) {
// Initialize onboarding state
state = {
...state,
status: 'started',
email: state.email,
validated: false,
startedAt: new Date().toISOString(),
};
// Send welcome email
await sendWelcomeEmail(state.email);
// Move to validation step
goto('waitForValidation', state);
},
waitForValidation: async function (state, goto) {
// Check if email was validated
if (state.validated) {
state.status = 'email_validated';
goto('complete', state);
} else {
// Stay in validation state
state.status = 'waiting_for_validation';
// just wait here
}
},
complete: function (state, goto) {
// Finalize onboarding
state.status = 'completed';
state.completedAt = new Date().toISOString();
goto(null, state); // null === finish
},
}
);
// Event listeners for monitoring
steps.on('completed', (data) => {
console.log('Onboarding completed for:', data);
});
steps.on('stepStarted', (data) => {
console.log('Step started:', data.step, 'for customer:', data);
});
// API endpoints
app.post('/start-onboarding', async (req, res) => {
const { email } = req.body;
const result = await steps.start('customerOnboarding', { email });
res.send(result);
});
// Endpoint to validate email (called by email webhook)
app.post('/validate-email', async (req, res) => {
const { email, token } = req.body;
// Find the onboarding instance for this email
const instances = await steps.getInstances({
email: email,
validated: false,
});
if (instances.length > 0) {
const instance = instances[0];
// Update state with validation
const result = await steps.updateState(
'customerOnboarding',
instance._id,
{
validated: true,
validatedAt: new Date().toISOString(),
validationToken: token,
},
{ continue: true } // this Step is complete
);
res.send(result);
} else {
res.status(404).send({ error: 'No pending onboarding found' });
}
});
// Helper function to send welcome email
async function sendWelcomeEmail(email) {
// Implementation of email sending
console.log('Sending welcome email to:', email);
// In a real app, you would use your email service here
}
export default app.init();
Batch Example
This example demonstrates a batch process for managing long-running data operations. It's useful for scenarios like:
- Processing large datasets with multiple validation steps
- Orchestrating complex data transformations across systems
- Managing stateful operations that require error recovery
- Coordinating multi-step data synchronization tasks
- Implementing audit trails for data operations
- Handling distributed transactions with rollback capabilities
- Automating periodic data maintenance and cleanup tasks
- Building resilient ETL (Extract, Transform, Load) pipelines
The flow diagram below shows a three-step process that validates data against an external API, updates a database record, and transfers the result to an archive system. Each step includes error handling and state tracking.
import { app } from 'codehooks-js';
const steps = app.createWorkflow(
'customerCleanup',
'Batch process customer data cleanup',
{
START: async function (state, goto) {
// Initialize batch state
state = {
...state,
customerId: state.customerId,
status: 'started',
startedAt: new Date().toISOString(),
results: {
checkStatus: null,
dataTransfer: null,
},
};
// Move to first step
goto('checkCustomerStatus', state);
},
checkCustomerStatus: async function (state, goto) {
try {
// Call external API to check customer status
const response = await fetch(
'https://api.example.com/customer-status',
{
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({ customerId: state.customerId }),
}
);
const data = await response.json();
// Update state with check results
state.results.checkStatus = {
status: data.status,
lastActive: data.lastActive,
checkedAt: new Date().toISOString(),
};
// Move to next step
goto('updateStatus', state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},
updateStatus: async function (state, goto) {
try {
// Update customer status in database
const newStatus =
state.results.checkStatus.lastActive < '2024-01-01'
? 'inactive'
: 'active';
await app.datastore.updateOne(
'customers',
{ customerId: state.customerId },
{ $set: { status: newStatus, updatedAt: new Date() } }
);
state.results.statusUpdated = {
newStatus,
updatedAt: new Date().toISOString(),
};
// Move to final step
goto('transferData', state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},
transferData: async function (state, goto) {
try {
// Transfer customer data to archive system
const response = await fetch('https://archive.example.com/transfer', {
method: 'POST',
headers: { 'Content-Type': 'application/json' },
body: JSON.stringify({
customerId: state.customerId,
status: state.results.statusUpdated.newStatus,
lastActive: state.results.checkStatus.lastActive,
}),
});
state.results.dataTransfer = {
success: response.ok,
transferredAt: new Date().toISOString(),
};
// Complete the process
goto(null, state);
} catch (error) {
state.error = error.message;
goto('error', state);
}
},
error: function (state, goto) {
// Log error and end process
console.error('Batch process failed:', state.error);
state.status = 'failed';
goto(null, state);
},
}
);
// API endpoint to start batch process
app.post('/cleanup-customer', async (req, res) => {
const { customerId } = req.body;
const result = await steps.start('customerCleanup', { customerId });
res.send(result);
});
// Helper endpoint to check batch status
app.get('/cleanup-status/:customerId', async (req, res) => {
const instances = await steps.getInstances({
customerId: req.params.customerId,
status: { $ne: 'failed' },
});
if (instances.length > 0) {
res.send(instances[0]);
} else {
res.status(404).send({ error: 'No cleanup process found' });
}
});
export default app.init();
This example demonstrates a straight-through batch process that:
- Checks Customer Status
  - Calls external API to verify customer status
  - Records last active date
  - Handles API errors
- Updates Database
  - Sets new status based on activity
  - Updates customer record
  - Tracks update timestamp
- Transfers Data
  - Sends data to archive system
  - Records transfer success
  - Handles transfer errors
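The `updateStatus` step decides activity by comparing `lastActive` to `'2024-01-01'` as strings. That works for ISO 8601 dates, but a missing value compares as false (so the customer is silently marked active) and other date formats compare character by character. A stricter, hypothetical variant parses both values as dates and reports unparseable input explicitly; `classifyCustomer` and the `'unknown'` status are assumptions, not part of the original example.

```javascript
// Hypothetical stricter activity check for the updateStatus step.
function classifyCustomer(lastActive, cutoff = '2024-01-01T00:00:00Z') {
  const lastActiveMs = Date.parse(lastActive);
  if (Number.isNaN(lastActiveMs)) {
    return 'unknown'; // missing or unparseable lastActive
  }
  return lastActiveMs < Date.parse(cutoff) ? 'inactive' : 'active';
}

// Usage
console.log(classifyCustomer('2023-12-31T23:00:00Z')); // inactive
console.log(classifyCustomer('2024-02-01')); // active
console.log(classifyCustomer(undefined)); // unknown
```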
Related Topics
- Queue API - Learn about message queues
- Job Hooks - Schedule recurring tasks
- Data Operators - Work with data in your steps
Other popular Workflow Engines
- Temporal - Open source workflow engine
- Zeebe - Cloud-native workflow engine
- AWS Step Functions - Serverless workflow service
- Zapier - No-code workflow automation platform
- Pipedream - Developer-first workflow automation
- n8n - Open source workflow automation
- Airflow - Platform for data pipelines
- Prefect - Modern workflow orchestration
- Dagster - Data orchestration platform
- Make (Integromat) - Visual workflow automation