// PumpingStation — S88 Process Cell orchestrator.
//
// Wires the basin / measurement / control / safety modules in configure()
// and runs them in tick(). All real work lives in the modules; this file
// only stitches them together. See wiki/functional-description.md for the
// behaviour spec.

const { BaseDomain, UnitPolicy, statusBadge } = require('generalFunctions');
const BasinGeometry = require('./basin/BasinGeometry');
const { validateThresholdOrdering, computeSafetyPoints } = require('./basin/thresholdValidator');
const FlowAggregator = require('./measurement/flowAggregator');
const MeasurementRouter = require('./measurement/measurementRouter');
const calibration = require('./measurement/calibration');
const control = require('./control');
const SafetyController = require('./safety/safetyController');

// Flow dead-band fallback in m3/s (1e-4 m3/s × 3600 = 0.36 m3/h).
const DEFAULT_FLOW_THRESHOLD = 1e-4;

/**
 * Resolve the configured flow dead-band.
 *
 * `Number(null)`, `Number('')` and `Number('   ')` all evaluate to 0, so a
 * missing or blank config field must be detected BEFORE numeric coercion —
 * otherwise it would silently turn the dead-band off instead of selecting
 * the default. An explicit 0 (number or numeric string) is still honoured.
 *
 * @param {*} raw - value read from config.general.flowThreshold
 * @returns {number} finite threshold, or DEFAULT_FLOW_THRESHOLD when unset/invalid
 */
function parseFlowThreshold(raw) {
  if (raw == null) return DEFAULT_FLOW_THRESHOLD;
  if (typeof raw === 'string' && raw.trim() === '') return DEFAULT_FLOW_THRESHOLD;
  const parsed = Number(raw);
  return Number.isFinite(parsed) ? parsed : DEFAULT_FLOW_THRESHOLD;
}

class PumpingStation extends BaseDomain {
  static name = 'pumpingStation';

  // Internal math runs in m3/s for flow and m for level so the volume
  // integrator (flow × dt) is unit-consistent — canonical stays m3/s, the
  // platform-wide convention every cross-node consumer (MGC demand math,
  // physics-sanity) assumes. Strict canonicals make unit drift in child-fed
  // measurements an explicit error.
  // Output flow / netFlowRate are emitted in m3/h so telemetry/dashboard
  // series land on the same axis as the rest of the pump group (verified
  // slice #47); the m3/s→m3/h presentation conversion happens at the output
  // boundary only — it never touches the canonical integrator basis.
  // overflowVolume / underflowVolume are listed in output so the
  // MeasurementContainer keeps the integrator's m³ unit on those streams
  // (FlowAggregator writes spill / underflow per tick).
  static unitPolicy = UnitPolicy.declare({
    canonical: { flow: 'm3/s', pressure: 'Pa', power: 'W', temperature: 'K' },
    output: {
      flow: 'm3/h',
      netFlowRate: 'm3/h',
      level: 'm',
      volume: 'm3',
      overflowVolume: 'm3',
      underflowVolume: 'm3',
    },
    requireUnitForTypes: [],
  });

