mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
sample: revert microservices sample local updates
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
import { NestFactory } from '@nestjs/core';
|
||||
import { Transport } from '@nestjs/microservices';
|
||||
import { Kafka } from '@nestjs/microservices/external/kafka.interface';
|
||||
import { MicroserviceOptions, Transport } from '@nestjs/microservices';
|
||||
import { AppModule } from './app.module';
|
||||
|
||||
async function bootstrap() {
|
||||
@@ -16,65 +15,13 @@ async function bootstrap() {
|
||||
*
|
||||
*/
|
||||
const app = await NestFactory.create(AppModule);
|
||||
const msvc = app.connectMicroservice({
|
||||
transport: Transport.KAFKA,
|
||||
options: {
|
||||
client: {
|
||||
brokers: ['localhost:9092'],
|
||||
},
|
||||
consumer: {
|
||||
groupId: 'my-kafka-consumer',
|
||||
},
|
||||
},
|
||||
app.connectMicroservice<MicroserviceOptions>({
|
||||
transport: Transport.TCP,
|
||||
options: { retryAttempts: 5, retryDelay: 3000 },
|
||||
});
|
||||
|
||||
await app.startAllMicroservices();
|
||||
await app.listen(Math.floor(Math.random() * 1000) + 3000);
|
||||
await app.listen(3001);
|
||||
console.log(`Application is running on: ${await app.getUrl()}`);
|
||||
|
||||
const kafka = msvc.unwrap<Kafka>();
|
||||
const admin = kafka.admin();
|
||||
|
||||
const topicName = 'math';
|
||||
try {
|
||||
const topicExists = await admin.fetchTopicMetadata({
|
||||
topics: [topicName],
|
||||
});
|
||||
|
||||
console.log(
|
||||
`Topic "${topicName}" already exists with ${topicExists.topics[0].partitions.length} partitions.`,
|
||||
);
|
||||
|
||||
// Update "math.reply" topic to have 2 partitions if it has only 1 partition
|
||||
const replyTopicName = 'math.reply';
|
||||
const replyTopicExists = await admin.fetchTopicMetadata({
|
||||
topics: [replyTopicName],
|
||||
});
|
||||
if (replyTopicExists.topics[0].partitions.length === 1) {
|
||||
await admin.createPartitions({
|
||||
topicPartitions: [
|
||||
{
|
||||
topic: replyTopicName,
|
||||
count: 2,
|
||||
},
|
||||
],
|
||||
});
|
||||
console.log(`Topic "${replyTopicName}" updated with 2 partitions.`);
|
||||
}
|
||||
} catch (error) {
|
||||
if (error.message.includes('does not host this topic-partition')) {
|
||||
await admin.createTopics({
|
||||
topics: [
|
||||
{
|
||||
topic: topicName,
|
||||
numPartitions: 2,
|
||||
},
|
||||
],
|
||||
});
|
||||
console.log(`Topic "${topicName}" created with 2 partitions.`);
|
||||
} else {
|
||||
console.error('Error creating topic:', error.message);
|
||||
}
|
||||
}
|
||||
}
|
||||
bootstrap();
|
||||
|
||||
@@ -1,27 +1,21 @@
|
||||
import { Controller, Get, Inject } from '@nestjs/common';
|
||||
import { ClientKafka, MessagePattern, Payload } from '@nestjs/microservices';
|
||||
import { ClientProxy, MessagePattern } from '@nestjs/microservices';
|
||||
import { Observable } from 'rxjs';
|
||||
import { MATH_SERVICE } from './math.constants';
|
||||
|
||||
@Controller()
|
||||
export class MathController {
|
||||
constructor(@Inject(MATH_SERVICE) private readonly client: ClientKafka) {}
|
||||
|
||||
async onModuleInit() {
|
||||
this.client.subscribeToResponseOf('sum');
|
||||
this.client.subscribeToResponseOf('math');
|
||||
await this.client.connect();
|
||||
}
|
||||
constructor(@Inject(MATH_SERVICE) private readonly client: ClientProxy) {}
|
||||
|
||||
@Get()
|
||||
execute(): Observable<number> {
|
||||
const pattern = 'sum';
|
||||
const pattern = { cmd: 'sum' };
|
||||
const data = [1, 2, 3, 4, 5];
|
||||
return this.client.send<number>(pattern, data);
|
||||
}
|
||||
|
||||
@MessagePattern('sum')
|
||||
sum(@Payload() data: number[]): number {
|
||||
@MessagePattern({ cmd: 'sum' })
|
||||
sum(data: number[]): number {
|
||||
return (data || []).reduce((a, b) => a + b);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,17 +5,7 @@ import { MathController } from './math.controller';
|
||||
|
||||
@Module({
|
||||
imports: [
|
||||
ClientsModule.register([
|
||||
{
|
||||
name: MATH_SERVICE,
|
||||
transport: Transport.KAFKA,
|
||||
options: {
|
||||
client: {
|
||||
brokers: ['localhost:9092'],
|
||||
},
|
||||
},
|
||||
},
|
||||
]),
|
||||
ClientsModule.register([{ name: MATH_SERVICE, transport: Transport.TCP }]),
|
||||
],
|
||||
controllers: [MathController],
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user