Bull
Definición
Bull es una librería Node que implementa un sistema de colas rápido y robusto basado en redis.
Aunque es posible implementar colas directamente utilizando los comandos de Redis, esta biblioteca proporciona una API que se encarga de todos los detalles de bajo nivel y enriquece la funcionalidad básica de Redis para que los casos de uso más complejos se puedan manejar fácilmente.
Las colas pueden resolver muchos problemas diferentes de una manera elegante, desde suavizar los picos de procesamiento hasta crear canales de comunicación robustos entre microservicios o descargar el trabajo pesado de un servidor a muchos Workers más pequeños, etc.
Instalación
pnpm add bull
Colas simples
Para crear una cola simple, debemos instanciar un objeto de Bull así:
const myFirstQueue = new Bull('my-first-queue');
Una instancia de cola puede tener normalmente 3 roles principales diferentes: Un productor de trabajos (producer), un consumidor de trabajos (consumer) y/o un receptor de eventos (listeners).
Una cola puede tener muchos producers, muchos consumer y muchos listeners. Un aspecto importante es que los productores pueden añadir trabajos a una cola aunque no haya consumidores disponibles en ese momento: las colas proporcionan comunicación asíncrona, que es una de las características que las hace tan potentes, también puedes tener uno o varios Workers consumiendo trabajos de la cola, que consumirán los trabajos en un orden determinado: FIFO (por defecto), LIFO o según prioridades.
En cuanto a los Workers, pueden ejecutarse en el mismo proceso o en procesos diferentes, en la misma máquina o en un clúster. Redis actuará como punto común, y siempre que un consumidor o productor pueda conectarse a Redis, podrán cooperar procesando los trabajos.
Producers
Un productor de trabajos es simplemente un programa de Nodo que añade trabajos a una cola, para inicializar un producer simplemente se debe hacer algo como lo siguiente:
const job = await myFirstQueue.add({
// Job data
foo: 'bar'
});
Consumers
Un consumer o Worker, no es más que un programa de Node que define una función de proceso como ésta:
myFirstQueue.process(async (job) => {
return doSomething(job.data);
});
La función de proceso será llamada cada vez que el Worker esté inactivo y haya trabajos que procesar en la cola. Dado que el consumidor no necesita estar en línea cuando se añaden los trabajos, puede ocurrir que la cola ya tenga muchos trabajos esperando en ella, por lo que el proceso se mantendrá ocupado procesando los trabajos uno a uno hasta que todos ellos estén terminados.
El valor devuelto por su función de proceso se almacenará en el objeto jobs y se podrá acceder a él posteriormente, por ejemplo en un listener para el evento completado
Listeners
Por último, puedes limitarte a escuchar los eventos que se produzcan en la cola. Los escuchadores pueden ser locales, lo que significa que sólo recibirán las notificaciones producidas en la instancia de cola dada, o globales, lo que significa que escucharán todos los eventos de una cola dada. Por tanto, puedes adjuntar un listener a cualquier instancia, incluso a instancias que estén actuando como consumidoras o productoras. Pero ten en cuenta que un evento local nunca se disparará si la cola no es un consumidor o productor, tendrás que utilizar eventos globales en ese caso.
// Define a local completed event
myFirstQueue.on('completed', (job, result) => {
console.log(`Job completed with result ${result}`);
})
Ciclo de vida de un trabajo
Para utilizar todo el potencial de las colas de Bull, es importante entender el ciclo de vida de un trabajo. Desde el momento en que un producer llama al método add en una instancia de cola, un trabajo entra en un ciclo de vida en el que estará en diferentes estados, hasta su finalización o fallo (aunque técnicamente un trabajo fallido podría ser reintentado y obtener un nuevo ciclo de vida).

