mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
fix(microservices): support client cancel grpc request in unary stream
This commit is contained in:
@@ -216,10 +216,17 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
|
||||
if (isRequestStream && isUpstreamSubject) {
|
||||
return new Observable(observer => {
|
||||
let isClientCanceled = false;
|
||||
const callArgs = [
|
||||
(error: unknown, data: unknown) => {
|
||||
(error: any, data: unknown) => {
|
||||
if (error) {
|
||||
return observer.error(this.serializeError(error));
|
||||
if (error.details === GRPC_CANCELLED || error.code === 1) {
|
||||
call.destroy();
|
||||
if (isClientCanceled) {
|
||||
return;
|
||||
}
|
||||
}
|
||||
return observer.error(this.serializeError(error));
|
||||
}
|
||||
observer.next(data);
|
||||
observer.complete();
|
||||
@@ -240,6 +247,10 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
|
||||
return () => {
|
||||
upstreamSubscription.unsubscribe();
|
||||
if (!call.finished) {
|
||||
isClientCanceled = true;
|
||||
call.cancel();
|
||||
}
|
||||
};
|
||||
});
|
||||
}
|
||||
|
||||
@@ -308,11 +308,6 @@ describe('ClientGrpcProxy', () => {
|
||||
stream$ = client.createUnaryServiceMethod(obj, methodName)(upstream);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
// invoke client callback to allow resources to be cleaned up
|
||||
clientCallback(null, {});
|
||||
});
|
||||
|
||||
it('should subscribe to request upstream', () => {
|
||||
const upstreamSubscribe = sinon.spy(upstream, 'subscribe');
|
||||
stream$.subscribe({
|
||||
@@ -365,6 +360,56 @@ describe('ClientGrpcProxy', () => {
|
||||
expect(completeSpy.called).to.be.false;
|
||||
expect(callMock.cancel.called).to.be.true;
|
||||
});
|
||||
|
||||
it('should cancel call on client unsubscribe case client streaming', () => {
|
||||
const methodName = 'm';
|
||||
|
||||
const dataSpy = sinon.spy();
|
||||
const errorSpy = sinon.spy();
|
||||
const completeSpy = sinon.spy();
|
||||
const writeSpy = sinon.spy();
|
||||
|
||||
const callMock = {
|
||||
cancel: sinon.spy(),
|
||||
finished: false,
|
||||
write: writeSpy,
|
||||
};
|
||||
|
||||
let handler: (error: any, data: any) => void;
|
||||
const obj = {
|
||||
[methodName]: callback => {
|
||||
handler = callback;
|
||||
return callMock;
|
||||
},
|
||||
};
|
||||
|
||||
(obj[methodName] as any).requestStream = true;
|
||||
let upstream: Subject<unknown> = new Subject();
|
||||
let stream$: Observable<any> = client.createUnaryServiceMethod(obj, methodName)(upstream);
|
||||
|
||||
const upstreamSubscribe = sinon.spy(upstream, 'subscribe');
|
||||
stream$.subscribe({
|
||||
next: () => ({}),
|
||||
error: () => ({}),
|
||||
});
|
||||
upstream.next({ test: true });
|
||||
|
||||
const subscription = stream$.subscribe({
|
||||
next: dataSpy,
|
||||
error: errorSpy,
|
||||
complete: completeSpy,
|
||||
});
|
||||
|
||||
subscription.unsubscribe();
|
||||
handler(null, 'a');
|
||||
|
||||
expect(dataSpy.called).to.be.false;
|
||||
expect(writeSpy.called).to.be.true;
|
||||
expect(errorSpy.called).to.be.false;
|
||||
expect(completeSpy.called).to.be.false;
|
||||
expect(callMock.cancel.called).to.be.true;
|
||||
expect(upstreamSubscribe.called).to.be.true;
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
Reference in New Issue
Block a user