feat(microservice): add tcp over tls support

Signed-off-by: nomaxg <noahgolub2@gmail.com>
This commit is contained in:
nomaxg
2022-11-21 21:54:39 -05:00
committed by Noah Golub
parent d3a025cbf8
commit b9c235ae18
11 changed files with 474 additions and 4 deletions

View File

@@ -0,0 +1,142 @@
import { INestApplication } from '@nestjs/common';
import { Transport } from '@nestjs/microservices';
import { Test } from '@nestjs/testing';
import { expect } from 'chai';
import * as request from 'supertest';
import { AppController } from '../src/tcp-tls/app.controller';
import { ApplicationModule } from '../src/tcp-tls/app.module';
import * as fs from 'fs';
import * as path from 'path';
describe('RPC TLS transport', () => {
let server;
let app: INestApplication;
let key: string;
let cert: string;
before(() => {
// Generate a self-signed key pair
key = fs
.readFileSync(path.join(__dirname, '../src/tcp-tls/privkey.pem'), 'utf8')
.toString();
cert = fs
.readFileSync(path.join(__dirname, '../src/tcp-tls/ca.cert.pem'), 'utf8')
.toString();
});
beforeEach(async () => {
const module = await Test.createTestingModule({
imports: [ApplicationModule],
}).compile();
app = module.createNestApplication();
server = app.getHttpAdapter().getInstance();
app.connectMicroservice({
transport: Transport.TCP,
options: {
host: '0.0.0.0',
tlsOptions: { key: key, cert: cert },
},
});
await app.startAllMicroservices();
await app.init();
});
it(`/POST TLS`, () => {
return request(server)
.post('/?command=sum')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});
it(`/POST (Promise/async)`, () => {
return request(server)
.post('/?command=asyncSum')
.send([1, 2, 3, 4, 5])
.expect(200)
.expect(200, '15');
});
it(`/POST (Observable stream)`, () => {
return request(server)
.post('/?command=streamSum')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});
it(`/POST (useFactory client)`, () => {
return request(server)
.post('/useFactory?command=sum')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});
it(`/POST (useClass client)`, () => {
return request(server)
.post('/useClass?command=sum')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});
it(`/POST (concurrent)`, () => {
return request(server)
.post('/concurrent')
.send([
Array.from({ length: 10 }, (v, k) => k + 1),
Array.from({ length: 10 }, (v, k) => k + 11),
Array.from({ length: 10 }, (v, k) => k + 21),
Array.from({ length: 10 }, (v, k) => k + 31),
Array.from({ length: 10 }, (v, k) => k + 41),
Array.from({ length: 10 }, (v, k) => k + 51),
Array.from({ length: 10 }, (v, k) => k + 61),
Array.from({ length: 10 }, (v, k) => k + 71),
Array.from({ length: 10 }, (v, k) => k + 81),
Array.from({ length: 10 }, (v, k) => k + 91),
])
.expect(200, 'true');
});
it(`/POST (streaming)`, () => {
return request(server)
.post('/stream')
.send([1, 2, 3, 4, 5])
.expect(200, '15');
});
it(`/POST (pattern not found)`, () => {
return request(server).post('/?command=test').expect(500);
});
it(`/POST (event notification)`, done => {
request(server)
.post('/notify')
.send([1, 2, 3, 4, 5])
.end(() => {
setTimeout(() => {
expect(AppController.IS_NOTIFIED).to.be.true;
done();
}, 1000);
});
});
it('/POST (custom client)', () => {
return request(server)
.post('/error?client=custom')
.send({})
.expect(200)
.expect('true');
});
it('/POST (standard client)', () => {
return request(server)
.post('/error?client=standard')
.send({})
.expect(200)
.expect('false');
});
afterEach(async () => {
await app.close();
});
});

View File

