const MeasurementBuilder = require('./MeasurementBuilder');
const EventEmitter = require('events');
const convertModule = require('../convert/index');
const { POSITIONS } = require('../constants/positions');

/* ============================================================================
 * MeasurementContainer — measurement storage with chainable type/variant/
 * position/child addressing.
 *
 * INTERNAL STORAGE SHAPE
 *   measurements[type][variant][position][childId] = Measurement instance
 *
 * The childId layer is ALWAYS present, even when the caller doesn't specify
 * one. _getOrCreateMeasurement defaults childId to 'default' when no
 * .child(...) is in the chain. So writing
 *
 *   mc.type('level').variant('measured').position('atequipment')
 *     .value(2.5, ts, 'm');
 *
 * stores the value at measurements.level.measured.atequipment.default.
 *
 * READING — the chainable getters resolve the default child transparently,
 * so consumers usually don't see it:
 *
 *   mc.type('level').variant('measured').position('atequipment')
 *     .getCurrentValue('m'); // returns 2.5
 *
 * FLATTENED OUTPUT — getFlattenedOutput() emits ONE key per child, including
 * the implicit 'default' bucket:
 *
 *   {
 *     'level.measured.atequipment.default': 2.5,  // implicit child
 *     'flow.predicted.in.manual-qin': 0.05,       // explicit .child('manual-qin')
 *     'flow.predicted.in.from-pump-A': 0.03,
 *     …
 *   }
 *
 * ⚠ DASHBOARDS / DOWNSTREAM PARSERS MUST INCLUDE THE CHILD KEY
 *   The flat key format is `${type}.${variant}.${position}.${childId}`.
 *   When you have not used .child(), the childId is the literal string
 *   'default'. Use 'level.measured.atequipment.default', NOT
 *   'level.measured.atequipment'. This trips up new consumers — see the
 *   pumpingStation basic-dashboard parser for an example that gets it right.
 *
 * AGGREGATION — sum() folds all children of a position into one number:
 *
 *   mc.sum('flow', 'predicted', ['in'], 'm3/s');
 *   // = manual-qin + from-pump-A + … + (default if any)
 * ============================================================================ */
class MeasurementContainer {
  /**
   * @param {object} [options]
   * @param {number} [options.windowSize=10] Window size handed to every new Measurement.
   * @param {object} [options.defaultUnits] Overrides for the ingress/preferred unit per type.
   * @param {object} [options.canonicalUnits] Overrides for the canonical storage unit per type.
   * @param {boolean} [options.autoConvert=true] Convert incoming values to the storage unit.
   * @param {object} [options.preferredUnits] Per-type unit overrides (win over defaultUnits).
   * @param {boolean} [options.storeCanonical=false] Prefer the canonical unit as storage unit.
   * @param {boolean} [options.strictUnitValidation=false] Reject a write when auto-conversion fails.
   * @param {boolean} [options.throwOnInvalidUnit=false] Throw instead of warn on unit violations.
   * @param {string[]} [options.requireUnitForTypes] Types whose writes must carry a source unit.
   * @param {object} [logger] Optional logger; debug/warn/error are called when present.
   */
  constructor(options = {}, logger) {
    this.logger = logger || null;
    this.emitter = new EventEmitter();
    this.measurements = {};
    this.windowSize = options.windowSize || 10; // Default window size

    // Chaining context (set by type()/variant()/position()/child()/distance()).
    this._currentChildId = null;
    this._currentType = null;
    this._currentVariant = null;
    this._currentPosition = null;
    this._currentDistance = null;
    this._unit = null;

    // Default units for each measurement type (ingress/preferred)
    this.defaultUnits = {
      pressure: 'mbar',
      flow: 'm3/h',
      power: 'kW',
      temperature: 'C',
      volume: 'm3',
      length: 'm',
      ...options.defaultUnits, // Allow override
    };

    // Canonical storage unit map (single conversion anchor per measurement type)
    this.canonicalUnits = {
      pressure: 'Pa',
      atmPressure: 'Pa',
      flow: 'm3/s',
      power: 'W',
      hydraulicPower: 'W',
      temperature: 'K',
      volume: 'm3',
      length: 'm',
      mass: 'kg',
      energy: 'J',
      ...options.canonicalUnits,
    };

    // Auto-conversion settings
    this.autoConvert = options.autoConvert !== false; // Default to true
    this.preferredUnits = options.preferredUnits || {}; // Per-measurement overrides
    this.storeCanonical = options.storeCanonical === true;
    this.strictUnitValidation = options.strictUnitValidation === true;
    this.throwOnInvalidUnit = options.throwOnInvalidUnit === true;
    this.requireUnitForTypes = new Set(
      (options.requireUnitForTypes || []).map((t) => String(t).trim().toLowerCase())
    );

    // Map EVOLV measurement types (normalized: trimmed + lowercase) to
    // convert-module measure families.
    this.measureMap = {
      pressure: 'pressure',
      atmpressure: 'pressure',
      flow: 'volumeFlowRate',
      power: 'power',
      hydraulicpower: 'power',
      reactivepower: 'reactivePower',
      apparentpower: 'apparentPower',
      temperature: 'temperature',
      volume: 'volume',
      length: 'length',
      mass: 'mass',
      energy: 'energy',
      reactiveenergy: 'reactiveEnergy',
    };

    // Container-level child identification (attached to emitted events).
    this.childId = null;
    this.childName = null;
    this.parentRef = null;
  }

