mirror of
https://github.com/nestjs/nest.git
synced 2026-02-22 07:21:39 +00:00
Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
1b6cefd845 | ||
|
|
941c311395 | ||
|
|
decbbba900 | ||
|
|
f6db1fc9b9 | ||
|
|
9b7eec326e | ||
|
|
651b464939 |
@@ -63,10 +63,9 @@ export class AdvancedGrpcController {
|
||||
*/
|
||||
@GrpcStreamMethod('orders.OrderService')
|
||||
async sync(messages: Observable<any>): Promise<any> {
|
||||
const s = new Subject();
|
||||
const o = s.asObservable();
|
||||
const subject = new Subject();
|
||||
messages.subscribe(msg => {
|
||||
s.next({
|
||||
subject.next({
|
||||
id: 1,
|
||||
itemTypes: [1],
|
||||
shipmentType: {
|
||||
@@ -76,7 +75,7 @@ export class AdvancedGrpcController {
|
||||
},
|
||||
});
|
||||
});
|
||||
return o;
|
||||
return subject.asObservable();
|
||||
}
|
||||
|
||||
/**
|
||||
|
||||
@@ -28,6 +28,7 @@ export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';
|
||||
|
||||
export const NO_MESSAGE_HANDLER = `There is no matching message handler defined in the remote service.`;
|
||||
export const NO_EVENT_HANDLER = `There is no matching event handler defined in the remote service.`;
|
||||
export const MESSAGE_FORMAT_ERROR = `The incoming message didn't match the expected shape (runtime error). Please, make sure that your producer sends the messages in the proper format.`;
|
||||
export const DISCONNECTED_RMQ_MESSAGE = `Disconnected from RMQ. Trying to reconnect.`;
|
||||
export const GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;
|
||||
export const GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH = 4 * 1024 * 1024;
|
||||
|
||||
@@ -4,15 +4,13 @@ import {
|
||||
CONNECT_EVENT,
|
||||
ERROR_EVENT,
|
||||
MESSAGE_EVENT,
|
||||
MESSAGE_FORMAT_ERROR,
|
||||
MQTT_DEFAULT_URL,
|
||||
NO_MESSAGE_HANDLER,
|
||||
} from '../constants';
|
||||
import { MqttClient } from '../external/mqtt-client.interface';
|
||||
import { CustomTransportStrategy, PacketId, ReadPacket } from '../interfaces';
|
||||
import {
|
||||
MicroserviceOptions,
|
||||
MqttOptions,
|
||||
} from '../interfaces/microservice-configuration.interface';
|
||||
import { MqttOptions } from '../interfaces/microservice-configuration.interface';
|
||||
import { Server } from './server';
|
||||
|
||||
let mqttPackage: any = {};
|
||||
@@ -71,22 +69,26 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
|
||||
buffer: Buffer,
|
||||
pub: MqttClient,
|
||||
): Promise<any> {
|
||||
const packet = this.deserialize(buffer.toString());
|
||||
if (isUndefined(packet.id)) {
|
||||
return this.handleEvent(channel, packet);
|
||||
}
|
||||
const pattern = channel.replace(/_ack$/, '');
|
||||
const publish = this.getPublisher(pub, pattern, packet.id);
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
try {
|
||||
const packet = this.deserialize(buffer.toString());
|
||||
if (isUndefined(packet.id)) {
|
||||
return this.handleEvent(channel, packet);
|
||||
}
|
||||
const pattern = channel.replace(/_ack$/, '');
|
||||
const publish = this.getPublisher(pub, pattern, packet.id);
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return publish({ id: packet.id, status, err: NO_MESSAGE_HANDLER });
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return publish({ id: packet.id, status, err: NO_MESSAGE_HANDLER });
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
response$ && this.send(response$, publish);
|
||||
} catch (err) {
|
||||
this.logger.error(MESSAGE_FORMAT_ERROR);
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
response$ && this.send(response$, publish);
|
||||
}
|
||||
|
||||
public getPublisher(client: MqttClient, pattern: any, id: string): any {
|
||||
|
||||
@@ -3,15 +3,13 @@ import { Observable } from 'rxjs';
|
||||
import {
|
||||
CONNECT_EVENT,
|
||||
ERROR_EVENT,
|
||||
MESSAGE_FORMAT_ERROR,
|
||||
NATS_DEFAULT_URL,
|
||||
NO_MESSAGE_HANDLER,
|
||||
} from '../constants';
|
||||
import { Client } from '../external/nats-client.interface';
|
||||
import { CustomTransportStrategy, PacketId } from '../interfaces';
|
||||
import {
|
||||
MicroserviceOptions,
|
||||
NatsOptions,
|
||||
} from '../interfaces/microservice-configuration.interface';
|
||||
import { NatsOptions } from '../interfaces/microservice-configuration.interface';
|
||||
import { ReadPacket } from '../interfaces/packet.interface';
|
||||
import { Server } from './server';
|
||||
|
||||
@@ -85,19 +83,23 @@ export class ServerNats extends Server implements CustomTransportStrategy {
|
||||
client: Client,
|
||||
replyTo: string,
|
||||
) {
|
||||
if (isUndefined(message.id)) {
|
||||
return this.handleEvent(channel, message);
|
||||
try {
|
||||
if (isUndefined(message.id)) {
|
||||
return this.handleEvent(channel, message);
|
||||
}
|
||||
const publish = this.getPublisher(client, replyTo, message.id);
|
||||
const handler = this.getHandlerByPattern(channel);
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return publish({ id: message.id, status, err: NO_MESSAGE_HANDLER });
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(message.data),
|
||||
) as Observable<any>;
|
||||
response$ && this.send(response$, publish);
|
||||
} catch (err) {
|
||||
this.logger.error(MESSAGE_FORMAT_ERROR);
|
||||
}
|
||||
const publish = this.getPublisher(client, replyTo, message.id);
|
||||
const handler = this.getHandlerByPattern(channel);
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return publish({ id: message.id, status, err: NO_MESSAGE_HANDLER });
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(message.data),
|
||||
) as Observable<any>;
|
||||
response$ && this.send(response$, publish);
|
||||
}
|
||||
|
||||
public getPublisher(publisher: Client, replyTo: string, id: string) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import {
|
||||
CONNECT_EVENT,
|
||||
ERROR_EVENT,
|
||||
MESSAGE_EVENT,
|
||||
MESSAGE_FORMAT_ERROR,
|
||||
NO_MESSAGE_HANDLER,
|
||||
REDIS_DEFAULT_URL,
|
||||
} from '../constants';
|
||||
@@ -13,10 +14,7 @@ import {
|
||||
RetryStrategyOptions,
|
||||
} from '../external/redis.interface';
|
||||
import { CustomTransportStrategy, PacketId, ReadPacket } from '../interfaces';
|
||||
import {
|
||||
MicroserviceOptions,
|
||||
RedisOptions,
|
||||
} from '../interfaces/microservice-configuration.interface';
|
||||
import { RedisOptions } from '../interfaces/microservice-configuration.interface';
|
||||
import { Server } from './server';
|
||||
|
||||
let redisPackage: any = {};
|
||||
@@ -84,22 +82,26 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
|
||||
buffer: string | any,
|
||||
pub: RedisClient,
|
||||
) {
|
||||
const packet = this.deserialize(buffer);
|
||||
if (isUndefined(packet.id)) {
|
||||
return this.handleEvent(channel, packet);
|
||||
}
|
||||
const pattern = channel.replace(/_ack$/, '');
|
||||
const publish = this.getPublisher(pub, pattern, packet.id);
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
try {
|
||||
const packet = this.deserialize(buffer);
|
||||
if (isUndefined(packet.id)) {
|
||||
return this.handleEvent(channel, packet);
|
||||
}
|
||||
const pattern = channel.replace(/_ack$/, '');
|
||||
const publish = this.getPublisher(pub, pattern, packet.id);
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return publish({ id: packet.id, status, err: NO_MESSAGE_HANDLER });
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return publish({ id: packet.id, status, err: NO_MESSAGE_HANDLER });
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
response$ && this.send(response$, publish);
|
||||
} catch (err) {
|
||||
this.logger.error(MESSAGE_FORMAT_ERROR);
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
response$ && this.send(response$, publish);
|
||||
}
|
||||
|
||||
public getPublisher(pub: RedisClient, pattern: any, id: string) {
|
||||
|
||||
@@ -4,6 +4,7 @@ import {
|
||||
CONNECT_EVENT,
|
||||
DISCONNECTED_RMQ_MESSAGE,
|
||||
DISCONNECT_EVENT,
|
||||
MESSAGE_FORMAT_ERROR,
|
||||
NO_MESSAGE_HANDLER,
|
||||
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
|
||||
RQM_DEFAULT_PREFETCH_COUNT,
|
||||
@@ -85,33 +86,37 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
|
||||
}
|
||||
|
||||
public async handleMessage(message: any): Promise<void> {
|
||||
const { content, properties } = message;
|
||||
const packet = JSON.parse(content.toString());
|
||||
const pattern = isString(packet.pattern)
|
||||
? packet.pattern
|
||||
: JSON.stringify(packet.pattern);
|
||||
try {
|
||||
const { content, properties } = message;
|
||||
const packet = JSON.parse(content.toString());
|
||||
const pattern = isString(packet.pattern)
|
||||
? packet.pattern
|
||||
: JSON.stringify(packet.pattern);
|
||||
|
||||
if (isUndefined(packet.id)) {
|
||||
return this.handleEvent(pattern, packet);
|
||||
if (isUndefined(packet.id)) {
|
||||
return this.handleEvent(pattern, packet);
|
||||
}
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return this.sendMessage(
|
||||
{ status, err: NO_MESSAGE_HANDLER },
|
||||
properties.replyTo,
|
||||
properties.correlationId,
|
||||
);
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
|
||||
const publish = <T>(data: T) =>
|
||||
this.sendMessage(data, properties.replyTo, properties.correlationId);
|
||||
|
||||
response$ && this.send(response$, publish);
|
||||
} catch (err) {
|
||||
this.logger.error(MESSAGE_FORMAT_ERROR);
|
||||
}
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return this.sendMessage(
|
||||
{ status, err: NO_MESSAGE_HANDLER },
|
||||
properties.replyTo,
|
||||
properties.correlationId,
|
||||
);
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
|
||||
const publish = <T>(data: T) =>
|
||||
this.sendMessage(data, properties.replyTo, properties.correlationId);
|
||||
|
||||
response$ && this.send(response$, publish);
|
||||
}
|
||||
|
||||
public sendMessage<T = any>(
|
||||
|
||||
@@ -6,16 +6,14 @@ import {
|
||||
CLOSE_EVENT,
|
||||
ERROR_EVENT,
|
||||
MESSAGE_EVENT,
|
||||
MESSAGE_FORMAT_ERROR,
|
||||
NO_MESSAGE_HANDLER,
|
||||
TCP_DEFAULT_HOST,
|
||||
TCP_DEFAULT_PORT,
|
||||
} from '../constants';
|
||||
import { JsonSocket } from '../helpers/json-socket';
|
||||
import { CustomTransportStrategy, PacketId, ReadPacket } from '../interfaces';
|
||||
import {
|
||||
MicroserviceOptions,
|
||||
TcpOptions,
|
||||
} from '../interfaces/microservice-configuration.interface';
|
||||
import { TcpOptions } from '../interfaces/microservice-configuration.interface';
|
||||
import { Server } from './server';
|
||||
|
||||
export class ServerTCP extends Server implements CustomTransportStrategy {
|
||||
@@ -29,8 +27,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
|
||||
constructor(private readonly options: TcpOptions['options']) {
|
||||
super();
|
||||
this.port = this.getOptionsProp(options, 'port') || TCP_DEFAULT_PORT;
|
||||
this.host =
|
||||
this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
|
||||
this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
|
||||
|
||||
this.init();
|
||||
}
|
||||
@@ -56,30 +53,34 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
|
||||
socket: JsonSocket,
|
||||
packet: ReadPacket & PacketId,
|
||||
) {
|
||||
const pattern = !isString(packet.pattern)
|
||||
? JSON.stringify(packet.pattern)
|
||||
: packet.pattern;
|
||||
try {
|
||||
const pattern = !isString(packet.pattern)
|
||||
? JSON.stringify(packet.pattern)
|
||||
: packet.pattern;
|
||||
|
||||
if (isUndefined(packet.id)) {
|
||||
return this.handleEvent(pattern, packet);
|
||||
}
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return socket.sendMessage({
|
||||
id: packet.id,
|
||||
status,
|
||||
err: NO_MESSAGE_HANDLER,
|
||||
});
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
if (isUndefined(packet.id)) {
|
||||
return this.handleEvent(pattern, packet);
|
||||
}
|
||||
const handler = this.getHandlerByPattern(pattern);
|
||||
if (!handler) {
|
||||
const status = 'error';
|
||||
return socket.sendMessage({
|
||||
id: packet.id,
|
||||
status,
|
||||
err: NO_MESSAGE_HANDLER,
|
||||
});
|
||||
}
|
||||
const response$ = this.transformToObservable(
|
||||
await handler(packet.data),
|
||||
) as Observable<any>;
|
||||
|
||||
response$ &&
|
||||
this.send(response$, data =>
|
||||
socket.sendMessage(Object.assign(data, { id: packet.id })),
|
||||
);
|
||||
response$ &&
|
||||
this.send(response$, data =>
|
||||
socket.sendMessage(Object.assign(data, { id: packet.id })),
|
||||
);
|
||||
} catch (err) {
|
||||
this.logger.error(MESSAGE_FORMAT_ERROR);
|
||||
}
|
||||
}
|
||||
|
||||
public handleClose(): undefined | number | NodeJS.Timer {
|
||||
|
||||
Reference in New Issue
Block a user