@@ -0,0 +1,141 @@
import {
Body,
Controller,
HttpCode,
Inject,
Post,
Query,
} from '@nestjs/common';
import {
Client,
ClientProxy,
EventPattern,
MessagePattern,
RpcException,
Transport,
} from '@nestjs/microservices';
import { from, lastValueFrom, Observable, of, throwError } from 'rxjs';
import { catchError, scan } from 'rxjs/operators';
import * as fs from 'fs';
import * as path from 'path';
@Controller()
export class AppController {
constructor(
@Inject('USE_CLASS_CLIENT') private useClassClient: ClientProxy,
@Inject('USE_FACTORY_CLIENT') private useFactoryClient: ClientProxy,
@Inject('CUSTOM_PROXY_CLIENT') private customClient: ClientProxy,
) {}
static IS_NOTIFIED = false;
@Client({
transport: Transport.TCP,
options: {
tlsOptions: {
ca: [
fs
.readFileSync(path.join(__dirname, 'ca.cert.pem'), 'utf-8')
.toString(),
],
},
},
})
client: ClientProxy;
@Post()
@HttpCode(200)
call(@Query('command') cmd, @Body() data: number[]): Observable<number> {
return this.client.send<number>({ cmd }, data);
}
@Post('useFactory')
@HttpCode(200)
callWithClientUseFactory(
@Query('command') cmd,
@Body() data: number[],
): Observable<number> {
return this.useFactoryClient.send<number>({ cmd }, data);
}
@Post('useClass')
@HttpCode(200)
callWithClientUseClass(
@Query('command') cmd,
@Body() data: number[],
): Observable<number> {
return this.useClassClient.send<number>({ cmd }, data);
}
@Post('stream')
@HttpCode(200)
stream(@Body() data: number[]): Observable<number> {
return this.client
.send<number>({ cmd: 'streaming' }, data)
.pipe(scan((a, b) => a + b));
}
@Post('concurrent')
@HttpCode(200)
concurrent(@Body() data: number[][]): Promise<boolean> {
const send = async (tab: number[]) => {
const expected = tab.reduce((a, b) => a + b);
const result = await lastValueFrom(
this.client.send<number>({ cmd: 'sum' }, tab),
);
return result === expected;
};
return data
.map(async tab => send(tab))
.reduce(async (a, b) => (await a) && b);
}
@Post('error')
@HttpCode(200)
serializeError(
@Query('client') query: 'custom' | 'standard' = 'standard',
@Body() body: Record<string, any>,
): Observable<boolean> {
const client = query === 'custom' ? this.customClient : this.client;
return client.send({ cmd: 'err' }, {}).pipe(
catchError(err => {
return of(err instanceof RpcException);
}),
);
}
@MessagePattern({ cmd: 'sum' })
sum(data: number[]): number {
return (data || []).reduce((a, b) => a + b);
}
@MessagePattern({ cmd: 'asyncSum' })
async asyncSum(data: number[]): Promise<number> {
return (data || []).reduce((a, b) => a + b);
}
@MessagePattern({ cmd: 'streamSum' })
streamSum(data: number[]): Observable<number> {
return of((data || []).reduce((a, b) => a + b));
}
@MessagePattern({ cmd: 'streaming' })
streaming(data: number[]): Observable<number> {
return from(data);
}
@MessagePattern({ cmd: 'err' })
throwAnError() {
return throwError(() => new Error('err'));
}
@Post('notify')
async sendNotification(): Promise<any> {
return this.client.emit<number>('notification', true);
}
@EventPattern('notification')
eventHandler(data: boolean) {
AppController.IS_NOTIFIED = data;
}
}

View File

@@ -0,0 +1,90 @@
import { Module, Injectable } from '@nestjs/common';
import { AppController } from './app.controller';
import {
ClientsModule,
Transport,
ClientsModuleOptionsFactory,
ClientOptions,
ClientTCP,
RpcException,
} from '@nestjs/microservices';
import * as fs from 'fs';
import * as path from 'path';
const caCert = fs.readFileSync(path.join(__dirname, 'ca.cert.pem')).toString();
class ErrorHandlingProxy extends ClientTCP {
constructor() {
super({
tlsOptions: { ca: caCert },
});
}
serializeError(err) {
return new RpcException(err);
}
}
@Injectable()
class ConfigService {
private readonly config = {
transport: Transport.TCP,
};
get(key: string, defaultValue?: any) {
return this.config[key] || defaultValue;
}
}
@Module({
providers: [ConfigService],
exports: [ConfigService],
})
class ConfigModule {}
@Injectable()
class ClientOptionService implements ClientsModuleOptionsFactory {
constructor(private readonly configService: ConfigService) {}
createClientOptions(): Promise<ClientOptions> | ClientOptions {
return {
transport: this.configService.get('transport'),
options: {
tlsOptions: { ca: caCert },
},
};
}
}
@Module({
imports: [
ClientsModule.registerAsync([
{
imports: [ConfigModule],
name: 'USE_FACTORY_CLIENT',
useFactory: (configService: ConfigService) => ({
transport: configService.get('transport'),
options: {
tlsOptions: { ca: caCert },
},
}),
inject: [ConfigService],
},
{
imports: [ConfigModule],
name: 'USE_CLASS_CLIENT',
useClass: ClientOptionService,
inject: [ConfigService],
},
{
imports: [ConfigModule],
inject: [ConfigService],
name: 'CUSTOM_PROXY_CLIENT',
useFactory: (config: ConfigService) => ({
customClass: ErrorHandlingProxy,
}),
},
]),
],
controllers: [AppController],
})
export class ApplicationModule {}

