fix(microservices): cleanup unary call on unsubscribe

This commit is contained in:
szilveszterandras
2022-09-16 19:21:38 +03:00
parent 7db5638d3f
commit 269ffa3d62
2 changed files with 57 additions and 2 deletions

View File

@@ -241,13 +241,19 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
});
}
return new Observable(observer => {
client[methodName](...args, (error: any, data: any) => {
const call = client[methodName](...args, (error: any, data: any) => {
if (error) {
return observer.error(this.serializeError(error));
}
observer.next(data);
observer.complete();
});
return () => {
if (!call.finished) {
call.cancel();
}
};
});
};
}

View File

@@ -257,7 +257,15 @@ describe('ClientGrpcProxy', () => {
});
describe('on subscribe', () => {
const methodName = 'm';
const obj = { [methodName]: callback => callback(null, {}) };
const obj = {
[methodName]: callback => {
callback(null, {});
return {
finished: true,
};
},
};
let stream$: Observable<any>;
@@ -317,6 +325,47 @@ describe('ClientGrpcProxy', () => {
expect(upstreamSubscribe.called).to.be.true;
});
});
describe('flow-control', () => {
it('should cancel call on client unsubscribe', () => {
const methodName = 'm';
const dataSpy = sinon.spy();
const errorSpy = sinon.spy();
const completeSpy = sinon.spy();
const callMock = {
cancel: sinon.spy(),
finished: false,
};
let handler: (error: any, data: any) => void;
const obj = {
[methodName]: (callback, ...args) => {
handler = callback;
return callMock;
},
};
const stream$ = client.createUnaryServiceMethod(obj, methodName)();
const subsciption = stream$.subscribe({
next: dataSpy,
error: errorSpy,
complete: completeSpy,
});
subsciption.unsubscribe();
handler(null, 'a');
expect(dataSpy.called).to.be.false;
expect(errorSpy.called).to.be.false;
expect(completeSpy.called).to.be.false;
expect(callMock.cancel.called).to.be.true;
});
});
});
describe('createClients', () => {