feat(output): alwaysEmit fields, drop undefined/empty Influx tags, time-based movement re-basing

- OutputUtils: new `alwaysEmit` option exempts named fields from delta
  compression so steady-state values (e.g. ctrl) trace continuously.
- flattenTags now drops null/undefined/empty-string tag values, fixing
  literal `category="undefined"` tags that split every Grafana series in two.
- BaseNodeAdapter wires `static alwaysEmitFields` from the subclass.
- movementManager: track position by elapsed wall-time and capture partial
  progress on abort, so a fast-re-commanding parent can't freeze an actuator
  at its start position.
- Tests for the above.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
znetsixe
2026-05-27 16:09:14 +02:00
parent bc79de133e
commit c0be50d02c
5 changed files with 221 additions and 48 deletions

View File

@@ -0,0 +1,78 @@
const test = require('node:test');
const assert = require('node:assert/strict');
const EventEmitter = require('events');
const MovementManager = require('../src/state/movementManager');
const noopLogger = { debug() {}, info() {}, warn() {}, error() {} };
const sleep = (ms) => new Promise((r) => setTimeout(r, ms));
function makeManager({ mode = 'staticspeed', speed = 50, interval = 1000, initial = 0 } = {}) {
// speed%/s on a 0..100 range → velocity = speed %/s. interval defaults to the
// production 1000ms so the abort-before-first-tick race is reproduced exactly.
return new MovementManager(
{
position: { min: 0, max: 100, initial },
movement: { mode, speed, maxSpeed: 1000, interval },
},
noopLogger,
new EventEmitter(),
);
}
// Regression: before the time-based fix, currentPosition only advanced inside
// setInterval(…, interval). An abort landing before the first tick (the MGC's
// ~1s re-command cadence vs the 1000ms tick) left the pump frozen at the start.
for (const mode of ['staticspeed', 'dynspeed']) {
test(`${mode}: abort before the first tick still advances position (no freeze)`, async () => {
const mgr = makeManager({ mode, speed: 50, interval: 1000 });
const ac = new AbortController();
const moving = mgr.moveTo(80, ac.signal); // ~1.6s of travel; first tick at 1000ms
await sleep(200); // interrupt well before the first tick
ac.abort();
await moving;
const pos = mgr.getCurrentPosition();
// The fix: any non-zero progress means the abort re-based instead of
// freezing at the start. (dynspeed eases in, so its early travel is small
// but must still be > 0; staticspeed travels ~velocity·elapsed.)
assert.ok(pos > 0, `expected partial progress, got frozen at ${pos}`);
assert.ok(pos < 80, `should not have reached target, got ${pos}`);
});
test(`${mode}: a fresh setpoint re-bases from the interrupted position`, async () => {
const mgr = makeManager({ mode, speed: 50, interval: 1000 });
const ac1 = new AbortController();
const m1 = mgr.moveTo(80, ac1.signal);
await sleep(200);
ac1.abort();
await m1;
const afterFirst = mgr.getCurrentPosition();
// New command toward 0 must start from afterFirst, not from 80 or a reset.
const ac2 = new AbortController();
const m2 = mgr.moveTo(0, ac2.signal);
await sleep(100);
ac2.abort();
await m2;
const afterSecond = mgr.getCurrentPosition();
assert.ok(afterSecond < afterFirst, `expected re-base downward from ${afterFirst}, got ${afterSecond}`);
assert.ok(afterSecond >= 0, `position must stay in range, got ${afterSecond}`);
});
}
test('staticspeed: an uninterrupted move reaches the exact target', async () => {
const mgr = makeManager({ mode: 'staticspeed', speed: 500, interval: 10 }); // fast
await mgr.moveTo(40, new AbortController().signal);
assert.equal(mgr.getCurrentPosition(), 40);
});
test('position is clamped to [min,max] on a re-based abort', async () => {
const mgr = makeManager({ mode: 'staticspeed', speed: 5000, interval: 1000, initial: 0 });
const ac = new AbortController();
const moving = mgr.moveTo(100, ac.signal);
await sleep(150);
ac.abort();
await moving;
const pos = mgr.getCurrentPosition();
assert.ok(pos >= 0 && pos <= 100, `clamped, got ${pos}`);
});

