fix: production hardening — safety fixes, prediction accuracy, test coverage

Safety:
- Async input handler: await all handleInput() calls, prevents unhandled rejections
- Fix emergencyStop case mismatch: "emergencyStop" → "emergencystop" matching config
- Implement showCoG() method (was routing to undefined)
- Null guards on 6 methods for missing curve data
- Editor menu polling timeout (5s max)
- Listener cleanup on node close (child measurements + state emitter)
- Tick loop race condition: track startup timeout, clear on close

Prediction accuracy:
- Remove efficiency rounding that destroyed signal in canonical units
- Fix calcEfficiency variant: hydraulic power reads from correct variant
- Guard efficiency calculations against negative/zero values
- Division-by-zero protection in calcRelativeDistanceFromPeak
- Curve data anomaly detection (cross-pressure median-y ratio check)
- calcEfficiencyCurve O(n²) → O(n) with running min
- updateCurve bootstraps predictors when they were null

Tests: 43 new tests (76 total) covering emergency stop, shutdown/maintenance
sequences, efficiency/CoG, movement lifecycle, output format, null guards,
and listener cleanup.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
znetsixe
2026-04-07 13:41:00 +02:00
parent ea33b3bba3
commit 07af7cef40
10 changed files with 786 additions and 32 deletions

View File

