Proyecto task

Es el proyecto donde están ubicados los crons y async jobs de la plataforma de Inlaze. Hay colas para cualquier tipo de ejecución que necesiten las plataformas, la mayoría de estas están configuradas con Bull y algunas otras con BullMQ debido a su facilidad en el manejo y mejoras. La idea es que todos los async jobs estén configurados con BullMQ, pero debido a la falta de compatibilidad con el desarrollo propio de los crons, esto requiere una reestructuración y, por lo tanto, no se puede realizar en este momento.

Resumen

Este documento proporciona una guía detallada sobre cómo crear y gestionar colas para la ejecución de tareas asincrónicas en la plataforma de Inlaze utilizando Bull y BullMQ.

Descripción

La receta describe el proceso de configuración y gestión de colas y crons en la plataforma de Inlaze utilizando Bull y BullMQ. Estas herramientas permiten manejar tareas asincrónicas de manera eficiente, lo cual es crucial para mantener la operatividad y la escalabilidad de la plataforma.

Prerrequisitos

  • Tener instalado Redis, PostgreSQL y Redis Commander para visualizar los jobs de Redis.

  • Tener instalado y configurado el proyecto de tasks: sport-enlace-sas/inlaze-backend-task.

  • Conexión de Bull y BullMQ con NestJS y Redis.

Casos de Uso

  • Ejecución de tareas asincrónicas como solicitudes HTTP.

  • Manejo y procesamiento de crons.

  • Gestión de logs y otras operaciones programadas.

Instrucciones Paso a Paso

Paso 1: Preparación

  1. Asegúrese de que Redis, PostgreSQL y Redis Commander estén instalados y configurados.

  2. Clone y configure el proyecto desde el repositorio sport-enlace-sas/inlaze-backend-task.

Paso 2: Proceso Principal