  // ---- Child context -------------------------------------------------------

  /** Set the container-level child id (used as fallback child key and in events). */
  setChildId(childId) {
    this.childId = childId;
    return this;
  }

  /** Select the child bucket for the current chain; falsy ids map to 'default'. */
  child(childId) {
    this._currentChildId = childId || 'default';
    return this;
  }

  /** Set the container-level child name (included in emitted events). */
  setChildName(childName) {
    this.childName = childName;
    return this;
  }

  /** Set the parent reference (included in emitted events). */
  setParentRef(parent) {
    this.parentRef = parent;
    return this;
  }

  // ---- Unit configuration --------------------------------------------------

  /** Override the preferred unit for a measurement type. */
  setPreferredUnit(measurementType, unit) {
    this.preferredUnits[measurementType] = unit;
    return this;
  }

  /** Override the canonical storage unit for a measurement type. */
  setCanonicalUnit(measurementType, unit) {
    this.canonicalUnits[measurementType] = unit;
    return this;
  }

  /** Preferred unit if set, else default unit, else null. */
  _getTargetUnit(measurementType) {
    return this.preferredUnits[measurementType] || this.defaultUnits[measurementType] || null;
  }

  /** Canonical unit for the type, or null when none is configured. */
  _getCanonicalUnit(measurementType) {
    return this.canonicalUnits[measurementType] || null;
  }

  /** Normalize a type name for measureMap / requireUnitForTypes lookups. */
  _normalizeType(measurementType) {
    return String(measurementType || '').trim().toLowerCase();
  }

  /**
   * Ask the convert module to describe a unit.
   * @returns {object|null} The description, or null for empty/non-string/unknown units.
   */
  _describeUnit(unit) {
    if (typeof unit !== 'string' || unit.trim() === '') return null;
    try {
      return convertModule().describe(unit.trim());
    } catch (error) {
      // Unknown unit: callers treat null as "not describable".
      return null;
    }
  }

  /**
   * Check whether `unit` belongs to the measure family mapped to `measurementType`.
   *
   * Unknown type (not in measureMap): accept any unit. This lets user-defined
   * measurement types (e.g. 'humidity', 'co2', arbitrary IoT channels in
   * digital mode) pass through without being rejected just because their unit
   * string ('%', 'ppm', …) is not a known physical unit to the convert module.
   * Known types are still validated strictly.
   *
   * @returns {boolean}
   */
  isUnitCompatible(measurementType, unit) {
    const normalizedType = this._normalizeType(measurementType);
    const expectedMeasure = this.measureMap[normalizedType];
    if (!expectedMeasure) return true;

    const desc = this._describeUnit(unit);
    if (!desc) return false;
    return desc.measure === expectedMeasure;
  }

  /**
   * Report a unit violation: throws when throwOnInvalidUnit is set, otherwise
   * warns through the logger (if any).
   * @throws {Error} When `throwOnInvalidUnit` is enabled.
   */
  _handleUnitViolation(message) {
    if (this.throwOnInvalidUnit) {
      throw new Error(message);
    }
    if (this.logger) {
      this.logger.warn(message);
    }
  }

