feat(microservices): add specific transport id to microservices

This commit is contained in:
maxbronnikov10
2025-02-11 02:20:11 +03:00
parent fa7525a678
commit 076fb0e168
12 changed files with 111 additions and 18 deletions

View File

@@ -1,4 +1,4 @@
import { Transport } from '../enums';
import { TransportId } from './microservice-configuration.interface';
/**
* @publicApi
@@ -7,7 +7,7 @@ export interface CustomTransportStrategy {
/**
* Unique transport identifier.
*/
readonly transportId?: Transport | symbol;
transportId?: TransportId;
/**
* Method called when the transport is being initialized.
* @param callback Function to be called upon initialization

View File

@@ -32,6 +32,8 @@ export type MicroserviceOptions =
| KafkaOptions
| CustomStrategy;
export type TransportId = Transport | symbol;
export type AsyncMicroserviceOptions = {
inject: InjectionToken[];
useFactory: (...args: any[]) => MicroserviceOptions;

View File

@@ -1,7 +1,6 @@
import { Transport } from '../enums/transport.enum';
import {
CustomStrategy,
GrpcOptions,
KafkaOptions,
MicroserviceOptions,
MqttOptions,

View File

@@ -22,7 +22,10 @@ import { InvalidProtoDefinitionException } from '../errors/invalid-proto-definit
import { ChannelOptions } from '../external/grpc-options.interface';
import { getGrpcPackageDefinition } from '../helpers';
import { MessageHandler } from '../interfaces';
import { GrpcOptions } from '../interfaces/microservice-configuration.interface';
import {
GrpcOptions,
TransportId,
} from '../interfaces/microservice-configuration.interface';
import { Server } from './server';
const CANCELLED_EVENT = 'cancelled';
@@ -54,7 +57,7 @@ interface GrpcCall<TRequest = any, TMetadata = any> {
* @publicApi
*/
export class ServerGrpc extends Server<never, never> {
public readonly transportId = Transport.GRPC;
public transportId: TransportId = Transport.GRPC;
protected readonly url: string;
protected grpcClient: GrpcServer;

View File

@@ -26,7 +26,12 @@ import {
RecordMetadata,
} from '../external/kafka.interface';
import { KafkaLogger, KafkaParser } from '../helpers';
import { KafkaOptions, OutgoingResponse, ReadPacket } from '../interfaces';
import {
KafkaOptions,
OutgoingResponse,
ReadPacket,
TransportId,
} from '../interfaces';
import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer';
import { Server } from './server';
@@ -36,7 +41,7 @@ let kafkaPackage: any = {};
* @publicApi
*/
export class ServerKafka extends Server<never, KafkaStatus> {
public readonly transportId = Transport.KAFKA;
public transportId: TransportId = Transport.KAFKA;
protected logger = new Logger(ServerKafka.name);
protected client: Kafka | null = null;

View File

@@ -15,7 +15,10 @@ import {
PacketId,
ReadPacket,
} from '../interfaces';
import { MqttOptions } from '../interfaces/microservice-configuration.interface';
import {
MqttOptions,
TransportId,
} from '../interfaces/microservice-configuration.interface';
import { MqttRecord } from '../record-builders/mqtt.record-builder';
import { MqttRecordSerializer } from '../serializers/mqtt-record.serializer';
import { Server } from './server';
@@ -33,7 +36,7 @@ type MqttClient = any;
* @publicApi
*/
export class ServerMqtt extends Server<MqttEvents, MqttStatus> {
public readonly transportId = Transport.MQTT;
public transportId: TransportId = Transport.MQTT;
protected readonly url: string;
protected mqttClient: MqttClient;
protected pendingEventListeners: Array<{

View File

@@ -9,7 +9,10 @@ import { NatsContext } from '../ctx-host/nats.context';
import { NatsRequestJSONDeserializer } from '../deserializers/nats-request-json.deserializer';
import { Transport } from '../enums';
import { NatsEvents, NatsEventsMap, NatsStatus } from '../events/nats.events';
import { NatsOptions } from '../interfaces/microservice-configuration.interface';
import {
NatsOptions,
TransportId,
} from '../interfaces/microservice-configuration.interface';
import { IncomingRequest } from '../interfaces/packet.interface';
import { NatsRecord } from '../record-builders';
import { NatsRecordSerializer } from '../serializers/nats-record.serializer';
@@ -36,7 +39,7 @@ export class ServerNats<
E extends NatsEvents = NatsEvents,
S extends NatsStatus = NatsStatus,
> extends Server<E, S> {
public readonly transportId = Transport.NATS;
public transportId: TransportId = Transport.NATS;
private natsClient: Client;
protected statusEventEmitter = new EventEmitter<{

View File

@@ -11,7 +11,7 @@ import {
RedisEventsMap,
RedisStatus,
} from '../events/redis.events';
import { IncomingRequest, RedisOptions } from '../interfaces';
import { IncomingRequest, RedisOptions, TransportId } from '../interfaces';
import { Server } from './server';
// To enable type safety for Redis. This cant be uncommented by default
@@ -27,7 +27,7 @@ let redisPackage = {} as any;
* @publicApi
*/
export class ServerRedis extends Server<RedisEvents, RedisStatus> {
public readonly transportId = Transport.REDIS;
public transportId: TransportId = Transport.REDIS;
protected subClient: Redis;
protected pubClient: Redis;

View File

@@ -22,7 +22,7 @@ import { RmqContext } from '../ctx-host';
import { Transport } from '../enums';
import { RmqEvents, RmqEventsMap, RmqStatus } from '../events/rmq.events';
import { RmqUrl } from '../external/rmq-url.interface';
import { MessageHandler, RmqOptions } from '../interfaces';
import { MessageHandler, RmqOptions, TransportId } from '../interfaces';
import {
IncomingRequest,
OutgoingResponse,
@@ -54,7 +54,7 @@ const INFINITE_CONNECTION_ATTEMPTS = -1;
* @publicApi
*/
export class ServerRMQ extends Server<RmqEvents, RmqStatus> {
public readonly transportId = Transport.RMQ;
public transportId: TransportId = Transport.RMQ;
protected server: AmqpConnectionManager | null = null;
protected channel: ChannelWrapper | null = null;

View File

@@ -20,14 +20,17 @@ import {
ReadPacket,
WritePacket,
} from '../interfaces';
import { TcpOptions } from '../interfaces/microservice-configuration.interface';
import {
TcpOptions,
TransportId,
} from '../interfaces/microservice-configuration.interface';
import { Server } from './server';
/**
* @publicApi
*/
export class ServerTCP extends Server<TcpEvents, TcpStatus> {
public readonly transportId = Transport.TCP;
public transportId: TransportId = Transport.TCP;
protected server: NetSocket;
protected readonly port: number;

View File

@@ -51,7 +51,7 @@ export abstract class Server<
/**
* Unique transport identifier.
*/
readonly transportId?: Transport | symbol;
public transportId?: Transport | symbol;
protected readonly messageHandlers = new Map<string, MessageHandler>();
protected readonly logger: LoggerService = new Logger(Server.name);
@@ -59,6 +59,14 @@ export abstract class Server<
protected deserializer: ConsumerDeserializer;
protected _status$ = new ReplaySubject<Status>(1);
/**
* Sets the transport identifier.
* @param transportId Unique transport identifier.
*/
public setTransportId(transportId: Transport | symbol): void {
this.transportId = transportId;
}
/**
* Returns an observable that emits status changes.
*/

View File

@@ -64,5 +64,72 @@ describe('ServerFactory', () => {
}) instanceof ServerGrpc,
).to.be.true;
});
it(`should return redis server with specific transport id`, () => {
const transportId = Symbol('test');
const server = ServerFactory.create({
transport: Transport.REDIS,
});
server.setTransportId(transportId);
expect(server instanceof ServerRedis).to.be.true;
expect(server.transportId === transportId).to.be.true;
});
it(`should return mqtt server with specific transport id`, () => {
const transportId = Symbol('test');
const server = ServerFactory.create({
transport: Transport.MQTT,
});
server.setTransportId(transportId);
expect(server instanceof ServerMqtt).to.be.true;
expect(server.transportId === transportId).to.be.true;
});
it(`should return nats server with specific transport id`, () => {
const transportId = Symbol('test');
const server = ServerFactory.create({
transport: Transport.NATS,
});
server.setTransportId(transportId);
expect(server instanceof ServerNats).to.be.true;
expect(server.transportId === transportId).to.be.true;
});
it(`should return rmq server with specific transport id`, () => {
const transportId = Symbol('test');
const server = ServerFactory.create({
transport: Transport.RMQ,
});
server.setTransportId(transportId);
expect(server instanceof ServerRMQ).to.be.true;
expect(server.transportId === transportId).to.be.true;
});
it(`should return kafka server with specific transport id`, () => {
const transportId = Symbol('test');
const server = ServerFactory.create({
transport: Transport.KAFKA,
});
server.setTransportId(transportId);
expect(server instanceof ServerKafka).to.be.true;
expect(server.transportId === transportId).to.be.true;
});
it(`should return grpc server with specific transport id`, () => {
const transportId = Symbol('test');
const server = ServerFactory.create({
transport: Transport.GRPC,
options: { protoPath: '', package: '' },
});
server.setTransportId(transportId);
expect(server instanceof ServerGrpc).to.be.true;
expect(server.transportId === transportId).to.be.true;
});
});
});