Crear una Cola con Bull

  1. Crear la carpeta en el path /src/tasks/queues/http:

    mkdir -p /src/tasks/queues/http
  2. Crear el módulo en el path /src/tasks/queues/http:

    touch /src/tasks/queues/http/http.module.ts
  3. Ingresar el contenido del módulo:

    import { BullModule } from "@nestjs/bull";
    import { Module } from "@nestjs/common";
    import { CronJobModule } from "@app/tasks/cron-job.module";
    import { HttpConsumer } from "./consumers/http-queue.consumer";
    import { HttpService } from "./services/http.service";
    
    @Module({
      imports: [
        BullModule.registerQueue({
          name: "http-queue",
          limiter: { max: 1, duration: 1000 },
        }),
        CronJobModule,
      ],
      providers: [HttpConsumer, HttpService],
    })
    export class HttpModule {}
  4. Crear el HttpConsumer:

    mkdir /src/tasks/queues/http/consumers
    touch /src/tasks/queues/http/consumers/http.consumer.ts
    import {
      Processor,
      Process,
      InjectQueue,
      OnQueueActive,
      OnQueueCleaned,
      OnQueueCompleted,
      OnQueueStalled,
      OnQueueFailed,
      OnQueueWaiting,
      OnQueuePaused,
      OnQueueResumed,
    } from "@nestjs/bull";
    import { Job, JobId, Queue } from "bull";
    import { CronJobService } from "@app/tasks/services/cron-job.service";
    import { HttpService } from "./services/http.service";
    
    @Processor("http-queue")
    export class HttpConsumer {
      constructor(
        private readonly _cronJobService: CronJobService,
        @InjectQueue("http-queue") private readonly _queue: Queue,
        private readonly httpService: HttpService,
      ) {}
    
      @Process("process-http-request-task")
      async runRequest(job: Job<HttpDataDto>): Promise<unknown> {
        await this._cronJobService.checkJobExpiration(job, this._queue);
        return this.httpService.runRequest(job);
      }
    
      @OnQueueActive()
      async onActive(job: Job): Promise<void> {
        await this._cronJobService.onQueueActive(job, this._queue);
      }
    
      @OnQueueCompleted()
      async onCompleted(job: Job, result: unknown): Promise<void> {
        await this._cronJobService.onQueueCompleted(job, this._queue, result);
      }
    
      @OnQueueStalled()
      async onStalled(job: Job): Promise<void> {
        await this._cronJobService.onQueueStalled(job, this._queue);
      }
    
      @OnQueueFailed()
      async onFailed(job: Job, error: unknown): Promise<void> {
        await this._cronJobService.onQueueFailed(job, error, this._queue);
      }
    
      @OnQueueWaiting()
      async onWaiting(jobId: JobId): Promise<void> {
        await this._cronJobService.onQueueWaiting(jobId, this._queue);
      }
    
      @OnQueueCleaned()
      onCleaned(): void {
        this._cronJobService.onQueueCleaned(this._queue);
      }
    
      @OnQueuePaused()
      onPaused(): void {
        this._cronJobService.onQueuePaused(this._queue);
      }
    
      @OnQueueResumed()
      onResumed(): void {
        this._cronJobService.onQueueResumed(this._queue);
      }
    }
  5. Crear el servicio de HTTP para ejecutar peticiones:

    mkdir /src/tasks/queues/http/services
    touch /src/tasks/queues/http/services/http.service.ts
    import { HttpAdapter, IHttpAdapter } from "@inlaze_techlead/inlaze-common";
    import { Inject, Injectable } from "@nestjs/common";
    import { Job } from "bull";
    import { HttpDataDto } from "../../http/dto";
    import { plainToInstance } from "class-transformer";
    import { validateSync } from "class-validator";
    
    @Injectable()
    export class HttpService {
      constructor(@Inject(HttpAdapter) private readonly httpAdapter: IHttpAdapter) {}
    
      async runRequest(job: Job<HttpData>): Promise<unknown> {
        const jobData = plainToInstance(HttpDataDto, job.data, { excludeExtraneousValues: true });
        const errors = validateSync(jobData);
        if (errors.length) throw new Error(errors.toString());
    
        return this.httpAdapter.request(jobData);
      }
    }
  6. Crear la cola registrada como una constante para poder importarla en cualquier lugar:

    mkdir /src/tasks/queues/http/constants
    touch /src/tasks/queues/http/constants/queue-options.constants.ts
    import { BullModuleOptions } from "@nestjs/bull";
    
    export const HTTP_QUEUE_OPTIONS: BullModuleOptions = {
      name: "http-queue",
      limiter: { max: 1, duration: 1000 },
    };
  7. Agregar la constante a las constantes de task en /src/tasks/task.constants.ts:

    import { BullModuleOptions } from "@nestjs/bull";
    import { HTTP_QUEUE_OPTIONS } from "./queues/apis/apis.constants";
    
    export const QUEUE_LIST: BullModuleOptions[] = [
      HTTP_QUEUE_OPTIONS,
    ];
    
    export const QUEUE_NAMES: string[] = [
      "http-queue",
    ];
    
    export const CRON_QUEUES: string[] = [
      "http-queue",
    ];
  8. Importar el módulo creado en el task module /src/tasks/task.module.ts:

    import { BullModule } from "@nestjs/bull";
    import { Module } from "@nestjs/common";
    import { TaskController } from "./controllers/task.controller";
    import { QUEUE_LIST } from "./task.constants";
    import { CronJobController } from "./controllers/cron.job.controller";
    import { QueueController } from "./controllers/queue.controller";
    import { CronJob, JobOptions, JobResult, CronJobSchedule } from "@inlaze_techlead/inlaze-typeorm";
    import { TypeOrmModule } from "@nestjs/typeorm";
    import { HttpModule } from "./queues/apis/api.module";
    
    @Module({
      imports: [
        TypeOrmModule.forFeature([CronJobSchedule, CronJob, JobOptions, JobResult], "admin"),
        BullModule.registerQueue(...QUEUE_LIST),
        HttpModule,
      ],
      controllers: [TaskController, CronJobController, QueueController],
    })
    export class TaskModule {}
  9. Ejecutar el job:

    const axios = require('axios');
    let data = JSON.stringify({
      "name": "process-http-request-task",
      "data": {
        "url": "https://processor.inlaze.com/notify-scrapper",
        "body": {},
        "method": "POST",
        "headers": {
          "x-token": "y"
        }
      },
      "queue": "http-queue"
    });
    
    let config = {
      method: 'post',
      maxBodyLength: Infinity,
      url: 'localhost:3003/queue/apply-async',
      headers: { 
        'Content-Type': 'application/json'
      },
      data : data
    };
    
    axios.request(config)
    .then((response)=> { console.log(JSON.stringify(response.data)); }) 
    .catch((error) => { console.log(error); }); 
  10. Creando un cron para el job:

    const axios = require('axios');
    let data = JSON.stringify({
      "data": {
        "url": "https://processor.inlaze.com/notify-scrapper",
        "body": {},
        "method": "POST",
        "headers": {
          "x-token": "y"
        }
      },
      "cronExpression": "* * * * *",
      "name": "Execute processor every minute",
      "taskName": "process-http-request-task",
      "description": "Execute processor http request every minute",
      "enabled": true,
      "queue": "http-queue",
      "tag": "http_processor"
    });
    
    let config = {
      method: 'post',
      maxBodyLength: Infinity,
      url: 'localhost:3003/cron-job',
      headers: { 
        'Content-Type': 'application/json'
      },
      data : data
    };
    
    axios.request(config)
    .then((response) => {
      console.log(JSON.stringify(response.data));
    })
    .catch((error) => {
      console.log(error);
    });
  11. Ejecutando el job desde código:

    import { InjectQueue } from "@nestjs/bull";
    import { Injectable } from "@nestjs/common";
    import { Queue } from "bull";
    
    @Injectable()
    export class Executor {
      constructor(@InjectQueue("http-queue") private readonly httpQueue: Queue) {}
    
      async applyHttpJob(): Promise<void> {
        await this.httpQueue.add({
          url: "https://processor.inlaze.com/notify-scrapper",
          body: {},
          method: "POST",
          headers: { "x-token": "y" },
        });
      }
    }

