"use strict";
// --- TypeScript down-level helpers for `#private` members and default imports ---
var __classPrivateFieldSet = (this && this.__classPrivateFieldSet) || function (receiver, state, value, kind, f) {
    if (kind === "m") throw new TypeError("Private method is not writable");
    if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a setter");
    if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot write private member to an object whose class did not declare it");
    return (kind === "a" ? f.call(receiver, value) : f ? f.value = value : state.set(receiver, value)), value;
};
var __classPrivateFieldGet = (this && this.__classPrivateFieldGet) || function (receiver, state, kind, f) {
    if (kind === "a" && !f) throw new TypeError("Private accessor was defined without a getter");
    if (typeof state === "function" ? receiver !== state || !f : !state.has(receiver)) throw new TypeError("Cannot read private member from an object whose class did not declare it");
    return kind === "m" ? f : kind === "a" ? f.call(receiver) : f ? f.value : state.get(receiver);
};
var __importDefault = (this && this.__importDefault) || function (mod) {
    return (mod && mod.__esModule) ? mod : { "default": mod };
};
var _BatchProcess_instances, _BatchProcess_lastHealthCheck, _BatchProcess_healthCheckFailures, _BatchProcess_logger, _BatchProcess_lastJobFinshedAt, _BatchProcess_lastJobFailed, _BatchProcess_starting, _BatchProcess_exited, _BatchProcess_whyNotHealthy, _BatchProcess_taskCount, _BatchProcess_currentTask, _BatchProcess_currentTaskTimeout, _BatchProcess_endPromise, _BatchProcess_execTask, _BatchProcess_end, _BatchProcess_awaitNotRunning, _BatchProcess_onTimeout, _BatchProcess_onError, _BatchProcess_onStderr, _BatchProcess_onStdout, _BatchProcess_clearCurrentTask;
Object.defineProperty(exports, "__esModule", { value: true });
exports.BatchProcess = void 0;
const node_timers_1 = __importDefault(require("node:timers"));
const Async_1 = require("./Async");
const Deferred_1 = require("./Deferred");
const Error_1 = require("./Error");
const Object_1 = require("./Object");
const Parser_1 = require("./Parser");
const Pids_1 = require("./Pids");
const Stream_1 = require("./Stream");
const String_1 = require("./String");
const Task_1 = require("./Task");
const Timeout_1 = require("./Timeout");
/**
 * BatchProcess manages the care and feeding of a single child process.
 */
