mirror of
https://github.com/nestjs/nest.git
synced 2026-02-21 23:11:44 +00:00
168 lines
4.4 KiB
TypeScript
168 lines
4.4 KiB
TypeScript
import { Logger, Type } from '@nestjs/common';
|
|
import * as net from 'net';
|
|
import { EmptyError, lastValueFrom } from 'rxjs';
|
|
import { share, tap } from 'rxjs/operators';
|
|
import { ConnectionOptions } from 'tls';
|
|
import {
|
|
CLOSE_EVENT,
|
|
ECONNREFUSED,
|
|
ERROR_EVENT,
|
|
MESSAGE_EVENT,
|
|
TCP_DEFAULT_HOST,
|
|
TCP_DEFAULT_PORT,
|
|
} from '../constants';
|
|
import { JsonSocket, TcpSocket } from '../helpers';
|
|
import { connect as tlsConnect, TLSSocket } from 'tls';
|
|
import { PacketId, ReadPacket, WritePacket } from '../interfaces';
|
|
import { TcpClientOptions } from '../interfaces/client-metadata.interface';
|
|
import { ClientProxy } from './client-proxy';
|
|
|
|
export class ClientTCP extends ClientProxy {
|
|
protected connection: Promise<any>;
|
|
private readonly logger = new Logger(ClientTCP.name);
|
|
private readonly port: number;
|
|
private readonly host: string;
|
|
private readonly socketClass: Type<TcpSocket>;
|
|
private isConnected = false;
|
|
private socket: TcpSocket;
|
|
public tlsOptions?: ConnectionOptions;
|
|
|
|
constructor(options: TcpClientOptions['options']) {
|
|
super();
|
|
this.port = this.getOptionsProp(options, 'port') || TCP_DEFAULT_PORT;
|
|
this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
|
|
this.socketClass =
|
|
this.getOptionsProp(options, 'socketClass') || JsonSocket;
|
|
this.tlsOptions = this.getOptionsProp(options, 'tlsOptions');
|
|
|
|
this.initializeSerializer(options);
|
|
this.initializeDeserializer(options);
|
|
}
|
|
|
|
public connect(): Promise<any> {
|
|
if (this.connection) {
|
|
return this.connection;
|
|
}
|
|
this.socket = this.createSocket();
|
|
this.bindEvents(this.socket);
|
|
|
|
const source$ = this.connect$(this.socket.netSocket).pipe(
|
|
tap(() => {
|
|
this.isConnected = true;
|
|
this.socket.on(MESSAGE_EVENT, (buffer: WritePacket & PacketId) =>
|
|
this.handleResponse(buffer),
|
|
);
|
|
}),
|
|
share(),
|
|
);
|
|
|
|
// For TLS connections, the connection is initiated when the socket is created
|
|
if (!this.tlsOptions) {
|
|
this.socket.connect(this.port, this.host);
|
|
}
|
|
this.connection = lastValueFrom(source$).catch(err => {
|
|
if (err instanceof EmptyError) {
|
|
return;
|
|
}
|
|
throw err;
|
|
});
|
|
|
|
return this.connection;
|
|
}
|
|
|
|
public async handleResponse(buffer: unknown): Promise<void> {
|
|
const { err, response, isDisposed, id } =
|
|
await this.deserializer.deserialize(buffer);
|
|
const callback = this.routingMap.get(id);
|
|
if (!callback) {
|
|
return undefined;
|
|
}
|
|
if (isDisposed || err) {
|
|
return callback({
|
|
err,
|
|
response,
|
|
isDisposed: true,
|
|
});
|
|
}
|
|
callback({
|
|
err,
|
|
response,
|
|
});
|
|
}
|
|
|
|
public createSocket(): TcpSocket {
|
|
let socket: net.Socket | TLSSocket;
|
|
/**
|
|
* TLS enabled, "upgrade" the TCP Socket to TLS
|
|
*/
|
|
if (this.tlsOptions) {
|
|
socket = tlsConnect({
|
|
...this.tlsOptions,
|
|
port: this.port,
|
|
host: this.host,
|
|
socket,
|
|
});
|
|
} else {
|
|
socket = new net.Socket();
|
|
}
|
|
return new this.socketClass(socket);
|
|
}
|
|
|
|
public close() {
|
|
this.socket && this.socket.end();
|
|
this.handleClose();
|
|
}
|
|
|
|
public bindEvents(socket: TcpSocket) {
|
|
socket.on(
|
|
ERROR_EVENT,
|
|
(err: any) => err.code !== ECONNREFUSED && this.handleError(err),
|
|
);
|
|
socket.on(CLOSE_EVENT, () => this.handleClose());
|
|
}
|
|
|
|
public handleError(err: any) {
|
|
this.logger.error(err);
|
|
}
|
|
|
|
public handleClose() {
|
|
this.isConnected = false;
|
|
this.socket = null;
|
|
this.connection = undefined;
|
|
|
|
if (this.routingMap.size > 0) {
|
|
const err = new Error('Connection closed');
|
|
for (const callback of this.routingMap.values()) {
|
|
callback({ err });
|
|
}
|
|
this.routingMap.clear();
|
|
}
|
|
}
|
|
|
|
protected publish(
|
|
partialPacket: ReadPacket,
|
|
callback: (packet: WritePacket) => any,
|
|
): () => void {
|
|
try {
|
|
const packet = this.assignPacketId(partialPacket);
|
|
const serializedPacket = this.serializer.serialize(packet);
|
|
|
|
this.routingMap.set(packet.id, callback);
|
|
this.socket.sendMessage(serializedPacket);
|
|
|
|
return () => this.routingMap.delete(packet.id);
|
|
} catch (err) {
|
|
callback({ err });
|
|
}
|
|
}
|
|
|
|
protected async dispatchEvent(packet: ReadPacket): Promise<any> {
|
|
const pattern = this.normalizePattern(packet.pattern);
|
|
const serializedPacket = this.serializer.serialize({
|
|
...packet,
|
|
pattern,
|
|
});
|
|
return this.socket.sendMessage(serializedPacket);
|
|
}
|
|
}
|