Crear una Cola con BullMQ

  1. Crear módulo para procesar logger:

    mkdir -p src/tasks/queues/logger
    mkdir -p src/tasks/queues/logger/services
    mkdir -p src/tasks/queues/logger/consumers
    mkdir -p src/tasks/queues/logger/constants
    
    touch src/tasks/queues/logger/logger.module.ts
    touch src/tasks/queues/logger/services/logger.service.ts
    touch src/tasks/queues/logger/consumers/logger.processor.ts
    touch src/tasks/queues/logger/constants/logger.constants.ts
  2. Crear logger.module.ts:

    import { Module } from "@nestjs/common";
    import { BullModule } from "@nestjs/bullmq";
    import { LoggerService } from "./services/logger.service";
    import { LoggerProcessor } from "./consumers/logger.processor";
    import { LOGGER_QUEUE_OPTIONS } from "./constants/logger.constants";
    
    @Module({
      imports: [BullModule.registerQueue(LOGGER_QUEUE_OPTIONS)],
      providers: [LoggerProcessor, LoggerService],
    })
    export class LoggerModule {}
  3. Crear logger.service.ts:

    import { Injectable } from "@nestjs/common";
    
    @Injectable()
    export class LoggerService {
      constructor() {}
    
      async log(params): Promise<void> {
        return Promise.resolve(params);
      }
    }
  4. Crear logger.processor.ts:

    import { Processor, WorkerHost } from "@nestjs/bullmq";
    import { Job } from "bullmq";
    import { LoggerService } from "../services/logger.service";
    import { LOGGER_QUEUE_NAME } from "../constants/logger.constants";
    
    @Processor({ name: LOGGER_QUEUE_NAME }, { limiter: { max: 100, duration: 1 } })
    export class LoggerProcessor extends WorkerHost {
      constructor(private readonly _loggerService: LoggerService) {
        super();
      }
    
      async process(params: Job): Promise<void> {
        return this._loggerService.log(params);
      }
    }
  5. Crear logger.constants.ts:

    import { RegisterQueueOptions } from "@nestjs/bullmq";
    
    export const LOGGER_QUEUE_NAME: string = "logger-queue";
    export const LOGGER_TASK_NAME: string = "process-log-task";
    
    export const LOGGER_QUEUE_OPTIONS: RegisterQueueOptions = {
      name: LOGGER_QUEUE_NAME,
      defaultJobOptions: { removeOnFail: true, removeOnComplete: true },
    };
  6. Usar desde código (ejemplo en un factory):

    import { LoggerConfigFinderService, LoggerService, loggerConfig, nodeConfig } from "@inlaze_techlead/inlaze-common";
    import { Module } from "@nestjs/common";
    import { ConfigType } from "@nestjs/config";
    import { Queue } from "bullmq";
    import { BullModule, getQueueToken } from "@nestjs/bullmq";
    import { LOGGER_QUEUE_NAME, LOGGER_QUEUE_OPTIONS, LOGGER_TASK_NAME } from "@app/tasks/queues/logger/constants/logger.constants";
    
    @Module({
      imports: [BullModule.registerQueue(LOGGER_QUEUE_OPTIONS)],
      providers: [
        {
          provide: LoggerService,
          useFactory(
            loggerEnvConfig: ConfigType<typeof loggerConfig>,
            nodeEnvConfig: ConfigType<typeof nodeConfig>,
            loggerConfigFinderService: LoggerConfigFinderService,
            loggerQueue: Queue,
          ): LoggerService {
            const service = new LoggerService(
              loggerEnvConfig,
              nodeEnvConfig,
              loggerConfigFinderService,
            );
            service.cb = (data, loggerConfig): undefined =>
              void loggerQueue.add(
                LOGGER_TASK_NAME,
                { data, loggerConfig },
                { removeOnComplete: true, removeOnFail: true },
              );
            return service;
          },
          inject: [
            loggerConfig.KEY,
            nodeConfig.KEY,
            LoggerConfigFinderService,
            getQueueToken(LOGGER_QUEUE_NAME),
          ],
        },
      ],
    })
    export class LoggerModule {}