class BatchProcess {
    /**
     * Wires up listeners on the child process and its stdio streams, then
     * immediately submits the startup task (`opts.versionCommand`).
     *
     * @param proc the already-spawned child process to manage
     * @param opts options (timeouts, limits, commands, `observer`, `logger`)
     * @param onIdle to be called when internal state changes (like the current
     * task is resolved, or the process exits)
     * @throws Error if `proc.pid`, `proc.stdin`, or `proc.stdout` is null
     */
    constructor(proc, opts, onIdle) {
        _BatchProcess_instances.add(this);
        this.proc = proc;
        this.opts = opts;
        this.onIdle = onIdle;
        this.start = Date.now();
        _BatchProcess_lastHealthCheck.set(this, Date.now());
        _BatchProcess_healthCheckFailures.set(this, 0);
        _BatchProcess_logger.set(this, void 0);
        _BatchProcess_lastJobFinshedAt.set(this, Date.now());
        _BatchProcess_lastJobFailed.set(this, false);
        _BatchProcess_starting.set(this, true);
        // Only set to true when `proc.pid` is no longer in the process table.
        _BatchProcess_exited.set(this, false);
        // override for .whyNotHealthy()
        _BatchProcess_whyNotHealthy.set(this, void 0);
        this.failedTaskCount = 0;
        // Starts at -1 so we don't count the startupTask.
        _BatchProcess_taskCount.set(this, -1);
        /**
         * Should be undefined if this instance is not currently processing a task.
         */
        _BatchProcess_currentTask.set(this, void 0);
        _BatchProcess_currentTaskTimeout.set(this, void 0);
        _BatchProcess_endPromise.set(this, void 0);
        this.name = "BatchProcess(" + proc.pid + ")";
        __classPrivateFieldSet(this, _BatchProcess_logger, opts.logger, "f");
        // don't let node count the child processes as a reason to stay alive
        this.proc.unref();
        if (proc.pid == null) {
            throw new Error("BatchProcess.constructor: child process pid is null");
        }
        this.pid = proc.pid;
        this.proc.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "proc.error", err));
        this.proc.on("close", () => this.end(false, "proc.close"));
        this.proc.on("exit", () => this.end(false, "proc.exit"));
        this.proc.on("disconnect", () => this.end(false, "proc.disconnect"));
        const stdin = this.proc.stdin;
        if (stdin == null)
            throw new Error("Given proc had no stdin");
        stdin.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stdin.error", err));
        const stdout = this.proc.stdout;
        if (stdout == null)
            throw new Error("Given proc had no stdout");
        stdout.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stdout.error", err));
        stdout.on("data", (d) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onStdout).call(this, d));
        // stderr is optional: listeners are attached via the `map` helper.
        (0, Object_1.map)(this.proc.stderr, (stderr) => {
            stderr.on("error", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "stderr.error", err));
            stderr.on("data", (err) => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onStderr).call(this, err));
        });
        const startupTask = new Task_1.Task(opts.versionCommand, Parser_1.SimpleParser);
        this.startupTaskId = startupTask.taskId;
        if (!this.execTask(startupTask)) {
            this.opts.observer.emit("internalError", new Error(this.name + " startup task was not submitted"));
        }
        // this needs to be at the end of the constructor, to ensure everything is
        // set up on `this`
        this.opts.observer.emit("childStart", this);
    }
    /** @return the task currently being processed, or undefined if none. */
    get currentTask() {
        return __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
    }
    /** @return the number of tasks submitted so far, excluding the startup task. */
    get taskCount() {
        return __classPrivateFieldGet(this, _BatchProcess_taskCount, "f");
    }
    /** @return true until the startup task has resolved. */
    get starting() {
        return __classPrivateFieldGet(this, _BatchProcess_starting, "f");
    }
    /**
     * @return true if `this.end()` has been requested (which may be due to the
     * child process exiting)
     */
    get ending() {
        return __classPrivateFieldGet(this, _BatchProcess_endPromise, "f") != null;
    }
    /**
     * @return true if `this.end()` has completed running, which includes child
     * process cleanup. Note that this may return `true` and the process table may
     * still include the child pid. Call {@link BatchProcess#running()} for an authoritative
     * (but expensive!) answer.
     */
    get ended() {
        const endPromise = __classPrivateFieldGet(this, _BatchProcess_endPromise, "f");
        return true === (endPromise === null || endPromise === void 0 ? void 0 : endPromise.settled);
    }
    /**
     * @return true if the child process has exited and is no longer in the
     * process table. Note that this may be erroneously false if the process table
     * hasn't been checked. Call {@link BatchProcess#running()} for an authoritative (but
     * expensive!) answer.
     */
    get exited() {
        return __classPrivateFieldGet(this, _BatchProcess_exited, "f");
    }
    /**
     * @return a string describing why this process should be recycled, or null if
     * the process passes all health checks. Note that this doesn't include if
     * we're already busy: see {@link BatchProcess.whyNotReady} if you need to
     * know if a process can handle a new task.
     */
    get whyNotHealthy() {
        // An explicit override (set by the first end() call) wins over all checks.
        if (__classPrivateFieldGet(this, _BatchProcess_whyNotHealthy, "f") != null) {
            return __classPrivateFieldGet(this, _BatchProcess_whyNotHealthy, "f");
        }
        if (this.ended) {
            return "ended";
        }
        if (this.ending) {
            return "ending";
        }
        if (__classPrivateFieldGet(this, _BatchProcess_healthCheckFailures, "f") > 0) {
            return "unhealthy";
        }
        if (this.proc.stdin == null || this.proc.stdin.destroyed) {
            return "closed";
        }
        if (this.opts.maxTasksPerProcess > 0 &&
            this.taskCount >= this.opts.maxTasksPerProcess) {
            return "worn";
        }
        if (this.opts.maxIdleMsPerProcess > 0 &&
            this.idleMs > this.opts.maxIdleMsPerProcess) {
            return "idle";
        }
        if (this.opts.maxFailedTasksPerProcess > 0 &&
            this.failedTaskCount >= this.opts.maxFailedTasksPerProcess) {
            return "broken";
        }
        if (this.opts.maxProcAgeMillis > 0 &&
            this.start + this.opts.maxProcAgeMillis < Date.now()) {
            return "old";
        }
        if (this.opts.taskTimeoutMillis > 0) {
            // The runtime must be defaulted to 0 *before* the comparison. The
            // previous `(enabled && runtimeMs) ?? 0 > timeout` parsed as
            // `(enabled && runtimeMs) ?? (0 > timeout)`, which flagged every busy
            // process with a non-zero runtime as "timeout".
            const task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
            const runtimeMs = task === null || task === void 0 ? void 0 : task.runtimeMs;
            if ((runtimeMs !== null && runtimeMs !== void 0 ? runtimeMs : 0) > this.opts.taskTimeoutMillis) {
                return "timeout";
            }
        }
        return null;
    }
    /**
     * @return true if the process doesn't need to be recycled.
     */
    get healthy() {
        return this.whyNotHealthy == null;
    }
    /**
     * @return true iff no current task. Does not take into consideration if the
     * process has ended or should be recycled: see {@link BatchProcess.ready}.
     */
    get idle() {
        return __classPrivateFieldGet(this, _BatchProcess_currentTask, "f") == null;
    }
    /**
     * @return a string describing why this process cannot currently handle a new
     * task ("busy", or the {@link BatchProcess.whyNotHealthy} reason), or `null`
     * if this process is idle and healthy.
     */
    get whyNotReady() {
        return !this.idle ? "busy" : this.whyNotHealthy;
    }
    /**
     * @return true iff this process is both healthy and idle, and ready for a
     * new task.
     */
    get ready() {
        return this.whyNotReady == null;
    }
    /**
     * @return milliseconds since the last task was cleared, or -1 if a task is
     * currently running.
     */
    get idleMs() {
        return this.idle
            ? Date.now() - __classPrivateFieldGet(this, _BatchProcess_lastJobFinshedAt, "f")
            : -1;
    }
    /**
     * @return true if the child process is in the process table
     */
    running() {
        if (__classPrivateFieldGet(this, _BatchProcess_exited, "f"))
            return false;
        const alive = (0, Pids_1.pidExists)(this.pid);
        if (!alive) {
            __classPrivateFieldSet(this, _BatchProcess_exited, true, "f");
            // once a PID leaves the process table, it's gone for good.
            this.end(false, "proc.exit");
        }
        return alive;
    }
    /** @return the negation of {@link BatchProcess#running()}. */
    notRunning() {
        return !this.running();
    }
    /**
     * Submits `opts.healthCheckCommand` as a task if this process is ready and
     * either the last job failed or the health check interval has elapsed.
     *
     * @return the health check Task if one was submitted, otherwise undefined.
     */
    maybeRunHealthcheck() {
        const hcc = this.opts.healthCheckCommand;
        // if there's no health check command, no-op.
        if (hcc == null || (0, String_1.blank)(hcc))
            return;
        // if the prior health check failed, .ready will be false
        if (!this.ready)
            return;
        if (__classPrivateFieldGet(this, _BatchProcess_lastJobFailed, "f") ||
            (this.opts.healthCheckIntervalMillis > 0 &&
                Date.now() - __classPrivateFieldGet(this, _BatchProcess_lastHealthCheck, "f") >
                    this.opts.healthCheckIntervalMillis)) {
            __classPrivateFieldSet(this, _BatchProcess_lastHealthCheck, Date.now(), "f");
            const t = new Task_1.Task(hcc, Parser_1.SimpleParser);
            t.promise
                .catch((err) => {
                this.opts.observer.emit("healthCheckError", err, this);
                __classPrivateFieldSet(this, _BatchProcess_healthCheckFailures, __classPrivateFieldGet(this, _BatchProcess_healthCheckFailures, "f") + 1, "f");
                // BatchCluster will see we're unhealthy and reap us later
            })
                .finally(() => {
                __classPrivateFieldSet(this, _BatchProcess_lastHealthCheck, Date.now(), "f");
            });
            __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_execTask).call(this, t);
            return t;
        }
        return;
    }
    /**
     * Submits the task to the child process if this instance is ready.
     *
     * This must not be async, or new instances aren't started as busy (until the
     * startup task is complete).
     *
     * @return true if the task was written to the child's stdin; false if this
     * process was not ready or the submission failed.
     */
    execTask(task) {
        return this.ready
            ? __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_execTask).call(this, task)
            : false;
    }
    /**
     * End this child process.
     *
     * NOT ASYNC! needs to change state immediately.
     *
     * @param gracefully Wait for any current task to be resolved or rejected
     * before shutting down the child process.
     * @param reason who called end() (used for logging)
     * @return Promise that will be resolved when the process has completed.
     * Subsequent calls to end() will ignore the parameters and return the first
     * endPromise.
     */
    end(gracefully = true, reason) {
        // NOTE(review): #end() starts running synchronously, before the end
        // promise below is stored. If it reaches this.running() without awaiting
        // and the pid is already gone, running() calls end() again while
        // `ending` is still false — confirm whether that re-entry is intended.
        let endPromise = __classPrivateFieldGet(this, _BatchProcess_endPromise, "f");
        if (endPromise === null || endPromise === void 0) {
            // The first reason given sticks as the whyNotHealthy override.
            const priorReason = __classPrivateFieldGet(this, _BatchProcess_whyNotHealthy, "f");
            const why = __classPrivateFieldSet(this, _BatchProcess_whyNotHealthy, priorReason !== null && priorReason !== void 0 ? priorReason : reason, "f");
            endPromise = new Deferred_1.Deferred().observe(__classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_end).call(this, gracefully, why));
        }
        return __classPrivateFieldSet(this, _BatchProcess_endPromise, endPromise, "f").promise;
    }
}
exports.BatchProcess = BatchProcess;
_BatchProcess_lastHealthCheck = new WeakMap();
_BatchProcess_healthCheckFailures = new WeakMap();
_BatchProcess_logger = new WeakMap();
_BatchProcess_lastJobFinshedAt = new WeakMap();
_BatchProcess_lastJobFailed = new WeakMap();
_BatchProcess_starting = new WeakMap();
_BatchProcess_exited = new WeakMap();
_BatchProcess_whyNotHealthy = new WeakMap();
_BatchProcess_taskCount = new WeakMap();
_BatchProcess_currentTask = new WeakMap();
_BatchProcess_currentTaskTimeout = new WeakMap();
_BatchProcess_endPromise = new WeakMap();
_BatchProcess_instances = new WeakSet();
/**
 * Private: makes `task` the current task, arms its timeout, and writes its
 * command to the child's stdin. Unlike the public execTask(), this only checks
 * `ending`, not `ready`.
 *
 * @return true if the command was handed to stdin.write(); false otherwise.
 */