  /**
   * Build the module graph (basin, flow aggregator, measurement router,
   * safety controller), initialise cross-tick state, seed the predicted
   * volume and register the child-registration handlers.
   */
  configure() {
    this.basin = new BasinGeometry(this.config.basin, this.config.hydraulics);
    this.flowVariants = ['measured', 'predicted'];
    this.levelVariants = ['measured', 'predicted'];
    this.volVariants = ['measured', 'predicted'];
    this.flowPositions = { inflow: ['in', 'upstream'], outflow: ['out', 'downstream'] };

    this.mode = this.config.control.mode;
    this.controlState = { percControl: 0 };
    this.state = { direction: 'steady', netFlow: 0, flowSource: null, seconds: null, remainingSource: null };

    // Last operator demand from set.demand in manual mode. Stored on the
    // host so getOutput()/status reflect it even when no children are
    // registered yet (otherwise forwardDemand is invisible on Port 0/1).
    // Cleared on mode change away from manual.
    this._manualDemand = null;

    // Level-armed hysteresis state — ported from basin-docs `_controlLevelBased`.
    // Exposed as instance fields because the e2e/basic tests assert on them
    // directly. levelBased strategy reads/writes via the same names.
    this._shiftArmed = false;
    this._shiftHoldValue = null;
    this._lastDirection = null;

    // stopLevel hysteresis (Schmitt trigger) — ported from basin-docs.
    // TRUE while engaged (rising-edge at startLevel until falling-edge at
    // stopLevel). Used by levelBased to emit a small keep-alive output in
    // the [stopLevel, startLevel] dead band so MGC keeps one pump running.
    this._stopHystRunning = false;

    // Flow dead-band — values below |flowThreshold| (m3/s) are treated as
    // steady. Default ≈ 0.36 m3/h. A null / blank config value counts as
    // "not configured" and selects the default rather than coercing to 0.
    this.flowThreshold = parseFlowThreshold(this.config.general?.flowThreshold);

    // FlowAggregator owns the predicted-volume integrator + net-flow + ETA.
    this.flowAggregator = new FlowAggregator({
      measurements: this.measurements,
      basin: this.basin,
      config: this.config,
      logger: this.logger,
      flowVariants: this.flowVariants,
      levelVariants: this.levelVariants,
      flowPositions: this.flowPositions,
      flowThreshold: this.flowThreshold,
      computeSafetyPoints: () => this._computeSafetyPoints(),
    });

    this.measurementRouter = new MeasurementRouter({
      measurements: this.measurements,
      basin: this.basin,
      logger: this.logger,
    });

    // Threshold ordering is non-fatal — log + surface for tests/status.
    this.thresholdIssues = validateThresholdOrdering(
      this.basin,
      this.config.control?.levelbased,
      this.config.safety
    );
    for (const issue of this.thresholdIssues) this.logger.warn(issue.msg);

    // Seed predicted volume at the operational floor — without it the
    // integrator starts from null and the first tick has no anchor.
    this.measurements.type('volume').variant('predicted').position('atequipment')
      .value(this.basin.minVol, Date.now(), 'm3').unit('m3');

    // Registry-as-truth — `this.machines / machineGroups / stations` are
    // read-only getters flattening `this.child[softwareType]` (BaseDomain
    // helper). Mutations go through `childRegistrationUtils.registerChild`.
    this.declareChildGetter('machines', 'machine');
    this.declareChildGetter('machineGroups', 'machinegroup');
    this.declareChildGetter('stations', 'pumpingstation');

    // SafetyController's captured ctx exposes the same three names as live
    // getters (installed in context()), so the registry remains the single
    // source of truth long after configure() returns.
    this.safety = new SafetyController(this.context());

    this.router
      .onRegister('measurement', (child) => this._subscribeMeasurement(child))
      .onRegister('machine', (child) => {
        // Skip individual machines when a machineGroup parent is present —
        // the group's flow.predicted already aggregates child machines.
        if (Object.keys(this.machineGroups).length === 0) {
          this._subscribePredictedFlow(child);
        }
      })
      .onRegister('machinegroup', (child) => this._subscribePredictedFlow(child))
      .onRegister('pumpingstation', (child) => this._subscribePredictedFlow(child));

    this.logger.debug('PumpingStation initialized');
  }

