// FlowAggregator — owns the predicted-volume integrator + net-flow selection // + remaining-time projection for the pumping-station basin. // // Pure domain. Takes a context bag with the live MeasurementContainer, the // basin geometry, and the merged config; mutates measurements in place and // keeps a tiny piece of integrator state internally. const { interpolation } = require('generalFunctions'); const DEFAULT_FLOW_THRESHOLD = 1e-4; const DEFAULT_FLOW_VARIANTS = ['measured', 'predicted']; const DEFAULT_LEVEL_VARIANTS = ['measured', 'predicted']; const DEFAULT_FLOW_POSITIONS = { inflow: ['in', 'upstream'], outflow: ['out', 'downstream'], }; class FlowAggregator { constructor(ctx = {}) { if (!ctx.measurements) throw new Error('FlowAggregator: ctx.measurements is required'); if (!ctx.basin) throw new Error('FlowAggregator: ctx.basin is required'); this.measurements = ctx.measurements; this.basin = ctx.basin; this.config = ctx.config || {}; this.logger = ctx.logger || null; this._interp = ctx.interpolation || new interpolation(); this.flowVariants = ctx.flowVariants || DEFAULT_FLOW_VARIANTS; this.levelVariants = ctx.levelVariants || DEFAULT_LEVEL_VARIANTS; this.flowPositions = ctx.flowPositions || DEFAULT_FLOW_POSITIONS; const cfgThresh = Number(this.config?.general?.flowThreshold); this.flowThreshold = Number.isFinite(ctx.flowThreshold) ? ctx.flowThreshold : (Number.isFinite(cfgThresh) ? cfgThresh : DEFAULT_FLOW_THRESHOLD); this._predictedFlowState = null; this._lastNetFlow = { value: 0, source: null, direction: 'steady' }; this._lastRemaining = { seconds: null, source: null }; } resetState(timestamp = Date.now()) { this._predictedFlowState = { inflow: 0, outflow: 0, lastTimestamp: timestamp }; } update() { const flowUnit = 'm3/s'; const now = Date.now(); const inflow = this.measurements.sum('flow', 'predicted', this.flowPositions.inflow, flowUnit) || 0; const outflow = this.measurements.sum('flow', 'predicted', this.flowPositions.outflow, flowUnit) || 0; if (!this._predictedFlowState) this._predictedFlowState = { inflow, outflow, lastTimestamp: now }; const tPrev = this._predictedFlowState.lastTimestamp ?? now; const dt = Math.max((now - tPrev) / 1000, 0); const dV = dt > 0 ? (inflow - outflow) * dt : 0; const volSeries = this.measurements.type('volume').variant('predicted').position('atequipment'); const currentVol = volSeries.getCurrentValue('m3'); const nextVol = (currentVol ?? this.basin.minVol ?? 0) + dV; const writeTs = tPrev + dt * 1000; volSeries.value(nextVol, writeTs, 'm3').unit('m3'); const surfaceArea = this.basin.surfaceArea; const nextLevel = surfaceArea > 0 ? Math.max(nextVol, 0) / surfaceArea : 0; this.measurements.type('level').variant('predicted').position('atequipment') .value(nextLevel, writeTs, 'm').unit('m'); const percent = this._interp.interpolate_lin_single_point( nextVol, this.basin.minVol, this.basin.maxVolAtOverflow, 0, 100 ); this.measurements.type('volumePercent').variant('predicted').position('atequipment') .value(percent, writeTs, '%'); this._predictedFlowState = { inflow, outflow, lastTimestamp: writeTs }; } selectBestNetFlow() { const type = 'flow'; const unit = this.measurements.getUnit(type) || 'm3/s'; for (const variant of this.flowVariants) { const bucket = this.measurements.measurements?.[type]?.[variant]; if (!bucket || Object.keys(bucket).length === 0) continue; const inflow = this.measurements.sum(type, variant, this.flowPositions.inflow, unit) || 0; const outflow = this.measurements.sum(type, variant, this.flowPositions.outflow, unit) || 0; if (Math.abs(inflow) < this.flowThreshold && Math.abs(outflow) < this.flowThreshold) continue; const net = inflow - outflow; this.measurements.type('netFlowRate').variant(variant).position('atequipment') .value(net, Date.now(), unit); const result = { value: net, source: variant, direction: this.deriveDirection(net) }; this._lastNetFlow = result; return result; } for (const variant of this.levelVariants) { const rate = this._levelRate(variant); if (!Number.isFinite(rate)) continue; const net = rate * this.basin.surfaceArea; const result = { value: net, source: `level:${variant}`, direction: this.deriveDirection(net) }; this._lastNetFlow = result; return result; } if (this.logger) this.logger.warn('No usable measurements to compute net flow; assuming steady.'); const result = { value: 0, source: null, direction: 'steady' }; this._lastNetFlow = result; return result; } computeRemainingTime(netFlow) { if (!netFlow || Math.abs(netFlow.value) < this.flowThreshold) { this._lastRemaining = { seconds: null, source: null }; return this._lastRemaining; } const { overflowLevel, outflowLevel, surfaceArea } = this.basin; if (!Number.isFinite(surfaceArea) || surfaceArea <= 0) { this._lastRemaining = { seconds: null, source: null }; return this._lastRemaining; } for (const variant of this.levelVariants) { const lvl = this.measurements.type('level').variant(variant).position('atequipment').getCurrentValue('m'); if (!Number.isFinite(lvl)) continue; const remainingHeight = netFlow.value > 0 ? Math.max(overflowLevel - lvl, 0) : Math.max(lvl - outflowLevel, 0); const seconds = (remainingHeight * surfaceArea) / Math.abs(netFlow.value); if (!Number.isFinite(seconds)) continue; this._lastRemaining = { seconds, source: `${netFlow.source}/${variant}` }; return this._lastRemaining; } this._lastRemaining = { seconds: null, source: netFlow.source }; return this._lastRemaining; } deriveDirection(netFlow) { if (netFlow > this.flowThreshold) return 'filling'; if (netFlow < -this.flowThreshold) return 'draining'; return 'steady'; } tick() { this.update(); const netFlow = this.selectBestNetFlow(); const remaining = this.computeRemainingTime(netFlow); return { netFlow, remaining }; } snapshot() { return { direction: this._lastNetFlow.direction, netFlow: this._lastNetFlow.value, flowSource: this._lastNetFlow.source, secondsRemaining: this._lastRemaining.seconds, }; } _levelRate(variant) { const m = this.measurements.type('level').variant(variant).position('atequipment').get(); if (!m || !m.values || m.values.length < 2) return null; const current = m.getLaggedSample?.(0); const previous = m.getLaggedSample?.(1); if (!current || !previous || previous.timestamp == null) return null; const dt = (current.timestamp - previous.timestamp) / 1000; if (!Number.isFinite(dt) || dt <= 0) return null; return (current.value - previous.value) / dt; } } module.exports = FlowAggregator;