View File

@@ -0,0 +1,17 @@
-----BEGIN CERTIFICATE-----
MIICpDCCAYwCCQCyP27z3r0PFjANBgkqhkiG9w0BAQsFADAUMRIwEAYDVQQDDAls
b2NhbGhvc3QwHhcNMjIxMjAyMDQ0NTQ1WhcNMzIxMTI5MDQ0NTQ1WjAUMRIwEAYD
VQQDDAlsb2NhbGhvc3QwggEiMA0GCSqGSIb3DQEBAQUAA4IBDwAwggEKAoIBAQDZ
1IdZZXqrwXql4AIOJnlfpoGKOKoIalnK7KaKHTsq1QOF8z2abFuNBVIIrO0etQ/0
PPAaFGkXl6HHBuA5PrFpsw3V1wSnNs1Cns9NhvypHI2V71lkwBJrEaSicNWL2AOE
QkQ9cZ4YsTGd0BrM8D5VvgXdrC7gOXfj7Hx3E4K+wFO/Gi4AUXl5CXxleSFcW4U+
jFulfq/DE8rBZXs29IsGeVkkgUoICjQ4Ey4zE6EY7f3SPKgU8gfgzYyGSd/ZZ/E7
6M2yakEUX448Nl4BeuNWroBHVm1pSiMo+Cm1g34pJScPrx1yw6qquziCc/2n1M6O
B4WGIZAmJDWnAOEjjrxFAgMBAAEwDQYJKoZIhvcNAQELBQADggEBAABGByZZUjaq
ZygICSH2qUGHPPIyrfaCe0qM7de6kYfxxPYQQZb0HDynzv780Lq1002XeT02fNR+
5sBCVFuKvS8BNvTq6kHzO1FiWIk/E5fQcYNToYSeEcXgWFLhJMty7+R6sIc9y8PH
2YNehf78Jjm9ukM52sLc4+JWl7AEeqPrOHZdh/ve8M2gTfimFKTrW5cEAVPIOPhp
2t5BdDKt8ZxgrGC7iRxga+v80VUOHRGfrd3hf3NlDQZO8upVGY8DdJhPRDB72+R0
kzJ7eyQwlGXM20atiFxPk43h0f273MneIJG8NgGiVU0ND4XnZkAB3KSAu7pB+nEw
QRYMYDgo/8Q=
-----END CERTIFICATE-----

View File

