feat(microservices): add multiple grpc packages and proto files to config

before:

grpcOptions: <GrpcOptions>{
    transport: Transport.GRPC,
    options: {
        url: 'url',
        package: 'name.package',
        protoPath: 'alone.proto'
    }
}
after:

grpcOptions: <GrpcOptions>{
    transport: Transport.GRPC,
    options: {
        url: 'url',
        package: ['name.package', 'other.package'],
        protoPath: [
        'one.proto',
        'two.proto',
        'any.proto'
        ]
    }
}
This commit is contained in:
Alex Dukhnovskiy
2019-10-03 20:22:19 +03:00
parent fc7b44c474
commit 9e2d606382
12 changed files with 694 additions and 29 deletions

View File

@@ -14,6 +14,15 @@ export class GrpcController {
})
client: ClientGrpc;
@Client({
transport: Transport.GRPC,
options: {
package: ['math', 'math2'],
protoPath: [join(__dirname, 'math.proto'), join(__dirname, 'math2.proto')],
},
})
client2: ClientGrpc;
@Post()
@HttpCode(200)
call(@Body() data: number[]): Observable<number> {
@@ -21,10 +30,24 @@ export class GrpcController {
return svc.sum({ data });
}
@Post()
@HttpCode(200)
call2(@Body() data: number[]): Observable<number> {
const svc = this.client2.getService<any>('Math2');
return svc.sum({ data });
}
@GrpcMethod('Math')
async sum({ data }: { data: number[] }): Promise<any> {
return of({
result: data.reduce((a, b) => a + b),
});
}
@GrpcMethod('Math2')
async sum2({ data }: { data: number[] }): Promise<any> {
return of({
result: data.reduce((a, b) => a + b),
});
}
}

View File

@@ -12,4 +12,4 @@ message SumResult {
message RequestSum {
repeated int32 data = 1;
}
}

View File

@@ -0,0 +1,15 @@
syntax = "proto3";
package math2;
service Math2 {
rpc Sum2 (RequestSum) returns (SumResult) {}
}
message SumResult {
int32 result = 1;
}
message RequestSum {
repeated int32 data = 1;
}

View File

