Intro
What is a queue? It’s a data structure in which operations are performed in a particular order. If you are familiar with FIFO (First In, First Out) and LIFO (Last In, First Out), then you’ll understand what queues are about. A queue uses the FIFO method: all jobs added to the queue are processed in the order they were added (the first job added is processed first).
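To make that concrete, here’s a tiny TypeScript sketch of FIFO behavior using a plain array (just an illustration, not how Bull stores jobs):

// FIFO: the first element pushed is the first one taken out
const jobs: string[] = [];
jobs.push('job-1'); // added first
jobs.push('job-2'); // added second
const next = jobs.shift(); // 'job-1', processed first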
Queues and NestJS
According to the NestJS documentation, queues are a powerful design pattern that can help us with the following problems (source):
- Smooth out processing peaks. For example, if users can initiate resource-intensive tasks at arbitrary times, you can add these tasks to a queue instead of performing them synchronously. Then you can have worker processes pull tasks from the queue in a controlled manner. You can easily add new Queue consumers to scale up the back-end task handling as the application scales up.
- Break up monolithic tasks that may otherwise block the Node.js event loop. For example, if a user request requires CPU intensive work like audio transcoding, you can delegate this task to other processes, freeing up user-facing processes to remain responsive.
- Provide a reliable communication channel across various services. For example, you can queue tasks (jobs) in one process or service, and consume them in another. You can be notified (by listening for status events) upon completion, error or other state changes in the job life cycle from any process or service. When Queue producers or consumers fail, their state is preserved and task handling can restart automatically when nodes are restarted.
A powerful package for managing queues in Node.js is Bull. NestJS provides the @nestjs/bull wrapper on top of this package.
Note: Bull uses Redis to persist job data, so you’ll need to have Redis installed on your system. Because it is Redis-backed, your Queue architecture can be completely distributed and platform-independent. For example, you can have some Queue producers and consumers and listeners running in Nest on one (or several) nodes, and other producers, consumers and listeners running on other Node.js platforms on other network nodes.
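If you don’t have Redis yet, one quick way to run it locally for development is Docker (this is just the stock image on its default port, not a project-specific setup):
$ docker run --name redis -p 6379:6379 -d redis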
Coding
Let’s connect Bull with a Nest application. All you need to do is install the dependencies
$ npm install --save @nestjs/bull bull
$ npm install --save-dev @types/bull
and import BullModule in the root module:
// app.module.ts
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    BullModule.forRoot({
      redis: {
        host: 'localhost',
        port: 6379,
      },
    }),
  ],
})
export class AppModule {}
Then, in any module where you want to use queues, you register a queue like this:
BullModule.registerQueue({
  name: 'notifications',
});
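For context, here’s what that registration might look like inside a complete feature module (the file and module names are just examples):

// notifications.module.ts (names are illustrative)
import { Module } from '@nestjs/common';
import { BullModule } from '@nestjs/bull';

@Module({
  imports: [
    // makes the 'notifications' queue injectable within this module
    BullModule.registerQueue({
      name: 'notifications',
    }),
  ],
})
export class NotificationsModule {}

The producer and consumer classes from the next sections would then be listed in this module’s providers.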
Producer
This class will handle adding tasks to the queue. Basically, it can be any service in your app. You just need to inject the queue like any other dependency.
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';

@Injectable()
export class NotificationsService {
  constructor(@InjectQueue('notifications') private notificationsQueue: Queue) {}

  async sendNotification(payload: any) {
    return this.notificationsQueue.add(payload);
  }
}
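Just to show the producer in action, here’s a minimal sketch of a controller calling it; the route and payload shape are assumptions for illustration, not part of the original project:

// notifications.controller.ts (a hypothetical caller)
import { Body, Controller, Post } from '@nestjs/common';
import { NotificationsService } from './notifications.service';

@Controller('notifications')
export class NotificationsController {
  constructor(private readonly notificationsService: NotificationsService) {}

  @Post()
  async create(@Body() payload: Record<string, unknown>) {
    // enqueue the work instead of performing it synchronously
    return this.notificationsService.sendNotification(payload);
  }
}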
Consumer
Ok, now we need to implement a task processor, or consumer. Usually, I create a file called something like notifications.processor.ts.
This file might look like:
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('notifications')
export class NotificationsProcessor {
  @Process()
  async sendNotification(job: Job<unknown>) {
    // implement how the task should be processed
  }
}
We’ve defined a task consumer and decorated sendNotification with Process, which indicates that this method will handle the next task from the queue. All you have to do is implement the business logic inside the job handler. And that’s it!
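As a rough sketch, a fleshed-out handler might look like this (the payload shape and the log message are assumptions, and the actual delivery call is left out):

import { Logger } from '@nestjs/common';
import { Processor, Process } from '@nestjs/bull';
import { Job } from 'bull';

@Processor('notifications')
export class NotificationsProcessor {
  private readonly logger = new Logger(NotificationsProcessor.name);

  @Process()
  async sendNotification(job: Job<{ receiver: string; message: string }>) {
    // job.data is whatever the producer passed to queue.add()
    this.logger.log(`Notifying ${job.data.receiver}`);
    // deliver the notification here (email, push, etc.)
  }
}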
So the queues approach is useful when you need to execute resource-intensive tasks but don’t want the Node.js event loop to be blocked. With queues, the task is handled by a worker process, which “frees” the main one.
Bonus - debounced jobs (tasks)
Imagine the following situation (a real case from one of my projects). Whenever a user uploads a document (PDF, HTML form, whatever), the admin should be notified. I believe this is common on many projects.
Notification payload might look like this:
export class Notification {
  receiver: string;
  type: string;
  uploadedDocuments: string[];
  // any other fields
}
Okay, what if the user makes changes to the same document (adds new files, deletes old ones and adds new ones, etc.) many times; let’s say they did that 3 times every 15 seconds. Should we notify the admin for every change? Of course, we can do that. But I think we can avoid this “spam”. And I’ll show you how.
The simplest solution is the following. Each Bull job has its own ID (and prefix) and can be delayed. When adding new jobs to the queue, we can:
- Remove previous jobs with the same prefix.
- Add the new one.
- If a new job is added within the delay interval, repeat these steps.
import { Job, JobOptions, Queue } from 'bull';
import { randomUUID } from 'crypto';

/**
 * If multiple jobs are added with the same debounceKey, only the last one
 * will be executed, provided no new job with that key arrives within the
 * debounceTimeout interval.
 *
 * @param data
 * @param debounceKey
 * @param debounceTimeout
 * @param jobName
 * @param options
 * @param queue
 */
export async function addDebouncedJob<TData extends Record<string, unknown>>({
  data,
  debounceKey,
  debounceTimeout,
  jobName,
  options,
  queue,
}: {
  data?: TData;
  debounceKey: string;
  debounceTimeout: number;
  jobName: string;
  options?: Omit<JobOptions, 'delay' | 'jobId'>;
  queue: Queue;
}): Promise<Job<TData>> {
  const jobIdPrefix = `debounced:${jobName}:${debounceKey}`;
  const jobId = `${jobIdPrefix}:${randomUUID()}`;
  // drop any still-delayed jobs with the same prefix, then schedule the new one
  await queue.removeJobs(`${jobIdPrefix}:*`);
  return queue.add(jobName, data, {
    ...options,
    delay: debounceTimeout,
    jobId,
  });
}
We can use this implementation instead of this.notificationsQueue.add(). It works just like a regular debounced function.
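For example, the producer from earlier could switch to the helper like this (the import path, debounce key, and 30-second timeout are example choices, not prescribed by Bull):

// notifications.service.ts (sketch)
import { Injectable } from '@nestjs/common';
import { Queue } from 'bull';
import { InjectQueue } from '@nestjs/bull';
import { addDebouncedJob } from './add-debounced-job'; // hypothetical file name

@Injectable()
export class NotificationsService {
  constructor(@InjectQueue('notifications') private notificationsQueue: Queue) {}

  async sendNotification(payload: Record<string, unknown>) {
    // jobs with the same debounceKey added within 30s replace each other
    return addDebouncedJob({
      data: payload,
      debounceKey: String(payload.receiver), // example: one key per receiver
      debounceTimeout: 30000,
      jobName: 'notification',
      queue: this.notificationsQueue,
    });
  }
}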
Simple is best! BUT NOT HERE! 😅
Bonus 2
Okay, we delayed the notifications if the user uploads or updates the same document within a specific interval. But what if the user does this for different documents? Let’s say we set the debounce interval to 30 seconds, and as the user, I upload 5 documents within this interval. Then the admin will get 5 delayed notifications. All seems fine: we get a separate notification for each document. Good? Nah, it could be better.
We can group these notifications into one! Then instead of 5, the admin will get just one with all the documents. Great? Of course, man 😁 Let’s get into it!
For the implementation we also need the Cache module from NestJS (read how to use it here). We need it to store the jobs’ payloads. Of course, you can use any storage (Redis, LRU, whatever). I’ll continue with the Cache module as I use it in other modules.
import { Cache } from 'cache-manager';
import { cloneDeep } from 'lodash';

export async function stackNotificationsIntoOneByPayloadKey(
  cacheManager: Cache,
  notification: Record<string, any>,
  debounceKey: string,
  stackKey: string,
) {
  const copy = cloneDeep(notification);
  const cached = (await cacheManager.get(debounceKey)) as any[];
  if (!cached) {
    // first notification within the interval: cache its payload
    await cacheManager.set(debounceKey, copy[stackKey]); // set ttl as the debounce timeout
  } else {
    // merge the cached payload with the new one and update both
    const stacked = cached.concat(copy[stackKey]);
    copy[stackKey] = stacked;
    await cacheManager.set(debounceKey, stacked); // set ttl as the debounce timeout
  }
  return copy;
}
Ok, what’s happening here?
We clone the new notification and check whether there’s something in the cache under debounceKey. If there is, we just concatenate the old payload with the new one and put it back in the cache.
Here’s how we can apply that. Do you remember what our notification class looks like? It has an uploadedDocuments prop, which is a string array of document names.
import { snakeCase } from 'lodash';

// some of the Bull job options
const jobName = 'notification';
const jobOptions = { removeOnComplete: true };
const notification = new Notification({ uploadedDocuments: ['CV'] }); // implementation is not really important here
const debounceKey = snakeCase(notification.receiver + notification.type);

const stacked = await stackNotificationsIntoOneByPayloadKey(
  this.cacheManager,
  notification,
  debounceKey,
  'uploadedDocuments', // pay attention here
);

await addDebouncedJob({
  data: stacked,
  debounceKey,
  debounceTimeout: 30000, // 30 seconds
  jobName,
  options: jobOptions,
  queue: this.notificationsQueue,
});
Step by step
We debounce the notifications for 30 seconds, as in the previous section (only the last one will be sent). But this alone only helps if the user updates the same document. If the documents are different, the admin will get a notification for each one.
With this approach, all notifications are stacked into one by checking whether the cache manager holds any value under the debounceKey name. If it does, we just concatenate the previous stackKey values with the new ones and update the notification body.
In our case, whenever we add a notification to the queue (when a user uploads documents), we check whether there are any documents stored in the cache. We check that by debounceKey, which stays the same for a given receiver (and differs between receivers, because receiver IDs are different).
That’s the whole point: the debounce key stays the same for one pair of receiver and notification type, and we can distinguish jobs between different receivers by that key.
Then we merge the old document array with the new one and set it in the cache. We repeat that process whenever a new notification arrives within the debounce interval we set before. For example, if the user uploads “CV” and then “Passport” within 30 seconds, the cached array grows from [“CV”] to [“CV”, “Passport”], and the admin gets a single notification listing both documents.
Sneak peek
How notifications will be sent and stored doesn’t matter for now; we’ll discuss it in one of the following posts. I’ll show you how to design and implement a notification system using NestJS, queues, and Redis, and I’ll cover these debounced jobs in more detail as well. It should be interesting.