Compare commits

...

12 Commits

Author SHA1 Message Date
Kamil Myśliwiec
3c5180d2d5 chore(@nestjs) publish v10.4.8 release 2024-11-15 14:51:54 +01:00
Kamil Mysliwiec
a7b73e3107 Merge pull request #14143 from nestjs/feat/expose-listening-stream
feat(core): expose listening stream from http adapter host
2024-11-15 14:40:16 +01:00
Kamil Mysliwiec
491ed77f22 Merge pull request #14059 from v-sum/fix-disregarded-rmq-client-options
fix(microservices): include discarded rmq client options
2024-11-15 14:32:46 +01:00
Kamil Myśliwiec
e64ab182ba refactor(core): replace internal init with an inline promise 2024-11-15 14:32:09 +01:00
Kamil Mysliwiec
dade6d5889 Merge pull request #14139 from mksony/chore/ensure-application-init-before-accepting-sigterm
chore(core): defer application shutdown until init finishes
2024-11-15 14:31:03 +01:00
Kamil Mysliwiec
49dc36d9e5 Merge pull request #14132 from nestjs/fix/mqtt-microservices-qos2
fix(microservices): no messages emitted with mqtt when qos set
2024-11-15 14:30:42 +01:00
Kamil Mysliwiec
831d29553d Merge pull request #14133 from nestjs/fix/flaky-durable-provider
fix(core): flaky durable provider, remove instance on error
2024-11-15 14:30:28 +01:00
Kamil Myśliwiec
1a076fc4cb feat(core): expose listening stream from http adapter host 2024-11-15 11:23:16 +01:00
Max Karacsony
5c6986f0c7 chore(core): defer application shutdown until init finishes 2024-11-14 13:50:07 +01:00
Kamil Myśliwiec
229d97f018 fix(core): flaky durable provider, remove instance on error #13953 2024-11-12 13:23:51 +01:00
Kamil Myśliwiec
1fe4dc2cad fix(microservices): no messages emitted with mqtt when qos set #14079 2024-11-12 12:35:25 +01:00
Vasile Sumanschi
9cd43532ae fix(microservices): include discarded rmq client options
all other properties, beside 'connectionOptions', from the socketOptions object ( of type AmqpConnectionManagerSocketOptions )
were being discarded on the creation of the AmqpConnectionManager client
https://github.com/nestjs/nest/blob/master/packages/microservices/external/rmq-url.interface.ts#L47
https://github.com/jwalton/node-amqp-connection-manager/blob/v4.1.14/src/AmqpConnectionManager.ts#L46

https://github.com/nestjs/nest/issues/5788#issuecomment-2373361313
2024-10-09 01:17:27 +03:00
24 changed files with 273 additions and 35 deletions

1
.gitignore vendored
View File

@@ -50,3 +50,4 @@ build/config\.gypi
.npmrc
pnpm-lock.yaml
/.history

View File