View File

@@ -30,6 +30,35 @@ test('process format emits message with changed fields only', () => {
assert.deepEqual(third.payload, { b: 3, c: JSON.stringify({ x: 1 }) });
});
test('alwaysEmit fields bypass delta compression (re-emitted while unchanged)', () => {
const out = new OutputUtils({ alwaysEmit: ['ctrl'] });
const first = out.formatMsg({ ctrl: 40, flow: 12 }, config, 'influxdb');
assert.deepEqual(first.payload.fields, { ctrl: 40, flow: 12 });
// flow unchanged → dropped; ctrl unchanged but forced → still emitted.
const second = out.formatMsg({ ctrl: 40, flow: 12 }, config, 'influxdb');
assert.deepEqual(second.payload.fields, { ctrl: 40 });
// ctrl changed → emitted with its new value.
const third = out.formatMsg({ ctrl: 41, flow: 12 }, config, 'influxdb');
assert.deepEqual(third.payload.fields, { ctrl: 41 });
});
test('alwaysEmit is per-format and does not force a missing/undefined field', () => {
const out = new OutputUtils({ alwaysEmit: ['ctrl'] });
// ctrl absent from the output → nothing to force; with no other change the
// message is suppressed as usual.
out.formatMsg({ flow: 5 }, config, 'influxdb');
assert.equal(out.formatMsg({ flow: 5 }, config, 'influxdb'), null);
});
test('default OutputUtils keeps pure delta compression (no alwaysEmit)', () => {
const out = new OutputUtils();
out.formatMsg({ ctrl: 40 }, config, 'influxdb');
assert.equal(out.formatMsg({ ctrl: 40 }, config, 'influxdb'), null);
});
test('influx format flattens tags and stringifies tag values', () => {
const out = new OutputUtils();
const msg = out.formatMsg({ value: 10 }, config, 'influxdb');
@@ -41,3 +70,38 @@ test('influx format flattens tags and stringifies tag values', () => {
assert.equal(msg.payload.tags.tagcode, 't1');
assert.ok(msg.payload.timestamp instanceof Date);
});
test('influx format omits tags whose config value is unset', () => {
const out = new OutputUtils();
// No asset block at all: uuid/tagcode/geoLocation/category/type/model are
// all undefined and must NOT appear as `="undefined"` tags.
const sparse = {
functionality: { softwareType: 'measurement' },
general: { id: 'abc' },
};
const msg = out.formatMsg({ value: 10 }, sparse, 'influxdb');
for (const t of ['geoLocation', 'category', 'type', 'model', 'uuid', 'tagcode', 'unit', 'role']) {
assert.ok(!(t in msg.payload.tags), `tag "${t}" should be omitted when unset, got "${msg.payload.tags[t]}"`);
}
// Tags that DO have values still come through.
assert.equal(msg.payload.tags.id, 'abc');
assert.equal(msg.payload.tags.softwareType, 'measurement');
// Nothing should stringify to the literal "undefined".
for (const v of Object.values(msg.payload.tags)) {
assert.notEqual(v, 'undefined');
}
});
test('influx format drops empty-string tag values too', () => {
const out = new OutputUtils();
const cfg = {
functionality: { softwareType: 'pump', role: '' },
general: { id: 'p1' },
asset: { category: '', model: 'M9' },
};
const msg = out.formatMsg({ value: 1 }, cfg, 'influxdb');
assert.ok(!('role' in msg.payload.tags));
assert.ok(!('category' in msg.payload.tags));
assert.equal(msg.payload.tags.model, 'M9');
});