  /**
   * Resolve source and storage units for a write and validate both.
   *
   * @param {string} measurementType
   * @param {string|null} [sourceUnit] Unit supplied by the caller, if any.
   * @returns {{valid: false} | {valid: true, sourceUnit: string|null,
   *   storageUnit: string|null, strictValidation: boolean}}
   */
  _resolveUnitPolicy(measurementType, sourceUnit = null) {
    const normalizedType = this._normalizeType(measurementType);
    const rawSourceUnit = typeof sourceUnit === 'string' && sourceUnit.trim()
      ? sourceUnit.trim()
      : null;
    const fallbackIngressUnit = this._getTargetUnit(measurementType);
    const canonicalUnit = this._getCanonicalUnit(measurementType);
    const resolvedSourceUnit = rawSourceUnit || fallbackIngressUnit || canonicalUnit || null;

    if (this.requireUnitForTypes.has(normalizedType) && !rawSourceUnit) {
      this._handleUnitViolation(`Missing source unit for required measurement type '${measurementType}'.`);
      return { valid: false };
    }

    if (resolvedSourceUnit && !this.isUnitCompatible(measurementType, resolvedSourceUnit)) {
      this._handleUnitViolation(`Incompatible or unknown source unit '${resolvedSourceUnit}' for measurement type '${measurementType}'.`);
      return { valid: false };
    }

    // storeCanonical flips the preference order between canonical and ingress unit.
    const resolvedStorageUnit = this.storeCanonical
      ? (canonicalUnit || fallbackIngressUnit || resolvedSourceUnit)
      : (fallbackIngressUnit || canonicalUnit || resolvedSourceUnit);

    if (resolvedStorageUnit && !this.isUnitCompatible(measurementType, resolvedStorageUnit)) {
      this._handleUnitViolation(`Incompatible storage unit '${resolvedStorageUnit}' for measurement type '${measurementType}'.`);
      return { valid: false };
    }

    return {
      valid: true,
      sourceUnit: resolvedSourceUnit,
      storageUnit: resolvedStorageUnit || null,
      strictValidation: this.strictUnitValidation,
    };
  }

  /** Preferred unit if set, else default unit, else null (null for a falsy type). */
  getUnit(type) {
    if (!type) return null;
    if (this.preferredUnits && this.preferredUnits[type]) return this.preferredUnits[type];
    if (this.defaultUnits && this.defaultUnits[type]) return this.defaultUnits[type];
    return null;
  }

  // ---- Chainable addressing ------------------------------------------------

  /** Start a chain: sets the type and resets variant, position and child. */
  type(typeName) {
    this._currentType = typeName;
    this._currentVariant = null;
    this._currentPosition = null;
    this._currentChildId = null;
    return this;
  }

  /** Set the variant (requires a type); resets position and child. */
  variant(variantName) {
    if (!this._currentType) {
      if (this.logger) {
        this.logger.warn('variant() ignored: type must be specified before variant');
      }
      return this;
    }
    this._currentVariant = variantName;
    this._currentPosition = null;
    this._currentChildId = null;
    return this;
  }

  /**
   * Set the position (requires a variant). The value is stringified and
   * lowercased, so storage keys are always lowercase.
   */
  position(positionValue) {
    if (!this._currentVariant) {
      if (this.logger) {
        this.logger.warn('position() ignored: variant must be specified before position');
      }
      return this;
    }
    // Guard: null/undefined has no toString(); ignore the call like the other
    // chain guards do instead of throwing a TypeError.
    if (positionValue == null) {
      if (this.logger) {
        this.logger.warn('position() ignored: position value must be provided');
      }
      return this;
    }
    this._currentPosition = positionValue.toString().toLowerCase();
    return this;
  }

  /**
   * Set the distance for the chain. When no distance is provided (null or
   * undefined) it is derived from the current position.
   */
  distance(distance) {
    if (distance == null) {
      distance = this._convertPositionStr2Num(this._currentPosition);
    }
    this._currentDistance = distance;
    return this;
  }

