diff --git a/packages/microservices/client/client-grpc.ts b/packages/microservices/client/client-grpc.ts index 5c1d90de3..b951db2da 100644 --- a/packages/microservices/client/client-grpc.ts +++ b/packages/microservices/client/client-grpc.ts @@ -241,13 +241,19 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc { }); } return new Observable(observer => { - client[methodName](...args, (error: any, data: any) => { + const call = client[methodName](...args, (error: any, data: any) => { if (error) { return observer.error(this.serializeError(error)); } observer.next(data); observer.complete(); }); + + return () => { + if (!call.finished) { + call.cancel(); + } + }; }); }; } diff --git a/packages/microservices/test/client/client-grpc.spec.ts b/packages/microservices/test/client/client-grpc.spec.ts index ae041fa99..6d38e5941 100644 --- a/packages/microservices/test/client/client-grpc.spec.ts +++ b/packages/microservices/test/client/client-grpc.spec.ts @@ -257,7 +257,15 @@ describe('ClientGrpcProxy', () => { }); describe('on subscribe', () => { const methodName = 'm'; - const obj = { [methodName]: callback => callback(null, {}) }; + const obj = { + [methodName]: callback => { + callback(null, {}); + + return { + finished: true, + }; + }, + }; let stream$: Observable; @@ -317,6 +325,47 @@ describe('ClientGrpcProxy', () => { expect(upstreamSubscribe.called).to.be.true; }); }); + + describe('flow-control', () => { + it('should cancel call on client unsubscribe', () => { + const methodName = 'm'; + + const dataSpy = sinon.spy(); + const errorSpy = sinon.spy(); + const completeSpy = sinon.spy(); + + const callMock = { + cancel: sinon.spy(), + finished: false, + }; + + let handler: (error: any, data: any) => void; + + const obj = { + [methodName]: (callback, ...args) => { + handler = callback; + + return callMock; + }, + }; + + const stream$ = client.createUnaryServiceMethod(obj, methodName)(); + + const subsciption = stream$.subscribe({ + next: dataSpy, + error: errorSpy, + complete: completeSpy, + }); + + subsciption.unsubscribe(); + handler(null, 'a'); + + expect(dataSpy.called).to.be.false; + expect(errorSpy.called).to.be.false; + expect(completeSpy.called).to.be.false; + expect(callMock.cancel.called).to.be.true; + }); + }); }); describe('createClients', () => {