Files
coresync/test/basic/coresync.basic.test.js
znetsixe 21d77a8afa feat(coresync): FROST/Influx/Grafana demo + sub-tick burst-window reducer fix
Lands the end-to-end CoreSync demo (frost-influx-grafana.flow.json) along
with the supporting normalizer/identity/interpolation/reducer work, plus
a targeted fix for the under-compression bug surfaced by the FROST demo.

Under-compression fix (PointReducer):
- New burstWindowMs option (default 0, opt-in). When two samples arrive
  within burstWindowMs of each other, the second is treated as the same
  wall-clock observation: previous is replaced, slope analysis is skipped.
- Without this guard, sub-millisecond bursts (rotatingMachine emits twice
  per pressure-injection cycle, ~1 ms apart) produced near-vertical
  apparent slopes that tripped angle-change on every tick — driving
  cog/efficiency/SEC streams to ~0.6% reduction (i.e. no compression).
- With burstWindowMs: 10 in the demo flow, the same streams now compress
  at 78-93% (verified end-to-end in InfluxDB over a 3-min window).
- Editor HTML exposes the new "Burst dt" field with explanatory tooltip.

Regression test (test/basic/coresync.basic.test.js):
- New "burstWindowMs collapses sub-tick sample bursts into a single
  observation" test reproduces the exact burst pattern from the demo
  and asserts before/after behaviour.
- Existing 14 tests continue to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 20:27:28 +02:00

322 lines
12 KiB
JavaScript

const assert = require('node:assert/strict');
const fs = require('node:fs');
const path = require('node:path');
const { normalizeInput, normalizeOne } = require('../../src/normalizer');
const { PointReducer } = require('../../src/reducer');
const { CoreSyncDomain } = require('../../src/coreSyncDomain');
const { interpolateAt, reconstructSeries } = require('../../src/interpolation');
const { createRequest } = require('../../src/frostRequests');
/**
 * Build a telemetry message fixture with a single upstream pressure field
 * for measurement `P-1` (field key `pressure.measured.upstream.PT-1`).
 *
 * @param {*} timestamp - Stored verbatim as `payload.timestamp`.
 * @param {*} value - Stored verbatim as the value of the pressure field.
 * @returns {{payload: object}} A message object wrapping the payload.
 */
