Compare commits

..

5 Commits

Author SHA1 Message Date
znetsixe
00e35302b4 fix(examples): complete ui-chart required props in edge.flow.json (flow-lint)
State-code chart was missing interpolation, axis property types, action, colors.
Lint-clean.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 19:18:44 +02:00
znetsixe
0a3a0be15b feat(commands): adopt unified command envelope — msg.origin + unit shorthand
- Provenance resolved via msg.origin (registry-stamped, default parent) with a
  legacy fallback to payload.source; feeds handleInput's mode/source gating.
- set.flow-setpoint: units:{measure,default} -> unit:'m3/h' shorthand.

181/181 tests green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 18:41:14 +02:00
znetsixe
889221fffd fix(rm): force-emit ctrl every tick (static alwaysEmitFields)
Realized control position is constant in steady state, so delta compression
emitted it ~once and the Grafana "% Control" line went invisible. Exempt
`ctrl` from delta compression so the pump's movement always traces.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 16:09:24 +02:00
znetsixe
a8d9895cbf fix(rotatingmachine): seed operating-point flow/power telemetry at boot
The operating-point series (flow.predicted.{downstream,atequipment},
power.predicted.atequipment) were only written by calcFlow/calcPower while
operational, or by _updateState on a state transition. A machine that boots
into idle and never runs therefore emitted these keys NEVER — so InfluxDB
carried only the flow envelope (max/min) and dashboard panels querying the
operating point rendered blank, unable to show even the off/0 state.

Seed them to 0 in _init() alongside max/min, so telemetry always carries the
operating point: 0 while idle, real values once the pump runs. Verified end to
end: keys now present in InfluxDB, the Grafana flow panel resolves, and the
real prediction path produces non-zero values (~98 m3/h, ~13 kW) that flow
through getOutput to Port 1.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 10:07:25 +02:00
znetsixe
455f15dc55 refactor(units): route all conversions through UnitPolicy.convert
Delete the legacy _convertUnitValue helper on the domain and the
duplicate convertUnitValue export on curveNormalizer; both were
identical to UnitPolicy.convert. Callers in flowController, the
curve normalizer, and buildQHCurve now go through this.unitPolicy.
The contract in .claude/refactor/CONTRACTS.md §6 named these as the
target migration; this finishes the rollout for rotatingMachine.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 13:43:26 +02:00
11 changed files with 102 additions and 58 deletions

View File

@@ -322,6 +322,22 @@
"removeOlderUnit": "3600", "removeOlderUnit": "3600",
"x": 1230, "x": 1230,
"y": 300, "y": 300,
"wires": [] "wires": [],
"interpolation": "linear",
"xAxisPropertyType": "timestamp",
"yAxisProperty": "payload",
"yAxisPropertyType": "msg",
"action": "append",
"colors": [
"#0095FF",
"#FF0000",
"#FF7F0E",
"#2CA02C",
"#A347E1",
"#D62728",
"#FF9896",
"#9467BD",
"#C5B0D5"
]
} }
] ]

View File