  /**
   * Store a value on the measurement addressed by the current chain and emit
   * a `${type}.${variant}.${position}` event on `this.emitter`.
   *
   * @param {number} val Incoming value, expressed in `sourceUnit`.
   * @param {number} [timestamp=Date.now()]
   * @param {string|null} [sourceUnit] Unit of `val`; resolved via the unit policy when omitted.
   * @returns {this} Always the container; invalid chains/units make this a no-op.
   */
  value(val, timestamp = Date.now(), sourceUnit = null) {
    if (!this._ensureChainIsValid()) return this;

    const unitPolicy = this._resolveUnitPolicy(this._currentType, sourceUnit);
    if (!unitPolicy.valid) return this;

    const measurement = this._getOrCreateMeasurement();
    const targetUnit = unitPolicy.storageUnit;

    let convertedValue = val;
    let finalUnit = targetUnit || unitPolicy.sourceUnit;

    // Auto-convert if enabled and units are specified
    if (this.autoConvert && unitPolicy.sourceUnit && targetUnit && unitPolicy.sourceUnit !== targetUnit) {
      try {
        convertedValue = convertModule(val).from(unitPolicy.sourceUnit).to(targetUnit);
        finalUnit = targetUnit;
        if (this.logger) {
          this.logger.debug(`Auto-converted ${val} ${unitPolicy.sourceUnit} to ${convertedValue} ${targetUnit}`);
        }
      } catch (error) {
        const message = `Auto-conversion failed from ${unitPolicy.sourceUnit} to ${targetUnit}: ${error.message}`;
        if (this.strictUnitValidation) {
          this._handleUnitViolation(message);
          return this;
        }
        if (this.logger) this.logger.warn(message);
        // Non-strict mode: keep the raw value and label it with its source unit.
        convertedValue = val;
        finalUnit = unitPolicy.sourceUnit;
      }
    }

    measurement.setValue(convertedValue, timestamp);
    if (finalUnit) {
      measurement.setUnit(finalUnit);
    }

    // Event payload; childId/childName/parentRef are the container-level
    // identity fields, not the chain's child bucket key.
    const eventData = {
      value: convertedValue,
      originalValue: val,
      unit: finalUnit,
      sourceUnit: unitPolicy.sourceUnit,
      timestamp,
      position: this._currentPosition,
      distance: this._currentDistance,
      variant: this._currentVariant,
      type: this._currentType,
      childId: this.childId,
      childName: this.childName,
      parentRef: this.parentRef,
    };

    this.emitter.emit(`${this._currentType}.${this._currentVariant}.${this._currentPosition}`, eventData);
    return this;
  }

  /**
   * Check whether a measurement series exists.
   *
   * You can rely on the current chain (type/variant/position already set via
   * type().variant().position()), or pass them explicitly via the options.
   *
   * @param {object} options
   * @param {string} [options.type] Override the current type
   * @param {string} [options.variant] Override the current variant
   * @param {string} [options.position] Override the current position
   * @param {boolean} [options.requireValues=false]
   *   When true, at least one measurement under the addressed position (or,
   *   without a position, under any position of the variant) must contain a
   *   stored value.
   *
   * @returns {boolean}
   */
  exists({ type, variant, position, requireValues = false } = {}) {
    const typeKey = type ?? this._currentType;
    if (!typeKey) return false;

    const variantKey = variant ?? this._currentVariant;
    if (!variantKey) return false;

    const positionKey = position ?? this._currentPosition;

    const variantBucket = this.measurements[typeKey]?.[variantKey];
    if (!variantBucket) return false;

    if (!positionKey) {
      // No specific position requested – just check the variant bucket.
      return requireValues
        ? Object.values(variantBucket).some((entry) => this._entryHasValues(entry))
        : Object.keys(variantBucket).length > 0;
    }

    const entry = variantBucket[positionKey];
    if (!entry) return false;

    return requireValues ? this._entryHasValues(entry) : true;
  }

  /**
   * True when a position entry holds at least one stored value. Handles both
   * the child-bucketed shape ({ childId: Measurement }) and the legacy shape
   * where the position holds a Measurement directly.
   */
  _entryHasValues(entry) {
    if (!entry) return false;
    if (entry.getCurrentValue) return entry.values?.length > 0;
    if (typeof entry !== 'object') return false;
    return Object.values(entry).some((m) => m?.values?.length > 0);
  }

  /** Set the unit on the measurement addressed by the current chain (creates it if needed). */
  unit(unitName) {
    if (!this._ensureChainIsValid()) return this;

    const measurement = this._getOrCreateMeasurement();
    measurement.setUnit(unitName);
    this._unit = unitName;
    return this;
  }

  // ---- Terminal operations - get data out ----------------------------------