@@ -254,10 +254,11 @@ class nodeClass {
* Start the periodic tick loop.
*/
_startTickLoop() {
setTimeout(() => {
this._startupTimeout = setTimeout(() => {
this._startupTimeout = null;
this._tickInterval = setInterval(() => this._tick(), 1000);
// Update node status on nodered screen every second ( this is not the best way to do this, but it works for now)
// Update node status on nodered screen every second
this._statusInterval = setInterval(() => {
const status = this._updateNodeStatus();
this.node.status(status);
@@ -284,15 +285,13 @@ class nodeClass {
* Attach the node's input handler, routing control messages to the class.
*/
_attachInputHandler() {
this.node.on('input', (msg, send, done) => {
/* Update to complete event based node by putting the tick function after an input event */
this.node.on('input', async (msg, send, done) => {
const m = this.source;
const nodeSend = typeof send === 'function' ? send : (outMsg) => this.node.send(outMsg);
try {
switch(msg.topic) {
case 'registerChild': {
// Register this node as a child of the parent node
const childId = msg.payload;
const childObj = this.RED.nodes.getNode(childId);
if (!childObj || !childObj.source) {
@@ -307,22 +306,22 @@ class nodeClass {
break;
case 'execSequence': {
const { source, action, parameter } = msg.payload;
m.handleInput(source, action, parameter);
await m.handleInput(source, action, parameter);
break;
}
case 'execMovement': {
const { source: mvSource, action: mvAction, setpoint } = msg.payload;
m.handleInput(mvSource, mvAction, Number(setpoint));
await m.handleInput(mvSource, mvAction, Number(setpoint));
break;
}
case 'flowMovement': {
const { source: fmSource, action: fmAction, setpoint: fmSetpoint } = msg.payload;
m.handleInput(fmSource, fmAction, Number(fmSetpoint));
await m.handleInput(fmSource, fmAction, Number(fmSetpoint));
break;
}
case 'emergencystop': {
const { source: esSource, action: esAction } = msg.payload;
m.handleInput(esSource, esAction);
await m.handleInput(esSource, esAction);
break;
}
case 'simulateMeasurement':
@@ -403,8 +402,28 @@ class nodeClass {
*/
_attachCloseHandler() {
this.node.on('close', (done) => {
clearTimeout(this._startupTimeout);
clearInterval(this._tickInterval);
clearInterval(this._statusInterval);
// Clean up child measurement listeners
const m = this.source;
if (m?.childMeasurementListeners) {
for (const [, entry] of m.childMeasurementListeners) {
if (typeof entry.emitter?.off === 'function') {
entry.emitter.off(entry.eventName, entry.handler);
} else if (typeof entry.emitter?.removeListener === 'function') {
entry.emitter.removeListener(entry.eventName, entry.handler);
}
}
m.childMeasurementListeners.clear();
}
// Clean up state emitter listeners
if (m?.state?.emitter) {
m.state.emitter.removeAllListeners();
}
if (typeof done === 'function') done();
});
}

View File

@@ -438,7 +438,10 @@ _callMeasurementHandler(measurementType, value, position, context) {
_normalizeCurveSection(section, fromYUnit, toYUnit, fromPressureUnit, toPressureUnit, sectionName) {
const normalized = {};
for (const [pressureKey, pair] of Object.entries(section || {})) {
const pressureEntries = Object.entries(section || {});
let prevMedianY = null;
for (const [pressureKey, pair] of pressureEntries) {
const canonicalPressure = this._convertUnitValue(
Number(pressureKey),
fromPressureUnit,
@@ -450,6 +453,21 @@ _callMeasurementHandler(measurementType, value, position, context) {
if (!xArray.length || !yArray.length || xArray.length !== yArray.length) {
throw new Error(`Invalid ${sectionName} section at pressure '${pressureKey}'.`);
}
// Cross-pressure anomaly detection: flag sudden jumps in median y between adjacent pressure levels
const sortedY = [...yArray].sort((a, b) => a - b);
const medianY = sortedY[Math.floor(sortedY.length / 2)];
if (prevMedianY != null && prevMedianY > 0) {
const ratio = medianY / prevMedianY;
if (ratio > 3 || ratio < 0.33) {
this.logger.warn(
`Curve anomaly in ${sectionName} at pressure ${pressureKey}: median y=${medianY.toFixed(2)} ` +
`deviates ${(ratio).toFixed(1)}x from adjacent level (${prevMedianY.toFixed(2)}). Check curve data.`
);
}
}
prevMedianY = medianY;
normalized[String(canonicalPressure)] = {
x: xArray,
y: yArray,
@@ -772,7 +790,7 @@ _callMeasurementHandler(measurementType, value, position, context) {
case "emergencystop":
this.logger.warn(`Emergency stop activated by '${source}'.`);
return await this.executeSequence("emergencyStop");
return await this.executeSequence("emergencystop");
case "statuscheck":
this.logger.info(`Status Check: Mode = '${this.currentMode}', Source = '${source}'.`);
@@ -972,7 +990,7 @@ _callMeasurementHandler(measurementType, value, position, context) {
// returns the best available pressure measurement to use in the prediction calculation
// this will be either the differential pressure, downstream or upstream pressure
getMeasuredPressure() {
if(this.hasCurve === false){
if(!this.hasCurve || !this.predictFlow || !this.predictPower || !this.predictCtrl){
this.logger.error(`No valid curve available to calculate prediction using last known pressure`);
return 0;
}
@@ -1321,13 +1339,33 @@ _callMeasurementHandler(measurementType, value, position, context) {
calcRelativeDistanceFromPeak(currentEfficiency,maxEfficiency,minEfficiency){
let distance = 1;
if(currentEfficiency != null){
if(currentEfficiency != null && maxEfficiency !== minEfficiency){
distance = this.interpolation.interpolate_lin_single_point(currentEfficiency,maxEfficiency, minEfficiency, 0, 1);
}
return distance;
}
showCoG() {
if (!this.hasCurve) {
return { error: 'No curve data available', cog: 0, NCog: 0, cogIndex: 0 };
}
const { cog, cogIndex, NCog, minEfficiency } = this.calcCog();
return {
cog,
cogIndex,
NCog,
NCogPercent: Math.round(NCog * 100 * 100) / 100,
minEfficiency,
currentEfficiencyCurve: this.currentEfficiencyCurve,
absDistFromPeak: this.absDistFromPeak,
relDistFromPeak: this.relDistFromPeak,
};
}
showWorkingCurves() {
if (!this.hasCurve) {
return { error: 'No curve data available' };
}
// Show the current curves for debugging
const { powerCurve, flowCurve } = this.getCurrentCurves();
return {
@@ -1345,6 +1383,9 @@ _callMeasurementHandler(measurementType, value, position, context) {
// Calculate the center of gravity for current pressure
calcCog() {
if (!this.hasCurve || !this.predictFlow || !this.predictPower) {
return { cog: 0, cogIndex: 0, NCog: 0, minEfficiency: 0 };
}
//fetch current curve data for power and flow
const { powerCurve, flowCurve } = this.getCurrentCurves();
@@ -1370,24 +1411,32 @@ _callMeasurementHandler(measurementType, value, position, context) {
const efficiencyCurve = [];
let peak = 0;
let peakIndex = 0;
let minEfficiency = 0;
let minEfficiency = Infinity;
// Calculate efficiency curve based on power and flow curves
if (!powerCurve?.y?.length || !flowCurve?.y?.length) {
return { efficiencyCurve: [], peak: 0, peakIndex: 0, minEfficiency: 0 };
}
// Specific flow ratio (Q/P): for variable-speed centrifugal pumps this is
// monotonically decreasing (P scales ~Q³ by affinity laws), so the peak is
// always at minimum flow and NCog = 0. The MGC BEP-Gravitation algorithm
// compensates via slope-based redistribution which IS sensitive to curve shape.
powerCurve.y.forEach((power, index) => {
// Get flow for the current power
const flow = flowCurve.y[index];
const eff = (power > 0 && flow >= 0) ? flow / power : 0;
efficiencyCurve.push(eff);
// higher efficiency is better
efficiencyCurve.push( Math.round( ( flow / power ) * 100 ) / 100);
// Keep track of peak efficiency
peak = Math.max(peak, efficiencyCurve[index]);
peakIndex = peak == efficiencyCurve[index] ? index : peakIndex;
minEfficiency = Math.min(...efficiencyCurve);
if (eff > peak) {
peak = eff;
peakIndex = index;
}
if (eff < minEfficiency) {
minEfficiency = eff;
}
});
if (!Number.isFinite(minEfficiency)) minEfficiency = 0;
return { efficiencyCurve, peak, peakIndex, minEfficiency };
}
@@ -1424,11 +1473,11 @@ _callMeasurementHandler(measurementType, value, position, context) {
this.logger.debug(`temp: ${temp} atmPressure : ${atmPressure} rho : ${rho} pressureDiff: ${pressureDiff?.value || 0}`);
const flowM3s = this.measurements.type('flow').variant('predicted').position('atEquipment').getCurrentValue('m3/s');
const powerWatt = this.measurements.type('power').variant('predicted').position('atEquipment').getCurrentValue('W');
const flowM3s = this.measurements.type('flow').variant(variant).position('atEquipment').getCurrentValue('m3/s');
const powerWatt = this.measurements.type('power').variant(variant).position('atEquipment').getCurrentValue('W');
this.logger.debug(`Flow : ${flowM3s} power: ${powerWatt}`);
if (power != 0 && flow != 0) {
if (power > 0 && flow > 0) {
const specificFlow = flow / power;
const specificEnergyConsumption = power / flow;
@@ -1470,18 +1519,31 @@ _callMeasurementHandler(measurementType, value, position, context) {
this.config = this.configUtils.updateConfig(this.config, newConfig);
//After we passed validation load the curves into their predictors
this.predictFlow.updateCurve(this.config.asset.machineCurve.nq);
this.predictPower.updateCurve(this.config.asset.machineCurve.np);
this.predictCtrl.updateCurve(this.reverseCurve(this.config.asset.machineCurve.nq));
if (!this.predictFlow || !this.predictPower || !this.predictCtrl) {
this.predictFlow = new predict({ curve: this.config.asset.machineCurve.nq });
this.predictPower = new predict({ curve: this.config.asset.machineCurve.np });
this.predictCtrl = new predict({ curve: this.reverseCurve(this.config.asset.machineCurve.nq) });
this.hasCurve = true;
} else {
this.predictFlow.updateCurve(this.config.asset.machineCurve.nq);
this.predictPower.updateCurve(this.config.asset.machineCurve.np);
this.predictCtrl.updateCurve(this.reverseCurve(this.config.asset.machineCurve.nq));
}
}
getCompleteCurve() {
if (!this.hasCurve || !this.predictPower || !this.predictFlow) {
return { powerCurve: null, flowCurve: null };
}
const powerCurve = this.predictPower.inputCurveData;
const flowCurve = this.predictFlow.inputCurveData;
return { powerCurve, flowCurve };
}
getCurrentCurves() {
if (!this.hasCurve || !this.predictPower || !this.predictFlow) {
return { powerCurve: { x: [], y: [] }, flowCurve: { x: [], y: [] } };
}
const powerCurve = this.predictPower.currentFxyCurve[this.predictPower.currentF];
const flowCurve = this.predictFlow.currentFxyCurve[this.predictFlow.currentF];