diff --git a/integration/microservices/e2e/topic-exchange-rmq.spec.ts b/integration/microservices/e2e/topic-exchange-rmq.spec.ts new file mode 100644 index 00000000000..2a2bd1ff68e --- /dev/null +++ b/integration/microservices/e2e/topic-exchange-rmq.spec.ts @@ -0,0 +1,38 @@ +import { INestApplication } from '@nestjs/common'; +import { MicroserviceOptions, Transport } from '@nestjs/microservices'; +import { Test } from '@nestjs/testing'; +import * as request from 'supertest'; +import { RMQTopicExchangeController } from '../src/rmq/topic-exchange-rmq.controller'; + +describe('RabbitMQ transport (Topic Exchange - wildcards)', () => { + let server: any; + let app: INestApplication; + + beforeEach(async () => { + const module = await Test.createTestingModule({ + controllers: [RMQTopicExchangeController], + }).compile(); + + app = module.createNestApplication(); + server = app.getHttpAdapter().getInstance(); + + app.connectMicroservice({ + transport: Transport.RMQ, + options: { + urls: [`amqp://0.0.0.0:5672`], + queue: 'test2', + wildcards: true, + }, + }); + await app.startAllMicroservices(); + await app.init(); + }); + + it(`should send message to wildcard topic exchange`, () => { + return request(server).get('/topic-exchange').expect(200, 'wildcard.a.b'); + }); + + afterEach(async () => { + await app.close(); + }); +}); diff --git a/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts new file mode 100644 index 00000000000..f482ab9a05f --- /dev/null +++ b/integration/microservices/src/rmq/topic-exchange-rmq.controller.ts @@ -0,0 +1,36 @@ +import { Controller, Get } from '@nestjs/common'; +import { + ClientProxy, + ClientProxyFactory, + Ctx, + MessagePattern, + RmqContext, + Transport, +} from '@nestjs/microservices'; +import { lastValueFrom } from 'rxjs'; + +@Controller() +export class RMQTopicExchangeController { + client: ClientProxy; + + constructor() { + this.client = ClientProxyFactory.create({ + transport: Transport.RMQ, + options: { + urls: [`amqp://localhost:5672`], + queue: 'test2', + wildcards: true, + }, + }); + } + + @Get('topic-exchange') + async topicExchange() { + return lastValueFrom(this.client.send('wildcard.a.b', 1)); + } + + @MessagePattern('wildcard.*.*') + handleTopicExchange(@Ctx() ctx: RmqContext): string { + return ctx.getPattern(); + } +} diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index fa75d0fc33c..2985c169e9d 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -1,7 +1,8 @@ +/* eslint-disable @typescript-eslint/no-redundant-type-constituents */ import { Logger } from '@nestjs/common/services/logger.service'; import { loadPackage } from '@nestjs/common/utils/load-package.util'; import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util'; -import { isFunction } from '@nestjs/common/utils/shared.utils'; +import { isFunction, isString } from '@nestjs/common/utils/shared.utils'; import { EventEmitter } from 'events'; import { EmptyError, @@ -55,8 +56,8 @@ export class ClientRMQ extends ClientProxy { protected readonly logger = new Logger(ClientProxy.name); protected connection$: ReplaySubject; protected connectionPromise: Promise; - protected client: AmqpConnectionManager = null; - protected channel: ChannelWrapper = null; + protected client: AmqpConnectionManager | null = null; + protected channel: ChannelWrapper | null = null; protected pendingEventListeners: Array<{ event: keyof RmqEvents; callback: RmqEvents[keyof RmqEvents]; @@ -113,7 +114,7 @@ export class ClientRMQ extends ClientProxy { this.registerDisconnectListener(this.client); this.registerConnectListener(this.client); this.pendingEventListeners.forEach(({ event, callback }) => - this.client.on(event, callback), + this.client!.on(event, callback), ); this.pendingEventListeners = []; @@ -140,7 +141,7 @@ export class ClientRMQ extends ClientProxy { public createChannel(): Promise { return new Promise(resolve => { - this.channel = this.client.createChannel({ + this.channel = this.client!.createChannel({ json: false, setup: (channel: Channel) => this.setupChannel(channel, resolve), }); @@ -215,6 +216,22 @@ export class ClientRMQ extends ClientProxy { ); } + if (this.options.wildcards) { + const exchange = this.getOptionsProp( + this.options, + 'exchange', + this.options.queue, + ); + const exchangeType = this.getOptionsProp( + this.options, + 'exchangeType', + 'topic', + ); + await channel.assertExchange(exchange, exchangeType, { + durable: true, + }); + } + await channel.prefetch(prefetchCount, isGlobalPrefetchCount); await this.consumeChannel(channel); resolve(); @@ -224,8 +241,8 @@ export class ClientRMQ extends ClientProxy { const noAck = this.getOptionsProp(this.options, 'noAck', RQM_DEFAULT_NOACK); await channel.consume( this.replyQueue, - (msg: ConsumeMessage) => - this.responseEmitter.emit(msg.properties.correlationId, msg), + (msg: ConsumeMessage | null) => + this.responseEmitter.emit(msg!.properties.correlationId, msg), { noAck, }, @@ -359,23 +376,44 @@ export class ClientRMQ extends ClientProxy { delete serializedPacket.options; this.responseEmitter.on(correlationId, listener); - this.channel - .sendToQueue( + + const content = Buffer.from(JSON.stringify(serializedPacket)); + const sendOptions = { + replyTo: this.replyQueue, + persistent: this.getOptionsProp( + this.options, + 'persistent', + RQM_DEFAULT_PERSISTENT, + ), + ...options, + headers: this.mergeHeaders(options?.headers), + correlationId, + }; + + if (this.options.wildcards) { + const stringifiedPattern = isString(message.pattern) + ? message.pattern + : JSON.stringify(message.pattern); + + // The exchange is the same as the queue when wildcards are enabled + // and the exchange is not explicitly set + const exchange = this.getOptionsProp( + this.options, + 'exchange', this.queue, - Buffer.from(JSON.stringify(serializedPacket)), - { - replyTo: this.replyQueue, - persistent: this.getOptionsProp( - this.options, - 'persistent', - RQM_DEFAULT_PERSISTENT, - ), - ...options, - headers: this.mergeHeaders(options?.headers), - correlationId, - }, - ) - .catch(err => callback({ err })); + ); + + this.channel!.publish( + exchange, + stringifiedPattern, + content, + sendOptions, + ).catch(err => callback({ err })); + } else { + this.channel!.sendToQueue(this.queue, content, sendOptions).catch(err => + callback({ err }), + ); + } return () => this.responseEmitter.removeListener(correlationId, listener); } catch (err) { callback({ err }); @@ -390,22 +428,39 @@ export class ClientRMQ extends ClientProxy { const options = serializedPacket.options; delete serializedPacket.options; - return new Promise((resolve, reject) => - this.channel.sendToQueue( - this.queue, - Buffer.from(JSON.stringify(serializedPacket)), - { - persistent: this.getOptionsProp( - this.options, - 'persistent', - RQM_DEFAULT_PERSISTENT, - ), - ...options, - headers: this.mergeHeaders(options?.headers), - }, - (err: unknown) => (err ? reject(err as Error) : resolve()), - ), - ); + return new Promise((resolve, reject) => { + const content = Buffer.from(JSON.stringify(serializedPacket)); + const sendOptions = { + persistent: this.getOptionsProp( + this.options, + 'persistent', + RQM_DEFAULT_PERSISTENT, + ), + ...options, + headers: this.mergeHeaders(options?.headers), + }; + const errorCallback = (err: unknown) => + err ? reject(err as Error) : resolve(); + + return this.options.wildcards + ? this.channel!.publish( + // The exchange is the same as the queue when wildcards are enabled + // and the exchange is not explicitly set + this.getOptionsProp(this.options, 'exchange', this.queue), + isString(packet.pattern) + ? packet.pattern + : JSON.stringify(packet.pattern), + content, + sendOptions, + errorCallback, + ) + : this.channel!.sendToQueue( + this.queue, + content, + sendOptions, + errorCallback, + ); + }); } protected initializeSerializer(options: RmqOptions['options']) { diff --git a/packages/microservices/interfaces/microservice-configuration.interface.ts b/packages/microservices/interfaces/microservice-configuration.interface.ts index 0c371076bae..c23a5839106 100644 --- a/packages/microservices/interfaces/microservice-configuration.interface.ts +++ b/packages/microservices/interfaces/microservice-configuration.interface.ts @@ -215,22 +215,89 @@ export interface NatsOptions { export interface RmqOptions { transport?: Transport.RMQ; options?: { + /** + * An array of connection URLs to try in order. + */ urls?: string[] | RmqUrl[]; + /** + * The name of the queue. + */ queue?: string; + /** + * A prefetch count for this channel. The count given is the maximum number of messages sent over the channel that can be awaiting acknowledgement; + * once there are count messages outstanding, the server will not send more messages on this channel until one or more have been acknowledged. + */ prefetchCount?: number; + /** + * Sets the per-channel behavior for prefetching messages. + */ isGlobalPrefetchCount?: boolean; + /** + * Amqplib queue options. + * @see https://amqp-node.github.io/amqplib/channel_api.html#channel_assertQueue + */ queueOptions?: AmqplibQueueOptions; + /** + * AMQP Connection Manager socket options. + */ socketOptions?: AmqpConnectionManagerSocketOptions; - exchange?: string; - routingKey?: string; + /** + * Iif true, the broker won’t expect an acknowledgement of messages delivered to this consumer; i.e., it will dequeue messages as soon as they’ve been sent down the wire. + * @default false + */ noAck?: boolean; + /** + * A name which the server will use to distinguish message deliveries for the consumer; mustn’t be already in use on the channel. It’s usually easier to omit this, in which case the server will create a random name and supply it in the reply. + */ consumerTag?: string; + /** + * A serializer for the message payload. + */ serializer?: Serializer; + /** + * A deserializer for the message payload. + */ deserializer?: Deserializer; + /** + * A reply queue for the producer. + * @default 'amq.rabbitmq.reply-to' + */ replyQueue?: string; + /** + * If truthy, the message will survive broker restarts provided it’s in a queue that also survives restarts. + */ persistent?: boolean; + /** + * Additional headers to be sent with every message. + * Applies only to the producer configuration. + */ headers?: Record; + /** + * When false, a queue will not be asserted before consuming. + * @default false + */ noAssert?: boolean; + /** + * Name for the exchange. Defaults to the queue name when "wildcards" is set to true. + * @default '' + */ + exchange?: string; + /** + * Type of the exchange + * @default 'topic' + */ + exchangeType?: 'direct' | 'fanout' | 'topic' | 'headers'; + /** + * Additional routing key for the topic exchange. + */ + routingKey?: string; + /** + * Set to true only if you want to use Topic Exchange for routing messages to queues. + * Enabling this will allow you to use wildcards (*, #) as message and event patterns. + * @see https://www.rabbitmq.com/tutorials/tutorial-five-python#topic-exchange + * @default false + */ + wildcards?: boolean; /** * Maximum number of connection attempts. * Applies only to the consumer configuration. diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 20fdea33e31..08d3fe081d2 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -1,3 +1,4 @@ +/* eslint-disable @typescript-eslint/no-redundant-type-constituents */ import { isNil, isString, @@ -21,7 +22,7 @@ import { RmqContext } from '../ctx-host'; import { Transport } from '../enums'; import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events'; import { RmqUrl } from '../external/rmq-url.interface'; -import { RmqOptions } from '../interfaces'; +import { MessageHandler, RmqOptions } from '../interfaces'; import { IncomingRequest, OutgoingResponse, @@ -38,10 +39,12 @@ import { Server } from './server'; // import('amqp-connection-manager').AmqpConnectionManager; // type ChannelWrapper = import('amqp-connection-manager').ChannelWrapper; // type Message = import('amqplib').Message; +// type Channel = import('amqplib').Channel | import('amqplib').ConfirmChannel; type AmqpConnectionManager = any; type ChannelWrapper = any; type Message = any; +type Channel = any; let rmqPackage = {} as any; // as typeof import('amqp-connection-manager'); @@ -53,13 +56,14 @@ const INFINITE_CONNECTION_ATTEMPTS = -1; export class ServerRMQ extends Server { public readonly transportId = Transport.RMQ; - protected server: AmqpConnectionManager = null; - protected channel: ChannelWrapper = null; + protected server: AmqpConnectionManager | null = null; + protected channel: ChannelWrapper | null = null; protected connectionAttempts = 0; protected readonly urls: string[] | RmqUrl[]; protected readonly queue: string; protected readonly noAck: boolean; protected readonly queueOptions: any; + protected readonly wildcardHandlers = new Map(); protected pendingEventListeners: Array<{ event: keyof RmqEvents; callback: RmqEvents[keyof RmqEvents]; @@ -106,14 +110,14 @@ export class ServerRMQ extends Server { callback?: (err?: unknown, ...optionalParams: unknown[]) => void, ) { this.server = this.createClient(); - this.server.once(RmqEventsMap.CONNECT, () => { + this.server!.once(RmqEventsMap.CONNECT, () => { if (this.channel) { return; } this._status$.next(RmqStatus.CONNECTED); - this.channel = this.server.createChannel({ + this.channel = this.server!.createChannel({ json: false, - setup: (channel: any) => this.setupChannel(channel, callback!), + setup: (channel: Channel) => this.setupChannel(channel, callback!), }); }); @@ -126,12 +130,12 @@ export class ServerRMQ extends Server { this.registerConnectListener(); this.registerDisconnectListener(); this.pendingEventListeners.forEach(({ event, callback }) => - this.server.on(event, callback), + this.server!.on(event, callback), ); this.pendingEventListeners = []; const connectFailedEvent = 'connectFailed'; - this.server.once(connectFailedEvent, (error: Record) => { + this.server!.once(connectFailedEvent, (error: Record) => { this._status$.next(RmqStatus.DISCONNECTED); this.logger.error(CONNECTION_FAILED_MESSAGE); @@ -162,20 +166,20 @@ export class ServerRMQ extends Server { } private registerConnectListener() { - this.server.on(RmqEventsMap.CONNECT, (err: any) => { + this.server!.on(RmqEventsMap.CONNECT, (err: any) => { this._status$.next(RmqStatus.CONNECTED); }); } private registerDisconnectListener() { - this.server.on(RmqEventsMap.DISCONNECT, (err: any) => { + this.server!.on(RmqEventsMap.DISCONNECT, (err: any) => { this._status$.next(RmqStatus.DISCONNECTED); this.logger.error(DISCONNECTED_RMQ_MESSAGE); this.logger.error(err); }); } - public async setupChannel(channel: any, callback: Function) { + public async setupChannel(channel: Channel, callback: Function) { const noAssert = this.getOptionsProp(this.options, 'noAssert') ?? this.queueOptions.noAssert ?? @@ -196,21 +200,44 @@ export class ServerRMQ extends Server { RQM_DEFAULT_PREFETCH_COUNT, ); - if (this.options.exchange && this.options.routingKey) { - await channel.assertExchange(this.options.exchange, 'topic', { + if (this.options.exchange || this.options.wildcards) { + // Use queue name as exchange name if exchange is not provided and "wildcards" is set to true + const exchange = this.getOptionsProp( + this.options, + 'exchange', + this.options.queue, + ); + const exchangeType = this.getOptionsProp( + this.options, + 'exchangeType', + 'topic', + ); + await channel.assertExchange(exchange, exchangeType, { durable: true, }); - await channel.bindQueue( - this.queue, - this.options.exchange, - this.options.routingKey, - ); + + if (this.options.routingKey) { + await channel.bindQueue(this.queue, exchange, this.options.routingKey); + } + + if (this.options.wildcards) { + const routingKeys = Array.from(this.getHandlers().keys()); + await Promise.all( + routingKeys.map(routingKey => + channel.bindQueue(this.queue, exchange, routingKey), + ), + ); + + // When "wildcards" is set to true, we need to initialize wildcard handlers + // otherwise we would not be able to associate the incoming messages with the handlers + this.initializeWildcardHandlersIfExist(); + } } await channel.prefetch(prefetchCount, isGlobalPrefetchCount); channel.consume( this.queue, - (msg: Record) => this.handleMessage(msg, channel), + (msg: Record | null) => this.handleMessage(msg!, channel), { noAck: this.noAck, consumerTag: this.getOptionsProp( @@ -246,7 +273,7 @@ export class ServerRMQ extends Server { if (!handler) { if (!this.noAck) { this.logger.warn(RQM_NO_MESSAGE_HANDLER`${pattern}`); - this.channel.nack(rmqContext.getMessage() as Message, false, false); + this.channel!.nack(rmqContext.getMessage() as Message, false, false); } const status = 'error'; const noHandlerPacket = { @@ -277,7 +304,7 @@ export class ServerRMQ extends Server { ): Promise { const handler = this.getHandlerByPattern(pattern); if (!handler && !this.noAck) { - this.channel.nack(context.getMessage() as Message, false, false); + this.channel!.nack(context.getMessage() as Message, false, false); return this.logger.warn(RQM_NO_EVENT_HANDLER`${pattern}`); } return super.handleEvent(pattern, packet, context); @@ -295,7 +322,8 @@ export class ServerRMQ extends Server { delete outgoingResponse.options; const buffer = Buffer.from(JSON.stringify(outgoingResponse)); - this.channel.sendToQueue(replyTo, buffer, { correlationId, ...options }); + const sendOptions = { correlationId, ...options }; + this.channel!.sendToQueue(replyTo, buffer, sendOptions); } public unwrap(): T { @@ -318,6 +346,29 @@ export class ServerRMQ extends Server { } } + public getHandlerByPattern(pattern: string): MessageHandler | null { + if (!this.options.wildcards) { + return super.getHandlerByPattern(pattern); + } + + // Search for non-wildcard handler first + const handler = super.getHandlerByPattern(pattern); + if (handler) { + return handler; + } + + // Search for wildcard handler + if (this.wildcardHandlers.size === 0) { + return null; + } + for (const [regex, handler] of this.wildcardHandlers) { + if (regex.test(pattern)) { + return handler; + } + } + return null; + } + protected initializeSerializer(options: RmqOptions['options']) { this.serializer = options?.serializer ?? new RmqRecordSerializer(); } @@ -329,4 +380,28 @@ export class ServerRMQ extends Server { return content.toString(); } } + + private initializeWildcardHandlersIfExist() { + if (this.wildcardHandlers.size !== 0) { + return; + } + const handlers = this.getHandlers(); + + handlers.forEach((handler, pattern) => { + const regex = this.convertRoutingKeyToRegex(pattern); + if (regex) { + this.wildcardHandlers.set(regex, handler); + } + }); + } + + private convertRoutingKeyToRegex(routingKey: string): RegExp | undefined { + if (!routingKey.includes('#') && !routingKey.includes('*')) { + return; + } + let regexPattern = routingKey.replace(/\\/g, '\\\\').replace(/\./g, '\\.'); + regexPattern = regexPattern.replace(/\*/g, '[^.]+'); + regexPattern = regexPattern.replace(/#/g, '.*'); + return new RegExp(`^${regexPattern}$`); + } } diff --git a/tools/gulp/tasks/move.ts b/tools/gulp/tasks/move.ts index 0991e1e6f84..666901c39a1 100644 --- a/tools/gulp/tasks/move.ts +++ b/tools/gulp/tasks/move.ts @@ -3,11 +3,7 @@ import { join } from 'path'; import { samplePath } from '../config'; import { containsPackageJson, getDirs } from '../util/task-helpers'; -const distFiles = src([ - 'packages/**/*', - '!packages/**/*.ts', - 'packages/**/*.d.ts', -]); +const distFiles = src(['packages/**/*.js', 'packages/**/*.d.ts']); /** * Moves the compiled nest files into "node_module" folder.