chore: rename to initialied, update unit tests

This commit is contained in:
Kamil Myśliwiec
2023-02-03 10:19:43 +01:00
parent 255b9cb8a0
commit f285d7c954
2 changed files with 11 additions and 33 deletions

View File

@@ -46,7 +46,7 @@ export class ClientKafka extends ClientProxy {
protected consumer: Consumer | null = null;
protected producer: Producer | null = null;
protected parser: KafkaParser | null = null;
protected initialized$: Promise<void> | null = null;
protected initialized: Promise<void> | null = null;
protected responsePatterns: string[] = [];
protected consumerAssignments: { [key: string]: number } = {};
protected brokers: string[] | BrokersFunction;
@@ -94,15 +94,15 @@ export class ClientKafka extends ClientProxy {
this.consumer && (await this.consumer.disconnect());
this.producer = null;
this.consumer = null;
this.initialized$ = null;
this.initialized = null;
this.client = null;
}
public async connect(): Promise<Producer> {
if (this.client) {
return this.initialized$.then(() => this.producer);
if (this.initialized) {
return this.initialized.then(() => this.producer);
}
this.initialized$ = new Promise(async (resolve, reject) => {
this.initialized = new Promise(async (resolve, reject) => {
try {
this.client = this.createClient();
@@ -143,7 +143,7 @@ export class ClientKafka extends ClientProxy {
reject(err);
}
});
return this.initialized$.then(() => this.producer);
return this.initialized.then(() => this.producer);
}
public async bindTopics(): Promise<void> {

View File

@@ -11,7 +11,6 @@ import {
} from '../../external/kafka.interface';
describe('ClientKafka', () => {
// static
const topic = 'test.topic';
const partition = 0;
const replyTopic = 'test.topic.reply';
@@ -25,7 +24,6 @@ describe('ClientKafka', () => {
const heartbeat = async () => {};
const pause = () => () => {};
// message
const message: KafkaMessage = {
key: Buffer.from(key),
offset,
@@ -35,7 +33,6 @@ describe('ClientKafka', () => {
attributes,
};
// deserialized message
const deserializedMessage: any = {
key,
offset,
@@ -47,7 +44,6 @@ describe('ClientKafka', () => {
partition,
};
// payloads
const payload: EachMessagePayload = {
topic,
partition,
@@ -152,26 +148,6 @@ describe('ClientKafka', () => {
pause,
};
const deserializedPayloadError: EachMessagePayload = {
topic,
partition,
message: Object.assign(
{
headers: {
[KafkaHeaders.CORRELATION_ID]: correlationId,
[KafkaHeaders.NEST_ERR]: NO_MESSAGE_HANDLER,
},
},
deserializedMessage,
{
size: 0,
value: null,
},
),
heartbeat,
pause,
};
let client: ClientKafka;
let callback: sinon.SinonSpy;
let connect: sinon.SinonSpy;
@@ -182,7 +158,7 @@ describe('ClientKafka', () => {
let consumerStub: sinon.SinonStub;
let producerStub: sinon.SinonStub;
let createClientStub: sinon.SinonStub;
let kafkaClient;
let kafkaClient: any;
beforeEach(() => {
client = new ClientKafka({});
@@ -322,7 +298,8 @@ describe('ClientKafka', () => {
});
it('should expect the connection to be reused', async () => {
(client as any).client = kafkaClient;
(client as any).initialized = Promise.resolve({});
await client.connect();
expect(createClientStub.calledOnce).to.be.false;
@@ -368,7 +345,8 @@ describe('ClientKafka', () => {
});
it('should expect the connection to be reused', async () => {
(client as any).client = kafkaClient;
(client as any).initialized = Promise.resolve({});
await client.connect();
expect(createClientStub.calledOnce).to.be.false;