function telemetry(timestamp, value) {
  const fields = { 'pressure.measured.upstream.PT-1': value };
  const tags = { tagcode: 'P-1' };
  const payload = { measurement: 'P-1', fields, tags, timestamp };
  return { payload };
}
// A single-field telemetry payload must normalize to exactly one point whose
// identity parts are split out of the dotted field key.
test('normalizer maps EVOLV dbase payloads to stable stream identities', () => {
  const normalized = normalizeInput(telemetry('2026-05-19T10:15:30.000Z', 12345));
  assert.equal(normalized.length, 1);

  // Property/expected-value pairs, checked in order against the only point.
  const expectedIdentity = [
    ['thingTag', 'P-1'],
    ['type', 'pressure'],
    ['variant', 'measured'],
    ['position', 'upstream'],
    ['sensorTag', 'PT-1'],
    ['streamKey', 'P-1:pressure:measured:upstream:PT-1'],
    ['unit', 'Pa'],
  ];
  for (const [property, expected] of expectedIdentity) {
    assert.equal(normalized[0][property], expected);
  }
});
// Tags and source both advertise m3/h, yet the only field is a pressure
// reading; the normalized point is expected to report mbar instead.
test('normalizer infers rotatingMachine field units from field type', () => {
  const origin = { softwareType: 'rotatingmachine', unit: 'm3/h' };
  const sample = {
    measurement: 'rotatingmachine_cse_rm_pump',
    fields: { 'pressure.measured.downstream.dashboard-sim-downstream': 1700 },
    tags: { tagcode: 'P-101', ...origin },
    source: { ...origin },
    timestamp: '2026-05-19T10:15:30.000Z',
  };

  const normalized = normalizeOne(sample);

  assert.equal(normalized[0].unit, 'mbar');
});
// For a payload tagged as softwareType "measurement", the unit carried in the
// tags is expected to come through unchanged on the normalized point.
test('normalizer keeps measurement unit for measurement-node fields', () => {
  const tags = { tagcode: 'P-101', softwareType: 'measurement', unit: 'm3/h' };
  const sample = {
    measurement: 'FT-101',
    fields: { 'flow.measured.upstream.FT-101': 42 },
    tags,
    timestamp: '2026-05-19T10:15:30.000Z',
  };

  const normalized = normalizeOne(sample);

  assert.equal(normalized[0].unit, 'm3/h');
});
// Feeds a rise (0 -> 1) followed by a plateau (1 -> 1), one minute apart, and
// expects the reducer to emit the corner sample once the slope changes.
test('point reducer keeps first point and previous point on angle change', () => {
  const options = { angleToleranceDeg: 5, timeScaleMs: 60000, valueScale: 1, maxGapMs: 0 };
  const reducer = new PointReducer(options);

  const origin = { time: new Date('2026-05-19T10:00:00.000Z'), value: 0 };
  const corner = { time: new Date('2026-05-19T10:01:00.000Z'), value: 1 };
  const plateau = { time: new Date('2026-05-19T10:02:00.000Z'), value: 1 };

  // The very first sample is always emitted, tagged 'first'.
  const firstReasons = reducer.offer(origin).map((knot) => knot.reason);
  assert.deepEqual(firstReasons, ['first']);

  // The second sample alone emits nothing.
  assert.deepEqual(reducer.offer(corner), []);

  // The third sample reveals the bend; the emitted knot must be the very same
  // object as the corner sample (strict identity, not a copy).
  const emitted = reducer.offer(plateau);
  assert.equal(emitted.length, 1);
  const knot = emitted[0];
  assert.equal(knot.reason, 'angle-change');
  assert.equal(knot.point, corner);
});
// Walks one stream through the metadata lookup sequence and checks that the
// queued observation is only posted after the final (datastream) lookup reply.
test('coresync emits metadata lookup first and drains observations after resolver completes', () => {
  const hub = new CoreSyncDomain({
    frostBaseUrl: 'http://frost.example/FROST-Server',
    serviceVersion: 'v1.1',
  });

  // First telemetry message: slot 1 of the result holds a single lookup for the thing.
  const initial = hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 12345));
  const initialRequests = initial[1];
  assert.equal(initialRequests.length, 1);
  assert.equal(initialRequests[0].topic, 'frost.metadata.lookup');
  assert.equal(initialRequests[0]._coreSync.kind, 'thing');

  // Answer each lookup in turn; the entity ids are simply 1..5 in this order.
  const streamKey = 'P-1:pressure:measured:upstream:PT-1';
  const lookupOrder = ['thing', 'observedProperty', 'sensor', 'featureOfInterest', 'datastream'];
  let lastOutput;
  for (const [index, kind] of lookupOrder.entries()) {
    lastOutput = hub.handleMessage({
      topic: 'frost.response',
      statusCode: 200,
      payload: { value: [{ '@iot.id': index + 1 }] },
      _coreSync: { kind, action: 'lookup', streamKey },
    });
  }

  // After the last reply, exactly one observation-create request is emitted.
  const requests = lastOutput[1];
  assert.equal(requests.length, 1);
  const request = requests[0];
  assert.equal(request.topic, 'frost.observation.create');
  assert.equal(request.method, 'POST');
  assert.equal(request.url, 'http://frost.example/FROST-Server/v1.1/Datastreams(5)/Observations');
  assert.equal(request.payload.FeatureOfInterest['@iot.id'], 4);
  assert.equal(request.payload.parameters.reductionReason, 'first');
  assert.equal(request.payload.parameters.interpolationHint, 'linear-or-directional-monotone-cubic');
});
// No FROST responses are ever supplied here, so every observation stays
// pending; the queue is expected to hold only the first and the latest value.
test('pending observation queue keeps first and latest unresolved points', () => {
  const hub = new CoreSyncDomain({});
  const feed = (timestamp, value) => hub.handleMessage(telemetry(timestamp, value));
  const flush = () => hub.handleMessage({ topic: 'coresync.flush' });

  feed('2026-05-19T10:00:00.000Z', 1);
  feed('2026-05-19T10:01:00.000Z', 2);
  flush();
  feed('2026-05-19T10:02:00.000Z', 3);
  flush();

  const streamState = hub.streams.get('P-1:pressure:measured:upstream:PT-1');
  const pending = streamState.pendingObservations;
  assert.equal(pending.length, 2);
  assert.equal(pending[0].point.value, 1);
  assert.equal(pending[1].point.value, 3);
});
// A 201 create response with a null body should make the hub issue another
// lookup for the same kind of entity.
test('successful bodyless metadata create is followed by lookup', () => {
  const hub = new CoreSyncDomain({
    frostBaseUrl: 'http://frost.example/FROST-Server',
    serviceVersion: 'v1.1',
  });
  hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 12345));

  const createResponse = {
    topic: 'frost.response',
    statusCode: 201,
    payload: null,
    _coreSync: {
      kind: 'thing',
      action: 'create',
      streamKey: 'P-1:pressure:measured:upstream:PT-1',
    },
  };
  const result = hub.handleMessage(createResponse);

  const followUps = result[1];
  assert.equal(followUps.length, 1);
  assert.equal(followUps[0].topic, 'frost.metadata.lookup');
  assert.equal(followUps[0]._coreSync.kind, 'thing');
});
// The sensor create request must carry both a name and a description.
test('sensor create payload includes FROST-required description', () => {
  const normalized = normalizeInput(telemetry('2026-05-19T10:15:30.000Z', 12345));
  const config = { frostBaseUrl: 'http://frost.example/FROST-Server', serviceVersion: 'v1.1' };

  const request = createRequest(config, normalized[0], 'sensor', {});

  const body = request.payload;
  assert.equal(body.name, 'PT-1');
  assert.equal(body.description, 'EVOLV sensor PT-1');
});
// Each pressure unit is expected to map to its own reducer value scale.
test('pressure reducer scale follows pressure unit', () => {
  const paHub = new CoreSyncDomain({});
  const mbarHub = new CoreSyncDomain({});
  const barHub = new CoreSyncDomain({});

  // [hub under test, pressure unit, expected scale]
  const cases = [
    [paHub, 'Pa', 100000],
    [mbarHub, 'mbar', 1000],
    [barHub, 'bar', 10],
  ];
  for (const [hub, unit, expectedScale] of cases) {
    assert.equal(hub._valueScaleFor({ type: 'pressure', unit }), expectedScale);
  }
});
// 61 one-second samples forming a triangle (0 up to 30, back down to 0) should
// reduce to three knots: the start, the apex, and the flushed tail.
test('reducer keeps only line-drawing knots for a dense directional series', () => {
  const reducer = new PointReducer({
    angleToleranceDeg: 2,
    timeScaleMs: 1000,
    valueScale: 1,
    maxGapMs: 0,
  });
  const start = Date.parse('2026-05-19T10:00:00.000Z');

  // Sample i sits i seconds after start; the value rises until i = 30, then falls.
  const samples = Array.from({ length: 61 }, (_, i) => {
    const at = start + (i * 1000);
    return {
      time: new Date(at),
      phenomenonTime: new Date(at).toISOString(),
      value: i <= 30 ? i : 60 - i,
    };
  });

  const kept = samples.flatMap((sample) => reducer.offer(sample));
  kept.push(...reducer.flush('flush'));

  assert.equal(kept.length, 3);
  const reasons = kept.map((knot) => knot.reason);
  const directions = kept.map((knot) => knot.direction);
  assert.deepEqual(reasons, ['first', 'angle-change', 'flush']);
  assert.deepEqual(directions, ['unknown', 'rising', 'falling']);
  assert.equal(kept[1].point.value, 30);
});
test('burstWindowMs collapses sub-tick sample bursts into a single observation', () => {
  // Regression for the CoreSync FROST demo: rotatingMachine sends two samples
  // roughly 1 ms apart on every pressure-injection cycle. With dt = 1 ms even a
  // tiny dy looks like a near-vertical slope, so without a burst guard the
  // reducer reports an angle change on every tick and compresses almost
  // nothing. A positive burstWindowMs should merge each burst so that only
  // genuine direction changes produce knots.
  const baseOptions = { angleToleranceDeg: 1, timeScaleMs: 1000, valueScale: 1, maxGapMs: 0 };
  const start = Date.parse('2026-05-22T10:00:00.000Z');

  // Three ticks spaced 2 s apart; each tick is a steady value followed 1 ms
  // later by a small noisy nudge (the telemetry shape seen in the FROST demo).
  const pattern = [
    { dt: 0, v: 0.200 },
    { dt: 1, v: 0.203 },
    { dt: 2000, v: 0.197 },
    { dt: 2001, v: 0.200 },
    { dt: 4000, v: 0.203 },
    { dt: 4001, v: 0.197 },
  ];

  // Feed the whole pattern through a fresh reducer and gather every emitted knot.
  const collectKnots = (options) => {
    const reducer = new PointReducer(options);
    const knots = [];
    for (const { dt, v } of pattern) {
      const at = start + dt;
      knots.push(...reducer.offer({
        time: new Date(at),
        phenomenonTime: new Date(at).toISOString(),
        value: v,
      }));
    }
    return knots;
  };

  // Unguarded: the bogus sub-ms slopes make nearly every burst sample a knot.
  const noGuardKnots = collectKnots(baseOptions);
  assert.ok(noGuardKnots.length >= 4, `expected ≥4 spurious knots without guard, got ${noGuardKnots.length}`);

  // Guarded (burstWindowMs = 10): bursts collapse, leaving the initial 'first'
  // knot plus at most one real direction-change knot.
  const guardedKnots = collectKnots({ ...baseOptions, burstWindowMs: 10 });
  assert.ok(guardedKnots.length <= 2, `expected ≤2 knots with guard, got ${guardedKnots.length}`);
  assert.equal(guardedKnots[0].reason, 'first');
});
// Three knots describe a triangle (0 -> 30 -> 0 over one minute); linear
// interpolation must land exactly on the straight segments between them.
test('linear reconstruction restores omitted points exactly for piecewise-linear knots', () => {
  const knots = [
    { phenomenonTime: '2026-05-19T10:00:00.000Z', result: 0, direction: 'unknown' },
    { phenomenonTime: '2026-05-19T10:00:30.000Z', result: 30, direction: 'rising' },
    { phenomenonTime: '2026-05-19T10:01:00.000Z', result: 0, direction: 'falling' },
  ];

  // Halfway up the rising leg and halfway down the falling leg.
  const midRise = interpolateAt(knots, '2026-05-19T10:00:15.000Z', { method: 'linear' });
  assert.equal(midRise, 15);
  const midFall = interpolateAt(knots, '2026-05-19T10:00:45.000Z', { method: 'linear' });
  assert.equal(midFall, 15);

  // A 15 s grid yields five samples; index 1 is filled in, index 2 is an original knot.
  const series = reconstructSeries(knots, { intervalMs: 15000, method: 'linear' });
  const values = series.map((sample) => sample.value);
  assert.deepEqual(values, [0, 15, 30, 15, 0]);
  assert.equal(series[1].reconstructed, true);
  assert.equal(series[2].reconstructed, false);
});
// Reconstructs a 0 -> 30 -> 0 triangle on a 5 s grid with the directional-cubic
// method and checks that no sample leaves the [0, 30] envelope (within a small
// floating-point tolerance) and that the knots themselves are hit exactly.
test('directional cubic reconstruction honors knot directions without overshoot', () => {
  const knots = [
    { phenomenonTime: '2026-05-19T10:00:00.000Z', result: 0, direction: 'rising' },
    { phenomenonTime: '2026-05-19T10:00:30.000Z', result: 30, direction: 'flat' },
    { phenomenonTime: '2026-05-19T10:01:00.000Z', result: 0, direction: 'falling' },
  ];
  const series = reconstructSeries(knots, { intervalMs: 5000, method: 'directional-cubic' });

  // Bounds are asserted with assert.ok plus a message so a failure names the
  // offending sample instead of reporting a bare `false !== true`.
  for (const [index, point] of series.entries()) {
    assert.ok(point.value >= -1e-9, `sample ${index} undershoots 0: ${point.value}`);
    assert.ok(point.value <= 30 + 1e-9, `sample ${index} overshoots 30: ${point.value}`);
  }

  // Indices 0, 6 and 12 are the samples checked against the three knot values.
  assert.equal(series[0].value, 0);
  assert.equal(series[6].value, 30);
  assert.equal(series[12].value, 0);
});
// Structural check of the example flow JSON: the required node types exist,
// the coresync node's second output is wired to the FROST auth and knot-Influx
// functions, the FROST response function feeds back into coresync, and the
// raw/knot Influx nodes are present.
test('FROST/Influx/Grafana example flow contains the end-to-end CoreSync path', () => {
  const flowPath = path.resolve(__dirname, '../../examples/frost-influx-grafana.flow.json');
  const flow = JSON.parse(fs.readFileSync(flowPath, 'utf8'));
  const byId = new Map(flow.map((node) => [node.id, node]));

  for (const type of ['measurement', 'rotatingMachine', 'coresync']) {
    assert.ok(flow.some((node) => node.type === type), `flow has no node of type "${type}"`);
  }

  // Assert presence before dereferencing so a missing node fails with a clear
  // message rather than a TypeError on `undefined`.
  const core = byId.get('cse_coresync');
  assert.ok(core, 'flow has no node with id "cse_coresync"');
  assert.equal(core.frostBaseUrl, 'https://sta.wbd-rd.nl/FROST-Server/');
  for (const target of ['cse_fn_frost_auth', 'cse_fn_knot_influx']) {
    assert.ok(core.wires[1].includes(target), `cse_coresync output 1 is not wired to "${target}"`);
  }

  const response = byId.get('cse_fn_frost_response');
  assert.ok(response, 'flow has no node with id "cse_fn_frost_response"');
  assert.ok(
    response.wires[0].includes('cse_coresync'),
    'cse_fn_frost_response output 0 is not wired back to "cse_coresync"',
  );

  for (const id of ['cse_fn_raw_influx', 'cse_http_influx_knot']) {
    assert.ok(byId.has(id), `flow has no node with id "${id}"`);
  }
});
// Sanity check of the provisioned Grafana dashboard JSON. The file is resolved
// four directories above this test's folder, i.e. outside this package.
test('CoreSync Grafana dashboard provisioning JSON is valid', () => {
  const dashboardPath = path.resolve(__dirname, '../../../../docker/grafana/provisioning/dashboards/coresync-frost-demo.json');
  const dashboard = JSON.parse(fs.readFileSync(dashboardPath, 'utf8'));

  assert.equal(dashboard.uid, 'coresync-frost-demo');

  // Check the shape first so a missing `panels` key fails with a readable
  // message instead of a TypeError.
  assert.ok(Array.isArray(dashboard.panels), 'dashboard.panels must be an array');
  assert.ok(dashboard.panels.length >= 3, `expected ≥3 panels, got ${dashboard.panels.length}`);

  // Somewhere in the dashboard definition the knot measurement must be referenced.
  assert.ok(
    JSON.stringify(dashboard).includes('coresync_knots'),
    'dashboard never references "coresync_knots"',
  );
});