Queue

Queues are an extremely powerful tool when you have thousands of tasks that could block the main thread for no reason, as they could be simply processed in the background.

The default driver that is shipped provides an in-memory queue that is capable of processing thousands of jobs but using your own cache driver is just as easy.

Prerequisites

Before we start, we need to establish what a few recurring variables and imports in this document refer to when they are used.

1import { app, Container, Services } from "@arkecosystem/core-kernel";
  • The app import refers to the application instance which grants access to the container, configurations, system information and more.
  • The Container import refers to a namespace that contains all of the container specific entities like binding symbols and interfaces.
  • The Services import refers to a namespace that contains all of the core services. This generally will only be needed for type hints as Core is responsible for service creation and maintenance.

Queue Usage

Create an instance

1const queue: Queue = app.get<QueueFactory>(Container.Identifiers.QueueFactory)();

Start the queue

1queue.start();

Stop the queue

1queue.stop();

Pause the queue

1queue.pause();

Resume the queue

1queue.resume();

Clear the queue

1queue.clear();

Push a new job onto the default queue

1queue.push(() => console.log("Hello World");

Push a new job onto the default queue after a delay

1queue.later(60, () => console.log("Hello World"));

Push an array of jobs onto the default queue

1queue.bulk([
2 () => console.log("Hello World"),
3 () => console.log("Hello World")
4]);

Get the size of the given queue

1queue.size();

Extending

As explained in a previous article, it is possible to extend Core services due to the fact that a Manager pattern is used. Lets go over a quick example of how you could implement your own queue.

Implementing the Driver

Implementing a new driver is as simple as importing the queue contract that needs to be satisfied and implement the methods specified in it.

Information

In this example we will use p-queue which is a promise-based queue with concurrency control.

1import { Contracts } from "@arkecosystem/core-kernel";
2 
3export class MemoryQueue implements Contracts.Queue.Queue {
4 private readonly queue: PQueue = new PQueue({ autoStart: false });
5 
6 public async start(): Promise<void> {
7 await this.queue.start();
8 }
9 
10 public async stop(): Promise<void> {
11 await this.queues.delete(queue);
12 }
13 
14 public async pause(): Promise<void> {
15 await this.queue.pause();
16 }
17 
18 public async resume(): Promise<void> {
19 await this.queue.resume();
20 }
21 
22 public async clear(): Promise<void> {
23 await this.queue.clear();
24 }
25 
26 public async push<T = any>(fn: () => PromiseLike<T>): Promise<void> {
27 this.queue.add(fn);
28 }
29 
30 public async later<T>(delay: number, fn: () => PromiseLike<T>): Promise<void> {
31 setTimeout(() => this.push(fn), delay);
32 }
33 
34 public async bulk<T>(functions: (() => PromiseLike<T>)[]): Promise<void> {
35 this.queue.addAll(functions);
36 }
37 
38 public size(): number {
39 return this.queue.size;
40 }
41}

Implementing the service provider

Now that we have implemented our memory driver for the queue service we can create a service provider to register it.

1import { Container, Contracts, Providers, Services } from "@arkecosystem/core-kernel";
2 
3export class ServiceProvider extends Providers.ServiceProvider {
4 public async register(): Promise<void> {
5 const cacheManager: Services.Queue.QueueManager = this.app.get<Services.Queue.QueueManager>(
6 Container.Identifiers.QueueManager,
7 );
8 
9 await cacheManager.extend("memory", MemoryQueue);
10 }
11}
  1. We retrieve an instance of the cache manager that is responsible for managing queue drivers.
  2. We call the extend method with an asynchronous function which is responsible for creating the queue instance.
Last updated 2 years ago
Edit Page
Share: