feature(@nestjs/microservices) add stan transport strategy

This commit is contained in:
Kamil Myśliwiec
2018-03-17 14:56:55 +01:00
parent 81691afb46
commit 43da706a61
15 changed files with 1480 additions and 199 deletions

View File

@@ -5,7 +5,7 @@ import { FastifyAdapter } from '@nestjs/core/adapters/fastify-adapter';
import * as fastify from 'fastify';
async function bootstrap() {
const app = await NestFactory.create(ApplicationModule);
const app = await NestFactory.create(ApplicationModule, new FastifyAdapter(fastify));
app.useGlobalPipes(new ValidationPipe());
await app.listen(3000);
}

1265
package-lock.json generated

File diff suppressed because it is too large Load Diff

View File

@@ -41,10 +41,13 @@
"fast-safe-stringify": "^1.2.0",
"fastify-formbody": "^2.0.0",
"fastify-multipart": "^0.4.1",
"grpc": "^1.10.0",
"iterare": "0.0.8",
"json-socket": "^0.2.1",
"kafka-node": "^2.4.1",
"multer": "^1.3.0",
"nats": "^0.8.4",
"node-nats-streaming": "0.0.28",
"opencollective": "^1.0.3",
"optional": "^0.1.4",
"pump": "^3.0.0",
@@ -53,14 +56,14 @@
"rxjs": "^5.5.6",
"rxjs-grpc": "^0.1.6",
"socket.io": "^2.0.3",
"trouter": "^1.0.0",
"typescript": "^2.7.2"
"trouter": "^1.0.0"
},
"devDependencies": {
"@types/chai": "^3.5.2",
"@types/chai-as-promised": "0.0.31",
"@types/cors": "^2.8.3",
"@types/express": "^4.0.39",
"@types/kafka-node": "^2.0.6",
"@types/mocha": "^2.2.38",
"@types/node": "^7.0.5",
"@types/redis": "^0.12.36",
@@ -93,7 +96,8 @@
"prettier": "^1.9.2",
"sinon": "^2.1.0",
"sinon-chai": "^2.8.0",
"ts-node": "^3.2.0"
"ts-node": "^3.2.0",
"typescript": "^2.7.2"
},
"collective": {
"type": "opencollective",

View File

@@ -9,26 +9,26 @@ import { ReadPacket, PacketId } from 'src/microservices';
export class ClientNats extends ClientProxy {
private readonly logger = new Logger(ClientProxy.name);
private readonly url: string;
private pubClient: nats.Client;
private subClient: nats.Client;
private publisher: nats.Client;
private consumer: nats.Client;
constructor(private readonly options: ClientOptions) {
super();
this.url = options.url || NATS_DEFAULT_URL;
}
protected async sendMessage(
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
if (!this.pubClient || !this.subClient) {
if (!this.publisher || !this.consumer) {
await this.init(callback);
}
const packet = this.assignPacketId(partialPacket);
const pattern = JSON.stringify(partialPacket.pattern);
const responseChannel = this.getResPatternName(pattern, packet.id);
const subscriptionId = this.subClient.subscribe(
const subscriptionId = this.consumer.subscribe(
responseChannel,
(message: WritePacket & PacketId) => {
const { err, response, isDisposed } = message;
@@ -38,7 +38,7 @@ export class ClientNats extends ClientProxy {
response: null,
isDisposed: true,
});
return this.subClient.unsubscribe(subscriptionId);
return this.consumer.unsubscribe(subscriptionId);
}
callback({
err,
@@ -46,7 +46,7 @@ export class ClientNats extends ClientProxy {
});
},
);
this.pubClient.publish(this.getAckPatternName(pattern), packet as any);
this.publisher.publish(this.getAckPatternName(pattern), packet as any);
}
public getAckPatternName(pattern: string): string {
@@ -58,17 +58,17 @@ export class ClientNats extends ClientProxy {
}
public close() {
this.pubClient && this.pubClient.close();
this.subClient && this.subClient.close();
this.pubClient = this.subClient = null;
this.publisher && this.publisher.close();
this.consumer && this.consumer.close();
this.publisher = this.consumer = null;
}
public async init(callback: (...args) => any) {
this.pubClient = await this.createClient();
this.subClient = await this.createClient();
this.publisher = await this.createClient();
this.consumer = await this.createClient();
this.handleError(this.pubClient, callback);
this.handleError(this.subClient, callback);
this.handleError(this.publisher, callback);
this.handleError(this.consumer, callback);
}
public createClient(): Promise<nats.Client> {
@@ -85,7 +85,7 @@ export class ClientNats extends ClientProxy {
const errorCallback = err => {
if (err.code === 'ECONNREFUSED') {
callback(err, null);
this.pubClient = this.subClient = null;
this.publisher = this.consumer = null;
}
this.logger.error(err);
};

View File

@@ -5,6 +5,7 @@ import { Transport } from '../enums/transport.enum';
import { ClientProxy } from './client-proxy';
import { Closeable } from '../interfaces/closeable.interface';
import { ClientNats } from './client-nats';
import { ClientStan } from './client-stan';
export class ClientProxyFactory {
public static create(options: ClientOptions): ClientProxy & Closeable {
@@ -14,6 +15,8 @@ export class ClientProxyFactory {
return new ClientRedis(options);
case Transport.NATS:
return new ClientNats(options);
case Transport.STAN:
return new ClientStan(options);
default:
return new ClientTCP(options);
}

View File

@@ -6,7 +6,7 @@ import { _throw } from 'rxjs/observable/throw';
import { ReadPacket, PacketId, WritePacket } from './../interfaces';
export abstract class ClientProxy {
protected abstract sendMessage(
protected abstract publish(
packet: ReadPacket,
callback: (packet: WritePacket) => void,
);
@@ -16,7 +16,7 @@ export abstract class ClientProxy {
return _throw(new InvalidMessageException());
}
return new Observable((observer: Observer<T>) => {
this.sendMessage({ pattern, data }, this.createObserver(observer));
this.publish({ pattern, data }, this.createObserver(observer));
});
}

View File

@@ -24,7 +24,7 @@ export class ClientRedis extends ClientProxy {
this.url = options.url || REDIS_DEFAULT_URL;
}
protected async sendMessage(
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {

View File

@@ -0,0 +1,97 @@
import * as stan from 'node-nats-streaming';
import { ClientProxy } from './client-proxy';
import { Logger } from '@nestjs/common/services/logger.service';
import { ClientOptions } from '../interfaces/client-metadata.interface';
import {
STAN_DEFAULT_URL,
ERROR_EVENT,
CONNECT_EVENT,
MESSAGE_EVENT,
} from './../constants';
import { WritePacket } from './../interfaces';
import { ReadPacket, PacketId } from 'src/microservices';
export class ClientStan extends ClientProxy {
private readonly logger = new Logger(ClientProxy.name);
private readonly url: string;
private client: stan.Stan;
constructor(private readonly options: ClientOptions) {
super();
this.url = options.url || STAN_DEFAULT_URL;
}
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {
if (!this.client) {
await this.init(callback);
}
const packet = this.assignPacketId(partialPacket);
const pattern = JSON.stringify(partialPacket.pattern);
const responseChannel = this.getResPatternName(pattern, packet.id);
// TODO: Options
const subscription = this.client.subscribe(responseChannel, 'default');
subscription.on(MESSAGE_EVENT, (message: WritePacket & PacketId) => {
const { err, response, isDisposed } = message;
if (isDisposed || err) {
callback({
err,
response: null,
isDisposed: true,
});
return subscription.unsubscribe();
}
callback({
err,
response,
});
});
this.client.publish(
this.getAckPatternName(pattern),
packet as any,
err => err && callback({ err }),
);
}
public getAckPatternName(pattern: string): string {
return `${pattern}_ack`;
}
public getResPatternName(pattern: string, id: string): string {
return `${pattern}_${id}_res`;
}
public close() {
this.client && this.client.close();
this.client = null;
}
public init(callback: (...args) => any) {
this.client = this.createClient();
this.handleError(this.client, callback);
}
public createClient(): stan.Stan {
return stan.connect('clusterId', 'clientId2', {
url: this.url,
});
}
public handleError(client: stan.Stan, callback: (...args) => any) {
const errorCallback = err => {
if (err.code === 'ECONNREFUSED') {
callback(err, null);
this.client = null;
}
this.logger.error(err);
};
client.addListener(ERROR_EVENT, errorCallback);
client.on(CONNECT_EVENT, () => {
client.removeListener(ERROR_EVENT, errorCallback);
client.addListener(ERROR_EVENT, err => this.logger.error(err));
});
}
}

View File

@@ -39,7 +39,7 @@ export class ClientTCP extends ClientProxy {
});
}
protected async sendMessage(
protected async publish(
partialPacket: ReadPacket,
callback: (packet: WritePacket) => any,
) {

View File

@@ -2,6 +2,8 @@ export const TCP_DEFAULT_PORT = 3000;
export const TCP_DEFAULT_HOST = 'localhost';
export const REDIS_DEFAULT_URL = 'redis://localhost:6379';
export const NATS_DEFAULT_URL = 'nats://localhost:4222';
export const MQTT_DEFAULT_URL = 'mqtt://localhost:1883'
export const STAN_DEFAULT_URL = 'nats://localhost:4222';
export const CONNECT_EVENT = 'connect';
export const MESSAGE_EVENT = 'message';

View File

@@ -2,4 +2,5 @@ export enum Transport {
TCP,
REDIS,
NATS,
STAN,
}

View File

@@ -8,6 +8,7 @@ import { Server } from './server';
import { Transport } from '../enums/transport.enum';
import { race } from 'rxjs/operators/race';
import { ServerNats } from './server-nats';
import { ServerStan } from './server-stan';
export class ServerFactory {
public static create(
@@ -19,6 +20,8 @@ export class ServerFactory {
return new ServerRedis(options);
case Transport.NATS:
return new ServerNats(options);
case Transport.STAN:
return new ServerStan(options);
default:
return new ServerTCP(options);
}

View File

@@ -0,0 +1,119 @@
import * as mqtt from 'mqtt';
import { Server } from './server';
import { NO_PATTERN_MESSAGE } from '../constants';
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
import { CustomTransportStrategy, PacketId } from './../interfaces';
import { Observable } from 'rxjs/Observable';
import { catchError } from 'rxjs/operators';
import { empty } from 'rxjs/observable/empty';
import { finalize } from 'rxjs/operators';
import {
MQTT_DEFAULT_URL,
CONNECT_EVENT,
MESSAGE_EVENT,
ERROR_EVENT,
} from './../constants';
import { ReadPacket } from '@nestjs/microservices';
export class ServerMqtt extends Server implements CustomTransportStrategy {
private readonly url: string;
private subClient: mqtt.MqttClient;
private pubClient: mqtt.MqttClient;
constructor(private readonly options: MicroserviceOptions) {
super();
this.url = options.url || MQTT_DEFAULT_URL;
}
public async listen(callback: () => void) {
this.subClient = await this.createMqttClient();
this.pubClient = await this.createMqttClient();
this.handleError(this.pubClient);
this.handleError(this.subClient);
this.start(callback);
}
public start(callback?: () => void) {
this.bindEvents(this.subClient, this.pubClient);
this.subClient.on(CONNECT_EVENT, callback);
}
public bindEvents(
subClient: mqtt.MqttClient,
pubClient: mqtt.MqttClient,
) {
subClient.on(MESSAGE_EVENT, this.getMessageHandler(pubClient).bind(this));
const registeredPatterns = Object.keys(this.messageHandlers);
registeredPatterns.forEach(pattern =>
subClient.subscribe(this.getAckQueueName(pattern)),
);
}
public close() {
this.pubClient && this.pubClient.end();
this.subClient && this.subClient.end();
}
public createMqttClient(): Promise<mqtt.MqttClient> {
const client = mqtt.connect(this.url, {
reconnectPeriod: this.options.retryDelay,
});
return new Promise(resolve =>
client.on(CONNECT_EVENT, () => resolve(client)),
);
}
public getMessageHandler(pub: mqtt.MqttClient): any {
return async (channel, buffer) =>
await this.handleMessage(channel, buffer, pub);
}
public async handleMessage(
channel,
buffer: string | any,
pub: mqtt.MqttClient,
): Promise<any> {
const packet = this.serialize(buffer);
const pattern = channel.replace(/_ack$/, '');
const publish = this.getPublisher(pub, pattern, packet.id);
const status = 'error';
if (!this.messageHandlers[pattern]) {
return publish({ id: packet.id, status, err: NO_PATTERN_MESSAGE });
}
const handler = this.messageHandlers[pattern];
const response$ = this.transformToObservable(
await handler(packet.data),
) as Observable<any>;
response$ && this.send(response$, publish);
}
public getPublisher(pub: mqtt.MqttClient, pattern: any, id: string): any {
return response =>
pub.publish(
this.getResQueueName(pattern, id),
JSON.stringify(Object.assign(response, { id })),
);
}
public serialize(content): ReadPacket & PacketId {
try {
return JSON.parse(content);
} catch (e) {
return content;
}
}
public getAckQueueName(pattern: string): string {
return `${pattern}_ack`;
}
public getResQueueName(pattern: string, id: string): string {
return `${pattern}_${id}_res`;
}
public handleError(stream) {
stream.on(ERROR_EVENT, err => this.logger.error(err));
}
}

View File

@@ -12,8 +12,8 @@ import { ReadPacket } from './../interfaces/packet.interface';
export class ServerNats extends Server implements CustomTransportStrategy {
private readonly url: string;
private subClient: nats.Client;
private pubClient: nats.Client;
private consumer: nats.Client;
private publisher: nats.Client;
constructor(private readonly options: MicroserviceOptions) {
super();
@@ -21,34 +21,34 @@ export class ServerNats extends Server implements CustomTransportStrategy {
}
public listen(callback: () => void) {
this.subClient = this.createNatsClient();
this.pubClient = this.createNatsClient();
this.consumer = this.createNatsClient();
this.publisher = this.createNatsClient();
this.handleError(this.pubClient);
this.handleError(this.subClient);
this.handleError(this.publisher);
this.handleError(this.consumer);
this.start(callback);
}
public start(callback?: () => void) {
this.bindEvents(this.subClient, this.pubClient);
this.subClient.on(CONNECT_EVENT, callback);
this.bindEvents(this.consumer, this.publisher);
this.consumer.on(CONNECT_EVENT, callback);
}
public bindEvents(subClient: nats.Client, pubClient: nats.Client) {
public bindEvents(consumer: nats.Client, publisher: nats.Client) {
const registeredPatterns = Object.keys(this.messageHandlers);
registeredPatterns.forEach(pattern => {
const channel = this.getAckQueueName(pattern);
subClient.subscribe(
consumer.subscribe(
channel,
this.getMessageHandler(channel, pubClient).bind(this),
() => this.getMessageHandler(channel, publisher),
);
});
}
public close() {
this.pubClient && this.pubClient.close();
this.subClient && this.subClient.close();
this.pubClient = this.subClient = null;
this.publisher && this.publisher.close();
this.consumer && this.consumer.close();
this.publisher = this.consumer = null;
}
public createNatsClient(): nats.Client {
@@ -83,9 +83,9 @@ export class ServerNats extends Server implements CustomTransportStrategy {
response$ && this.send(response$, publish);
}
public getPublisher(pubClient: nats.Client, pattern: any, id: string) {
public getPublisher(publisher: nats.Client, pattern: any, id: string) {
return response =>
pubClient.publish(
publisher.publish(
this.getResQueueName(pattern, id),
Object.assign(response, { id }) as any,
);

View File

@@ -0,0 +1,107 @@
import * as stan from 'node-nats-streaming';
import { Server } from './server';
import { NO_PATTERN_MESSAGE, MESSAGE_EVENT, STAN_DEFAULT_URL } from '../constants';
import { MicroserviceOptions } from '../interfaces/microservice-configuration.interface';
import { CustomTransportStrategy, PacketId } from './../interfaces';
import { Observable } from 'rxjs/Observable';
import { catchError } from 'rxjs/operators';
import { empty } from 'rxjs/observable/empty';
import { finalize } from 'rxjs/operators';
import { NATS_DEFAULT_URL, CONNECT_EVENT, ERROR_EVENT } from './../constants';
import { ReadPacket } from './../interfaces/packet.interface';
export class ServerStan extends Server implements CustomTransportStrategy {
private readonly url: string;
private consumer: stan.Stan;
private publisher: stan.Stan;
constructor(private readonly options: MicroserviceOptions) {
super();
this.url = options.url || STAN_DEFAULT_URL;
}
public async listen(callback: () => void) {
this.consumer = await this.createStanClient('consumer');
this.publisher = await this.createStanClient('producer');
this.start(callback);
}
public start(callback?: () => void) {
this.bindEvents(this.consumer, this.publisher);
callback();
}
public bindEvents(consumer: stan.Stan, publisher: stan.Stan) {
const registeredPatterns = Object.keys(this.messageHandlers);
registeredPatterns.forEach(pattern => {
const channel = this.getAckQueueName(pattern);
// TODO: Add opts
const subscription = consumer.subscribe(channel, 'default');
subscription.on(
MESSAGE_EVENT,
() => this.getMessageHandler(channel, publisher),
);
});
}
public close() {
this.publisher && this.publisher.close();
this.consumer && this.consumer.close();
this.publisher = this.consumer = null;
}
public createStanClient(clientId: string): Promise<stan.Stan> {
const client = stan.connect('clusterId', clientId, {
url: this.url,
});
this.handleError(client);
return new Promise(resolve =>
client.on(CONNECT_EVENT, () => resolve(client)),
);
}
public getMessageHandler(channel: string, pubClient: stan.Stan) {
return async buffer => await this.handleMessage(channel, buffer, pubClient);
}
public async handleMessage(
channel: string,
message: ReadPacket & PacketId,
pub: stan.Stan,
) {
const pattern = channel.replace(/_ack$/, '');
const publish = this.getPublisher(pub, pattern, message.id);
const status = 'error';
if (!this.messageHandlers[pattern]) {
return publish({ id: message.id, status, err: NO_PATTERN_MESSAGE });
}
const handler = this.messageHandlers[pattern];
const response$ = this.transformToObservable(
await handler(message.data),
) as Observable<any>;
response$ && this.send(response$, publish);
}
public getPublisher(publisher: stan.Stan, pattern: any, id: string) {
return response =>
publisher.publish(this.getResQueueName(pattern, id), Object.assign(
response,
{ id },
) as any);
}
public getAckQueueName(pattern: string): string {
return `${pattern}_ack`;
}
public getResQueueName(pattern: string, id: string): string {
return `${pattern}_${id}_res`;
}
public handleError(stream) {
stream.on(ERROR_EVENT, err => this.logger.error(err));
}
}