Proyecto task
Es el proyecto donde están ubicados los crons y async jobs de la plataforma de Inlaze. Hay colas para cualquier tipo de ejecución que necesiten las plataformas, la mayoría de estas están configuradas con Bull y algunas otras con BullMQ debido a su facilidad en el manejo y mejoras. La idea es que todos los async jobs estén configurados con BullMQ, pero debido a la falta de compatibilidad con el desarrollo propio de los crons, esto requiere una reestructuración y, por lo tanto, no se puede realizar en este momento.
Resumen
Este documento proporciona una guía detallada sobre cómo crear y gestionar colas para la ejecución de tareas asincrónicas en la plataforma de Inlaze utilizando Bull y BullMQ.
Descripción
La receta describe el proceso de configuración y gestión de colas y crons en la plataforma de Inlaze utilizando Bull y BullMQ. Estas herramientas permiten manejar tareas asincrónicas de manera eficiente, lo cual es crucial para mantener la operatividad y la escalabilidad de la plataforma.
Prerrequisitos
Tener instalado Redis, PostgreSQL y Redis Commander para visualizar los jobs de Redis.
Tener instalado y configurado el proyecto de tasks: sport-enlace-sas/inlaze-backend-task.
Conexión de Bull y BullMQ con NestJS y Redis.
Casos de Uso
Ejecución de tareas asincrónicas como solicitudes HTTP.
Manejo y procesamiento de crons.
Gestión de logs y otras operaciones programadas.
Instrucciones Paso a Paso
Paso 1: Preparación
Asegúrese de que Redis, PostgreSQL y Redis Commander estén instalados y configurados.
Clone y configure el proyecto desde el repositorio sport-enlace-sas/inlaze-backend-task.
Paso 2: Proceso Principal
Crear una Cola con Bull
Crear la carpeta en el path /src/tasks/queues/http:
mkdir -p /src/tasks/queues/http
Crear el módulo en el path /src/tasks/queues/http:
touch /src/tasks/queues/http/http.module.ts
Ingresar el contenido del módulo:
import { BullModule } from "@nestjs/bull"; import { Module } from "@nestjs/common"; import { CronJobModule } from "@app/tasks/cron-job.module"; import { HttpConsumer } from "./consumers/http-queue.consumer"; import { HttpService } from "./services/http.service"; @Module({ imports: [ BullModule.registerQueue({ name: "http-queue", limiter: { max: 1, duration: 1000 }, }), CronJobModule, ], providers: [HttpConsumer, HttpService], }) export class HttpModule {}
Crear el HttpConsumer:
mkdir /src/tasks/queues/http/consumers touch /src/tasks/queues/http/consumers/http.consumer.ts
import { Processor, Process, InjectQueue, OnQueueActive, OnQueueCleaned, OnQueueCompleted, OnQueueStalled, OnQueueFailed, OnQueueWaiting, OnQueuePaused, OnQueueResumed, } from "@nestjs/bull"; import { Job, JobId, Queue } from "bull"; import { CronJobService } from "@app/tasks/services/cron-job.service"; import { HttpService } from "./services/http.service"; @Processor("http-queue") export class HttpConsumer { constructor( private readonly _cronJobService: CronJobService, @InjectQueue("http-queue") private readonly _queue: Queue, private readonly httpService: HttpService, ) {} @Process("process-http-request-task") async runRequest(job: Job<HttpDataDto>): Promise<unknown> { await this._cronJobService.checkJobExpiration(job, this._queue); return this.httpService.runRequest(job); } @OnQueueActive() async onActive(job: Job): Promise<void> { await this._cronJobService.onQueueActive(job, this._queue); } @OnQueueCompleted() async onCompleted(job: Job, result: unknown): Promise<void> { await this._cronJobService.onQueueCompleted(job, this._queue, result); } @OnQueueStalled() async onStalled(job: Job): Promise<void> { await this._cronJobService.onQueueStalled(job, this._queue); } @OnQueueFailed() async onFailed(job: Job, error: unknown): Promise<void> { await this._cronJobService.onQueueFailed(job, error, this._queue); } @OnQueueWaiting() async onWaiting(jobId: JobId): Promise<void> { await this._cronJobService.onQueueWaiting(jobId, this._queue); } @OnQueueCleaned() onCleaned(): void { this._cronJobService.onQueueCleaned(this._queue); } @OnQueuePaused() onPaused(): void { this._cronJobService.onQueuePaused(this._queue); } @OnQueueResumed() onResumed(): void { this._cronJobService.onQueueResumed(this._queue); } }
Crear el servicio de HTTP para ejecutar peticiones:
mkdir /src/tasks/queues/http/services touch /src/tasks/queues/http/services/http.service.ts
import { HttpAdapter, IHttpAdapter } from "@inlaze_techlead/inlaze-common"; import { Inject, Injectable } from "@nestjs/common"; import { Job } from "bull"; import { HttpDataDto } from "../../http/dto"; import { plainToInstance } from "class-transformer"; import { validateSync } from "class-validator"; @Injectable() export class HttpService { constructor(@Inject(HttpAdapter) private readonly httpAdapter: IHttpAdapter) {} async runRequest(job: Job<HttpData>): Promise<unknown> { const jobData = plainToInstance(HttpDataDto, job.data, { excludeExtraneousValues: true }); const errors = validateSync(jobData); if (errors.length) throw new Error(errors.toString()); return this.httpAdapter.request(jobData); } }
Crear la cola registrada como una constante para poder importarla en cualquier lugar:
mkdir /src/tasks/queues/http/constants touch /src/tasks/queues/http/constants/queue-options.constants.ts
import { BullModuleOptions } from "@nestjs/bull"; export const HTTP_QUEUE_OPTIONS: BullModuleOptions = { name: "http-queue", limiter: { max: 1, duration: 1000 }, };
Agregar la constante a las constantes de task en /src/tasks/task.constants.ts:
import { BullModuleOptions } from "@nestjs/bull"; import { HTTP_QUEUE_OPTIONS } from "./queues/apis/apis.constants"; export const QUEUE_LIST: BullModuleOptions[] = [ HTTP_QUEUE_OPTIONS, ]; export const QUEUE_NAMES: string[] = [ "http-queue", ]; export const CRON_QUEUES: string[] = [ "http-queue", ];
Importar el módulo creado en el task module /src/tasks/task.module.ts:
import { BullModule } from "@nestjs/bull"; import { Module } from "@nestjs/common"; import { TaskController } from "./controllers/task.controller"; import { QUEUE_LIST } from "./task.constants"; import { CronJobController } from "./controllers/cron.job.controller"; import { QueueController } from "./controllers/queue.controller"; import { CronJob, JobOptions, JobResult, CronJobSchedule } from "@inlaze_techlead/inlaze-typeorm"; import { TypeOrmModule } from "@nestjs/typeorm"; import { HttpModule } from "./queues/apis/api.module"; @Module({ imports: [ TypeOrmModule.forFeature([CronJobSchedule, CronJob, JobOptions, JobResult], "admin"), BullModule.registerQueue(...QUEUE_LIST), HttpModule, ], controllers: [TaskController, CronJobController, QueueController], }) export class TaskModule {}
Ejecutar el job:
const axios = require('axios'); let data = JSON.stringify({ "name": "process-http-request-task", "data": { "url": "https://processor.inlaze.com/notify-scrapper", "body": {}, "method": "POST", "headers": { "x-token": "y" } }, "queue": "http-queue" }); let config = { method: 'post', maxBodyLength: Infinity, url: 'localhost:3003/queue/apply-async', headers: { 'Content-Type': 'application/json' }, data : data }; axios.request(config) .then((response)=> { console.log(JSON.stringify(response.data)); }) .catch((error) => { console.log(error); });
Creando un cron para el job:
const axios = require('axios'); let data = JSON.stringify({ "data": { "url": "https://processor.inlaze.com/notify-scrapper", "body": {}, "method": "POST", "headers": { "x-token": "y" } }, "cronExpression": "* * * * *", "name": "Execute processor every minute", "taskName": "process-http-request-task", "description": "Execute processor http request every minute", "enabled": true, "queue": "http-queue", "tag": "http_processor" }); let config = { method: 'post', maxBodyLength: Infinity, url: 'localhost:3003/cron-job', headers: { 'Content-Type': 'application/json' }, data : data }; axios.request(config) .then((response) => { console.log(JSON.stringify(response.data)); }) .catch((error) => { console.log(error); });
Ejecutando el job desde código:
import { InjectQueue } from "@nestjs/bull"; import { Injectable } from "@nestjs/common"; import { Queue } from "bull"; @Injectable() export class Executor { constructor(@InjectQueue("http-queue") private readonly httpQueue: Queue) {} async applyHttpJob(): Promise<void> { await this.httpQueue.add({ url: "https://processor.inlaze.com/notify-scrapper", body: {}, method: "POST", headers: { "x-token": "y" }, }); } }
Crear una Cola con BullMQ
Crear módulo para procesar logger:
mkdir -p src/tasks/queues/logger mkdir -p src/tasks/queues/logger/services mkdir -p src/tasks/queues/logger/consumers mkdir -p src/tasks/queues/logger/constants touch src/tasks/queues/logger/logger.module.ts touch src/tasks/queues/logger/services/logger.service.ts touch src/tasks/queues/logger/consumers/logger.processor.ts touch src/tasks/queues/logger/constants/logger.constants.ts
Crear
logger.module.ts
:import { Module } from "@nestjs/common"; import { BullModule } from "@nestjs/bullmq"; import { LoggerService } from "./services/logger.service"; import { LoggerProcessor } from "./consumers/logger.processor"; import { LOGGER_QUEUE_OPTIONS } from "./constants/logger.constants"; @Module({ imports: [BullModule.registerQueue(LOGGER_QUEUE_OPTIONS)], providers: [LoggerProcessor, LoggerService], }) export class LoggerModule {}
Crear
logger.service.ts
:import { Injectable } from "@nestjs/common"; @Injectable() export class LoggerService { constructor() {} async log(params): Promise<void> { return Promise.resolve(params); } }
Crear
logger.processor.ts
:import { Processor, WorkerHost } from "@nestjs/bullmq"; import { Job } from "bullmq"; import { LoggerService } from "../services/logger.service"; import { LOGGER_QUEUE_NAME } from "../constants/logger.constants"; @Processor({ name: LOGGER_QUEUE_NAME }, { limiter: { max: 100, duration: 1 } }) export class LoggerProcessor extends WorkerHost { constructor(private readonly _loggerService: LoggerService) { super(); } async process(params: Job): Promise<void> { return this._loggerService.log(params); } }
Crear
logger.constants.ts
:import { RegisterQueueOptions } from "@nestjs/bullmq"; export const LOGGER_QUEUE_NAME: string = "logger-queue"; export const LOGGER_TASK_NAME: string = "process-log-task"; export const LOGGER_QUEUE_OPTIONS: RegisterQueueOptions = { name: LOGGER_QUEUE_NAME, defaultJobOptions: { removeOnFail: true, removeOnComplete: true }, };
Usar desde código (ejemplo en un factory):
import { LoggerConfigFinderService, LoggerService, loggerConfig, nodeConfig } from "@inlaze_techlead/inlaze-common"; import { Module } from "@nestjs/common"; import { ConfigType } from "@nestjs/config"; import { Queue } from "bullmq"; import { BullModule, getQueueToken } from "@nestjs/bullmq"; import { LOGGER_QUEUE_NAME, LOGGER_QUEUE_OPTIONS, LOGGER_TASK_NAME } from "@app/tasks/queues/logger/constants/logger.constants"; @Module({ imports: [BullModule.registerQueue(LOGGER_QUEUE_OPTIONS)], providers: [ { provide: LoggerService, useFactory( loggerEnvConfig: ConfigType<typeof loggerConfig>, nodeEnvConfig: ConfigType<typeof nodeConfig>, loggerConfigFinderService: LoggerConfigFinderService, loggerQueue: Queue, ): LoggerService { const service = new LoggerService( loggerEnvConfig, nodeEnvConfig, loggerConfigFinderService, ); service.cb = (data, loggerConfig): undefined => void loggerQueue.add( LOGGER_TASK_NAME, { data, loggerConfig }, { removeOnComplete: true, removeOnFail: true }, ); return service; }, inject: [ loggerConfig.KEY, nodeConfig.KEY, LoggerConfigFinderService, getQueueToken(LOGGER_QUEUE_NAME), ], }, ], }) export class LoggerModule {}
Paso 3: Validación
Verificar que el job se ejecute correctamente revisando los logs en Redis Commander.
Asegurarse de que los crons se ejecuten según lo programado verificando las tareas en Redis y PostgreSQL.
Solución de Problemas
Problemas Comunes
Problema: La cola no se procesa.
Solución: Verifique la conexión a Redis y asegúrese de que el nombre de la cola esté correctamente configurado.
Problema: El cron no se ejecuta.
Solución: Verifique la expresión cron y asegúrese de que esté correctamente formateada.
Preguntas Frecuentes
Pregunta: ¿Puedo usar Bull y BullMQ simultáneamente en un proyecto?
Respuesta: Sí, pero es recomendable migrar completamente a BullMQ si se busca aprovechar sus mejoras y simplificaciones.
Pregunta: ¿Cómo puedo depurar errores en los jobs?
Respuesta: Utilice Redis Commander para inspeccionar los jobs fallidos y revisar los logs generados por el servicio.
Recursos Adicionales
Recetas Relacionadas
Recursos Externos
Conclusión
Esta receta proporciona una guía completa para la configuración y gestión de colas y crons en la plataforma de Inlaze utilizando Bull y BullMQ. Siguiendo estos pasos, se puede asegurar una gestión eficiente de tareas asincrónicas, mejorando la operatividad y escalabilidad de la plataforma. No dude en proporcionar retroalimentación o hacer preguntas si es necesario.
Historial de Revisión
1.0
2024-07-06
Documento Inicial
Inyer Marin
Last updated