  /**
   * Resolve the Measurement addressed by the current chain.
   * @returns {object|null} The measurement, or null when the chain is incomplete
   *   or nothing is stored there.
   */
  get() {
    if (!this._ensureChainIsValid()) return null;
    const posBucket = this.measurements[this._currentType]?.[this._currentVariant]?.[this._currentPosition];
    return this._resolveChildMeasurement(posBucket);
  }

  /**
   * Pick a Measurement out of a position entry: the entry itself for the
   * legacy single-measurement shape; otherwise the requested child
   * (`_currentChildId` or container `childId`), then 'default', then the
   * first available child.
   */
  _resolveChildMeasurement(posBucket) {
    if (!posBucket) return null;

    // Legacy single measurement
    if (posBucket.getCurrentValue) return posBucket;
    if (typeof posBucket !== 'object') return null;

    const keys = Object.keys(posBucket);
    if (!keys.length) return null;

    const requestedKey = this._currentChildId || this.childId;
    return (requestedKey && posBucket[requestedKey])
      || posBucket.default
      || posBucket[keys[0]]
      || null;
  }

  /**
   * Current value of the addressed measurement, optionally converted.
   * @param {string|null} [requestedUnit]
   * @returns {number|null} null when there is no measurement/value; the
   *   unconverted value when conversion fails.
   */
  getCurrentValue(requestedUnit = null) {
    const measurement = this.get();
    if (!measurement) return null;

    const value = measurement.getCurrentValue();
    if (value === null) return null;

    if (!requestedUnit || !measurement.unit || requestedUnit === measurement.unit) {
      return value;
    }

    try {
      return convertModule(value).from(measurement.unit).to(requestedUnit);
    } catch (error) {
      if (this.logger) this.logger.error(`Unit conversion failed: ${error.message}`);
      return value;
    }
  }

  /**
   * Average of the addressed measurement, optionally converted.
   * @param {string|null} [requestedUnit]
   * @returns {number|null} null when there is no measurement/average; the
   *   unconverted average when conversion fails.
   */
  getAverage(requestedUnit = null) {
    const measurement = this.get();
    if (!measurement) return null;

    const avgValue = measurement.getAverage();
    if (avgValue === null) return null;

    if (!requestedUnit || !measurement.unit || requestedUnit === measurement.unit) {
      return avgValue;
    }

    try {
      return convertModule(avgValue).from(measurement.unit).to(requestedUnit);
    } catch (error) {
      if (this.logger) {
        this.logger.error(`Unit conversion failed: ${error.message}`);
      }
      return avgValue;
    }
  }

  /** Minimum of the addressed measurement, or null when it does not exist. */
  getMin() {
    const measurement = this.get();
    return measurement ? measurement.getMin() : null;
  }

  /** Maximum of the addressed measurement, or null when it does not exist. */
  getMax() {
    const measurement = this.get();
    return measurement ? measurement.getMax() : null;
  }

  /** All values of the addressed measurement, or null when it does not exist. */
  getAllValues() {
    const measurement = this.get();
    return measurement ? measurement.getAllValues() : null;
  }

  /**
   * Value of the sample `lag` steps back, optionally converted.
   *
   * Always returns a plain value (never the sample object) and never modifies
   * the sample handed out by the measurement.
   *
   * @param {number} [lag=1]
   * @param {string|null} [requestedUnit]
   * @returns {number|null} null when there is no measurement/sample; the
   *   unconverted value when conversion fails.
   */
  getLaggedValue(lag = 1, requestedUnit = null) {
    const measurement = this.get();
    if (!measurement) return null;

    const sample = measurement.getLaggedSample(lag);
    if (sample == null) return null;

    const value = sample.value;
    if (!requestedUnit || !measurement.unit || requestedUnit === measurement.unit) {
      return value;
    }

    try {
      return convertModule(value).from(measurement.unit).to(requestedUnit);
    } catch (error) {
      if (this.logger) {
        this.logger.error(`Unit conversion failed: ${error.message}`);
      }
      return value; // Return original value if conversion fails
    }
  }