@@ -16,6 +16,16 @@ function _logger(source, ctx) {
return ctx?.logger || source?.logger || null; return ctx?.logger || source?.logger || null;
} }
// Resolve the command origin (control authority: parent | GUI | fysical).
// The shared commandRegistry stamps msg.origin (default 'parent'); legacy flows
// carried the origin as payload.source. Prefer the legacy field when present so
// existing flows keep working, otherwise use the registry-stamped msg.origin.
function _origin(msg) {
const p = msg && msg.payload;
if (p && typeof p === 'object' && typeof p.source === 'string' && p.source) return p.source;
return (typeof msg?.origin === 'string' && msg.origin) ? msg.origin : 'parent';
}
function _send(ctx, ports) { function _send(ctx, ports) {
if (typeof ctx?.send === 'function') ctx.send(ports); if (typeof ctx?.send === 'function') ctx.send(ports);
} }
@@ -28,19 +38,19 @@ exports.setMode = (source, msg) => {
// forwards to these directly so behaviour is identical. // forwards to these directly so behaviour is identical.
exports.startup = async (source, msg) => { exports.startup = async (source, msg) => {
const p = msg.payload || {}; const p = msg.payload || {};
await source.handleInput(p.source ?? 'parent', 'execSequence', 'startup'); await source.handleInput(_origin(msg), 'execSequence', 'startup');
}; };
exports.shutdown = async (source, msg) => { exports.shutdown = async (source, msg) => {
const p = msg.payload || {}; const p = msg.payload || {};
await source.handleInput(p.source ?? 'parent', 'execSequence', 'shutdown'); await source.handleInput(_origin(msg), 'execSequence', 'shutdown');
}; };
exports.estop = async (source, msg) => { exports.estop = async (source, msg) => {
const p = msg.payload || {}; const p = msg.payload || {};
// Legacy emergencystop carried { source, action } — action defaults to // Legacy emergencystop carried { source, action } — action defaults to
// 'emergencystop' when only source is supplied via the canonical topic. // 'emergencystop' when only source is supplied via the canonical topic.
await source.handleInput(p.source ?? 'parent', p.action ?? 'emergencystop'); await source.handleInput(_origin(msg), p.action ?? 'emergencystop');
}; };
// Content-based alias router: legacy `execSequence` carried payload.action in // Content-based alias router: legacy `execSequence` carried payload.action in
@@ -57,13 +67,13 @@ exports.execSequenceAlias = async (source, msg, ctx) => {
exports.setSetpoint = async (source, msg) => { exports.setSetpoint = async (source, msg) => {
const p = msg.payload || {}; const p = msg.payload || {};
const action = p.action ?? 'execMovement'; const action = p.action ?? 'execMovement';
await source.handleInput(p.source ?? 'parent', action, Number(p.setpoint)); await source.handleInput(_origin(msg), action, Number(p.setpoint));
}; };
exports.setFlowSetpoint = async (source, msg) => { exports.setFlowSetpoint = async (source, msg) => {
const p = msg.payload || {}; const p = msg.payload || {};
const action = p.action ?? 'flowMovement'; const action = p.action ?? 'flowMovement';
await source.handleInput(p.source ?? 'parent', action, Number(p.setpoint)); await source.handleInput(_origin(msg), action, Number(p.setpoint));
}; };
exports.simulateMeasurement = (source, msg, ctx) => { exports.simulateMeasurement = (source, msg, ctx) => {

View File

@@ -63,7 +63,7 @@ module.exports = [
topic: 'set.flow-setpoint', topic: 'set.flow-setpoint',
aliases: ['flowMovement'], aliases: ['flowMovement'],
payloadSchema: { type: 'object' }, payloadSchema: { type: 'object' },
units: { measure: 'volumeFlowRate', default: 'm3/h' }, unit: 'm3/h',
description: 'Move the machine to a flow setpoint via flowMovement.', description: 'Move the machine to a flow setpoint via flowMovement.',
handler: handlers.setFlowSetpoint, handler: handlers.setFlowSetpoint,
}, },

View File

@@ -1,39 +1,24 @@
const { convert } = require('generalFunctions');
/**
* Strict numeric unit conversion. Mirrors specificClass._convertUnitValue
* so the curve normalizer is testable without a Machine instance.
*/
function convertUnitValue(value, fromUnit, toUnit, contextLabel = 'unit conversion') {
const numeric = Number(value);
if (!Number.isFinite(numeric)) {
throw new Error(`${contextLabel}: value '${value}' is not finite`);
}
if (!fromUnit || !toUnit || fromUnit === toUnit) return numeric;
return convert(numeric).from(fromUnit).to(toUnit);
}
/** /**
* Convert one curve section (nq or np) from supplied units to canonical * Convert one curve section (nq or np) from supplied units to canonical
* units. Logs a warning when the per-pressure median y jumps by more than * units using the host UnitPolicy. Logs a warning when the per-pressure
* 3x relative to the previous pressure level — that almost always means the * median y jumps by more than 3x relative to the previous pressure level —
* curve file is corrupt (mixed units, swapped rows) and the predict module * that almost always means the curve file is corrupt (mixed units, swapped
* would otherwise silently produce nonsense values. * rows) and the predict module would otherwise silently produce nonsense.
*/ */
function normalizeCurveSection(section, fromYUnit, toYUnit, fromPressureUnit, toPressureUnit, sectionName, logger) { function normalizeCurveSection(section, unitPolicy, fromYUnit, toYUnit, fromPressureUnit, toPressureUnit, sectionName, logger) {
const normalized = {}; const normalized = {};
let prevMedianY = null; let prevMedianY = null;
for (const [pressureKey, pair] of Object.entries(section || {})) { for (const [pressureKey, pair] of Object.entries(section || {})) {
const canonicalPressure = convertUnitValue( const canonicalPressure = unitPolicy.convert(
Number(pressureKey), Number(pressureKey),
fromPressureUnit, fromPressureUnit,
toPressureUnit, toPressureUnit,
`${sectionName} pressure axis` `${sectionName} pressure axis`,
); );
const xArray = Array.isArray(pair?.x) ? pair.x.map(Number) : []; const xArray = Array.isArray(pair?.x) ? pair.x.map(Number) : [];
const yArray = Array.isArray(pair?.y) const yArray = Array.isArray(pair?.y)
? pair.y.map((v) => convertUnitValue(v, fromYUnit, toYUnit, `${sectionName} output`)) ? pair.y.map((v) => unitPolicy.convert(v, fromYUnit, toYUnit, `${sectionName} output`))
: []; : [];
if (!xArray.length || !yArray.length || xArray.length !== yArray.length) { if (!xArray.length || !yArray.length || xArray.length !== yArray.length) {
throw new Error(`Invalid ${sectionName} section at pressure '${pressureKey}'.`); throw new Error(`Invalid ${sectionName} section at pressure '${pressureKey}'.`);
@@ -74,21 +59,23 @@ function normalizeMachineCurve(rawCurve, unitPolicy, logger) {
return { return {
nq: normalizeCurveSection( nq: normalizeCurveSection(
rawCurve.nq, rawCurve.nq,
unitPolicy,
curveUnits.flow, curveUnits.flow,
canonicalFlow, canonicalFlow,
curveUnits.pressure, curveUnits.pressure,
canonicalPressure, canonicalPressure,
'nq', 'nq',
logger logger,
), ),
np: normalizeCurveSection( np: normalizeCurveSection(
rawCurve.np, rawCurve.np,
unitPolicy,
curveUnits.power, curveUnits.power,
canonicalPower, canonicalPower,
curveUnits.pressure, curveUnits.pressure,
canonicalPressure, canonicalPressure,
'np', 'np',
logger logger,
), ),
}; };
} }
@@ -114,4 +101,4 @@ function readCanonical(unitPolicy, type) {
return (unitPolicy.canonical || {})[type] || null; return (unitPolicy.canonical || {})[type] || null;
} }
module.exports = { normalizeMachineCurve, normalizeCurveSection, convertUnitValue }; module.exports = { normalizeMachineCurve, normalizeCurveSection };

View File

@@ -79,6 +79,12 @@ function buildQHCurve(predictors, ctrlPct, options = {}) {
if (!pf.inputCurve || typeof pf.inputCurve !== 'object') { if (!pf.inputCurve || typeof pf.inputCurve !== 'object') {
return { error: NO_CURVE_ERROR, points: [] }; return { error: NO_CURVE_ERROR, points: [] };
} }
const policy = options.unitPolicy || predictors.unitPolicy;
if (!policy) {
return { error: 'No unitPolicy available for Q-axis conversion', points: [] };
}
const flowFrom = policy.canonical?.flow || policy.canonical?.('flow');
const flowTo = policy.output?.flow || policy.output?.('flow');
const x = Number.isFinite(+ctrlPct) ? +ctrlPct : (pf.currentX ?? 0); const x = Number.isFinite(+ctrlPct) ? +ctrlPct : (pf.currentX ?? 0);
const RHO = 999.1; // kg/m³ — water at ~15 °C const RHO = 999.1; // kg/m³ — water at ~15 °C
const G = 9.80665; // m/s² const G = 9.80665; // m/s²
@@ -103,7 +109,8 @@ function buildQHCurve(predictors, ctrlPct, options = {}) {
for (const p of pressures) { for (const p of pressures) {
pf.fDimension = p; pf.fDimension = p;
const QM3s = pf.y(x); const QM3s = pf.y(x);
points.push({ Q: QM3s * 3600, H: p / (RHO * G), dpPa: p }); const Q = policy.convert(QM3s, flowFrom, flowTo, 'buildQHCurve Q-axis');
points.push({ Q, H: p / (RHO * G), dpPa: p });
} }
} finally { } finally {
pf.fDimension = originalF; pf.fDimension = originalF;

View File

@@ -50,7 +50,7 @@ class FlowController {
return await host.executeSequence(parameter); return await host.executeSequence(parameter);
case 'flowmovement': { case 'flowmovement': {
const canonicalFlowSetpoint = host._convertUnitValue( const canonicalFlowSetpoint = host.unitPolicy.convert(
parameter, parameter,
host.unitPolicy.output.flow, host.unitPolicy.output.flow,
host.unitPolicy.canonical.flow, host.unitPolicy.canonical.flow,

View File

@@ -11,6 +11,10 @@ class nodeClass extends BaseNodeAdapter {
static commands = commands; static commands = commands;
static tickInterval = null; static tickInterval = null;
static statusInterval = 1000; static statusInterval = 1000;
// Realized control position holds constant in steady state, so delta
// compression would emit it ~once and the Grafana "% Control" line goes
// invisible. Force it every tick so the pump's movement always traces.
static alwaysEmitFields = ['ctrl'];
buildDomainConfig(uiConfig) { buildDomainConfig(uiConfig) {
_rejectLegacyAssetFields(uiConfig); _rejectLegacyAssetFields(uiConfig);

View File

@@ -229,10 +229,18 @@ class Machine extends BaseDomain {
this.measurements.type('temperature').variant('measured').position('atEquipment').value(15, Date.now(), tu); this.measurements.type('temperature').variant('measured').position('atEquipment').value(15, Date.now(), tu);
this.measurements.type('atmPressure').variant('measured').position('atEquipment').value(101325, Date.now(), 'Pa'); this.measurements.type('atmPressure').variant('measured').position('atEquipment').value(101325, Date.now(), 'Pa');
const fu = this.unitPolicy.canonical.flow; const fu = this.unitPolicy.canonical.flow;
const pu = this.unitPolicy.canonical.power;
const fmin = this.predictFlow ? this.predictFlow.currentFxyYMin : 0; const fmin = this.predictFlow ? this.predictFlow.currentFxyYMin : 0;
const fmax = this.predictFlow ? this.predictFlow.currentFxyYMax : 0; const fmax = this.predictFlow ? this.predictFlow.currentFxyYMax : 0;
this.measurements.type('flow').variant('predicted').position('max').value(fmax, Date.now(), fu); this.measurements.type('flow').variant('predicted').position('max').value(fmax, Date.now(), fu);
this.measurements.type('flow').variant('predicted').position('min').value(fmin, Date.now(), fu); this.measurements.type('flow').variant('predicted').position('min').value(fmin, Date.now(), fu);
// Seed the operating-point series at boot so telemetry always carries them
// (0 while idle, real values once calcFlow/calcPower run when operational).
// Without this an idle-from-boot machine never emits these keys — the
// dashboard can't even show the off/0 state. Mirrors max/min above.
this.measurements.type('flow').variant('predicted').position('downstream').value(0, Date.now(), fu);
this.measurements.type('flow').variant('predicted').position('atEquipment').value(0, Date.now(), fu);
this.measurements.type('power').variant('predicted').position('atEquipment').value(0, Date.now(), pu);
} }
_callMeasurementHandler(measurementType, value, position, context = {}) { _callMeasurementHandler(measurementType, value, position, context = {}) {
@@ -247,12 +255,6 @@ class Machine extends BaseDomain {
if (!this.isUnitValidForType(type, u)) throw new Error(`Unsupported unit '${u}' for ${type} measurement.`); if (!this.isUnitValidForType(type, u)) throw new Error(`Unsupported unit '${u}' for ${type} measurement.`);
return u; return u;
} }
_convertUnitValue(value, from, to, ctx = 'unit conversion') {
const n = Number(value);
if (!Number.isFinite(n)) throw new Error(`${ctx}: value '${value}' is not finite`);
if (!from || !to || from === to) return n;
return convert(n).from(from).to(to);
}
_measurementPositionForMetric(metricId) { return metricId === 'power' ? 'atEquipment' : 'downstream'; } _measurementPositionForMetric(metricId) { return metricId === 'power' ? 'atEquipment' : 'downstream'; }
_resolveProcessRangeForMetric(metricId, predicted, measured) { _resolveProcessRangeForMetric(metricId, predicted, measured) {
let processMin = NaN; let processMax = NaN; let processMin = NaN; let processMax = NaN;

View File

@@ -5,7 +5,6 @@ const { UnitPolicy } = require('generalFunctions');
const { const {
normalizeMachineCurve, normalizeMachineCurve,
normalizeCurveSection, normalizeCurveSection,
convertUnitValue,
} = require('../../src/curves/curveNormalizer'); } = require('../../src/curves/curveNormalizer');
function makePolicy() { function makePolicy() {
@@ -50,39 +49,33 @@ test('normalizeMachineCurve: converts pressure mbar -> Pa and flow m3/h -> m3/s'
}); });
test('normalizeCurveSection: warns on cross-pressure median > 3x jump', () => { test('normalizeCurveSection: warns on cross-pressure median > 3x jump', () => {
const policy = makePolicy();
const logger = captureLogger(); const logger = captureLogger();
const section = { const section = {
1000: { x: [0, 50, 100], y: [0, 5, 10] }, // median 5 1000: { x: [0, 50, 100], y: [0, 5, 10] }, // median 5
1100: { x: [0, 50, 100], y: [0, 50, 100] }, // median 50 (10x jump) 1100: { x: [0, 50, 100], y: [0, 50, 100] }, // median 50 (10x jump)
}; };
normalizeCurveSection(section, 'm3/h', 'm3/h', 'mbar', 'mbar', 'nq', logger); normalizeCurveSection(section, policy, 'm3/h', 'm3/h', 'mbar', 'mbar', 'nq', logger);
const hit = logger.warns.find((w) => /Curve anomaly/.test(w)); const hit = logger.warns.find((w) => /Curve anomaly/.test(w));
assert.ok(hit, `expected a Curve anomaly warning, got: ${JSON.stringify(logger.warns)}`); assert.ok(hit, `expected a Curve anomaly warning, got: ${JSON.stringify(logger.warns)}`);
assert.match(hit, /pressure 1100/); assert.match(hit, /pressure 1100/);
}); });
test('normalizeCurveSection: does not warn on smooth progressions', () => { test('normalizeCurveSection: does not warn on smooth progressions', () => {
const policy = makePolicy();
const logger = captureLogger(); const logger = captureLogger();
const section = { const section = {
1000: { x: [0, 50, 100], y: [0, 5, 10] }, 1000: { x: [0, 50, 100], y: [0, 5, 10] },
1100: { x: [0, 50, 100], y: [0, 6, 11] }, 1100: { x: [0, 50, 100], y: [0, 6, 11] },
}; };
normalizeCurveSection(section, 'm3/h', 'm3/h', 'mbar', 'mbar', 'nq', logger); normalizeCurveSection(section, policy, 'm3/h', 'm3/h', 'mbar', 'mbar', 'nq', logger);
assert.equal(logger.warns.filter((w) => /Curve anomaly/.test(w)).length, 0); assert.equal(logger.warns.filter((w) => /Curve anomaly/.test(w)).length, 0);
}); });
test('normalizeCurveSection: throws when x/y length mismatch', () => { test('normalizeCurveSection: throws when x/y length mismatch', () => {
const policy = makePolicy();
assert.throws( assert.throws(
() => normalizeCurveSection({ 1000: { x: [0, 50], y: [0, 5, 10] } }, 'm3/h', 'm3/s', 'mbar', 'Pa', 'nq', null), () => normalizeCurveSection({ 1000: { x: [0, 50], y: [0, 5, 10] } }, policy, 'm3/h', 'm3/s', 'mbar', 'Pa', 'nq', null),
/Invalid nq section/ /Invalid nq section/
); );
}); });
test('convertUnitValue: identity when units match or missing', () => {
assert.equal(convertUnitValue(42, 'm3/h', 'm3/h'), 42);
assert.equal(convertUnitValue(42, null, null), 42);
});
test('convertUnitValue: throws on non-finite input', () => {
assert.throws(() => convertUnitValue('not-a-number', 'm3/h', 'm3/s', 'test'), /not finite/);
});

View File

@@ -27,6 +27,10 @@ function makeHost({
unitPolicy: { unitPolicy: {
canonical: { flow: 'm3/s' }, canonical: { flow: 'm3/s' },
output: { flow: 'm3/h' }, output: { flow: 'm3/h' },
convert: (val, from, to, label) => {
host.calls.convertUnit.push({ val, from, to, label });
return val * 1000; // pretend m3/h -> m3/s factor
},
}, },
isValidActionForMode: (action) => allowedActions.has(action), isValidActionForMode: (action) => allowedActions.has(action),
isValidSourceForMode: () => allowedSources, isValidSourceForMode: () => allowedSources,
@@ -38,10 +42,6 @@ function makeHost({
return { moved: sp }; return { moved: sp };
}, },
calcCtrl: (canonicalFlow) => { host.calls.calcCtrl.push(canonicalFlow); return canonicalFlow / 2; }, calcCtrl: (canonicalFlow) => { host.calls.calcCtrl.push(canonicalFlow); return canonicalFlow / 2; },
_convertUnitValue: (val, from, to, label) => {
host.calls.convertUnit.push({ val, from, to, label });
return val * 1000; // pretend m3/h -> m3/s factor
},
}; };
return host; return host;
} }

View File

@@ -36,6 +36,31 @@ test('getOutput contains all required fields in idle state', () => {
assert.ok('pressureDriftFlags' in output); assert.ok('pressureDriftFlags' in output);
}); });
test('getOutput seeds operating-point flow/power telemetry at boot (idle = 0, not absent)', () => {
// Regression: an idle-from-boot machine must still emit the operating-point
// series so dashboards can show the off/0 state. These keys are otherwise
// only written once the pump runs (calcFlow/calcPower) or on a state
// transition, leaving them absent in telemetry for a pump that never starts.
const machine = new Machine(makeMachineConfig(), makeStateConfig());
const output = machine.getOutput();
const hasPrefix = (p) => Object.keys(output).some((k) => k.startsWith(p));
const valueFor = (p) => output[Object.keys(output).find((k) => k.startsWith(p))];
for (const prefix of [
'flow.predicted.downstream',
'flow.predicted.atequipment',
'power.predicted.atequipment',
]) {
assert.ok(hasPrefix(prefix), `${prefix}.* must be present at boot (idle)`);
assert.equal(valueFor(prefix), 0, `${prefix}.* should be 0 while idle`);
}
// The envelope keys remain present too.
assert.ok(hasPrefix('flow.predicted.max'));
assert.ok(hasPrefix('flow.predicted.min'));
});
test('getOutput flow drift fields appear after sufficient measured flow samples', async () => { test('getOutput flow drift fields appear after sufficient measured flow samples', async () => {
const machine = new Machine(makeMachineConfig(), makeStateConfig()); const machine = new Machine(makeMachineConfig(), makeStateConfig());