@@ -0,0 +1,28 @@
-----BEGIN PRIVATE KEY-----
MIIEvAIBADANBgkqhkiG9w0BAQEFAASCBKYwggSiAgEAAoIBAQDZ1IdZZXqrwXql
4AIOJnlfpoGKOKoIalnK7KaKHTsq1QOF8z2abFuNBVIIrO0etQ/0PPAaFGkXl6HH
BuA5PrFpsw3V1wSnNs1Cns9NhvypHI2V71lkwBJrEaSicNWL2AOEQkQ9cZ4YsTGd
0BrM8D5VvgXdrC7gOXfj7Hx3E4K+wFO/Gi4AUXl5CXxleSFcW4U+jFulfq/DE8rB
ZXs29IsGeVkkgUoICjQ4Ey4zE6EY7f3SPKgU8gfgzYyGSd/ZZ/E76M2yakEUX448
Nl4BeuNWroBHVm1pSiMo+Cm1g34pJScPrx1yw6qquziCc/2n1M6OB4WGIZAmJDWn
AOEjjrxFAgMBAAECggEANUKZtWnyjHxKGLSkzADsPE7h7YHdUSFvwwYJ0ktDZD2h
FudacJ994wUiAa0GbTOoKvebXUUQTQxuKdOsj1Kc3lNBVr+0C46CsX9TAIm4zUCF
/dr/6HpuBm/R6UXdcMvoUDZDqSJWnYL1trhjVSiIlT5ZANJQw9JJVhlEdXj3xtuc
I9aC+33f9hKO1wzei/mTjIRGyRIeKselZPpA7qJnE4s3hmZxnO/rSqyPIvh1XLfZ
3Eoyyg+xNpTZ8JqlHB5d7hDSnj8cjboa7IYpHJDXN3r8Aui+R9e/sQkezHbF7fR0
xHpBVYQvGMuqSnTBkdJfq4qPAR1K49UrpEXB2GHaIQKBgQDs3t/ZUA138AiPIeLd
aTsEPhf5dWEZynfNvXZ0VcoSr8ckaiaq9OEllprAriYWj+XphTDsBUAh/R7KlOR4
eb+m6OwH7LseGiLIEr54GGP0LzVXAkfH2/uR1cak8qAmHB00jNEg7sj0eVAsHO3f
WQm67f+RNP/IgAa+V8JKIkgTwwKBgQDrbAH8eAQsq9rjpxcic1EUI7uFHzr+cKf/
4Y8ThLUNAzNfAbQWRBYjS1R8GM79Wiuh+WT1ooHKLryuLF7LVukvKHJ5GiNFBmaO
llf72Zf1y4tBE2RCXQbf6h8+ohSDC/hwYy+w20/i2KzSBKkS0+gQuAX/HzfTpRd3
q3/uEniXVwKBgDNnElDIbIPQlSrqgZ7mzSXYi79Y15+PLnx5VxFb5KQ1fRPL7WRA
C/PqQN77a8yNoakRfFJbuVUm5t2zffkfApYoCcCWgOzBYzbjym2pbVd6PysIlacr
d+Zn69mzxUk/5J6YyHFLIFTdVqacCIrleZUVPNa4F6HdFpmL1d/cnKOdAoGAMDuB
sKsaF9jh0LBkEf/URa8IdT6vxH9qPAeHW7VdrpvQQ4/CyKkMbBC772zZw5hcxiOl
Zpnzw2uN5pVamohk3++GfH85aKPmESKGRigPdSFNl3iUmvAaP3flDN0CHNMwBD6d
/7r/A/fmeGTSCvR1YC+DswA/XNI/G5p8bFdGc6MCgYBd9oQiZlkYMiDGPUAjx+DO
kqtAmc8DLJEanSbWdIxL2bGL04cgBRPssM4m0UScx4PucvqWEPdN/5Ug0z5TrD77
2K5nZSBUdy4DunBImz1NHRQEiytkrYX0LesGr02QlzIH4wmwb1TFu7rLkr6KfNuV
xqWi+JVY8N4vuHAxCeEALw==
-----END PRIVATE KEY-----

View File

@@ -247,4 +247,4 @@
],
"exit": true
}
}
}

View File