@@ -27,10 +27,14 @@ describe('Durable providers', () => {
tenantId: number,
end: (err?: any) => void,
endpoint = '/durable',
opts: {
forceError: boolean;
} = { forceError: false },
) =>
request(server)
.get(endpoint)
.set({ ['x-tenant-id']: tenantId })
.set({ ['x-force-error']: opts.forceError ? 'true' : 'false' })
.end((err, res) => {
if (err) return end(err);
end(res);
@@ -84,6 +88,23 @@ describe('Durable providers', () => {
);
expect(result.body).deep.equal({ tenantId: '3' });
});
it(`should not cache durable providers that throw errors`, async () => {
let result: request.Response;
result = await new Promise<request.Response>(resolve =>
performHttpCall(10, resolve, '/durable/echo', { forceError: true }),
);
expect(result.statusCode).equal(412);
// The second request should be successful
result = await new Promise<request.Response>(resolve =>
performHttpCall(10, resolve, '/durable/echo'),
);
expect(result.body).deep.equal({ tenantId: '10' });
});
});
after(async () => {

View File

@@ -6,6 +6,8 @@ const tenants = new Map<string, ContextId>();
export class DurableContextIdStrategy implements ContextIdStrategy {
attach(contextId: ContextId, request: Request) {
const tenantId = request.headers['x-tenant-id'] as string;
const forceError = request.headers['x-force-error'] === 'true';
let tenantSubTreeId: ContextId;
if (tenants.has(tenantId)) {
@@ -14,10 +16,18 @@ export class DurableContextIdStrategy implements ContextIdStrategy {
tenantSubTreeId = { id: +tenantId } as ContextId;
tenants.set(tenantId, tenantSubTreeId);
}
const payload: {
tenantId: string;
forceError?: boolean;
} = { tenantId };
if (forceError) {
payload.forceError = true;
}
return {
resolve: (info: HostComponentInfo) =>
info.isTreeDurable ? tenantSubTreeId : contextId,
payload: { tenantId },
payload,
};
}
}

View File

@@ -1,11 +1,23 @@
import { Inject, Injectable, Scope } from '@nestjs/common';
import {
Inject,
Injectable,
PreconditionFailedException,
Scope,
} from '@nestjs/common';
import { REQUEST } from '@nestjs/core';
@Injectable({ scope: Scope.REQUEST, durable: true })
export class DurableService {
public instanceCounter = 0;
constructor(@Inject(REQUEST) public readonly requestPayload: unknown) {}
constructor(
@Inject(REQUEST)
public readonly requestPayload: { tenantId: string; forceError: boolean },
) {
if (requestPayload.forceError) {
throw new PreconditionFailedException('Forced error');
}
}
greeting() {
++this.instanceCounter;

View File

@@ -3,5 +3,5 @@
"packages": [
"packages/*"
],
"version": "10.4.7"
"version": "10.4.8"
}

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/common",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@common)",
"author": "Kamil Mysliwiec",
"homepage": "https://nestjs.com",

View File

@@ -1,3 +1,4 @@
import { Observable, Subject } from 'rxjs';
import { AbstractHttpAdapter } from '../adapters/http-adapter';
/**
@@ -16,6 +17,8 @@ export class HttpAdapterHost<
T extends AbstractHttpAdapter = AbstractHttpAdapter,
> {
private _httpAdapter?: T;
private _listen$ = new Subject<void>();
private isListening = false;
/**
* Accessor for the underlying `HttpAdapter`
@@ -35,4 +38,31 @@ export class HttpAdapterHost<
get httpAdapter(): T {
return this._httpAdapter;
}
/**
* Observable that allows to subscribe to the `listen` event.
* This event is emitted when the HTTP application is listening for incoming requests.
*/
get listen$(): Observable<void> {
return this._listen$.asObservable();
}
/**
* Sets the listening state of the application.
*/
set listening(listening: boolean) {
this.isListening = listening;
if (listening) {
this._listen$.next();
this._listen$.complete();
}
}
/**
* Returns a boolean indicating whether the application is listening for incoming requests.
*/
get listening(): boolean {
return this.isListening;
}
}

View File

@@ -170,6 +170,11 @@ export class Injector {
inquirer,
);
} catch (err) {
wrapper.removeInstanceByContextId(
this.getContextId(contextId, wrapper),
inquirerId,
);
settlementSignal.error(err);
throw err;
}

View File

@@ -168,6 +168,21 @@ export class InstanceWrapper<T = any> {
collection.set(contextId, value);
}
public removeInstanceByContextId(contextId: ContextId, inquirerId?: string) {
if (this.scope === Scope.TRANSIENT && inquirerId) {
return this.removeInstanceByInquirerId(contextId, inquirerId);
}
this.values.delete(contextId);
}
public removeInstanceByInquirerId(contextId: ContextId, inquirerId: string) {
const collection = this.transientMap.get(inquirerId);
if (!collection) {
return;
}
collection.delete(contextId);
}
public addCtorMetadata(index: number, wrapper: InstanceWrapper) {
if (!this[INSTANCE_METADATA_SYMBOL].dependencies) {
this[INSTANCE_METADATA_SYMBOL].dependencies = [];

View File

@@ -54,6 +54,7 @@ export class NestApplicationContext<
private shutdownCleanupRef?: (...args: unknown[]) => unknown;
private _instanceLinksHost: InstanceLinksHost;
private _moduleRefsForHooksByDistance?: Array<Module>;
private initializationPromise?: Promise<void>;
protected get instanceLinksHost() {
if (!this._instanceLinksHost) {
@@ -234,8 +235,16 @@ export class NestApplicationContext<
if (this.isInitialized) {
return this;
}
await this.callInitHook();
await this.callBootstrapHook();
this.initializationPromise = new Promise(async (resolve, reject) => {
try {
await this.callInitHook();
await this.callBootstrapHook();
resolve();
} catch (err) {
reject(err);
}
});
await this.initializationPromise;
this.isInitialized = true;
return this;
@@ -246,6 +255,7 @@ export class NestApplicationContext<
* @returns {Promise<void>}
*/
public async close(signal?: string): Promise<void> {
await this.initializationPromise;
await this.callDestroyHook();
await this.callBeforeShutdownHook(signal);
await this.dispose();
@@ -333,6 +343,7 @@ export class NestApplicationContext<
return;
}
receivedSignal = true;
await this.initializationPromise;
await this.callDestroyHook();
await this.callBeforeShutdownHook(signal);
await this.dispose();

View File

@@ -294,8 +294,12 @@ export class NestApplication
public async listen(port: number | string, hostname: string): Promise<any>;
public async listen(port: number | string, ...args: any[]): Promise<any> {
this.assertNotInPreviewMode('listen');
!this.isInitialized && (await this.init());
if (!this.isInitialized) {
await this.init();
}
const httpAdapterHost = this.container.getHttpAdapterHostRef();
return new Promise((resolve, reject) => {
const errorHandler = (e: any) => {
this.logger.error(e?.toString?.());
@@ -323,6 +327,8 @@ export class NestApplication
if (address) {
this.httpServer.removeListener('error', errorHandler);
this.isListening = true;
httpAdapterHost.listening = true;
resolve(this.httpServer);
}
if (isCallbackInOriginalArgs) {

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/core",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@core)",
"author": "Kamil Mysliwiec",
"license": "MIT",
@@ -36,7 +36,7 @@
"uid": "2.0.2"
},
"devDependencies": {
"@nestjs/common": "10.4.7"
"@nestjs/common": "10.4.8"
},
"peerDependencies": {
"@nestjs/common": "^10.0.0",

View File

@@ -2,11 +2,27 @@ import { expect } from 'chai';
import { HttpAdapterHost } from '../../helpers/http-adapter-host';
describe('HttpAdapterHost', () => {
const applicationRefHost = new HttpAdapterHost();
let applicationRefHost: HttpAdapterHost;
beforeEach(() => {
applicationRefHost = new HttpAdapterHost();
});
it('should wrap application reference', () => {
const ref = {};
applicationRefHost.httpAdapter = ref as any;
expect(applicationRefHost.httpAdapter).to.be.eql(ref);
});
it('should emit listen event when listening is set to true', done => {
applicationRefHost.listen$.subscribe(() => {
expect(applicationRefHost.listening).to.be.true;
done();
});
applicationRefHost.listening = true;
});
it('listening should return false if the application isnt listening yet', () => {
expect(applicationRefHost.listening).to.be.false;
});
});

View File

@@ -1,6 +1,7 @@
import { Scope } from '@nestjs/common';
import { expect } from 'chai';
import * as sinon from 'sinon';
import { createContextId } from '../../helpers';
import { STATIC_CONTEXT } from '../../injector/constants';
import { InstanceWrapper } from '../../injector/instance-wrapper';
@@ -737,6 +738,53 @@ describe('InstanceWrapper', () => {
});
});
describe('removeInstanceByContextId', () => {
describe('without inquirer', () => {
it('should remove instance for given context', () => {
const wrapper = new InstanceWrapper({
scope: Scope.TRANSIENT,
});
const contextId = createContextId();
wrapper.setInstanceByContextId(contextId, { instance: {} });
const existingContext = wrapper.getInstanceByContextId(contextId);
expect(existingContext.instance).to.be.not.undefined;
wrapper.removeInstanceByContextId(contextId);
const removedContext = wrapper.getInstanceByContextId(contextId);
expect(removedContext.instance).to.be.undefined;
});
});
describe('when transient and inquirer has been passed', () => {
it('should remove instance for given context', () => {
const wrapper = new InstanceWrapper({
scope: Scope.TRANSIENT,
});
wrapper.setInstanceByContextId(
STATIC_CONTEXT,
{ instance: {} },
'inquirerId',
);
const existingContext = wrapper.getInstanceByContextId(
STATIC_CONTEXT,
'inquirerId',
);
expect(existingContext.instance).to.be.not.undefined;
wrapper.removeInstanceByContextId(STATIC_CONTEXT, 'inquirerId');
const removedContext = wrapper.getInstanceByContextId(
STATIC_CONTEXT,
'inquirerId',
);
expect(removedContext.instance).to.be.undefined;
});
});
});
describe('isInRequestScope', () => {
describe('when tree and context are not static and is not transient', () => {
it('should return true', () => {

View File

@@ -1,4 +1,4 @@
import { InjectionToken, Scope } from '@nestjs/common';
import { InjectionToken, Provider, Scope } from '@nestjs/common';
import { expect } from 'chai';
import * as sinon from 'sinon';
import { ContextIdFactory } from '../helpers/context-id-factory';
@@ -7,6 +7,7 @@ import { Injector } from '../injector/injector';
import { InstanceLoader } from '../injector/instance-loader';
import { GraphInspector } from '../inspector/graph-inspector';
import { NestApplicationContext } from '../nest-application-context';
import { setTimeout } from 'timers/promises';
describe('NestApplicationContext', () => {
class A {}
@@ -14,6 +15,7 @@ describe('NestApplicationContext', () => {
async function testHelper(
injectionKey: InjectionToken,
scope: Scope,
additionalProviders: Array<Provider> = [],
): Promise<NestApplicationContext> {
const nestContainer = new NestContainer();
const injector = new Injector();
@@ -33,6 +35,10 @@ describe('NestApplicationContext', () => {
moduleRef.token,
);
for (const provider of additionalProviders) {
nestContainer.addProvider(provider, moduleRef.token);
}
nestContainer.addInjectable(
{
provide: injectionKey,
@@ -96,6 +102,58 @@ describe('NestApplicationContext', () => {
expect(processUp).to.be.false;
expect(promisesResolved).to.be.true;
});
it('should defer shutdown until all init hooks are resolved', async () => {
const clock = sinon.useFakeTimers({
toFake: ['setTimeout'],
});
const signal = 'SIGTERM';
const onModuleInitStub = sinon.stub();
const onApplicationShutdownStub = sinon.stub();
class B {
async onModuleInit() {
await setTimeout(5000);
onModuleInitStub();
}
async onApplicationShutdown() {
await setTimeout(1000);
onApplicationShutdownStub();
}
}
const applicationContext = await testHelper(A, Scope.DEFAULT, [
{ provide: B, useClass: B, scope: Scope.DEFAULT },
]);
applicationContext.enableShutdownHooks([signal]);
const ignoreProcessSignal = () => {
// noop to prevent process from exiting
};
process.on(signal, ignoreProcessSignal);
const deferredShutdown = async () => {
setTimeout(1);
process.kill(process.pid, signal);
};
Promise.all([applicationContext.init(), deferredShutdown()]);
await clock.nextAsync();
expect(onModuleInitStub.called).to.be.false;
expect(onApplicationShutdownStub.called).to.be.false;
await clock.nextAsync();
expect(onModuleInitStub.called).to.be.true;
expect(onApplicationShutdownStub.called).to.be.false;
await clock.nextAsync();
expect(onModuleInitStub.called).to.be.true;
expect(onApplicationShutdownStub.called).to.be.true;
clock.restore();
});
});
describe('get', () => {

View File

@@ -210,15 +210,22 @@ export class ClientMqtt extends ClientProxy {
return undefined;
}
return {
...requestOptions,
properties: {
...requestOptions?.properties,
// Cant just spread objects as MQTT won't deliver
// any message with empty object as "userProperties" field
// @url https://github.com/nestjs/nest/issues/14079
let options: MqttRecordOptions = {};
if (requestOptions) {
options = { ...requestOptions };
}
if (this.options?.userProperties) {
options.properties = {
...options.properties,
userProperties: {
...this.options?.userProperties,
...requestOptions?.properties?.userProperties,
...options.properties?.userProperties,
},
},
};
};
}
return options;
}
}

View File

@@ -135,9 +135,7 @@ export class ClientRMQ extends ClientProxy {
public createClient(): AmqpConnectionManager {
const socketOptions = this.getOptionsProp(this.options, 'socketOptions');
return rmqPackage.connect(this.urls, {
connectionOptions: socketOptions?.connectionOptions,
});
return rmqPackage.connect(this.urls, socketOptions);
}
public mergeDisconnectEvent<T = any>(

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/microservices",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@microservices)",
"author": "Kamil Mysliwiec",
"license": "MIT",
@@ -22,8 +22,8 @@
"tslib": "2.7.0"
},
"devDependencies": {
"@nestjs/common": "10.4.7",
"@nestjs/core": "10.4.7"
"@nestjs/common": "10.4.8",
"@nestjs/core": "10.4.8"
},
"peerDependencies": {
"@grpc/grpc-js": "*",

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/platform-express",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@platform-express)",
"author": "Kamil Mysliwiec",
"license": "MIT",
@@ -25,8 +25,8 @@
"tslib": "2.7.0"
},
"devDependencies": {
"@nestjs/common": "10.4.7",
"@nestjs/core": "10.4.7"
"@nestjs/common": "10.4.8",
"@nestjs/core": "10.4.8"
},
"peerDependencies": {
"@nestjs/common": "^10.0.0",

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/platform-fastify",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@platform-fastify)",
"author": "Kamil Mysliwiec",
"license": "MIT",

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/platform-socket.io",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@platform-socket.io)",
"author": "Kamil Mysliwiec",
"license": "MIT",

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/platform-ws",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@platform-ws)",
"author": "Kamil Mysliwiec",
"license": "MIT",

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/testing",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@testing)",
"author": "Kamil Mysliwiec",
"license": "MIT",

View File

@@ -1,6 +1,6 @@
{
"name": "@nestjs/websockets",
"version": "10.4.7",
"version": "10.4.8",
"description": "Nest - modern, fast, powerful node.js web framework (@websockets)",
"author": "Kamil Mysliwiec",
"license": "MIT",
@@ -18,8 +18,8 @@
"tslib": "2.7.0"
},
"devDependencies": {
"@nestjs/common": "10.4.7",
"@nestjs/core": "10.4.7"
"@nestjs/common": "10.4.8",
"@nestjs/core": "10.4.8"
},
"peerDependencies": {
"@nestjs/common": "^10.0.0",