  // Frozen view passed to control strategies + safety.
  // `host` is a back-reference so strategies that need to mutate
  // cross-tick hysteresis state (`_shiftArmed`, `_shiftHoldValue`,
  // `_lastDirection`, `_stopHystRunning`) write straight to the live
  // instance — Object.freeze on the view itself is fine because these
  // flags live on the host, not in the view.
  //
  // machines / machineGroups / stations are installed as live getters
  // that delegate to this.* getters (declareChildGetter). SafetyController
  // captures this ctx once at construction; the getters keep it reading
  // fresh from the registry after later child registrations.
  /**
   * @returns {Readonly<object>} frozen context: the base context plus basin,
   *   flowAggregator, mode, variant lists, flowThreshold, unitPolicy, host
   *   and the three live child getters.
   */
  context() {
    const host = this;
    const ctx = {
      ...super.context(),
      basin: this.basin,
      flowAggregator: this.flowAggregator,
      mode: this.mode,
      flowVariants: this.flowVariants,
      levelVariants: this.levelVariants,
      volVariants: this.volVariants,
      flowThreshold: this.flowThreshold,
      unitPolicy: this.unitPolicy,
      host: this,
    };
    Object.defineProperty(ctx, 'machines', { enumerable: true, get: () => host.machines });
    Object.defineProperty(ctx, 'machineGroups', { enumerable: true, get: () => host.machineGroups });
    Object.defineProperty(ctx, 'stations', { enumerable: true, get: () => host.stations });
    return Object.freeze(ctx);
  }

  /**
   * Run one cycle: advance the flow aggregator, evaluate safety, dispatch
   * the active control mode unless safety blocks it, then refresh
   * `this.state` and notify output listeners.
   *
   * Control dispatch is fire-and-forget. Both an asynchronous rejection and
   * a synchronous throw from `control.dispatch` are logged, so a failing
   * strategy can never prevent the state refresh / output notification
   * that follows.
   */
  tick() {
    const { netFlow, remaining } = this.flowAggregator.tick();
    const safe = this.safety.evaluate({ direction: netFlow.direction, secondsRemaining: remaining.seconds });
    this.safetyControllerActive = safe.blocked;

    if (!safe.blocked) {
      // Built outside the try so a context() failure still surfaces as-is.
      const ctx = this.context();
      const logDispatchFailure = (err) =>
        this.logger.error(`control dispatch failed: ${err?.message ?? err}`);
      try {
        Promise.resolve(control.dispatch(this.mode, ctx, this.controlState, netFlow.direction))
          .catch(logDispatchFailure);
      } catch (err) {
        // Promise.resolve(...) only covers rejections; a synchronous throw
        // from dispatch would otherwise abort the rest of this tick.
        logDispatchFailure(err);
      }
    }

    this.state = {
      direction: netFlow.direction,
      netFlow: netFlow.value,
      flowSource: netFlow.source,
      seconds: remaining.seconds,
      remainingSource: remaining.source,
    };
    this.notifyOutputChanged();
  }

  /**
   * Switch the control mode when `config.control.allowedModes.has(newMode)`
   * is truthy; otherwise log a warning and leave the mode untouched.
   * Leaving manual mode clears the stored manual demand.
   *
   * @param {string} newMode - requested control mode
   */
  changeMode(newMode) {
    if (this.config.control.allowedModes?.has?.(newMode)) {
      this.logger.info(`Control mode changing from ${this.mode} to ${newMode}`);
      this.mode = newMode;
      if (newMode !== 'manual') this._manualDemand = null;
      this.notifyOutputChanged();
    } else {
      this.logger.warn(`Attempted to change to unsupported control mode: ${newMode}`);
    }
  }

  // Calibration — public methods preserved for tests + commands registry.
  // Each one delegates to the calibration module with this instance as host.
  calibratePredictedVolume(vol, ts = Date.now()) {
    calibration.calibratePredictedVolume(this, vol, ts);
  }

  calibratePredictedLevel(lvl, ts = Date.now(), unit = 'm') {
    calibration.calibratePredictedLevel(this, lvl, ts, unit);
  }

  setManualInflow(value, ts = Date.now(), unit) {
    calibration.setManualInflow(this, value, ts, unit);
  }

  setManualOutflow(value, ts = Date.now(), unit) {
    calibration.setManualOutflow(this, value, ts, unit);
  }

  /**
   * Remember the operator demand (null when not a finite number), notify
   * output listeners, and hand the raw demand to the manual control module.
   *
   * @param {number} demand - operator demand
   * @returns {*} whatever control.manual.forwardDemand returns
   */
  forwardDemandToChildren(demand) {
    this._manualDemand = Number.isFinite(demand) ? demand : null;
    this.notifyOutputChanged();
    return control.manual.forwardDemand(this.context(), demand);
  }

