Compare commits

..

4 Commits

Author SHA1 Message Date
znetsixe
ce4fb4e5d0 feat(commandRegistry): unify command envelope — origin, unit shorthand, always-convert
Shared command-dispatch layer used by every EVOLV node:
- Always-convert: numeric strings ("60") and {value:"60"} now normalise +
  convert like numbers; closes the gap where strings reached handlers raw.
- unit: 'm3/h' shorthand on descriptors; measure is derived from the unit
  (legacy units:{measure,default} still accepted, measure re-derived).
  Unrecognised declared unit throws at construction.
- msg.origin stamped on every dispatch (parent|GUI|fysical, default parent).
- Opt-in gated:true arbitration: accept only if origin in
  source.config.mode.allowedSources[currentMode]; advisory allow-all when a
  node has no mode model. Handles Set- or array-valued allowedSources.

+18 registry tests (45 total, green). All consumer nodes verified green.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-05-29 18:41:00 +02:00
znetsixe
5c091cdce9 feat(config): add planner.emergencyPressurePa for MGC rendezvous emergency bypass
Documented, defaults to null (inert). When set, the MGC pre-empts an in-flight
rendezvous lock and re-plans immediately if the resolved header pressure reaches
this canonical-Pa threshold. Mechanism is wired + tested; never fires until a
real value is configured.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 17:47:57 +02:00
znetsixe
c0be50d02c feat(output): alwaysEmit fields, drop undefined/empty Influx tags, time-based movement re-basing
- OutputUtils: new `alwaysEmit` option exempts named fields from delta
  compression so steady-state values (e.g. ctrl) trace continuously.
- flattenTags now drops null/undefined/empty-string tag values, fixing
  literal `category="undefined"` tags that split every Grafana series in two.
- BaseNodeAdapter wires `static alwaysEmitFields` from the subclass.
- movementManager: track position by elapsed wall-time and capture partial
  progress on abort, so a fast-re-commanding parent can't freeze an actuator
  at its start position.
- Tests for the above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-27 16:09:14 +02:00
znetsixe
bc79de133e fix(influx): accept tagCode camelCase and emit positionVsParent tag
The asset config standardised on tagCode (camelCase) but the InfluxDB
tag emitter still read the lowercase tagcode, so any node saved through
the new editor silently emitted tags.tagcode: undefined. Read both
spellings so old + new configs both produce the tag.

Also surfaces functionality.positionVsParent as a tag so dashboards
can filter by upstream/downstream side.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-23 15:29:39 +02:00
8 changed files with 452 additions and 69 deletions

View File

