Compare commits
4 Commits
ef07f2a5b2
...
e041877ae4
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e041877ae4 | ||
|
|
8216480950 | ||
|
|
dfaa0c3ae8 | ||
|
|
6e727d929b |
@@ -253,6 +253,26 @@ async function run(ctx, controlState, direction) {
|
|||||||
`Level-based: level=${level} dir=${direction} armed=${shiftArmed} hold=${shiftHold} pct=${percControl}`
|
`Level-based: level=${level} dir=${direction} armed=${shiftArmed} hold=${shiftHold} pct=${percControl}`
|
||||||
);
|
);
|
||||||
|
|
||||||
|
// We are past every off-gate, so the station is engaged and the computed
|
||||||
|
// demand is meant to drive pumps. If no machine group is registered the
|
||||||
|
// demand has nowhere to go and the pumps stay silent — the signature of a
|
||||||
|
// dropped Port 2 parent↔group registration (e.g. after a partial redeploy
|
||||||
|
// that recreated this node). Warn once until a group reappears so the
|
||||||
|
// failure isn't invisible.
|
||||||
|
const groupCount = machineGroups ? Object.keys(machineGroups).length : 0;
|
||||||
|
if (groupCount === 0) {
|
||||||
|
if (host && !host._warnedNoMachineGroup) {
|
||||||
|
logger?.warn?.(
|
||||||
|
`Level-based control engaged (demand ${percControl.toFixed(1)} %) but no machine group is registered — `
|
||||||
|
+ `pumps cannot be driven. The parent↔group registration was likely lost on a partial redeploy; `
|
||||||
|
+ `redeploy/restart fully to re-run the Port 2 registration handshake.`
|
||||||
|
);
|
||||||
|
host._warnedNoMachineGroup = true;
|
||||||
|
}
|
||||||
|
} else if (host) {
|
||||||
|
host._warnedNoMachineGroup = false;
|
||||||
|
}
|
||||||
|
|
||||||
await _applyMachineGroupLevelControl(machineGroups, percControl, logger);
|
await _applyMachineGroupLevelControl(machineGroups, percControl, logger);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -28,6 +28,18 @@ async function forwardDemand(ctx, demand) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Neither a group nor a direct machine is registered, so the operator's
|
||||||
|
// demand silently goes nowhere. Surface it — the usual cause is a dropped
|
||||||
|
// Port 2 parent↔child registration after a partial redeploy.
|
||||||
|
const noGroups = !machineGroups || Object.keys(machineGroups).length === 0;
|
||||||
|
const noMachines = !machines || Object.keys(machines).length === 0;
|
||||||
|
if (noGroups && noMachines) {
|
||||||
|
logger?.warn?.(
|
||||||
|
`Manual demand ${demand} not forwarded — no machine group or machine is registered to this pumping station. `
|
||||||
|
+ `Check the parent↔child Port 2 registration (redeploy/restart fully to restore it).`
|
||||||
|
);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports = {
|
module.exports = {
|
||||||
|
|||||||
@@ -18,15 +18,21 @@ class PumpingStation extends BaseDomain {
|
|||||||
static name = 'pumpingStation';
|
static name = 'pumpingStation';
|
||||||
|
|
||||||
// Internal math runs in m3/s for flow and m for level so the volume
|
// Internal math runs in m3/s for flow and m for level so the volume
|
||||||
// integrator (flow × dt) is unit-consistent. Strict canonicals make
|
// integrator (flow × dt) is unit-consistent — canonical stays m3/s, the
|
||||||
// unit drift in child-fed measurements an explicit error.
|
// platform-wide convention every cross-node consumer (MGC demand math,
|
||||||
|
// physics-sanity) assumes. Strict canonicals make unit drift in child-fed
|
||||||
|
// measurements an explicit error.
|
||||||
|
// Output flow / netFlowRate are emitted in m3/h so telemetry/dashboard
|
||||||
|
// series land on the same axis as the rest of the pump group (verified
|
||||||
|
// slice #47); the m3/s→m3/h presentation conversion happens at the output
|
||||||
|
// boundary only — it never touches the canonical integrator basis.
|
||||||
// overflowVolume / underflowVolume are listed in output so the
|
// overflowVolume / underflowVolume are listed in output so the
|
||||||
// MeasurementContainer keeps the integrator's m³ unit on those streams
|
// MeasurementContainer keeps the integrator's m³ unit on those streams
|
||||||
// (FlowAggregator writes spill / underflow per tick).
|
// (FlowAggregator writes spill / underflow per tick).
|
||||||
static unitPolicy = UnitPolicy.declare({
|
static unitPolicy = UnitPolicy.declare({
|
||||||
canonical: { flow: 'm3/s', pressure: 'Pa', power: 'W', temperature: 'K' },
|
canonical: { flow: 'm3/s', pressure: 'Pa', power: 'W', temperature: 'K' },
|
||||||
output: {
|
output: {
|
||||||
flow: 'm3/s', netFlowRate: 'm3/s', level: 'm', volume: 'm3',
|
flow: 'm3/h', netFlowRate: 'm3/h', level: 'm', volume: 'm3',
|
||||||
overflowVolume: 'm3', underflowVolume: 'm3',
|
overflowVolume: 'm3', underflowVolume: 'm3',
|
||||||
},
|
},
|
||||||
requireUnitForTypes: [],
|
requireUnitForTypes: [],
|
||||||
@@ -286,7 +292,7 @@ class PumpingStation extends BaseDomain {
|
|||||||
const measurementType = child.config.asset.type;
|
const measurementType = child.config.asset.type;
|
||||||
const eventName = `${measurementType}.measured.${position}`;
|
const eventName = `${measurementType}.measured.${position}`;
|
||||||
|
|
||||||
child.measurements.emitter.on(eventName, (eventData = {}) => {
|
const handle = (eventData = {}) => {
|
||||||
this.logger.debug(
|
this.logger.debug(
|
||||||
`Measurement update ${eventName} <- ${eventData.childName || child.config.general.name}: ${eventData.value} ${eventData.unit}`
|
`Measurement update ${eventName} <- ${eventData.childName || child.config.general.name}: ${eventData.value} ${eventData.unit}`
|
||||||
);
|
);
|
||||||
@@ -297,7 +303,21 @@ class PumpingStation extends BaseDomain {
|
|||||||
this.measurements.type(measurementType).variant('measured').position(position)
|
this.measurements.type(measurementType).variant('measured').position(position)
|
||||||
.value(eventData.value, eventData.timestamp, eventData.unit);
|
.value(eventData.value, eventData.timestamp, eventData.unit);
|
||||||
this.measurementRouter.route(measurementType, eventData.value, position, eventData);
|
this.measurementRouter.route(measurementType, eventData.value, position, eventData);
|
||||||
});
|
};
|
||||||
|
|
||||||
|
child.measurements.emitter.on(eventName, handle);
|
||||||
|
|
||||||
|
// Seed from the child's current value. The emitter only delivers FUTURE
|
||||||
|
// updates, so a parent that registers after the child already emitted
|
||||||
|
// (e.g. a once-only inject that fired during startup before this
|
||||||
|
// subscription existed) would otherwise never see that value. Replaying
|
||||||
|
// the last sample makes a late subscriber pick up the present state.
|
||||||
|
const series = child.measurements
|
||||||
|
.type(measurementType).variant('measured').position(position).get?.();
|
||||||
|
const sample = series?.getLaggedSample?.(0);
|
||||||
|
if (sample && sample.value != null) {
|
||||||
|
handle({ ...sample, childName: child.config.general.name });
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_subscribePredictedFlow(child) {
|
_subscribePredictedFlow(child) {
|
||||||
|
|||||||
@@ -182,3 +182,51 @@ test('no valid level → warns and returns without mutating percControl or calli
|
|||||||
assert.equal(g._calls.handleInput.length, 0);
|
assert.equal(g._calls.handleInput.length, 0);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|
||||||
|
// Regression: a station engaged above startLevel but with no machine group
|
||||||
|
// registered (e.g. the Port 2 parent↔group registration was dropped by a
|
||||||
|
// partial redeploy) computes a real demand that goes nowhere. The strategy
|
||||||
|
// must surface this once, not fail silently. See the 2026-05-27 "PS not
|
||||||
|
// reacting to level" trace.
|
||||||
|
test('engaged with NO machine group registered → warns once (throttled via host)', async () => {
|
||||||
|
const ctx = makeCtx(3, { levelbased: { holdLevel: 2 } }); // level 3 > startLevel 2 → engaged
|
||||||
|
ctx.machineGroups = {}; // registration lost
|
||||||
|
ctx.host = {};
|
||||||
|
const warns = [];
|
||||||
|
ctx.logger.warn = (m) => warns.push(m);
|
||||||
|
|
||||||
|
const state = { percControl: 0 };
|
||||||
|
await levelBased.run(ctx, state);
|
||||||
|
|
||||||
|
assert.ok(state.percControl > 0, 'demand is computed even though there is no group');
|
||||||
|
assert.equal(warns.length, 1, 'warns exactly once');
|
||||||
|
assert.match(warns[0], /no machine group is registered/i);
|
||||||
|
assert.equal(ctx.host._warnedNoMachineGroup, true);
|
||||||
|
|
||||||
|
// Subsequent ticks while still group-less stay quiet (no log spam).
|
||||||
|
await levelBased.run(ctx, state);
|
||||||
|
assert.equal(warns.length, 1, 'throttled: no repeat warning on the next tick');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('warning re-arms after a group reappears then disappears again', async () => {
|
||||||
|
const ctx = makeCtx(3, { levelbased: { holdLevel: 2 } });
|
||||||
|
ctx.host = {};
|
||||||
|
const warns = [];
|
||||||
|
ctx.logger.warn = (m) => warns.push(m);
|
||||||
|
const state = { percControl: 0 };
|
||||||
|
|
||||||
|
ctx.machineGroups = {};
|
||||||
|
await levelBased.run(ctx, state);
|
||||||
|
assert.equal(warns.length, 1);
|
||||||
|
|
||||||
|
// Group registers again → flag clears, no new warning.
|
||||||
|
ctx.machineGroups = { a: makeGroup('A') };
|
||||||
|
await levelBased.run(ctx, state);
|
||||||
|
assert.equal(warns.length, 1);
|
||||||
|
assert.equal(ctx.host._warnedNoMachineGroup, false);
|
||||||
|
|
||||||
|
// Group lost again → warns once more.
|
||||||
|
ctx.machineGroups = {};
|
||||||
|
await levelBased.run(ctx, state);
|
||||||
|
assert.equal(warns.length, 2, 're-armed after recovery');
|
||||||
|
});
|
||||||
|
|||||||
81
test/basic/replay-on-subscribe.basic.test.js
Normal file
81
test/basic/replay-on-subscribe.basic.test.js
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
// Late-subscriber replay: a measurement child that already holds a value when
|
||||||
|
// the pumpingStation registers it (e.g. a once-only inject that fired during
|
||||||
|
// startup before the parent subscribed) must still surface on Port 0. The
|
||||||
|
// emitter only delivers future updates, so _subscribeMeasurement seeds from the
|
||||||
|
// child's current sample.
|
||||||
|
|
||||||
|
const test = require('node:test');
|
||||||
|
const assert = require('node:assert/strict');
|
||||||
|
const EventEmitter = require('node:events');
|
||||||
|
|
||||||
|
const PumpingStation = require('../../src/specificClass');
|
||||||
|
const { MeasurementContainer, configManager } = require('generalFunctions');
|
||||||
|
|
||||||
|
function makePsConfig() {
|
||||||
|
const cm = new configManager();
|
||||||
|
return cm.buildConfig('pumpingStation', { name: 'PS' }, 'ps-replay', {
|
||||||
|
basin: { volume: 50, height: 5, inflowLevel: 3, outflowLevel: 0.2, overflowLevel: 4.5 },
|
||||||
|
hydraulics: { minHeightBasedOn: 'outlet' },
|
||||||
|
control: {
|
||||||
|
mode: 'levelbased',
|
||||||
|
allowedModes: new Set(['levelbased']),
|
||||||
|
levelbased: { minLevel: 1, startLevel: 2, maxLevel: 4, curveType: 'linear' },
|
||||||
|
},
|
||||||
|
safety: {},
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
function makeFlowMeasurementChild(id = 'meas-replay') {
|
||||||
|
const measurements = new MeasurementContainer({ autoConvert: true, preferredUnits: { flow: 'm3/s' } });
|
||||||
|
assert.ok(typeof measurements.emitter?.on === 'function');
|
||||||
|
return {
|
||||||
|
id,
|
||||||
|
source: {
|
||||||
|
config: {
|
||||||
|
general: { id, name: id },
|
||||||
|
functionality: { softwareType: 'measurement', positionVsParent: 'upstream' },
|
||||||
|
asset: { type: 'flow' },
|
||||||
|
},
|
||||||
|
measurements,
|
||||||
|
},
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
test('value written BEFORE registration is replayed on subscribe (once-inject timing)', () => {
|
||||||
|
const ps = new PumpingStation(makePsConfig());
|
||||||
|
const child = makeFlowMeasurementChild();
|
||||||
|
|
||||||
|
// Child already holds a value — emitted into the void before the parent existed.
|
||||||
|
child.source.measurements
|
||||||
|
.type('flow').variant('measured').position('upstream')
|
||||||
|
.value(50, Date.now(), 'm3/h');
|
||||||
|
|
||||||
|
// Parent registers AFTER the value is present. Without replay it would only
|
||||||
|
// catch future emits and surface nothing.
|
||||||
|
ps.childRegistrationUtils.registerChild(child.source, 'upstream');
|
||||||
|
|
||||||
|
const out = ps.getOutput();
|
||||||
|
const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream'));
|
||||||
|
assert.ok(upstreamKeys.length > 0, 'parent must surface flow.measured.upstream.* after late subscribe');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('no stored value → nothing replayed, no crash', () => {
|
||||||
|
const ps = new PumpingStation(makePsConfig());
|
||||||
|
const child = makeFlowMeasurementChild('empty-child');
|
||||||
|
// Register with an empty child container; replay must be a safe no-op.
|
||||||
|
assert.doesNotThrow(() => ps.childRegistrationUtils.registerChild(child.source, 'upstream'));
|
||||||
|
const out = ps.getOutput();
|
||||||
|
const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream'));
|
||||||
|
assert.equal(upstreamKeys.length, 0, 'no upstream key when child has no value');
|
||||||
|
});
|
||||||
|
|
||||||
|
test('future emits still delivered after subscribe (listener intact)', () => {
|
||||||
|
const ps = new PumpingStation(makePsConfig());
|
||||||
|
const child = makeFlowMeasurementChild('streaming-child');
|
||||||
|
ps.childRegistrationUtils.registerChild(child.source, 'upstream');
|
||||||
|
// Emit AFTER registration — the normal streaming-sensor path.
|
||||||
|
child.source.measurements.type('flow').variant('measured').position('upstream').value(30, Date.now(), 'm3/h');
|
||||||
|
const out = ps.getOutput();
|
||||||
|
const upstreamKeys = Object.keys(out).filter((k) => k.startsWith('flow.measured.upstream'));
|
||||||
|
assert.ok(upstreamKeys.length > 0, 'normal post-subscribe emit still surfaces');
|
||||||
|
});
|
||||||
Reference in New Issue
Block a user