Mirror of https://github.com/nestjs/nest.git (synced 2026-02-21 23:11:44 +00:00)
fix(microservices): fix backpressure with integration test

Resolves an issue where the code assumed that the first value written to the stream when `write` returned `false` had not been written, when in fact it had. Adds an integration test to verify the fix, and updates the unit tests so the mock reflects the real stream behavior. NOTE: This fix has been independently verified in our business environment. NOTEx2: This is totally my (@benlesh) fault. :/

Fixes #12768
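For background (this context is not part of the commit): Node's Writable contract is that a `false` return from `write()` means the chunk was still accepted into the stream's internal buffer; it only asks the producer to pause until the 'drain' event, it does not mean the chunk was dropped. A minimal sketch of that contract, using an illustrative sink of my own (the `sink` and `chunks` names are not from the repository):

import { Writable } from 'node:stream';

const chunks: string[] = [];

// A sink with a tiny high-water mark so write() reports backpressure
// almost immediately.
const sink = new Writable({
  highWaterMark: 1,
  write(chunk, _encoding, callback) {
    chunks.push(chunk.toString());
    callback();
  },
});

const accepted = sink.write('first');
console.log(accepted); // false: the buffer is over its high-water mark...
// ...yet 'first' was still taken and will be delivered. The correct reaction
// is to wait for 'drain' before writing more, not to write 'first' again.
sink.once('drain', () => {
  sink.write('second', () => {
    console.log(chunks); // ['first', 'second'] - each chunk delivered exactly once
  });
});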
integration/microservices/e2e/backpressure-grpc.spec.ts (new file, 71 lines)
@@ -0,0 +1,71 @@
import * as GRPC from '@grpc/grpc-js';
import * as ProtoLoader from '@grpc/proto-loader';
import { INestApplication } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { fail } from 'assert';
import { expect } from 'chai';
import { join } from 'path';
import * as request from 'supertest';
import { GrpcController } from '../src/grpc/grpc.controller';

describe('GRPC transport', () => {
  let server;
  let app: INestApplication;
  let client: any;

  before(async () => {
    const module = await Test.createTestingModule({
      controllers: [GrpcController],
    }).compile();

    app = module.createNestApplication();
    server = app.getHttpAdapter().getInstance();

    app.connectMicroservice<MicroserviceOptions>({
      transport: Transport.GRPC,
      options: {
        package: ['backpressure'],
        protoPath: [join(__dirname, '../src/grpc/backpressure.proto')],
      },
    });

    // Start gRPC microservice
    await app.startAllMicroservices();
    await app.init();

    // Load proto-buffers for test gRPC dispatch
    const proto = ProtoLoader.loadSync(
      join(__dirname, '../src/grpc/backpressure.proto'),
    );

    // Create Raw gRPC client object
    const protoGRPC = GRPC.loadPackageDefinition(proto) as any;

    client = new protoGRPC.backpressure.Backpressure(
      'localhost:5000',
      GRPC.credentials.createInsecure(),
    );
  });

  it(`GRPC with backpressure control`, async function () {
    // This test hit the gRPC server with 1000 messages, but the server
    // has to process large (> 1MB) messages, so it will definitely hit
    // issues where writing to the stream needs to be paused until a drain
    // event. Prior to this test, a bug existed where the server would
    // send the incorrect number of messages due to improper backpressure
    // handling that wrote messages more than once.
    this.timeout(10000);

    const largeMessages = client.streamLargeMessages();
    // [0, 1, 2, ..., 999]
    const expectedIds = Array.from({ length: 1000 }, (_, n) => n);
    const receivedIds: number[] = [];

    await largeMessages.forEach(msg => {
      receivedIds.push(msg.id);
    });

    expect(receivedIds).to.deep.equal(expectedIds);
  });
});
integration/microservices/src/grpc/backpressure.proto (new file, 14 lines)
@@ -0,0 +1,14 @@
syntax = "proto3";

package backpressure;

service Backpressure {
  rpc StreamLargeMessages(Empty) returns (stream BackpressureData) {}
}

message BackpressureData {
  int32 id = 1;
  string data = 2;
}

message Empty {}
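For reference (not part of the commit), the messages defined above arrive on the JS side as plain objects; a hand-written TypeScript shape for them would look like this (illustrative only, the controller and test below just use object literals with these fields):

interface BackpressureData {
  id: number;   // int32 id = 1
  data: string; // string data = 2
}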
integration/microservices/src/grpc/grpc.controller.ts
@@ -128,6 +128,27 @@ export class GrpcController {
     return svc.sum2({ data });
   }
 
+  @GrpcMethod('Backpressure', 'StreamLargeMessages')
+  streamLargeMessages(_req: unknown, _meta: unknown) {
+    // Send 1000 messages of >1MB each relatively fast
+    // This should be enough to trigger backpressure issues
+    // while writing to the socket.
+    return new Observable(subscriber => {
+      let n = 0;
+      const interval = setInterval(() => {
+        // We'll be checking the ids. The `data` is just to make the
+        // message large enough to trigger backpressure issues.
+        subscriber.next({ id: n++, data: 'a'.repeat(1024 * 1024) });
+        if (n === 1000) {
+          subscriber.complete();
+        }
+      }, 0);
+      return () => {
+        clearInterval(interval);
+      };
+    });
+  }
+
   @Post('error')
   @HttpCode(200)
   serializeError(
packages/microservices/server/server-grpc.ts
@@ -269,6 +269,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
       let shouldErrorAfterDraining = false;
       let error: any;
       let shouldResolveAfterDraining = false;
+      let writing = true;
 
       // Used to manage finalization
       const subscription = new Subscription();
@@ -290,19 +291,18 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
       subscription.add(() => call.end());
 
       const drain = () => {
+        writing = true;
         while (valuesWaitingToBeDrained.length > 0) {
-          // Try to write the value, THEN shift it off, because
-          // if we can't write the value, we need to keep it in the
-          // buffer at it's position to ensure ordering.
-          const value = valuesWaitingToBeDrained[0];
-          if (!call.write(value)) {
-            // We can't write anymore so we need to wait for the drain event
-            // stop draining for now.
-            return;
+          const value = valuesWaitingToBeDrained.shift()!;
+          if (writing) {
+            // The first time `call.write` returns false, we need to stop.
+            // It wrote the value, but it won't write anything else.
+            writing = call.write(value);
+            if (!writing) {
+              // We can't write anymore so we need to wait for the drain event
+              return;
+            }
           }
-
-          // We successfully wrote the value, so we can shift it off the buffer
-          valuesWaitingToBeDrained.shift();
         }
 
         if (shouldResolveAfterDraining) {
@@ -320,7 +320,9 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
       subscription.add(
         source.subscribe({
           next(value) {
-            if (!call.write(value)) {
+            if (writing) {
+              writing = call.write(value);
+            } else {
               // If we can't write, that's because we need to
               // wait for the drain event before we can write again
               // buffer the value and wait for the drain event
@@ -383,8 +385,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
       if (isResponseStream) {
         try {
           await this.writeObservableToGrpc(res, call);
-        }
-        catch (err) {
+        } catch (err) {
           call.emit('error', err);
           return;
         }
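Condensed out of the diff above into a standalone form, the pattern the fixed `writeObservableToGrpc` follows is roughly the sketch below. This is an illustration under my own names (`writeObservableToStream`, `pending`, `sink`), not the library source: values produced while the sink is saturated are buffered, and a value whose `write()` call returned `false` is never re-queued, because the sink has already accepted it.

import { Writable } from 'node:stream';
import { Observable } from 'rxjs';

// Illustrative sketch: write an Observable into an objectMode Writable while
// respecting backpressure, without ever writing the same value twice.
function writeObservableToStream<T>(
  source: Observable<T>,
  sink: Writable, // assumed to be in objectMode
): Promise<void> {
  return new Promise((resolve, reject) => {
    const pending: T[] = []; // values produced while the sink was saturated
    let writing = true;
    let completed = false;

    const finishIfDone = () => {
      if (completed && pending.length === 0) {
        sink.end(() => resolve());
      }
    };

    sink.on('drain', () => {
      writing = true;
      while (pending.length > 0) {
        // Shift first: once write() accepts the value (even if it returns
        // false), it must not stay in the buffer or it would be sent twice.
        const value = pending.shift()!;
        writing = sink.write(value);
        if (!writing) {
          return; // wait for the next 'drain' event
        }
      }
      finishIfDone();
    });

    source.subscribe({
      next(value) {
        if (writing) {
          writing = sink.write(value);
        } else {
          pending.push(value); // saturated: hold the value until 'drain'
        }
      },
      error(err) {
        reject(err);
      },
      complete() {
        completed = true;
        if (writing) {
          finishIfDone();
        }
      },
    });
  });
}

The `writing` flag is what distinguishes "this value was accepted but the sink is now saturated" from "this value must be buffered and written later".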
packages/microservices/test/server/server-grpc.spec.ts
@@ -602,11 +602,12 @@ describe('ServerGrpc', () => {
         const call = {
           write: sinon.spy(value => {
             // Simulating a writable stream becoming overwhelmed.
-            const canWrite = writeCounter++ < highwaterMark;
-            if (canWrite) {
+            if (writeCounter++ < highwaterMark) {
+              // We can write this value to the stream.
               written.push(value);
             }
-            return canWrite;
+            // But as soon as we pass the highwater mark, we can't write anymore.
+            return writeCounter < highwaterMark;
           }),
           end: sinon.spy(() => {
             written.push('end');