_BatchProcess_execTask = function _BatchProcess_execTask(task) {
    if (this.ending)
        return false;
    __classPrivateFieldSet(this, _BatchProcess_taskCount, __classPrivateFieldGet(this, _BatchProcess_taskCount, "f") + 1, "f");
    __classPrivateFieldSet(this, _BatchProcess_currentTask, task, "f");
    const cmd = (0, String_1.ensureSuffix)(task.command, "\n");
    const isStartupTask = task.taskId === this.startupTaskId;
    const taskTimeoutMs = isStartupTask
        ? this.opts.spawnTimeoutMillis
        : this.opts.taskTimeoutMillis;
    if (taskTimeoutMs > 0) {
        // add the stream flush millis to the taskTimeoutMs, because that time
        // should not be counted against the task.
        __classPrivateFieldSet(this, _BatchProcess_currentTaskTimeout, node_timers_1.default.setTimeout(() => __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onTimeout).call(this, task, taskTimeoutMs), taskTimeoutMs + this.opts.streamFlushMillis), "f");
    }
    // CAREFUL! If you add a .catch or .finally, the pipeline can emit unhandled
    // rejections:
    void task.promise.then(() => {
        __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this, task);
        if (isStartupTask) {
            // no need to emit taskResolved for startup tasks.
            __classPrivateFieldSet(this, _BatchProcess_starting, false, "f");
        }
        else {
            this.opts.observer.emit("taskResolved", task, this);
        }
        // Call _after_ we've cleared the current task:
        this.onIdle();
    }, (error) => {
        __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this, task);
        if (isStartupTask) {
            this.opts.observer.emit("startError", error);
            this.end(false, "startError");
        }
        else {
            this.opts.observer.emit("taskError", error, task, this);
        }
        // Call _after_ we've cleared the current task:
        this.onIdle();
    });
    try {
        task.onStart(this.opts);
        const proc = this.proc;
        const stdin = proc === null || proc === void 0 ? void 0 : proc.stdin;
        if (stdin == null || stdin.destroyed) {
            task.reject(new Error("proc.stdin unexpectedly closed"));
            return false;
        }
        else {
            stdin.write(cmd, (err) => {
                if (err != null) {
                    task.reject(err);
                }
            });
            return true;
        }
    }
    catch (err) {
        // child process went away. We should too.
        this.end(false, "stdin.error");
        return false;
    }
};
/**
 * Private: performs the actual shutdown: settles the in-flight task, sends the
 * exit command, destroys the stdio streams, optionally signals/kills the child,
 * and finally emits "childEnd".
 *
 * NOTE: Must only be invoked by this.end(), and only expected to be invoked
 * once per instance.
 */
