Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(microservices): add specific transport id to microservices #14606

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Transport } from '../enums';
import { TransportId } from './microservice-configuration.interface';

/**
* @publicApi
Expand All @@ -7,7 +7,7 @@ export interface CustomTransportStrategy {
/**
* Unique transport identifier.
*/
readonly transportId?: Transport | symbol;
readonly transportId?: TransportId;
/**
* Method called when the transport is being initialized.
* @param callback Function to be called upon initialization
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ export type MicroserviceOptions =
| KafkaOptions
| CustomStrategy;

export type TransportId = Transport | symbol;

export type AsyncMicroserviceOptions = {
inject: InjectionToken[];
useFactory: (...args: any[]) => MicroserviceOptions;
Expand All @@ -55,6 +57,7 @@ export interface CustomStrategy {
*/
export interface GrpcOptions {
transport?: Transport.GRPC;
transportId?: TransportId;
options: {
url?: string;
maxSendMessageLength?: number;
Expand Down Expand Up @@ -98,6 +101,7 @@ export interface GrpcOptions {
*/
export interface TcpOptions {
transport?: Transport.TCP;
transportId?: TransportId;
options?: {
host?: string;
port?: number;
Expand All @@ -115,6 +119,7 @@ export interface TcpOptions {
*/
export interface RedisOptions {
transport?: Transport.REDIS;
transportId?: TransportId;
options?: {
host?: string;
port?: number;
Expand All @@ -134,6 +139,7 @@ export interface RedisOptions {
*/
export interface MqttOptions {
transport?: Transport.MQTT;
transportId?: TransportId;
options?: MqttClientOptions & {
url?: string;
serializer?: Serializer;
Expand Down Expand Up @@ -165,6 +171,7 @@ export interface MqttOptions {
*/
export interface NatsOptions {
transport?: Transport.NATS;
transportId?: TransportId;
options?: {
headers?: Record<string, string>;
authenticator?: any;
Expand Down Expand Up @@ -214,6 +221,7 @@ export interface NatsOptions {
*/
export interface RmqOptions {
transport?: Transport.RMQ;
transportId?: TransportId;
options?: {
urls?: string[] | RmqUrl[];
queue?: string;
Expand Down Expand Up @@ -253,6 +261,7 @@ export interface KafkaParserConfig {
*/
export interface KafkaOptions {
transport?: Transport.KAFKA;
transportId?: TransportId;
options?: {
/**
* Defaults to `"-server"` on server side and `"-client"` on client side.
Expand All @@ -270,3 +279,7 @@ export interface KafkaOptions {
producerOnlyMode?: boolean;
};
}

export interface BuildServerSettings {
transportId?: TransportId;
}
40 changes: 32 additions & 8 deletions packages/microservices/server/server-factory.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Transport } from '../enums/transport.enum';
import {
BuildServerSettings,
CustomStrategy,
GrpcOptions,
KafkaOptions,
Expand All @@ -20,25 +21,48 @@ import { ServerTCP } from './server-tcp';

export class ServerFactory {
public static create(microserviceOptions: MicroserviceOptions) {
const { transport, options } = microserviceOptions as Exclude<
const { transport, transportId, options } = microserviceOptions as Exclude<
MicroserviceOptions,
CustomStrategy
>;

const buildServerSettings: BuildServerSettings = {
transportId,
};

switch (transport) {
case Transport.REDIS:
return new ServerRedis(options as Required<RedisOptions>['options']);
return new ServerRedis(
options as Required<RedisOptions>['options'],
buildServerSettings,
);
case Transport.NATS:
return new ServerNats(options as Required<NatsOptions>['options']);
return new ServerNats(
options as Required<NatsOptions>['options'],
buildServerSettings,
);
case Transport.MQTT:
return new ServerMqtt(options as Required<MqttOptions>['options']);
return new ServerMqtt(
options as Required<MqttOptions>['options'],
buildServerSettings,
);
case Transport.GRPC:
return new ServerGrpc(options);
return new ServerGrpc(options, buildServerSettings);
case Transport.KAFKA:
return new ServerKafka(options as Required<KafkaOptions>['options']);
return new ServerKafka(
options as Required<KafkaOptions>['options'],
buildServerSettings,
);
case Transport.RMQ:
return new ServerRMQ(options as Required<RmqOptions>['options']);
return new ServerRMQ(
options as Required<RmqOptions>['options'],
buildServerSettings,
);
default:
return new ServerTCP(options as Required<TcpOptions>['options']);
return new ServerTCP(
options as Required<TcpOptions>['options'],
buildServerSettings,
);
}
}
}
14 changes: 11 additions & 3 deletions packages/microservices/server/server-grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@ import { InvalidProtoDefinitionException } from '../errors/invalid-proto-definit
import { ChannelOptions } from '../external/grpc-options.interface';
import { getGrpcPackageDefinition } from '../helpers';
import { MessageHandler } from '../interfaces';
import { GrpcOptions } from '../interfaces/microservice-configuration.interface';
import {
BuildServerSettings,
GrpcOptions,
TransportId,
} from '../interfaces/microservice-configuration.interface';
import { Server } from './server';

const CANCELLED_EVENT = 'cancelled';
Expand Down Expand Up @@ -54,7 +58,7 @@ interface GrpcCall<TRequest = any, TMetadata = any> {
* @publicApi
*/
export class ServerGrpc extends Server<never, never> {
public readonly transportId = Transport.GRPC;
public readonly transportId: TransportId = Transport.GRPC;
protected readonly url: string;
protected grpcClient: GrpcServer;

Expand All @@ -64,8 +68,12 @@ export class ServerGrpc extends Server<never, never> {
);
}

constructor(private readonly options: Readonly<GrpcOptions>['options']) {
constructor(
private readonly options: Readonly<GrpcOptions>['options'],
buildServerSettings?: BuildServerSettings,
) {
super();
this.transportId = buildServerSettings?.transportId ?? Transport.GRPC;
this.url = this.getOptionsProp(options, 'url') || GRPC_DEFAULT_URL;

const protoLoader =
Expand Down
16 changes: 13 additions & 3 deletions packages/microservices/server/server-kafka.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,13 @@ import {
RecordMetadata,
} from '../external/kafka.interface';
import { KafkaLogger, KafkaParser } from '../helpers';
import { KafkaOptions, OutgoingResponse, ReadPacket } from '../interfaces';
import {
BuildServerSettings,
KafkaOptions,
OutgoingResponse,
ReadPacket,
TransportId,
} from '../interfaces';
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
import { Server } from './server';

Expand All @@ -36,7 +42,7 @@ let kafkaPackage: any = {};
* @publicApi
*/
export class ServerKafka extends Server<never, KafkaStatus> {
public readonly transportId = Transport.KAFKA;
public readonly transportId: TransportId = Transport.KAFKA;

protected logger = new Logger(ServerKafka.name);
protected client: Kafka | null = null;
Expand All @@ -47,8 +53,12 @@ export class ServerKafka extends Server<never, KafkaStatus> {
protected clientId: string;
protected groupId: string;

constructor(protected readonly options: Required<KafkaOptions>['options']) {
constructor(
protected readonly options: Required<KafkaOptions>['options'],
buildServerSettings?: BuildServerSettings,
) {
super();
this.transportId = buildServerSettings?.transportId ?? Transport.KAFKA;

const clientOptions = this.getOptionsProp(
this.options,
Expand Down
14 changes: 11 additions & 3 deletions packages/microservices/server/server-mqtt.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,11 @@ import {
PacketId,
ReadPacket,
} from '../interfaces';
import { MqttOptions } from '../interfaces/microservice-configuration.interface';
import {
BuildServerSettings,
MqttOptions,
TransportId,
} from '../interfaces/microservice-configuration.interface';
import { MqttRecord } from '../record-builders/mqtt.record-builder';
import { MqttRecordSerializer } from '../serializers/mqtt-record.serializer';
import { Server } from './server';
Expand All @@ -33,16 +37,20 @@ type MqttClient = any;
* @publicApi
*/
export class ServerMqtt extends Server<MqttEvents, MqttStatus> {
public readonly transportId = Transport.MQTT;
public readonly transportId: TransportId = Transport.MQTT;
protected readonly url: string;
protected mqttClient: MqttClient;
protected pendingEventListeners: Array<{
event: keyof MqttEvents;
callback: MqttEvents[keyof MqttEvents];
}> = [];

constructor(private readonly options: Required<MqttOptions>['options']) {
constructor(
private readonly options: Required<MqttOptions>['options'],
buildServerSettings?: BuildServerSettings,
) {
super();
this.transportId = buildServerSettings?.transportId ?? Transport.MQTT;
this.url = this.getOptionsProp(options, 'url', MQTT_DEFAULT_URL);

mqttPackage = this.loadPackage('mqtt', ServerMqtt.name, () =>
Expand Down
14 changes: 11 additions & 3 deletions packages/microservices/server/server-nats.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@ import { NatsContext } from '../ctx-host/nats.context';
import { NatsRequestJSONDeserializer } from '../deserializers/nats-request-json.deserializer';
import { Transport } from '../enums';
import { NatsEvents, NatsEventsMap, NatsStatus } from '../events/nats.events';
import { NatsOptions } from '../interfaces/microservice-configuration.interface';
import {
BuildServerSettings,
NatsOptions,
TransportId,
} from '../interfaces/microservice-configuration.interface';
import { IncomingRequest } from '../interfaces/packet.interface';
import { NatsRecord } from '../record-builders';
import { NatsRecordSerializer } from '../serializers/nats-record.serializer';
Expand All @@ -36,16 +40,20 @@ export class ServerNats<
E extends NatsEvents = NatsEvents,
S extends NatsStatus = NatsStatus,
> extends Server<E, S> {
public readonly transportId = Transport.NATS;
public readonly transportId: TransportId = Transport.NATS;

private natsClient: Client;
protected statusEventEmitter = new EventEmitter<{
[key in keyof NatsEvents]: Parameters<NatsEvents[key]>;
}>();
private readonly subscriptions: Subscription[] = [];

constructor(private readonly options: Required<NatsOptions>['options']) {
constructor(
private readonly options: Required<NatsOptions>['options'],
buildServerSettings?: BuildServerSettings,
) {
super();
this.transportId = buildServerSettings?.transportId ?? Transport.NATS;

natsPackage = this.loadPackage('nats', ServerNats.name, () =>
require('nats'),
Expand Down
15 changes: 12 additions & 3 deletions packages/microservices/server/server-redis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,12 @@ import {
RedisEventsMap,
RedisStatus,
} from '../events/redis.events';
import { IncomingRequest, RedisOptions } from '../interfaces';
import {
BuildServerSettings,
IncomingRequest,
RedisOptions,
TransportId,
} from '../interfaces';
import { Server } from './server';

// To enable type safety for Redis. This cant be uncommented by default
Expand All @@ -27,7 +32,7 @@ let redisPackage = {} as any;
* @publicApi
*/
export class ServerRedis extends Server<RedisEvents, RedisStatus> {
public readonly transportId = Transport.REDIS;
public readonly transportId: TransportId = Transport.REDIS;

protected subClient: Redis;
protected pubClient: Redis;
Expand All @@ -38,8 +43,12 @@ export class ServerRedis extends Server<RedisEvents, RedisStatus> {
callback: RedisEvents[keyof RedisEvents];
}> = [];

constructor(protected readonly options: Required<RedisOptions>['options']) {
constructor(
protected readonly options: Required<RedisOptions>['options'],
buildServerSettings?: BuildServerSettings,
) {
super();
this.transportId = buildServerSettings?.transportId ?? Transport.REDIS;

redisPackage = this.loadPackage('ioredis', ServerRedis.name, () =>
require('ioredis'),
Expand Down
10 changes: 7 additions & 3 deletions packages/microservices/server/server-rmq.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
import { RmqUrl } from '../external/rmq-url.interface';
import { RmqOptions } from '../interfaces';
import { BuildServerSettings, RmqOptions, TransportId } from '../interfaces';
import {
IncomingRequest,
OutgoingResponse,
Expand Down Expand Up @@ -51,7 +51,7 @@ const INFINITE_CONNECTION_ATTEMPTS = -1;
* @publicApi
*/
export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
public readonly transportId = Transport.RMQ;
public readonly transportId: TransportId = Transport.RMQ;

protected server: AmqpConnectionManager = null;
protected channel: ChannelWrapper = null;
Expand All @@ -65,8 +65,12 @@ export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
callback: RmqEvents[keyof RmqEvents];
}> = [];

constructor(protected readonly options: Required<RmqOptions>['options']) {
constructor(
protected readonly options: Required<RmqOptions>['options'],
buildServerSettings?: BuildServerSettings,
) {
super();
this.transportId = buildServerSettings?.transportId ?? Transport.RMQ;
this.urls = this.getOptionsProp(this.options, 'urls') || [RQM_DEFAULT_URL];
this.queue =
this.getOptionsProp(this.options, 'queue') || RQM_DEFAULT_QUEUE;
Expand Down
Loading