refactor: cleanup imports, move write fn to class body

This commit is contained in:
Kamil Myśliwiec
2023-08-30 14:50:53 +02:00
parent 74d65f438a
commit de2adcff9d

View File

@@ -26,7 +26,6 @@ import { ChannelOptions } from '../external/grpc-options.interface';
import { CustomTransportStrategy, MessageHandler } from '../interfaces';
import { GrpcOptions } from '../interfaces/microservice-configuration.interface';
import { Server } from './server';
import { Writable } from 'stream';
let grpcPackage: any = {};
let grpcProtoLoaderPackage: any = {};
@@ -243,7 +242,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
const result$ = this.transformToObservable(await handler);
try {
await writeObservableToGrpc(result$, call);
await this.writeObservableToGrpc(result$, call);
} catch (err) {
call.emit('error', err);
return;
@@ -251,6 +250,110 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
};
}
/**
* Writes an observable to a GRPC call.
*
* This function will ensure that backpressure is managed while writing values
* that come from an observable to a GRPC call.
*
* @param source The observable we want to write out to the GRPC call.
* @param call The GRPC call we want to write to.
* @returns A promise that resolves when we're done writing to the call.
*/
public writeObservableToGrpc<T>(
source: Observable<T>,
call: GrpcCall<T>,
): Promise<void> {
return new Promise((resolve, reject) => {
// This buffer is used to house values that arrive
// while the call is in the process of writing and draining.
const buffer: T[] = [];
let isComplete = false;
let writing = false;
const cleanups: (() => void)[] = [];
const cleanup = () => {
for (const cleanup of cleanups) {
cleanup();
}
};
const write = (value: T) => {
writing = true;
call.write(value);
};
const done = () => {
call.end();
resolve();
cleanup();
};
// Handling backpressure by waiting for drain event
const drainHandler = () => {
if (writing) {
writing = false;
if (buffer.length > 0) {
// Write any queued values we have in
// our buffer. Note that the `writing` boolean
// flips from false to true synchronously in this case
// that will prevent values arriving in our `next` call
// below from being interleaved in the written output.
write(buffer.shift()!);
} else if (isComplete) {
// Otherwise, if we're complete, end the call.
done();
}
}
};
call.on('drain', drainHandler);
cleanups.push(() => {
call.off('drain', drainHandler);
});
const subscription = new Subscription();
// Make sure that a cancel event unsubscribes from
// the source observable.
const cancelHandler = () => {
subscription.unsubscribe();
done();
};
call.on(CANCEL_EVENT, cancelHandler);
cleanups.push(() => {
call.off(CANCEL_EVENT, cancelHandler);
});
subscription.add(
source.subscribe({
next: (value: T) => {
if (writing) {
// If a value arrives while we're writing
// then we queue it up to be processed FIFO.
buffer.push(value);
} else {
// If we're not currently writing, then
// we can write the value immediately.
write(value);
}
},
error: (err: any) => {
call.emit('error', err);
reject(err);
cleanup();
},
complete: () => {
isComplete = true;
if (buffer.length === 0) {
done();
}
},
}),
);
});
}
public createRequestStreamMethod(
methodHandler: Function,
isResponseStream: boolean,
@@ -277,7 +380,7 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
const handler = methodHandler(req.asObservable(), call.metadata, call);
const res = this.transformToObservable(await handler);
if (isResponseStream) {
await writeObservableToGrpc(res, call);
await this.writeObservableToGrpc(res, call);
} else {
const response = await lastValueFrom(
res.pipe(
@@ -481,107 +584,3 @@ export class ServerGrpc extends Server implements CustomTransportStrategy {
}
}
}
/**
* Writes an observable to a GRPC call.
*
* This function will ensure that backpressure is managed while writing values
* that come from an observable to a GRPC call.
*
* @param source The observable we want to write out to the GRPC call.
* @param call The GRPC call we want to write to.
* @returns A promise that resolves when we're done writing to the call.
*/
function writeObservableToGrpc<T>(
source: Observable<T>,
call: GrpcCall<T>,
): Promise<void> {
return new Promise((resolve, reject) => {
// This buffer is used to house values that arrive
// while the call is in the process of writing and draining.
const buffer: T[] = [];
let isComplete = false;
let writing = false;
const cleanups: (() => void)[] = [];
const cleanup = () => {
for (const cleanup of cleanups) {
cleanup();
}
};
const write = (value: T) => {
writing = true;
call.write(value);
};
const done = () => {
call.end();
resolve();
cleanup();
};
// Handling backpressure by waiting for drain event
const drainHandler = () => {
if (writing) {
writing = false;
if (buffer.length > 0) {
// Write any queued values we have in
// our buffer. Note that the `writing` boolean
// flips from false to true synchronously in this case
// that will prevent values arriving in our `next` call
// below from being interleaved in the written output.
write(buffer.shift()!);
} else if (isComplete) {
// Otherwise, if we're complete, end the call.
done();
}
}
};
call.on('drain', drainHandler);
cleanups.push(() => {
call.off('drain', drainHandler);
});
const subscription = new Subscription();
// Make sure that a cancel event unsubscribes from
// the source observable.
const cancelHandler = () => {
subscription.unsubscribe();
done();
};
call.on(CANCEL_EVENT, cancelHandler);
cleanups.push(() => {
call.off(CANCEL_EVENT, cancelHandler);
});
subscription.add(
source.subscribe({
next: (value: T) => {
if (writing) {
// If a value arrives while we're writing
// then we queue it up to be processed FIFO.
buffer.push(value);
} else {
// If we're not currently writing, then
// we can write the value immediately.
write(value);
}
},
error: (err: any) => {
call.emit('error', err);
reject(err);
cleanup();
},
complete: () => {
isComplete = true;
if (buffer.length === 0) {
done();
}
},
}),
);
});
}