Merge pull request #16262 from jobnow/feat/microservices-configurable-max-buffer-size

Feat/microservices configurable max buffer size
This commit is contained in:
Kamil Mysliwiec
2026-02-02 14:20:43 +01:00
committed by GitHub
8 changed files with 374 additions and 2 deletions

View File

@@ -19,6 +19,7 @@ export class ClientTCP extends ClientProxy<TcpEvents, TcpStatus> {
protected readonly host: string;
protected readonly socketClass: Type<TcpSocket>;
protected readonly tlsOptions?: ConnectionOptions;
protected readonly maxBufferSize?: number;
protected socket: TcpSocket | null = null;
protected connectionPromise: Promise<any> | null = null;
protected pendingEventListeners: Array<{
@@ -32,6 +33,7 @@ export class ClientTCP extends ClientProxy<TcpEvents, TcpStatus> {
this.host = this.getOptionsProp(options, 'host', TCP_DEFAULT_HOST);
this.socketClass = this.getOptionsProp(options, 'socketClass', JsonSocket);
this.tlsOptions = this.getOptionsProp(options, 'tlsOptions');
this.maxBufferSize = this.getOptionsProp(options, 'maxBufferSize');
this.initializeSerializer(options);
this.initializeDeserializer(options);
@@ -108,6 +110,13 @@ export class ClientTCP extends ClientProxy<TcpEvents, TcpStatus> {
} else {
socket = new net.Socket();
}
// Pass maxBufferSize only if socketClass is JsonSocket
// For custom socket classes, users should handle maxBufferSize in their own implementation
if (this.maxBufferSize !== undefined && this.socketClass === JsonSocket) {
return new this.socketClass(socket, {
maxBufferSize: this.maxBufferSize,
});
}
return new this.socketClass(socket);
}

View File

@@ -4,7 +4,11 @@ import { CorruptedPacketLengthException } from '../errors/corrupted-packet-lengt
import { MaxPacketLengthExceededException } from '../errors/max-packet-length-exceeded.exception';
import { TcpSocket } from './tcp-socket';
const MAX_BUFFER_SIZE = (512 * 1024 * 1024) / 4; // 512 MBs in characters with 4 bytes per character (32-bit)
const DEFAULT_MAX_BUFFER_SIZE = (512 * 1024 * 1024) / 4; // 512 MBs in characters with 4 bytes per character (32-bit)
export interface JsonSocketOptions {
maxBufferSize?: number;
}
export class JsonSocket extends TcpSocket {
private contentLength: number | null = null;
@@ -12,6 +16,12 @@ export class JsonSocket extends TcpSocket {
private readonly stringDecoder = new StringDecoder();
private readonly delimiter = '#';
private readonly maxBufferSize: number;
constructor(socket: any, options?: JsonSocketOptions) {
super(socket);
this.maxBufferSize = options?.maxBufferSize ?? DEFAULT_MAX_BUFFER_SIZE;
}
protected handleSend(message: any, callback?: (err?: any) => void) {
this.socket.write(this.formatMessageData(message), 'utf-8', callback);
@@ -23,7 +33,7 @@ export class JsonSocket extends TcpSocket {
: dataRaw;
this.buffer += data;
if (this.buffer.length > MAX_BUFFER_SIZE) {
if (this.buffer.length > this.maxBufferSize) {
const bufferLength = this.buffer.length;
this.buffer = '';
throw new MaxPacketLengthExceededException(bufferLength);

View File

@@ -43,5 +43,10 @@ export interface TcpClientOptions {
deserializer?: Deserializer;
tlsOptions?: ConnectionOptions;
socketClass?: Type<TcpSocket>;
/**
* Maximum buffer size in characters (default: 128MB in characters, i.e., (512 * 1024 * 1024) / 4).
* This limit prevents memory exhaustion when receiving large TCP messages.
*/
maxBufferSize?: number;
};
}

View File

@@ -109,6 +109,11 @@ export interface TcpOptions {
tlsOptions?: TlsOptions;
deserializer?: Deserializer;
socketClass?: Type<TcpSocket>;
/**
* Maximum buffer size in characters (default: 128MB in characters, i.e., (512 * 1024 * 1024) / 4).
* This limit prevents memory exhaustion when receiving large TCP messages.
*/
maxBufferSize?: number;
};
}

View File

@@ -37,6 +37,7 @@ export class ServerTCP extends Server<TcpEvents, TcpStatus> {
protected readonly port: number;
protected readonly host: string;
protected readonly socketClass: Type<TcpSocket>;
protected readonly maxBufferSize?: number;
protected isManuallyTerminated = false;
protected retryAttemptsCount = 0;
protected tlsOptions?: TlsOptions;
@@ -51,6 +52,7 @@ export class ServerTCP extends Server<TcpEvents, TcpStatus> {
this.host = this.getOptionsProp(options, 'host', TCP_DEFAULT_HOST);
this.socketClass = this.getOptionsProp(options, 'socketClass', JsonSocket);
this.tlsOptions = this.getOptionsProp(options, 'tlsOptions');
this.maxBufferSize = this.getOptionsProp(options, 'maxBufferSize');
this.init();
this.initializeSerializer(options);
@@ -211,6 +213,13 @@ export class ServerTCP extends Server<TcpEvents, TcpStatus> {
}
protected getSocketInstance(socket: Socket): TcpSocket {
// Pass maxBufferSize only if socketClass is JsonSocket
// For custom socket classes, users should handle maxBufferSize in their own implementation
if (this.maxBufferSize !== undefined && this.socketClass === JsonSocket) {
return new this.socketClass(socket, {
maxBufferSize: this.maxBufferSize,
});
}
return new this.socketClass(socket);
}
}

View File

@@ -4,6 +4,7 @@ import * as sinon from 'sinon';
import { TLSSocket } from 'tls';
import { ClientTCP } from '../../client/client-tcp';
import { TcpEventsMap } from '../../events/tcp.events';
import { TcpSocket } from '../../helpers/tcp-socket';
describe('ClientTCP', () => {
let client: ClientTCP;
@@ -260,4 +261,53 @@ describe('ClientTCP', () => {
expect(jsonSocket.socket).instanceOf(NetSocket);
});
});
describe('maxBufferSize', () => {
const DEFAULT_MAX_BUFFER_SIZE = (512 * 1024 * 1024) / 4;
describe('when maxBufferSize is not provided', () => {
it('should use default maxBufferSize', () => {
const client = new ClientTCP({});
const socket = client.createSocket();
expect(socket['maxBufferSize']).to.equal(DEFAULT_MAX_BUFFER_SIZE);
});
});
describe('when maxBufferSize is provided', () => {
it('should use custom maxBufferSize', () => {
const customSize = 5000;
const client = new ClientTCP({ maxBufferSize: customSize });
const socket = client.createSocket();
expect(socket['maxBufferSize']).to.equal(customSize);
});
it('should pass maxBufferSize to JsonSocket', () => {
const customSize = 10000;
const client = new ClientTCP({ maxBufferSize: customSize });
const socket = client.createSocket();
expect(socket['maxBufferSize']).to.equal(customSize);
});
});
describe('when custom socketClass is provided', () => {
it('should not pass maxBufferSize to custom socket class', () => {
class CustomSocket extends TcpSocket {
constructor(socket: any) {
super(socket);
}
protected handleSend() {}
protected handleData() {}
}
const client = new ClientTCP({
socketClass: CustomSocket as any,
maxBufferSize: 5000,
});
const socket = client.createSocket();
expect(socket).to.be.instanceOf(CustomSocket);
// Custom socket should not have maxBufferSize property
expect(socket['maxBufferSize']).to.be.undefined;
});
});
});
});

View File

@@ -0,0 +1,229 @@
import { expect } from 'chai';
import { Socket } from 'net';
import * as sinon from 'sinon';
import { MaxPacketLengthExceededException } from '../../errors/max-packet-length-exceeded.exception';
import { TcpEventsMap } from '../../events/tcp.events';
import { JsonSocket } from '../../helpers/json-socket';
const DEFAULT_MAX_BUFFER_SIZE = (512 * 1024 * 1024) / 4; // 512 MBs in characters with 4 bytes per character (32-bit)
describe('JsonSocket maxBufferSize', () => {
describe('default maxBufferSize', () => {
it('should use default maxBufferSize when not provided', () => {
const socket = new JsonSocket(new Socket());
expect(socket['maxBufferSize']).to.equal(DEFAULT_MAX_BUFFER_SIZE);
});
it('should accept data up to default maxBufferSize', () => {
const socket = new JsonSocket(new Socket());
// Account for header length (number + '#')
// Use a smaller size to ensure total buffer (header + data) doesn't exceed limit
// Create valid JSON string data
const headerOverhead = 20; // Approximate header size for large numbers
const dataSize = DEFAULT_MAX_BUFFER_SIZE - headerOverhead;
const largeData = '"' + 'x'.repeat(dataSize - 2) + '"'; // Valid JSON string
const packet = `${largeData.length}#${largeData}`;
expect(() => {
socket['handleData'](packet);
}).to.not.throw();
});
it('should throw MaxPacketLengthExceededException when exceeding default maxBufferSize', () => {
const socket = new JsonSocket(new Socket());
const largeData = 'x'.repeat(DEFAULT_MAX_BUFFER_SIZE + 1);
const packet = `${largeData.length}#${largeData}`;
expect(() => {
socket['handleData'](packet);
}).to.throw(MaxPacketLengthExceededException);
});
});
describe('custom maxBufferSize', () => {
it('should use custom maxBufferSize when provided', () => {
const customSize = 1000;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
expect(socket['maxBufferSize']).to.equal(customSize);
});
it('should accept data up to custom maxBufferSize', () => {
const customSize = 1000;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
// Account for header length (number + '#')
// For 1000, header is "1000#" = 5 characters
const headerOverhead = 5;
const dataSize = customSize - headerOverhead;
// Create valid JSON string data
const data = '"' + 'x'.repeat(dataSize - 2) + '"'; // Valid JSON string
const packet = `${data.length}#${data}`;
expect(() => {
socket['handleData'](packet);
}).to.not.throw();
});
it('should throw MaxPacketLengthExceededException when exceeding custom maxBufferSize', () => {
const customSize = 1000;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
const largeData = 'x'.repeat(customSize + 1);
const packet = `${largeData.length}#${largeData}`;
expect(() => {
socket['handleData'](packet);
}).to.throw(MaxPacketLengthExceededException);
});
it('should throw MaxPacketLengthExceededException with correct buffer length', () => {
const customSize = 1000;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
const largeData = 'x'.repeat(customSize + 100);
const packet = `${largeData.length}#${largeData}`;
// Total buffer size will be: header length (5) + data length (1100) = 1105
const expectedBufferSize = packet.length;
try {
socket['handleData'](packet);
expect.fail('Should have thrown MaxPacketLengthExceededException');
} catch (err) {
expect(err).to.be.instanceof(MaxPacketLengthExceededException);
expect(err.message).to.include(String(expectedBufferSize));
}
});
});
describe('chunked data exceeding maxBufferSize', () => {
it('should throw MaxPacketLengthExceededException when chunked data exceeds limit', () => {
const customSize = 100;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
// Send data in chunks without a valid header delimiter
// This will accumulate in the buffer without being processed
// First chunk: partial header
socket['handleData']('50');
// Second chunk: more data that accumulates beyond limit
// Buffer now has "50" (2 chars), send enough to exceed customSize
const exceedingData = 'x'.repeat(customSize);
expect(() => {
socket['handleData'](exceedingData);
}).to.throw(MaxPacketLengthExceededException);
});
it('should clear buffer after throwing MaxPacketLengthExceededException', () => {
const customSize = 100;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
const largeData = 'x'.repeat(customSize + 1);
const packet = `${largeData.length}#${largeData}`;
try {
socket['handleData'](packet);
} catch (err) {
// Expected
}
expect(socket['buffer']).to.equal('');
});
});
describe('error handling when maxBufferSize exceeded', () => {
it(`should emit ${TcpEventsMap.ERROR} event when maxBufferSize is exceeded`, () => {
const customSize = 100;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
const socketEmitSpy: sinon.SinonSpy<any, any> = sinon.spy(
socket['socket'],
'emit',
);
const largeData = 'x'.repeat(customSize + 1);
const packet = Buffer.from(`${largeData.length}#${largeData}`);
socket['onData'](packet);
expect(socketEmitSpy.called).to.be.true;
expect(socketEmitSpy.calledWith(TcpEventsMap.ERROR)).to.be.true;
socketEmitSpy.restore();
});
it(`should send a FIN packet when maxBufferSize is exceeded`, () => {
const customSize = 100;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
const socketEndSpy = sinon.spy(socket['socket'], 'end');
const largeData = 'x'.repeat(customSize + 1);
const packet = Buffer.from(`${largeData.length}#${largeData}`);
socket['onData'](packet);
expect(socketEndSpy.calledOnce).to.be.true;
socketEndSpy.restore();
});
});
describe('edge cases', () => {
it('should handle maxBufferSize of 0', () => {
const socket = new JsonSocket(new Socket(), { maxBufferSize: 0 });
expect(socket['maxBufferSize']).to.equal(0);
const packet = '5#"test"';
expect(() => {
socket['handleData'](packet);
}).to.throw(MaxPacketLengthExceededException);
});
it('should handle very large custom maxBufferSize', () => {
const veryLargeSize = 10 * 1024 * 1024; // 10MB in characters
const socket = new JsonSocket(new Socket(), {
maxBufferSize: veryLargeSize,
});
expect(socket['maxBufferSize']).to.equal(veryLargeSize);
// Account for header length (number + '#')
// For 10MB, header is approximately "10485760#" = 10 characters
const headerOverhead = 20; // Safe overhead for large numbers
const dataSize = veryLargeSize - headerOverhead;
// Create valid JSON string data
const data = '"' + 'x'.repeat(dataSize - 2) + '"'; // Valid JSON string
const packet = `${data.length}#${data}`;
expect(() => {
socket['handleData'](packet);
}).to.not.throw();
});
it('should handle maxBufferSize exactly at the limit', () => {
const customSize = 100;
const socket = new JsonSocket(new Socket(), {
maxBufferSize: customSize,
});
// Account for header: "100#" = 4 characters
// So data can be 100 - 4 = 96 characters to stay at limit
const headerOverhead = 4;
const dataSize = customSize - headerOverhead;
// Create valid JSON string data
const data = '"' + 'x'.repeat(dataSize - 2) + '"'; // Valid JSON string
const packet = `${data.length}#${data}`;
// Should not throw when exactly at limit
expect(() => {
socket['handleData'](packet);
}).to.not.throw();
});
});
});

View File

@@ -1,7 +1,9 @@
import { expect } from 'chai';
import { Socket as NetSocket } from 'net';
import * as sinon from 'sinon';
import { NO_MESSAGE_HANDLER } from '../../constants';
import { BaseRpcContext } from '../../ctx-host/base-rpc.context';
import { TcpSocket } from '../../helpers/tcp-socket';
import { ServerTCP } from '../../server/server-tcp';
import { objectToMap } from './utils/object-to-map';
@@ -137,4 +139,57 @@ describe('ServerTCP', () => {
expect(handler.calledWith(data)).to.be.true;
});
});
describe('maxBufferSize', () => {
const DEFAULT_MAX_BUFFER_SIZE = (512 * 1024 * 1024) / 4;
describe('when maxBufferSize is not provided', () => {
it('should use default maxBufferSize', () => {
const server = new ServerTCP({});
const socket = new NetSocket();
const jsonSocket = server['getSocketInstance'](socket);
expect(jsonSocket['maxBufferSize']).to.equal(DEFAULT_MAX_BUFFER_SIZE);
});
});
describe('when maxBufferSize is provided', () => {
it('should use custom maxBufferSize', () => {
const customSize = 5000;
const server = new ServerTCP({ maxBufferSize: customSize });
const socket = new NetSocket();
const jsonSocket = server['getSocketInstance'](socket);
expect(jsonSocket['maxBufferSize']).to.equal(customSize);
});
it('should pass maxBufferSize to JsonSocket', () => {
const customSize = 10000;
const server = new ServerTCP({ maxBufferSize: customSize });
const socket = new NetSocket();
const jsonSocket = server['getSocketInstance'](socket);
expect(jsonSocket['maxBufferSize']).to.equal(customSize);
});
});
describe('when custom socketClass is provided', () => {
it('should not pass maxBufferSize to custom socket class', () => {
class CustomSocket extends TcpSocket {
constructor(socket: any) {
super(socket);
}
protected handleSend() {}
protected handleData() {}
}
const server = new ServerTCP({
socketClass: CustomSocket as any,
maxBufferSize: 5000,
});
const socket = new NetSocket();
const customSocket = server['getSocketInstance'](socket);
expect(customSocket).to.be.instanceOf(CustomSocket);
// Custom socket should not have maxBufferSize property
expect(customSocket['maxBufferSize']).to.be.undefined;
});
});
});
});