  /**
   * Sample `lag` steps back, optionally with its value converted.
   *
   * When a conversion is applied, a copy of the sample is returned with
   * `value` and `unit` replaced.
   * NOTE(review): the copy guards against the measurement handing out a
   * reference to its stored sample — confirm against Measurement.getLaggedSample.
   *
   * @param {number} [lag=1]
   * @param {string|null} [requestedUnit]
   * @returns {object|null} null when there is no measurement/sample; the
   *   original sample when no conversion is needed or conversion fails.
   */
  getLaggedSample(lag = 1, requestedUnit = null) {
    const measurement = this.get();
    if (!measurement) return null;

    const sample = measurement.getLaggedSample(lag);
    if (sample == null) return null;

    if (!requestedUnit || !measurement.unit || requestedUnit === measurement.unit) {
      return sample;
    }

    try {
      const convertedValue = convertModule(sample.value).from(measurement.unit).to(requestedUnit);
      // Copy (preserving the prototype) rather than mutate the original sample.
      return Object.assign(
        Object.create(Object.getPrototypeOf(sample)),
        sample,
        { value: convertedValue, unit: requestedUnit }
      );
    } catch (error) {
      if (this.logger) {
        this.logger.error(`Unit conversion failed: ${error.message}`);
      }
      return sample; // Return original sample if conversion fails
    }
  }

  /**
   * Sum the current values of every child under the given positions.
   *
   * @param {string} type
   * @param {string} variant
   * @param {string[]} [positions=[]] Position keys to include.
   * @param {string|null} [targetUnit] Convert each child to this unit before
   *   summing; children whose conversion fails contribute their raw value.
   * @returns {number} 0 when the type/variant bucket does not exist.
   */
  sum(type, variant, positions = [], targetUnit = null) {
    const bucket = this.measurements?.[type]?.[variant];
    if (!bucket) return 0;

    return positions
      .map((pos) => {
        const posBucket = bucket[pos];
        if (!posBucket) return 0;

        return Object.values(posBucket)
          .map((m) => {
            if (!m?.getCurrentValue) return 0;
            const val = m.getCurrentValue();
            if (val == null) return 0;

            const fromUnit = m.unit || targetUnit;
            if (!targetUnit || !fromUnit || fromUnit === targetUnit) return val;

            try {
              return convertModule(val).from(fromUnit).to(targetUnit);
            } catch {
              return val;
            }
          })
          // Non-finite contributions (NaN, ±Infinity, non-numbers) count as 0.
          .reduce((acc, v) => acc + (Number.isFinite(v) ? v : 0), 0);
      })
      .reduce((acc, v) => acc + v, 0);
  }

  /**
   * Flatten the entire container to a key→value map, suitable for
   * dashboards / InfluxDB / debug dumps.
   *
   * KEY FORMAT — child-bucketed series (the common case):
   *   `${type}.${variant}.${position}.${childId}`
   *
   * Even measurements written without an explicit `.child(...)` end up
   * here under `childId === 'default'` (see _getOrCreateMeasurement).
   * Examples:
   *   level.measured.atequipment.default   // implicit child
   *   flow.predicted.in.manual-qin         // explicit child
   *   flow.predicted.in.from-pump-A        // explicit child
   *
   * Consumers (Node-RED dashboards, parsers) MUST include the trailing
   * `.default` when reading default-bucket measurements. Stripping it
   * silently misses the value. This is the #1 footgun for new code that
   * uses MeasurementContainer.
   *
   * The "Legacy single series" branch below catches a pre-v2 storage
   * shape where a position held a Measurement directly (no child layer);
   * new code never produces that shape but old serialized state may.
   *
   * @param {object} [options]
   * @param {object} [options.requestedUnits] Map of type → unit to convert to.
   * @param {boolean} [options.usePreferredUnits] Use `this.preferredUnits`
   *   when `requestedUnits` is not given.
   * @returns {object} Flat map of key → current value.
   */
  getFlattenedOutput(options = {}) {
    const requestedUnits = options.requestedUnits
      || (options.usePreferredUnits ? this.preferredUnits : null);
    const out = {};

    for (const [type, variants] of Object.entries(this.measurements)) {
      for (const [variant, positions] of Object.entries(variants)) {
        for (const [position, entry] of Object.entries(positions)) {
          // Legacy single series (no childId layer)
          if (entry?.getCurrentValue) {
            out[`${type}.${variant}.${position}`] = this._resolveOutputValue(type, entry, requestedUnits);
            continue;
          }

          // Child-bucketed series — ALWAYS the case for new writes,
          // including the implicit 'default' bucket when no .child() is
          // used. The flat key carries the childId.
          if (entry && typeof entry === 'object') {
            for (const [childId, m] of Object.entries(entry)) {
              if (m?.getCurrentValue) {
                out[`${type}.${variant}.${position}.${childId}`] = this._resolveOutputValue(type, m, requestedUnits);
              }
            }
          }
        }
      }
    }

    return out;
  }

