Queue workers

Queue workers enable asynchronous, persistent message queues (MQ). Moving work onto a queue lets your application backend respond immediately and process the work later, which adds scalability and resilience.

Objects (messages) are added to queues using the built-in JavaScript Queue API. Serverless queue workers process each object in a queue asynchronously. The illustration below shows three independent queues (named topic-1, topic-2, topic-3) whose message objects are processed by independent workers that eventually execute your function.

Parallel message queues invoking your serverless functions

             / "topic-1" : x x x x x x -> (WORKERS) -> yourFunc1(x)
Enqueue(x) -   "topic-2" : x x         -> (WORKERS) -> yourFunc2(x)
             \ "topic-3" : x x x x x   -> (WORKERS) -> yourFunc3(x)
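
To make the per-topic model in the illustration concrete, here is a minimal in-memory sketch in plain JavaScript. It is not the Codehooks runtime or its Queue API: the names worker, enqueue, and drainAll are hypothetical helpers for this sketch, the queues live in memory rather than in persistent storage, and draining is synchronous for simplicity.

```javascript
// Conceptual model only: plain in-memory queues, not the Codehooks runtime.
const queues = new Map();   // topic name -> array of pending messages
const handlers = new Map(); // topic name -> worker function

// Register the function that processes messages for one topic (hypothetical helper).
function worker(topic, fn) {
  handlers.set(topic, fn);
}

// Append a message to the queue for its topic, creating the queue if needed.
function enqueue(topic, payload) {
  if (!queues.has(topic)) queues.set(topic, []);
  queues.get(topic).push(payload);
}

// Drain every queue that has a registered worker, first in, first out per topic.
function drainAll() {
  for (const [topic, items] of queues) {
    const fn = handlers.get(topic);
    if (!fn) continue; // no worker for this topic: its messages stay queued
    while (items.length > 0) {
      fn(items.shift());
    }
  }
}

// Usage: two topics with workers, one topic without.
const processed = [];
worker('topic-1', (x) => processed.push(`yourFunc1(${x})`));
worker('topic-2', (x) => processed.push(`yourFunc2(${x})`));

enqueue('topic-1', 'a');
enqueue('topic-2', 'b');
enqueue('topic-1', 'c');
enqueue('topic-3', 'd'); // stays queued because no worker is registered

drainAll();
console.log(processed);
```

Each topic keeps its own queue and its own handler, so a backlog on one topic does not change which function handles messages on another.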

Example queue worker

The following example queue worker function prints each queue item to the log. Items are added to the queue by sending a POST request to the example route /addQueue, which wraps the request body in an object and enqueues it.

index.js
import {coho as app, Datastore} from 'codehooks-js'

// Serverless queue worker function for topic 'myqueue'
app.worker('myqueue', (req, res) => {
  console.log("The queue item is", req.body.payload);
  res.end(); // done with one item
})

// API route that adds to the queue
app.post('/addQueue', async (req, res) => {
  const conn = await Datastore.open();
  const jobId = await conn.enqueue("myqueue", {"data": req.body});
  res.status(201).json({jobId});
})

export default app.init();

  • Topic names are unique strings, e.g. "sendEmails"
  • Queue data is accessed in the worker function through the request.body.payload object
  • Each topic is processed as a separate queue
  • The QUEUE_WORKERS environment variable sets the number of parallel workers, e.g. QUEUE_WORKERS=10
  • Each queue is stored in a separate system collection named _queue_{TOPIC}. E.g. if the topic is sendEmails, the collection is named _queue_sendEmails
  • Read more about the JavaScript Queue API
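
As a rough illustration of what a parallel worker limit such as QUEUE_WORKERS means, the sketch below processes a list of items with at most a fixed number of handlers in flight. It is plain JavaScript written for this page, not how Codehooks implements its workers: readWorkerLimit and processWithWorkers are hypothetical helpers, and the fallback of 1 worker when the variable is unset is an assumption of this sketch, not a documented default.

```javascript
// Hypothetical helper: parse a worker limit from an environment-like object.
// Assumption: fall back to 1 when QUEUE_WORKERS is missing or not a positive integer.
function readWorkerLimit(env) {
  const parsed = Number.parseInt(env.QUEUE_WORKERS ?? '', 10);
  return Number.isInteger(parsed) && parsed > 0 ? parsed : 1;
}

// Hypothetical helper: run `handler` on every item with at most `workerCount`
// handlers active at once. Resolves to the highest concurrency observed.
async function processWithWorkers(items, workerCount, handler) {
  const pending = [...items]; // copy so the caller's array is not mutated
  let active = 0;
  let maxActive = 0;

  // Each worker repeatedly takes the next pending item until none remain.
  async function runWorker() {
    while (pending.length > 0) {
      const item = pending.shift();
      active += 1;
      maxActive = Math.max(maxActive, active);
      try {
        await handler(item);
      } finally {
        active -= 1;
      }
    }
  }

  const count = Math.max(1, Math.min(workerCount, pending.length));
  await Promise.all(Array.from({ length: count }, () => runWorker()));
  return maxActive;
}

// Usage: five items, a limit of two parallel workers.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const handled = [];
const limit = readWorkerLimit({ QUEUE_WORKERS: '2' });
const run = processWithWorkers([1, 2, 3, 4, 5], limit, async (item) => {
  await sleep(5); // stand-in for real work such as sending an email
  handled.push(item);
});
run.then((maxActive) => console.log({ handled, maxActive }));
```

A higher limit lets more items be handled at the same time, at the cost of more simultaneous load on whatever the handler calls.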