@@ -119,17 +119,23 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
public createClient(): any {
const grpcContext = this.loadProto();
const packageName = this.getOptionsProp<GrpcOptions>(
const packageOpt = this.getOptionsProp<GrpcOptions>(
this.options,
'package',
);
const grpcPkg = this.lookupPackage(grpcContext, packageName);
if (!grpcPkg) {
const invalidPackageError = new InvalidGrpcPackageException();
this.logger.error(invalidPackageError.message, invalidPackageError.stack);
throw invalidPackageError;
const packageNames = Array.isArray(packageOpt) ? packageOpt : [packageOpt];
for (const packageName of packageNames) {
const grpcPkg = this.lookupPackage(grpcContext, packageName);
if (!grpcPkg) {
const invalidPackageError = new InvalidGrpcPackageException();
this.logger.error(invalidPackageError.message, invalidPackageError.stack);
throw invalidPackageError;
}
return grpcPkg;
}
return grpcPkg;
}
public loadProto(): any {

View File

@@ -24,8 +24,8 @@ export interface GrpcOptions {
maxSendMessageLength?: number;
maxReceiveMessageLength?: number;
credentials?: any;
protoPath: string;
package: string;
protoPath: string | string[];
package: string | string[];
protoLoader?: string;
loader?: {
keepCase?: boolean;

View File

@@ -62,28 +62,14 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
this.options,
'package',
);
// if packages more then 1
const packageNames = Array.isArray(packageOpt) ? packageOpt : [packageOpt];
for (const packageName of packageNames) {
const grpcPkg = this.lookupPackage(grpcContext, packageName);
if (!grpcPkg) {
const invalidPackageError = new InvalidGrpcPackageException();
this.logger.error(invalidPackageError.message, invalidPackageError.stack);
throw invalidPackageError;
}
// Take all of the services defined in grpcPkg and assign them to
// method handlers defined in Controllers
for (const definition of this.getServiceNames(grpcPkg)) {
this.grpcClient.addService(
// First parameter requires exact service definition from proto
definition.service.service,
// Here full proto definition required along with namespaced pattern name
await this.createService(definition.service, definition.name),
);
}
await this.createServices(grpcPkg);
}
}
@@ -276,4 +262,23 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
// Otherwise add next through dot syntax
return name + '.' + key;
}
private async createServices(grpcPkg: any) {
if (!grpcPkg) {
const invalidPackageError = new InvalidGrpcPackageException();
this.logger.error(invalidPackageError.message, invalidPackageError.stack);
throw invalidPackageError;
}
// Take all of the services defined in grpcPkg and assign them to
// method handlers defined in Controllers
for (const definition of this.getServiceNames(grpcPkg)) {
this.grpcClient.addService(
// First parameter requires exact service definition from proto
definition.service.service,
// Here full proto definition required along with namespaced pattern name
await this.createService(definition.service, definition.name),
);
}
}
}

View File

@@ -0,0 +1,286 @@
import { Logger } from '@nestjs/common';
import { expect } from 'chai';
import { join } from 'path';
import { Observable } from 'rxjs';
import * as sinon from 'sinon';
import { ClientGrpcProxy } from '../../client/client-grpc';
import { InvalidGrpcPackageException } from '../../errors/invalid-grpc-package.exception';
import { InvalidGrpcServiceException } from '../../errors/invalid-grpc-service.exception';
import { InvalidProtoDefinitionException } from '../../errors/invalid-proto-definition.exception';
// tslint:disable:no-string-literal
class NoopLogger extends Logger {
log(message: any, context?: string): void {}
error(message: any, trace?: string, context?: string): void {}
warn(message: any, context?: string): void {}
}
class GrpcService {
test = null;
test2 = null;
}
describe('ClientGrpcProxy', () => {
let client: ClientGrpcProxy;
beforeEach(() => {
client = new ClientGrpcProxy({
protoPath: [join(__dirname, './test.proto'), join(__dirname, './test2.proto')],
package: ['test', 'test2'],
});
});
describe('getService', () => {
describe('when "grpcClient[name]" is nil', () => {
it('should throw "InvalidGrpcServiceException"', () => {
(client as any).grpcClient = {};
expect(() => client.getService('test')).to.throw(
InvalidGrpcServiceException,
);
expect(() => client.getService('test2')).to.throw(
InvalidGrpcServiceException,
);
});
});
describe('when "grpcClient[name]" is not nil', () => {
it('should create grpcService', () => {
(client as any).grpcClient = {
test: GrpcService,
test2: GrpcService,
};
expect(() => client.getService('test')).to.not.throw(
InvalidGrpcServiceException,
);
expect(() => client.getService('test2')).to.not.throw(
InvalidGrpcServiceException,
);
});
});
});
describe('createServiceMethod', () => {
const methodName = 'test';
const methodName2 = 'test2';
describe('when method is a response stream', () => {
it('should call "createStreamServiceMethod" (test)', () => {
const cln = { [methodName]: { responseStream: true } };
const spy = sinon.spy(client, 'createStreamServiceMethod');
client.createServiceMethod(cln, methodName);
expect(spy.called).to.be.true;
});
it('should call "createStreamServiceMethod" (test2)', () => {
const cln = { [methodName2]: { responseStream: true } };
const spy = sinon.spy(client, 'createStreamServiceMethod');
client.createServiceMethod(cln, methodName2);
expect(spy.called).to.be.true;
});
});
describe('when method is not a response stream', () => {
it('should call "createUnaryServiceMethod" (test)', () => {
const cln = { [methodName]: { responseStream: false } };
const spy = sinon.spy(client, 'createUnaryServiceMethod');
client.createServiceMethod(cln, methodName);
expect(spy.called).to.be.true;
});
it('should call "createUnaryServiceMethod" (test2)', () => {
const cln = { [methodName2]: { responseStream: false } };
const spy = sinon.spy(client, 'createUnaryServiceMethod');
client.createServiceMethod(cln, methodName2);
expect(spy.called).to.be.true;
});
});
});
describe('createStreamServiceMethod', () => {
it('should return observable', () => {
const fn = client.createStreamServiceMethod({}, 'method');
expect(fn()).to.be.instanceof(Observable);
});
describe('on subscribe', () => {
const methodName = 'm';
const obj = { [methodName]: () => ({ on: (type, fn) => fn() }) };
let stream$: Observable<any>;
beforeEach(() => {
stream$ = client.createStreamServiceMethod(obj, methodName)();
});
it('should call native method', () => {
const spy = sinon.spy(obj, methodName);
stream$.subscribe(() => ({}), () => ({}));
expect(spy.called).to.be.true;
});
});
describe('flow-control', () => {
const methodName = 'm';
type EvtCallback = (...args: any[]) => void;
let callMock: {
on: (type: string, fn: EvtCallback) => void;
cancel: sinon.SinonSpy;
finished: boolean;
destroy: sinon.SinonSpy;
removeAllListeners: sinon.SinonSpy;
};
let eventCallbacks: { [type: string]: EvtCallback };
let obj, dataSpy, errorSpy, completeSpy;
let stream$: Observable<any>;
beforeEach(() => {
dataSpy = sinon.spy();
errorSpy = sinon.spy();
completeSpy = sinon.spy();
eventCallbacks = {};
callMock = {
on: (type, fn) => (eventCallbacks[type] = fn),
cancel: sinon.spy(),
finished: false,
destroy: sinon.spy(),
removeAllListeners: sinon.spy(),
};
obj = { [methodName]: () => callMock };
stream$ = client.createStreamServiceMethod(obj, methodName)();
});
it('propagates server errors', () => {
const err = new Error('something happened');
stream$.subscribe(dataSpy, errorSpy, completeSpy);
eventCallbacks.data('a');
eventCallbacks.data('b');
callMock.finished = true;
eventCallbacks.error(err);
eventCallbacks.data('c');
expect(Object.keys(eventCallbacks).length).to.eq(3);
expect(dataSpy.args).to.eql([['a'], ['b']]);
expect(errorSpy.args[0][0]).to.eql(err);
expect(completeSpy.called).to.be.false;
expect(callMock.cancel.called).to.be.false;
});
it('handles client side cancel', () => {
const grpcServerCancelErrMock = {
details: 'Cancelled',
};
const subscription = stream$.subscribe(dataSpy, errorSpy);
eventCallbacks.data('a');
eventCallbacks.data('b');
subscription.unsubscribe();
eventCallbacks.error(grpcServerCancelErrMock);
eventCallbacks.end();
eventCallbacks.data('c');
expect(callMock.cancel.called, 'should call call.cancel()').to.be.true;
expect(callMock.destroy.called, 'should call call.destroy()').to.be
.true;
expect(dataSpy.args).to.eql([['a'], ['b']]);
expect(errorSpy.called, 'should not error if client canceled').to.be
.false;
});
});
});
describe('createUnaryServiceMethod', () => {
it('should return observable', () => {
const fn = client.createUnaryServiceMethod({}, 'method');
expect(fn()).to.be.instanceof(Observable);
});
describe('on subscribe', () => {
const methodName = 'm';
const obj = { [methodName]: callback => callback(null, {}) };
let stream$: Observable<any>;
beforeEach(() => {
stream$ = client.createUnaryServiceMethod(obj, methodName)();
});
it('should call native method', () => {
const spy = sinon.spy(obj, methodName);
stream$.subscribe(() => ({}), () => ({}));
expect(spy.called).to.be.true;
});
});
});
describe('createClient', () => {
describe('when package does not exist', () => {
it('should throw "InvalidGrpcPackageException"', () => {
sinon.stub(client, 'lookupPackage').callsFake(() => null);
(client as any).logger = new NoopLogger();
try {
client.createClient();
} catch (err) {
expect(err).to.be.instanceof(InvalidGrpcPackageException);
}
});
});
});
describe('loadProto', () => {
describe('when proto is invalid', () => {
it('should throw InvalidProtoDefinitionException', () => {
sinon.stub(client, 'getOptionsProp' as any).callsFake(() => {
throw new Error();
});
(client as any).logger = new NoopLogger();
expect(() => client.loadProto()).to.throws(
InvalidProtoDefinitionException,
);
});
});
});
describe('close', () => {
it('should call "close" method', () => {
const grpcClient = { close: sinon.spy() };
(client as any).grpcClient = grpcClient;
client.close();
expect(grpcClient.close.called).to.be.true;
});
});
describe('publish', () => {
it('should throw exception', () => {
expect(() => client['publish'](null, null)).to.throws(Error);
});
});
describe('send', () => {
it('should throw exception', () => {
expect(() => client.send(null, null)).to.throws(Error);
});
});
describe('connect', () => {
it('should throw exception', () => {
client.connect().catch(error => expect(error).to.be.instanceof(Error));
});
});
describe('dispatchEvent', () => {
it('should throw exception', () => {
client['dispatchEvent'](null).catch(error =>
expect(error).to.be.instanceof(Error),
);
});
});
});

View File

@@ -0,0 +1,7 @@
syntax = "proto3";
package test2;
service TestService {
}

View File

@@ -0,0 +1,319 @@
import { Logger } from '@nestjs/common';
import { expect } from 'chai';
import { join } from 'path';
import { of } from 'rxjs';
import * as sinon from 'sinon';
import { InvalidGrpcPackageException } from '../../errors/invalid-grpc-package.exception';
import { ServerGrpc } from '../../server/server-grpc';
import { ClientGrpcProxy } from '../../client';
class NoopLogger extends Logger {
log(message: any, context?: string): void {}
error(message: any, trace?: string, context?: string): void {}
warn(message: any, context?: string): void {}
}
describe('ServerGrpc', () => {
let server: ServerGrpc;
beforeEach(() => {
server = new ServerGrpc({
protoPath: [join(__dirname, './test.proto'), join(__dirname, './test2.proto')],
package: ['test', 'test2'],
} as any);
});
describe('listen', () => {
let callback: sinon.SinonSpy;
let bindEventsStub: sinon.SinonStub;
beforeEach(() => {
callback = sinon.spy();
bindEventsStub = sinon
.stub(server, 'bindEvents')
.callsFake(() => ({} as any));
});
it('should call "bindEvents"', async () => {
await server.listen(callback);
expect(bindEventsStub.called).to.be.true;
});
it('should call "client.start"', async () => {
const client = { start: sinon.spy() };
sinon.stub(server, 'createClient').callsFake(() => client);
await server.listen(callback);
expect(client.start.called).to.be.true;
});
it('should call callback', async () => {
await server.listen(callback);
expect(callback.called).to.be.true;
});
});
describe('bindEvents', () => {
describe('when package does not exist', () => {
it('should throw "InvalidGrpcPackageException"', async () => {
sinon.stub(server, 'lookupPackage').callsFake(() => null);
(server as any).logger = new NoopLogger();
try {
await server.bindEvents();
} catch (err) {
expect(err).to.be.instanceOf(InvalidGrpcPackageException);
}
});
});
describe('when package exist', () => {
it('should call "addService"', async () => {
const serviceNames = [
{
name: 'test-multi',
service: true,
},
];
sinon.stub(server, 'lookupPackage').callsFake(() => ({
'test-multi': { service: true },
}));
sinon.stub(server, 'getServiceNames').callsFake(() => serviceNames);
(server as any).grpcClient = { addService: sinon.spy() };
await server.bindEvents();
expect((server as any).grpcClient.addService.calledTwice).to.be.true;
});
});
});
describe('getServiceNames', () => {
it('should return filtered object keys', () => {
const obj = {
key: { service: true },
key2: { service: true },
key3: { service: false },
};
const expected = [
{
name: 'key',
service: { service: true },
},
{
name: 'key2',
service: { service: true },
},
];
expect(server.getServiceNames(obj)).to.be.eql(expected);
});
});
describe('createService', () => {
it('should call "createServiceMethod"', async () => {
const objectToMap = obj =>
new Map(Object.keys(obj).map(key => [key, obj[key]]) as any);
const handlers = objectToMap({
test: null,
test2: () => ({}),
});
sinon
.stub(server, 'createPattern')
.onFirstCall()
.returns('test')
.onSecondCall()
.returns('test2');
const spy = sinon
.stub(server, 'createServiceMethod')
.callsFake(() => ({} as any));
(server as any).messageHandlers = handlers;
await server.createService(
{
prototype: { test: true, test2: true },
},
'name',
);
expect(spy.calledOnce).to.be.true;
});
});
describe('createPattern', () => {
it('should return pattern', () => {
const service = 'test';
const service2 = 'test2';
const method = 'method';
expect(server.createPattern(service, method)).to.be.eql(
JSON.stringify({
service,
rpc: method,
}),
);
expect(server.createPattern(service2, method)).to.be.eql(
JSON.stringify({
service: service2,
rpc: method,
}),
);
});
});
describe('createServiceMethod', () => {
describe('when method is a response stream', () => {
it('should call "createStreamServiceMethod"', () => {
const cln = sinon.spy();
const spy = sinon.spy(server, 'createStreamServiceMethod');
server.createServiceMethod(cln, { responseStream: true } as any);
expect(spy.called).to.be.true;
});
});
describe('when method is not a response stream', () => {
it('should call "createUnaryServiceMethod"', () => {
const cln = sinon.spy();
const spy = sinon.spy(server, 'createUnaryServiceMethod');
server.createServiceMethod(cln, { responseStream: false } as any);
expect(spy.called).to.be.true;
});
});
});
describe('createStreamServiceMethod', () => {
it('should return function', () => {
const fn = server.createStreamServiceMethod(sinon.spy());
expect(fn).to.be.a('function');
});
describe('on call', () => {
it('should call native method', async () => {
const call = {
write: sinon.spy(),
end: sinon.spy(),
addListener: sinon.spy(),
removeListener: sinon.spy(),
};
const callback = sinon.spy();
const native = sinon.spy();
await server.createStreamServiceMethod(native)(call, callback);
expect(native.called).to.be.true;
expect(call.addListener.calledWith('cancelled')).to.be.true;
expect(call.removeListener.calledWith('cancelled')).to.be.true;
});
it(`should close the result observable when receiving an 'cancelled' event from the client`, async () => {
let cancelCb: () => void;
const call = {
write: sinon
.stub()
.onSecondCall()
.callsFake(() => cancelCb()),
end: sinon.spy(),
addListener: (name, cb) => (cancelCb = cb),
removeListener: sinon.spy(),
};
const result$ = of(1, 2, 3);
const callback = sinon.spy();
const native = sinon
.stub()
.returns(new Promise((resolve, reject) => resolve(result$)));
await server.createStreamServiceMethod(native)(call, callback);
expect(call.write.calledTwice).to.be.true;
expect(call.end.called).to.be.true;
});
});
});
describe('createUnaryServiceMethod', () => {
it('should return observable', () => {
const fn = server.createUnaryServiceMethod(sinon.spy());
expect(fn).to.be.a('function');
});
describe('on call', () => {
it('should call native & callback methods', async () => {
const call = { write: sinon.spy(), end: sinon.spy() };
const callback = sinon.spy();
const native = sinon.spy();
await server.createUnaryServiceMethod(native)(call, callback);
expect(native.called).to.be.true;
expect(callback.called).to.be.true;
});
});
});
describe('close', () => {
it('should call "forceShutdown"', () => {
const grpcClient = { forceShutdown: sinon.spy() };
(server as any).grpcClient = grpcClient;
server.close();
expect(grpcClient.forceShutdown.called).to.be.true;
});
});
describe('deserialize', () => {
it(`should return parsed json`, () => {
const obj = { test: 'test' };
expect(server.deserialize(obj)).to.deep.equal(
JSON.parse(JSON.stringify(obj)),
);
});
it(`should not parse argument if it is not an object`, () => {
const content = 'test';
expect(server.deserialize(content)).to.equal(content);
});
});
describe('proto interfaces parser should account for package namespaces', () => {
it('should parse multi-level proto package tree"', () => {
const grpcPkg = {
A: {
C: {
E: {
service: {
serviceName: {},
},
},
},
},
B: {
D: {
service: {
serviceName: {},
},
},
},
};
const svcs = server.getServiceNames(grpcPkg);
expect(svcs.length).to.be.equal(
2,
'Amount of services collected from namespace should be equal 2',
);
expect(svcs[0].name).to.be.equal('A.C.E');
expect(svcs[1].name).to.be.equal('B.D');
});
it('should parse single level proto package tree"', () => {
const grpcPkg = {
A: {
service: {
serviceName: {},
},
},
B: {
service: {
serviceName: {},
},
},
};
const services = server.getServiceNames(grpcPkg);
expect(services.length).to.be.equal(
2,
'Amount of services collected from namespace should be equal 2',
);
expect(services[0].name).to.be.equal('A');
expect(services[1].name).to.be.equal('B');
});
});
});

View File

@@ -1,3 +1,3 @@
syntax = "proto3";
package test;
package test;

View File

@@ -0,0 +1,4 @@
syntax = "proto3";
package test2;

View File

@@ -4,7 +4,7 @@ import { join } from 'path';
export const grpcClientOptions: ClientOptions = {
transport: Transport.GRPC,
options: {
package: 'hero',
protoPath: join(__dirname, './hero/hero.proto'),
package: 'hero', // ['hero', 'hero2']
protoPath: join(__dirname, './hero/hero.proto'), // ['./hero/hero.proto', './hero/hero2.proto']
},
};