_BatchProcess_end = async function _BatchProcess_end(gracefully, reason) {
    const lastTask = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
    __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this);
    // We wait (briefly) on the in-flight task so it is settled by the time the
    // end promise resolves. Startup tasks are the exception: we don't need to
    // wait for the startup task to complete, and we certainly don't need to
    // fuss about ending when we're just getting started.
    if (lastTask != null && lastTask.taskId !== this.startupTaskId) {
        try {
            // Let's wait for the process to complete and the streams to flush, as
            // that may actually allow the task to complete successfully. Let's not
            // wait forever, though.
            await (0, Timeout_1.thenOrTimeout)(lastTask.promise, gracefully ? 2000 : 250);
        }
        catch {
            // Deliberately ignored: a still-pending task is rejected just below.
        }
        if (lastTask.pending) {
            lastTask.reject(new Error(`end() called before task completed (${JSON.stringify({
                gracefully,
                lastTask,
            })})`));
        }
    }
    // Ignore EPIPE on .end(): if the process immediately ends after the exit
    // command, we'll get an EPIPE, so, shush error events *before* we tell the
    // child process to exit. See https://github.com/nodejs/node/issues/26828
    for (const ea of [
        this.proc,
        this.proc.stdin,
        this.proc.stdout,
        this.proc.stderr,
    ]) {
        ea === null || ea === void 0 ? void 0 : ea.removeAllListeners("error");
    }
    const stdinBeforeExit = this.proc.stdin;
    if (true === (stdinBeforeExit === null || stdinBeforeExit === void 0 ? void 0 : stdinBeforeExit.writable)) {
        const exitCmd = this.opts.exitCommand == null
            ? null
            : (0, String_1.ensureSuffix)(this.opts.exitCommand, "\n");
        try {
            const stdin = this.proc.stdin;
            stdin === null || stdin === void 0 ? void 0 : stdin.end(exitCmd);
        }
        catch {
            // don't care
        }
    }
    // None of this *should* be necessary, but we're trying to be as hygienic as
    // we can to avoid process zombification.
    (0, Stream_1.destroy)(this.proc.stdin);
    (0, Stream_1.destroy)(this.proc.stdout);
    (0, Stream_1.destroy)(this.proc.stderr);
    if (this.opts.cleanupChildProcs &&
        gracefully &&
        this.opts.endGracefulWaitTimeMillis > 0 &&
        !__classPrivateFieldGet(this, _BatchProcess_exited, "f")) {
        // Wait for the exit command to take effect:
        await __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_awaitNotRunning).call(this, this.opts.endGracefulWaitTimeMillis / 2);
        // If it's still running, send the pid a signal:
        if (this.running() && this.proc.pid != null)
            this.proc.kill();
        // Wait for the signal handler to work:
        await __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_awaitNotRunning).call(this, this.opts.endGracefulWaitTimeMillis / 2);
    }
    if (this.opts.cleanupChildProcs &&
        this.proc.pid != null &&
        this.running()) {
        __classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".end(): force-killing still-running child.");
        (0, Pids_1.kill)(this.proc.pid, true);
    }
    // disconnect may not be a function on proc!
    const proc = this.proc;
    const disconnect = proc.disconnect;
    disconnect === null || disconnect === void 0 ? void 0 : disconnect.call(proc);
    this.opts.observer.emit("childEnd", this, reason);
};
/** Private: polls (via `until`) for the child to leave the process table, for up to `timeout`. */
_BatchProcess_awaitNotRunning = function _BatchProcess_awaitNotRunning(timeout) {
    return (0, Async_1.until)(() => this.notRunning(), timeout);
};
/** Private: timer callback; if the task is still pending, reports it as timed out. */
_BatchProcess_onTimeout = function _BatchProcess_onTimeout(task, timeoutMs) {
    if (task.pending) {
        this.opts.observer.emit("taskTimeout", timeoutMs, task, this);
        __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_onError).call(this, "timeout", new Error("waited " + timeoutMs + "ms"), task);
    }
};
/**
 * Private: handles an error from the process, a stream, or a task timeout.
 * Logs a cleaned copy of the error and, unless already ending, ends the
 * process and rejects the affected task.
 *
 * @param reason short label for the error source (becomes the end() reason)
 * @param error the raw error
 * @param task the affected task; defaults to the current task
 */