Cuando un trabajo se añade a una cola puede estar en uno de estos dos estados, puede estar en el estado "wait", que es, de hecho, una lista de espera, donde todos los trabajos deben entrar antes de poder ser procesados, o puede estar en un estado "delayed": un estado retrasado implica que el trabajo está esperando algún tiempo de espera o a ser promovido para ser procesado, sin embargo, un trabajo retrasado no será procesado directamente, en su lugar será colocado al principio de la lista de espera y procesado tan pronto como un trabajador esté ocioso.
El siguiente estado de un trabajo es el estado "active". El estado activo está representado por un conjunto, y son trabajos que se están procesando en ese momento, es decir, que se están ejecutando en la función de proceso explicada en el capítulo anterior. Un trabajo puede estar en estado activo durante un tiempo ilimitado hasta que se complete el proceso o se lance una excepción, de modo que el trabajo terminará en estado "completed" o "failed".
Trabajos estancados
En Bull, definimos el concepto de trabajos estancados. Un trabajo estancado es un trabajo que se está procesando pero en el que Bull sospecha que la función de proceso se ha colgado. Esto ocurre cuando la función de proceso está procesando un trabajo y está manteniendo la CPU tan ocupada que el trabajador no es capaz de decirle a la cola que todavía está intentando el trabajo.
Cuando un trabajo se bloquea, dependiendo de la configuración del trabajo, éste puede ser reintentado por otro trabajador inactivo o puede pasar al estado de fallo.
Los trabajos estancados pueden evitarse asegurándose de que la función de proceso no mantiene ocupado el bucle de eventos del Nodo durante demasiado tiempo (estamos hablando de varios segundos con las opciones por defecto de Bull)
Eventos
Una cola en Bull genera un puñado de eventos que son útiles en muchos casos de uso. Los eventos pueden ser locales para una instancia de cola determinada (un worker), por ejemplo, si un trabajo se completa en un worker determinado se emitirá un evento local sólo para esa instancia. Sin embargo, es posible escuchar todos los eventos, anteponiendo global: al nombre del evento local. Entonces podemos escuchar todos los eventos producidos por todos los workers de una cola dada.
Un evento local puede ser el siguiente:
queue.on('completed', job => {
console.log(`Job with id ${job.id} has been completed`);
})
Y un evento global es definido así
queue.on('global:completed', jobId => {
console.log(`Job with id ${jobId} has been completed`);
})
Todos los eventos de Bull
bull/REFERENCE.md at master · OptimalBits/bull
Opciones de las colas
Una cola puede ser instanciada con algunas opciones útiles, por ejemplo, puede especificar la ubicación y la contraseña de su servidor Redis, así como algunas otras configuraciones útiles.
Todos estos ajustes se describen en la referencia de Bull
bull/REFERENCE.md at master · OptimalBits/bull
Limitador de velocidad
Es posible crear colas que limiten el número de trabajos procesados en una unidad de tiempo. El limitador se define por cola, independientemente del número de workers, por lo que puedes escalar horizontalmente y seguir limitando el ritmo de procesamiento fácilmente
// Limit queue to max 1.000 jobs per 5 seconds.
const myRateLimitedQueue = new Queue('rateLimited', {
limiter: {
max: 1000,
duration: 5000
}
});
Cuando una cola alcanza el límite de velocidad, los trabajos solicitados se incorporan a la cola retrasada (delayed).
Nombre de trabajos
Es posible dar nombres a los trabajos. Esto no cambia nada de la mecánica de la cola, pero puede utilizarse para obtener un código más claro y una mejor visualización en las herramientas de interfaz de usuario:
// Jobs producer
const myJob = await transcoderQueue.add('image', { input: 'myimagefile' });
const myJob = await transcoderQueue.add('audio', { input: 'myaudiofile' });
const myJob = await transcoderQueue.add('video', { input: 'myvideofile' });
// Worker
transcoderQueue.process('image', processImage);
transcoderQueue.process('audio', processAudio);
transcoderQueue.process('video', processVideo);
Tenga en cuenta que cada instancia de cola debe proporcionar un procesador para cada trabajo nombrado o se producirá una excepción.
Sandboxed Processors
Al definir una función de proceso, también es posible proporcionar una configuración de concurrencia. Esta configuración permite al worker procesar varios trabajos en paralelo. Los trabajos se siguen procesando en el mismo proceso del Nodo, y si los trabajos son muy intensivos se manejarán sin problemas.
A veces los trabajos son más intensivos en CPU, lo que podría bloquear el bucle de eventos del Nodo durante demasiado tiempo y Bull podría decidir que el trabajo se ha estancado. Para evitar esta situación, es posible ejecutar las funciones de proceso en procesos de Nodo separados. En este caso, el parámetro de concurrencia decidirá el número máximo de procesos concurrentes que se permite ejecutar.
Llamamos a este tipo de procesos por procesos "sandboxed", y también tienen la propiedad de que si el crash no afectará a ningún otro proceso, y un nuevo proceso se generará automáticamente para reemplazarlo.
Tipo de trabajos
El tipo de trabajo por defecto en Bull es "FIFO" (first in first out), lo que significa que los trabajos se procesan en el mismo orden en que llegan a la cola. A veces es útil procesar los trabajos en un orden diferente.
LIFO
Lifo (last in first out) significa que los trabajos se añaden al principio de la cola y, por tanto, se procesarán en cuanto el trabajador esté inactivo.
const myJob = await myqueue.add({ foo: 'bar' }, { lifo: true });
Delayed
También es posible añadir trabajos a la cola que se retrasan una cierta cantidad de tiempo antes de ser procesados. Tenga en cuenta que el parámetro de retraso significa la cantidad mínima de tiempo que el trabajo esperará antes de ser procesado. Una vez transcurrido el tiempo de retraso, el trabajo se moverá al principio de la cola y se procesará en cuanto haya un worker libre.
// Delayed 5 seconds
const myJob = await myqueue.add({ foo: 'bar' }, { delay: 5000 });
Prioritized
Los trabajos pueden añadirse a una cola con un valor de prioridad. Los trabajos con mayor prioridad se procesarán antes que los trabajos con menor prioridad. La prioridad más alta es 1, y más baja cuanto mayor sea el número entero utilizado. Tenga en cuenta que las colas de prioridad son un poco más lentas que una cola estándar (actualmente el tiempo de inserción es O(n), siendo n el número de trabajos que están esperando en la cola, en lugar de O(1) para las colas estándar).
const myJob = await myqueue.add({ foo: 'bar' }, { priority: 3 });
Repeatable
Los trabajos repetibles son trabajos especiales que se repiten indefinidamente o hasta que se alcanza una fecha máxima determinada o el número de repeticiones, según una especificación cron o un intervalo de tiempo.
// Repeat every 10 seconds for 100 times.
const myJob = await myqueue.add(
{ foo: 'bar' },
{
repeat: {
every: 10000,
limit: 100
}
}
);
// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, { repeat: { cron: '15 3 * * *' } });
Algunas consideraciones sobre los trabajos repetibles
Bull es lo suficientemente inteligente como para no añadir el mismo trabajo repetido si las opciones de repetición son las mismas. (CUIDADO: Un id de trabajo es parte de las opciones de repetición desde: https://github.com/OptimalBits/bull/pull/603, por lo tanto pasar ids de trabajos permitirá que trabajos con el mismo cron sean insertados en la cola).
Si no hay trabajadores en ejecución, los trabajos repetibles no se acumularán la próxima vez que un worker esté en línea.
Los trabajos repetibles pueden eliminarse mediante el método removeRepeatable.
Last updated