From 33f37d6270f060afbc4b03e47ef91638b18ab098 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kamil=20My=C5=9Bliwiec?= Date: Mon, 9 Feb 2026 16:05:02 +0100 Subject: [PATCH] refactor: minor tweaks --- packages/common/utils/load-package.util.ts | 11 ++- packages/core/nest-application.ts | 89 +++++++++++-------- packages/core/nest-factory.ts | 1 + packages/core/router/routes-resolver.ts | 12 +-- packages/microservices/client/client-grpc.ts | 5 +- packages/microservices/client/client-kafka.ts | 16 ++-- packages/microservices/client/client-mqtt.ts | 5 +- packages/microservices/client/client-nats.ts | 9 +- packages/microservices/client/client-redis.ts | 36 ++++---- packages/microservices/client/client-rmq.ts | 11 ++- .../nats-request-json.deserializer.ts | 20 ++--- .../nats-response-json.deserializer.ts | 20 ++--- .../helpers/kafka-reply-partition-assigner.ts | 11 +-- packages/microservices/nest-microservice.ts | 29 +++--- .../serializers/nats-record.serializer.ts | 22 ++--- packages/microservices/server/server-grpc.ts | 2 + packages/microservices/server/server-kafka.ts | 16 ++-- packages/microservices/server/server-mqtt.ts | 16 ++-- packages/microservices/server/server-nats.ts | 14 ++- packages/microservices/server/server-redis.ts | 16 ++-- packages/microservices/server/server-rmq.ts | 18 ++-- packages/microservices/server/server.ts | 2 +- .../test/client/client-redis.spec.ts | 3 + .../kafka-reply-partition-assigner.spec.ts | 1 - .../nats-record.serializer.spec.ts | 3 +- .../adapters/express-adapter.ts | 31 ++++++- packages/platform-ws/adapters/ws-adapter.ts | 5 +- packages/testing/testing-module.builder.ts | 2 +- packages/testing/testing-module.ts | 2 +- 29 files changed, 219 insertions(+), 209 deletions(-) diff --git a/packages/common/utils/load-package.util.ts b/packages/common/utils/load-package.util.ts index f7d71a5a6..e2ac7b5f8 100644 --- a/packages/common/utils/load-package.util.ts +++ b/packages/common/utils/load-package.util.ts @@ -34,9 +34,14 @@ export async function loadPackage( } /** - * Synchronously loads a CJS package using `createRequire` and caches it. - * This is meant for optional dependencies that are CJS-only and must - * be loaded in synchronous contexts (e.g. constructors). + * Synchronously loads a package using `createRequire` and caches it. + * This is meant for optional dependencies that must be loaded in + * synchronous contexts (e.g. constructors). + * + * @param loaderFn Optional synchronous loader (e.g. + * `() => createRequire(import.meta.url)('pkg')`). + * When provided, bundlers can statically analyse the string literal. + * Falls back to a `createRequire` call resolved from this file. */ export function loadPackageSync( packageName: string, diff --git a/packages/core/nest-application.ts b/packages/core/nest-application.ts index 4358ecdcb..e85a38721 100644 --- a/packages/core/nest-application.ts +++ b/packages/core/nest-application.ts @@ -42,28 +42,6 @@ import { NestApplicationContext } from './nest-application-context.js'; import { Resolver } from './router/interfaces/resolver.interface.js'; import { RoutesResolver } from './router/routes-resolver.js'; -let _socketModule: any; -async function getSocketModule() { - if (!_socketModule) { - _socketModule = await optionalRequire( - '@nestjs/websockets/socket-module', - () => import('@nestjs/websockets/socket-module.js'), - ); - } - return _socketModule; -} - -let _microservicesModule: any; -async function getMicroservicesModule() { - if (!_microservicesModule) { - _microservicesModule = await optionalRequire( - '@nestjs/microservices/microservices-module', - () => import('@nestjs/microservices/microservices-module.js'), - ); - } - return _microservicesModule; -} - /** * @publicApi */ @@ -194,21 +172,10 @@ export class NestApplication } // Lazy-load optional modules (ESM-compatible) - const socketMod = await getSocketModule(); - if (socketMod?.SocketModule) { - this.socketModule = new socketMod.SocketModule(); - } - const msMod = await getMicroservicesModule(); - if (msMod?.MicroservicesModule) { - this.microservicesModule = new msMod.MicroservicesModule(); - // Pre-cache the main barrel so connectMicroservice() can stay synchronous - await loadPackage( - '@nestjs/microservices', - 'NestFactory', - () => import('@nestjs/microservices'), - ); - } - + await Promise.all([ + this.loadSocketModule(), + this.loadMicroservicesModule(), + ]); this.applyOptions(); await this.httpAdapter?.init?.(); @@ -493,6 +460,24 @@ export class NestApplication return this; } + /** + * Pre-load optional packages so that createNestApplication, + * createNestMicroservice and createHttpAdapter can stay synchronous. + */ + public async preloadLazyPackages(): Promise { + // Best-effort: silently swallow if packages are not installed + await loadPackage( + '@nestjs/platform-express', + 'TestingModule', + () => import('@nestjs/platform-express'), + ).catch(() => {}); + await loadPackage( + '@nestjs/microservices', + 'TestingModule', + () => import('@nestjs/microservices'), + ).catch(() => {}); + } + private host(): string | undefined { const address = this.httpServer.address(); if (isString(address)) { @@ -521,4 +506,34 @@ export class NestApplication } return instances; } + + private async loadSocketModule() { + if (!this.socketModule) { + const socketModule = await optionalRequire( + '@nestjs/websockets/socket-module', + () => import('@nestjs/websockets/socket-module.js'), + ); + if (socketModule?.SocketModule) { + this.socketModule = new socketModule.SocketModule(); + } + } + } + + private async loadMicroservicesModule() { + if (!this.microservicesModule) { + const msModule = await optionalRequire( + '@nestjs/microservices/microservices-module', + () => import('@nestjs/microservices/microservices-module.js'), + ); + if (msModule?.MicroservicesModule) { + this.microservicesModule = new msModule.MicroservicesModule(); + // Pre-cache the main barrel so connectMicroservice() can stay synchronous + await loadPackage( + '@nestjs/microservices', + 'NestFactory', + () => import('@nestjs/microservices'), + ); + } + } + } } diff --git a/packages/core/nest-factory.ts b/packages/core/nest-factory.ts index 9038253df..13ab85b99 100644 --- a/packages/core/nest-factory.ts +++ b/packages/core/nest-factory.ts @@ -108,6 +108,7 @@ export class NestFactoryStatic { graphInspector, appOptions, ); + await instance.preloadLazyPackages(); const target = this.createNestInstance(instance); return this.createAdapterProxy(target, httpServer); } diff --git a/packages/core/router/routes-resolver.ts b/packages/core/router/routes-resolver.ts index 63a6d3469..f89394dce 100644 --- a/packages/core/router/routes-resolver.ts +++ b/packages/core/router/routes-resolver.ts @@ -152,11 +152,9 @@ export class RoutesResolver implements Resolver { }; const handler = this.routerExceptionsFilter.create({}, callback, undefined); const proxy = this.routerProxy.createProxy(callback, handler); + const prefix = this.applicationConfig.getGlobalPrefix(); applicationRef.setNotFoundHandler && - applicationRef.setNotFoundHandler( - proxy, - this.applicationConfig.getGlobalPrefix(), - ); + applicationRef.setNotFoundHandler(proxy, prefix); } public registerExceptionHandler() { @@ -175,11 +173,9 @@ export class RoutesResolver implements Resolver { ); const proxy = this.routerProxy.createExceptionLayerProxy(callback, handler); const applicationRef = this.container.getHttpAdapterRef(); + const prefix = this.applicationConfig.getGlobalPrefix(); applicationRef.setErrorHandler && - applicationRef.setErrorHandler( - proxy, - this.applicationConfig.getGlobalPrefix(), - ); + applicationRef.setErrorHandler(proxy, prefix); } public mapExternalException(err: any) { diff --git a/packages/microservices/client/client-grpc.ts b/packages/microservices/client/client-grpc.ts index 8c454a85a..9c372c976 100644 --- a/packages/microservices/client/client-grpc.ts +++ b/packages/microservices/client/client-grpc.ts @@ -1,6 +1,7 @@ import { Logger } from '@nestjs/common/services/logger.service.js'; import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; import { isFunction, isObject } from '@nestjs/common/utils/shared.utils.js'; +import { createRequire } from 'module'; import { Observable, Subscription } from 'rxjs'; import { GRPC_DEFAULT_PROTO_LOADER, GRPC_DEFAULT_URL } from '../constants.js'; import { InvalidGrpcPackageException } from '../errors/invalid-grpc-package.exception.js'; @@ -50,7 +51,9 @@ export class ClientGrpcProxy const protoLoader = this.getOptionsProp(options, 'protoLoader') || GRPC_DEFAULT_PROTO_LOADER; - grpcPackage = loadPackageSync('@grpc/grpc-js', ClientGrpcProxy.name); + grpcPackage = loadPackageSync('@grpc/grpc-js', ClientGrpcProxy.name, () => + createRequire(import.meta.url)('@grpc/grpc-js'), + ); grpcProtoLoaderPackage = loadPackageSync(protoLoader, ClientGrpcProxy.name); diff --git a/packages/microservices/client/client-kafka.ts b/packages/microservices/client/client-kafka.ts index 027afc660..fdb3c2a8f 100644 --- a/packages/microservices/client/client-kafka.ts +++ b/packages/microservices/client/client-kafka.ts @@ -50,8 +50,6 @@ import { } from '../serializers/kafka-request.serializer.js'; import { ClientProxy } from './client-proxy.js'; -let kafkaPackage: any = {}; - /** * @publicApi */ @@ -117,13 +115,6 @@ export class ClientKafka this.clientId = (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId; this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId; - - kafkaPackage = loadPackage( - 'kafkajs', - ClientKafka.name, - () => import('kafkajs'), - ); - this.parser = new KafkaParser((options && options.parser) || undefined); this.initializeSerializer(options); @@ -217,7 +208,12 @@ export class ClientKafka } public async createClient(): Promise { - kafkaPackage = await kafkaPackage; + const kafkaPackage = await loadPackage( + 'kafkajs', + ClientKafka.name, + () => import('kafkajs'), + ); + const kafkaConfig: KafkaConfig = Object.assign( { logCreator: KafkaLogger.bind(null, this.logger) }, this.options.client, diff --git a/packages/microservices/client/client-mqtt.ts b/packages/microservices/client/client-mqtt.ts index d033af0ff..1bddc24a5 100644 --- a/packages/microservices/client/client-mqtt.ts +++ b/packages/microservices/client/client-mqtt.ts @@ -1,6 +1,7 @@ import { Logger } from '@nestjs/common/services/logger.service.js'; import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; import { isObject } from '@nestjs/common/utils/shared.utils.js'; +import { createRequire } from 'module'; import { EmptyError, fromEvent, lastValueFrom, merge, Observable } from 'rxjs'; import { first, map, share, tap } from 'rxjs/operators'; import { ECONNREFUSED, ENOTFOUND, MQTT_DEFAULT_URL } from '../constants.js'; @@ -47,7 +48,9 @@ export class ClientMqtt extends ClientProxy { super(); this.url = this.getOptionsProp(this.options, 'url') ?? MQTT_DEFAULT_URL; - mqttPackage = loadPackageSync('mqtt', ClientMqtt.name); + mqttPackage = loadPackageSync('mqtt', ClientMqtt.name, () => + createRequire(import.meta.url)('mqtt'), + ); this.initializeSerializer(options); this.initializeDeserializer(options); diff --git a/packages/microservices/client/client-nats.ts b/packages/microservices/client/client-nats.ts index 6dc33d71d..0ede9fbdf 100644 --- a/packages/microservices/client/client-nats.ts +++ b/packages/microservices/client/client-nats.ts @@ -1,6 +1,7 @@ import { Logger } from '@nestjs/common/services/logger.service.js'; -import { loadPackage } from '@nestjs/common/utils/load-package.util.js'; +import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; import { isObject } from '@nestjs/common/utils/shared.utils.js'; +import { createRequire } from 'module'; import { EventEmitter } from 'events'; import { NATS_DEFAULT_URL } from '../constants.js'; import { NatsResponseJSONDeserializer } from '../deserializers/nats-response-json.deserializer.js'; @@ -46,7 +47,9 @@ export class ClientNats extends ClientProxy { constructor(protected readonly options: Required['options']) { super(); - natsPackage = loadPackage('nats', ClientNats.name, () => import('nats')); + natsPackage = loadPackageSync('nats', ClientNats.name, () => + createRequire(import.meta.url)('nats'), + ); this.initializeSerializer(options); this.initializeDeserializer(options); @@ -76,8 +79,6 @@ export class ClientNats extends ClientProxy { } public async createClient(): Promise { - natsPackage = await natsPackage; - // Eagerly initialize serializer/deserializer so they can be used synchronously if ( this.serializer && diff --git a/packages/microservices/client/client-redis.ts b/packages/microservices/client/client-redis.ts index 2df23f594..6b2fe0a37 100644 --- a/packages/microservices/client/client-redis.ts +++ b/packages/microservices/client/client-redis.ts @@ -1,5 +1,5 @@ import { Logger } from '@nestjs/common/services/logger.service.js'; -import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; +import { loadPackage } from '@nestjs/common/utils/load-package.util.js'; import { REDIS_DEFAULT_HOST, REDIS_DEFAULT_PORT } from '../constants.js'; import { RedisEvents, @@ -16,8 +16,6 @@ import { ClientProxy } from './client-proxy.js'; // type Redis = import('ioredis').Redis; type Redis = any; -let redisPackage = {} as any; - /** * @publicApi */ @@ -26,7 +24,7 @@ export class ClientRedis extends ClientProxy { protected readonly subscriptionsCount = new Map(); protected pubClient: Redis; protected subClient: Redis; - protected connectionPromise: Promise; + protected connectionPromise: Promise | null = null; protected isManuallyClosed = false; protected wasInitialConnectionSuccessful = false; protected pendingEventListeners: Array<{ @@ -37,8 +35,6 @@ export class ClientRedis extends ClientProxy { constructor(protected readonly options: Required['options']) { super(); - redisPackage = loadPackageSync('ioredis', ClientRedis.name); - this.initializeSerializer(options); this.initializeDeserializer(options); } @@ -56,15 +52,21 @@ export class ClientRedis extends ClientProxy { this.pubClient && (await this.pubClient.quit()); this.subClient && (await this.subClient.quit()); this.pubClient = this.subClient = null; + this.connectionPromise = null; this.pendingEventListeners = []; } - public async connect(): Promise { - if (this.pubClient && this.subClient) { + public connect(): Promise { + if (this.connectionPromise) { return this.connectionPromise; } - this.pubClient = this.createClient(); - this.subClient = this.createClient(); + this.connectionPromise = this.handleConnection(); + return this.connectionPromise; + } + + private async handleConnection(): Promise { + this.pubClient = await this.createClient(); + this.subClient = await this.createClient(); [this.pubClient, this.subClient].forEach((client, index) => { const type = index === 0 ? 'pub' : 'sub'; @@ -78,15 +80,15 @@ export class ClientRedis extends ClientProxy { }); this.pendingEventListeners = []; - this.connectionPromise = Promise.all([ - this.subClient.connect(), - this.pubClient.connect(), - ]); - await this.connectionPromise; - return this.connectionPromise; + await Promise.all([this.subClient.connect(), this.pubClient.connect()]); } - public createClient(): Redis { + public async createClient(): Promise { + const redisPackage = await loadPackage( + 'ioredis', + ClientRedis.name, + () => import('ioredis'), + ); const RedisClient = redisPackage.default || redisPackage; return new RedisClient({ host: REDIS_DEFAULT_HOST, diff --git a/packages/microservices/client/client-rmq.ts b/packages/microservices/client/client-rmq.ts index b1ce0cb52..52604b775 100644 --- a/packages/microservices/client/client-rmq.ts +++ b/packages/microservices/client/client-rmq.ts @@ -1,6 +1,7 @@ /* eslint-disable @typescript-eslint/no-redundant-type-constituents */ import { Logger } from '@nestjs/common/services/logger.service.js'; import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; +import { createRequire } from 'module'; import { randomStringGenerator } from '@nestjs/common/utils/random-string-generator.util.js'; import { isFunction, isString } from '@nestjs/common/utils/shared.utils.js'; import { EventEmitter } from 'events'; @@ -87,8 +88,14 @@ export class ClientRMQ extends ClientProxy { this.queueOptions.noAssert ?? RQM_DEFAULT_NO_ASSERT; - loadPackageSync('amqplib', ClientRMQ.name); - rmqPackage = loadPackageSync('amqp-connection-manager', ClientRMQ.name); + loadPackageSync('amqplib', ClientRMQ.name, () => + createRequire(import.meta.url)('amqplib'), + ); + rmqPackage = loadPackageSync( + 'amqp-connection-manager', + ClientRMQ.name, + () => createRequire(import.meta.url)('amqp-connection-manager'), + ); this.initializeSerializer(options); this.initializeDeserializer(options); diff --git a/packages/microservices/deserializers/nats-request-json.deserializer.ts b/packages/microservices/deserializers/nats-request-json.deserializer.ts index af7fa3aca..9b1ed47d5 100644 --- a/packages/microservices/deserializers/nats-request-json.deserializer.ts +++ b/packages/microservices/deserializers/nats-request-json.deserializer.ts @@ -1,4 +1,5 @@ -import { loadPackage } from '@nestjs/common/utils/load-package.util.js'; +import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; +import { createRequire } from 'module'; import { NatsCodec } from '../external/nats-codec.interface.js'; import { IncomingEvent, IncomingRequest } from '../interfaces/index.js'; import { IncomingRequestDeserializer } from './incoming-request.deserializer.js'; @@ -9,29 +10,19 @@ let natsPackage = {} as any; * @publicApi */ export class NatsRequestJSONDeserializer extends IncomingRequestDeserializer { - private jsonCodec: NatsCodec; + private readonly jsonCodec: NatsCodec; constructor() { super(); - natsPackage = loadPackage( + natsPackage = loadPackageSync( 'nats', NatsRequestJSONDeserializer.name, - () => import('nats'), + () => createRequire(import.meta.url)('nats'), ); - } - - async init() { - natsPackage = await natsPackage; this.jsonCodec = natsPackage.JSONCodec(); } - private ensureJsonCodec() { - if (!this.jsonCodec) { - this.jsonCodec = natsPackage.JSONCodec(); - } - } - deserialize( value: Uint8Array, options?: Record, @@ -39,7 +30,6 @@ export class NatsRequestJSONDeserializer extends IncomingRequestDeserializer { | IncomingRequest | IncomingEvent | Promise { - this.ensureJsonCodec(); const decodedRequest = this.jsonCodec.decode(value); return super.deserialize(decodedRequest, options); } diff --git a/packages/microservices/deserializers/nats-response-json.deserializer.ts b/packages/microservices/deserializers/nats-response-json.deserializer.ts index 338f4d013..a19647018 100644 --- a/packages/microservices/deserializers/nats-response-json.deserializer.ts +++ b/packages/microservices/deserializers/nats-response-json.deserializer.ts @@ -1,4 +1,5 @@ -import { loadPackage } from '@nestjs/common/utils/load-package.util.js'; +import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; +import { createRequire } from 'module'; import { NatsCodec } from '../external/nats-codec.interface.js'; import { IncomingResponse } from '../interfaces/index.js'; import { IncomingResponseDeserializer } from './incoming-response.deserializer.js'; @@ -9,34 +10,23 @@ let natsPackage = {} as any; * @publicApi */ export class NatsResponseJSONDeserializer extends IncomingResponseDeserializer { - private jsonCodec: NatsCodec; + private readonly jsonCodec: NatsCodec; constructor() { super(); - natsPackage = loadPackage( + natsPackage = loadPackageSync( 'nats', NatsResponseJSONDeserializer.name, - () => import('nats'), + () => createRequire(import.meta.url)('nats'), ); - } - - async init() { - natsPackage = await natsPackage; this.jsonCodec = natsPackage.JSONCodec(); } - private ensureJsonCodec() { - if (!this.jsonCodec) { - this.jsonCodec = natsPackage.JSONCodec(); - } - } - deserialize( value: Uint8Array, options?: Record, ): IncomingResponse | Promise { - this.ensureJsonCodec(); const decodedRequest = this.jsonCodec.decode(value); return super.deserialize(decodedRequest, options); } diff --git a/packages/microservices/helpers/kafka-reply-partition-assigner.ts b/packages/microservices/helpers/kafka-reply-partition-assigner.ts index e908a1882..0a916ddff 100644 --- a/packages/microservices/helpers/kafka-reply-partition-assigner.ts +++ b/packages/microservices/helpers/kafka-reply-partition-assigner.ts @@ -1,5 +1,6 @@ -import { loadPackage } from '@nestjs/common/utils/load-package.util.js'; +import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; import { isUndefined } from '@nestjs/common/utils/shared.utils.js'; +import { createRequire } from 'module'; import { ClientKafka } from '../client/client-kafka.js'; import { Cluster, @@ -21,17 +22,13 @@ export class KafkaReplyPartitionAssigner { cluster: Cluster; }, ) { - kafkaPackage = loadPackage( + kafkaPackage = loadPackageSync( 'kafkajs', KafkaReplyPartitionAssigner.name, - () => import('kafkajs'), + () => createRequire(import.meta.url)('kafkajs'), ); } - async init() { - kafkaPackage = await kafkaPackage; - } - /** * This process can result in imbalanced assignments * @param {array} members array of members, e.g: [{ memberId: 'test-5f93f5a3' }] diff --git a/packages/microservices/nest-microservice.ts b/packages/microservices/nest-microservice.ts index b1d1fe102..924022c6e 100644 --- a/packages/microservices/nest-microservice.ts +++ b/packages/microservices/nest-microservice.ts @@ -24,17 +24,6 @@ import { MicroservicesModule } from './microservices-module.js'; import { ServerFactory } from './server/server-factory.js'; import { Server } from './server/server.js'; -let _socketModule: any; -async function getSocketModule() { - if (!_socketModule) { - _socketModule = await optionalRequire( - '@nestjs/websockets/socket-module', - () => import('@nestjs/websockets/socket-module.js'), - ); - } - return _socketModule; -} - type CompleteMicroserviceOptions = NestMicroserviceOptions & (MicroserviceOptions | AsyncMicroserviceOptions); @@ -264,11 +253,7 @@ export class NestMicroservice } // Lazy-load optional socket module (ESM-compatible) - const socketMod = await getSocketModule(); - if (socketMod?.SocketModule) { - this.socketModule = new socketMod.SocketModule(); - } - + await this.loadSocketModule(); await super.init(); await this.registerModules(); return this; @@ -391,4 +376,16 @@ export class NestMicroservice } return instances; } + + private async loadSocketModule() { + if (!this.socketModule) { + const socketModule = await optionalRequire( + '@nestjs/websockets/socket-module', + () => import('@nestjs/websockets/socket-module.js'), + ); + if (socketModule?.SocketModule) { + this.socketModule = new socketModule.SocketModule(); + } + } + } } diff --git a/packages/microservices/serializers/nats-record.serializer.ts b/packages/microservices/serializers/nats-record.serializer.ts index 976cad04e..70885de6c 100644 --- a/packages/microservices/serializers/nats-record.serializer.ts +++ b/packages/microservices/serializers/nats-record.serializer.ts @@ -1,5 +1,6 @@ -import { loadPackage } from '@nestjs/common/utils/load-package.util.js'; +import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; import { isObject } from '@nestjs/common/utils/shared.utils.js'; +import { createRequire } from 'module'; import { NatsCodec } from '../external/nats-codec.interface.js'; import { ReadPacket } from '../interfaces/index.js'; import { Serializer } from '../interfaces/serializer.interface.js'; @@ -11,29 +12,16 @@ export class NatsRecordSerializer implements Serializer< ReadPacket, NatsRecord > { - private jsonCodec: NatsCodec; + private readonly jsonCodec: NatsCodec; constructor() { - natsPackage = loadPackage( - 'nats', - NatsRecordSerializer.name, - () => import('nats'), + natsPackage = loadPackageSync('nats', NatsRecordSerializer.name, () => + createRequire(import.meta.url)('nats'), ); - } - - async init() { - natsPackage = await natsPackage; this.jsonCodec = natsPackage.JSONCodec(); } - private ensureJsonCodec() { - if (!this.jsonCodec) { - this.jsonCodec = natsPackage.JSONCodec(); - } - } - serialize(packet: any): NatsRecord { - this.ensureJsonCodec(); const natsMessage = packet?.data && isObject(packet.data) && packet.data instanceof NatsRecord ? packet.data diff --git a/packages/microservices/server/server-grpc.ts b/packages/microservices/server/server-grpc.ts index 442e07036..34396d147 100644 --- a/packages/microservices/server/server-grpc.ts +++ b/packages/microservices/server/server-grpc.ts @@ -3,6 +3,7 @@ import { isString, isUndefined, } from '@nestjs/common/utils/shared.utils.js'; +import { createRequire } from 'module'; import { EMPTY, Observable, @@ -77,6 +78,7 @@ export class ServerGrpc extends Server { grpcPackage = this.loadPackageSynchronously( '@grpc/grpc-js', ServerGrpc.name, + () => createRequire(import.meta.url)('@grpc/grpc-js'), ); grpcProtoLoaderPackage = this.loadPackageSynchronously( protoLoader, diff --git a/packages/microservices/server/server-kafka.ts b/packages/microservices/server/server-kafka.ts index ea288f617..32a082af0 100644 --- a/packages/microservices/server/server-kafka.ts +++ b/packages/microservices/server/server-kafka.ts @@ -35,8 +35,6 @@ import { import { KafkaRequestSerializer } from '../serializers/kafka-request.serializer.js'; import { Server } from './server.js'; -let kafkaPackage: any = {}; - /** * @publicApi */ @@ -75,12 +73,6 @@ export class ServerKafka extends Server { (clientOptions.clientId || KAFKA_DEFAULT_CLIENT) + postfixId; this.groupId = (consumerOptions.groupId || KAFKA_DEFAULT_GROUP) + postfixId; - kafkaPackage = this.loadPackage( - 'kafkajs', - ServerKafka.name, - () => import('kafkajs'), - ); - this.parser = new KafkaParser((options && options.parser) || undefined); this.initializeSerializer(options); @@ -155,14 +147,18 @@ export class ServerKafka extends Server { } public async createClient(): Promise { - kafkaPackage = await kafkaPackage; + const kafkaPackage = await this.loadPackage( + 'kafkajs', + ServerKafka.name, + () => import('kafkajs'), + ); return new kafkaPackage.Kafka( Object.assign( { logCreator: KafkaLogger.bind(null, this.logger) }, this.options.client, { clientId: this.clientId, brokers: this.brokers }, ) as KafkaConfig, - ); + ) as T; } public async bindEvents(consumer: Consumer) { diff --git a/packages/microservices/server/server-mqtt.ts b/packages/microservices/server/server-mqtt.ts index a42d27daa..715bfb78d 100644 --- a/packages/microservices/server/server-mqtt.ts +++ b/packages/microservices/server/server-mqtt.ts @@ -27,8 +27,6 @@ import { MqttRecord } from '../record-builders/mqtt.record-builder.js'; import { MqttRecordSerializer } from '../serializers/mqtt-record.serializer.js'; import { Server } from './server.js'; -let mqttPackage: any = {}; - // To enable type safety for MQTT. This cant be uncommented by default // because it would require the user to install the mqtt package even if they dont use MQTT // Otherwise, TypeScript would fail to compile the code. @@ -52,12 +50,6 @@ export class ServerMqtt extends Server { super(); this.url = this.getOptionsProp(options, 'url', MQTT_DEFAULT_URL); - mqttPackage = this.loadPackage( - 'mqtt', - ServerMqtt.name, - () => import('mqtt'), - ); - this.initializeSerializer(options); this.initializeDeserializer(options); } @@ -121,8 +113,12 @@ export class ServerMqtt extends Server { } public async createMqttClient(): Promise { - mqttPackage = await mqttPackage; - return mqttPackage.connect(this.url, this.options as MqttOptions); + const mqttPackage = await this.loadPackage( + 'mqtt', + ServerMqtt.name, + () => import('mqtt'), + ); + return mqttPackage.connect(this.url, this.options as any); } public getMessageHandler(pub: MqttClient) { diff --git a/packages/microservices/server/server-nats.ts b/packages/microservices/server/server-nats.ts index b3c7be79f..252ed4fa4 100644 --- a/packages/microservices/server/server-nats.ts +++ b/packages/microservices/server/server-nats.ts @@ -22,8 +22,6 @@ import { NatsRecord } from '../record-builders/index.js'; import { NatsRecordSerializer } from '../serializers/nats-record.serializer.js'; import { Server } from './server.js'; -let natsPackage = {} as any; - // To enable type safety for Nats. This cant be uncommented by default // because it would require the user to install the nats package even if they dont use Nats // Otherwise, TypeScript would fail to compile the code. @@ -54,12 +52,6 @@ export class ServerNats< constructor(private readonly options: Required['options']) { super(); - natsPackage = this.loadPackage( - 'nats', - ServerNats.name, - () => import('nats'), - ); - this.initializeSerializer(options); this.initializeDeserializer(options); } @@ -130,7 +122,11 @@ export class ServerNats< } public async createNatsClient(): Promise { - natsPackage = await natsPackage; + const natsPackage = await this.loadPackage( + 'nats', + ServerNats.name, + () => import('nats'), + ); // Eagerly initialize serializer/deserializer so they can be used synchronously if ( diff --git a/packages/microservices/server/server-redis.ts b/packages/microservices/server/server-redis.ts index 5a1dc8bcf..af0fee58c 100644 --- a/packages/microservices/server/server-redis.ts +++ b/packages/microservices/server/server-redis.ts @@ -25,8 +25,6 @@ import { Server } from './server.js'; // type Redis = import('ioredis').Redis; type Redis = any; -let redisPackage = {} as any; - /** * @publicApi */ @@ -45,12 +43,6 @@ export class ServerRedis extends Server { constructor(protected readonly options: Required['options']) { super(); - redisPackage = this.loadPackage( - 'ioredis', - ServerRedis.name, - () => import('ioredis'), - ); - this.initializeSerializer(options); this.initializeDeserializer(options); } @@ -118,8 +110,12 @@ export class ServerRedis extends Server { } public async createRedisClient(): Promise { - redisPackage = await redisPackage; - const RedisClient = redisPackage.default || redisPackage; + const redisPackage = await this.loadPackage( + 'ioredis', + ServerRedis.name, + () => import('ioredis'), + ); + const RedisClient: any = redisPackage.default || redisPackage; return new RedisClient({ port: REDIS_DEFAULT_PORT, host: REDIS_DEFAULT_HOST, diff --git a/packages/microservices/server/server-rmq.ts b/packages/microservices/server/server-rmq.ts index 60e84d73d..c806f715b 100644 --- a/packages/microservices/server/server-rmq.ts +++ b/packages/microservices/server/server-rmq.ts @@ -4,6 +4,7 @@ import { isString, isUndefined, } from '@nestjs/common/utils/shared.utils.js'; +import { createRequire } from 'module'; import { CONNECTION_FAILED_MESSAGE, DISCONNECTED_RMQ_MESSAGE, @@ -53,8 +54,6 @@ type ChannelWrapper = any; type Message = any; type Channel = any; -let rmqPackage = {} as any; // as typeof import('amqp-connection-manager'); - const INFINITE_CONNECTION_ATTEMPTS = -1; /** @@ -86,11 +85,8 @@ export class ServerRMQ extends Server { this.getOptionsProp(this.options, 'queueOptions') || RQM_DEFAULT_QUEUE_OPTIONS; - this.loadPackage('amqplib', ServerRMQ.name, () => import('amqplib')); - rmqPackage = this.loadPackage( - 'amqp-connection-manager', - ServerRMQ.name, - () => import('amqp-connection-manager'), + this.loadPackageSynchronously('amqplib', ServerRMQ.name, () => + createRequire(import.meta.url)('amqplib'), ); this.initializeSerializer(options); @@ -167,13 +163,17 @@ export class ServerRMQ extends Server { } public async createClient(): Promise { - rmqPackage = await rmqPackage; + const rmqPackage = await this.loadPackage( + 'amqp-connection-manager', + ServerRMQ.name, + () => import('amqp-connection-manager'), + ); const socketOptions = this.getOptionsProp(this.options, 'socketOptions'); return rmqPackage.connect(this.urls, { connectionOptions: socketOptions?.connectionOptions, heartbeatIntervalInSeconds: socketOptions?.heartbeatIntervalInSeconds, reconnectTimeInSeconds: socketOptions?.reconnectTimeInSeconds, - }); + }) as T; } private registerConnectListener() { diff --git a/packages/microservices/server/server.ts b/packages/microservices/server/server.ts index 87b8ddec0..926e0d93c 100644 --- a/packages/microservices/server/server.ts +++ b/packages/microservices/server/server.ts @@ -295,7 +295,7 @@ export abstract class Server< protected loadPackage( name: string, ctx: string, - loader?: Function, + loader?: () => T, ): T | Promise { return loadPackage(name, ctx, loader); } diff --git a/packages/microservices/test/client/client-redis.spec.ts b/packages/microservices/test/client/client-redis.spec.ts index 3369769f2..b30c72bb2 100644 --- a/packages/microservices/test/client/client-redis.spec.ts +++ b/packages/microservices/test/client/client-redis.spec.ts @@ -48,10 +48,12 @@ describe('ClientRedis', () => { pub = { publish: publishSpy }; untypedClient.subClient = sub; untypedClient.pubClient = pub; + untypedClient.connectionPromise = Promise.resolve(); connectSpy = sinon.spy(client, 'connect'); }); afterEach(() => { connectSpy.restore(); + untypedClient.connectionPromise = null; }); it('should subscribe to response pattern name', () => { client['publish'](msg, () => {}); @@ -249,6 +251,7 @@ describe('ClientRedis', () => { let registerErrorListenerSpy: sinon.SinonSpy; beforeEach(async () => { + untypedClient.connectionPromise = null; createClientSpy = sinon.stub(client, 'createClient').callsFake( () => ({ diff --git a/packages/microservices/test/helpers/kafka-reply-partition-assigner.spec.ts b/packages/microservices/test/helpers/kafka-reply-partition-assigner.spec.ts index 176eb4512..aaabc2509 100644 --- a/packages/microservices/test/helpers/kafka-reply-partition-assigner.spec.ts +++ b/packages/microservices/test/helpers/kafka-reply-partition-assigner.spec.ts @@ -15,7 +15,6 @@ describe('kafka reply partition assigner', () => { cluster = { findTopicPartitionMetadata: topic => metadata[topic] }; client = new ClientKafka({}); assigner = new KafkaReplyPartitionAssigner(client, { cluster }); - await assigner.init(); topics = ['topic-A', 'topic-B']; getConsumerAssignments = sinon.spy(client, 'getConsumerAssignments'); diff --git a/packages/microservices/test/serializers/nats-record.serializer.spec.ts b/packages/microservices/test/serializers/nats-record.serializer.spec.ts index e2df70aa7..533931e20 100644 --- a/packages/microservices/test/serializers/nats-record.serializer.spec.ts +++ b/packages/microservices/test/serializers/nats-record.serializer.spec.ts @@ -7,9 +7,8 @@ const jsonCodec = nats.JSONCodec(); describe('NatsRecordSerializer', () => { let instance: NatsRecordSerializer; - beforeEach(async () => { + beforeEach(() => { instance = new NatsRecordSerializer(); - await instance.init(); }); describe('serialize', () => { it('undefined', async () => { diff --git a/packages/platform-express/adapters/express-adapter.ts b/packages/platform-express/adapters/express-adapter.ts index da47c3a02..5727ec36c 100644 --- a/packages/platform-express/adapters/express-adapter.ts +++ b/packages/platform-express/adapters/express-adapter.ts @@ -54,6 +54,7 @@ export class ExpressAdapter extends AbstractHttpAdapter< private readonly routerMethodFactory = new RouterMethodFactory(); private readonly logger = new Logger(ExpressAdapter.name); private readonly openConnections = new Set(); + private readonly registeredPrefixes = new Set(); private onRequestHook?: ( req: express.Request, res: express.Response, @@ -166,11 +167,39 @@ export class ExpressAdapter extends AbstractHttpAdapter< } public setErrorHandler(handler: Function, prefix?: string) { + if (prefix) { + const router = express.Router(); + router.use(handler as any); + return this.use(prefix, router); + } return this.use(handler); } public setNotFoundHandler(handler: Function, prefix?: string) { - return this.use(handler); + if (prefix) { + this.registeredPrefixes.add(prefix); + const router = express.Router(); + router.all('*path', handler as any); + return this.use(prefix, router); + } + return this.use( + ( + req: express.Request, + res: express.Response, + next: express.NextFunction, + ) => { + // When multiple apps share this adapter, a non-prefixed app's 404 + // handler may be registered before a prefixed app's routes. Skip + // requests whose path belongs to another app's prefix so they can + // reach the correct route handlers further in the stack. + for (const registeredPrefix of this.registeredPrefixes) { + if (req.originalUrl.startsWith(registeredPrefix)) { + return next(); + } + } + return (handler as any)(req, res, next); + }, + ); } public isHeadersSent(response: any): boolean { diff --git a/packages/platform-ws/adapters/ws-adapter.ts b/packages/platform-ws/adapters/ws-adapter.ts index 9974b1aec..aa7ef734f 100644 --- a/packages/platform-ws/adapters/ws-adapter.ts +++ b/packages/platform-ws/adapters/ws-adapter.ts @@ -1,5 +1,6 @@ import { INestApplicationContext, Logger } from '@nestjs/common'; import { loadPackageSync } from '@nestjs/common/utils/load-package.util.js'; +import { createRequire } from 'module'; import { isNil, normalizePath } from '@nestjs/common/utils/shared.utils.js'; import { AbstractWsAdapter } from '@nestjs/websockets'; import { @@ -55,7 +56,9 @@ export class WsAdapter extends AbstractWsAdapter { options?: WsAdapterOptions, ) { super(appOrHttpServer); - wsPackage = loadPackageSync('ws', 'WsAdapter'); + wsPackage = loadPackageSync('ws', 'WsAdapter', () => + createRequire(import.meta.url)('ws'), + ); // Normalize CJS/ESM: In CJS, require('ws') returns WebSocket with .Server. // We need .Server for creating WebSocketServer instances. if (!wsPackage.Server) { diff --git a/packages/testing/testing-module.builder.ts b/packages/testing/testing-module.builder.ts index ecbfd846e..71387a047 100644 --- a/packages/testing/testing-module.builder.ts +++ b/packages/testing/testing-module.builder.ts @@ -129,7 +129,7 @@ export class TestingModuleBuilder { root, this.applicationConfig, ); - await testingModule.preloadLazyPackages(); + await testingModule['preloadLazyPackages'](); return testingModule; } diff --git a/packages/testing/testing-module.ts b/packages/testing/testing-module.ts index 3affec9e1..cc6845812 100644 --- a/packages/testing/testing-module.ts +++ b/packages/testing/testing-module.ts @@ -47,7 +47,7 @@ export class TestingModule extends NestApplicationContext { * createNestMicroservice and createHttpAdapter can stay synchronous. * Called from TestingModuleBuilder.compile(). */ - public async preloadLazyPackages(): Promise { + private async preloadLazyPackages(): Promise { // Best-effort: silently swallow if packages are not installed await loadPackage( '@nestjs/platform-express',