Merge pull request #10345 from tolgap/fix/9517-keep-alive-connections-blocking

fix(platform): shutdown hooks not firing caused by open http connections
This commit is contained in:
Kamil Mysliwiec
2022-11-07 11:02:20 +01:00
committed by GitHub
8 changed files with 241 additions and 4 deletions

View File

@@ -0,0 +1,79 @@
import { NestExpressApplication } from '@nestjs/platform-express';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as EventSource from 'eventsource';
import { AppModule } from '../src/app.module';
describe('Sse (Express Application)', () => {
let app: NestExpressApplication;
let eventSource: EventSource;
describe('without forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = moduleFixture.createNestApplication<NestExpressApplication>();
await app.listen(3000);
const url = await app.getUrl();
eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});
// The order of actions is very important here. When not using `forceCloseConnections`,
// the SSe eventsource should close the connections in order to signal the server that
// the keep-alive connection can be ended.
afterEach(async () => {
eventSource.close();
await app.close();
});
it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});
describe('with forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = moduleFixture.createNestApplication<NestExpressApplication>({
forceCloseConnections: true,
});
await app.listen(3000);
const url = await app.getUrl();
eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});
afterEach(async () => {
await app.close();
eventSource.close();
});
it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});
});

View File

@@ -0,0 +1,86 @@
import {
FastifyAdapter,
NestFastifyApplication,
} from '@nestjs/platform-fastify';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as EventSource from 'eventsource';
import { AppModule } from '../src/app.module';
describe('Sse (Fastify Application)', () => {
let app: NestFastifyApplication;
let eventSource: EventSource;
describe('without forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = moduleFixture.createNestApplication<NestFastifyApplication>(
new FastifyAdapter(),
);
await app.listen(3000);
const url = await app.getUrl();
eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});
// The order of actions is very important here. When not using `forceCloseConnections`,
// the SSe eventsource should close the connections in order to signal the server that
// the keep-alive connection can be ended.
afterEach(async () => {
eventSource.close();
await app.close();
});
it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});
describe('with forceCloseConnections', () => {
beforeEach(async () => {
const moduleFixture = await Test.createTestingModule({
imports: [AppModule],
}).compile();
app = moduleFixture.createNestApplication<NestFastifyApplication>(
new FastifyAdapter({
forceCloseConnections: true,
}),
);
await app.listen(3000);
const url = await app.getUrl();
eventSource = new EventSource(url + '/sse', {
headers: { connection: 'keep-alive' },
});
});
afterEach(async () => {
await app.close();
eventSource.close();
});
it('receives events from server', done => {
eventSource.addEventListener('message', event => {
expect(JSON.parse(event.data)).to.eql({
hello: 'world',
});
done();
});
});
});
});

View File

@@ -0,0 +1,12 @@
import { Controller, MessageEvent, Sse } from '@nestjs/common';
import { interval, map, Observable } from 'rxjs';
@Controller()
export class AppController {
@Sse('sse')
sse(): Observable<MessageEvent> {
return interval(1000).pipe(
map(_ => ({ data: { hello: 'world' } } as MessageEvent)),
);
}
}

View File

@@ -0,0 +1,7 @@
import { Module } from '@nestjs/common';
import { AppController } from './app.controller';
@Module({
controllers: [AppController],
})
export class AppModule {}

View File

@@ -0,0 +1,23 @@
{
"compilerOptions": {
"module": "commonjs",
"declaration": false,
"noImplicitAny": false,
"removeComments": true,
"lib": ["dom"],
"noLib": false,
"emitDecoratorMetadata": true,
"experimentalDecorators": true,
"target": "es6",
"sourceMap": true,
"allowJs": true,
"outDir": "./dist"
},
"include": [
"src/**/*",
"e2e/**/*"
],
"exclude": [
"node_modules"
]
}

View File

@@ -25,4 +25,9 @@ export interface NestApplicationOptions extends NestApplicationContextOptions {
* Whether to register the raw request body on the request. Use `req.rawBody`.
*/
rawBody?: boolean;
/**
* Force close open HTTP connections. Useful if restarting your application hangs due to
* keep-alive connections in the HTTP adapter.
*/
forceCloseConnections?: boolean;
}

View File

@@ -33,7 +33,7 @@ import * as cors from 'cors';
import * as express from 'express';
import * as http from 'http';
import * as https from 'https';
import { pipeline } from 'stream';
import { Duplex, pipeline } from 'stream';
import { ServeStaticOptions } from '../interfaces/serve-static-options.interface';
import { getBodyParserOptions } from './utils/get-body-parser-options.util';
@@ -49,6 +49,7 @@ type VersionedRoute = <
export class ExpressAdapter extends AbstractHttpAdapter {
private readonly routerMethodFactory = new RouterMethodFactory();
private readonly logger = new Logger(ExpressAdapter.name);
private readonly openConnections = new Set<Duplex>();
constructor(instance?: any) {
super(instance || express());
@@ -139,6 +140,8 @@ export class ExpressAdapter extends AbstractHttpAdapter {
}
public close() {
this.closeOpenConnections();
if (!this.httpServer) {
return undefined;
}
@@ -207,9 +210,13 @@ export class ExpressAdapter extends AbstractHttpAdapter {
options.httpsOptions,
this.getInstance(),
);
return;
} else {
this.httpServer = http.createServer(this.getInstance());
}
if (options?.forceCloseConnections) {
this.trackOpenConnections();
}
this.httpServer = http.createServer(this.getInstance());
}
public registerParserMiddleware(prefix?: string, rawBody?: boolean) {
@@ -382,6 +389,21 @@ export class ExpressAdapter extends AbstractHttpAdapter {
}
}
private trackOpenConnections() {
this.httpServer.on('connection', (socket: Duplex) => {
this.openConnections.add(socket);
socket.on('close', () => this.openConnections.delete(socket));
});
}
private closeOpenConnections() {
for (const socket of this.openConnections) {
socket.destroy();
this.openConnections.delete(socket);
}
}
private isMiddlewareApplied(name: string): boolean {
const app = this.getInstance();
return (

View File

@@ -2,7 +2,10 @@ import { NestFactory } from '@nestjs/core';
import { AppModule } from './app.module';
async function bootstrap() {
const app = await NestFactory.create(AppModule);
const app = await NestFactory.create(AppModule, {
forceCloseConnections: true,
});
app.enableShutdownHooks();
await app.listen(3000);
console.log(`Application is running on: ${await app.getUrl()}`);
}