@@ -149,6 +149,13 @@
"type": "boolean",
"description": "If true, every dispatch is routed through the rendezvous planner regardless of control strategy: per-pump moves are delayed so all pumps reach their setpoint at the same wall-clock instant t* = max(eta_i). If false, all flowmovement commands fire immediately and each pump ramps at its own speed (legacy behaviour)."
}
},
"emergencyPressurePa": {
"default": null,
"rules": {
"type": "number",
"description": "Safety threshold (canonical Pa) for the rendezvous emergency bypass. While a rendezvous is in flight new setpoints are locked out and queued sequentially; if the resolved header pressure reaches this value the lock is pre-empted and the group re-plans immediately. Null/unset (the default) leaves the bypass mechanism wired but INERT — it never fires until a real threshold is configured."
}
}
},
"mode": {

View File

@@ -2,8 +2,16 @@ const { getFormatter } = require('./formatters');
//this class will handle the output events for the node red node
class OutputUtils {
constructor() {
// `options.alwaysEmit` is an optional list of field keys that bypass delta
// compression: they are re-emitted on every tick even when unchanged. Use it
// sparingly for slowly-varying values that must still trace as a continuous
// line downstream (e.g. a pump's realized control position `ctrl`, which sits
// constant in steady state and otherwise produces ~1 point per long stretch —
// invisible in a Grafana timeseries with createEmpty:false). Defaults to none,
// so existing nodes keep pure delta-compression behaviour.
constructor(options = {}) {
this.output = {};
this.alwaysEmit = new Set(options.alwaysEmit || []);
}
checkForChanges(output, format) {
@@ -13,7 +21,9 @@ class OutputUtils {
this.output[format] = this.output[format] || {};
const changedFields = {};
for (const key in output) {
if (Object.prototype.hasOwnProperty.call(output, key) && output[key] !== this.output[format][key]) {
if (!Object.prototype.hasOwnProperty.call(output, key)) continue;
const forced = this.alwaysEmit.has(key) && output[key] !== undefined;
if (forced || output[key] !== this.output[format][key]) {
let value = output[key];
// For fields: if the value is an object (and not a Date), stringify it.
if (value !== null && typeof value === 'object' && !(value instanceof Date)) {
@@ -79,7 +89,13 @@ class OutputUtils {
for (const key in obj) {
if (Object.prototype.hasOwnProperty.call(obj, key)) {
const value = obj[key];
if (value !== null && typeof value === 'object' && !(value instanceof Date)) {
// Skip tags that carry no information. When a config field is unset,
// extractRelevantConfig hands us `undefined`; stringifying that wrote
// literal `category="undefined"` / `geoLocation="undefined"` tags that
// clutter every Grafana legend and needlessly inflate tag cardinality.
// Drop null / undefined / empty-string before they reach InfluxDB.
if (value === null || value === undefined || value === '') continue;
if (typeof value === 'object' && !(value instanceof Date)) {
// Recursively flatten the nested object.
const flatChild = this.flattenTags(value);
for (const childKey in flatChild) {
@@ -104,9 +120,10 @@ class OutputUtils {
// functionality properties
softwareType: config.functionality?.softwareType,
role: config.functionality?.role,
positionVsParent: config.functionality?.positionVsParent,
// asset properties (exclude machineCurve)
uuid: config.asset?.uuid,
tagcode: config.asset?.tagcode,
tagcode: config.asset?.tagCode || config.asset?.tagcode,
geoLocation: config.asset?.geoLocation,
category: config.asset?.category,
type: config.asset?.type,

View File

@@ -82,7 +82,9 @@ class BaseNodeAdapter {
// pumpingStation/measurement nodeClass _attachInputHandler patterns.
this.node.source = this.source;
this._output = new OutputUtils();
// `static alwaysEmitFields = ['ctrl', …]` on a subclass exempts those
// fields from delta compression so they trace continuously downstream.
this._output = new OutputUtils({ alwaysEmit: ctor.alwaysEmitFields });
const userHasUnitsQuery = ctor.commands.some(
(c) => c && (c.topic === 'query.units' || (Array.isArray(c.aliases) && c.aliases.includes('query.units'))));
const mergedCommands = userHasUnitsQuery

View File

@@ -26,14 +26,60 @@ function _describeUnit(unit) {
try { return convert().describe(unit); } catch (_) { return null; }
}
// A numeric scalar is a finite number, or a non-empty string that parses to a
// finite number ("60", "1.5"). Node-RED inject/`change` nodes and upstream MQTT
// payloads routinely arrive as strings; treating them as non-numeric here is the
// gap that let values reach a handler unconverted.
function _asNumber(x) {
if (typeof x === 'number') return Number.isFinite(x) ? x : null;
if (typeof x === 'string' && x.trim() !== '') {
const n = Number(x);
return Number.isFinite(n) ? n : null;
}
return null;
}
function _extractValueAndUnit(msg) {
if (!msg || typeof msg !== 'object') return null;
const p = msg.payload;
if (typeof p === 'number') return { value: p, unit: msg.unit };
if (p && typeof p === 'object' && typeof p.value === 'number') {
return { value: p.value, unit: p.unit ?? msg.unit };
if (p && typeof p === 'object') {
const value = _asNumber(p.value);
if (value === null) return null;
return { value, unit: p.unit ?? msg.unit };
}
return null;
const value = _asNumber(p);
if (value === null) return null;
return { value, unit: msg.unit };
}
// Derive the dimensional measure (e.g. 'volumeFlowRate') from a unit string.
// Returns null when convert doesn't recognise the unit.
function _measureOf(unit) {
const desc = _describeUnit(unit);
return desc ? desc.measure : null;
}
// Command origin = which control authority issued this message (the rotatingMachine
// `allowedSources` vocabulary: 'parent' = automation/parent controller, 'GUI' =
// SCADA/HMI operator, 'fysical' = physical buttons). Default 'parent'. Named
// `origin` on the message because `source` is already the domain instance handed
// to handlers.
const DEFAULT_ORIGIN = 'parent';
function _resolveOrigin(msg, descriptor) {
const o = msg && typeof msg.origin === 'string' && msg.origin.trim() !== ''
? msg.origin.trim()
: (descriptor.defaultOrigin || DEFAULT_ORIGIN);
return o;
}
// allowedSources values may be a Set (post config processing, as rotatingMachine
// stores them) or a plain array (raw config / other nodes). Accept both.
function _setHas(coll, value) {
if (!coll) return false;
if (typeof coll.has === 'function') return coll.has(value);
if (Array.isArray(coll)) return coll.includes(value);
return false;
}
class CommandRegistry {
@@ -76,6 +122,8 @@ class CommandRegistry {
payloadSchema: cmd.payloadSchema || null,
description: typeof cmd.description === 'string' ? cmd.description : null,
units,
gated: cmd.gated === true,
defaultOrigin: typeof cmd.defaultOrigin === 'string' ? cmd.defaultOrigin : null,
handler: cmd.handler,
};
this._byKey.set(cmd.topic, descriptor);
@@ -87,12 +135,25 @@ class CommandRegistry {
}
_validateUnits(cmd) {
if (cmd.units === undefined || cmd.units === null) return null;
const { measure, default: def } = cmd.units;
if (typeof measure !== 'string' || measure.length === 0 ||
typeof def !== 'string' || def.length === 0) {
// Two ways to declare the unit, normalised to the same internal shape:
// unit: 'm3/h' (preferred — measure derived)
// units: { default: 'm3/h' } (measure derived)
// units: { measure, default: 'm3/h' } (legacy — measure ignored, derived)
// The measure is always derived from the unit so it can never drift from it.
let def;
if (typeof cmd.unit === 'string') def = cmd.unit;
else if (cmd.units === undefined || cmd.units === null) return null;
else if (typeof cmd.units === 'string') def = cmd.units;
else def = cmd.units.default;
if (typeof def !== 'string' || def.length === 0) {
throw new TypeError(
`command '${cmd.topic}' units requires { measure: string, default: string }`);
`command '${cmd.topic}' requires a unit string (unit: 'm3/h' or units: { default: 'm3/h' })`);
}
const measure = _measureOf(def);
if (!measure) {
throw new TypeError(
`command '${cmd.topic}' declares unit '${def}' which convert does not recognise`);
}
return { measure, default: def };
}
@@ -137,11 +198,31 @@ class CommandRegistry {
return;
}
if (topic !== descriptor.topic) this._noteAlias(topic, descriptor.topic, log);
// Always stamp the command origin so handlers + gating can rely on it.
msg.origin = _resolveOrigin(msg, descriptor);
if (!this._originAllowed(descriptor, source, msg.origin, log)) return;
if (descriptor.units) this._normaliseUnits(descriptor, msg, log);
if (!this._validatePayload(descriptor, msg, log)) return;
return descriptor.handler(source, msg, ctx);
}
// Mode-gated control-authority arbitration. Opt-in per command via
// `gated: true`. The asset's mode (e.g. rotatingMachine's auto /
// virtualControl / fysicalControl) decides which origins it accepts via
// `source.config.mode.allowedSources[mode]`. Release = changing the mode.
// Nodes without a mode model are advisory (allow-all) so this is inert
// until a node opts in — never a silent behaviour change.
_originAllowed(descriptor, source, origin, log) {
if (!descriptor.gated) return true;
const allowedSources = source && source.config && source.config.mode
? source.config.mode.allowedSources : null;
const mode = source ? source.currentMode : undefined;
if (!allowedSources || !mode) return true; // no mode model → advisory
if (_setHas(allowedSources[mode], origin)) return true;
log.warn?.(`${descriptor.topic}: origin '${origin}' not allowed in mode '${mode}'`);
return false;
}
_noteAlias(alias, canonical, log) {
const prev = this._deprecationCounts.get(alias) || 0;
this._deprecationCounts.set(alias, prev + 1);

View File

@@ -79,65 +79,70 @@ class movementManager {
// Clamp the final target into [minPosition, maxPosition]
targetPosition = this.constrain(targetPosition);
// Compute direction and remaining distance
const direction = targetPosition > this.currentPosition ? 1 : -1;
const distance = Math.abs(targetPosition - this.currentPosition);
// Snapshot the starting point. Position is derived from ELAPSED WALL-TIME
// (not accumulated per-tick steps) so an interruption that lands between
// ticks — or before the very first tick — still leaves currentPosition at
// the real distance travelled. A fast re-commanding parent (e.g. MGC
// updating demand every tick) then re-bases from the true position instead
// of freezing at the start. See _settleAt / the abort handler below.
const startPosition = this.currentPosition;
const direction = targetPosition > startPosition ? 1 : -1;
const distance = Math.abs(targetPosition - startPosition);
const velocity = this.getVelocity(); // units per second
if (velocity <= 0) {
return reject(new Error("Movement aborted: zero speed"));
}
// Duration and bookkeeping
const duration = distance / velocity; // seconds to go the remaining distance
const duration = distance / velocity; // seconds to go the full distance
this.timeleft = duration;
this.logger.debug(
`Linear move: dir=${direction}, dist=${distance}, vel=${velocity.toFixed(2)} u/s, dur=${duration.toFixed(2)}s`
);
// Compute how much to move each tick
const intervalMs = this.interval;
const intervalSec = intervalMs / 1000;
const stepSize = direction * velocity * intervalSec;
const startTime = Date.now();
// Position reached after `elapsedSec` of travel, clamped to the target.
const posAt = (elapsedSec) =>
this.constrain(startPosition + direction * Math.min(distance, velocity * elapsedSec));
// Re-base currentPosition (and timeleft) onto the real elapsed progress.
const settle = () => {
const elapsed = (Date.now() - startTime) / 1000;
this.currentPosition = posAt(elapsed);
this.timeleft = Math.max(0, duration - elapsed);
this.emitPos(this.currentPosition);
return elapsed;
};
// Kick off the loop
const intervalId = setInterval(() => {
// 7a) Abort check
if (signal?.aborted) {
clearInterval(intervalId);
settle();
return reject(new Error("Movement aborted"));
}
// Advance position and clamp
this.currentPosition += stepSize;
this.currentPosition = this.constrain(this.currentPosition);
this.emitPos(this.currentPosition);
// Update timeleft
const elapsed = (Date.now() - startTime) / 1000;
this.timeleft = Math.max(0, duration - elapsed);
const elapsed = settle();
this.logger.debug(
`pos=${this.currentPosition.toFixed(2)}, timeleft=${this.timeleft.toFixed(2)}`
);
// Completed the move?
if (
(direction > 0 && this.currentPosition >= targetPosition) ||
(direction < 0 && this.currentPosition <= targetPosition)
) {
// Completed the move? (time-based so it can't overshoot/undershoot)
if (elapsed >= duration) {
clearInterval(intervalId);
this.currentPosition = targetPosition;
this.timeleft = 0;
this.emitPos(this.currentPosition);
return resolve("Reached target move.");
}
}, intervalMs);
// 8) Also catch aborts that happen before the first tick
// Catch aborts that happen between ticks (incl. before the first tick):
// capture the partial progress so the move re-bases instead of freezing.
signal?.addEventListener("abort", () => {
clearInterval(intervalId);
settle();
reject(new Error("Movement aborted"));
});
});
@@ -213,8 +218,8 @@ class movementManager {
return reject(new Error("Movement aborted"));
}
const totalDistance = Math.abs(targetPosition - this.currentPosition);
const startPosition = this.currentPosition;
const totalDistance = Math.abs(targetPosition - this.currentPosition);
const velocity = this.getVelocity();
if (velocity <= 0) {
return reject(new Error("Movement aborted: zero speed"));
@@ -223,45 +228,53 @@ class movementManager {
const easeFunction = (t) =>
t < 0.5 ? 4 * t * t * t : 1 - Math.pow(-2 * t + 2, 3) / 2;
let elapsedTime = 0;
const duration = totalDistance / velocity;
this.timeleft = duration;
const interval = this.interval;
const startTime = Date.now();
// Position from ELAPSED WALL-TIME (eased), so an interruption between
// ticks re-bases from the real position rather than freezing at the
// start — same rationale as moveLinear.
const posAt = (elapsedSec) => {
const progress = duration > 0 ? Math.min(elapsedSec / duration, 1) : 1;
return startPosition + (targetPosition - startPosition) * easeFunction(progress);
};
const settle = () => {
const elapsed = (Date.now() - startTime) / 1000;
this.currentPosition = posAt(elapsed);
this.timeleft = Math.max(0, duration - elapsed);
this.emitPos(this.currentPosition);
return elapsed;
};
// 2) Start the moving loop
const intervalId = setInterval(() => {
// 3) Check for abort on each tick
if (signal?.aborted) {
clearInterval(intervalId);
settle();
return reject(new Error("Movement aborted"));
}
elapsedTime += interval / 1000;
const progress = Math.min(elapsedTime / duration, 1);
this.timeleft = duration - elapsedTime;
const easedProgress = easeFunction(progress);
const newPosition =
startPosition + (targetPosition - startPosition) * easedProgress;
this.emitPos(newPosition);
const elapsed = settle();
this.logger.debug(
`Using ${this.movementMode} => Progress=${progress.toFixed(
2
)}, Eased=${easedProgress.toFixed(2)}`
`Using ${this.movementMode} => elapsed=${elapsed.toFixed(2)}s, pos=${this.currentPosition.toFixed(2)}`
);
if (progress >= 1) {
if (elapsed >= duration) {
clearInterval(intervalId);
this.currentPosition = targetPosition;
this.timeleft = 0;
this.emitPos(this.currentPosition);
resolve(`Reached target move.`);
} else {
this.currentPosition = newPosition;
}
}, interval);
// 4) Also listen once for abort before first tick
// 4) Capture partial progress on aborts between/before ticks.
signal?.addEventListener("abort", () => {
clearInterval(intervalId);
settle();
reject(new Error("Movement aborted"));
});
});

View File

@@ -394,43 +394,163 @@ test('units: object payload {value} without unit falls back to default unit sile
assert.equal(logger._calls.warn.length, 0);
});
test('units: non-numeric payload (no normalisation applied) passes through to handler', async () => {
test('units: non-NUMERIC string payload (not normalisable) passes through to handler', async () => {
const logger = makeLogger();
const seen = [];
const reg = createRegistry([{
topic: 'set.demand',
units: { measure: 'volumeFlowRate', default: 'm3/h' },
unit: 'm3/h',
handler: (_s, msg) => { seen.push(msg.payload); },
}], { logger });
// string payload — not normalisable. Should not crash; handler still fires.
// non-numeric string — not normalisable. Should not crash; handler still fires.
await reg.dispatch({ topic: 'set.demand', payload: 'magic' }, {}, {});
assert.equal(seen.length, 1);
assert.equal(seen[0], 'magic');
});
test('units: missing default field throws at construction', () => {
test('units: NUMERIC string payload is always converted (closes the string gap)', async () => {
const logger = makeLogger();
const seen = [];
const reg = createRegistry([{
topic: 'set.demand',
unit: 'm3/h',
handler: (_s, msg) => { seen.push({ payload: msg.payload, unit: msg.unit }); },
}], { logger });
// "1" m3/s must convert to 3600 m3/h — same as the numeric-payload case.
await reg.dispatch({ topic: 'set.demand', payload: '1', unit: 'm3/s' }, {}, {});
assert.equal(seen.length, 1);
assert.ok(Math.abs(seen[0].payload - 3600) < 1e-6, `expected 3600, got ${seen[0].payload}`);
assert.equal(seen[0].unit, 'm3/h');
assert.equal(logger._calls.warn.length, 0);
});
test('units: { value: numeric-string } object payload is converted too', async () => {
const seen = [];
const reg = createRegistry([{
topic: 'set.pressure',
unit: 'Pa',
handler: (_s, msg) => { seen.push({ payload: msg.payload, unit: msg.unit }); },
}]);
await reg.dispatch({ topic: 'set.pressure', payload: { value: '5', unit: 'mbar' } }, {}, {});
assert.ok(Math.abs(seen[0].payload - 500) < 1e-6, `expected 500, got ${seen[0].payload}`);
assert.equal(seen[0].unit, 'Pa');
});
test('unit: shorthand declares the unit and DERIVES the measure', async () => {
const seen = [];
const reg = createRegistry([{
topic: 'set.demand',
unit: 'm3/h',
handler: (_s, msg) => { seen.push({ payload: msg.payload, unit: msg.unit }); },
}]);
await reg.dispatch({ topic: 'set.demand', payload: 1, unit: 'm3/s' }, {}, {});
assert.ok(Math.abs(seen[0].payload - 3600) < 1e-6);
assert.equal(seen[0].unit, 'm3/h');
});
test('units: { default } without measure derives the measure (measure no longer required)', () => {
const reg = createRegistry([{
topic: 'set.demand',
units: { default: 'm3/h' },
handler: () => {},
}]);
assert.deepEqual(reg.list()[0].units, { measure: 'volumeFlowRate', default: 'm3/h' });
});
test('units: legacy { measure, default } still works; measure is re-derived from the unit', () => {
const reg = createRegistry([{
topic: 'set.demand',
units: { measure: 'totallyWrong', default: 'm3/h' },
handler: () => {},
}]);
// declared measure ignored — derived from the unit so it can never drift.
assert.deepEqual(reg.list()[0].units, { measure: 'volumeFlowRate', default: 'm3/h' });
});
test('units: missing unit/default throws at construction', () => {
assert.throws(() => createRegistry([{
topic: 'set.demand',
units: { measure: 'volumeFlowRate' },
handler: () => {},
}]), /units requires/);
}]), /requires a unit string/);
});
test('units: missing measure field throws at construction', () => {
test('units: unrecognised declared unit throws at construction (descriptor bug caught early)', () => {
assert.throws(() => createRegistry([{
topic: 'set.demand',
units: { default: 'm3/h' },
unit: 'flarbargs',
handler: () => {},
}]), /units requires/);
}]), /convert does not recognise/);
});
test('units: descriptor.units surfaces in list() output', () => {
const reg = createRegistry([
{ topic: 'set.demand', units: { measure: 'volumeFlowRate', default: 'm3/h' }, handler: () => {} },
{ topic: 'set.demand', unit: 'm3/h', handler: () => {} },
{ topic: 'set.mode', handler: () => {} },
]);
const list = reg.list();
assert.deepEqual(list[0].units, { measure: 'volumeFlowRate', default: 'm3/h' });
assert.equal(list[1].units, null);
});
// ---------------------------------------------------------------------------
// origin — command provenance + mode-gated control-authority arbitration
// ---------------------------------------------------------------------------
test('origin: defaults to parent and is stamped on msg before the handler runs', async () => {
const seen = [];
const reg = createRegistry([{ topic: 'set.mode', handler: (_s, msg) => { seen.push(msg.origin); } }]);
await reg.dispatch({ topic: 'set.mode', payload: 'auto' }, {}, {});
assert.equal(seen[0], 'parent');
});
test('origin: explicit msg.origin is preserved and trimmed', async () => {
const seen = [];
const reg = createRegistry([{ topic: 'set.mode', handler: (_s, msg) => { seen.push(msg.origin); } }]);
await reg.dispatch({ topic: 'set.mode', payload: 'auto', origin: ' GUI ' }, {}, {});
assert.equal(seen[0], 'GUI');
});
test('origin: defaultOrigin descriptor overrides the global parent default', async () => {
const seen = [];
const reg = createRegistry([{ topic: 'set.x', defaultOrigin: 'fysical', handler: (_s, msg) => { seen.push(msg.origin); } }]);
await reg.dispatch({ topic: 'set.x' }, {}, {});
assert.equal(seen[0], 'fysical');
});
test('origin gating: non-gated command is never blocked, even with a mode model', async () => {
let invoked = false;
const reg = createRegistry([{ topic: 'set.x', handler: () => { invoked = true; } }]);
const source = { currentMode: 'fysicalControl', config: { mode: { allowedSources: { fysicalControl: ['fysical'] } } } };
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, source, {});
assert.equal(invoked, true);
});
test('origin gating: gated command rejects an origin disallowed by the current mode', async () => {
const logger = makeLogger();
let invoked = false;
const reg = createRegistry([{ topic: 'set.x', gated: true, handler: () => { invoked = true; } }], { logger });
const source = { currentMode: 'fysicalControl', config: { mode: { allowedSources: { fysicalControl: ['fysical'] } } } };
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, source, {});
assert.equal(invoked, false);
assert.ok(logger._calls.warn.some((m) => m.includes("origin 'GUI' not allowed in mode 'fysicalControl'")));
});
test('origin gating: gated command accepts an allowed origin (Set or array allowedSources)', async () => {
let count = 0;
const reg = createRegistry([{ topic: 'set.x', gated: true, handler: () => { count += 1; } }]);
const arraySrc = { currentMode: 'auto', config: { mode: { allowedSources: { auto: ['parent', 'GUI', 'fysical'] } } } };
const setSrc = { currentMode: 'auto', config: { mode: { allowedSources: { auto: new Set(['parent', 'GUI', 'fysical']) } } } };
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, arraySrc, {});
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, setSrc, {});
assert.equal(count, 2);
});
test('origin gating: gated command on a node WITHOUT a mode model is advisory (allow-all)', async () => {
let invoked = false;
const reg = createRegistry([{ topic: 'set.x', gated: true, handler: () => { invoked = true; } }]);
await reg.dispatch({ topic: 'set.x', origin: 'GUI' }, { id: 'no-mode' }, {});
assert.equal(invoked, true);
});

View File

@@ -0,0 +1,78 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const EventEmitter = require('events');
const MovementManager = require('../src/state/movementManager');
const noopLogger = { debug() {}, info() {}, warn() {}, error() {} };
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
function makeManager({ mode = 'staticspeed', speed = 50, interval = 1000, initial = 0 } = {}) {
// speed%/s on a 0..100 range → velocity = speed %/s. interval defaults to the
// production 1000ms so the abort-before-first-tick race is reproduced exactly.
return new MovementManager(
{
position: { min: 0, max: 100, initial },
movement: { mode, speed, maxSpeed: 1000, interval },
},
noopLogger,
new EventEmitter(),
);
}
// Regression: before the time-based fix, currentPosition only advanced inside
// setInterval(…, interval). An abort landing before the first tick (the MGC's
// ~1s re-command cadence vs the 1000ms tick) left the pump frozen at the start.
for (const mode of ['staticspeed', 'dynspeed']) {
test(`${mode}: abort before the first tick still advances position (no freeze)`, async () => {
const mgr = makeManager({ mode, speed: 50, interval: 1000 });
const ac = new AbortController();
const moving = mgr.moveTo(80, ac.signal); // ~1.6s of travel; first tick at 1000ms
await sleep(200); // interrupt well before the first tick
ac.abort();
await moving;
const pos = mgr.getCurrentPosition();
// The fix: any non-zero progress means the abort re-based instead of
// freezing at the start. (dynspeed eases in, so its early travel is small
// but must still be > 0; staticspeed travels ~velocity·elapsed.)
assert.ok(pos > 0, `expected partial progress, got frozen at ${pos}`);
assert.ok(pos < 80, `should not have reached target, got ${pos}`);
});
test(`${mode}: a fresh setpoint re-bases from the interrupted position`, async () => {
const mgr = makeManager({ mode, speed: 50, interval: 1000 });
const ac1 = new AbortController();
const m1 = mgr.moveTo(80, ac1.signal);
await sleep(200);
ac1.abort();
await m1;
const afterFirst = mgr.getCurrentPosition();
// New command toward 0 must start from afterFirst, not from 80 or a reset.
const ac2 = new AbortController();
const m2 = mgr.moveTo(0, ac2.signal);
await sleep(100);
ac2.abort();
await m2;
const afterSecond = mgr.getCurrentPosition();
assert.ok(afterSecond < afterFirst, `expected re-base downward from ${afterFirst}, got ${afterSecond}`);
assert.ok(afterSecond >= 0, `position must stay in range, got ${afterSecond}`);
});
}
test('staticspeed: an uninterrupted move reaches the exact target', async () => {
const mgr = makeManager({ mode: 'staticspeed', speed: 500, interval: 10 }); // fast
await mgr.moveTo(40, new AbortController().signal);
assert.equal(mgr.getCurrentPosition(), 40);
});
test('position is clamped to [min,max] on a re-based abort', async () => {
const mgr = makeManager({ mode: 'staticspeed', speed: 5000, interval: 1000, initial: 0 });
const ac = new AbortController();
const moving = mgr.moveTo(100, ac.signal);
await sleep(150);
ac.abort();
await moving;
const pos = mgr.getCurrentPosition();
assert.ok(pos >= 0 && pos <= 100, `clamped, got ${pos}`);
});

View File

@@ -8,7 +8,7 @@ const config = {
general: { id: 'abc', unit: 'mbar' },
asset: {
uuid: 'u1',
tagcode: 't1',
tagCode: 't1',
geoLocation: { lat: 51.6, lon: 4.7 },
category: 'measurement',
type: 'pressure',
@@ -30,6 +30,35 @@ test('process format emits message with changed fields only', () => {
assert.deepEqual(third.payload, { b: 3, c: JSON.stringify({ x: 1 }) });
});
test('alwaysEmit fields bypass delta compression (re-emitted while unchanged)', () => {
const out = new OutputUtils({ alwaysEmit: ['ctrl'] });
const first = out.formatMsg({ ctrl: 40, flow: 12 }, config, 'influxdb');
assert.deepEqual(first.payload.fields, { ctrl: 40, flow: 12 });
// flow unchanged → dropped; ctrl unchanged but forced → still emitted.
const second = out.formatMsg({ ctrl: 40, flow: 12 }, config, 'influxdb');
assert.deepEqual(second.payload.fields, { ctrl: 40 });
// ctrl changed → emitted with its new value.
const third = out.formatMsg({ ctrl: 41, flow: 12 }, config, 'influxdb');
assert.deepEqual(third.payload.fields, { ctrl: 41 });
});
test('alwaysEmit is per-format and does not force a missing/undefined field', () => {
const out = new OutputUtils({ alwaysEmit: ['ctrl'] });
// ctrl absent from the output → nothing to force; with no other change the
// message is suppressed as usual.
out.formatMsg({ flow: 5 }, config, 'influxdb');
assert.equal(out.formatMsg({ flow: 5 }, config, 'influxdb'), null);
});
test('default OutputUtils keeps pure delta compression (no alwaysEmit)', () => {
const out = new OutputUtils();
out.formatMsg({ ctrl: 40 }, config, 'influxdb');
assert.equal(out.formatMsg({ ctrl: 40 }, config, 'influxdb'), null);
});
test('influx format flattens tags and stringifies tag values', () => {
const out = new OutputUtils();
const msg = out.formatMsg({ value: 10 }, config, 'influxdb');
@@ -38,5 +67,41 @@ test('influx format flattens tags and stringifies tag values', () => {
assert.equal(msg.payload.measurement, 'measurement_abc');
assert.equal(msg.payload.tags.geoLocation_lat, '51.6');
assert.equal(msg.payload.tags.geoLocation_lon, '4.7');
assert.equal(msg.payload.tags.tagcode, 't1');
assert.ok(msg.payload.timestamp instanceof Date);
});
test('influx format omits tags whose config value is unset', () => {
const out = new OutputUtils();
// No asset block at all: uuid/tagcode/geoLocation/category/type/model are
// all undefined and must NOT appear as `="undefined"` tags.
const sparse = {
functionality: { softwareType: 'measurement' },
general: { id: 'abc' },
};
const msg = out.formatMsg({ value: 10 }, sparse, 'influxdb');
for (const t of ['geoLocation', 'category', 'type', 'model', 'uuid', 'tagcode', 'unit', 'role']) {
assert.ok(!(t in msg.payload.tags), `tag "${t}" should be omitted when unset, got "${msg.payload.tags[t]}"`);
}
// Tags that DO have values still come through.
assert.equal(msg.payload.tags.id, 'abc');
assert.equal(msg.payload.tags.softwareType, 'measurement');
// Nothing should stringify to the literal "undefined".
for (const v of Object.values(msg.payload.tags)) {
assert.notEqual(v, 'undefined');
}
});
test('influx format drops empty-string tag values too', () => {
const out = new OutputUtils();
const cfg = {
functionality: { softwareType: 'pump', role: '' },
general: { id: 'p1' },
asset: { category: '', model: 'M9' },
};
const msg = out.formatMsg({ value: 1 }, cfg, 'influxdb');
assert.ok(!('role' in msg.payload.tags));
assert.ok(!('category' in msg.payload.tags));
assert.equal(msg.payload.tags.model, 'M9');
});