test(): add missing unit tests, resolve conflicts

This commit is contained in:
Kamil Myśliwiec
2019-03-31 17:11:50 +02:00
8 changed files with 533 additions and 18 deletions

View File

@@ -1,6 +1,10 @@
import * as ProtoLoader from '@grpc/proto-loader';
import { INestApplication } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { fail } from 'assert';
import { expect } from 'chai';
import * as GRPC from 'grpc';
import { join } from 'path';
import * as request from 'supertest';
import { GrpcController } from '../src/grpc/grpc.controller';
@@ -8,8 +12,9 @@ import { GrpcController } from '../src/grpc/grpc.controller';
describe('GRPC transport', () => {
let server;
let app: INestApplication;
let client: any;
beforeEach(async () => {
before(async () => {
const module = await Test.createTestingModule({
controllers: [GrpcController],
}).compile();
@@ -24,18 +29,83 @@ describe('GRPC transport', () => {
protoPath: join(__dirname, '../src/grpc/math.proto'),
},
});
// Start gRPC microservice
await app.startAllMicroservicesAsync();
await app.init();
// Load proto-buffers for test gRPC dispatch
const proto = ProtoLoader.loadSync(
join(__dirname, '../src/grpc/math.proto'),
) as any;
// Create Raw gRPC client object
const protoGRPC = GRPC.loadPackageDefinition(proto) as any;
// Create client connected to started services at standard 5000 port
client = new protoGRPC.math.Math(
'localhost:5000',
GRPC.credentials.createInsecure(),
);
});
it(`/POST`, () => {
it(`GRPC Sending and Receiving HTTP POST`, () => {
return request(server)
.post('/')
.send([1, 2, 3, 4, 5])
.expect(200, { result: 15 });
});
afterEach(async () => {
it('GRPC Sending and receiving Stream from RX handler', async () => {
const callHandler = client.SumStream();
callHandler.on('data', (msg: number) => {
expect(msg).to.eql({ result: 15 });
callHandler.cancel();
});
callHandler.on('error', (err: any) => {
// We want to fail only on real errors while Cancellation error
// is expected
if (
String(err)
.toLowerCase()
.indexOf('cancelled') === -1
) {
fail('gRPC Stream error happened, error: ' + err);
}
});
return new Promise((resolve, reject) => {
callHandler.write({ data: [1, 2, 3, 4, 5] });
setTimeout(() => resolve(), 1000);
});
});
it('GRPC Sending and receiving Stream from Call Passthrough handler', async () => {
const callHandler = client.SumStreamPass();
callHandler.on('data', (msg: number) => {
expect(msg).to.eql({ result: 15 });
callHandler.cancel();
});
callHandler.on('error', (err: any) => {
// We want to fail only on real errors while Cancellation error
// is expected
if (
String(err)
.toLowerCase()
.indexOf('cancelled') === -1
) {
fail('gRPC Stream error happened, error: ' + err);
}
});
return new Promise((resolve, reject) => {
callHandler.write({ data: [1, 2, 3, 4, 5] });
setTimeout(() => resolve(), 1000);
});
});
after(async () => {
await app.close();
client.close();
});
});

View File

@@ -1,5 +1,12 @@
import { Body, Controller, HttpCode, Post } from '@nestjs/common';
import { Client, ClientGrpc, GrpcMethod, Transport } from '@nestjs/microservices';
import {
Client,
ClientGrpc,
GrpcMethod,
GrpcStreamCall,
GrpcStreamMethod,
Transport,
} from '@nestjs/microservices';
import { join } from 'path';
import { Observable, of } from 'rxjs';
@@ -27,4 +34,27 @@ export class GrpcController {
result: data.reduce((a, b) => a + b),
});
}
@GrpcStreamMethod('Math')
async sumStream(messages: Observable<any>): Promise<any> {
return new Promise<any>((resolve, reject) => {
messages.subscribe(
msg => {
resolve({
result: msg.data.reduce((a, b) => a + b),
});
},
err => {
reject(err);
},
);
});
}
@GrpcStreamCall('Math')
async sumStreamPass(stream: any) {
stream.on('data', (msg: any) => {
stream.write({ result: msg.data.reduce((a, b) => a + b) });
});
}
}

View File

@@ -3,7 +3,9 @@ syntax = "proto3";
package math;
service Math {
rpc Sum (RequestSum) returns (SumResult) {}
rpc Sum (RequestSum) returns (SumResult);
rpc SumStream(stream RequestSum) returns(stream SumResult);
rpc SumStreamPass(stream RequestSum) returns(stream SumResult);
}
message SumResult {

View File

@@ -141,6 +141,7 @@
],
"exclude": [
"node_modules/",
"packages/**/test/**",
"packages/**/*.spec.ts",
"packages/**/adapters/*.ts",
"packages/**/nest-*.ts",

View File

@@ -2,6 +2,12 @@ import { PATTERN_HANDLER_METADATA, PATTERN_METADATA } from '../constants';
import { PatternHandler } from '../enums/pattern-handler.enum';
import { PatternMetadata } from '../interfaces/pattern-metadata.interface';
export enum GrpcMethodStreamingType {
NO_STREAMING = 'no_stream',
RX_STREAMING = 'rx_stream',
PT_STREAMING = 'pt_stream',
}
/**
* Subscribes to incoming messages which fulfils chosen pattern.
*/
@@ -39,21 +45,82 @@ export function GrpcMethod(service: string, method?: string): MethodDecorator {
};
}
/**
* Registers gRPC call through RX handler for service and method
*
* @param service String parameter reflecting the name of service definition from proto file
*/
export function GrpcStreamMethod(service?: string);
/**
* @param service String parameter reflecting the name of service definition from proto file
* @param method Optional string parameter reflecting the name of method inside of a service definition coming after rpc keyword
*/
export function GrpcStreamMethod(service: string, method?: string);
export function GrpcStreamMethod(service: string, method?: string) {
return (
target: any,
key: string | symbol,
descriptor: PropertyDescriptor,
) => {
const metadata = createMethodMetadata(
target,
key,
service,
method,
GrpcMethodStreamingType.RX_STREAMING,
);
return MessagePattern(metadata)(target, key, descriptor);
};
}
/**
* Registers gRPC call pass through handler for service and method
*
* @param service String parameter reflecting the name of service definition from proto file
*/
export function GrpcStreamCall(service?: string);
/**
* @param service String parameter reflecting the name of service definition from proto file
* @param method Optional string parameter reflecting the name of method inside of a service definition coming after rpc keyword
*/
export function GrpcStreamCall(service: string, method?: string);
export function GrpcStreamCall(service: string, method?: string) {
return (
target: any,
key: string | symbol,
descriptor: PropertyDescriptor,
) => {
const metadata = createMethodMetadata(
target,
key,
service,
method,
GrpcMethodStreamingType.PT_STREAMING,
);
return MessagePattern(metadata)(target, key, descriptor);
};
}
export function createMethodMetadata(
target: any,
key: string | symbol,
service: string | undefined,
method: string | undefined,
streaming = GrpcMethodStreamingType.NO_STREAMING,
) {
const capitalizeFirstLetter = (str: string) =>
str.charAt(0).toUpperCase() + str.slice(1);
if (!service) {
const { name } = target.constructor;
return { service: name, rpc: capitalizeFirstLetter(key as string) };
return {
service: name,
rpc: capitalizeFirstLetter(key as string),
streaming,
};
}
if (service && !method) {
return { service, rpc: capitalizeFirstLetter(key as string) };
return { service, rpc: capitalizeFirstLetter(key as string), streaming };
}
return { service, rpc: method };
return { service, rpc: method, streaming };
}

View File

@@ -1,5 +1,5 @@
import { isObject, isUndefined } from '@nestjs/common/utils/shared.utils';
import { fromEvent } from 'rxjs';
import { fromEvent, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';
import {
CANCEL_EVENT,
@@ -8,6 +8,7 @@ import {
GRPC_DEFAULT_PROTO_LOADER,
GRPC_DEFAULT_URL,
} from '../constants';
import { GrpcMethodStreamingType } from '../decorators';
import { InvalidGrpcPackageException } from '../errors/invalid-grpc-package.exception';
import { InvalidProtoDefinitionException } from '../errors/invalid-proto-definition.exception';
import { CustomTransportStrategy } from '../interfaces';
@@ -25,7 +26,9 @@ interface GrpcCall<TRequest = any, TMetadata = any> {
metadata: TMetadata;
end: Function;
write: Function;
on: Function;
}
export class ServerGrpc extends Server implements CustomTransportStrategy {
private readonly url: string;
private grpcClient: any;
@@ -94,36 +97,110 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
return services;
}
/**
* Will create service mapping from gRPC generated Object to handlers
* defined with @GrpcMethod or @GrpcStreamMethod annotations
*
* @param grpcService
* @param name
*/
public async createService(grpcService: any, name: string) {
const service = {};
// tslint:disable-next-line:forin
for (const methodName in grpcService.prototype) {
const methodHandler = this.getHandlerByPattern(
this.createPattern(name, methodName),
);
let pattern = '';
let methodHandler = null;
let streamingType = GrpcMethodStreamingType.NO_STREAMING;
const methodFunction = grpcService.prototype[methodName];
const methodReqStreaming = methodFunction.requestStream;
if (!isUndefined(methodReqStreaming) && methodReqStreaming) {
// Try first pattern to be presented, RX streaming pattern would be
// a preferable pattern to select among a few defined
pattern = this.createPattern(
name,
methodName,
GrpcMethodStreamingType.RX_STREAMING,
);
methodHandler = this.messageHandlers.get(pattern);
streamingType = GrpcMethodStreamingType.RX_STREAMING;
// If first pattern didn't match to any of handlers then try
// pass-through handler to be presented
if (!methodHandler) {
pattern = this.createPattern(
name,
methodName,
GrpcMethodStreamingType.PT_STREAMING,
);
methodHandler = this.messageHandlers.get(pattern);
streamingType = GrpcMethodStreamingType.PT_STREAMING;
}
} else {
pattern = this.createPattern(
name,
methodName,
GrpcMethodStreamingType.NO_STREAMING,
);
// Select handler if any presented for No-Streaming pattern
methodHandler = this.messageHandlers.get(pattern);
streamingType = GrpcMethodStreamingType.NO_STREAMING;
}
if (!methodHandler) {
continue;
}
service[methodName] = await this.createServiceMethod(
methodHandler,
grpcService.prototype[methodName],
streamingType,
);
}
return service;
}
public createPattern(service: string, methodName: string): string {
/**
* Will create a string of a JSON serialized format
*
* @param service name of the service which should be a match to gRPC service definition name
* @param methodName name of the method which is coming after rpc keyword
* @param streaming GrpcMethodStreamingType parameter which should correspond to
* stream keyword in gRPC service request part
*/
public createPattern(
service: string,
methodName: string,
streaming: GrpcMethodStreamingType,
): string {
return JSON.stringify({
service,
rpc: methodName,
streaming,
});
}
/**
* Will return async function which will handle gRPC call
* with Rx streams or as a direct call passthrough
*
* @param methodHandler
* @param protoNativeHandler
*/
public createServiceMethod(
methodHandler: Function,
protoNativeHandler: any,
streamType: GrpcMethodStreamingType,
): Function {
// If proto handler has request stream as "true" then we expect it to have
// streaming from the side of requester
if (protoNativeHandler.requestStream) {
// If any handlers were defined with GrpcStreamMethod annotation use RX
if (streamType === GrpcMethodStreamingType.RX_STREAMING)
return this.createStreamDuplexMethod(methodHandler);
// If any handlers were defined with GrpcStreamCall annotation
else if (streamType === GrpcMethodStreamingType.PT_STREAMING)
return this.createStreamCallMethod(methodHandler);
}
return protoNativeHandler.responseStream
? this.createStreamServiceMethod(methodHandler)
: this.createUnaryServiceMethod(methodHandler);
@@ -150,6 +227,41 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
};
}
public createStreamDuplexMethod(methodHandler: Function) {
return async (call: GrpcCall) => {
const req = new Subject<any>();
call.on('data', (m: any) => req.next(m));
call.on('error', (e: any) => {
// Check if error means that stream ended on other end
if (
String(e)
.toLowerCase()
.indexOf('cancelled') > -1
) {
call.end();
return;
}
// If another error then just pass it along
req.error(e);
});
call.on('end', () => req.complete());
const handler = methodHandler(req.asObservable());
const res = this.transformToObservable(await handler);
await res
.pipe(takeUntil(fromEvent(call as any, CANCEL_EVENT)))
.forEach(m => call.write(m));
call.end();
};
}
public createStreamCallMethod(methodHandler: Function) {
return async (call: GrpcCall) => {
methodHandler(call);
};
}
public close() {
this.grpcClient && this.grpcClient.forceShutdown();
this.grpcClient = null;

View File

@@ -2,6 +2,9 @@ import { expect } from 'chai';
import { PATTERN_METADATA } from '../../constants';
import {
GrpcMethod,
GrpcMethodStreamingType,
GrpcStreamCall,
GrpcStreamMethod,
MessagePattern,
} from '../../decorators/message-pattern.decorator';
@@ -35,6 +38,7 @@ describe('@GrpcMethod', () => {
expect(metadata).to.be.eql({
service: TestService.name,
rpc: 'Test',
streaming: GrpcMethodStreamingType.NO_STREAMING,
});
});
@@ -44,6 +48,7 @@ describe('@GrpcMethod', () => {
expect(metadata).to.be.eql({
service: 'TestService2',
rpc: 'Test2',
streaming: GrpcMethodStreamingType.NO_STREAMING,
});
});
@@ -53,6 +58,93 @@ describe('@GrpcMethod', () => {
expect(metadata).to.be.eql({
service: 'TestService2',
rpc: 'Test2',
streaming: GrpcMethodStreamingType.NO_STREAMING,
});
});
});
describe('@GrpcStreamMethod', () => {
class TestService {
@GrpcStreamMethod()
public test() {}
@GrpcStreamMethod('TestService2')
public test2() {}
@GrpcStreamMethod('TestService2', 'Test2')
public test3() {}
}
it('should derive method and service name', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test);
expect(metadata).to.be.eql({
service: TestService.name,
rpc: 'Test',
streaming: GrpcMethodStreamingType.RX_STREAMING,
});
});
it('should derive method', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test2);
expect(metadata).to.be.eql({
service: 'TestService2',
rpc: 'Test2',
streaming: GrpcMethodStreamingType.RX_STREAMING,
});
});
it('should override both method and service', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test3);
expect(metadata).to.be.eql({
service: 'TestService2',
rpc: 'Test2',
streaming: GrpcMethodStreamingType.RX_STREAMING,
});
});
});
describe('@GrpcStreamCall', () => {
class TestService {
@GrpcStreamCall()
public test() {}
@GrpcStreamCall('TestService2')
public test2() {}
@GrpcStreamCall('TestService2', 'Test2')
public test3() {}
}
it('should derive method and service name', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test);
expect(metadata).to.be.eql({
service: TestService.name,
rpc: 'Test',
streaming: GrpcMethodStreamingType.PT_STREAMING,
});
});
it('should derive method', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test2);
expect(metadata).to.be.eql({
service: 'TestService2',
rpc: 'Test2',
streaming: GrpcMethodStreamingType.PT_STREAMING,
});
});
it('should override both method and service', () => {
const svc = new TestService();
const metadata = Reflect.getMetadata(PATTERN_METADATA, svc.test3);
expect(metadata).to.be.eql({
service: 'TestService2',
rpc: 'Test2',
streaming: GrpcMethodStreamingType.PT_STREAMING,
});
});
});

