const assert = require('node:assert/strict'); const { normalizeInput } = require('../../src/normalizer'); const { PointReducer } = require('../../src/reducer'); const { CoreSyncDomain } = require('../../src/coreSyncDomain'); function telemetry(timestamp, value) { return { payload: { measurement: 'P-1', fields: { 'pressure.measured.upstream.PT-1': value, }, tags: { tagcode: 'P-1', }, timestamp, }, }; } test('normalizer maps EVOLV dbase payloads to stable stream identities', () => { const points = normalizeInput(telemetry('2026-05-19T10:15:30.000Z', 12345)); assert.equal(points.length, 1); assert.equal(points[0].thingTag, 'P-1'); assert.equal(points[0].type, 'pressure'); assert.equal(points[0].variant, 'measured'); assert.equal(points[0].position, 'upstream'); assert.equal(points[0].sensorTag, 'PT-1'); assert.equal(points[0].streamKey, 'P-1:pressure:measured:upstream:PT-1'); assert.equal(points[0].unit, 'Pa'); }); test('point reducer keeps first point and previous point on angle change', () => { const reducer = new PointReducer({ angleToleranceDeg: 5, timeScaleMs: 60000, valueScale: 1, maxGapMs: 0, }); const p1 = { time: new Date('2026-05-19T10:00:00.000Z'), value: 0 }; const p2 = { time: new Date('2026-05-19T10:01:00.000Z'), value: 1 }; const p3 = { time: new Date('2026-05-19T10:02:00.000Z'), value: 1 }; assert.deepEqual(reducer.offer(p1).map((x) => x.reason), ['first']); assert.deepEqual(reducer.offer(p2), []); const changed = reducer.offer(p3); assert.equal(changed.length, 1); assert.equal(changed[0].reason, 'angle-change'); assert.equal(changed[0].point, p2); }); test('coresync emits metadata lookup first and drains observations after resolver completes', () => { const hub = new CoreSyncDomain({ frostBaseUrl: 'http://frost.example/FROST-Server', serviceVersion: 'v1.1', }); const first = hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 12345)); assert.equal(first[1].length, 1); assert.equal(first[1][0].topic, 'frost.metadata.lookup'); assert.equal(first[1][0]._coreSync.kind, 'thing'); const streamKey = 'P-1:pressure:measured:upstream:PT-1'; const kinds = [ ['thing', 1], ['observedProperty', 2], ['sensor', 3], ['featureOfInterest', 4], ['datastream', 5], ]; let output; for (const [kind, id] of kinds) { output = hub.handleMessage({ topic: 'frost.response', statusCode: 200, payload: { value: [{ '@iot.id': id }] }, _coreSync: { kind, action: 'lookup', streamKey }, }); } assert.equal(output[1].length, 1); assert.equal(output[1][0].topic, 'frost.observation.create'); assert.equal(output[1][0].method, 'POST'); assert.equal(output[1][0].url, 'http://frost.example/FROST-Server/v1.1/Datastreams(5)/Observations'); assert.equal(output[1][0].payload.FeatureOfInterest['@iot.id'], 4); assert.equal(output[1][0].payload.parameters.reductionReason, 'first'); }); test('pending observation queue keeps first and latest unresolved points', () => { const hub = new CoreSyncDomain({}); hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 1)); hub.handleMessage(telemetry('2026-05-19T10:01:00.000Z', 2)); hub.handleMessage({ topic: 'coresync.flush' }); hub.handleMessage(telemetry('2026-05-19T10:02:00.000Z', 3)); hub.handleMessage({ topic: 'coresync.flush' }); const state = hub.streams.get('P-1:pressure:measured:upstream:PT-1'); assert.equal(state.pendingObservations.length, 2); assert.equal(state.pendingObservations[0].point.value, 1); assert.equal(state.pendingObservations[1].point.value, 3); });