Nestjs/bull
Dado que los trabajos se almacenan en Redis, cada vez que se instancie una cola específica (por ejemplo, cuando se inicia/reinicia una aplicación), ésta intentará procesar cualquier trabajo antiguo que pueda existir de una sesión anterior no finalizada.
Cada cola puede tener uno o varios productores, consumidores y oyentes. Los consumidores recuperan los trabajos de la cola en un orden específico: FIFO (por defecto), LIFO, o según prioridades. Aquí se trata el control del orden de procesamiento de las colas.
Definición
Nest proporciona el paquete @nestjs/bull como una abstracción/envoltura sobre Bull, una implementación de sistema de colas basada en Node.js popular, bien soportada y de alto rendimiento. El paquete facilita la integración de Bull Queues una aplicación Nest de la manera más amigable
Bull utiliza Redis para persistir los datos de los trabajos, por lo que necesitará tener Redis instalado en su sistema. Debido a que está respaldado por Redis, su arquitectura Queue puede ser completamente distribuida e independiente de la plataforma. Por ejemplo, puede tener algunos producers y consumers de Queue y listeners ejecutándose en Nest en uno (o varios) nodos, y otros producers, consumers y listeners ejecutándose en otras plataformas Node.js en otros nodos de la red.
Ventajas de uso
Las colas son un potente patrón de diseño que ayuda a resolver los problemas más comunes de escalado y rendimiento de las aplicaciones. Algunos ejemplos de problemas que las colas pueden ayudarle a resolver son:
Suavizar los picos de procesamiento. Por ejemplo, si los usuarios pueden iniciar tareas que consumen muchos recursos en momentos arbitrarios, puede añadir estas tareas a una cola en lugar de realizarlas de forma sincrónica. A continuación, puede hacer que los procesos de los trabajadores extraigan tareas de la cola de forma controlada. Puede añadir fácilmente nuevos consumidores de cola para ampliar la gestión de tareas de back-end a medida que la aplicación se amplía.
Divide las tareas monolíticas que, de otro modo, podrían bloquear el bucle de eventos de Node.js. Por ejemplo, si una solicitud de usuario requiere un trabajo intensivo de la CPU como la transcodificación de audio, puedes delegar esta tarea a otros procesos, liberando los procesos orientados al usuario para que sigan respondiendo.
Proporcionar un canal de comunicación fiable entre varios servicios. Por ejemplo, puede poner en cola tareas (trabajos) en un proceso o servicio y consumirlas en otro. Puede recibir notificaciones (mediante la escucha de eventos de estado) en caso de finalización, error u otros cambios de estado en el ciclo de vida del trabajo desde cualquier proceso o servicio. Cuando los producers o consumers de colas fallan, su estado se conserva y la gestión de tareas puede reiniciarse automáticamente cuando se reinician los nodos.
Instalación
Para empezar a utilizarlo, primero instalamos las dependencias necesarias.
pnpm add @nestjs/bull bull
pnpm add @types/bull
Uso
Una vez finalizado el proceso de instalación, podemos importar el
BullModule
al módulo raízAppModule
.import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; @Module({ imports: [ BullModule.forRoot({ redis: { host: 'localhost', port: 6379, }, }), ], }) export class AppModule {}
El método
forRoot()
se utiliza para registrar un objeto de configuración del paquetebull
que será utilizado por todas las colas registradas en la aplicación (a menos que se especifique lo contrario). Un objeto de configuración consta de las siguientes propiedades:limiter
:RateLimiter
- opciones para controlar la velocidad a la que se procesan los trabajos de la cola. Para obtener más información, dirijase a RateLimiter. Opcionalinterface RateLimiter { max: number, // Max number of jobs processed duration: number, // per duration in milliseconds bounceBack: boolean = false; // When jobs get rate limited, they stay in the waiting queue and are not moved to the delayed queue }
redis
:RedisOpts
- Opciones para configurar la conexión Redis. Ver RedisOpts para más información. Opcional.interface RedisOpts { port?: number = 6379; host?: string = localhost; db?: number = 0; password?: string; }
prefix
:string
- Prefijo para todas las claves de cola. Opcional.defaultJobOptions
:JobOpts
- Opciones para controlar la configuración predeterminada de los nuevos trabajos. Ver JobOpts para más información. Opcionalinterface JobOpts { priority: number; // Optional priority value. ranges from 1 (highest priority) to MAX_INT (lowest priority). Note that // using priorities has a slight impact on performance, so do not use it if not required. delay: number; // An amount of miliseconds to wait until this job can be processed. Note that for accurate delays, both // server and clients should have their clocks synchronized. [optional]. attempts: number; // The total number of attempts to try the job until it completes. repeat: RepeatOpts; // Repeat job according to a cron specification. backoff: number | BackoffOpts; // Backoff setting for automatic retries if the job fails lifo: boolean; // if true, adds the job to the right of the queue instead of the left (default false) timeout: number; // The number of milliseconds after which the job should be fail with a timeout error [optional] jobId: number | string; // Override the job ID - by default, the job ID is a unique // integer, but you can use this setting to override it. // If you use this option, it is up to you to ensure the // jobId is unique. If you attempt to add a job with an id that // already exists, it will not be added. removeOnComplete: boolean | number; // If true, removes the job when it successfully // completes. A number specified the amount of jobs to keep. Default behavior is to keep the job in the completed set. removeOnFail: boolean | number; // If true, removes the job when it fails after all attempts. A number specified the amount of jobs to keep // Default behavior is to keep the job in the failed set. stackTraceLimit: number; // Limits the amount of stack trace lines that will be recorded in the stacktrace. }
settings
:AdvancedSettings
- Ajustes avanzados de configuración de la cola. Por lo general, no deben modificarse. Consulte AdvancedSettings para obtener más información. Opcional.interface AdvancedSettings { lockDuration: number = 30000; // Key expiration time for job locks. lockRenewTime: number = 15000; // Interval on which to acquire the job lock stalledInterval: number = 30000; // How often check for stalled jobs (use 0 for never checking). maxStalledCount: number = 1; // Max amount of times a stalled job will be re-processed. guardInterval: number = 5000; // Poll interval for delayed jobs and added jobs. retryProcessDelay: number = 5000; // delay before processing next job in case of internal error. backoffStrategies: {}; // A set of custom backoff strategies keyed by name. drainDelay: number = 5; // A timeout for when the queue is in drained state (empty waiting for jobs). }
Todas las opciones son opcionales, proporcionando un control detallado sobre el comportamiento de la cola. Se pasan directamente al constructor de la cola de bull. Lea más sobre estas opciones aquí.
Para registrar una cola, importe el módulo dinámico
BullModule.registerQueue()
, así:BullModule.registerQueue({ name: 'audio', });
El método
registerQueue()
se utiliza para instanciar y/o registrar colas. Las colas se comparten entre módulos y procesos que se conectan a la misma base de datos Redis subyacente con las mismas credenciales. Cada cola es única por su propiedadname
. Un nombre de cola se utiliza tanto como un token de inyección (para inyectar la cola en controladores/proveedores), y como un argumento para decoradores para asociar clases consumers y listeners con colas.Dado que los trabajos se almacenan en Redis, cada vez que se instancie una cola específica (por ejemplo, cuando se inicia/reinicia una aplicación), ésta intentará procesar cualquier trabajo antiguo que pueda existir de una sesión anterior no finalizada.
Cada cola puede tener uno o varios producers, consumers y listeners. Los consumidores recuperan los trabajos de la cola en un orden específico: FIFO (por defecto), LIFO, o según prioridades. Aquí se trata el control del orden de procesamiento de las colas.
Configuraciones con nombre
Si tus colas se conectan a varias instancias diferentes de Redis, puedes utilizar una técnica llamada
named configurations
. Esta característica le permite registrar varias configuraciones bajo claves específicas, a las que puede hacer referencia en las opciones de cola.Por ejemplo, suponiendo que tienes una instancia de Redis adicional (aparte de la predeterminada) utilizada por unas cuantas colas registradas en tu aplicación, puedes registrar su configuración de la siguiente manera:
BullModule.forRoot('alternative-config', { redis: { port: 6381, }, });
En el ejemplo anterior,
"alternative-config"
es sólo una clave de configuración (puede ser cualquier cadena arbitraria).Con esto en su lugar, ahora puede apuntar a esta configuración en el objeto de opciones
registerQueue()
:BullModule.registerQueue({ configKey: 'alternative-queue' name: 'video', });
Producers
Los
job producers
añaden trabajos a las colas. Los productores suelen ser servicios de aplicación (providers de Nest). Para añadir trabajos a una cola, primero inyecte la cola en el servicio de la siguiente manera:import { Injectable } from '@nestjs/common'; import { Queue } from 'bull'; import { InjectQueue } from '@nestjs/bull'; @Injectable() export class AudioService { constructor(@InjectQueue('audio') private audioQueue: Queue) {} }
Ahora, añade un trabajo llamando al método
add()
de la cola, pasando un objetojob
definido por el usuario. Los trabajos se representan como objetos JavaScript serializables (ya que así es como se almacenan en la base de datos Redis). La forma del trabajo que pases es arbitraria; úsala para representar la semántica de tu objeto de trabajo.const job = await this.audioQueue.add({ foo: 'bar', });
Los trabajos pueden tener nombres únicos. Esto permite crear consumers especializados que sólo procesarán trabajos con un nombre determinado.
const job = await this.audioQueue.add('transcode', { foo: 'bar', });
AdvertenciaCuando utilice trabajos con nombre, debe crear procesadores para cada nombre único añadido a una cola, o la cola se quejará de que le falta un procesador para el trabajo en cuestión.
Job options
Los trabajos pueden tener opciones adicionales asociadas. Pase un objeto
options
después del argumentojob
en el métodoQueue.add()
. Las propiedades de las opciones de los trabajos son:priority
:number
- Valor de prioridad opcional. Va de 1 (prioridad más alta) a MAX_INT (prioridad más baja). Tenga en cuenta que el uso de prioridades tiene un ligero impacto en el rendimiento, así que úselas con precaución.delay
:number
- Cantidad de tiempo (milisegundos) que hay que esperar hasta que este trabajo pueda ser procesado. Tenga en cuenta que para retrasos precisos, tanto el servidor como los clientes deben tener sus relojes sincronizados.attempts
:number
- El número total de intentos para intentar el trabajo hasta que se complete.repeat
:RepeatOpts
- Repite el trabajo según una especificación cron. Véase RepeatOpts.interface RepeatOpts { cron?: string; // Cron string tz?: string; // Timezone startDate?: Date | string | number; // Start date when the repeat job should start repeating (only with cron). endDate?: Date | string | number; // End date when the repeat job should stop repeating. limit?: number; // Number of times the job should repeat at max. every?: number; // Repeat every millis (cron setting cannot be used together with this setting.) count?: number; // The start value for the repeat iteration count. }
backoff
:number | BackoffOpts
- Configuración de backoff para reintentos automáticos si el trabajo falla. Véase BackoffOpts.interface BackoffOpts { type: string; // Backoff type, which can be either `fixed` or `exponential`. A custom backoff strategy can also be specified in `backoffStrategies` on the queue settings. delay: number; // Backoff delay, in milliseconds. }
lifo
:boolean
- Si es true, añade el trabajo al extremo derecho de la cola en lugar de al izquierdo (por defecto false).timeout
:number
- El número de milisegundos después de los cuales el trabajo debe fallar con un error de tiempo de espera.jobId
:number | string
- Sobreescribir el ID del trabajo: por defecto, el ID del trabajo es un número entero único, pero puede utilizar esta opción para sobreescribirlo. Si utiliza esta opción, depende de usted asegurarse de que eljobId
es único. Si intenta añadir un trabajo con un ID que ya existe, no se añadirá.removeOnComplete
:boolean | number
- Si es true, elimina el trabajo cuando finaliza con éxito. Un número especifica la cantidad de trabajos a mantener. El comportamiento por defecto es mantener el trabajo en el conjunto completado.removeOnFail
:boolean | number
- Si es true, elimina el trabajo cuando falla después de todos los intentos. Un número especifica la cantidad de trabajos a mantener. El comportamiento por defecto es mantener el trabajo en el conjunto fallido.stackTraceLimit
:number
- Limita la cantidad de líneas de seguimiento de pila que se registrarán en stacktrace.
Estos son algunos ejemplos de personalización de trabajos con opciones de trabajo.
Para retrasar el inicio de un trabajo, utilice la propiedad de configuración de
delay
.const job = await this.audioQueue.add( { foo: 'bar', }, { delay: 3000 }, // 3 seconds delayed );
Para añadir un trabajo al extremo derecho de la cola (procesar el trabajo como LIFO (Last In First Out)), establezca la propiedad
lifo
del objeto de configuración entrue
.const job = await this.audioQueue.add( { foo: 'bar', }, { lifo: true }, );
Para dar prioridad a un trabajo, utilice la propiedad de
priority
.const job = await this.audioQueue.add( { foo: 'bar', }, { priority: 2 }, );
Consumers
Un consumer es una clase que define métodos que o bien procesan trabajos añadidos a la cola, o bien escuchan eventos en la cola, o ambas cosas. Declare una clase consumidora usando el decorador
@Processor()
como sigue:import { Processor } from '@nestjs/bull'; @Processor('audio') export class AudioConsumer {}
Los consumers deben estar registrados como
proveedores
para que el paquete@nestjs/bull
pueda recogerlos.Donde el argumentos tipo string del decorador (por ejemplo,
'audio'
) es el nombre de la cola que se asociará con los métodos de la clase.Dentro de una clase consumer, declare los gestores de trabajos decorando los métodos de los gestores con el decorador
@Process()
.import { Processor, Process } from '@nestjs/bull'; import { Job } from 'bull'; @Processor('audio') export class AudioConsumer { @Process() async transcode(job: Job<unknown>) { let progress = 0; for (i = 0; i < 100; i++) { await doSomething(job.data); progress += 1; await job.progress(progress); } return {}; } }
El método decorado (por ejemplo,
transcode()
) se llama siempre que el trabajador está inactivo y hay trabajos que procesar en la cola. Este método recibe el objetojob
como único argumento. El valor devuelto por el método handler se almacena en el objeto job y se puede acceder a él posteriormente, por ejemplo en un listener para el evento completado.Los objetos
Job
tienen múltiples métodos que te permiten interactuar con su estado. Por ejemplo, el código anterior utiliza el métodoprogress()
para actualizar el progreso del trabajo. Consulte aquí la referencia completa de laAPI
de objetos Job.Puedes designar que un método manejador de trabajos maneje sólo trabajos de un cierto tipo (trabajos con un
nombre
específico) pasando esenombre
al decorador@Process()
como se muestra a continuación. Puedes tener múltiples manejadores@Process()
en una clase consumidora dada, correspondiendo a cada tipo de trabajo (name
). Cuando utilice trabajos con nombre, asegúrese de tener un manejador correspondiente a cada nombre.@Process('transcode') async transcode(job: Job<unknown>) { ... }
Request-scoped consumers
Cuando un consumidor es marcado como request-scoped (aprenda más sobre los ámbitos de inyección aquí), una nueva instancia de la clase será creada exclusivamente para cada trabajo. La instancia será recolectada después de que el trabajo se haya completado.
@Processor({ name: 'audio', scope: Scope.REQUEST, })
Dado que las clases consumidoras con ámbito de petición se instancian dinámicamente y con ámbito de un único trabajo, puede inyectar un
JOB_REF
a través del constructor utilizando un enfoque estándar.constructor(@Inject(JOB_REF) jobRef: Job) { console.log(jobRef); }
El token
JOB_REF
se importa del paquete@nestjs/bull
.Event Listeners
Bull genera un conjunto de eventos útiles cuando se producen cambios en el estado de la cola y/o del trabajo. Nest proporciona un conjunto de decoradores que permiten suscribirse a un conjunto básico de eventos estándar. Estos se exportan desde el paquete
@nestjs/bull
.Los event listeners deben declararse dentro de una clase consumer (es decir, dentro de una clase decorada con el decorador
@Processor()
). Para escuchar un evento, utilice uno de los decoradores de la tabla siguiente para declarar un manejador del evento. Por ejemplo, para escuchar el evento emitido cuando un trabajo entra en estado activo en la cola deaudio
, utilice la siguiente construcción:import { Processor, Process } from '@nestjs/bull'; import { Job } from 'bull'; @Processor('audio') export class AudioConsumer { @OnQueueActive() onActive(job: Job) { console.log( `Processing job ${job.id} of type ${job.name} with data ${job.data}...`, ); } ...
Dado que Bull opera en un entorno distribuido (multinodo), define el concepto de localización de eventos. Este concepto reconoce que los eventos pueden ser desencadenados enteramente dentro de un único proceso, o en colas compartidas de diferentes procesos. Un evento local es aquel que se produce cuando se desencadena una acción o un cambio de estado en una cola del proceso local. En otras palabras, cuando tus producers y consumers de eventos son locales a un único proceso, todos los eventos que ocurren en las colas son locales.
Cuando una cola se comparte entre varios procesos, nos encontramos con la posibilidad de eventos globales. Para que un oyente en un proceso reciba una notificación de evento disparada por otro proceso, debe registrarse para un evento global.
Los manejadores de eventos se invocan cada vez que se emite el evento correspondiente. El manejador es llamado con la firma mostrada en la tabla inferior, proporcionando acceso a la información relevante para el evento. A continuación discutiremos una diferencia clave entre las firmas de los manejadores de eventos locales y globales.
Receptores de eventos localesReceptores de eventos globalesFirma del método Handler / Cuando se dispara@OnQueueError()
@OnGlobalQueueError()
handler(error: Error) - Se ha producido un error. error contiene el error desencadenante.
@OnQueueWaiting()
@OnGlobalQueueWaiting()
handler(jobId: number
@OnQueueActive()
@OnGlobalQueueActive()
handler(job: Job) - El trabajo job ha comenzado.
@OnQueueStalled()
@OnGlobalQueueStalled()
handler(job: Job) - El trabajo job ha sido marcado como paralizado. Esto es útil para depurar job workers que se bloquean o pausan el bucle de eventos.
@OnQueueProgress()
@OnGlobalQueueProgress()
handler(job: Job, progress: number) - El progreso del trabajo job se actualizó para valorar el progreso.
@OnQueueCompleted()
@OnGlobalQueueCompleted()
handler(job: Job, result: any) Trabajo job completado con éxito con un resultado
@OnQueueFailed()
@OnGlobalQueueFailed()
handler(job: Job, err: Error)El trabajo job ha fallado por err.
@OnQueuePaused()
@OnGlobalQueuePaused()
handler() La cola se ha puesto en pausa.
@OnQueueResumed()
@OnGlobalQueueResumed()
handler(job: Job)La cola se ha reanudado.
@OnQueueCleaned()
@OnGlobalQueueCleaned()
handler(jobs: Job[], type: string) Los trabajos antiguos se han limpiado de la cola. jobs es un array de trabajos limpiados, y type es el tipo de trabajos limpiados.
@OnQueueDrained()
@OnGlobalQueueDrained()
handler() Se emite siempre que la cola haya procesado todos los trabajos en espera (aunque pueda haber algún trabajo retrasado que aún no se haya procesado).
@OnQueueRemoved()
@OnGlobalQueueRemoved()
handler(job: Job) El trabajo job se ha eliminado correctamente.
Cuando se escuchan eventos globales, las firmas de los métodos pueden ser ligeramente diferentes de su contraparte local. En concreto, cualquier firma de método que reciba objetos
job
en la versión local, recibe en cambio unjobId
(number
) en la versión global. Para obtener una referencia al objeto de trabajo real en tal caso, utilice el métodoQueue#getJob
. Esta llamada debe ser esperada, y por lo tanto el manejador debe ser declaradoasync
. Por ejemplo:@OnGlobalQueueCompleted() async onGlobalCompleted(jobId: number, result: any) { const job = await this.immediateQueue.getJob(jobId); console.log('(Global) on completed: job ', job.id, ' -> result: ', result); }
Gestión de colas
Las colas tienen una
API
que te permite realizar funciones de gestión como pausar y reanudar, recuperar el recuento de trabajos en varios estados, y varias más. Puedes encontrar laAPI
de colas completa aquí. Invoca cualquiera de estos métodos directamente sobre el objetoQueue
, como se muestra a continuación con los ejemplos de pausar/reanudar.Pausa una cola con la llamada al método
pause()
. Una cola en pausa no procesará nuevos trabajos hasta que se reanude, pero los trabajos actuales que se estén procesando continuarán hasta que finalicen.await audioQueue.pause(); // to pause a queue await audioQueue.resume(); // ro resume a queue
Separar procesos
Los gestores de trabajos también pueden ejecutarse en un proceso (fuente) separado (bifurcado). Esto tiene varias ventajas:
El proceso está aislado, por lo que si se bloquea no afecta al worker.
Se puede ejecutar código bloqueante sin afectar a la cola (los trabajos no se paran).
Mucho mejor utilización de CPUs multinúcleo.
Menos conexiones a redis.
app.module.ts
import { Module } from '@nestjs/common'; import { BullModule } from '@nestjs/bull'; import { join } from 'path'; @Module({ imports: [ BullModule.registerQueue({ name: 'audio', processors: [join(__dirname, 'processor.js')], // load all processors from an especific route }), ], }) export class AppModule {}
Ten en cuenta que como tu función se ejecuta en un proceso bifurcado, la inyección de dependencias (y el contenedor IoC) no estarán disponibles. Eso significa que tu función procesadora necesitará contener (o crear) todas las instancias de dependencias externas que necesite.
Configuración asíncrona
Es posible que desee pasar opciones de
bull
de forma asíncrona en lugar de estáticamente. En este caso, utiliza el métodoforRootAsync()
que proporciona varias formas de tratar con la configuración asíncrona. Del mismo modo, si quieres pasar opciones de cola de forma asíncrona, utiliza el métodoregisterQueueAsync()
.Un enfoque es utilizar una función de fábrica:
BullModule.forRootAsync({ useFactory: () => ({ redis: { host: 'localhost', port: 6379, }, }), });
Nuestra fábrica se comporta como cualquier otro proveedor asíncrono (por ejemplo, puede ser
async
y es capaz de inyectar dependencias a través deinject
).BullModule.forRootAsync({ imports: [ConfigModule], useFactory: async (configService: ConfigService) => ({ redis: { host: configService.get('QUEUE_HOST'), port: +configService.get('QUEUE_PORT'), }, }), inject: [ConfigService], }); // alternatively, you can use useClass syntax BullModule.forRootAsync({ useClass: BullConfigService, });
La construcción anterior instanciará
BullConfigService
dentro deBullModule
y lo utilizará para proporcionar un objeto de opciones llamando acreateSharedConfiguration()
. Tenga en cuenta que esto significa que elBullConfigService
tiene que implementar la interfazSharedBullConfigurationFactory
, como se muestra a continuación:@Injectable() class BullConfigService implements SharedBullConfigurationFactory { createSharedConfiguration(): BullModuleOptions { return { redis: { host: 'localhost', port: 6379, }, }; } }
Para evitar la creación de
BullConfigService
dentro deBullModule
y utilizar un proveedor importado de un módulo diferente, puede utilizar la sintaxisuseExisting
.BullModule.forRootAsync({ imports: [ConfigModule], useExisting: ConfigService, });
Esta construcción funciona igual que
useClass
con una diferencia crítica -BullModule
buscará módulos importados para reutilizar unConfigService
existente en lugar de instanciar uno nuevo.
Ejemplo práctico diseñado por NestJs
aquí.
Last updated