fix(microservices): fix grpc stream method return type

Ensure GrpcStreamMethod returns Observable directly instead of wrapping it in a Promise when called locally.

Closes #15953
This commit is contained in:
shash-hq
2025-12-14 21:49:58 +05:30
parent 8c77a1955e
commit 541ef2ffef
2 changed files with 23 additions and 9 deletions

View File

@@ -152,16 +152,18 @@ export function GrpcStreamMethod(
// Override original method to call the "drainBuffer" method on the first parameter
// This is required to avoid premature message emission
descriptor.value = async function (
this: any,
observable: any,
...args: any[]
) {
const result = await Promise.resolve(
originalMethod.apply(this, [observable, ...args]),
);
descriptor.value = function (this: any, observable: any, ...args: any[]) {
const result = originalMethod.apply(this, [observable, ...args]);
const isPromise = result && typeof result.then === 'function';
if (isPromise) {
return result.then((data: any) => {
if (observable && observable.drainBuffer) {
observable.drainBuffer();
}
return data;
});
}
// Drain buffer if "drainBuffer" method is available
if (observable && observable.drainBuffer) {
observable.drainBuffer();
}

View File

@@ -247,6 +247,18 @@ describe('@GrpcStreamMethod', () => {
streaming: GrpcMethodStreamingType.RX_STREAMING,
});
});
it('should return Observable directly (not wrapped in Promise) when called method directly', () => {
class TestService {
@GrpcStreamMethod()
test(data$: any) {
return data$;
}
}
const service = new TestService();
const result = service.test({});
expect(result).to.not.have.property('then');
});
});
describe('@GrpcStreamCall', () => {