  // Direct delegations preserved so existing tests can drive the strategy
  // without re-mocking the dispatch layer.
  async _controlLevelBased(direction) {
    return control.strategies.levelbased.run(this.context(), this.controlState, direction);
  }

  // Public getter so legacy tests + getOutput keep reading the live demand.
  get percControl() {
    return this.controlState.percControl;
  }

  set percControl(v) {
    this.controlState.percControl = v;
  }

  // ── Predicted-volume integrator — tests drive this directly with a
  // controlled Date.now, so expose as an instance method that delegates
  // to FlowAggregator.update().
  _updatePredictedVolume() {
    return this.flowAggregator.update();
  }

  // ── Mirror FlowAggregator internal integrator state so tests that pin
  // _predictedFlowState before driving a tick keep working.
  get _predictedFlowState() {
    return this.flowAggregator._predictedFlowState;
  }

  set _predictedFlowState(v) {
    this.flowAggregator._predictedFlowState = v;
  }

  _selectBestNetFlow() {
    return this.flowAggregator.selectBestNetFlow();
  }

  // Safety points derived from the basin and config.safety (empty object
  // when the safety section is absent).
  _computeSafetyPoints() {
    return computeSafetyPoints(this.basin, this.config.safety || {});
  }

  /**
   * Assemble the output object: flattened measurements, the basin snapshot,
   * the latest tick state, control/mode fields, derived safety thresholds
   * and the predicted spill / underflow values (0 when no value is stored).
   *
   * @returns {object} output payload
   */
  getOutput() {
    const out = this.measurements.getFlattenedOutput();
    Object.assign(out, this.basin.snapshot());
    out.direction = this.state.direction;
    out.flowSource = this.state.flowSource;
    out.timeleft = this.state.seconds;
    out.percControl = this.controlState.percControl;
    out.mode = this.mode;
    out.manualDemand = this._manualDemand;

    // Derived safety thresholds — exposed so editor + dashboards can show
    // the dryRunLevel and highVolumeSafetyLevel without recomputing.
    const safety = this._computeSafetyPoints();
    out.dryRunLevel = safety.dryRunLevel;
    out.dryRunSafetyVol = safety.dryRunSafetyVol;
    out.highVolumeSafetyLevel = safety.highVolumeSafetyLevel;
    out.highVolumeSafetyVol = safety.highVolumeSafetyVol;

    // Spill / underflow surface — populated by FlowAggregator when the
    // predicted-volume integrator hits the upper or lower physical bound.
    out.predictedOverflowVolume = this.measurements
      .type('overflowVolume').variant('predicted').position('atequipment').getCurrentValue('m3') ?? 0;
    out.predictedOverflowRate = this.measurements
      .type('flow').variant('predicted').position('overflow').getCurrentValue('m3/s') ?? 0;
    out.predictedUnderflowVolume = this.measurements
      .type('underflowVolume').variant('predicted').position('atequipment').getCurrentValue('m3') ?? 0;
    return out;
  }

  /**
   * Compose the status badge: mode, direction arrow with predicted fill
   * percentage, net flow converted from m3/s to m3/h, and — in manual mode
   * with a finite stored demand — the demand itself. Unknown directions
   * fall back to a grey '❔' badge.
   *
   * @returns {*} result of statusBadge.compose
   */
  getStatusBadge() {
    const STYLES = {
      filling: { arrow: '⬆️', fill: 'blue' },
      draining: { arrow: '⬇️', fill: 'orange' },
      steady: { arrow: '⏸️', fill: 'green' },
    };
    const { arrow = '❔', fill = 'grey' } = STYLES[this.state?.direction] || {};
    const pct = this.measurements.type('volumePercent').variant('predicted').position('atequipment').getCurrentValue() ?? 0;
    const netFlowM3h = this.unitPolicy.convert(this.state?.netFlow ?? 0, 'm3/s', 'm3/h', 'status badge netFlow');
    const mode = this.mode || '?';
    const manualPart = this.mode === 'manual' && Number.isFinite(this._manualDemand)
      ? `Qd=${this._manualDemand.toFixed(0)} m³/h`
      : null;
    return statusBadge.compose(
      [mode, `${arrow} ${pct.toFixed(1)}%`, `net: ${netFlowM3h.toFixed(0)} m³/h`, manualPart],
      { fill, shape: 'dot' }
    );
  }

  // ── Direction helper kept for tests pinning the dead-band semantics ──
  _deriveDirection(netFlow) {
    return this.flowAggregator.deriveDirection(netFlow);
  }

  // ── Volume/level conversions kept for tests + back-compat ──────────────
  _calcVolumeFromLevel(level) {
    return this.basin.volumeFromLevel(level);
  }

  _calcLevelFromVolume(volume) {
    return this.basin.levelFromVolume(volume);
  }

  /**
   * Subscribe to a measurement child's `<type>.measured.<position>` event.
   * Level updates are handed to the measurement router only; every other
   * type is first written to this station's measurements and then routed.
   *
   * @param {object} child - registered measurement child
   */
  _subscribeMeasurement(child) {
    const position = child.config.functionality.positionVsParent;
    const measurementType = child.config.asset.type;
    const eventName = `${measurementType}.measured.${position}`;

    const handle = (eventData = {}) => {
      this.logger.debug(
        `Measurement update ${eventName} <- ${eventData.childName || child.config.general.name}: ${eventData.value} ${eventData.unit}`
      );
      if (measurementType === 'level') {
        this.measurementRouter.route(measurementType, eventData.value, position, eventData);
        return;
      }
      this.measurements.type(measurementType).variant('measured').position(position)
        .value(eventData.value, eventData.timestamp, eventData.unit);
      this.measurementRouter.route(measurementType, eventData.value, position, eventData);
    };
    child.measurements.emitter.on(eventName, handle);

    // Seed from the child's current value. The emitter only delivers FUTURE
    // updates, so a parent that registers after the child already emitted
    // (e.g. a once-only inject that fired during startup before this
    // subscription existed) would otherwise never see that value. Replaying
    // the last sample makes a late subscriber pick up the present state.
    const series = child.measurements
      .type(measurementType).variant('measured').position(position).get?.();
    const sample = series?.getLaggedSample?.(0);
    if (sample && sample.value != null) {
      handle({ ...sample, childName: child.config.general.name });
    }
  }

  /**
   * Subscribe to a child's predicted-flow event and mirror each update into
   * this station's `flow.predicted.<in|out>` series, keyed per child.
   * Children whose position is not in POS_MAP are skipped with a warning.
   *
   * @param {object} child - registered machine / machinegroup / pumpingstation
   */
  _subscribePredictedFlow(child) {
    // Map the child's position to the orchestrator's posKey + the most
    // specific aggregator event. 'downstream' is preferred over 'atequipment'
    // because they carry the same total — subscribing to both double-counts.
    const POS_MAP = {
      downstream: ['out', 'flow.predicted.downstream'],
      out: ['out', 'flow.predicted.downstream'],
      atequipment:['out', 'flow.predicted.downstream'],
      upstream: ['in', 'flow.predicted.upstream'],
      in: ['in', 'flow.predicted.upstream'],
    };
    const position = (child.config.functionality.positionVsParent || '').toLowerCase();
    const mapped = POS_MAP[position];
    if (!mapped) {
      this.logger.warn(`Unsupported predicted flow position "${position}" from ${child.config.general.name}`);
      return;
    }
    const [posKey, eventName] = mapped;
    const childId = child.config.general.id ?? child.config.general.name;
    child.measurements.emitter.on(eventName, (eventData = {}) => {
      // Fall back to the child's configured unit and to "now" when the
      // event carries no unit / timestamp.
      const unit = eventData.unit || child.config?.general?.unit;
      const ts = eventData.timestamp || Date.now();
      this.measurements.type('flow').variant('predicted').position(posKey).child(childId)
        .value(eventData.value, ts, unit);
    });
  }
}

module.exports = PumpingStation;