mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
fix(microservices): cleanup upstream subscription in client-side gRPC stream calls
This commit is contained in:
@@ -232,11 +232,17 @@ export class ClientGrpcProxy extends ClientProxy implements ClientGrpc {
|
||||
callArgs.unshift(maybeMetadata);
|
||||
}
|
||||
const call = client[methodName](...callArgs);
|
||||
|
||||
const upstreamSubscription: Subscription =
|
||||
upstreamSubjectOrData.subscribe(
|
||||
(val: unknown) => call.write(val),
|
||||
(err: unknown) => call.emit('error', err),
|
||||
() => call.end(),
|
||||
);
|
||||
|
||||
return () => {
|
||||
upstreamSubscription.unsubscribe();
|
||||
};
|
||||
});
|
||||
}
|
||||
return new Observable(observer => {
|
||||
|
||||
@@ -267,11 +267,12 @@ describe('ClientGrpcProxy', () => {
|
||||
});
|
||||
});
|
||||
describe('when stream request', () => {
|
||||
let clientCallback: (err: Error | null | undefined, response: any) => void;
|
||||
const writeSpy = sinon.spy();
|
||||
const methodName = 'm';
|
||||
const obj = {
|
||||
[methodName]: callback => {
|
||||
callback(null, {});
|
||||
clientCallback = callback;
|
||||
return {
|
||||
write: writeSpy,
|
||||
};
|
||||
@@ -287,6 +288,11 @@ describe('ClientGrpcProxy', () => {
|
||||
stream$ = client.createUnaryServiceMethod(obj, methodName)(upstream);
|
||||
});
|
||||
|
||||
afterEach(() => {
|
||||
// invoke client callback to allow resources to be cleaned up
|
||||
clientCallback(null, {});
|
||||
})
|
||||
|
||||
it('should subscribe to request upstream', () => {
|
||||
const upstreamSubscribe = sinon.spy(upstream, 'subscribe');
|
||||
stream$.subscribe(
|
||||
|
||||
Reference in New Issue
Block a user