A measurement child that already holds a value when the pumpingStation registers it (e.g. a once:true inject that fired during startup before the parent subscribed) was never surfaced — the emitter only delivers future updates. _subscribeMeasurement now seeds from the child's current sample via getLaggedSample(0), so late subscribers pick up present state. This is what makes a measured upstream inflow register as inflow on a clean startup. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
82 lines
3.5 KiB
JavaScript
82 lines
3.5 KiB
JavaScript
// Late-subscriber replay: a measurement child that already holds a value when
|
|
// the pumpingStation registers it (e.g. a once-only inject that fired during
|
|
// startup before the parent subscribed) must still surface on Port 0. The
|
|
// emitter only delivers future updates, so _subscribeMeasurement seeds from the
|
|
// child's current sample.
|
|
|
|
const test = require('node:test');
|
|
const assert = require('node:assert/strict');
|
|
const EventEmitter = require('node:events');
|
|
|
|
const PumpingStation = require('../../src/specificClass');
|
|
const { MeasurementContainer, configManager } = require('generalFunctions');
|
|
|
|
function makePsConfig() {
|
|
const cm = new configManager();
|
|
return cm.buildConfig('pumpingStation', { name: 'PS' }, 'ps-replay', {
|
|
basin: { volume: 50, height: 5, inflowLevel: 3, outflowLevel: 0.2, overflowLevel: 4.5 },
|
|
hydraulics: { minHeightBasedOn: 'outlet' },
|
|
control: {
|
|
mode: 'levelbased',
|
|
allowedModes: new Set(['levelbased']),
|
|
levelbased: { minLevel: 1, startLevel: 2, maxLevel: 4, curveType: 'linear' },
|
|
},
|
|
safety: {},
|
|
});
|
|
}
|
|
|
|
function makeFlowMeasurementChild(id = 'meas-replay') {
|
|
const measurements = new MeasurementContainer({ autoConvert: true, preferredUnits: { flow: 'm3/s' } });
|
|
assert.ok(typeof measurements.emitter?.on === 'function');
|
|
return {
|
|
id,
|
|
source: {
|
|
config: {
|
|
general: { id, name: id },
|
|
functionality: { softwareType: 'measurement', positionVsParent: 'upstream' },
|
|
asset: { type: 'flow' },
|
|
},
|
|
measurements,
|
|
},
|
|
};
|
|
}
|
|
|
|
test('value written BEFORE registration is replayed on subscribe (once-inject timing)', () => {
|
|
const ps = new PumpingStation(makePsConfig());
|
|
const child = makeFlowMeasurementChild();
|
|
|
|
// Child already holds a value — emitted into the void before the parent existed.
|
|
child.source.measurements
|
|
.type('flow').variant('measured').position('upstream')
|
|
.value(50, Date.now(), 'm3/h');
|
|
|
|
// Parent registers AFTER the value is present. Without replay it would only
|
|
// catch future emits and surface nothing.
|
|
ps.childRegistrationUtils.registerChild(child.source, 'upstream');
|
|
|
|
const out = ps.getOutput();
|
|
const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream'));
|
|
assert.ok(upstreamKeys.length > 0, 'parent must surface flow.measured.upstream.* after late subscribe');
|
|
});
|
|
|
|
test('no stored value → nothing replayed, no crash', () => {
|
|
const ps = new PumpingStation(makePsConfig());
|
|
const child = makeFlowMeasurementChild('empty-child');
|
|
// Register with an empty child container; replay must be a safe no-op.
|
|
assert.doesNotThrow(() => ps.childRegistrationUtils.registerChild(child.source, 'upstream'));
|
|
const out = ps.getOutput();
|
|
const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream'));
|
|
assert.equal(upstreamKeys.length, 0, 'no upstream key when child has no value');
|
|
});
|
|
|
|
test('future emits still delivered after subscribe (listener intact)', () => {
|
|
const ps = new PumpingStation(makePsConfig());
|
|
const child = makeFlowMeasurementChild('streaming-child');
|
|
ps.childRegistrationUtils.registerChild(child.source, 'upstream');
|
|
// Emit AFTER registration — the normal streaming-sensor path.
|
|
child.source.measurements.type('flow').variant('measured').position('upstream').value(30, Date.now(), 'm3/h');
|
|
const out = ps.getOutput();
|
|
const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream'));
|
|
assert.ok(upstreamKeys.length > 0, 'normal post-subscribe emit still surfaces');
|
|
});
|