_BatchProcess_onError = function _BatchProcess_onError(reason, error, task) {
    if (task == null) {
        task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
    }
    const cleanedError = new Error(reason + ": " + (0, Error_1.cleanError)(error.message));
    if (error.stack != null) {
        // Error stacks, if set, will not be redefined from a rethrow:
        cleanedError.stack = (0, Error_1.cleanError)(error.stack);
    }
    __classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".onError()", {
        reason,
        task: (0, Object_1.map)(task, (t) => t.command),
        error: cleanedError,
    });
    if (this.ending) {
        // .#end is already disconnecting the error listeners, but in any event,
        // we don't really care about errors after we've been told to shut down.
        return;
    }
    // clear the task before ending so the onExit from end() doesn't retry the task:
    __classPrivateFieldGet(this, _BatchProcess_instances, "m", _BatchProcess_clearCurrentTask).call(this);
    void this.end(false, reason);
    if (task != null && this.taskCount === 1) {
        __classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".onError(): startup task failed: " + cleanedError);
        this.opts.observer.emit("startError", cleanedError);
    }
    if (task != null) {
        if (task.pending) {
            task.reject(cleanedError);
        }
        else {
            this.opts.observer.emit("internalError", new Error(`${this.name}.onError(${cleanedError}) cannot reject already-fulfilled task.`));
        }
    }
};
/**
 * Private: routes stderr data to the pending task. Non-blank stderr data with
 * no pending task ends the process (unless it is already ending).
 */
