From 21d77a8afa2a0754255c08d478edc3c9bfbb49e0 Mon Sep 17 00:00:00 2001 From: znetsixe Date: Fri, 22 May 2026 20:27:28 +0200 Subject: [PATCH] feat(coresync): FROST/Influx/Grafana demo + sub-tick burst-window reducer fix MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Lands the end-to-end CoreSync demo (frost-influx-grafana.flow.json) along with the supporting normalizer/identity/interpolation/reducer work, plus a targeted fix for the under-compression bug surfaced by the FROST demo. Under-compression fix (PointReducer): - New burstWindowMs option (default 0, opt-in). When two samples arrive within burstWindowMs of each other, the second is treated as the same wall-clock observation: previous is replaced, slope analysis is skipped. - Without this guard, sub-millisecond bursts (rotatingMachine emits twice per pressure-injection cycle, ~1 ms apart) produced near-vertical apparent slopes that tripped angle-change on every tick — driving cog/efficiency/SEC streams to ~0.6% reduction (i.e. no compression). - With burstWindowMs: 10 in the demo flow, the same streams now compress at 78-93% (verified end-to-end in InfluxDB over a 3-min window). - Editor HTML exposes the new "Burst dt" field with explanatory tooltip. Regression test (test/basic/coresync.basic.test.js): - New "burstWindowMs collapses sub-tick sample bursts into a single observation" test reproduces the exact burst pattern from the demo and asserts before/after behaviour. - Existing 14 tests continue to pass. Co-Authored-By: Claude Opus 4.7 (1M context) --- coresync.html | 5 + coresync.js | 1 + examples/README.md | 34 + examples/frost-influx-grafana.flow.json | 870 ++++++++++++++++++++++++ src/coreSyncDomain.js | 9 + src/frostRequests.js | 6 + src/identity.js | 22 +- src/interpolation.js | 151 ++++ src/reducer.js | 45 +- test/basic/coresync.basic.test.js | 218 +++++- 10 files changed, 1355 insertions(+), 6 deletions(-) create mode 100644 examples/README.md create mode 100644 examples/frost-influx-grafana.flow.json create mode 100644 src/interpolation.js diff --git a/coresync.html b/coresync.html index ef2f0a1..f833315 100644 --- a/coresync.html +++ b/coresync.html @@ -15,6 +15,7 @@ maxGapMs: { value: 300000, required: true, validate: RED.validators.number() }, minDeltaTimeMs: { value: 0, required: true, validate: RED.validators.number() }, minDeltaValue: { value: 0, required: true, validate: RED.validators.number() }, + burstWindowMs: { value: 0, required: true, validate: RED.validators.number() }, maxQueuedObservationsPerStream: { value: 2, required: true, validate: RED.validators.number() }, diagnosticsEnabled: { value: false }, }, @@ -83,6 +84,10 @@ +
+ + +
diff --git a/coresync.js b/coresync.js index 0e3285a..3fb53fd 100644 --- a/coresync.js +++ b/coresync.js @@ -19,6 +19,7 @@ module.exports = function(RED) { maxGapMs: Number(config.maxGapMs), minDeltaTimeMs: Number(config.minDeltaTimeMs), minDeltaValue: Number(config.minDeltaValue), + burstWindowMs: Number(config.burstWindowMs) || 0, comparisonMode: config.comparisonMode || 'angle', }, }); diff --git a/examples/README.md b/examples/README.md new file mode 100644 index 0000000..3d1af72 --- /dev/null +++ b/examples/README.md @@ -0,0 +1,34 @@ +# CoreSync Examples + +## FROST + Influx + Grafana + +Import `frost-influx-grafana.flow.json` into Node-RED when the local Docker stack is running. + +The flow demonstrates: + +- a `measurement` node producing a 1 Hz directional flow signal; +- a `rotatingMachine` node producing event-driven pump telemetry; +- raw EVOLV telemetry written to local InfluxDB; +- CoreSync reducing the same telemetry to knots; +- CoreSync FROST HTTP requests sent to `https://sta.wbd-rd.nl/FROST-Server/v1.1`; +- FROST responses looped back into CoreSync as `msg.topic = "frost.response"`; +- CoreSync knots also written to local InfluxDB as `coresync_knots` for Grafana comparison. +- a Node-RED dashboard page with live raw-field, knot, FROST observation, and percent-saved counters. + +Do not store FROST credentials in the flow. Start Node-RED with: + +```sh +FROST_USER=write FROST_PASSWORD='' docker compose up -d --build +``` + +The local InfluxDB and Grafana settings match `docker-compose.yml`: + +- InfluxDB: `http://localhost:8086`, org `evolv`, bucket `telemetry`, token `evolv-dev-token` +- Grafana: `http://localhost:3000`, user `admin`, password `evolv` + +The provisioned Grafana dashboard `CoreSync FROST Demo` compares raw telemetry against the reduced knot stream. The raw samples are intentionally denser than the knot series; the knot series is the data that should be sent to FROST. + +Runtime dashboards: + +- Node-RED live counters: `http://localhost:1880/dashboard/coresync-frost` +- Grafana raw-vs-knot comparison: `http://localhost:3000/d/coresync-frost-demo/coresync-frost-demo` diff --git a/examples/frost-influx-grafana.flow.json b/examples/frost-influx-grafana.flow.json new file mode 100644 index 0000000..c70bb4c --- /dev/null +++ b/examples/frost-influx-grafana.flow.json @@ -0,0 +1,870 @@ +[ + { + "id": "cse_tab", + "type": "tab", + "label": "CoreSync FROST + Influx + Grafana", + "disabled": false, + "info": "Measurement and rotatingMachine telemetry are written as raw samples to local InfluxDB and as CoreSync-reduced knots to FROST and InfluxDB." + }, + { + "id": "cse_note", + "type": "comment", + "z": "cse_tab", + "name": "Set FROST_USER and FROST_PASSWORD in the Node-RED environment before deploying.", + "info": "CoreSync emits FROST-ready HTTP requests. The HTTP request response is fed back with topic=frost.response so metadata lookup/create can continue.", + "x": 450, + "y": 40, + "wires": [] + }, + { + "id": "cse_ui_base", + "type": "ui-base", + "name": "EVOLV CoreSync", + "path": "/dashboard", + "appIcon": "", + "includeClientData": true, + "acceptsClientConfig": [ + "ui-notification", + "ui-control" + ], + "showPathInSidebar": false, + "headerContent": "page", + "navigationStyle": "default", + "titleBarStyle": "default" + }, + { + "id": "cse_ui_theme", + "type": "ui-theme", + "name": "CoreSync Theme", + "colors": { + "surface": "#ffffff", + "primary": "#2364aa", + "bgPage": "#f3f5f7", + "groupBg": "#ffffff", + "groupOutline": "#d5dce3" + }, + "sizes": { + "density": "compact", + "pagePadding": "12px", + "groupGap": "12px", + "groupBorderRadius": "6px", + "widgetGap": "8px" + } + }, + { + "id": "cse_ui_page", + "type": "ui-page", + "name": "CoreSync FROST", + "ui": "cse_ui_base", + "path": "/coresync-frost", + "icon": "timeline", + "layout": "grid", + "theme": "cse_ui_theme", + "breakpoints": [ + { + "name": "Default", + "px": "0", + "cols": "12" + } + ], + "order": 1, + "className": "" + }, + { + "id": "cse_ui_group_kpi", + "type": "ui-group", + "name": "Reduction Counters", + "page": "cse_ui_page", + "width": "12", + "height": "1", + "order": 1, + "showTitle": true, + "className": "" + }, + { + "id": "cse_ui_group_chart", + "type": "ui-group", + "name": "Write Rates", + "page": "cse_ui_page", + "width": "12", + "height": "1", + "order": 2, + "showTitle": true, + "className": "" + }, + { + "id": "cse_measure_flow", + "type": "measurement", + "z": "cse_tab", + "name": "FROST Flow Sensor FT-101", + "scaling": false, + "i_min": 0, + "i_max": 120, + "i_offset": 0, + "o_min": 0, + "o_max": 120, + "simulator": false, + "smooth_method": "none", + "count": 1, + "processOutputFormat": "process", + "dbaseOutputFormat": "frost", + "uuid": "", + "assetTagCode": "FT-101", + "tagCode": "FT-101", + "supplier": "demo", + "category": "sensor", + "assetType": "flow", + "model": "demo-flow-transmitter", + "unit": "m3/h", + "enableLog": false, + "logLevel": "error", + "positionVsParent": "upstream", + "positionIcon": "", + "hasDistance": false, + "distance": 0, + "distanceUnit": "m", + "distanceDescription": "", + "x": 560, + "y": 160, + "wires": [ + [ + "cse_dbg_measure_process" + ], + [ + "cse_fn_measure_to_coresync", + "cse_fn_raw_influx" + ], + [ + "cse_rm_pump" + ] + ] + }, + { + "id": "cse_inj_measure_loop", + "type": "inject", + "z": "cse_tab", + "name": "1 Hz directional flow", + "props": [ + { + "p": "payload" + } + ], + "repeat": "1", + "crontab": "", + "once": true, + "onceDelay": "1", + "topic": "", + "payload": "", + "payloadType": "date", + "x": 160, + "y": 160, + "wires": [ + [ + "cse_fn_measure_triangle" + ] + ] + }, + { + "id": "cse_fn_measure_triangle", + "type": "function", + "z": "cse_tab", + "name": "triangle 0..100..0", + "func": "let i = context.get('i') || 0;\nconst phase = i % 80;\nconst value = phase <= 40 ? phase * 2.5 : (80 - phase) * 2.5;\ncontext.set('i', i + 1);\nmsg.topic = 'measurement';\nmsg.payload = Math.round(value * 100) / 100;\nreturn msg;", + "outputs": 1, + "noerr": 0, + "x": 360, + "y": 160, + "wires": [ + [ + "cse_measure_flow" + ] + ] + }, + { + "id": "cse_fn_measure_to_coresync", + "type": "function", + "z": "cse_tab", + "name": "mAbs -> flow.measured.upstream.FT-101", + "func": "const p = msg.payload;\nif (!p || !p.fields) return null;\nconst value = Number(p.fields.mAbs);\nif (!Number.isFinite(value)) return null;\nmsg.payload = {\n measurement: 'FT-101',\n fields: { 'flow.measured.upstream.FT-101': value },\n tags: { ...(p.tags || {}), tagcode: 'P-101', sensorTag: 'FT-101', unit: 'm3/h' },\n timestamp: p.timestamp || new Date().toISOString(),\n source: { ...(p.source || {}), softwareType: 'measurement', unit: 'm3/h' }\n};\nmsg.topic = msg.payload.measurement;\nreturn msg;", + "outputs": 1, + "noerr": 0, + "x": 850, + "y": 140, + "wires": [ + [ + "cse_coresync" + ] + ] + }, + { + "id": "cse_rm_pump", + "type": "rotatingMachine", + "z": "cse_tab", + "name": "FROST Pump P-101", + "speed": "25", + "startup": "1", + "warmup": "1", + "shutdown": "1", + "cooldown": "1", + "movementMode": "staticspeed", + "machineCurve": "", + "processOutputFormat": "process", + "dbaseOutputFormat": "frost", + "uuid": "", + "assetTagCode": "P-101", + "assetTagNumber": "P-101", + "supplier": "", + "category": "", + "assetType": "", + "model": "hidrostal-H05K-S03R", + "unit": "m3/h", + "enableLog": false, + "logLevel": "error", + "positionVsParent": "atEquipment", + "positionIcon": "", + "hasDistance": false, + "distance": "", + "distanceUnit": "m", + "distanceDescription": "", + "x": 560, + "y": 360, + "wires": [ + [ + "cse_dbg_rm_process" + ], + [ + "cse_coresync", + "cse_fn_raw_influx" + ], + [ + "cse_dbg_rm_parent" + ] + ] + }, + { + "id": "cse_inj_rm_mode", + "type": "inject", + "z": "cse_tab", + "name": "RM mode virtualControl", + "props": [ + { + "p": "topic", + "vt": "str" + }, + { + "p": "payload", + "vt": "str" + } + ], + "repeat": "", + "crontab": "", + "once": true, + "onceDelay": "0.5", + "topic": "setMode", + "payload": "virtualControl", + "payloadType": "str", + "x": 180, + "y": 280, + "wires": [ + [ + "cse_rm_pump" + ] + ] + }, + { + "id": "cse_inj_rm_start", + "type": "inject", + "z": "cse_tab", + "name": "RM startup", + "props": [ + { + "p": "payload", + "vt": "str" + } + ], + "repeat": "", + "crontab": "", + "once": true, + "onceDelay": "1.5", + "topic": "", + "payload": "startup", + "payloadType": "str", + "x": 150, + "y": 320, + "wires": [ + [ + "cse_fn_rm_sequence" + ] + ] + }, + { + "id": "cse_fn_rm_sequence", + "type": "function", + "z": "cse_tab", + "name": "execSequence", + "func": "msg.topic = 'execSequence';\nmsg.payload = { source: 'GUI', action: 'execSequence', parameter: msg.payload };\nreturn msg;", + "outputs": 1, + "noerr": 0, + "x": 340, + "y": 320, + "wires": [ + [ + "cse_rm_pump" + ] + ] + }, + { + "id": "cse_inj_rm_setpoint", + "type": "inject", + "z": "cse_tab", + "name": "RM 5s setpoint wave", + "props": [ + { + "p": "payload" + } + ], + "repeat": "5", + "crontab": "", + "once": true, + "onceDelay": "3", + "topic": "", + "payload": "", + "payloadType": "date", + "x": 170, + "y": 380, + "wires": [ + [ + "cse_fn_rm_setpoint" + ] + ] + }, + { + "id": "cse_fn_rm_setpoint", + "type": "function", + "z": "cse_tab", + "name": "execMovement wave", + "func": "let i = context.get('i') || 0;\nconst values = [25, 55, 85, 60, 35];\nconst setpoint = values[i % values.length];\ncontext.set('i', i + 1);\nmsg.topic = 'execMovement';\nmsg.payload = { source: 'GUI', action: 'execMovement', setpoint };\nreturn msg;", + "outputs": 1, + "noerr": 0, + "x": 360, + "y": 380, + "wires": [ + [ + "cse_rm_pump" + ] + ] + }, + { + "id": "cse_inj_rm_pressure", + "type": "inject", + "z": "cse_tab", + "name": "RM pressure cycle", + "props": [ + { + "p": "payload" + } + ], + "repeat": "2", + "crontab": "", + "once": true, + "onceDelay": "1", + "topic": "", + "payload": "", + "payloadType": "date", + "x": 160, + "y": 440, + "wires": [ + [ + "cse_fn_rm_pressure" + ] + ] + }, + { + "id": "cse_fn_rm_pressure", + "type": "function", + "z": "cse_tab", + "name": "simulate upstream/downstream pressure", + "func": "let i = context.get('i') || 0;\nconst up = 900 + ((i % 6) * 10);\nconst down = 1700 + ((i % 6) * 40);\ncontext.set('i', i + 1);\nreturn [\n { topic: 'simulateMeasurement', payload: { type: 'pressure', position: 'upstream', value: up, unit: 'mbar' } },\n { topic: 'simulateMeasurement', payload: { type: 'pressure', position: 'downstream', value: down, unit: 'mbar' } }\n];", + "outputs": 2, + "noerr": 0, + "x": 390, + "y": 440, + "wires": [ + [ + "cse_rm_pump" + ], + [ + "cse_rm_pump" + ] + ] + }, + { + "id": "cse_coresync", + "type": "coresync", + "z": "cse_tab", + "name": "CoreSync to WBD FROST", + "frostBaseUrl": "https://sta.wbd-rd.nl/FROST-Server/", + "serviceVersion": "v1.1", + "dbaseFormat": "frost", + "assetTagOverride": "", + "sensorTagOverride": "", + "comparisonMode": "angle", + "angleToleranceDeg": 1, + "timeScaleMs": 1000, + "maxGapMs": 30000, + "minDeltaTimeMs": 0, + "minDeltaValue": 0, + "burstWindowMs": 10, + "maxQueuedObservationsPerStream": 2, + "diagnosticsEnabled": false, + "x": 1120, + "y": 260, + "wires": [ + [ + "cse_dbg_coresync_process" + ], + [ + "cse_fn_frost_auth", + "cse_fn_knot_influx" + ], + [ + "cse_dbg_coresync_parent" + ] + ] + }, + { + "id": "cse_inj_flush", + "type": "inject", + "z": "cse_tab", + "name": "CoreSync flush 15s", + "props": [ + { + "p": "topic", + "vt": "str" + } + ], + "repeat": "15", + "crontab": "", + "once": false, + "onceDelay": "", + "topic": "coresync.flush", + "x": 860, + "y": 300, + "wires": [ + [ + "cse_coresync" + ] + ] + }, + { + "id": "cse_fn_frost_auth", + "type": "function", + "z": "cse_tab", + "name": "FROST basic auth from env", + "func": "const user = env.get('FROST_USER');\nconst password = env.get('FROST_PASSWORD');\nif (!user || !password) {\n node.status({ fill: 'red', shape: 'ring', text: 'missing FROST_USER/FROST_PASSWORD' });\n return null;\n}\nmsg.headers = { ...(msg.headers || {}), Authorization: 'Basic ' + Buffer.from(`${user}:${password}`).toString('base64') };\nnode.status({ fill: 'green', shape: 'dot', text: msg.topic });\nreturn msg;", + "outputs": 1, + "noerr": 0, + "x": 1420, + "y": 220, + "wires": [ + [ + "cse_http_frost" + ] + ] + }, + { + "id": "cse_http_frost", + "type": "http request", + "z": "cse_tab", + "name": "Send to FROST", + "method": "use", + "ret": "txt", + "paytoqs": false, + "url": "", + "persist": false, + "authType": "", + "senderr": false, + "x": 1630, + "y": 220, + "wires": [ + [ + "cse_fn_frost_response" + ] + ] + }, + { + "id": "cse_fn_frost_response", + "type": "function", + "z": "cse_tab", + "name": "topic=frost.response", + "func": "if (typeof msg.payload === 'string') {\n try { msg.payload = JSON.parse(msg.payload); } catch (_) { /* keep raw */ }\n}\nconst metric = (msg._coreSync?.kind === 'observation' && msg.statusCode && msg.statusCode < 400)\n ? { topic: 'frostObservation', payload: 1 }\n : null;\nmsg.topic = 'frost.response';\nreturn [msg, metric];", + "outputs": 2, + "noerr": 0, + "x": 1840, + "y": 220, + "wires": [ + [ + "cse_coresync", + "cse_dbg_frost_response" + ], + [ + "cse_fn_reduction_metrics" + ] + ] + }, + { + "id": "cse_fn_raw_influx", + "type": "function", + "z": "cse_tab", + "name": "raw EVOLV -> Influx line protocol", + "func": "const p = msg.payload;\nif (!p || !p.measurement || !p.fields) return null;\nconst esc = (s) => String(s).replace(/,/g, '\\\\,').replace(/ /g, '\\\\ ').replace(/=/g, '\\\\=');\nconst escString = (s) => String(s).replace(/\"/g, '\\\\\"');\nconst tags = Object.entries(p.tags || {})\n .filter(([, v]) => v !== undefined && v !== null && v !== '')\n .map(([k, v]) => `${esc(k)}=${esc(v)}`)\n .join(',');\nconst fields = Object.entries(p.fields)\n .filter(([, v]) => v !== undefined && v !== null)\n .map(([k, v]) => {\n const n = Number(v);\n if (typeof v === 'number' && Number.isFinite(v)) return `${esc(k)}=${v}`;\n if (typeof v === 'boolean') return `${esc(k)}=${v}`;\n if (Number.isFinite(n) && String(v).trim() !== '') return `${esc(k)}=${n}`;\n return `${esc(k)}=\"${escString(v)}\"`;\n });\nif (!fields.length) return null;\nconst t = Date.parse(p.timestamp);\nconst ns = `${Number.isFinite(t) ? t : Date.now()}000000`;\nmsg._coreSyncDemo = { rawFields: fields.length };\nmsg.payload = `${esc(p.measurement)}${tags ? ',' + tags : ''} ${fields.join(',')} ${ns}`;\nmsg.headers = { Authorization: 'Token evolv-dev-token', 'Content-Type': 'text/plain' };\nmsg.url = 'http://influxdb:8086/api/v2/write?org=evolv&bucket=telemetry&precision=ns';\nmsg.method = 'POST';\nreturn msg;", + "outputs": 1, + "noerr": 0, + "x": 880, + "y": 560, + "wires": [ + [ + "cse_http_influx_raw" + ] + ] + }, + { + "id": "cse_http_influx_raw", + "type": "http request", + "z": "cse_tab", + "name": "Write raw InfluxDB", + "method": "use", + "ret": "txt", + "paytoqs": false, + "url": "", + "persist": false, + "authType": "", + "senderr": false, + "x": 1140, + "y": 560, + "wires": [ + [ + "cse_fn_influx_raw_status" + ] + ] + }, + { + "id": "cse_fn_knot_influx", + "type": "function", + "z": "cse_tab", + "name": "CoreSync knot -> Influx line protocol", + "func": "if (msg.topic !== 'frost.observation.create' || !msg.payload) return null;\nconst obs = msg.payload;\nconst params = obs.parameters || {};\nconst streamKey = params.evolvStreamKey || msg._coreSync?.streamKey || 'unknown';\nconst parts = streamKey.split(':');\nconst esc = (s) => String(s).replace(/,/g, '\\\\,').replace(/ /g, '\\\\ ').replace(/=/g, '\\\\=');\nconst tags = {\n streamKey,\n thing: parts[0] || 'unknown',\n type: parts[1] || 'unknown',\n variant: parts[2] || 'unknown',\n position: parts[3] || 'unknown',\n sensorTag: parts[4] || 'unknown',\n reason: params.reductionReason || 'unknown',\n direction: params.direction || 'unknown'\n};\nconst tagText = Object.entries(tags).map(([k, v]) => `${esc(k)}=${esc(v)}`).join(',');\nconst fields = [`result=${Number(obs.result)}`, 'knot=1i'];\nif (Number.isFinite(Number(params.slope))) fields.push(`slope=${Number(params.slope)}`);\nif (Number.isFinite(Number(params.previousValue))) fields.push(`previousValue=${Number(params.previousValue)}`);\nconst t = Date.parse(obs.phenomenonTime);\nconst ns = `${Number.isFinite(t) ? t : Date.now()}000000`;\nmsg._coreSyncDemo = { knotFields: 1 };\nmsg.payload = `coresync_knots,${tagText} ${fields.join(',')} ${ns}`;\nmsg.headers = { Authorization: 'Token evolv-dev-token', 'Content-Type': 'text/plain' };\nmsg.url = 'http://influxdb:8086/api/v2/write?org=evolv&bucket=telemetry&precision=ns';\nmsg.method = 'POST';\nreturn msg;", + "outputs": 1, + "noerr": 0, + "x": 1430, + "y": 300, + "wires": [ + [ + "cse_http_influx_knot" + ] + ] + }, + { + "id": "cse_http_influx_knot", + "type": "http request", + "z": "cse_tab", + "name": "Write knot InfluxDB", + "method": "use", + "ret": "txt", + "paytoqs": false, + "url": "", + "persist": false, + "authType": "", + "senderr": false, + "x": 1680, + "y": 300, + "wires": [ + [ + "cse_fn_influx_knot_status" + ] + ] + }, + { + "id": "cse_fn_influx_raw_status", + "type": "function", + "z": "cse_tab", + "name": "raw write status", + "func": "const count = (context.get('count') || 0) + 1;\nconst errors = context.get('errors') || 0;\nif (msg.statusCode && msg.statusCode >= 400) {\n context.set('errors', errors + 1);\n node.status({ fill: 'red', shape: 'ring', text: `raw ERR ${errors + 1}/${count}` });\n context.set('count', count);\n return null;\n}\nnode.status({ fill: 'green', shape: 'dot', text: `raw ${count} writes` });\ncontext.set('count', count);\nreturn { topic: 'rawFields', payload: msg._coreSyncDemo?.rawFields || 1 };", + "outputs": 1, + "noerr": 0, + "x": 1360, + "y": 560, + "wires": [ + [ + "cse_fn_reduction_metrics" + ] + ] + }, + { + "id": "cse_fn_influx_knot_status", + "type": "function", + "z": "cse_tab", + "name": "knot write status", + "func": "const count = (context.get('count') || 0) + 1;\nconst errors = context.get('errors') || 0;\nif (msg.statusCode && msg.statusCode >= 400) {\n context.set('errors', errors + 1);\n node.status({ fill: 'red', shape: 'ring', text: `knot ERR ${errors + 1}/${count}` });\n context.set('count', count);\n return null;\n}\nnode.status({ fill: 'green', shape: 'dot', text: `knot ${count} writes` });\ncontext.set('count', count);\nreturn { topic: 'knotFields', payload: msg._coreSyncDemo?.knotFields || 1 };", + "outputs": 1, + "noerr": 0, + "x": 1900, + "y": 300, + "wires": [ + [ + "cse_fn_reduction_metrics" + ] + ] + }, + { + "id": "cse_fn_reduction_metrics", + "type": "function", + "z": "cse_tab", + "name": "Live reduction metrics", + "func": "const stats = context.get('stats') || { rawFields: 0, knotFields: 0, frostObservations: 0, started: Date.now() };\nconst n = Number(msg.payload) || 0;\nif (msg.topic === 'rawFields') stats.rawFields += n;\nif (msg.topic === 'knotFields') stats.knotFields += n;\nif (msg.topic === 'frostObservation') stats.frostObservations += n;\ncontext.set('stats', stats);\n\nconst saved = stats.rawFields > 0 ? Math.max(0, 100 * (1 - (stats.knotFields / stats.rawFields))) : 0;\nconst elapsedSec = Math.max(1, (Date.now() - stats.started) / 1000);\nconst rawRate = stats.rawFields / elapsedSec;\nconst knotRate = stats.knotFields / elapsedSec;\n\nnode.status({ fill: 'green', shape: 'dot', text: `${stats.rawFields} raw -> ${stats.knotFields} knots (${saved.toFixed(1)}% saved)` });\n\nreturn [\n { payload: `Raw fields: ${stats.rawFields} | CoreSync knots: ${stats.knotFields} | FROST observations: ${stats.frostObservations} | Saved: ${saved.toFixed(1)}%` },\n { payload: Number(saved.toFixed(1)) },\n { topic: 'raw fields/s', payload: Number(rawRate.toFixed(3)), timestamp: Date.now() },\n { topic: 'knot fields/s', payload: Number(knotRate.toFixed(3)), timestamp: Date.now() },\n { payload: stats.frostObservations }\n];", + "outputs": 5, + "noerr": 0, + "x": 1620, + "y": 500, + "wires": [ + [ + "cse_ui_text_summary" + ], + [ + "cse_ui_gauge_saved" + ], + [ + "cse_ui_chart_raw" + ], + [ + "cse_ui_chart_knot" + ], + [ + "cse_ui_text_frost" + ] + ] + }, + { + "id": "cse_ui_text_summary", + "type": "ui-text", + "z": "cse_tab", + "group": "cse_ui_group_kpi", + "name": "Reduction summary", + "label": "Reduction", + "order": 1, + "width": "8", + "height": "1", + "format": "{{msg.payload}}", + "layout": "row-spread", + "style": false, + "font": "", + "fontSize": "", + "color": "", + "x": 1890, + "y": 460, + "wires": [] + }, + { + "id": "cse_ui_text_frost", + "type": "ui-text", + "z": "cse_tab", + "group": "cse_ui_group_kpi", + "name": "FROST observations", + "label": "FROST observations", + "order": 2, + "width": "4", + "height": "1", + "format": "{{msg.payload}}", + "layout": "row-spread", + "style": false, + "font": "", + "fontSize": "", + "color": "", + "x": 1900, + "y": 620, + "wires": [] + }, + { + "id": "cse_ui_gauge_saved", + "type": "ui-gauge", + "z": "cse_tab", + "group": "cse_ui_group_kpi", + "name": "Saved gauge", + "gtype": "gauge-half", + "gstyle": "Rounded", + "title": "Data Saved", + "units": "%", + "prefix": "", + "suffix": "%", + "min": 0, + "max": 100, + "segments": [ + { + "color": "#d64545", + "from": 0 + }, + { + "color": "#e8a23a", + "from": 40 + }, + { + "color": "#2f9e44", + "from": 70 + } + ], + "width": "4", + "height": "3", + "order": 3, + "x": 1880, + "y": 500, + "wires": [] + }, + { + "id": "cse_ui_chart_raw", + "type": "ui-chart", + "z": "cse_tab", + "group": "cse_ui_group_chart", + "name": "Raw write rate", + "label": "Raw field write rate", + "order": 1, + "width": "6", + "height": "4", + "chartType": "line", + "category": "topic", + "xAxisLabel": "time", + "xAxisType": "time", + "yAxisLabel": "fields/s", + "removeOlder": "15", + "removeOlderUnit": "60", + "x": 1890, + "y": 540, + "wires": [], + "showLegend": false, + "categoryType": "msg", + "xAxisProperty": "", + "xAxisPropertyType": "timestamp", + "yAxisProperty": "payload", + "yAxisPropertyType": "msg", + "action": "append", + "interpolation": "linear" + }, + { + "id": "cse_ui_chart_knot", + "type": "ui-chart", + "z": "cse_tab", + "group": "cse_ui_group_chart", + "name": "Knot write rate", + "label": "CoreSync knot write rate", + "order": 2, + "width": "6", + "height": "4", + "chartType": "line", + "category": "topic", + "xAxisLabel": "time", + "xAxisType": "time", + "yAxisLabel": "knots/s", + "removeOlder": "15", + "removeOlderUnit": "60", + "x": 1890, + "y": 580, + "wires": [], + "showLegend": false, + "categoryType": "msg", + "xAxisProperty": "", + "xAxisPropertyType": "timestamp", + "yAxisProperty": "payload", + "yAxisPropertyType": "msg", + "action": "append", + "interpolation": "linear" + }, + { + "id": "cse_dbg_measure_process", + "type": "debug", + "z": "cse_tab", + "name": "measurement process", + "active": false, + "tosidebar": true, + "console": false, + "tostatus": false, + "complete": "true", + "targetType": "full", + "x": 850, + "y": 100, + "wires": [] + }, + { + "id": "cse_dbg_rm_process", + "type": "debug", + "z": "cse_tab", + "name": "rotatingMachine process", + "active": false, + "tosidebar": true, + "console": false, + "tostatus": false, + "complete": "true", + "targetType": "full", + "x": 840, + "y": 360, + "wires": [] + }, + { + "id": "cse_dbg_rm_parent", + "type": "debug", + "z": "cse_tab", + "name": "rotatingMachine parent", + "active": false, + "tosidebar": true, + "console": false, + "tostatus": false, + "complete": "true", + "targetType": "full", + "x": 840, + "y": 400, + "wires": [] + }, + { + "id": "cse_dbg_coresync_process", + "type": "debug", + "z": "cse_tab", + "name": "CoreSync diagnostics", + "active": true, + "tosidebar": true, + "console": false, + "tostatus": false, + "complete": "true", + "targetType": "full", + "x": 1410, + "y": 180, + "wires": [] + }, + { + "id": "cse_dbg_coresync_parent", + "type": "debug", + "z": "cse_tab", + "name": "CoreSync parent", + "active": false, + "tosidebar": true, + "console": false, + "tostatus": false, + "complete": "true", + "targetType": "full", + "x": 1400, + "y": 340, + "wires": [] + }, + { + "id": "cse_dbg_frost_response", + "type": "debug", + "z": "cse_tab", + "name": "FROST response", + "active": false, + "tosidebar": true, + "console": false, + "tostatus": false, + "complete": "statusCode", + "targetType": "msg", + "x": 2070, + "y": 220, + "wires": [] + } +] diff --git a/src/coreSyncDomain.js b/src/coreSyncDomain.js index e82fb14..4023ddb 100644 --- a/src/coreSyncDomain.js +++ b/src/coreSyncDomain.js @@ -87,6 +87,13 @@ class CoreSyncDomain { return [null, requests.length ? requests : null, null]; } + if (meta.action === 'create' && statusCode >= 200 && statusCode < 300) { + state.inFlightMetadata.delete(meta.kind); + requests.push(lookupRequest(this.config, state.meta, meta.kind)); + state.inFlightMetadata.add(meta.kind); + return [null, requests, null]; + } + if (meta.action === 'lookup' && (statusCode === 200 || statusCode === 0)) { state.inFlightMetadata.delete(meta.kind); requests.push(createRequest(this.config, state.meta, meta.kind, state.ids)); @@ -176,6 +183,8 @@ class CoreSyncDomain { _valueScaleFor(point) { const configured = this.config.reducer.valueScaleByType || DEFAULT_VALUE_SCALE_BY_TYPE; + if (point.type === 'pressure' && String(point.unit || '').toLowerCase() === 'mbar') return 1000; + if (point.type === 'pressure' && String(point.unit || '').toLowerCase() === 'bar') return 10; return configured[point.type] || this.config.reducer.valueScale || 1; } diff --git a/src/frostRequests.js b/src/frostRequests.js index d0c606e..78997f0 100644 --- a/src/frostRequests.js +++ b/src/frostRequests.js @@ -87,6 +87,11 @@ function observationRequest(config, candidate, datastreamId, foiId) { parameters: { reduction: 'knot', reductionReason: candidate.reason, + direction: candidate.direction || 'unknown', + slope: Number.isFinite(candidate.slope) ? candidate.slope : 0, + previousPhenomenonTime: candidate.previousPhenomenonTime, + previousValue: candidate.previousValue, + interpolationHint: 'linear-or-directional-monotone-cubic', evolvFieldKey: meta.fieldKey, evolvStreamKey: meta.streamKey, sourceMeasurement: meta.measurement, @@ -142,6 +147,7 @@ function createPayload(kind, meta, ids) { }), sensor: () => ({ name: meta.sensorTag, + description: `EVOLV sensor ${meta.sensorTag}`, encodingType: 'application/json', metadata: JSON.stringify({ source: 'EVOLV', variant: meta.variant }), properties: { diff --git a/src/identity.js b/src/identity.js index 84f9a07..4448ed9 100644 --- a/src/identity.js +++ b/src/identity.js @@ -13,6 +13,16 @@ const DEFAULT_UNITS = { efficiency: '1', }; +const ROTATING_MACHINE_OUTPUT_UNITS = { + pressure: 'mbar', + flow: 'm3/h', + power: 'kW', + temperature: 'C', + atmpressure: 'Pa', + ctrl: '%', + efficiency: '1', +}; + function firstPresent(...values) { for (const value of values) { if (value !== undefined && value !== null && value !== '') return value; @@ -49,7 +59,7 @@ function resolveIdentity(input, options = {}) { options.sensorTagOverride, parsed.sensorTag || tags.sensorTag || tags.sensor_tagCode || `${variant.toUpperCase()}-${thingTag}`, ); - const unit = sanitizeToken(input.unit, tags.unit || input.source?.unit || DEFAULT_UNITS[type] || '1'); + const unit = sanitizeToken(input.unit, inferredUnitFor({ tags, source: input.source, type }) || DEFAULT_UNITS[type] || '1'); const streamKey = [thingTag, type, variant, position, sensorTag].join(':'); return { @@ -69,8 +79,18 @@ function resolveIdentity(input, options = {}) { }; } +function inferredUnitFor({ tags = {}, source = {}, type }) { + const softwareType = String(tags.softwareType || source.softwareType || '').toLowerCase(); + if (softwareType === 'rotatingmachine' && ROTATING_MACHINE_OUTPUT_UNITS[type]) { + return ROTATING_MACHINE_OUTPUT_UNITS[type]; + } + return tags.unit || source.unit; +} + module.exports = { DEFAULT_UNITS, + ROTATING_MACHINE_OUTPUT_UNITS, + inferredUnitFor, parseFieldKey, resolveIdentity, }; diff --git a/src/interpolation.js b/src/interpolation.js new file mode 100644 index 0000000..964cd7e --- /dev/null +++ b/src/interpolation.js @@ -0,0 +1,151 @@ +'use strict'; + +function toMs(value) { + if (value instanceof Date) return value.getTime(); + if (typeof value === 'number') return value; + return new Date(value).getTime(); +} + +function normalizeKnots(knots) { + return (Array.isArray(knots) ? knots : []) + .map((knot) => ({ + ...knot, + timeMs: toMs(knot.time ?? knot.phenomenonTime ?? knot.timestamp), + value: Number(knot.value ?? knot.result), + direction: knot.direction || knot.parameters?.direction || 'unknown', + })) + .filter((knot) => Number.isFinite(knot.timeMs) && Number.isFinite(knot.value)) + .sort((a, b) => a.timeMs - b.timeMs); +} + +function directionSign(direction) { + if (direction === 'rising') return 1; + if (direction === 'falling') return -1; + if (direction === 'flat') return 0; + return null; +} + +function secants(knots) { + const d = []; + for (let i = 0; i < knots.length - 1; i += 1) { + const dt = knots[i + 1].timeMs - knots[i].timeMs; + d.push(dt > 0 ? (knots[i + 1].value - knots[i].value) / dt : 0); + } + return d; +} + +function directionalTangents(knots) { + if (knots.length < 2) return knots.map(() => 0); + const d = secants(knots); + const m = new Array(knots.length).fill(0); + m[0] = d[0]; + m[m.length - 1] = d[d.length - 1]; + + for (let i = 1; i < knots.length - 1; i += 1) { + if (d[i - 1] === 0 || d[i] === 0 || Math.sign(d[i - 1]) !== Math.sign(d[i])) { + m[i] = 0; + } else { + m[i] = (d[i - 1] + d[i]) / 2; + } + } + + for (let i = 0; i < knots.length; i += 1) { + const sign = directionSign(knots[i].direction); + if (sign === 0) { + m[i] = 0; + } else if (sign !== null && Math.sign(m[i]) !== 0 && Math.sign(m[i]) !== sign) { + m[i] = 0; + } + } + + for (let i = 0; i < d.length; i += 1) { + if (d[i] === 0) { + m[i] = 0; + m[i + 1] = 0; + continue; + } + const a = m[i] / d[i]; + const b = m[i + 1] / d[i]; + const sum = a * a + b * b; + if (sum > 9) { + const tau = 3 / Math.sqrt(sum); + m[i] = tau * a * d[i]; + m[i + 1] = tau * b * d[i]; + } + } + + return m; +} + +function segmentIndex(knots, timeMs) { + if (timeMs <= knots[0].timeMs) return 0; + for (let i = 0; i < knots.length - 1; i += 1) { + if (timeMs >= knots[i].timeMs && timeMs <= knots[i + 1].timeMs) return i; + } + return knots.length - 2; +} + +function interpolateAt(knotsInput, time, options = {}) { + const knots = normalizeKnots(knotsInput); + if (knots.length === 0) return null; + if (knots.length === 1) return knots[0].value; + + const timeMs = toMs(time); + if (!Number.isFinite(timeMs)) return null; + const i = segmentIndex(knots, timeMs); + const a = knots[i]; + const b = knots[i + 1]; + const h = b.timeMs - a.timeMs; + if (h <= 0) return a.value; + + const t = Math.min(1, Math.max(0, (timeMs - a.timeMs) / h)); + if ((options.method || 'linear') === 'linear') { + return a.value + ((b.value - a.value) * t); + } + + const tangents = directionalTangents(knots); + const t2 = t * t; + const t3 = t2 * t; + const h00 = (2 * t3) - (3 * t2) + 1; + const h10 = t3 - (2 * t2) + t; + const h01 = (-2 * t3) + (3 * t2); + const h11 = t3 - t2; + return (h00 * a.value) + (h10 * h * tangents[i]) + (h01 * b.value) + (h11 * h * tangents[i + 1]); +} + +function reconstructSeries(knotsInput, options = {}) { + const knots = normalizeKnots(knotsInput); + if (knots.length === 0) return []; + if (knots.length === 1) return [{ time: new Date(knots[0].timeMs).toISOString(), value: knots[0].value, reconstructed: false }]; + + const intervalMs = Math.max(Number(options.intervalMs) || 1000, 1); + const startMs = Number.isFinite(toMs(options.start)) ? toMs(options.start) : knots[0].timeMs; + const endMs = Number.isFinite(toMs(options.end)) ? toMs(options.end) : knots[knots.length - 1].timeMs; + const values = []; + const knotTimes = new Set(knots.map((knot) => knot.timeMs)); + + for (let t = startMs; t <= endMs; t += intervalMs) { + values.push({ + time: new Date(t).toISOString(), + value: interpolateAt(knots, t, options), + reconstructed: !knotTimes.has(t), + }); + } + + if (values[values.length - 1]?.time !== new Date(endMs).toISOString()) { + values.push({ + time: new Date(endMs).toISOString(), + value: interpolateAt(knots, endMs, options), + reconstructed: !knotTimes.has(endMs), + }); + } + + return values; +} + +module.exports = { + directionalTangents, + interpolateAt, + normalizeKnots, + reconstructSeries, +}; diff --git a/src/reducer.js b/src/reducer.js index 890043c..778e1c0 100644 --- a/src/reducer.js +++ b/src/reducer.js @@ -7,6 +7,7 @@ const DEFAULTS = { maxGapMs: 300000, minDeltaTimeMs: 0, minDeltaValue: 0, + burstWindowMs: 0, comparisonMode: 'angle', relativeSlopeTolerance: 0.1, }; @@ -33,6 +34,25 @@ function relativeSlope(from, to, options) { return ((Number(to.value) - Number(from.value)) / Math.max(Math.abs(Number(options.valueScale) || DEFAULTS.valueScale), Number.EPSILON)) / dt; } +function directionFor(from, to) { + if (!from || !to) return 'unknown'; + const delta = Number(to.value) - Number(from.value); + if (delta > 0) return 'rising'; + if (delta < 0) return 'falling'; + return 'flat'; +} + +function candidate(point, reason, from, to, options = {}) { + return { + point, + reason, + direction: directionFor(from, to || point), + slope: from && to ? relativeSlope(from, to, options) : 0, + previousPhenomenonTime: from?.phenomenonTime, + previousValue: from?.value, + }; +} + class PointReducer { constructor(options = {}) { this.options = { ...DEFAULTS, ...options }; @@ -47,27 +67,42 @@ class PointReducer { if (!this.anchor) { this.anchor = point; this.previous = point; - output.push({ point, reason: 'first' }); + output.push(candidate(point, 'first', null, point, this.options)); return output; } const minDeltaTimeMs = Number(this.options.minDeltaTimeMs) || 0; const minDeltaValue = Number(this.options.minDeltaValue) || 0; + const burstWindowMs = Number(this.options.burstWindowMs) || 0; const dtFromPrevious = toMs(point) - toMs(this.previous); const dvFromPrevious = Math.abs(Number(point.value) - Number(this.previous.value)); if (dtFromPrevious < minDeltaTimeMs && dvFromPrevious < minDeltaValue) { this.previous = point; return output; } + // Sub-tick burst: when two samples arrive within burstWindowMs, treat them + // as the same wall-clock observation. Replace previous with the latest value + // but skip slope analysis (dt this small produces unreliable angle estimates + // that trip angle-change on every tick). + if (burstWindowMs > 0 && dtFromPrevious >= 0 && dtFromPrevious < burstWindowMs) { + this.previous = point; + return output; + } const maxGapMs = Number(this.options.maxGapMs) || 0; if (maxGapMs > 0 && this.previous !== this.anchor && toMs(point) - toMs(this.anchor) >= maxGapMs) { - output.push({ point: this.previous, reason: 'max-gap' }); + output.push(candidate(this.previous, 'max-gap', this.anchor, this.previous, this.options)); this.anchor = this.previous; } if (this.previous !== this.anchor && this._changedDirection(point)) { - output.push({ point: this.previous, reason: this.options.comparisonMode === 'relative-slope' ? 'slope-change' : 'angle-change' }); + output.push(candidate( + this.previous, + this.options.comparisonMode === 'relative-slope' ? 'slope-change' : 'angle-change', + this.anchor, + this.previous, + this.options, + )); this.anchor = this.previous; } @@ -78,8 +113,9 @@ class PointReducer { flush(reason = 'flush') { if (this.previous && this.previous !== this.anchor) { const point = this.previous; + const kept = candidate(point, reason, this.anchor, point, this.options); this.anchor = point; - return [{ point, reason }]; + return [kept]; } return []; } @@ -103,4 +139,5 @@ module.exports = { PointReducer, angleDeg, angleDiff, + directionFor, }; diff --git a/test/basic/coresync.basic.test.js b/test/basic/coresync.basic.test.js index 317fd50..daf231e 100644 --- a/test/basic/coresync.basic.test.js +++ b/test/basic/coresync.basic.test.js @@ -1,8 +1,12 @@ const assert = require('node:assert/strict'); +const fs = require('node:fs'); +const path = require('node:path'); -const { normalizeInput } = require('../../src/normalizer'); +const { normalizeInput, normalizeOne } = require('../../src/normalizer'); const { PointReducer } = require('../../src/reducer'); const { CoreSyncDomain } = require('../../src/coreSyncDomain'); +const { interpolateAt, reconstructSeries } = require('../../src/interpolation'); +const { createRequest } = require('../../src/frostRequests'); function telemetry(timestamp, value) { return { @@ -32,6 +36,44 @@ test('normalizer maps EVOLV dbase payloads to stable stream identities', () => { assert.equal(points[0].unit, 'Pa'); }); +test('normalizer infers rotatingMachine field units from field type', () => { + const points = normalizeOne({ + measurement: 'rotatingmachine_cse_rm_pump', + fields: { + 'pressure.measured.downstream.dashboard-sim-downstream': 1700, + }, + tags: { + tagcode: 'P-101', + softwareType: 'rotatingmachine', + unit: 'm3/h', + }, + source: { + softwareType: 'rotatingmachine', + unit: 'm3/h', + }, + timestamp: '2026-05-19T10:15:30.000Z', + }); + + assert.equal(points[0].unit, 'mbar'); +}); + +test('normalizer keeps measurement unit for measurement-node fields', () => { + const points = normalizeOne({ + measurement: 'FT-101', + fields: { + 'flow.measured.upstream.FT-101': 42, + }, + tags: { + tagcode: 'P-101', + softwareType: 'measurement', + unit: 'm3/h', + }, + timestamp: '2026-05-19T10:15:30.000Z', + }); + + assert.equal(points[0].unit, 'm3/h'); +}); + test('point reducer keeps first point and previous point on angle change', () => { const reducer = new PointReducer({ angleToleranceDeg: 5, @@ -87,6 +129,7 @@ test('coresync emits metadata lookup first and drains observations after resolve assert.equal(output[1][0].url, 'http://frost.example/FROST-Server/v1.1/Datastreams(5)/Observations'); assert.equal(output[1][0].payload.FeatureOfInterest['@iot.id'], 4); assert.equal(output[1][0].payload.parameters.reductionReason, 'first'); + assert.equal(output[1][0].payload.parameters.interpolationHint, 'linear-or-directional-monotone-cubic'); }); test('pending observation queue keeps first and latest unresolved points', () => { @@ -103,3 +146,176 @@ test('pending observation queue keeps first and latest unresolved points', () => assert.equal(state.pendingObservations[0].point.value, 1); assert.equal(state.pendingObservations[1].point.value, 3); }); + +test('successful bodyless metadata create is followed by lookup', () => { + const hub = new CoreSyncDomain({ + frostBaseUrl: 'http://frost.example/FROST-Server', + serviceVersion: 'v1.1', + }); + + hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 12345)); + const output = hub.handleMessage({ + topic: 'frost.response', + statusCode: 201, + payload: null, + _coreSync: { + kind: 'thing', + action: 'create', + streamKey: 'P-1:pressure:measured:upstream:PT-1', + }, + }); + + assert.equal(output[1].length, 1); + assert.equal(output[1][0].topic, 'frost.metadata.lookup'); + assert.equal(output[1][0]._coreSync.kind, 'thing'); +}); + +test('sensor create payload includes FROST-required description', () => { + const point = normalizeInput(telemetry('2026-05-19T10:15:30.000Z', 12345))[0]; + const request = createRequest({ frostBaseUrl: 'http://frost.example/FROST-Server', serviceVersion: 'v1.1' }, point, 'sensor', {}); + + assert.equal(request.payload.name, 'PT-1'); + assert.equal(request.payload.description, 'EVOLV sensor PT-1'); +}); + +test('pressure reducer scale follows pressure unit', () => { + const paHub = new CoreSyncDomain({}); + const mbarHub = new CoreSyncDomain({}); + const barHub = new CoreSyncDomain({}); + + assert.equal(paHub._valueScaleFor({ type: 'pressure', unit: 'Pa' }), 100000); + assert.equal(mbarHub._valueScaleFor({ type: 'pressure', unit: 'mbar' }), 1000); + assert.equal(barHub._valueScaleFor({ type: 'pressure', unit: 'bar' }), 10); +}); + +test('reducer keeps only line-drawing knots for a dense directional series', () => { + const reducer = new PointReducer({ + angleToleranceDeg: 2, + timeScaleMs: 1000, + valueScale: 1, + maxGapMs: 0, + }); + + const kept = []; + const start = Date.parse('2026-05-19T10:00:00.000Z'); + for (let i = 0; i <= 60; i += 1) { + const value = i <= 30 ? i : 60 - i; + kept.push(...reducer.offer({ + time: new Date(start + (i * 1000)), + phenomenonTime: new Date(start + (i * 1000)).toISOString(), + value, + })); + } + kept.push(...reducer.flush('flush')); + + assert.equal(kept.length, 3); + assert.deepEqual(kept.map((x) => x.reason), ['first', 'angle-change', 'flush']); + assert.deepEqual(kept.map((x) => x.direction), ['unknown', 'rising', 'falling']); + assert.equal(kept[1].point.value, 30); +}); + +test('burstWindowMs collapses sub-tick sample bursts into a single observation', () => { + // Repro for the CoreSync FROST demo bug: rotatingMachine emits two telemetry + // samples ~1ms apart per pressure-injection cycle. Without a burst guard, + // dt=1ms with even tiny dy produces near-vertical slopes that trip + // angle-change on every tick → near-zero compression. With burstWindowMs > 0 + // the burst is collapsed and only real direction changes emit knots. + const baseOptions = { angleToleranceDeg: 1, timeScaleMs: 1000, valueScale: 1, maxGapMs: 0 }; + const start = Date.parse('2026-05-22T10:00:00.000Z'); + + // Three ticks of: (steady value, then 1ms later a tiny noisy nudge), 2s apart + // — exactly the rotatingMachine telemetry pattern from the FROST demo. + const pattern = [ + { dt: 0, v: 0.200 }, + { dt: 1, v: 0.203 }, + { dt: 2000, v: 0.197 }, + { dt: 2001, v: 0.200 }, + { dt: 4000, v: 0.203 }, + { dt: 4001, v: 0.197 }, + ]; + + const noGuard = new PointReducer(baseOptions); + const noGuardKnots = pattern.flatMap((p) => noGuard.offer({ + time: new Date(start + p.dt), + phenomenonTime: new Date(start + p.dt).toISOString(), + value: p.v, + })); + + // Without the guard, the spurious sub-ms slopes cause the reducer to emit a + // knot on nearly every burst sample → demonstrably broken compression. + assert.ok(noGuardKnots.length >= 4, `expected ≥4 spurious knots without guard, got ${noGuardKnots.length}`); + + const guarded = new PointReducer({ ...baseOptions, burstWindowMs: 10 }); + const guardedKnots = pattern.flatMap((p) => guarded.offer({ + time: new Date(start + p.dt), + phenomenonTime: new Date(start + p.dt).toISOString(), + value: p.v, + })); + + // With burstWindowMs=10, sub-ms bursts are collapsed: only the initial 'first' + // knot and at most one real direction-change knot survive. + assert.ok(guardedKnots.length <= 2, `expected ≤2 knots with guard, got ${guardedKnots.length}`); + assert.equal(guardedKnots[0].reason, 'first'); +}); + +test('linear reconstruction restores omitted points exactly for piecewise-linear knots', () => { + const knots = [ + { phenomenonTime: '2026-05-19T10:00:00.000Z', result: 0, direction: 'unknown' }, + { phenomenonTime: '2026-05-19T10:00:30.000Z', result: 30, direction: 'rising' }, + { phenomenonTime: '2026-05-19T10:01:00.000Z', result: 0, direction: 'falling' }, + ]; + + assert.equal(interpolateAt(knots, '2026-05-19T10:00:15.000Z', { method: 'linear' }), 15); + assert.equal(interpolateAt(knots, '2026-05-19T10:00:45.000Z', { method: 'linear' }), 15); + + const series = reconstructSeries(knots, { intervalMs: 15000, method: 'linear' }); + assert.deepEqual(series.map((point) => point.value), [0, 15, 30, 15, 0]); + assert.equal(series[1].reconstructed, true); + assert.equal(series[2].reconstructed, false); +}); + +test('directional cubic reconstruction honors knot directions without overshoot', () => { + const knots = [ + { phenomenonTime: '2026-05-19T10:00:00.000Z', result: 0, direction: 'rising' }, + { phenomenonTime: '2026-05-19T10:00:30.000Z', result: 30, direction: 'flat' }, + { phenomenonTime: '2026-05-19T10:01:00.000Z', result: 0, direction: 'falling' }, + ]; + + const series = reconstructSeries(knots, { intervalMs: 5000, method: 'directional-cubic' }); + for (const point of series) { + assert.equal(point.value >= -1e-9, true); + assert.equal(point.value <= 30 + 1e-9, true); + } + assert.equal(series[0].value, 0); + assert.equal(series[6].value, 30); + assert.equal(series[12].value, 0); +}); + +test('FROST/Influx/Grafana example flow contains the end-to-end CoreSync path', () => { + const flowPath = path.resolve(__dirname, '../../examples/frost-influx-grafana.flow.json'); + const flow = JSON.parse(fs.readFileSync(flowPath, 'utf8')); + const byId = new Map(flow.map((node) => [node.id, node])); + + assert.equal(flow.some((node) => node.type === 'measurement'), true); + assert.equal(flow.some((node) => node.type === 'rotatingMachine'), true); + assert.equal(flow.some((node) => node.type === 'coresync'), true); + + const core = byId.get('cse_coresync'); + assert.equal(core.frostBaseUrl, 'https://sta.wbd-rd.nl/FROST-Server/'); + assert.equal(core.wires[1].includes('cse_fn_frost_auth'), true); + assert.equal(core.wires[1].includes('cse_fn_knot_influx'), true); + + const response = byId.get('cse_fn_frost_response'); + assert.equal(response.wires[0].includes('cse_coresync'), true); + assert.equal(flow.some((node) => node.id === 'cse_fn_raw_influx'), true); + assert.equal(flow.some((node) => node.id === 'cse_http_influx_knot'), true); +}); + +test('CoreSync Grafana dashboard provisioning JSON is valid', () => { + const dashboardPath = path.resolve(__dirname, '../../../../docker/grafana/provisioning/dashboards/coresync-frost-demo.json'); + const dashboard = JSON.parse(fs.readFileSync(dashboardPath, 'utf8')); + + assert.equal(dashboard.uid, 'coresync-frost-demo'); + assert.equal(dashboard.panels.length >= 3, true); + assert.equal(JSON.stringify(dashboard).includes('coresync_knots'), true); +});