  /**
   * Difference between two positions of the current type/variant
   * (`from` minus `to`), for both current value and average.
   *
   * Children are resolved the same way as in get(): requested child, then
   * 'default', then the first available child.
   *
   * @param {object} [options]
   * @param {string} [options.from=POSITIONS.DOWNSTREAM]
   * @param {string} [options.to=POSITIONS.UPSTREAM]
   * @param {string} [options.unit] Target unit; defaults to the unit of
   *   `from`, then of `to`.
   * @returns {{value: number, avgDiff: number, unit: string, from: string,
   *   to: string}|null} null when type/variant is missing or either side
   *   has no stored values.
   */
  difference({ from = POSITIONS.DOWNSTREAM, to = POSITIONS.UPSTREAM, unit: requestedUnit } = {}) {
    if (!this._currentType || !this._currentVariant) {
      if (this.logger) {
        this.logger.warn('difference() ignored: type and variant must be specified');
      }
      return null;
    }

    const resolve = (pos) => this._resolveChildMeasurement(
      this.measurements?.[this._currentType]?.[this._currentVariant]?.[pos]
    );

    const a = resolve(from);
    const b = resolve(to);
    if (!a || !b || !a.values || !b.values || a.values.length === 0 || b.values.length === 0) {
      return null;
    }

    const targetUnit = requestedUnit || a.unit || b.unit;
    const aVal = this._convertValueToUnit(a.getCurrentValue(), a.unit, targetUnit);
    const bVal = this._convertValueToUnit(b.getCurrentValue(), b.unit, targetUnit);
    const aAvg = this._convertValueToUnit(a.getAverage(), a.unit, targetUnit);
    const bAvg = this._convertValueToUnit(b.getAverage(), b.unit, targetUnit);

    return {
      value: aVal - bVal,
      avgDiff: aAvg - bAvg,
      unit: targetUnit,
      from,
      to,
    };
  }

  // ---- Helper methods ------------------------------------------------------

  /** True when type, variant and position are all set; logs an error otherwise. */
  _ensureChainIsValid() {
    if (!this._currentType || !this._currentVariant || !this._currentPosition) {
      if (this.logger) {
        this.logger.error('Incomplete measurement chain, required: type, variant, and position');
      }
      return false;
    }
    return true;
  }

  /**
   * Return the Measurement for the current chain, creating the nested
   * type/variant/position/child structure on demand. The child key is
   * `_currentChildId`, else the container `childId`, else 'default'.
   */
  _getOrCreateMeasurement() {
    const typeBucket = (this.measurements[this._currentType] ||= {});
    const variantBucket = (typeBucket[this._currentVariant] ||= {});

    const positionKey = this._currentPosition;
    const childKey = this._currentChildId || this.childId || 'default';
    const bucket = (variantBucket[positionKey] ||= {});

    if (!bucket[childKey]) {
      bucket[childKey] = new MeasurementBuilder()
        .setType(this._currentType)
        .setVariant(this._currentVariant)
        .setPosition(positionKey)
        .setWindowSize(this.windowSize)
        .setDistance(this._currentDistance)
        .build();
    }

    return bucket[childKey];
  }

  // ---- Additional utility methods ------------------------------------------

  /** All stored measurement types. */
  getTypes() {
    return Object.keys(this.measurements);
  }

  /** Variants stored under the current type ([] when no type is set or stored). */
  getVariants() {
    if (!this._currentType) {
      if (this.logger) {
        this.logger.warn('getVariants() ignored: type must be specified first');
      }
      return [];
    }
    return this.measurements[this._currentType]
      ? Object.keys(this.measurements[this._currentType])
      : [];
  }

  /**
   * Current value of `measurement`, converted to `requestedUnits[type]` when
   * such a unit is configured.
   */
  _resolveOutputValue(type, measurement, requestedUnits = null) {
    const value = measurement.getCurrentValue();
    if (!requestedUnits || value === null || typeof value === 'undefined') {
      return value;
    }
    const targetUnit = requestedUnits[type];
    if (!targetUnit) {
      return value;
    }
    return this._convertValueToUnit(value, measurement.unit, targetUnit);
  }

