mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
Merge branch 'feature/grpc-multipackage' of https://github.com/AlexDaSoul/nest
This commit is contained in:
@@ -21,6 +21,18 @@ export class GrpcController {
|
||||
})
|
||||
client: ClientGrpc;
|
||||
|
||||
@Client({
|
||||
transport: Transport.GRPC,
|
||||
options: {
|
||||
package: ['math', 'math2'],
|
||||
protoPath: [
|
||||
join(__dirname, 'math.proto'),
|
||||
join(__dirname, 'math2.proto'),
|
||||
],
|
||||
},
|
||||
})
|
||||
client2: ClientGrpc;
|
||||
|
||||
@Post()
|
||||
@HttpCode(200)
|
||||
call(@Body() data: number[]): Observable<number> {
|
||||
@@ -57,4 +69,18 @@ export class GrpcController {
|
||||
stream.write({ result: msg.data.reduce((a, b) => a + b) });
|
||||
});
|
||||
}
|
||||
|
||||
@GrpcMethod('Math2')
|
||||
async sum2({ data }: { data: number[] }): Promise<any> {
|
||||
return of({
|
||||
result: data.reduce((a, b) => a + b),
|
||||
});
|
||||
}
|
||||
|
||||
@Post()
|
||||
@HttpCode(200)
|
||||
call2(@Body() data: number[]): Observable<number> {
|
||||
const svc = this.client2.getService<any>('Math2');
|
||||
return svc.sum({ data });
|
||||
}
|
||||
}
|
||||
|
||||
15
integration/microservices/src/grpc/math2.proto
Normal file
15
integration/microservices/src/grpc/math2.proto
Normal file
@@ -0,0 +1,15 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package math2;
|
||||
|
||||
service Math2 {
|
||||
rpc Sum2 (RequestSum) returns (SumResult) {}
|
||||
}
|
||||
|
||||
message SumResult {
|
||||
int32 result = 1;
|
||||
}
|
||||
|
||||
message RequestSum {
|
||||
repeated int32 data = 1;
|
||||
}
|
||||
@@ -32,8 +32,8 @@ export interface GrpcOptions {
|
||||
maxSendMessageLength?: number;
|
||||
maxReceiveMessageLength?: number;
|
||||
credentials?: any;
|
||||
protoPath: string;
|
||||
package: string;
|
||||
protoPath: string | string[];
|
||||
package: string | string[];
|
||||
loader?: {
|
||||
keepCase?: boolean;
|
||||
alternateCommentMode?: boolean;
|
||||
|
||||
@@ -22,7 +22,7 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
protected readonly logger = new Logger(ClientProxy.name);
|
||||
protected readonly clients = new Map<string, any>();
|
||||
protected readonly url: string;
|
||||
protected grpcClient: any;
|
||||
protected grpcClients: any[] = [];
|
||||
|
||||
constructor(protected readonly options: GrpcOptions['options']) {
|
||||
super();
|
||||
@@ -35,12 +35,18 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
require('grpc'),
|
||||
);
|
||||
grpcProtoLoaderPackage = loadPackage(protoLoader, ClientGrpcProxy.name);
|
||||
this.grpcClient = this.createClient();
|
||||
this.grpcClients = this.createClients();
|
||||
}
|
||||
|
||||
public getService<T extends {}>(name: string): T {
|
||||
const grpcClient = this.createClientByServiceName(name);
|
||||
const protoMethods = Object.keys(this.grpcClient[name].prototype);
|
||||
const getClient = this.getClient(name);
|
||||
|
||||
if (!getClient) {
|
||||
throw new InvalidGrpcServiceException();
|
||||
}
|
||||
|
||||
const protoMethods = Object.keys(getClient[name].prototype);
|
||||
const grpcService = {} as T;
|
||||
|
||||
protoMethods.forEach(m => {
|
||||
@@ -55,7 +61,9 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
}
|
||||
|
||||
public createClientByServiceName(name: string) {
|
||||
if (!this.grpcClient[name]) {
|
||||
const getClient = this.getClient(name);
|
||||
|
||||
if (!getClient) {
|
||||
throw new InvalidGrpcServiceException();
|
||||
}
|
||||
const maxSendMessageLengthKey = 'grpc.max_send_message_length';
|
||||
@@ -86,11 +94,7 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
options.credentials || grpcPackage.credentials.createInsecure();
|
||||
|
||||
delete options.credentials;
|
||||
const grpcClient = new this.grpcClient[name](
|
||||
this.url,
|
||||
credentials,
|
||||
options,
|
||||
);
|
||||
const grpcClient = new getClient[name](this.url, credentials, options);
|
||||
this.clients.set(name, grpcClient);
|
||||
return grpcClient;
|
||||
}
|
||||
@@ -155,16 +159,28 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
};
|
||||
}
|
||||
|
||||
public createClient(): any {
|
||||
public createClients(): any[] {
|
||||
const grpcContext = this.loadProto();
|
||||
const packageName = this.getOptionsProp(this.options, 'package');
|
||||
const grpcPkg = this.lookupPackage(grpcContext, packageName);
|
||||
if (!grpcPkg) {
|
||||
const invalidPackageError = new InvalidGrpcPackageException();
|
||||
this.logger.error(invalidPackageError.message, invalidPackageError.stack);
|
||||
throw invalidPackageError;
|
||||
const packageOpt = this.getOptionsProp(this.options, 'package');
|
||||
const grpcPkgs = [];
|
||||
const packageNames = Array.isArray(packageOpt) ? packageOpt : [packageOpt];
|
||||
|
||||
for (const packageName of packageNames) {
|
||||
const grpcPkg = this.lookupPackage(grpcContext, packageName);
|
||||
|
||||
if (!grpcPkg) {
|
||||
const invalidPackageError = new InvalidGrpcPackageException();
|
||||
this.logger.error(
|
||||
invalidPackageError.message,
|
||||
invalidPackageError.stack,
|
||||
);
|
||||
throw invalidPackageError;
|
||||
}
|
||||
|
||||
grpcPkgs.push(grpcPkg);
|
||||
}
|
||||
return grpcPkg;
|
||||
|
||||
return grpcPkgs;
|
||||
}
|
||||
|
||||
public loadProto(): any {
|
||||
@@ -197,8 +213,11 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
}
|
||||
|
||||
public close() {
|
||||
this.grpcClient && this.grpcClient.close();
|
||||
this.grpcClient = null;
|
||||
this.grpcClients.forEach(client => {
|
||||
client.close();
|
||||
});
|
||||
|
||||
this.grpcClients = [];
|
||||
}
|
||||
|
||||
public async connect(): Promise<any> {
|
||||
@@ -214,6 +233,10 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
);
|
||||
}
|
||||
|
||||
protected getClient(name: string): any {
|
||||
return this.grpcClients.find(client => client.hasOwnProperty(name));
|
||||
}
|
||||
|
||||
protected publish(packet: any, callback: (packet: any) => any): any {
|
||||
throw new Error(
|
||||
'Method is not supported in gRPC mode. Use ClientGrpc instead (learn more in the documentation).',
|
||||
|
||||
@@ -33,8 +33,8 @@ export interface GrpcOptions {
|
||||
maxSendMessageLength?: number;
|
||||
maxReceiveMessageLength?: number;
|
||||
credentials?: any;
|
||||
protoPath: string;
|
||||
package: string;
|
||||
protoPath: string | string[];
|
||||
package: string | string[];
|
||||
protoLoader?: string;
|
||||
loader?: {
|
||||
keepCase?: boolean;
|
||||
|
||||
@@ -61,23 +61,15 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
|
||||
|
||||
public async bindEvents() {
|
||||
const grpcContext = this.loadProto();
|
||||
const packageName = this.getOptionsProp(this.options, 'package');
|
||||
const grpcPkg = this.lookupPackage(grpcContext, packageName);
|
||||
if (!grpcPkg) {
|
||||
const invalidPackageError = new InvalidGrpcPackageException();
|
||||
this.logger.error(invalidPackageError.message, invalidPackageError.stack);
|
||||
throw invalidPackageError;
|
||||
}
|
||||
const packageOpt = this.getOptionsProp(this.options, 'package');
|
||||
|
||||
// Take all of the services defined in grpcPkg and assign them to
|
||||
// method handlers defined in Controllers
|
||||
for (const definition of this.getServiceNames(grpcPkg)) {
|
||||
this.grpcClient.addService(
|
||||
// First parameter requires exact service definition from proto
|
||||
definition.service.service,
|
||||
// Here full proto definition required along with namespaced pattern name
|
||||
await this.createService(definition.service, definition.name),
|
||||
);
|
||||
// if packages more then 1
|
||||
const packageNames = Array.isArray(packageOpt) ? packageOpt : [packageOpt];
|
||||
|
||||
for (const packageName of packageNames) {
|
||||
const grpcPkg = this.lookupPackage(grpcContext, packageName);
|
||||
|
||||
await this.createServices(grpcPkg);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,4 +390,23 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
|
||||
// Otherwise add next through dot syntax
|
||||
return name + '.' + key;
|
||||
}
|
||||
|
||||
private async createServices(grpcPkg: any) {
|
||||
if (!grpcPkg) {
|
||||
const invalidPackageError = new InvalidGrpcPackageException();
|
||||
this.logger.error(invalidPackageError.message, invalidPackageError.stack);
|
||||
throw invalidPackageError;
|
||||
}
|
||||
|
||||
// Take all of the services defined in grpcPkg and assign them to
|
||||
// method handlers defined in Controllers
|
||||
for (const definition of this.getServiceNames(grpcPkg)) {
|
||||
this.grpcClient.addService(
|
||||
// First parameter requires exact service definition from proto
|
||||
definition.service.service,
|
||||
// Here full proto definition required along with namespaced pattern name
|
||||
await this.createService(definition.service, definition.name),
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -16,16 +16,26 @@ class NoopLogger extends Logger {
|
||||
|
||||
class GrpcService {
|
||||
test = null;
|
||||
test2 = null;
|
||||
}
|
||||
|
||||
describe('ClientGrpcProxy', () => {
|
||||
let client: ClientGrpcProxy;
|
||||
let clientMulti: ClientGrpcProxy;
|
||||
|
||||
beforeEach(() => {
|
||||
client = new ClientGrpcProxy({
|
||||
protoPath: join(__dirname, './test.proto'),
|
||||
package: 'test',
|
||||
});
|
||||
|
||||
clientMulti = new ClientGrpcProxy({
|
||||
protoPath: ['test.proto', 'test2.proto'],
|
||||
package: ['test', 'test2'],
|
||||
loader: {
|
||||
includeDirs: [join(__dirname, '.')],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
describe('getService', () => {
|
||||
@@ -36,16 +46,43 @@ describe('ClientGrpcProxy', () => {
|
||||
InvalidGrpcServiceException,
|
||||
);
|
||||
});
|
||||
|
||||
it('should throw "InvalidGrpcServiceException" (multiple proto)', () => {
|
||||
(clientMulti as any).grpcClient = {};
|
||||
|
||||
expect(() => clientMulti.getService('test')).to.throw(
|
||||
InvalidGrpcServiceException,
|
||||
);
|
||||
|
||||
expect(() => clientMulti.getService('test2')).to.throw(
|
||||
InvalidGrpcServiceException,
|
||||
);
|
||||
});
|
||||
});
|
||||
describe('when "grpcClient[name]" is not nil', () => {
|
||||
it('should create grpcService', () => {
|
||||
(client as any).grpcClient = {
|
||||
(client as any).grpcClients[0] = {
|
||||
test: GrpcService,
|
||||
};
|
||||
expect(() => client.getService('test')).to.not.throw(
|
||||
InvalidGrpcServiceException,
|
||||
);
|
||||
});
|
||||
|
||||
describe('when "grpcClient[name]" is not nil (multiple proto)', () => {
|
||||
it('should create grpcService', () => {
|
||||
(clientMulti as any).grpcClients[0] = {
|
||||
test: GrpcService,
|
||||
test2: GrpcService,
|
||||
};
|
||||
expect(() => clientMulti.getService('test')).to.not.throw(
|
||||
InvalidGrpcServiceException,
|
||||
);
|
||||
expect(() => clientMulti.getService('test2')).to.not.throw(
|
||||
InvalidGrpcServiceException,
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
@@ -194,14 +231,14 @@ describe('ClientGrpcProxy', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('createClient', () => {
|
||||
describe('createClients', () => {
|
||||
describe('when package does not exist', () => {
|
||||
it('should throw "InvalidGrpcPackageException"', () => {
|
||||
sinon.stub(client, 'lookupPackage').callsFake(() => null);
|
||||
(client as any).logger = new NoopLogger();
|
||||
|
||||
try {
|
||||
client.createClient();
|
||||
client.createClients();
|
||||
} catch (err) {
|
||||
expect(err).to.be.instanceof(InvalidGrpcPackageException);
|
||||
}
|
||||
@@ -225,7 +262,7 @@ describe('ClientGrpcProxy', () => {
|
||||
describe('close', () => {
|
||||
it('should call "close" method', () => {
|
||||
const grpcClient = { close: sinon.spy() };
|
||||
(client as any).grpcClient = grpcClient;
|
||||
(client as any).grpcClients[0] = grpcClient;
|
||||
|
||||
client.close();
|
||||
expect(grpcClient.close.called).to.be.true;
|
||||
|
||||
7
packages/microservices/test/client/test2.proto
Normal file
7
packages/microservices/test/client/test2.proto
Normal file
@@ -0,0 +1,7 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package test2;
|
||||
|
||||
service TestService {
|
||||
|
||||
}
|
||||
@@ -6,6 +6,7 @@ import { of } from 'rxjs';
|
||||
import * as sinon from 'sinon';
|
||||
import { InvalidGrpcPackageException } from '../../errors/invalid-grpc-package.exception';
|
||||
import { ServerGrpc } from '../../server/server-grpc';
|
||||
import { ClientGrpcProxy } from '../../client';
|
||||
|
||||
class NoopLogger extends Logger {
|
||||
log(message: any, context?: string): void {}
|
||||
@@ -15,11 +16,21 @@ class NoopLogger extends Logger {
|
||||
|
||||
describe('ServerGrpc', () => {
|
||||
let server: ServerGrpc;
|
||||
let serverMulti: ServerGrpc;
|
||||
|
||||
beforeEach(() => {
|
||||
server = new ServerGrpc({
|
||||
protoPath: join(__dirname, './test.proto'),
|
||||
package: 'test',
|
||||
} as any);
|
||||
|
||||
serverMulti = new ServerGrpc({
|
||||
protoPath: ['test.proto', 'test2.proto'],
|
||||
package: ['test', 'test2'],
|
||||
loader: {
|
||||
includeDirs: [join(__dirname, '.')],
|
||||
},
|
||||
});
|
||||
});
|
||||
|
||||
describe('listen', () => {
|
||||
@@ -50,6 +61,34 @@ describe('ServerGrpc', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('listen (multiple proto)', () => {
|
||||
let callback: sinon.SinonSpy;
|
||||
let bindEventsStub: sinon.SinonStub;
|
||||
|
||||
beforeEach(() => {
|
||||
callback = sinon.spy();
|
||||
bindEventsStub = sinon
|
||||
.stub(serverMulti, 'bindEvents')
|
||||
.callsFake(() => ({} as any));
|
||||
});
|
||||
|
||||
it('should call "bindEvents"', async () => {
|
||||
await serverMulti.listen(callback);
|
||||
expect(bindEventsStub.called).to.be.true;
|
||||
});
|
||||
it('should call "client.start"', async () => {
|
||||
const client = { start: sinon.spy() };
|
||||
sinon.stub(serverMulti, 'createClient').callsFake(() => client);
|
||||
|
||||
await serverMulti.listen(callback);
|
||||
expect(client.start.called).to.be.true;
|
||||
});
|
||||
it('should call callback', async () => {
|
||||
await serverMulti.listen(callback);
|
||||
expect(callback.called).to.be.true;
|
||||
});
|
||||
});
|
||||
|
||||
describe('bindEvents', () => {
|
||||
beforeEach(() => {
|
||||
sinon.stub(server, 'loadProto').callsFake(() => ({}));
|
||||
@@ -90,6 +129,45 @@ describe('ServerGrpc', () => {
|
||||
});
|
||||
});
|
||||
|
||||
describe('bindEvents (multiple proto)', () => {
|
||||
beforeEach(() => {
|
||||
sinon.stub(serverMulti, 'loadProto').callsFake(() => ({}));
|
||||
});
|
||||
describe('when package does not exist', () => {
|
||||
it('should throw "InvalidGrpcPackageException"', async () => {
|
||||
sinon.stub(serverMulti, 'lookupPackage').callsFake(() => null);
|
||||
(serverMulti as any).logger = new NoopLogger();
|
||||
try {
|
||||
await serverMulti.bindEvents();
|
||||
} catch (err) {
|
||||
expect(err).to.be.instanceOf(InvalidGrpcPackageException);
|
||||
}
|
||||
});
|
||||
});
|
||||
describe('when package exist', () => {
|
||||
it('should call "addService"', async () => {
|
||||
const serviceNames = [
|
||||
{
|
||||
name: 'test',
|
||||
service: true,
|
||||
},
|
||||
];
|
||||
sinon.stub(serverMulti, 'lookupPackage').callsFake(() => ({
|
||||
test: { service: true },
|
||||
}));
|
||||
sinon
|
||||
.stub(serverMulti, 'getServiceNames')
|
||||
.callsFake(() => serviceNames);
|
||||
|
||||
(serverMulti as any).grpcClient = { addService: sinon.spy() };
|
||||
|
||||
await serverMulti.bindEvents();
|
||||
expect((serverMulti as any).grpcClient.addService.calledTwice).to.be
|
||||
.true;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('getServiceNames', () => {
|
||||
it('should return filtered object keys', () => {
|
||||
const obj = {
|
||||
|
||||
3
packages/microservices/test/server/test2.proto
Normal file
3
packages/microservices/test/server/test2.proto
Normal file
@@ -0,0 +1,3 @@
|
||||
syntax = "proto3";
|
||||
|
||||
package test2;
|
||||
@@ -4,7 +4,7 @@ import { join } from 'path';
|
||||
export const grpcClientOptions: ClientOptions = {
|
||||
transport: Transport.GRPC,
|
||||
options: {
|
||||
package: 'hero',
|
||||
protoPath: join(__dirname, './hero/hero.proto'),
|
||||
package: 'hero', // ['hero', 'hero2']
|
||||
protoPath: join(__dirname, './hero/hero.proto'), // ['./hero/hero.proto', './hero/hero2.proto']
|
||||
},
|
||||
};
|
||||
|
||||
Reference in New Issue
Block a user