'use strict'; // Tick-driven executor for the schedule produced by movementScheduler.plan. // // - Holds the current schedule + a cursor that advances one per tick(). // - Fires any unfired command whose fireAtTickN <= cursor. // - replan(newSchedule) replaces the schedule and resets the cursor — // already-fired commands stay fired (the pump's FSM is downstream and // handles their consequences; the executor never tries to "undo" a // fired startup, which keeps warmup/cooldown safety intact). // - fireCommand is injected for unit-testability — production wires it to // `machine.handleInput(...)`. class MovementExecutor { constructor({ fireCommand, logger } = {}) { if (typeof fireCommand !== 'function') { throw new TypeError('MovementExecutor: fireCommand callback is required'); } this._fireCommand = fireCommand; this._logger = logger || null; this._schedule = null; this._cursor = 0; this._firedIdx = new Set(); // Wall-clock anchor for the active schedule. Each tick recomputes // a "virtual cursor" from elapsed time so the schedule survives a // blocking first tick (e.g. an awaited startup sequence that takes // multiple seconds to settle). this._dispatchT0 = null; } // Replace the active schedule. Cursor starts at 0 (new dispatch is // anchored to "now"). The previous schedule's unfired commands are // dropped; already-fired commands are not retracted. replan(schedule) { this._schedule = schedule || { commands: [] }; this._cursor = 0; this._firedIdx = new Set(); this._dispatchT0 = Date.now(); if (this._logger?.debug) { const cmds = this._schedule.commands || []; this._logger.debug(`MovementExecutor.replan: ${cmds.length} commands, tStar=${this._schedule.tStarS ?? '?'}s`); } } // Advance one tick. Returns a Promise resolving to the list of // commands fired this tick once their async work settles. Awaiting // the FIRST tick from within a dispatch is what gives the new move // priority over an in-flight shutdown sequence — fire-and-forget // gives the shutdown's for-loop a window to progress through state // transitions before the new move's residue handler claims the FSM. async tick() { // Virtual cursor = max(advanced cursor, elapsed wall-clock ticks). // If a previous tick blocked on a long await, elapsed time has // already passed and we should fire every command whose // fireAtTickN now lies in the past — not wait another N timer // cycles to catch up. tickS is stamped on the schedule by the // planner (defaults to 1 s). const tickS = Number.isFinite(this._schedule?.tickS) && this._schedule.tickS > 0 ? this._schedule.tickS : 1; const elapsedS = this._dispatchT0 != null ? (Date.now() - this._dispatchT0) / 1000 : 0; const wallTick = Math.floor(elapsedS / tickS); const virtCursor = Math.max(this._cursor, wallTick); const fired = []; const cmds = this._schedule?.commands || []; for (let i = 0; i < cmds.length; i++) { if (this._firedIdx.has(i)) continue; const c = cmds[i]; if (c.fireAtTickN <= virtCursor) { this._firedIdx.add(i); try { // Fire-and-forget. The synchronous prologue of // handleInput claims the latest-wins gate before // returning its promise — that's enough for race // favouring. AWAITing the returned promise here // would block the executor for the entire ladder + // ramp duration of a flowmovement-after-startup // (because the pump's delayedMove only resolves // when the ramp completes), preventing the // wall-clock timer from starting and dragging every // delayed command in the schedule forward by that // amount. const r = this._fireCommand(c); if (r && typeof r.then === 'function') { r.catch((e) => { if (this._logger?.error) { this._logger.error(`MovementExecutor: fireCommand rejected for ${c.machineId}/${c.action}: ${e?.message || e}`); } }); } fired.push(c); } catch (e) { if (this._logger?.error) { this._logger.error(`MovementExecutor: fireCommand failed for ${c.machineId}/${c.action}: ${e?.message || e}`); } } } } this._cursor = virtCursor + 1; return fired; } // Telemetry — number of commands not yet fired. pending() { const cmds = this._schedule?.commands || []; return cmds.length - this._firedIdx.size; } // Telemetry — current tick cursor. cursor() { return this._cursor; } // Telemetry — the live schedule (read-only view). schedule() { return this._schedule; } } module.exports = MovementExecutor;