Merge pull request #10106 from jmcdo29/fix/continue-on-stream-cancel

fix: add a check if the res is destroyed before sending response
This commit is contained in:
Kamil Mysliwiec
2022-08-16 10:26:56 +02:00
committed by GitHub
5 changed files with 82 additions and 2 deletions

View File

@@ -8,6 +8,11 @@ import { readFileSync } from 'fs';
import { join } from 'path';
import * as request from 'supertest';
import { AppModule } from '../src/app.module';
import {
getHttpBaseOptions,
sendCanceledHttpRequest,
sendHttpRequest,
} from './utils';
const readme = readFileSync(join(process.cwd(), 'Readme.md'));
const readmeString = readme.toString();
@@ -68,4 +73,11 @@ describe('Express FileSend', () => {
it('should return an error if the file does not exist', async () => {
return request(app.getHttpServer()).get('/file/not/exist').expect(400);
});
it('should allow for the client to end the response and be able to make another', async () => {
await app.listen(0);
const url = await getHttpBaseOptions(app);
await sendCanceledHttpRequest(new URL('/file/slow', url));
const res = await sendHttpRequest(new URL('/file/stream', url));
expect(res.statusCode).to.be.eq(200);
}).timeout(5000);
});

View File

@@ -0,0 +1,47 @@
import { INestApplication } from '@nestjs/common';
import { IncomingMessage, request, RequestOptions } from 'http';
import { URL } from 'url';
export const getHttpBaseOptions = async (
app: INestApplication,
): Promise<URL> => {
const url = await app.getUrl();
return new URL(url);
};
export const sendCanceledHttpRequest = async (url: URL) => {
return new Promise((resolve, reject) => {
const req = request(url, res => {
// close the request once we get the first response of data
res.on('data', () => {
req.destroy();
});
// response is closed, move on to next request and verify it's doable
res.on('close', resolve);
});
// fire the request
req.end();
});
};
export const sendHttpRequest = async (url: URL) => {
return new Promise<IncomingMessage>((resolve, reject) => {
const req = request(url, res => {
// this makes sure that the response actually starts and is read. We could verify this value against the same
// that is in an earlier test, but all we care about in _this_ test is that the status code is 200
res.on('data', chunk => {
// no op
});
// fail the test if somethin goes wrong
res.on('error', err => {
reject(err);
});
// pass the response back so we can verify values in the test
res.on('end', () => {
resolve(res);
});
});
// fire the request
req.end();
});
};

View File

@@ -36,4 +36,9 @@ export class AppController {
getNonExistantFile(): StreamableFile {
return this.appService.getFileThatDoesNotExist();
}
@Get('/file/slow')
getSlowFile(): StreamableFile {
return this.appService.getSlowStream();
}
}

View File

@@ -1,11 +1,16 @@
import { Injectable, StreamableFile } from '@nestjs/common';
import { randomBytes } from 'crypto';
import { createReadStream, readFileSync } from 'fs';
import { join } from 'path';
import { Observable, of } from 'rxjs';
import { Readable } from 'stream';
import { NonFile } from './non-file';
@Injectable()
export class AppService {
// `randomBytes` has a max value of 2^31 -1. That's all this is
private readonly MAX_BITES = Math.pow(2, 31) - 1;
getReadStream(): StreamableFile {
return new StreamableFile(
createReadStream(join(process.cwd(), 'Readme.md')),
@@ -39,4 +44,12 @@ export class AppService {
getFileThatDoesNotExist(): StreamableFile {
return new StreamableFile(createReadStream('does-not-exist.txt'));
}
getSlowStream(): StreamableFile {
const stream = new Readable();
stream.push(Buffer.from(randomBytes(this.MAX_BITES)));
// necessary for a `new Readable()`. Doesn't do anything
stream._read = () => {};
return new StreamableFile(stream);
}
}

View File

@@ -4,6 +4,7 @@ import { isFunction } from '../utils/shared.utils';
import { StreamableFileOptions } from './streamable-options.interface';
export interface StreamableHandlerResponse {
destroyed: boolean;
statusCode: number;
send: (msg: string) => void;
}
@@ -15,8 +16,10 @@ export class StreamableFile {
err: Error,
response: StreamableHandlerResponse,
) => void = (err: Error, res) => {
res.statusCode = 400;
res.send(err.message);
if (!res.destroyed) {
res.statusCode = 400;
res.send(err.message);
}
};
constructor(buffer: Uint8Array, options?: StreamableFileOptions);