Compare commits

...

6 Commits

Author SHA1 Message Date
Kamil Myśliwiec
1b6cefd845 fix(microservices): wrap handle message methods with try..catch 2019-05-30 13:40:49 +02:00
Kamil Myśliwiec
941c311395 refactor(tests): update var names in integration tests (grpc) 2019-05-30 13:09:52 +02:00
Kamil Myśliwiec
decbbba900 Merge branch 'master' into 6.3.0 2019-05-30 13:07:59 +02:00
Kamil Myśliwiec
f6db1fc9b9 Merge branch 'master' to 6.3.0 2019-05-30 11:47:17 +02:00
Kamil Myśliwiec
9b7eec326e Merge branch 'master' into 6.3.0 2019-05-30 11:45:09 +02:00
Kamil Myśliwiec
651b464939 Merge branch 'anton-alation-grpc-stream-duplex-decorator' into 6.3.0 2019-05-30 11:44:21 +02:00
7 changed files with 121 additions and 109 deletions

View File

@@ -63,10 +63,9 @@ export class AdvancedGrpcController {
*/
@GrpcStreamMethod('orders.OrderService')
async sync(messages: Observable<any>): Promise<any> {
const s = new Subject();
const o = s.asObservable();
const subject = new Subject();
messages.subscribe(msg => {
s.next({
subject.next({
id: 1,
itemTypes: [1],
shipmentType: {
@@ -76,7 +75,7 @@ export class AdvancedGrpcController {
},
});
});
return o;
return subject.asObservable();
}
/**

View File

@@ -28,6 +28,7 @@ export const GRPC_DEFAULT_PROTO_LOADER = '@grpc/proto-loader';
export const NO_MESSAGE_HANDLER = `There is no matching message handler defined in the remote service.`;
export const NO_EVENT_HANDLER = `There is no matching event handler defined in the remote service.`;
export const MESSAGE_FORMAT_ERROR = `The incoming message didn't match the expected shape (runtime error). Please, make sure that your producer sends the messages in the proper format.`;
export const DISCONNECTED_RMQ_MESSAGE = `Disconnected from RMQ. Trying to reconnect.`;
export const GRPC_DEFAULT_MAX_RECEIVE_MESSAGE_LENGTH = 4 * 1024 * 1024;
export const GRPC_DEFAULT_MAX_SEND_MESSAGE_LENGTH = 4 * 1024 * 1024;

View File

@@ -4,15 +4,13 @@ import {
CONNECT_EVENT,
ERROR_EVENT,
MESSAGE_EVENT,
MESSAGE_FORMAT_ERROR,
MQTT_DEFAULT_URL,
NO_MESSAGE_HANDLER,
} from '../constants';
import { MqttClient } from '../external/mqtt-client.interface';
import { CustomTransportStrategy, PacketId, ReadPacket } from '../interfaces';
import {
MicroserviceOptions,
MqttOptions,
} from '../interfaces/microservice-configuration.interface';
import { MqttOptions } from '../interfaces/microservice-configuration.interface';
import { Server } from './server';
let mqttPackage: any = {};
@@ -71,22 +69,26 @@ export class ServerMqtt extends Server implements CustomTransportStrategy {
buffer: Buffer,
pub: MqttClient,
): Promise<any> {
const packet = this.deserialize(buffer.toString());
if (isUndefined(packet.id)) {
return this.handleEvent(channel, packet);
}
const pattern = channel.replace(/_ack$/, '');
const publish = this.getPublisher(pub, pattern, packet.id);
const handler = this.getHandlerByPattern(pattern);
try {
const packet = this.deserialize(buffer.toString());
if (isUndefined(packet.id)) {
return this.handleEvent(channel, packet);
}
const pattern = channel.replace(/_ack$/, '');
const publish = this.getPublisher(pub, pattern, packet.id);
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
const status = 'error';
return publish({ id: packet.id, status, err: NO_MESSAGE_HANDLER });
if (!handler) {
const status = 'error';
return publish({ id: packet.id, status, err: NO_MESSAGE_HANDLER });
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
response$ && this.send(response$, publish);
} catch (err) {
this.logger.error(MESSAGE_FORMAT_ERROR);
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
response$ && this.send(response$, publish);
}
public getPublisher(client: MqttClient, pattern: any, id: string): any {

View File

@@ -3,15 +3,13 @@ import { Observable } from 'rxjs';
import {
CONNECT_EVENT,
ERROR_EVENT,
MESSAGE_FORMAT_ERROR,
NATS_DEFAULT_URL,
NO_MESSAGE_HANDLER,
} from '../constants';
import { Client } from '../external/nats-client.interface';
import { CustomTransportStrategy, PacketId } from '../interfaces';
import {
MicroserviceOptions,
NatsOptions,
} from '../interfaces/microservice-configuration.interface';
import { NatsOptions } from '../interfaces/microservice-configuration.interface';
import { ReadPacket } from '../interfaces/packet.interface';
import { Server } from './server';
@@ -85,19 +83,23 @@ export class ServerNats extends Server implements CustomTransportStrategy {
client: Client,
replyTo: string,
) {
if (isUndefined(message.id)) {
return this.handleEvent(channel, message);
try {
if (isUndefined(message.id)) {
return this.handleEvent(channel, message);
}
const publish = this.getPublisher(client, replyTo, message.id);
const handler = this.getHandlerByPattern(channel);
if (!handler) {
const status = 'error';
return publish({ id: message.id, status, err: NO_MESSAGE_HANDLER });
}
const response$ = this.transformToObservable(
await handler(message.data),
) as Observable<any>;
response$ && this.send(response$, publish);
} catch (err) {
this.logger.error(MESSAGE_FORMAT_ERROR);
}
const publish = this.getPublisher(client, replyTo, message.id);
const handler = this.getHandlerByPattern(channel);
if (!handler) {
const status = 'error';
return publish({ id: message.id, status, err: NO_MESSAGE_HANDLER });
}
const response$ = this.transformToObservable(
await handler(message.data),
) as Observable<any>;
response$ && this.send(response$, publish);
}
public getPublisher(publisher: Client, replyTo: string, id: string) {

View File

@@ -4,6 +4,7 @@ import {
CONNECT_EVENT,
ERROR_EVENT,
MESSAGE_EVENT,
MESSAGE_FORMAT_ERROR,
NO_MESSAGE_HANDLER,
REDIS_DEFAULT_URL,
} from '../constants';
@@ -13,10 +14,7 @@ import {
RetryStrategyOptions,
} from '../external/redis.interface';
import { CustomTransportStrategy, PacketId, ReadPacket } from '../interfaces';
import {
MicroserviceOptions,
RedisOptions,
} from '../interfaces/microservice-configuration.interface';
import { RedisOptions } from '../interfaces/microservice-configuration.interface';
import { Server } from './server';
let redisPackage: any = {};
@@ -84,22 +82,26 @@ export class ServerRedis extends Server implements CustomTransportStrategy {
buffer: string | any,
pub: RedisClient,
) {
const packet = this.deserialize(buffer);
if (isUndefined(packet.id)) {
return this.handleEvent(channel, packet);
}
const pattern = channel.replace(/_ack$/, '');
const publish = this.getPublisher(pub, pattern, packet.id);
const handler = this.getHandlerByPattern(pattern);
try {
const packet = this.deserialize(buffer);
if (isUndefined(packet.id)) {
return this.handleEvent(channel, packet);
}
const pattern = channel.replace(/_ack$/, '');
const publish = this.getPublisher(pub, pattern, packet.id);
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
const status = 'error';
return publish({ id: packet.id, status, err: NO_MESSAGE_HANDLER });
if (!handler) {
const status = 'error';
return publish({ id: packet.id, status, err: NO_MESSAGE_HANDLER });
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
response$ && this.send(response$, publish);
} catch (err) {
this.logger.error(MESSAGE_FORMAT_ERROR);
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
response$ && this.send(response$, publish);
}
public getPublisher(pub: RedisClient, pattern: any, id: string) {

View File

@@ -4,6 +4,7 @@ import {
CONNECT_EVENT,
DISCONNECTED_RMQ_MESSAGE,
DISCONNECT_EVENT,
MESSAGE_FORMAT_ERROR,
NO_MESSAGE_HANDLER,
RQM_DEFAULT_IS_GLOBAL_PREFETCH_COUNT,
RQM_DEFAULT_PREFETCH_COUNT,
@@ -85,33 +86,37 @@ export class ServerRMQ extends Server implements CustomTransportStrategy {
}
public async handleMessage(message: any): Promise<void> {
const { content, properties } = message;
const packet = JSON.parse(content.toString());
const pattern = isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern);
try {
const { content, properties } = message;
const packet = JSON.parse(content.toString());
const pattern = isString(packet.pattern)
? packet.pattern
: JSON.stringify(packet.pattern);
if (isUndefined(packet.id)) {
return this.handleEvent(pattern, packet);
if (isUndefined(packet.id)) {
return this.handleEvent(pattern, packet);
}
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
const status = 'error';
return this.sendMessage(
{ status, err: NO_MESSAGE_HANDLER },
properties.replyTo,
properties.correlationId,
);
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
const publish = <T>(data: T) =>
this.sendMessage(data, properties.replyTo, properties.correlationId);
response$ && this.send(response$, publish);
} catch (err) {
this.logger.error(MESSAGE_FORMAT_ERROR);
}
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
const status = 'error';
return this.sendMessage(
{ status, err: NO_MESSAGE_HANDLER },
properties.replyTo,
properties.correlationId,
);
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
const publish = <T>(data: T) =>
this.sendMessage(data, properties.replyTo, properties.correlationId);
response$ && this.send(response$, publish);
}
public sendMessage<T = any>(

View File

@@ -6,16 +6,14 @@ import {
CLOSE_EVENT,
ERROR_EVENT,
MESSAGE_EVENT,
MESSAGE_FORMAT_ERROR,
NO_MESSAGE_HANDLER,
TCP_DEFAULT_HOST,
TCP_DEFAULT_PORT,
} from '../constants';
import { JsonSocket } from '../helpers/json-socket';
import { CustomTransportStrategy, PacketId, ReadPacket } from '../interfaces';
import {
MicroserviceOptions,
TcpOptions,
} from '../interfaces/microservice-configuration.interface';
import { TcpOptions } from '../interfaces/microservice-configuration.interface';
import { Server } from './server';
export class ServerTCP extends Server implements CustomTransportStrategy {
@@ -29,8 +27,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
constructor(private readonly options: TcpOptions['options']) {
super();
this.port = this.getOptionsProp(options, 'port') || TCP_DEFAULT_PORT;
this.host =
this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
this.init();
}
@@ -56,30 +53,34 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
socket: JsonSocket,
packet: ReadPacket & PacketId,
) {
const pattern = !isString(packet.pattern)
? JSON.stringify(packet.pattern)
: packet.pattern;
try {
const pattern = !isString(packet.pattern)
? JSON.stringify(packet.pattern)
: packet.pattern;
if (isUndefined(packet.id)) {
return this.handleEvent(pattern, packet);
}
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
const status = 'error';
return socket.sendMessage({
id: packet.id,
status,
err: NO_MESSAGE_HANDLER,
});
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
if (isUndefined(packet.id)) {
return this.handleEvent(pattern, packet);
}
const handler = this.getHandlerByPattern(pattern);
if (!handler) {
const status = 'error';
return socket.sendMessage({
id: packet.id,
status,
err: NO_MESSAGE_HANDLER,
});
}
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
response$ &&
this.send(response$, data =>
socket.sendMessage(Object.assign(data, { id: packet.id })),
);
response$ &&
this.send(response$, data =>
socket.sendMessage(Object.assign(data, { id: packet.id })),
);
} catch (err) {
this.logger.error(MESSAGE_FORMAT_ERROR);
}
}
public handleClose(): undefined | number | NodeJS.Timer {