// Late-subscriber replay: a measurement child that already holds a value when // the pumpingStation registers it (e.g. a once-only inject that fired during // startup before the parent subscribed) must still surface on Port 0. The // emitter only delivers future updates, so _subscribeMeasurement seeds from the // child's current sample. const test = require('node:test'); const assert = require('node:assert/strict'); const EventEmitter = require('node:events'); const PumpingStation = require('../../src/specificClass'); const { MeasurementContainer, configManager } = require('generalFunctions'); function makePsConfig() { const cm = new configManager(); return cm.buildConfig('pumpingStation', { name: 'PS' }, 'ps-replay', { basin: { volume: 50, height: 5, inflowLevel: 3, outflowLevel: 0.2, overflowLevel: 4.5 }, hydraulics: { minHeightBasedOn: 'outlet' }, control: { mode: 'levelbased', allowedModes: new Set(['levelbased']), levelbased: { minLevel: 1, startLevel: 2, maxLevel: 4, curveType: 'linear' }, }, safety: {}, }); } function makeFlowMeasurementChild(id = 'meas-replay') { const measurements = new MeasurementContainer({ autoConvert: true, preferredUnits: { flow: 'm3/s' } }); assert.ok(typeof measurements.emitter?.on === 'function'); return { id, source: { config: { general: { id, name: id }, functionality: { softwareType: 'measurement', positionVsParent: 'upstream' }, asset: { type: 'flow' }, }, measurements, }, }; } test('value written BEFORE registration is replayed on subscribe (once-inject timing)', () => { const ps = new PumpingStation(makePsConfig()); const child = makeFlowMeasurementChild(); // Child already holds a value — emitted into the void before the parent existed. child.source.measurements .type('flow').variant('measured').position('upstream') .value(50, Date.now(), 'm3/h'); // Parent registers AFTER the value is present. Without replay it would only // catch future emits and surface nothing. ps.childRegistrationUtils.registerChild(child.source, 'upstream'); const out = ps.getOutput(); const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream')); assert.ok(upstreamKeys.length > 0, 'parent must surface flow.measured.upstream.* after late subscribe'); }); test('no stored value → nothing replayed, no crash', () => { const ps = new PumpingStation(makePsConfig()); const child = makeFlowMeasurementChild('empty-child'); // Register with an empty child container; replay must be a safe no-op. assert.doesNotThrow(() => ps.childRegistrationUtils.registerChild(child.source, 'upstream')); const out = ps.getOutput(); const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream')); assert.equal(upstreamKeys.length, 0, 'no upstream key when child has no value'); }); test('future emits still delivered after subscribe (listener intact)', () => { const ps = new PumpingStation(makePsConfig()); const child = makeFlowMeasurementChild('streaming-child'); ps.childRegistrationUtils.registerChild(child.source, 'upstream'); // Emit AFTER registration — the normal streaming-sensor path. child.source.measurements.type('flow').variant('measured').position('upstream').value(30, Date.now(), 'm3/h'); const out = ps.getOutput(); const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream')); assert.ok(upstreamKeys.length > 0, 'normal post-subscribe emit still surfaces'); });