  /** Positions stored under the current type/variant ([] when unset or empty). */
  getPositions() {
    if (!this._currentType || !this._currentVariant) {
      if (this.logger) {
        this.logger.warn('getPositions() ignored: type and variant must be specified first');
      }
      return [];
    }

    const variantBucket = this.measurements[this._currentType]?.[this._currentVariant];
    return variantBucket ? Object.keys(variantBucket) : [];
  }

  /** Drop all stored measurements and reset the whole chaining context. */
  clear() {
    this.measurements = {};
    this._currentType = null;
    this._currentVariant = null;
    this._currentPosition = null;
    this._currentChildId = null;
    this._currentDistance = null;
    this._unit = null;
  }

  /**
   * Convert `value` between units; returns `value` unchanged when it is
   * null/undefined, a unit is missing, the units are equal, or conversion
   * fails (a warning is logged in the last case).
   */
  _convertValueToUnit(value, fromUnit, toUnit) {
    if (value == null || !fromUnit || !toUnit || fromUnit === toUnit) {
      return value;
    }

    try {
      return convertModule(value).from(fromUnit).to(toUnit);
    } catch (error) {
      if (this.logger) {
        this.logger.warn(`Conversion failed from ${fromUnit} to ${toUnit}: ${error.message}`);
      }
      return value;
    }
  }

  /**
   * Units the convert module offers for a measurement type.
   * @param {string|null} [measurementType] Defaults to the current chain type.
   * @returns {Array} [] for unknown types or when the convert module throws.
   */
  getAvailableUnits(measurementType = null) {
    const type = measurementType || this._currentType;
    if (!type) return [];

    const convertMeasure = this.measureMap[this._normalizeType(type)];
    if (!convertMeasure) return [];

    try {
      return convertModule().possibilities(convertMeasure);
    } catch (error) {
      return [];
    }
  }

  /**
   * Ask the convert module for the best unit for the current value.
   * @param {string[]} [excludeUnits=[]] Units passed as `exclude` to toBest().
   * @returns {*} Result of toBest(), or null when there is no measurement,
   *   unit or value, or when the convert module throws.
   */
  getBestUnit(excludeUnits = []) {
    const measurement = this.get();
    if (!measurement || !measurement.unit) return null;

    const currentValue = measurement.getCurrentValue();
    if (currentValue === null) return null;

    try {
      return convertModule(currentValue)
        .from(measurement.unit)
        .toBest({ exclude: excludeUnits });
    } catch (error) {
      if (this.logger) {
        this.logger.error(`getBestUnit failed: ${error.message}`);
      }
      return null;
    }
  }

  /**
   * Map a position string to a numeric distance: at-equipment → 0,
   * upstream → +Infinity, downstream → -Infinity. Unknown positions log an
   * error and yield undefined.
   *
   * NOTE(review): the signs here are the opposite of _convertPositionNum2Str
   * (which maps negative → upstream, positive → downstream). One of the two
   * is presumably inverted — confirm the intended convention before relying
   * on a round trip.
   */
  _convertPositionStr2Num(positionString) {
    switch (positionString) {
      case POSITIONS.AT_EQUIPMENT:
        return 0;
      case POSITIONS.UPSTREAM:
        return Number.POSITIVE_INFINITY;
      case POSITIONS.DOWNSTREAM:
        return Number.NEGATIVE_INFINITY;
      default:
        if (this.logger) {
          this.logger.error(`Invalid positionVsParent provided: ${positionString}`);
        }
        return undefined;
    }
  }

  /**
   * Map a numeric position to a position string: 0 → at-equipment,
   * negative → upstream, positive → downstream. Anything else (e.g. NaN,
   * undefined) logs a warning and yields null.
   *
   * NOTE(review): sign convention is the opposite of _convertPositionStr2Num
   * — confirm which one is intended.
   */
  _convertPositionNum2Str(positionValue) {
    if (positionValue === 0) {
      return POSITIONS.AT_EQUIPMENT;
    }
    if (positionValue < 0) {
      return POSITIONS.UPSTREAM;
    }
    if (positionValue > 0) {
      return POSITIONS.DOWNSTREAM;
    }
    if (this.logger) {
      this.logger.warn(`Invalid position provided: ${positionValue}`);
    }
    return null;
  }
}

module.exports = MeasurementContainer;