Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature Request: Efficient Event Publishing with NestJS and RabbitMQ #1940

Open
masterj3y opened this issue Feb 22, 2025 · 0 comments
Open

Comments

@masterj3y
Copy link
Contributor

masterj3y commented Feb 22, 2025

Goal

I want to implement an event publisher in NestJS that can:

  1. Publish events internally to local event handlers.
  2. Publish events externally using RabbitMQ for microservices communication.

Current Implementation

I wrote the following custom event publisher that:

  • Scans for all IEventHandler implementations using ExplorerService.
  • Registers event handlers dynamically.
  • Routes events internally or publishes them to RabbitMQ based on metadata.

Here’s the current implementation of the RabbitMQEventPublisher:

import { AmqpConnection } from '@golevelup/nestjs-rabbitmq';
import { ModuleRef, ModulesContainer, Reflector } from '@nestjs/core';
import { AsyncContext, IEvent, IEventHandler, IEventPublisher } from '@nestjs/cqrs';
import { CqrsEventMetaData } from '../decorators/cqrs-event.decorator';
import { RMQ_EVENTS_EXCHANGE_NAME } from 'infrastructure/config';
import { Logger, Type } from '@nestjs/common';
import { ExplorerService } from '@nestjs/cqrs/dist/services/explorer.service';
import { InstanceWrapper } from '@nestjs/core/injector/instance-wrapper';
import { defaultEventIdProvider } from '../helpers/default-id-provder.helper';

export class RabbitMQEventPublisher<EventBase extends IEvent = IEvent>
  implements IEventPublisher<EventBase>
{
  private readonly logger = new Logger(RabbitMQEventPublisher.name);
  private readonly eventIdProvider = defaultEventIdProvider;
  private readonly eventHandlers = new Map<
    string,
    (eventBase: EventBase) => Promise<void>
  >();

  constructor(
    private readonly reflector: Reflector,
    modulesContainer: ModulesContainer,
    private readonly moduleRef: ModuleRef,
    explorerService: ExplorerService,
    private readonly amqpConnection: AmqpConnection,
  ) {
    const modules = [...modulesContainer.values()];
    explorerService
      .flatMap<IEventHandler>(modules, (instance) =>
        explorerService.filterByMetadataKey(instance, '__eventsHandler__'),
      )
      .forEach((handler) => this.registerHandler(handler));
  }

  private registerHandler(handler: InstanceWrapper<IEventHandler<EventBase>>) {
    const typeRef = handler.metatype as Type<IEventHandler<EventBase>>;
    const events = this.reflectEvents(typeRef);

    if (!events || events.length === 0) {
      this.logger.error(`No events found for handler: ${typeRef.name}`);
      return;
    }

    events.forEach((event) => {
      const eventId = this.eventIdProvider.getEventId(event);
      const boundHandler = this.bind(handler, eventId!);
      this.eventHandlers.set(event.name, boundHandler);
      this.logger.log(`Registered event handler for: ${event.name}`);
    });
  }

  private reflectEvents(handler: Type<IEventHandler<EventBase>>): Type<EventBase>[] {
    const events = Reflect.getMetadata('__eventsHandler__', handler) || [];
    if (events.length === 0) {
      this.logger.error(`No metadata found for handler: ${handler.name}`);
    }
    return events;
  }

  async publish<TEvent extends EventBase>(
    event: TEvent,
    _dispatcherContext?: unknown,
    _asyncContext?: AsyncContext,
  ) {
    const eventMetadata = this.reflector.get<CqrsEventMetaData>(
      'metadata',
      event.constructor,
    );

    const eventData = {
      message: event,
      event: event.constructor.name,
    };

    try {
      const handler = this.eventHandlers.get(event.constructor.name);

      if (!handler) {
        this.logger.error(`No handler found for event: ${event.constructor.name}`);
        return;
      }

      this.logger.log(`Executing handler for event: ${event.constructor.name}`);
      await handler(event);

      if (eventMetadata && eventMetadata.routingKeys.length > 0) {
        for (const routingKey of eventMetadata.routingKeys) {
          this.amqpConnection.publish(RMQ_EVENTS_EXCHANGE_NAME, routingKey, eventData);
        }
      }
    } catch (e) {
      this.logger.error(`Error publishing event <${eventData.event}>`, e);
    }
  }
}

Question

Is this the best way to retrieve event handlers dynamically, or is there an existing feature in NestJS CQRS that I should use instead?

  • I am currently scanning for IEventHandler instances manually using ExplorerService and ModulesContainer.
  • Does the NestJS CQRS module already provide a built-in way to retrieve event handlers dynamically without this manual lookup?

Would appreciate any insights or alternative approaches! 🚀

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant