Files
pumpingStation/src/specificClass.js
znetsixe f5c6282478 refactor(units): use UnitPolicy.convert instead of hardcoded m3/h<->m3/s scalars
Replace the M3H_TO_M3S constant in control/manual.js and the `* 3600`
inline conversion in the status badge with this.unitPolicy.convert
calls. Expose unitPolicy on the frozen control context so manual
strategies pick it up without reaching into host. Matches the
contract direction in .claude/refactor/CONTRACTS.md §6.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 13:43:35 +02:00

333 lines
15 KiB
JavaScript
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
// PumpingStation — S88 Process Cell orchestrator.
//
// Wires the basin / measurement / control / safety modules in configure()
// and runs them in tick(). All real work lives in the modules; this file
// only stitches them together. See wiki/functional-description.md for the
// behaviour spec.
const { BaseDomain, UnitPolicy, statusBadge } = require('generalFunctions');
const BasinGeometry = require('./basin/BasinGeometry');
const { validateThresholdOrdering, computeSafetyPoints } = require('./basin/thresholdValidator');
const FlowAggregator = require('./measurement/flowAggregator');
const MeasurementRouter = require('./measurement/measurementRouter');
const calibration = require('./measurement/calibration');
const control = require('./control');
const SafetyController = require('./safety/safetyController');
class PumpingStation extends BaseDomain {
static name = 'pumpingStation';
// Internal math runs in m3/s for flow and m for level so the volume
// integrator (flow × dt) is unit-consistent. Strict canonicals make
// unit drift in child-fed measurements an explicit error.
// overflowVolume / underflowVolume are listed in output so the
// MeasurementContainer keeps the integrator's m³ unit on those streams
// (FlowAggregator writes spill / underflow per tick).
static unitPolicy = UnitPolicy.declare({
canonical: { flow: 'm3/s', pressure: 'Pa', power: 'W', temperature: 'K' },
output: {
flow: 'm3/s', netFlowRate: 'm3/s', level: 'm', volume: 'm3',
overflowVolume: 'm3', underflowVolume: 'm3',
},
requireUnitForTypes: [],
});
configure() {
this.basin = new BasinGeometry(this.config.basin, this.config.hydraulics);
this.flowVariants = ['measured', 'predicted'];
this.levelVariants = ['measured', 'predicted'];
this.volVariants = ['measured', 'predicted'];
this.flowPositions = { inflow: ['in', 'upstream'], outflow: ['out', 'downstream'] };
this.mode = this.config.control.mode;
this.controlState = { percControl: 0 };
this.state = { direction: 'steady', netFlow: 0, flowSource: null, seconds: null, remainingSource: null };
// Last operator demand from set.demand in manual mode. Stored on the
// host so getOutput()/status reflect it even when no children are
// registered yet (otherwise forwardDemand is invisible on Port 0/1).
// Cleared on mode change away from manual.
this._manualDemand = null;
// Level-armed hysteresis state — ported from basin-docs `_controlLevelBased`.
// Exposed as instance fields because the e2e/basic tests assert on them
// directly. levelBased strategy reads/writes via the same names.
this._shiftArmed = false;
this._shiftHoldValue = null;
this._lastDirection = null;
// stopLevel hysteresis (Schmitt trigger) — ported from basin-docs.
// TRUE while engaged (rising-edge at startLevel until falling-edge at
// stopLevel). Used by levelBased to emit a small keep-alive output in
// the [stopLevel, startLevel] dead band so MGC keeps one pump running.
this._stopHystRunning = false;
// Flow dead-band — values below |flowThreshold| (m3/s) are treated as
// steady. Default ≈ 0.36 m3/h.
const thresholdFromConfig = Number(this.config.general?.flowThreshold);
this.flowThreshold = Number.isFinite(thresholdFromConfig) ? thresholdFromConfig : 1e-4;
// FlowAggregator owns the predicted-volume integrator + net-flow + ETA.
this.flowAggregator = new FlowAggregator({
measurements: this.measurements,
basin: this.basin,
config: this.config,
logger: this.logger,
flowVariants: this.flowVariants,
levelVariants: this.levelVariants,
flowPositions: this.flowPositions,
flowThreshold: this.flowThreshold,
computeSafetyPoints: () => this._computeSafetyPoints(),
});
this.measurementRouter = new MeasurementRouter({
measurements: this.measurements,
basin: this.basin,
logger: this.logger,
});
// Threshold ordering is non-fatal — log + surface for tests/status.
this.thresholdIssues = validateThresholdOrdering(
this.basin, this.config.control?.levelbased, this.config.safety
);
for (const issue of this.thresholdIssues) this.logger.warn(issue.msg);
// Seed predicted volume at the operational floor — without it the
// integrator starts from null and the first tick has no anchor.
this.measurements.type('volume').variant('predicted').position('atequipment')
.value(this.basin.minVol, Date.now(), 'm3').unit('m3');
// Registry-as-truth — `this.machines / machineGroups / stations` are
// read-only getters flattening `this.child[softwareType]` (BaseDomain
// helper). Mutations go through `childRegistrationUtils.registerChild`.
this.declareChildGetter('machines', 'machine');
this.declareChildGetter('machineGroups', 'machinegroup');
this.declareChildGetter('stations', 'pumpingstation');
// SafetyController's captured ctx exposes the same three names as live
// getters (installed in context()), so the registry remains the single
// source of truth long after configure() returns.
this.safety = new SafetyController(this.context());
this.router
.onRegister('measurement', (child) => this._subscribeMeasurement(child))
.onRegister('machine', (child) => {
// Skip individual machines when a machineGroup parent is present —
// the group's flow.predicted already aggregates child machines.
if (Object.keys(this.machineGroups).length === 0) {
this._subscribePredictedFlow(child);
}
})
.onRegister('machinegroup', (child) => this._subscribePredictedFlow(child))
.onRegister('pumpingstation', (child) => this._subscribePredictedFlow(child));
this.logger.debug('PumpingStation initialized');
}
// Frozen view passed to control strategies + safety.
// `host` is a back-reference so strategies that need to mutate
// cross-tick hysteresis state (`_shiftArmed`, `_shiftHoldValue`,
// `_lastDirection`, `_stopHystRunning`) write straight to the live
// instance — Object.freeze on the view itself is fine because these
// flags live on the host, not in the view.
//
// machines / machineGroups / stations are installed as live getters
// that delegate to this.* getters (declareChildGetter). SafetyController
// captures this ctx once at construction; the getters keep it reading
// fresh from the registry after later child registrations.
context() {
const host = this;
const ctx = {
...super.context(),
basin: this.basin,
flowAggregator: this.flowAggregator,
mode: this.mode,
flowVariants: this.flowVariants,
levelVariants: this.levelVariants,
volVariants: this.volVariants,
flowThreshold: this.flowThreshold,
unitPolicy: this.unitPolicy,
host: this,
};
Object.defineProperty(ctx, 'machines', { enumerable: true, get: () => host.machines });
Object.defineProperty(ctx, 'machineGroups', { enumerable: true, get: () => host.machineGroups });
Object.defineProperty(ctx, 'stations', { enumerable: true, get: () => host.stations });
return Object.freeze(ctx);
}
tick() {
const { netFlow, remaining } = this.flowAggregator.tick();
const safe = this.safety.evaluate({ direction: netFlow.direction, secondsRemaining: remaining.seconds });
this.safetyControllerActive = safe.blocked;
if (!safe.blocked) {
Promise.resolve(control.dispatch(this.mode, this.context(), this.controlState, netFlow.direction))
.catch((err) => this.logger.error(`control dispatch failed: ${err.message}`));
}
this.state = {
direction: netFlow.direction,
netFlow: netFlow.value,
flowSource: netFlow.source,
seconds: remaining.seconds,
remainingSource: remaining.source,
};
this.notifyOutputChanged();
}
changeMode(newMode) {
if (this.config.control.allowedModes?.has?.(newMode)) {
this.logger.info(`Control mode changing from ${this.mode} to ${newMode}`);
this.mode = newMode;
if (newMode !== 'manual') this._manualDemand = null;
this.notifyOutputChanged();
} else {
this.logger.warn(`Attempted to change to unsupported control mode: ${newMode}`);
}
}
// Calibration — public methods preserved for tests + commands registry.
calibratePredictedVolume(vol, ts = Date.now()) { calibration.calibratePredictedVolume(this, vol, ts); }
calibratePredictedLevel(lvl, ts = Date.now(), unit = 'm') { calibration.calibratePredictedLevel(this, lvl, ts, unit); }
setManualInflow(value, ts = Date.now(), unit) { calibration.setManualInflow(this, value, ts, unit); }
setManualOutflow(value, ts = Date.now(), unit) { calibration.setManualOutflow(this, value, ts, unit); }
forwardDemandToChildren(demand) {
this._manualDemand = Number.isFinite(demand) ? demand : null;
this.notifyOutputChanged();
return control.manual.forwardDemand(this.context(), demand);
}
// Direct delegations preserved so existing tests can drive the strategy
// without re-mocking the dispatch layer.
async _controlLevelBased(direction) {
return control.strategies.levelbased.run(this.context(), this.controlState, direction);
}
// Public getter so legacy tests + getOutput keep reading the live demand.
get percControl() { return this.controlState.percControl; }
set percControl(v) { this.controlState.percControl = v; }
// ── Predicted-volume integrator — tests drive this directly with a
// controlled Date.now, so expose as an instance method that delegates
// to FlowAggregator.update().
_updatePredictedVolume() {
return this.flowAggregator.update();
}
// ── Mirror FlowAggregator internal integrator state so tests that pin
// _predictedFlowState before driving a tick keep working.
get _predictedFlowState() { return this.flowAggregator._predictedFlowState; }
set _predictedFlowState(v) { this.flowAggregator._predictedFlowState = v; }
_selectBestNetFlow() { return this.flowAggregator.selectBestNetFlow(); }
_computeSafetyPoints() {
return computeSafetyPoints(this.basin, this.config.safety || {});
}
getOutput() {
const out = this.measurements.getFlattenedOutput();
Object.assign(out, this.basin.snapshot());
out.direction = this.state.direction;
out.flowSource = this.state.flowSource;
out.timeleft = this.state.seconds;
out.percControl = this.controlState.percControl;
out.mode = this.mode;
out.manualDemand = this._manualDemand;
// Derived safety thresholds — exposed so editor + dashboards can show
// the dryRunLevel and highVolumeSafetyLevel without recomputing.
const safety = this._computeSafetyPoints();
out.dryRunLevel = safety.dryRunLevel;
out.dryRunSafetyVol = safety.dryRunSafetyVol;
out.highVolumeSafetyLevel = safety.highVolumeSafetyLevel;
out.highVolumeSafetyVol = safety.highVolumeSafetyVol;
// Spill / underflow surface — populated by FlowAggregator when the
// predicted-volume integrator hits the upper or lower physical bound.
out.predictedOverflowVolume = this.measurements
.type('overflowVolume').variant('predicted').position('atequipment').getCurrentValue('m3') ?? 0;
out.predictedOverflowRate = this.measurements
.type('flow').variant('predicted').position('overflow').getCurrentValue('m3/s') ?? 0;
out.predictedUnderflowVolume = this.measurements
.type('underflowVolume').variant('predicted').position('atequipment').getCurrentValue('m3') ?? 0;
return out;
}
getStatusBadge() {
const STYLES = {
filling: { arrow: '⬆️', fill: 'blue' },
draining: { arrow: '⬇️', fill: 'orange' },
steady: { arrow: '⏸️', fill: 'green' },
};
const { arrow = '❔', fill = 'grey' } = STYLES[this.state?.direction] || {};
const pct = this.measurements.type('volumePercent').variant('predicted').position('atequipment').getCurrentValue() ?? 0;
const netFlowM3h = this.unitPolicy.convert(this.state?.netFlow ?? 0, 'm3/s', 'm3/h', 'status badge netFlow');
const mode = this.mode || '?';
const manualPart = this.mode === 'manual' && Number.isFinite(this._manualDemand)
? `Qd=${this._manualDemand.toFixed(0)} m³/h` : null;
return statusBadge.compose(
[mode, `${arrow} ${pct.toFixed(1)}%`, `net: ${netFlowM3h.toFixed(0)} m³/h`, manualPart],
{ fill, shape: 'dot' }
);
}
// ── Direction helper kept for tests pinning the dead-band semantics ──
_deriveDirection(netFlow) { return this.flowAggregator.deriveDirection(netFlow); }
// ── Volume/level conversions kept for tests + back-compat ──────────────
_calcVolumeFromLevel(level) { return this.basin.volumeFromLevel(level); }
_calcLevelFromVolume(volume) { return this.basin.levelFromVolume(volume); }
_subscribeMeasurement(child) {
const position = child.config.functionality.positionVsParent;
const measurementType = child.config.asset.type;
const eventName = `${measurementType}.measured.${position}`;
child.measurements.emitter.on(eventName, (eventData = {}) => {
this.logger.debug(
`Measurement update ${eventName} <- ${eventData.childName || child.config.general.name}: ${eventData.value} ${eventData.unit}`
);
if (measurementType === 'level') {
this.measurementRouter.route(measurementType, eventData.value, position, eventData);
return;
}
this.measurements.type(measurementType).variant('measured').position(position)
.value(eventData.value, eventData.timestamp, eventData.unit);
this.measurementRouter.route(measurementType, eventData.value, position, eventData);
});
}
_subscribePredictedFlow(child) {
// Map the child's position to the orchestrator's posKey + the most
// specific aggregator event. 'downstream' is preferred over 'atequipment'
// because they carry the same total — subscribing to both double-counts.
const POS_MAP = {
downstream: ['out', 'flow.predicted.downstream'],
out: ['out', 'flow.predicted.downstream'],
atequipment:['out', 'flow.predicted.downstream'],
upstream: ['in', 'flow.predicted.upstream'],
in: ['in', 'flow.predicted.upstream'],
};
const position = (child.config.functionality.positionVsParent || '').toLowerCase();
const mapped = POS_MAP[position];
if (!mapped) {
this.logger.warn(`Unsupported predicted flow position "${position}" from ${child.config.general.name}`);
return;
}
const [posKey, eventName] = mapped;
const childId = child.config.general.id ?? child.config.general.name;
child.measurements.emitter.on(eventName, (eventData = {}) => {
const unit = eventData.unit || child.config?.general?.unit;
const ts = eventData.timestamp || Date.now();
this.measurements.type('flow').variant('predicted').position(posKey).child(childId)
.value(eventData.value, ts, unit);
});
}
}
module.exports = PumpingStation;