Paso 3: Validación

  1. Verificar que el job se ejecute correctamente revisando los logs en Redis Commander.

  2. Asegurarse de que los crons se ejecuten según lo programado verificando las tareas en Redis y PostgreSQL.

Solución de Problemas

Problemas Comunes

  • Problema: La cola no se procesa.

    • Solución: Verifique la conexión a Redis y asegúrese de que el nombre de la cola esté correctamente configurado.

  • Problema: El cron no se ejecuta.

    • Solución: Verifique la expresión cron y asegúrese de que esté correctamente formateada.

Preguntas Frecuentes

  • Pregunta: ¿Puedo usar Bull y BullMQ simultáneamente en un proyecto?

    • Respuesta: Sí, pero es recomendable migrar completamente a BullMQ si se busca aprovechar sus mejoras y simplificaciones.

  • Pregunta: ¿Cómo puedo depurar errores en los jobs?

    • Respuesta: Utilice Redis Commander para inspeccionar los jobs fallidos y revisar los logs generados por el servicio.

Recursos Adicionales

Recetas Relacionadas

Recursos Externos

Conclusión

Esta receta proporciona una guía completa para la configuración y gestión de colas y crons en la plataforma de Inlaze utilizando Bull y BullMQ. Siguiendo estos pasos, se puede asegurar una gestión eficiente de tareas asincrónicas, mejorando la operatividad y escalabilidad de la plataforma. No dude en proporcionar retroalimentación o hacer preguntas si es necesario.

Historial de Revisión


Versión
Fecha
Descripción
Autor

1.0

2024-07-06

Documento Inicial

Inyer Marin

Last updated