template-project/node_modules/batch-cluster/dist/BatchProcess.js
2025-05-30 18:13:30 +08:00

504 lines
25 KiB
JavaScript

"use strict";
/**
 * Down-levelled write to a private class member.
 *
 * @param receiver object whose private member is being written
 * @param state the WeakMap/WeakSet backing instance members, or the class
 * constructor itself for static members
 * @param value value to store
 * @param kind "f" for a field, "a" for an accessor, "m" for a method
 * @param f the setter (kind "a") or the `{ value }` holder for static fields
 * @returns `value`, so the call can be used as an assignment expression
 * @throws {TypeError} for methods, setter-less accessors, or a receiver that
 * was not declared by the owning class
 */
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
    if (kind === "m") {
        throw new TypeError("Private method is not writable");
    }
    if (kind === "a" && !f) {
        throw new TypeError("Private accessor was defined without a setter");
    }
    // Static members are keyed on the constructor; instance members on a
    // WeakMap/WeakSet that must already contain the receiver.
    const declared = typeof state === "function"
        ? receiver === state && Boolean(f)
        : state.has(receiver);
    if (!declared) {
        throw new TypeError("Cannot write private member to an object whose class did not declare it");
    }
    if (kind === "a") {
        f.call(receiver, value);
    }
    else if (f) {
        f.value = value;
    }
    else {
        state.set(receiver, value);
    }
    return value;
};
/**
 * Down-levelled read of a private class member.
 *
 * @param receiver object whose private member is being read
 * @param state the WeakMap/WeakSet backing instance members, or the class
 * constructor itself for static members
 * @param kind "f" for a field, "a" for an accessor, "m" for a method
 * @param f the method (kind "m"), getter (kind "a"), or `{ value }` holder
 * for static fields
 * @returns the method itself, the getter's result, or the stored field value
 * @throws {TypeError} for getter-less accessors, or a receiver that was not
 * declared by the owning class
 */
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
    if (kind === "a" && !f) {
        throw new TypeError("Private accessor was defined without a getter");
    }
    const declared = typeof state === "function"
        ? receiver === state && Boolean(f)
        : state.has(receiver);
    if (!declared) {
        throw new TypeError("Cannot read private member from an object whose class did not declare it");
    }
    switch (kind) {
        case "m":
            return f;
        case "a":
            return f.call(receiver);
        default:
            return f ? f.value : state.get(receiver);
    }
};
/**
 * Interop shim for default imports: ES-module namespaces (flagged with
 * `__esModule`) pass through untouched, anything else is wrapped so that it is
 * reachable as `.default`.
 */
var __importDefault = (this && this.__importDefault) || function (mod) {
    if (mod && mod.__esModule) {
        return mod;
    }
    return { "default": mod };
};
var _BatchProcess_instances, _BatchProcess_lastHealthCheck, _BatchProcess_healthCheckFailures, _BatchProcess_logger, _BatchProcess_lastJobFinshedAt, _BatchProcess_lastJobFailed, _BatchProcess_starting, _BatchProcess_exited, _BatchProcess_whyNotHealthy, _BatchProcess_taskCount, _BatchProcess_currentTask, _BatchProcess_currentTaskTimeout, _BatchProcess_endPromise, _BatchProcess_execTask, _BatchProcess_end, _BatchProcess_awaitNotRunning, _BatchProcess_onTimeout, _BatchProcess_onError, _BatchProcess_onStderr, _BatchProcess_onStdout, _BatchProcess_clearCurrentTask;
Object.defineProperty(exports, "__esModule", { value: true });
exports.BatchProcess = void 0;
const node_timers_1 = __importDefault(require("node:timers"));
const Async_1 = require("./Async");
const Deferred_1 = require("./Deferred");
const Error_1 = require("./Error");
const Object_1 = require("./Object");
const Parser_1 = require("./Parser");
const Pids_1 = require("./Pids");
const Stream_1 = require("./Stream");
const String_1 = require("./String");
const Task_1 = require("./Task");
const Timeout_1 = require("./Timeout");
/**
 * BatchProcess manages the care and feeding of a single child process.
 *
 * It wires up the child's stdio and lifecycle events, submits a startup task
 * built from `opts.versionCommand`, tracks the single in-flight task, and
 * exposes health/readiness predicates that tell the owner when this process
 * should be recycled.
 */