_BatchProcess_onStderr = function _BatchProcess_onStderr(data) {
    if ((0, String_1.blank)(data))
        return;
    __classPrivateFieldGet(this, _BatchProcess_logger, "f").call(this).warn(this.name + ".onStderr(): " + data);
    const task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
    if (task != null && task.pending) {
        task.onStderr(data);
    }
    else if (!this.ending) {
        // If we're ending and there isn't a task, don't worry about it.
        this.opts.observer.emit("noTaskData", null, data, this);
        void this.end(false, "stderr");
    }
};
/**
 * Private: routes stdout data to the pending task. Non-blank stdout data with
 * no pending task ends the process (unless it is already ending).
 */
_BatchProcess_onStdout = function _BatchProcess_onStdout(data) {
    if (data == null)
        return;
    const task = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
    if (task != null && task.pending) {
        this.opts.observer.emit("taskData", data, task, this);
        task.onStdout(data);
    }
    else if (this.ending) {
        // don't care if we're already being shut down.
    }
    else if (!(0, String_1.blank)(data)) {
        this.opts.observer.emit("noTaskData", data, null, this);
        void this.end(false, "stdout.error");
    }
};
/**
 * Private: records whether `task` was rejected, then clears the current task
 * and its timeout. If a `task` is given and it is not the current task, the
 * current task is left in place.
 */
_BatchProcess_clearCurrentTask = function _BatchProcess_clearCurrentTask(task) {
    // Recorded before the identity check below, so it is updated on every call.
    __classPrivateFieldSet(this, _BatchProcess_lastJobFailed, (task === null || task === void 0 ? void 0 : task.state) === "rejected", "f");
    const current = __classPrivateFieldGet(this, _BatchProcess_currentTask, "f");
    if (task != null && task.taskId !== (current === null || current === void 0 ? void 0 : current.taskId))
        return;
    (0, Object_1.map)(__classPrivateFieldGet(this, _BatchProcess_currentTaskTimeout, "f"), (ea) => clearTimeout(ea));
    __classPrivateFieldSet(this, _BatchProcess_currentTaskTimeout, undefined, "f");
    __classPrivateFieldSet(this, _BatchProcess_currentTask, undefined, "f");
    __classPrivateFieldSet(this, _BatchProcess_lastJobFinshedAt, Date.now(), "f");
};
//# sourceMappingURL=BatchProcess.js.map