View File

@@ -1,4 +1,5 @@
import { Logger } from '@nestjs/common';
import { GrpcMethodStreamingType } from '@nestjs/microservices';
import { expect } from 'chai';
import { join } from 'path';
import { of } from 'rxjs';
@@ -109,9 +110,10 @@ describe('ServerGrpc', () => {
});
describe('createService', () => {
const objectToMap = obj =>
new Map(Object.keys(obj).map(key => [key, obj[key]]) as any);
it('should call "createServiceMethod"', async () => {
const objectToMap = obj =>
new Map(Object.keys(obj).map(key => [key, obj[key]]) as any);
const handlers = objectToMap({
test: null,
test2: () => ({}),
@@ -136,16 +138,93 @@ describe('ServerGrpc', () => {
);
expect(spy.calledOnce).to.be.true;
});
describe('when RX streaming', () => {
it('should call "createPattern" with proper arguments', async () => {
const handlers = objectToMap({
test2: {
requestStream: true,
},
});
const createPatternStub = sinon
.stub(server, 'createPattern')
.onFirstCall()
.returns('test2');
sinon.stub(server, 'createServiceMethod').callsFake(() => ({} as any));
(server as any).messageHandlers = handlers;
await server.createService(
{
prototype: {
test2: {
requestStream: true,
},
},
},
'name',
);
expect(
createPatternStub.calledWith(
'name',
'test2',
GrpcMethodStreamingType.RX_STREAMING,
),
).to.be.true;
});
});
describe('when pass through streaming', () => {
it('should call "createPattern" with proper arguments', async () => {
const handlers = objectToMap({
test2: {
requestStream: true,
},
});
const createPatternStub = sinon
.stub(server, 'createPattern')
.onFirstCall()
.returns('_invalid')
.onSecondCall()
.returns('test2');
sinon.stub(server, 'createServiceMethod').callsFake(() => ({} as any));
(server as any).messageHandlers = handlers;
await server.createService(
{
prototype: {
test2: {
requestStream: true,
},
},
},
'name',
);
expect(
createPatternStub.calledWith(
'name',
'test2',
GrpcMethodStreamingType.PT_STREAMING,
),
).to.be.true;
});
});
});
describe('createPattern', () => {
it('should return pattern', () => {
const service = 'test';
const method = 'method';
expect(server.createPattern(service, method)).to.be.eql(
expect(
server.createPattern(
service,
method,
GrpcMethodStreamingType.NO_STREAMING,
),
).to.be.eql(
JSON.stringify({
service,
rpc: method,
streaming: GrpcMethodStreamingType.NO_STREAMING,
}),
);
});
@@ -156,7 +235,11 @@ describe('ServerGrpc', () => {
it('should call "createStreamServiceMethod"', () => {
const cln = sinon.spy();
const spy = sinon.spy(server, 'createStreamServiceMethod');
server.createServiceMethod(cln, { responseStream: true } as any);
server.createServiceMethod(
cln,
{ responseStream: true } as any,
GrpcMethodStreamingType.NO_STREAMING,
);
expect(spy.called).to.be.true;
});
@@ -165,11 +248,43 @@ describe('ServerGrpc', () => {
it('should call "createUnaryServiceMethod"', () => {
const cln = sinon.spy();
const spy = sinon.spy(server, 'createUnaryServiceMethod');
server.createServiceMethod(cln, { responseStream: false } as any);
server.createServiceMethod(
cln,
{ responseStream: false } as any,
GrpcMethodStreamingType.NO_STREAMING,
);
expect(spy.called).to.be.true;
});
});
describe('when request is a stream', () => {
describe('when stream type is RX_STREAMING', () => {
it('should call "createStreamDuplexMethod"', () => {
const cln = sinon.spy();
const spy = sinon.spy(server, 'createStreamDuplexMethod');
server.createServiceMethod(
cln,
{ requestStream: true } as any,
GrpcMethodStreamingType.RX_STREAMING,
);
expect(spy.called).to.be.true;
});
});
describe('when stream type is PT_STREAMING', () => {
it('should call "createStreamCallMethod"', () => {
const cln = sinon.spy();
const spy = sinon.spy(server, 'createStreamCallMethod');
server.createServiceMethod(
cln,
{ requestStream: true } as any,
GrpcMethodStreamingType.PT_STREAMING,
);
expect(spy.called).to.be.true;
});
});
});
});
describe('createStreamServiceMethod', () => {
@@ -236,6 +351,32 @@ describe('ServerGrpc', () => {
});
});
describe('createStreamDuplexMethod', () => {
it('should wrap call into Subject', () => {
const handler = sinon.spy();
const fn = server.createStreamDuplexMethod(handler);
const call = {
on: (event, callback) => callback(),
end: sinon.spy(),
write: sinon.spy(),
};
fn(call as any);
expect(handler.called).to.be.true;
});
});
describe('createStreamCallMethod', () => {
it('should pass through to "methodHandler"', () => {
const handler = sinon.spy();
const fn = server.createStreamCallMethod(handler);
const args = [1, 2, 3];
fn(args as any);
expect(handler.calledWith(args)).to.be.true;
});
});
describe('close', () => {
it('should call "forceShutdown"', () => {
const grpcClient = { forceShutdown: sinon.spy() };