fix(microservices): await respond callback, return promise to await (kafka) #5701

This commit is contained in:
Kamil Myśliwiec
2020-12-10 10:53:49 +01:00
parent a9451b64fc
commit 021c32380b
2 changed files with 9 additions and 6 deletions

View File

@@ -19,6 +19,7 @@ import {
KafkaMessage,
Message,
Producer,
RecordMetadata,
} from '../external/kafka.interface';
import { KafkaLogger, KafkaParser } from '../helpers';
import {
@@ -129,7 +130,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
replyTopic: string,
replyPartition: string,
correlationId: string,
): (data: any) => any {
): (data: any) => Promise<RecordMetadata[]> {
return (data: any) =>
this.sendMessage(data, replyTopic, replyPartition, correlationId);
}
@@ -183,7 +184,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
replyTopic: string,
replyPartition: string,
correlationId: string,
): void {
): Promise<RecordMetadata[]> {
const outgoingMessage = this.serializer.serialize(message.response);
this.assignReplyPartition(replyPartition, outgoingMessage);
this.assignCorrelationIdHeader(correlationId, outgoingMessage);
@@ -197,7 +198,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
},
this.options.send || {},
);
this.producer.send(replyMessage);
return this.producer.send(replyMessage);
}
public assignIsDisposedHeader(

View File

@@ -61,14 +61,16 @@ export abstract class Server {
public send(
stream$: Observable<any>,
respond: (data: WritePacket) => void,
respond: (data: WritePacket) => unknown | Promise<unknown>,
): Subscription {
let dataBuffer: WritePacket[] = null;
const scheduleOnNextTick = (data: WritePacket) => {
if (!dataBuffer) {
dataBuffer = [data];
process.nextTick(() => {
dataBuffer.forEach(buffer => respond(buffer));
process.nextTick(async () => {
for (const item of dataBuffer) {
await respond(item);
}
dataBuffer = null;
});
} else if (!data.isDisposed) {