// Tests for input normalization, point reduction, and CoreSync message handling.

const assert = require('node:assert/strict');

const { normalizeInput } = require('../../src/normalizer');
const { PointReducer } = require('../../src/reducer');
const { CoreSyncDomain } = require('../../src/coreSyncDomain');

/**
 * Build a telemetry message fixture for the thing tagged `P-1`.
 *
 * The message wraps a payload holding one field,
 * `pressure.measured.upstream.PT-1`, set to `value`, plus the given
 * `timestamp`. A fresh object is created on every call.
 *
 * @param {string} timestamp - Stored verbatim on the payload; callers in
 *   this file pass ISO-8601 strings.
 * @param {number} value - Reading stored under the pressure field key.
 * @returns {{payload: object}} The message object.
 */
function telemetry(timestamp, value) {
  return {
    payload: {
      measurement: 'P-1',
      fields: {
        'pressure.measured.upstream.PT-1': value,
      },
      tags: {
        tagcode: 'P-1',
      },
      timestamp,
    },
  };
}

// A single-field telemetry payload should normalize to exactly one point
// whose identity parts mirror the dotted field key and the `P-1` tag.
test('normalizer maps EVOLV dbase payloads to stable stream identities', () => {
  const points = normalizeInput(telemetry('2026-05-19T10:15:30.000Z', 12345));

  assert.equal(points.length, 1);
  assert.equal(points[0].thingTag, 'P-1');
  // The four segments of 'pressure.measured.upstream.PT-1', in order.
  assert.equal(points[0].type, 'pressure');
  assert.equal(points[0].variant, 'measured');
  assert.equal(points[0].position, 'upstream');
  assert.equal(points[0].sensorTag, 'PT-1');
  // The stream key joins the thing tag and the four segments with ':'.
  assert.equal(points[0].streamKey, 'P-1:pressure:measured:upstream:PT-1');
  assert.equal(points[0].unit, 'Pa');
});

// Offers three points to a reducer: the first must be emitted immediately,
// the second held back, and the third must release the second with the
// reason 'angle-change'.
test('point reducer keeps first point and previous point on angle change', () => {
  const reducer = new PointReducer({
    angleToleranceDeg: 5,
    timeScaleMs: 60000,
    valueScale: 1,
    maxGapMs: 0,
  });
  // p1 -> p2 rises by 1 over one minute; p2 -> p3 is flat, so the trend
  // changes direction at p2 (presumably beyond the 5 degree tolerance).
  const p1 = { time: new Date('2026-05-19T10:00:00.000Z'), value: 0 };
  const p2 = { time: new Date('2026-05-19T10:01:00.000Z'), value: 1 };
  const p3 = { time: new Date('2026-05-19T10:02:00.000Z'), value: 1 };

  assert.deepEqual(reducer.offer(p1).map((x) => x.reason), ['first']);
  assert.deepEqual(reducer.offer(p2), []);
  const changed = reducer.offer(p3);
  assert.equal(changed.length, 1);
  assert.equal(changed[0].reason, 'angle-change');
  // Strict assert mode: this checks that the emitted point is the very same
  // object as p2, not merely an equal copy.
  assert.equal(changed[0].point, p2);
});

// The first telemetry message must produce a single metadata lookup for the
// 'thing'. After a successful lookup response has been fed back for every
// entity kind, the last call must produce the observation create request.
test('coresync emits metadata lookup first and drains observations after resolver completes', () => {
  const hub = new CoreSyncDomain({
    frostBaseUrl: 'http://frost.example/FROST-Server',
    serviceVersion: 'v1.1',
  });

  // Index 1 of the handleMessage result is the list inspected throughout
  // this test (presumably the outbound request slot; confirm in the domain).
  const first = hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 12345));
  assert.equal(first[1].length, 1);
  assert.equal(first[1][0].topic, 'frost.metadata.lookup');
  assert.equal(first[1][0]._coreSync.kind, 'thing');

  const streamKey = 'P-1:pressure:measured:upstream:PT-1';
  // [entity kind, '@iot.id' returned by the simulated lookup response].
  const kinds = [
    ['thing', 1],
    ['observedProperty', 2],
    ['sensor', 3],
    ['featureOfInterest', 4],
    ['datastream', 5],
  ];

  // Only the result of the final response (datastream) is asserted below.
  let output;
  for (const [kind, id] of kinds) {
    output = hub.handleMessage({
      topic: 'frost.response',
      statusCode: 200,
      payload: { value: [{ '@iot.id': id }] },
      _coreSync: { kind, action: 'lookup', streamKey },
    });
  }

  assert.equal(output[1].length, 1);
  assert.equal(output[1][0].topic, 'frost.observation.create');
  assert.equal(output[1][0].method, 'POST');
  // Datastream id 5 and feature-of-interest id 4 come from `kinds` above.
  assert.equal(output[1][0].url, 'http://frost.example/FROST-Server/v1.1/Datastreams(5)/Observations');
  assert.equal(output[1][0].payload.FeatureOfInterest['@iot.id'], 4);
  assert.equal(output[1][0].payload.parameters.reductionReason, 'first');
});

// No lookup responses are ever fed back here, so the observations stay
// pending. After three readings and two flushes the queue is expected to
// hold only the first reading (1) and the latest one (3).
test('pending observation queue keeps first and latest unresolved points', () => {
  const hub = new CoreSyncDomain({});

  hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 1));
  hub.handleMessage(telemetry('2026-05-19T10:01:00.000Z', 2));
  hub.handleMessage({ topic: 'coresync.flush' });
  hub.handleMessage(telemetry('2026-05-19T10:02:00.000Z', 3));
  hub.handleMessage({ topic: 'coresync.flush' });

  // Per-stream state is read directly from the domain's `streams` map.
  const state = hub.streams.get('P-1:pressure:measured:upstream:PT-1');
  assert.equal(state.pendingObservations.length, 2);
  assert.equal(state.pendingObservations[0].point.value, 1);
  assert.equal(state.pendingObservations[1].point.value, 3);
});