mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
feat(microservices): Add producer reference to KafkaContext
This commit is contained in:
@@ -1,4 +1,4 @@
|
||||
import { Consumer, KafkaMessage } from '../external/kafka.interface';
|
||||
import { Consumer, KafkaMessage, Producer } from '../external/kafka.interface';
|
||||
import { BaseRpcContext } from './base-rpc.context';
|
||||
|
||||
type KafkaContextArgs = [
|
||||
@@ -7,6 +7,7 @@ type KafkaContextArgs = [
|
||||
topic: string,
|
||||
consumer: Consumer,
|
||||
heartbeat: () => Promise<void>,
|
||||
producer: Producer,
|
||||
];
|
||||
|
||||
export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
|
||||
@@ -48,4 +49,11 @@ export class KafkaContext extends BaseRpcContext<KafkaContextArgs> {
|
||||
getHeartbeat() {
|
||||
return this.args[4];
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the Kafka producer reference,
|
||||
*/
|
||||
getProducer() {
|
||||
return this.args[5];
|
||||
}
|
||||
}
|
||||
|
||||
@@ -168,6 +168,7 @@ export class ServerKafka extends Server implements CustomTransportStrategy {
|
||||
payload.topic,
|
||||
this.consumer,
|
||||
payload.heartbeat,
|
||||
this.producer,
|
||||
]);
|
||||
const handler = this.getHandlerByPattern(packet.pattern);
|
||||
// if the correlation id or reply topic is not set
|
||||
|
||||
@@ -1,6 +1,10 @@
|
||||
import { expect } from 'chai';
|
||||
import { KafkaContext } from '../../ctx-host';
|
||||
import { Consumer, KafkaMessage } from '../../external/kafka.interface';
|
||||
import {
|
||||
Consumer,
|
||||
KafkaMessage,
|
||||
Producer,
|
||||
} from '../../external/kafka.interface';
|
||||
|
||||
describe('KafkaContext', () => {
|
||||
const args = [
|
||||
@@ -9,12 +13,20 @@ describe('KafkaContext', () => {
|
||||
undefined,
|
||||
{ test: 'consumer' },
|
||||
() => {},
|
||||
{ test: 'producer' },
|
||||
];
|
||||
let context: KafkaContext;
|
||||
|
||||
beforeEach(() => {
|
||||
context = new KafkaContext(
|
||||
args as [KafkaMessage, number, string, Consumer, () => Promise<void>],
|
||||
args as [
|
||||
KafkaMessage,
|
||||
number,
|
||||
string,
|
||||
Consumer,
|
||||
() => Promise<void>,
|
||||
Producer,
|
||||
],
|
||||
);
|
||||
});
|
||||
describe('getTopic', () => {
|
||||
@@ -42,4 +54,9 @@ describe('KafkaContext', () => {
|
||||
expect(context.getHeartbeat()).to.be.eql(args[4]);
|
||||
});
|
||||
});
|
||||
describe('getProducer', () => {
|
||||
it('should return producer instance', () => {
|
||||
expect(context.getProducer()).to.deep.eq({ test: 'producer' });
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user