Queue

Queues are a powerful tool when you have thousands of tasks that would otherwise block the main thread needlessly, because those tasks can simply be processed in the background.

The default driver provides an in-memory queue that is capable of processing thousands of jobs, but using your own queue driver is just as easy.

Queue Usage

Create an instance

import { Contracts, Identifiers } from "@mainsail/contracts";

// The factory creates the queue asynchronously, so its result is awaited
const queue: Contracts.Kernel.Queue = await app.get<Contracts.Kernel.QueueFactory>(Identifiers.Services.Queue.Factory)();

Start the queue

queue.start();

Stop the queue

queue.stop();

Pause the queue

queue.pause();

Resume the queue

queue.resume();

Clear the queue

queue.clear();

Push a new job onto the default queue

queue.push(() => console.log("Hello World"));

Push a new job onto the default queue after a delay

queue.later(60, () => console.log("Hello World"));

Push an array of jobs onto the default queue

queue.bulk([
    () => console.log("Hello World"),
    () => console.log("Hello World"),
]);

Get the size of the queue

queue.size();
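To show how these calls fit together, here is a minimal sketch of the lifecycle using a stand-in queue. `SketchQueue`, its `idle()` helper, and `demo()` are hypothetical names made up for this illustration; they are not the queue that ships with the kernel, and the real driver may behave differently in details such as error handling.

```typescript
// Hypothetical stand-in that mimics the calls shown above. NOT the kernel's queue.
type Job = () => Promise<void>;

class SketchQueue {
    private jobs: Job[] = [];
    private running = false;
    private draining: Promise<void> = Promise.resolve();

    public async start(): Promise<void> {
        this.running = true;
        this.drain();
    }

    public async pause(): Promise<void> {
        this.running = false;
    }

    public async resume(): Promise<void> {
        await this.start();
    }

    public async clear(): Promise<void> {
        this.jobs = [];
    }

    public async stop(): Promise<void> {
        await this.pause();
        await this.clear();
    }

    public async push(job: Job): Promise<void> {
        this.jobs.push(job);
        this.drain();
    }

    public async bulk(jobs: Job[]): Promise<void> {
        this.jobs.push(...jobs);
        this.drain();
    }

    public size(): number {
        return this.jobs.length;
    }

    // Assumption for this sketch: resolves once all currently runnable jobs are done
    public async idle(): Promise<void> {
        await this.draining;
    }

    // Processes pending jobs one at a time, in the background, while running
    private drain(): void {
        this.draining = this.draining.then(async () => {
            while (this.running && this.jobs.length > 0) {
                const job = this.jobs.shift()!;
                try {
                    await job();
                } catch (error) {
                    console.error("job failed", error);
                }
            }
        });
    }
}

async function demo(): Promise<string[]> {
    const log: string[] = [];
    const queue = new SketchQueue();

    // Jobs pushed before start() stay pending
    await queue.push(async () => { log.push("first"); });
    await queue.bulk([
        async () => { log.push("second"); },
        async () => { log.push("third"); },
    ]);
    log.push(`pending: ${queue.size()}`);

    await queue.start();
    await queue.idle();
    log.push(`pending: ${queue.size()}`);

    return log;
}

demo().then((log) => console.log(log));
// → ["pending: 3", "first", "second", "third", "pending: 0"]
```

In this sketch the jobs run strictly in insertion order, and nothing runs until `start()` is called, which is why the first size check reports three pending jobs.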

Extending

As explained in a previous article, it is possible to extend Core services because they use the Manager pattern. Let's go over a quick example of how you could implement your own queue.

Implementing the Driver

Implementing a new driver is as simple as importing the queue contract that needs to be satisfied and implementing the methods specified in it.

Information

In this example we will use p-queue, which is a promise-based queue with concurrency control.

import { Contracts } from "@mainsail/contracts";
import PQueue from "p-queue";

export class MemoryQueue implements Contracts.Kernel.Queue {
    // autoStart: false keeps jobs pending until start() is called
    private readonly queue: PQueue = new PQueue({ autoStart: false });

    public async start(): Promise<void> {
        this.queue.start();
    }

    public async stop(): Promise<void> {
        // Halt processing and discard any jobs that have not started yet
        this.queue.pause();
        this.queue.clear();
    }

    public async pause(): Promise<void> {
        this.queue.pause();
    }

    public async resume(): Promise<void> {
        // p-queue has no resume(); start() resumes a paused queue
        this.queue.start();
    }

    public async clear(): Promise<void> {
        this.queue.clear();
    }

    public async push<T = any>(fn: () => PromiseLike<T>): Promise<void> {
        // Not awaited on purpose: add() only resolves once the job has run
        this.queue.add(fn);
    }

    public async later<T>(delay: number, fn: () => PromiseLike<T>): Promise<void> {
        // setTimeout interprets the delay in milliseconds
        setTimeout(() => this.push(fn), delay);
    }

    public async bulk<T>(functions: (() => PromiseLike<T>)[]): Promise<void> {
        this.queue.addAll(functions);
    }

    public size(): number {
        return this.queue.size;
    }
}
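The driver above delegates concurrency control to p-queue. To give a feel for what that means, here is a rough, library-free sketch of the idea: at most a fixed number of tasks run at once, and the rest wait for a free slot. `ConcurrencyLimiter` and `measurePeak` are hypothetical names for this illustration only; p-queue's actual implementation is different and considerably more capable.

```typescript
// Illustrative limiter only; this is not how p-queue is implemented.
class ConcurrencyLimiter {
    private active = 0;
    private readonly waiting: Array<() => void> = [];
    private readonly concurrency: number;

    public constructor(concurrency: number) {
        this.concurrency = concurrency;
    }

    public async run<T>(task: () => PromiseLike<T>): Promise<T> {
        if (this.active >= this.concurrency) {
            // All slots are taken: wait until a finishing task hands over its slot
            await new Promise<void>((resolve) => this.waiting.push(resolve));
        } else {
            this.active++;
        }

        try {
            return await task();
        } finally {
            const next = this.waiting.shift();
            if (next) {
                next(); // pass the slot straight to the next waiting task
            } else {
                this.active--;
            }
        }
    }
}

// Runs taskCount short tasks and reports how many were ever running at once
async function measurePeak(limit: number, taskCount: number): Promise<number> {
    const limiter = new ConcurrencyLimiter(limit);
    let running = 0;
    let peak = 0;

    const task = async (): Promise<void> => {
        running++;
        peak = Math.max(peak, running);
        await new Promise<void>((resolve) => setTimeout(resolve, 5));
        running--;
    };

    await Promise.all(Array.from({ length: taskCount }, () => limiter.run(task)));

    return peak;
}

measurePeak(2, 6).then((peak) => console.log(`peak concurrency: ${peak}`));
```

With p-queue itself you would not write any of this; passing the `concurrency` option to the `PQueue` constructor gives the same kind of limit.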

Implementing the service provider

Now that we have implemented our memory driver for the queue service, we can create a service provider to register it.

import { Contracts, Identifiers } from "@mainsail/contracts";
import { Providers, Services } from "@mainsail/kernel";

import { MemoryQueue } from "./memory-queue";

export class ServiceProvider extends Providers.ServiceProvider {
    public async register(): Promise<void> {
        // The queue manager keeps track of all registered queue drivers
        const queueManager = this.app.get<Services.Queue.QueueManager>(
            Identifiers.Services.Queue.Manager,
        );

        // Register the driver under the name "memory"
        await queueManager.extend("memory", async () =>
            this.app.resolve<Contracts.Kernel.Queue>(MemoryQueue).make()
        );
    }
}
  1. We retrieve an instance of the queue manager that is responsible for managing queue drivers.
  2. We call the extend method with an asynchronous function which is responsible for creating the queue instance.
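
The two steps above rely on the Manager pattern: drivers are registered by name together with a factory, and looked up by that name later. The following is a simplified sketch of that idea. `DriverManager`, `QueueLike`, `ArrayQueue`, and the lazy caching in `driver()` are assumptions made for this example and do not mirror the kernel's actual manager class.

```typescript
// Hypothetical, simplified manager. Names and behavior are illustrative only.
interface QueueLike {
    size(): number;
}

type DriverFactory<T> = () => Promise<T>;

class DriverManager<T> {
    private readonly factories = new Map<string, DriverFactory<T>>();
    private readonly drivers = new Map<string, T>();

    // Register (or override) a driver factory under a name
    public async extend(name: string, factory: DriverFactory<T>): Promise<void> {
        this.factories.set(name, factory);
        this.drivers.delete(name); // drop any instance created by an overridden factory
    }

    // Create the driver on first use and reuse the same instance afterwards
    public async driver(name: string): Promise<T> {
        const cached = this.drivers.get(name);
        if (cached !== undefined) {
            return cached;
        }

        const factory = this.factories.get(name);
        if (factory === undefined) {
            throw new Error(`Queue driver "${name}" is not registered.`);
        }

        const instance = await factory();
        this.drivers.set(name, instance);

        return instance;
    }
}

// Trivial driver used only to exercise the manager
class ArrayQueue implements QueueLike {
    private readonly jobs: unknown[] = [];

    public size(): number {
        return this.jobs.length;
    }
}

async function demoManager(): Promise<boolean> {
    const manager = new DriverManager<QueueLike>();

    await manager.extend("memory", async () => new ArrayQueue());

    const first = await manager.driver("memory");
    const second = await manager.driver("memory");

    // The same instance is returned on every lookup
    return first === second && first.size() === 0;
}

demoManager().then((ok) => console.log(`same instance reused: ${ok}`));
```

Taking an asynchronous factory rather than an instance lets a driver perform setup work, such as opening a connection, before it is handed to callers.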