class BatchProcess {
    /**
     * @param proc the spawned child process. Must have a non-null `pid`,
     * `stdin` and `stdout`; `stderr` is optional.
     * @param opts options read by this class (commands, timeouts, limits,
     * `logger` and `observer`)
     * @param onIdle to be called when internal state changes (like the current
     * task is resolved, or the process exits)
     * @throws {Error} if `proc.pid`, `proc.stdin` or `proc.stdout` is missing
     */
    constructor(proc, opts, onIdle) {
        _BatchProcess_instances.add(this);
        this.proc = proc;
        this.opts = opts;
        this.onIdle = onIdle;
        this.start = Date.now();
        _BatchProcess_lastHealthCheck.set(this, Date.now());
        _BatchProcess_healthCheckFailures.set(this, 0);
        _BatchProcess_logger.set(this, void 0);
        _BatchProcess_lastJobFinshedAt.set(this, Date.now());
        _BatchProcess_lastJobFailed.set(this, false);
        _BatchProcess_starting.set(this, true);
        // Only set to true when `proc.pid` is no longer in the process table.
        _BatchProcess_exited.set(this, false);
        // override for .whyNotHealthy()
        _BatchProcess_whyNotHealthy.set(this, void 0);
        this.failedTaskCount = 0;
        // Starts at -1 so that the startup task isn't counted.
        _BatchProcess_taskCount.set(this, -1);
        // Should be undefined if this instance is not currently processing a task.
        _BatchProcess_currentTask.set(this, void 0);
        _BatchProcess_currentTaskTimeout.set(this, void 0);
        _BatchProcess_endPromise.set(this, void 0);
        this.name = "BatchProcess(" + proc.pid + ")";
        __classPrivateFieldSet(this, _BatchProcess_logger, opts.logger, "f");
        // don't let node count the child processes as a reason to stay alive
        this.proc.unref();
        if (proc.pid == null) {
            throw new Error("BatchProcess.constructor: child process pid is null");
        }
        this.pid = proc.pid;
        this.proc.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "proc.error", err));
        // Any sign that the child went away triggers a non-graceful end():
        this.proc.on("close", () => this.end(false, "proc.close"));
        this.proc.on("exit", () => this.end(false, "proc.exit"));
        this.proc.on("disconnect", () => this.end(false, "proc.disconnect"));
        const stdin = this.proc.stdin;
        if (stdin == null) {
            throw new Error("Given proc had no stdin");
        }
        stdin.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stdin.error", err));
        const stdout = this.proc.stdout;
        if (stdout == null) {
            throw new Error("Given proc had no stdout");
        }
        stdout.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stdout.error", err));
        stdout.on("data", (d) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onStdout).call(this, d));
        (0, Object_1.map)(this.proc.stderr, (stderr) => {
            stderr.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stderr.error", err));
            stderr.on("data", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onStderr).call(this, err));
        });
        const startupTask = new Task_1.Task(opts.versionCommand, Parser_1.SimpleParser);
        this.startupTaskId = startupTask.taskId;
        if (!this.execTask(startupTask)) {
            this.opts.observer.emit("internalError", new Error(this.name + " startup task was not submitted"));
        }
        // this needs to be at the end of the constructor, to ensure everything is
        // set up on `this`
        this.opts.observer.emit("childStart", this);
    }
    /**
     * @return the task currently being processed, or undefined if there is none
     */
    get currentTask() {
        return __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
    }
    /**
     * @return the number of tasks submitted to this process, not counting the
     * startup task
     */
    get taskCount() {
        return __classPrivateFieldGet(this, _BatchProcess_taskCount, "f");
    }
    /**
     * @return true until the startup task has resolved
     */
    get starting() {
        return __classPrivateFieldGet(this, _BatchProcess_starting, "f");
    }
    /**
     * @return true if `this.end()` has been requested (which may be due to the
     * child process exiting)
     */
    get ending() {
        return __classPrivateFieldGet(this, _BatchProcess_endPromise, "f") != null;
    }
    /**
     * @return true if `this.end()` has completed running, which includes child
     * process cleanup. Note that this may return `true` and the process table may
     * still include the child pid. Call {@link BatchProcess#running()} for an authoritative
     * (but expensive!) answer.
     */
    get ended() {
        const endPromise = __classPrivateFieldGet(this, _BatchProcess_endPromise, "f");
        return endPromise != null && endPromise.settled === true;
    }
    /**
     * @return true if the child process has exited and is no longer in the
     * process table. Note that this may be erroneously false if the process table
     * hasn't been checked. Call {@link BatchProcess#running()} for an authoritative (but
     * expensive!) answer.
     */
    get exited() {
        return __classPrivateFieldGet(this, _BatchProcess_exited, "f");
    }
    /**
     * @return a string describing why this process should be recycled, or null if
     * the process passes all health checks. Note that this doesn't include if
     * we're already busy: see {@link BatchProcess.whyNotReady} if you need to
     * know if a process can handle a new task.
     */
    get whyNotHealthy() {
        // An explicit reason recorded by end() wins over every computed check.
        const override = __classPrivateFieldGet(this, _BatchProcess_whyNotHealthy, "f");
        if (override != null) {
            return override;
        }
        if (this.ended) {
            return "ended";
        }
        if (this.ending) {
            return "ending";
        }
        if (__classPrivateFieldGet(this, _BatchProcess_healthCheckFailures, "f") > 0) {
            return "unhealthy";
        }
        if (this.proc.stdin == null || this.proc.stdin.destroyed) {
            return "closed";
        }
        // Each of the following limits is only enforced when configured > 0.
        if (this.opts.maxTasksPerProcess > 0 &&
            this.taskCount >= this.opts.maxTasksPerProcess) {
            return "worn";
        }
        if (this.opts.maxIdleMsPerProcess > 0 &&
            this.idleMs > this.opts.maxIdleMsPerProcess) {
            return "idle";
        }
        if (this.opts.maxFailedTasksPerProcess > 0 &&
            this.failedTaskCount >= this.opts.maxFailedTasksPerProcess) {
            return "broken";
        }
        if (this.opts.maxProcAgeMillis > 0 &&
            this.start + this.opts.maxProcAgeMillis < Date.now()) {
            return "old";
        }
        // The runtime must actually be compared against the limit. The previous
        // form, `(limit > 0 && runtimeMs) ?? 0 > limit`, parsed as
        // `(...) ?? (0 > limit)` and so reported "timeout" for any running task
        // with a non-zero runtime.
        const currentTask = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
        const runtimeMs = currentTask == null ? undefined : currentTask.runtimeMs;
        if (this.opts.taskTimeoutMillis > 0 &&
            runtimeMs != null &&
            runtimeMs > this.opts.taskTimeoutMillis) {
            return "timeout";
        }
        return null;
    }
    /**
     * @return true if the process doesn't need to be recycled.
     */
    get healthy() {
        return this.whyNotHealthy == null;
    }
    /**
     * @return true iff no current task. Does not take into consideration if the
     * process has ended or should be recycled: see {@link BatchProcess.ready}.
     */
    get idle() {
        return __classPrivateFieldGet(this, _BatchProcess_currentTask, "f") == null;
    }
    /**
     * @return a string describing why this process cannot currently handle a new
     * task ("busy", or the {@link BatchProcess.whyNotHealthy} reason), or `null`
     * if this process is idle and healthy.
     */
    get whyNotReady() {
        return !this.idle ? "busy" : this.whyNotHealthy;
    }
    /**
     * @return true iff this process is both healthy and idle, and ready for a
     * new task.
     */
    get ready() {
        return this.whyNotReady == null;
    }
    /**
     * @return milliseconds since the last task was cleared, or -1 if a task is
     * currently in flight
     */
    get idleMs() {
        return this.idle ? Date.now() - __classPrivateFieldGet(this, _BatchProcess_lastJobFinshedAt, "f") : -1;
    }
    /**
     * @return true if the child process is in the process table
     */
    running() {
        if (__classPrivateFieldGet(this, _BatchProcess_exited, "f")) {
            return false;
        }
        const alive = (0, Pids_1.pidExists)(this.pid);
        if (!alive) {
            __classPrivateFieldSet(this, _BatchProcess_exited, true, "f");
            // once a PID leaves the process table, it's gone for good.
            this.end(false, "proc.exit");
        }
        return alive;
    }
    /**
     * @return the negation of {@link BatchProcess#running()}
     */
    notRunning() {
        return !this.running();
    }
    /**
     * Submits `opts.healthCheckCommand` as a task if one is configured, this
     * process is ready, and either the last job failed or
     * `opts.healthCheckIntervalMillis` has elapsed since the last check.
     *
     * @return the health check task if one was created, otherwise undefined
     */
    maybeRunHealthcheck() {
        const hcc = this.opts.healthCheckCommand;
        // if there's no health check command, no-op.
        if (hcc == null || (0, String_1.blank)(hcc)) {
            return;
        }
        // if the prior health check failed, .ready will be false
        if (!this.ready) {
            return;
        }
        const due = __classPrivateFieldGet(this, _BatchProcess_lastJobFailed, "f") ||
            (this.opts.healthCheckIntervalMillis > 0 &&
                Date.now() - __classPrivateFieldGet(this, _BatchProcess_lastHealthCheck, "f") >
                    this.opts.healthCheckIntervalMillis);
        if (!due) {
            return;
        }
        __classPrivateFieldSet(this, _BatchProcess_lastHealthCheck, Date.now(), "f");
        const t = new Task_1.Task(hcc, Parser_1.SimpleParser);
        t.promise
            .catch((err) => {
            this.opts.observer.emit("healthCheckError", err, this);
            __classPrivateFieldSet(this, _BatchProcess_healthCheckFailures, __classPrivateFieldGet(this, _BatchProcess_healthCheckFailures, "f") + 1, "f");
            // BatchCluster will see we're unhealthy and reap us later
        })
            .finally(() => {
            __classPrivateFieldSet(this, _BatchProcess_lastHealthCheck, Date.now(), "f");
        });
        __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_execTask).call(this, t);
        return t;
    }
    /**
     * Submits `task` to the child process if this instance is ready.
     *
     * This must not be async, or new instances aren't started as busy (until the
     * startup task is complete)
     *
     * @return true if the task was submitted, false otherwise
     */
    execTask(task) {
        return this.ready ? __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_execTask).call(this, task) : false;
    }
    /**
     * End this child process.
     *
     * NOT ASYNC! needs to change state immediately.
     *
     * @param gracefully Wait for any current task to be resolved or rejected
     * before shutting down the child process.
     * @param reason who called end() (used for logging). The first reason seen
     * also becomes the {@link BatchProcess.whyNotHealthy} override.
     * @return Promise that will be resolved when the process has completed.
     * Subsequent calls to end() will ignore the parameters and return the first
     * endPromise.
     */
    end(gracefully = true, reason) {
        let endPromise = __classPrivateFieldGet(this, _BatchProcess_endPromise, "f");
        if (endPromise == null) {
            const deferred = new Deferred_1.Deferred();
            const endImpl = __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_end);
            const priorReason = __classPrivateFieldGet(this, _BatchProcess_whyNotHealthy, "f");
            const why = __classPrivateFieldSet(this, _BatchProcess_whyNotHealthy, priorReason != null ? priorReason : reason, "f");
            endPromise = deferred.observe(endImpl.call(this, gracefully, why));
        }
        return __classPrivateFieldSet(this, _BatchProcess_endPromise, endPromise, "f").promise;
    }
}
exports.BatchProcess = BatchProcess;
// Backing stores for BatchProcess's private fields (one WeakMap per field, keyed
// by instance), the WeakSet used to brand instances for private-method access,
// and the implementations of the private methods. Everything below is a single
// comma-separated expression statement.
//
// #execTask(task): records `task` as the current task, arms its timeout, hooks
// its promise, and writes its command to the child's stdin. Returns true if
// the command was handed to stdin.write(), false otherwise.
_BatchProcess_lastHealthCheck = new WeakMap(), _BatchProcess_healthCheckFailures = new WeakMap(), _BatchProcess_logger = new WeakMap(), _BatchProcess_lastJobFinshedAt = new WeakMap(), _BatchProcess_lastJobFailed = new WeakMap(), _BatchProcess_starting = new WeakMap(), _BatchProcess_exited = new WeakMap(), _BatchProcess_whyNotHealthy = new WeakMap(), _BatchProcess_taskCount = new WeakMap(), _BatchProcess_currentTask = new WeakMap(), _BatchProcess_currentTaskTimeout = new WeakMap(), _BatchProcess_endPromise = new WeakMap(), _BatchProcess_instances = new WeakSet(), _BatchProcess_execTask = function _BatchProcess_execTask(task) {
var _a;
var _b;
// Never accept work once end() has been requested.
if (this.ending)
return false;
// The task is counted and made current before the write is attempted, so both
// happen even if the write below fails.
__classPrivateFieldSet(this, _BatchProcess_taskCount, (_b = __classPrivateFieldGet(this, _BatchProcess_taskCount, "f"), _b++, _b), "f");
__classPrivateFieldSet(this, _BatchProcess_currentTask, task, "f");
const cmd = (0, String_1.ensureSuffix)(task.command, "\n");
const isStartupTask = task.taskId === this.startupTaskId;
// The startup task is bounded by the spawn timeout, all others by the task
// timeout. A non-positive value disables the timer.
const taskTimeoutMs = isStartupTask
? this.opts.spawnTimeoutMillis
: this.opts.taskTimeoutMillis;
if (taskTimeoutMs > 0) {
// add the stream flush millis to the taskTimeoutMs, because that time
// should not be counted against the task.
__classPrivateFieldSet(this, _BatchProcess_currentTaskTimeout, node_timers_1.default.setTimeout(() => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onTimeout).call(this, task, taskTimeoutMs), taskTimeoutMs + this.opts.streamFlushMillis), "f");
}
// CAREFUL! If you add a .catch or .finally, the pipeline can emit unhandled
// rejections:
void task.promise.then(() => {
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this, task);
// this.#logger().trace("task completed", { task })
if (isStartupTask) {
// no need to emit taskResolved for startup tasks.
__classPrivateFieldSet(this, _BatchProcess_starting, false, "f");
}
else {
this.opts.observer.emit("taskResolved", task, this);
}
// Call _after_ we've cleared the current task:
this.onIdle();
}, (error) => {
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this, task);
// this.#logger().trace("task failed", { task, err: error })
if (isStartupTask) {
// A failed startup task means this process is unusable: report and end it.
this.opts.observer.emit("startError", error);
this.end(false, "startError");
}
else {
this.opts.observer.emit("taskError", error, task, this);
}
// Call _after_ we've cleared the current task:
this.onIdle();
});
try {
task.onStart(this.opts);
const stdin = (_a = this.proc) === null || _a === void 0 ? void 0 : _a.stdin;
if (stdin == null || stdin.destroyed) {
task.reject(new Error("proc.stdin unexpectedly closed"));
return false;
}
else {
// Write failures reported asynchronously reject the task.
stdin.write(cmd, (err) => {
if (err != null) {
task.reject(err);
}
});
return true;
}
}
catch (err) {
// child process went away. We should too.
// NOTE(review): `err` is dropped and `task` is not rejected on this path —
// confirm that the end() call below is expected to settle it.
this.end(false, "stdin.error");
return false;
}
}, _BatchProcess_end =
// #end(gracefully, reason): the shutdown sequence behind end().
// NOTE: Must only be invoked by this.end(), and only expected to be invoked
// once per instance.
async function _BatchProcess_end(gracefully, reason) {
var _a, _b, _c, _d;
const lastTask = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this);
// Give an in-flight task (other than the startup task) a bounded chance to
// settle before tearing the process down: 2000ms when graceful, 250ms
// otherwise. We don't need to wait for the startup task to complete, and we
// certainly don't need to fuss about ending when we're just getting started.
if (lastTask != null && lastTask.taskId !== this.startupTaskId) {
try {
// Let's wait for the process to complete and the streams to flush, as
// that may actually allow the task to complete successfully. Let's not
// wait forever, though.
await (0, Timeout_1.thenOrTimeout)(lastTask.promise, gracefully ? 2000 : 250);
}
catch {
// The task's own rejection is not an error for shutdown purposes.
}
// Still unsettled after the wait: fail it so nothing awaits it forever.
if (lastTask.pending) {
lastTask.reject(new Error(`end() called before task completed (${JSON.stringify({
gracefully,
lastTask,
})})`));
}
}
// Ignore EPIPE on .end(): if the process immediately ends after the exit
// command, we'll get an EPIPE, so, shush error events *before* we tell the
// child process to exit. See https://github.com/nodejs/node/issues/26828
for (const ea of [
this.proc,
this.proc.stdin,
this.proc.stdout,
this.proc.stderr,
]) {
ea === null || ea === void 0 ? void 0 : ea.removeAllListeners("error");
}
// If stdin is still writable, close it, sending the exit command (when one
// is configured) as the final chunk.
if (true === ((_a = this.proc.stdin) === null || _a === void 0 ? void 0 : _a.writable)) {
const exitCmd = this.opts.exitCommand == null
? null
: (0, String_1.ensureSuffix)(this.opts.exitCommand, "\n");
try {
(_b = this.proc.stdin) === null || _b === void 0 ? void 0 : _b.end(exitCmd);
}
catch {
// don't care
}
}
// None of this *should* be necessary, but we're trying to be as hygienic as
// we can to avoid process zombification.
(0, Stream_1.destroy)(this.proc.stdin);
(0, Stream_1.destroy)(this.proc.stdout);
(0, Stream_1.destroy)(this.proc.stderr);
// Graceful path: split the configured grace period into two halves, one
// before and one after signalling the child.
if (this.opts.cleanupChildProcs &&
gracefully &&
this.opts.endGracefulWaitTimeMillis > 0 &&
!__classPrivateFieldGet(this, _BatchProcess_exited, "f")) {
// Wait for the exit command to take effect:
await __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_awaitNotRunning).call(this, this.opts.endGracefulWaitTimeMillis / 2);
// If it's still running, send the pid a signal:
if (this.running() && this.proc.pid != null)
this.proc.kill();
// Wait for the signal handler to work:
await __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_awaitNotRunning).call(this, this.opts.endGracefulWaitTimeMillis / 2);
}
// Last resort: the child is still in the process table.
if (this.opts.cleanupChildProcs &&
this.proc.pid != null &&
this.running()) {
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".end(): force-killing still-running child.");
(0, Pids_1.kill)(this.proc.pid, true);
}
// disconnect may not be a function on proc!
(_d = (_c = this.proc).disconnect) === null || _d === void 0 ? void 0 : _d.call(_c);
this.opts.observer.emit("childEnd", this, reason);
// #awaitNotRunning(timeout): polls notRunning() via Async.until, bounded by
// `timeout` (presumably milliseconds, matching the callers — confirm against
// Async.until).
}, _BatchProcess_awaitNotRunning = function _BatchProcess_awaitNotRunning(timeout) {
return (0, Async_1.until)(() => this.notRunning(), timeout);
// #onTimeout(task, timeoutMs): timer callback armed by #execTask. Only acts if
// `task` is still pending: emits "taskTimeout" and routes through #onError.
}, _BatchProcess_onTimeout = function _BatchProcess_onTimeout(task, timeoutMs) {
if (task.pending) {
this.opts.observer.emit("taskTimeout", timeoutMs, task, this);
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "timeout", new Error("waited " + timeoutMs + "ms"), task);
}
// #onError(reason, error, task?): common handler for process/stream errors and
// timeouts. Logs a cleaned copy of the error, ends the process non-gracefully,
// and rejects the affected task (the current task when none is given).
}, _BatchProcess_onError = function _BatchProcess_onError(reason, error, task) {
if (task == null) {
task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
}
const cleanedError = new Error(reason + ": " + (0, Error_1.cleanError)(error.message));
if (error.stack != null) {
// Error stacks, if set, will not be redefined from a rethrow:
cleanedError.stack = (0, Error_1.cleanError)(error.stack);
}
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".onError()", {
reason,
task: (0, Object_1.map)(task, (t) => t.command),
error: cleanedError,
});
if (this.ending) {
// .#end is already disconnecting the error listeners, but in any event,
// we don't really care about errors after we've been told to shut down.
return;
}
// clear the task before ending so the onExit from end() doesn't retry the task:
__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this);
void this.end(false, reason);
// NOTE(review): taskCount starts at -1 and is incremented once per submitted
// task, so it is 0 while the startup task runs and 1 for the first task after
// it — confirm that `=== 1` identifies the startup task as the message claims.
if (task != null && this.taskCount === 1) {
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".onError(): startup task failed: " + cleanedError);
this.opts.observer.emit("startError", cleanedError);
}
if (task != null) {
if (task.pending) {
task.reject(cleanedError);
}
else {
this.opts.observer.emit("internalError", new Error(`${this.name}.onError(${cleanedError}) cannot reject already-fulfilled task.`));
}
}
// #onStderr(data): forwards non-blank stderr output to the pending current
// task. Output with no pending task is reported as "noTaskData" and ends the
// process, unless it is already ending.
}, _BatchProcess_onStderr = function _BatchProcess_onStderr(data) {
if ((0, String_1.blank)(data))
return;
__classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".onStderr(): " + data);
const task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
if (task != null && task.pending) {
task.onStderr(data);
}
else if (!this.ending) {
// If we're ending and there isn't a task, don't worry about it.
this.opts.observer.emit("noTaskData", null, data, this);
void this.end(false, "stderr");
}
// #onStdout(data): forwards stdout output to the pending current task (after
// emitting "taskData"). Non-blank output with no pending task is reported as
// "noTaskData" and ends the process, unless it is already ending.
}, _BatchProcess_onStdout = function _BatchProcess_onStdout(data) {
if (data == null)
return;
const task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
if (task != null && task.pending) {
this.opts.observer.emit("taskData", data, task, this);
task.onStdout(data);
}
else if (this.ending) {
// don't care if we're already being shut down.
}
else if (!(0, String_1.blank)(data)) {
this.opts.observer.emit("noTaskData", data, null, this);
void this.end(false, "stdout.error");
}
// #clearCurrentTask(task?): cancels the task timeout and marks this process as
// idle. When `task` is given, the idle transition only happens if it is still
// the current task.
}, _BatchProcess_clearCurrentTask = function _BatchProcess_clearCurrentTask(task) {
var _a;
// lastJobFailed is updated before the identity guard below, so it is written
// on every call; with no `task` argument it becomes false.
__classPrivateFieldSet(this, _BatchProcess_lastJobFailed, (task === null || task === void 0 ? void 0 : task.state) === "rejected", "f");
if (task != null && task.taskId !== ((_a = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f")) === null || _a === void 0 ? void 0 : _a.taskId))
return;
(0, Object_1.map)(__classPrivateFieldGet(this, _BatchProcess_currentTaskTimeout, "f"), (ea) => clearTimeout(ea));
__classPrivateFieldSet(this, _BatchProcess_currentTaskTimeout, undefined, "f");
__classPrivateFieldSet(this, _BatchProcess_currentTask, undefined, "f");
__classPrivateFieldSet(this, _BatchProcess_lastJobFinshedAt, Date.now(), "f");
};
//# sourceMappingURL=BatchProcess.js.map