@@ -2,6 +2,7 @@ import { Logger, Type } from '@nestjs/common';
import * as net from 'net';
import { EmptyError, lastValueFrom } from 'rxjs';
import { share, tap } from 'rxjs/operators';
import { ConnectionOptions } from 'tls';
import {
CLOSE_EVENT,
ECONNREFUSED,
@@ -11,6 +12,7 @@ import {
TCP_DEFAULT_PORT,
} from '../constants';
import { JsonSocket, TcpSocket } from '../helpers';
import { connect as tlsConnect, TLSSocket } from 'tls';
import { PacketId, ReadPacket, WritePacket } from '../interfaces';
import { TcpClientOptions } from '../interfaces/client-metadata.interface';
import { ClientProxy } from './client-proxy';
@@ -23,6 +25,7 @@ export class ClientTCP extends ClientProxy {
private readonly socketClass: Type<TcpSocket>;
private isConnected = false;
private socket: TcpSocket;
public tlsOptions?: ConnectionOptions;
constructor(options: TcpClientOptions['options']) {
super();
@@ -30,6 +33,7 @@ export class ClientTCP extends ClientProxy {
this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
this.socketClass =
this.getOptionsProp(options, 'socketClass') || JsonSocket;
this.tlsOptions = this.getOptionsProp(options, 'tlsOptions');
this.initializeSerializer(options);
this.initializeDeserializer(options);
@@ -42,6 +46,10 @@ export class ClientTCP extends ClientProxy {
this.socket = this.createSocket();
this.bindEvents(this.socket);
if (!this.tlsOptions) {
this.socket.connect(this.port, this.host);
}
const source$ = this.connect$(this.socket.netSocket).pipe(
tap(() => {
this.isConnected = true;
@@ -52,7 +60,6 @@ export class ClientTCP extends ClientProxy {
share(),
);
this.socket.connect(this.port, this.host);
this.connection = lastValueFrom(source$).catch(err => {
if (err instanceof EmptyError) {
return;
@@ -84,7 +91,21 @@ export class ClientTCP extends ClientProxy {
}
public createSocket(): TcpSocket {
return new this.socketClass(new net.Socket());
let socket: net.Socket | TLSSocket;
/**
* TLS enabled, "upgrade" the TCP Socket to TLS
*/
if (this.tlsOptions) {
socket = tlsConnect({
...this.tlsOptions,
port: this.port,
host: this.host,
socket,
});
} else {
socket = new net.Socket();
}
return new this.socketClass(socket);
}
public close() {

View File

@@ -12,6 +12,7 @@ import {
RmqOptions,
} from './microservice-configuration.interface';
import { Serializer } from './serializer.interface';
import { ConnectionOptions } from 'tls';
export type ClientOptions =
| RedisOptions
@@ -34,6 +35,7 @@ export interface TcpClientOptions {
port?: number;
serializer?: Serializer;
deserializer?: Deserializer;
tlsOptions?: ConnectionOptions;
socketClass?: Type<TcpSocket>;
};
}

View File

@@ -1,4 +1,5 @@
import { Type } from '@nestjs/common';
import { TlsOptions } from 'tls';
import { Transport } from '../enums/transport.enum';
import { ChannelOptions } from '../external/grpc-options.interface';
import {
@@ -78,6 +79,7 @@ export interface TcpOptions {
retryAttempts?: number;
retryDelay?: number;
serializer?: Serializer;
tlsOptions?: TlsOptions;
deserializer?: Deserializer;
socketClass?: Type<TcpSocket>;
};

View File

@@ -15,6 +15,7 @@ import {
import { TcpContext } from '../ctx-host/tcp.context';
import { Transport } from '../enums';
import { JsonSocket, TcpSocket } from '../helpers';
import { createServer as tlsCreateServer } from 'tls';
import {
CustomTransportStrategy,
IncomingRequest,
@@ -35,6 +36,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
private readonly socketClass: Type<TcpSocket>;
private isExplicitlyTerminated = false;
private retryAttemptsCount = 0;
private tlsOptions?;
constructor(private readonly options: TcpOptions['options']) {
super();
@@ -42,6 +44,7 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
this.host = this.getOptionsProp(options, 'host') || TCP_DEFAULT_HOST;
this.socketClass =
this.getOptionsProp(options, 'socketClass') || JsonSocket;
this.tlsOptions = this.getOptionsProp(options, 'tlsOptions');
this.init();
this.initializeSerializer(options);
@@ -124,7 +127,16 @@ export class ServerTCP extends Server implements CustomTransportStrategy {
}
private init() {
this.server = net.createServer(this.bindHandler.bind(this));
if (this.tlsOptions) {
// TLS enabled, use tls server
this.server = tlsCreateServer(
this.tlsOptions,
this.bindHandler.bind(this),
);
} else {
// TLS disabled, use net server
this.server = net.createServer(this.bindHandler.bind(this));
}
this.server.on(ERROR_EVENT, this.handleError.bind(this));
this.server.on(CLOSE_EVENT, this.handleClose.bind(this));
}

View File

@@ -1,5 +1,7 @@
import { expect } from 'chai';
import * as sinon from 'sinon';
import { TLSSocket } from 'tls';
import { Socket as NetSocket } from 'net';
import { ClientTCP } from '../../client/client-tcp';
import { ERROR_EVENT } from '../../constants';
@@ -214,4 +216,17 @@ describe('ClientTCP', () => {
expect(sendMessageStub.called).to.be.true;
});
});
describe('tls', () => {
it('should upgrade to TLS', () => {
const client = new ClientTCP({ tlsOptions: {} });
console.log(client);
const jsonSocket = client.createSocket();
expect(jsonSocket.socket).instanceOf(TLSSocket);
});
it('should not upgrade to TLS, if not requested', () => {
const jsonSocket = new ClientTCP({}).createSocket();
expect(jsonSocket.socket).instanceOf(NetSocket);
});
});
});