chore: fix tests

Just combining the backpressure tests and the math tests. There were issues
with the tests because of port collisions.
This commit is contained in:
Ben Lesh
2023-11-17 14:46:56 -06:00
parent 2e8f5cebf7
commit d27d1818cd
5 changed files with 30 additions and 86 deletions

View File

@@ -1,71 +0,0 @@
import * as GRPC from '@grpc/grpc-js';
import * as ProtoLoader from '@grpc/proto-loader';
import { INestApplication } from '@nestjs/common';
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { fail } from 'assert';
import { expect } from 'chai';
import { join } from 'path';
import * as request from 'supertest';
import { GrpcController } from '../src/grpc/grpc.controller';
describe('GRPC transport', () => {
let server;
let app: INestApplication;
let client: any;
before(async () => {
const module = await Test.createTestingModule({
controllers: [GrpcController],
}).compile();
app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();
app.connectMicroservice<MicroserviceOptions>({
transport: Transport.GRPC,
options: {
package: ['backpressure'],
protoPath: [join(__dirname, '../src/grpc/backpressure.proto')],
},
});
// Start gRPC microservice
await app.startAllMicroservices();
await app.init();
// Load proto-buffers for test gRPC dispatch
const proto = ProtoLoader.loadSync(
join(__dirname, '../src/grpc/backpressure.proto'),
);
// Create Raw gRPC client object
const protoGRPC = GRPC.loadPackageDefinition(proto) as any;
client = new protoGRPC.backpressure.Backpressure(
'localhost:5000',
GRPC.credentials.createInsecure(),
);
});
it(`GRPC with backpressure control`, async function () {
// This test hit the gRPC server with 1000 messages, but the server
// has to process large (> 1MB) messages, so it will definitely hit
// issues where writing to the stream needs to be paused until a drain
// event. Prior to this test, a bug existed where the server would
// send the incorrect number of messages due to improper backpressure
// handling that wrote messages more than once.
this.timeout(10000);
const largeMessages = client.streamLargeMessages();
// [0, 1, 2, ..., 999]
const expectedIds = Array.from({ length: 1000 }, (_, n) => n);
const receivedIds: number[] = [];
await largeMessages.forEach(msg => {
receivedIds.push(msg.id);
});
expect(receivedIds).to.deep.equal(expectedIds);
});
});

View File

@@ -128,6 +128,27 @@ describe('GRPC transport', () => {
});
});
it(`GRPC with backpressure control`, async function () {
// This test hit the gRPC server with 1000 messages, but the server
// has to process large (> 1MB) messages, so it will definitely hit
// issues where writing to the stream needs to be paused until a drain
// event. Prior to this test, a bug existed where the server would
// send the incorrect number of messages due to improper backpressure
// handling that wrote messages more than once.
this.timeout(10000);
const largeMessages = client.streamLargeMessages();
// [0, 1, 2, ..., 999]
const expectedIds = Array.from({ length: 1000 }, (_, n) => n);
const receivedIds: number[] = [];
await largeMessages.forEach(msg => {
receivedIds.push(msg.id);
});
expect(receivedIds).to.deep.equal(expectedIds);
});
after(async () => {
await app.close();
});

View File

@@ -1,14 +0,0 @@
syntax = "proto3";
package backpressure;
service Backpressure {
rpc StreamLargeMessages(Empty) returns (stream BackpressureData) {}
}
message BackpressureData {
int32 id = 1;
string data = 2;
}
message Empty {}

View File

@@ -128,7 +128,7 @@ export class GrpcController {
return svc.sum2({ data });
}
@GrpcMethod('Backpressure', 'StreamLargeMessages')
@GrpcMethod('Math')
streamLargeMessages(_req: unknown, _meta: unknown) {
// Send 1000 messages of >1MB each relatively fast
// This should be enough to trigger backpressure issues

View File

@@ -7,8 +7,16 @@ service Math {
rpc SumStream(stream RequestSum) returns(stream SumResult);
rpc SumStreamPass(stream RequestSum) returns(stream SumResult);
rpc Divide (RequestDivide) returns (DivideResult);
rpc StreamLargeMessages(Empty) returns (stream BackpressureData) {}
}
message BackpressureData {
int32 id = 1;
string data = 2;
}
message Empty {}
message SumResult {
int32 result = 1;
}