Compare commits

..

1 Commits

Author SHA1 Message Date
znetsixe
21d77a8afa feat(coresync): FROST/Influx/Grafana demo + sub-tick burst-window reducer fix
Lands the end-to-end CoreSync demo (frost-influx-grafana.flow.json) along
with the supporting normalizer/identity/interpolation/reducer work, plus
a targeted fix for the under-compression bug surfaced by the FROST demo.

Under-compression fix (PointReducer):
- New burstWindowMs option (default 0, opt-in). When two samples arrive
  within burstWindowMs of each other, the second is treated as the same
  wall-clock observation: previous is replaced, slope analysis is skipped.
- Without this guard, sub-millisecond bursts (rotatingMachine emits twice
  per pressure-injection cycle, ~1 ms apart) produced near-vertical
  apparent slopes that tripped angle-change on every tick — driving
  cog/efficiency/SEC streams to ~0.6% reduction (i.e. no compression).
- With burstWindowMs: 10 in the demo flow, the same streams now compress
  at 78-93% (verified end-to-end in InfluxDB over a 3-min window).
- Editor HTML exposes the new "Burst dt" field with explanatory tooltip.

Regression test (test/basic/coresync.basic.test.js):
- New "burstWindowMs collapses sub-tick sample bursts into a single
  observation" test reproduces the exact burst pattern from the demo
  and asserts before/after behaviour.
- Existing 14 tests continue to pass.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-22 20:27:28 +02:00
10 changed files with 1355 additions and 6 deletions

View File

@@ -15,6 +15,7 @@
maxGapMs: { value: 300000, required: true, validate: RED.validators.number() }, maxGapMs: { value: 300000, required: true, validate: RED.validators.number() },
minDeltaTimeMs: { value: 0, required: true, validate: RED.validators.number() }, minDeltaTimeMs: { value: 0, required: true, validate: RED.validators.number() },
minDeltaValue: { value: 0, required: true, validate: RED.validators.number() }, minDeltaValue: { value: 0, required: true, validate: RED.validators.number() },
burstWindowMs: { value: 0, required: true, validate: RED.validators.number() },
maxQueuedObservationsPerStream: { value: 2, required: true, validate: RED.validators.number() }, maxQueuedObservationsPerStream: { value: 2, required: true, validate: RED.validators.number() },
diagnosticsEnabled: { value: false }, diagnosticsEnabled: { value: false },
}, },
@@ -83,6 +84,10 @@
<label for="node-input-minDeltaValue"><i class="fa fa-filter"></i> Min dv</label> <label for="node-input-minDeltaValue"><i class="fa fa-filter"></i> Min dv</label>
<input type="number" id="node-input-minDeltaValue" style="width:70%;"> <input type="number" id="node-input-minDeltaValue" style="width:70%;">
</div> </div>
<div class="form-row">
<label for="node-input-burstWindowMs" title="Collapse sub-tick sample bursts: when two samples arrive within this many milliseconds, treat them as the same observation. 0 disables. Set to ~10 ms when an upstream node emits multiple telemetry rows per logical tick (e.g. rotatingMachine reacting to two simultaneous simulateMeasurement msgs)."><i class="fa fa-compress"></i> Burst dt</label>
<input type="number" id="node-input-burstWindowMs" style="width:70%;">
</div>
<div class="form-row"> <div class="form-row">
<label for="node-input-maxQueuedObservationsPerStream"><i class="fa fa-list"></i> Queue</label> <label for="node-input-maxQueuedObservationsPerStream"><i class="fa fa-list"></i> Queue</label>
<input type="number" id="node-input-maxQueuedObservationsPerStream" style="width:70%;"> <input type="number" id="node-input-maxQueuedObservationsPerStream" style="width:70%;">

View File

@@ -19,6 +19,7 @@ module.exports = function(RED) {
maxGapMs: Number(config.maxGapMs), maxGapMs: Number(config.maxGapMs),
minDeltaTimeMs: Number(config.minDeltaTimeMs), minDeltaTimeMs: Number(config.minDeltaTimeMs),
minDeltaValue: Number(config.minDeltaValue), minDeltaValue: Number(config.minDeltaValue),
burstWindowMs: Number(config.burstWindowMs) || 0,
comparisonMode: config.comparisonMode || 'angle', comparisonMode: config.comparisonMode || 'angle',
}, },
}); });

34
examples/README.md Normal file
View File

@@ -0,0 +1,34 @@
# CoreSync Examples
## FROST + Influx + Grafana
Import `frost-influx-grafana.flow.json` into Node-RED when the local Docker stack is running.
The flow demonstrates:
- a `measurement` node producing a 1 Hz directional flow signal;
- a `rotatingMachine` node producing event-driven pump telemetry;
- raw EVOLV telemetry written to local InfluxDB;
- CoreSync reducing the same telemetry to knots;
- CoreSync FROST HTTP requests sent to `https://sta.wbd-rd.nl/FROST-Server/v1.1`;
- FROST responses looped back into CoreSync as `msg.topic = "frost.response"`;
- CoreSync knots also written to local InfluxDB as `coresync_knots` for Grafana comparison.
- a Node-RED dashboard page with live raw-field, knot, FROST observation, and percent-saved counters.
Do not store FROST credentials in the flow. Start Node-RED with:
```sh
FROST_USER=write FROST_PASSWORD='<password>' docker compose up -d --build
```
The local InfluxDB and Grafana settings match `docker-compose.yml`:
- InfluxDB: `http://localhost:8086`, org `evolv`, bucket `telemetry`, token `evolv-dev-token`
- Grafana: `http://localhost:3000`, user `admin`, password `evolv`
The provisioned Grafana dashboard `CoreSync FROST Demo` compares raw telemetry against the reduced knot stream. The raw samples are intentionally denser than the knot series; the knot series is the data that should be sent to FROST.
Runtime dashboards:
- Node-RED live counters: `http://localhost:1880/dashboard/coresync-frost`
- Grafana raw-vs-knot comparison: `http://localhost:3000/d/coresync-frost-demo/coresync-frost-demo`

View File

@@ -0,0 +1,870 @@
[
{
"id": "cse_tab",
"type": "tab",
"label": "CoreSync FROST + Influx + Grafana",
"disabled": false,
"info": "Measurement and rotatingMachine telemetry are written as raw samples to local InfluxDB and as CoreSync-reduced knots to FROST and InfluxDB."
},
{
"id": "cse_note",
"type": "comment",
"z": "cse_tab",
"name": "Set FROST_USER and FROST_PASSWORD in the Node-RED environment before deploying.",
"info": "CoreSync emits FROST-ready HTTP requests. The HTTP request response is fed back with topic=frost.response so metadata lookup/create can continue.",
"x": 450,
"y": 40,
"wires": []
},
{
"id": "cse_ui_base",
"type": "ui-base",
"name": "EVOLV CoreSync",
"path": "/dashboard",
"appIcon": "",
"includeClientData": true,
"acceptsClientConfig": [
"ui-notification",
"ui-control"
],
"showPathInSidebar": false,
"headerContent": "page",
"navigationStyle": "default",
"titleBarStyle": "default"
},
{
"id": "cse_ui_theme",
"type": "ui-theme",
"name": "CoreSync Theme",
"colors": {
"surface": "#ffffff",
"primary": "#2364aa",
"bgPage": "#f3f5f7",
"groupBg": "#ffffff",
"groupOutline": "#d5dce3"
},
"sizes": {
"density": "compact",
"pagePadding": "12px",
"groupGap": "12px",
"groupBorderRadius": "6px",
"widgetGap": "8px"
}
},
{
"id": "cse_ui_page",
"type": "ui-page",
"name": "CoreSync FROST",
"ui": "cse_ui_base",
"path": "/coresync-frost",
"icon": "timeline",
"layout": "grid",
"theme": "cse_ui_theme",
"breakpoints": [
{
"name": "Default",
"px": "0",
"cols": "12"
}
],
"order": 1,
"className": ""
},
{
"id": "cse_ui_group_kpi",
"type": "ui-group",
"name": "Reduction Counters",
"page": "cse_ui_page",
"width": "12",
"height": "1",
"order": 1,
"showTitle": true,
"className": ""
},
{
"id": "cse_ui_group_chart",
"type": "ui-group",
"name": "Write Rates",
"page": "cse_ui_page",
"width": "12",
"height": "1",
"order": 2,
"showTitle": true,
"className": ""
},
{
"id": "cse_measure_flow",
"type": "measurement",
"z": "cse_tab",
"name": "FROST Flow Sensor FT-101",
"scaling": false,
"i_min": 0,
"i_max": 120,
"i_offset": 0,
"o_min": 0,
"o_max": 120,
"simulator": false,
"smooth_method": "none",
"count": 1,
"processOutputFormat": "process",
"dbaseOutputFormat": "frost",
"uuid": "",
"assetTagCode": "FT-101",
"tagCode": "FT-101",
"supplier": "demo",
"category": "sensor",
"assetType": "flow",
"model": "demo-flow-transmitter",
"unit": "m3/h",
"enableLog": false,
"logLevel": "error",
"positionVsParent": "upstream",
"positionIcon": "",
"hasDistance": false,
"distance": 0,
"distanceUnit": "m",
"distanceDescription": "",
"x": 560,
"y": 160,
"wires": [
[
"cse_dbg_measure_process"
],
[
"cse_fn_measure_to_coresync",
"cse_fn_raw_influx"
],
[
"cse_rm_pump"
]
]
},
{
"id": "cse_inj_measure_loop",
"type": "inject",
"z": "cse_tab",
"name": "1 Hz directional flow",
"props": [
{
"p": "payload"
}
],
"repeat": "1",
"crontab": "",
"once": true,
"onceDelay": "1",
"topic": "",
"payload": "",
"payloadType": "date",
"x": 160,
"y": 160,
"wires": [
[
"cse_fn_measure_triangle"
]
]
},
{
"id": "cse_fn_measure_triangle",
"type": "function",
"z": "cse_tab",
"name": "triangle 0..100..0",
"func": "let i = context.get('i') || 0;\nconst phase = i % 80;\nconst value = phase <= 40 ? phase * 2.5 : (80 - phase) * 2.5;\ncontext.set('i', i + 1);\nmsg.topic = 'measurement';\nmsg.payload = Math.round(value * 100) / 100;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 360,
"y": 160,
"wires": [
[
"cse_measure_flow"
]
]
},
{
"id": "cse_fn_measure_to_coresync",
"type": "function",
"z": "cse_tab",
"name": "mAbs -> flow.measured.upstream.FT-101",
"func": "const p = msg.payload;\nif (!p || !p.fields) return null;\nconst value = Number(p.fields.mAbs);\nif (!Number.isFinite(value)) return null;\nmsg.payload = {\n measurement: 'FT-101',\n fields: { 'flow.measured.upstream.FT-101': value },\n tags: { ...(p.tags || {}), tagcode: 'P-101', sensorTag: 'FT-101', unit: 'm3/h' },\n timestamp: p.timestamp || new Date().toISOString(),\n source: { ...(p.source || {}), softwareType: 'measurement', unit: 'm3/h' }\n};\nmsg.topic = msg.payload.measurement;\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 850,
"y": 140,
"wires": [
[
"cse_coresync"
]
]
},
{
"id": "cse_rm_pump",
"type": "rotatingMachine",
"z": "cse_tab",
"name": "FROST Pump P-101",
"speed": "25",
"startup": "1",
"warmup": "1",
"shutdown": "1",
"cooldown": "1",
"movementMode": "staticspeed",
"machineCurve": "",
"processOutputFormat": "process",
"dbaseOutputFormat": "frost",
"uuid": "",
"assetTagCode": "P-101",
"assetTagNumber": "P-101",
"supplier": "",
"category": "",
"assetType": "",
"model": "hidrostal-H05K-S03R",
"unit": "m3/h",
"enableLog": false,
"logLevel": "error",
"positionVsParent": "atEquipment",
"positionIcon": "",
"hasDistance": false,
"distance": "",
"distanceUnit": "m",
"distanceDescription": "",
"x": 560,
"y": 360,
"wires": [
[
"cse_dbg_rm_process"
],
[
"cse_coresync",
"cse_fn_raw_influx"
],
[
"cse_dbg_rm_parent"
]
]
},
{
"id": "cse_inj_rm_mode",
"type": "inject",
"z": "cse_tab",
"name": "RM mode virtualControl",
"props": [
{
"p": "topic",
"vt": "str"
},
{
"p": "payload",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": true,
"onceDelay": "0.5",
"topic": "setMode",
"payload": "virtualControl",
"payloadType": "str",
"x": 180,
"y": 280,
"wires": [
[
"cse_rm_pump"
]
]
},
{
"id": "cse_inj_rm_start",
"type": "inject",
"z": "cse_tab",
"name": "RM startup",
"props": [
{
"p": "payload",
"vt": "str"
}
],
"repeat": "",
"crontab": "",
"once": true,
"onceDelay": "1.5",
"topic": "",
"payload": "startup",
"payloadType": "str",
"x": 150,
"y": 320,
"wires": [
[
"cse_fn_rm_sequence"
]
]
},
{
"id": "cse_fn_rm_sequence",
"type": "function",
"z": "cse_tab",
"name": "execSequence",
"func": "msg.topic = 'execSequence';\nmsg.payload = { source: 'GUI', action: 'execSequence', parameter: msg.payload };\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 340,
"y": 320,
"wires": [
[
"cse_rm_pump"
]
]
},
{
"id": "cse_inj_rm_setpoint",
"type": "inject",
"z": "cse_tab",
"name": "RM 5s setpoint wave",
"props": [
{
"p": "payload"
}
],
"repeat": "5",
"crontab": "",
"once": true,
"onceDelay": "3",
"topic": "",
"payload": "",
"payloadType": "date",
"x": 170,
"y": 380,
"wires": [
[
"cse_fn_rm_setpoint"
]
]
},
{
"id": "cse_fn_rm_setpoint",
"type": "function",
"z": "cse_tab",
"name": "execMovement wave",
"func": "let i = context.get('i') || 0;\nconst values = [25, 55, 85, 60, 35];\nconst setpoint = values[i % values.length];\ncontext.set('i', i + 1);\nmsg.topic = 'execMovement';\nmsg.payload = { source: 'GUI', action: 'execMovement', setpoint };\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 360,
"y": 380,
"wires": [
[
"cse_rm_pump"
]
]
},
{
"id": "cse_inj_rm_pressure",
"type": "inject",
"z": "cse_tab",
"name": "RM pressure cycle",
"props": [
{
"p": "payload"
}
],
"repeat": "2",
"crontab": "",
"once": true,
"onceDelay": "1",
"topic": "",
"payload": "",
"payloadType": "date",
"x": 160,
"y": 440,
"wires": [
[
"cse_fn_rm_pressure"
]
]
},
{
"id": "cse_fn_rm_pressure",
"type": "function",
"z": "cse_tab",
"name": "simulate upstream/downstream pressure",
"func": "let i = context.get('i') || 0;\nconst up = 900 + ((i % 6) * 10);\nconst down = 1700 + ((i % 6) * 40);\ncontext.set('i', i + 1);\nreturn [\n { topic: 'simulateMeasurement', payload: { type: 'pressure', position: 'upstream', value: up, unit: 'mbar' } },\n { topic: 'simulateMeasurement', payload: { type: 'pressure', position: 'downstream', value: down, unit: 'mbar' } }\n];",
"outputs": 2,
"noerr": 0,
"x": 390,
"y": 440,
"wires": [
[
"cse_rm_pump"
],
[
"cse_rm_pump"
]
]
},
{
"id": "cse_coresync",
"type": "coresync",
"z": "cse_tab",
"name": "CoreSync to WBD FROST",
"frostBaseUrl": "https://sta.wbd-rd.nl/FROST-Server/",
"serviceVersion": "v1.1",
"dbaseFormat": "frost",
"assetTagOverride": "",
"sensorTagOverride": "",
"comparisonMode": "angle",
"angleToleranceDeg": 1,
"timeScaleMs": 1000,
"maxGapMs": 30000,
"minDeltaTimeMs": 0,
"minDeltaValue": 0,
"burstWindowMs": 10,
"maxQueuedObservationsPerStream": 2,
"diagnosticsEnabled": false,
"x": 1120,
"y": 260,
"wires": [
[
"cse_dbg_coresync_process"
],
[
"cse_fn_frost_auth",
"cse_fn_knot_influx"
],
[
"cse_dbg_coresync_parent"
]
]
},
{
"id": "cse_inj_flush",
"type": "inject",
"z": "cse_tab",
"name": "CoreSync flush 15s",
"props": [
{
"p": "topic",
"vt": "str"
}
],
"repeat": "15",
"crontab": "",
"once": false,
"onceDelay": "",
"topic": "coresync.flush",
"x": 860,
"y": 300,
"wires": [
[
"cse_coresync"
]
]
},
{
"id": "cse_fn_frost_auth",
"type": "function",
"z": "cse_tab",
"name": "FROST basic auth from env",
"func": "const user = env.get('FROST_USER');\nconst password = env.get('FROST_PASSWORD');\nif (!user || !password) {\n node.status({ fill: 'red', shape: 'ring', text: 'missing FROST_USER/FROST_PASSWORD' });\n return null;\n}\nmsg.headers = { ...(msg.headers || {}), Authorization: 'Basic ' + Buffer.from(`${user}:${password}`).toString('base64') };\nnode.status({ fill: 'green', shape: 'dot', text: msg.topic });\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 1420,
"y": 220,
"wires": [
[
"cse_http_frost"
]
]
},
{
"id": "cse_http_frost",
"type": "http request",
"z": "cse_tab",
"name": "Send to FROST",
"method": "use",
"ret": "txt",
"paytoqs": false,
"url": "",
"persist": false,
"authType": "",
"senderr": false,
"x": 1630,
"y": 220,
"wires": [
[
"cse_fn_frost_response"
]
]
},
{
"id": "cse_fn_frost_response",
"type": "function",
"z": "cse_tab",
"name": "topic=frost.response",
"func": "if (typeof msg.payload === 'string') {\n try { msg.payload = JSON.parse(msg.payload); } catch (_) { /* keep raw */ }\n}\nconst metric = (msg._coreSync?.kind === 'observation' && msg.statusCode && msg.statusCode < 400)\n ? { topic: 'frostObservation', payload: 1 }\n : null;\nmsg.topic = 'frost.response';\nreturn [msg, metric];",
"outputs": 2,
"noerr": 0,
"x": 1840,
"y": 220,
"wires": [
[
"cse_coresync",
"cse_dbg_frost_response"
],
[
"cse_fn_reduction_metrics"
]
]
},
{
"id": "cse_fn_raw_influx",
"type": "function",
"z": "cse_tab",
"name": "raw EVOLV -> Influx line protocol",
"func": "const p = msg.payload;\nif (!p || !p.measurement || !p.fields) return null;\nconst esc = (s) => String(s).replace(/,/g, '\\\\,').replace(/ /g, '\\\\ ').replace(/=/g, '\\\\=');\nconst escString = (s) => String(s).replace(/\"/g, '\\\\\"');\nconst tags = Object.entries(p.tags || {})\n .filter(([, v]) => v !== undefined && v !== null && v !== '')\n .map(([k, v]) => `${esc(k)}=${esc(v)}`)\n .join(',');\nconst fields = Object.entries(p.fields)\n .filter(([, v]) => v !== undefined && v !== null)\n .map(([k, v]) => {\n const n = Number(v);\n if (typeof v === 'number' && Number.isFinite(v)) return `${esc(k)}=${v}`;\n if (typeof v === 'boolean') return `${esc(k)}=${v}`;\n if (Number.isFinite(n) && String(v).trim() !== '') return `${esc(k)}=${n}`;\n return `${esc(k)}=\"${escString(v)}\"`;\n });\nif (!fields.length) return null;\nconst t = Date.parse(p.timestamp);\nconst ns = `${Number.isFinite(t) ? t : Date.now()}000000`;\nmsg._coreSyncDemo = { rawFields: fields.length };\nmsg.payload = `${esc(p.measurement)}${tags ? ',' + tags : ''} ${fields.join(',')} ${ns}`;\nmsg.headers = { Authorization: 'Token evolv-dev-token', 'Content-Type': 'text/plain' };\nmsg.url = 'http://influxdb:8086/api/v2/write?org=evolv&bucket=telemetry&precision=ns';\nmsg.method = 'POST';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 880,
"y": 560,
"wires": [
[
"cse_http_influx_raw"
]
]
},
{
"id": "cse_http_influx_raw",
"type": "http request",
"z": "cse_tab",
"name": "Write raw InfluxDB",
"method": "use",
"ret": "txt",
"paytoqs": false,
"url": "",
"persist": false,
"authType": "",
"senderr": false,
"x": 1140,
"y": 560,
"wires": [
[
"cse_fn_influx_raw_status"
]
]
},
{
"id": "cse_fn_knot_influx",
"type": "function",
"z": "cse_tab",
"name": "CoreSync knot -> Influx line protocol",
"func": "if (msg.topic !== 'frost.observation.create' || !msg.payload) return null;\nconst obs = msg.payload;\nconst params = obs.parameters || {};\nconst streamKey = params.evolvStreamKey || msg._coreSync?.streamKey || 'unknown';\nconst parts = streamKey.split(':');\nconst esc = (s) => String(s).replace(/,/g, '\\\\,').replace(/ /g, '\\\\ ').replace(/=/g, '\\\\=');\nconst tags = {\n streamKey,\n thing: parts[0] || 'unknown',\n type: parts[1] || 'unknown',\n variant: parts[2] || 'unknown',\n position: parts[3] || 'unknown',\n sensorTag: parts[4] || 'unknown',\n reason: params.reductionReason || 'unknown',\n direction: params.direction || 'unknown'\n};\nconst tagText = Object.entries(tags).map(([k, v]) => `${esc(k)}=${esc(v)}`).join(',');\nconst fields = [`result=${Number(obs.result)}`, 'knot=1i'];\nif (Number.isFinite(Number(params.slope))) fields.push(`slope=${Number(params.slope)}`);\nif (Number.isFinite(Number(params.previousValue))) fields.push(`previousValue=${Number(params.previousValue)}`);\nconst t = Date.parse(obs.phenomenonTime);\nconst ns = `${Number.isFinite(t) ? t : Date.now()}000000`;\nmsg._coreSyncDemo = { knotFields: 1 };\nmsg.payload = `coresync_knots,${tagText} ${fields.join(',')} ${ns}`;\nmsg.headers = { Authorization: 'Token evolv-dev-token', 'Content-Type': 'text/plain' };\nmsg.url = 'http://influxdb:8086/api/v2/write?org=evolv&bucket=telemetry&precision=ns';\nmsg.method = 'POST';\nreturn msg;",
"outputs": 1,
"noerr": 0,
"x": 1430,
"y": 300,
"wires": [
[
"cse_http_influx_knot"
]
]
},
{
"id": "cse_http_influx_knot",
"type": "http request",
"z": "cse_tab",
"name": "Write knot InfluxDB",
"method": "use",
"ret": "txt",
"paytoqs": false,
"url": "",
"persist": false,
"authType": "",
"senderr": false,
"x": 1680,
"y": 300,
"wires": [
[
"cse_fn_influx_knot_status"
]
]
},
{
"id": "cse_fn_influx_raw_status",
"type": "function",
"z": "cse_tab",
"name": "raw write status",
"func": "const count = (context.get('count') || 0) + 1;\nconst errors = context.get('errors') || 0;\nif (msg.statusCode && msg.statusCode >= 400) {\n context.set('errors', errors + 1);\n node.status({ fill: 'red', shape: 'ring', text: `raw ERR ${errors + 1}/${count}` });\n context.set('count', count);\n return null;\n}\nnode.status({ fill: 'green', shape: 'dot', text: `raw ${count} writes` });\ncontext.set('count', count);\nreturn { topic: 'rawFields', payload: msg._coreSyncDemo?.rawFields || 1 };",
"outputs": 1,
"noerr": 0,
"x": 1360,
"y": 560,
"wires": [
[
"cse_fn_reduction_metrics"
]
]
},
{
"id": "cse_fn_influx_knot_status",
"type": "function",
"z": "cse_tab",
"name": "knot write status",
"func": "const count = (context.get('count') || 0) + 1;\nconst errors = context.get('errors') || 0;\nif (msg.statusCode && msg.statusCode >= 400) {\n context.set('errors', errors + 1);\n node.status({ fill: 'red', shape: 'ring', text: `knot ERR ${errors + 1}/${count}` });\n context.set('count', count);\n return null;\n}\nnode.status({ fill: 'green', shape: 'dot', text: `knot ${count} writes` });\ncontext.set('count', count);\nreturn { topic: 'knotFields', payload: msg._coreSyncDemo?.knotFields || 1 };",
"outputs": 1,
"noerr": 0,
"x": 1900,
"y": 300,
"wires": [
[
"cse_fn_reduction_metrics"
]
]
},
{
"id": "cse_fn_reduction_metrics",
"type": "function",
"z": "cse_tab",
"name": "Live reduction metrics",
"func": "const stats = context.get('stats') || { rawFields: 0, knotFields: 0, frostObservations: 0, started: Date.now() };\nconst n = Number(msg.payload) || 0;\nif (msg.topic === 'rawFields') stats.rawFields += n;\nif (msg.topic === 'knotFields') stats.knotFields += n;\nif (msg.topic === 'frostObservation') stats.frostObservations += n;\ncontext.set('stats', stats);\n\nconst saved = stats.rawFields > 0 ? Math.max(0, 100 * (1 - (stats.knotFields / stats.rawFields))) : 0;\nconst elapsedSec = Math.max(1, (Date.now() - stats.started) / 1000);\nconst rawRate = stats.rawFields / elapsedSec;\nconst knotRate = stats.knotFields / elapsedSec;\n\nnode.status({ fill: 'green', shape: 'dot', text: `${stats.rawFields} raw -> ${stats.knotFields} knots (${saved.toFixed(1)}% saved)` });\n\nreturn [\n { payload: `Raw fields: ${stats.rawFields} | CoreSync knots: ${stats.knotFields} | FROST observations: ${stats.frostObservations} | Saved: ${saved.toFixed(1)}%` },\n { payload: Number(saved.toFixed(1)) },\n { topic: 'raw fields/s', payload: Number(rawRate.toFixed(3)), timestamp: Date.now() },\n { topic: 'knot fields/s', payload: Number(knotRate.toFixed(3)), timestamp: Date.now() },\n { payload: stats.frostObservations }\n];",
"outputs": 5,
"noerr": 0,
"x": 1620,
"y": 500,
"wires": [
[
"cse_ui_text_summary"
],
[
"cse_ui_gauge_saved"
],
[
"cse_ui_chart_raw"
],
[
"cse_ui_chart_knot"
],
[
"cse_ui_text_frost"
]
]
},
{
"id": "cse_ui_text_summary",
"type": "ui-text",
"z": "cse_tab",
"group": "cse_ui_group_kpi",
"name": "Reduction summary",
"label": "Reduction",
"order": 1,
"width": "8",
"height": "1",
"format": "{{msg.payload}}",
"layout": "row-spread",
"style": false,
"font": "",
"fontSize": "",
"color": "",
"x": 1890,
"y": 460,
"wires": []
},
{
"id": "cse_ui_text_frost",
"type": "ui-text",
"z": "cse_tab",
"group": "cse_ui_group_kpi",
"name": "FROST observations",
"label": "FROST observations",
"order": 2,
"width": "4",
"height": "1",
"format": "{{msg.payload}}",
"layout": "row-spread",
"style": false,
"font": "",
"fontSize": "",
"color": "",
"x": 1900,
"y": 620,
"wires": []
},
{
"id": "cse_ui_gauge_saved",
"type": "ui-gauge",
"z": "cse_tab",
"group": "cse_ui_group_kpi",
"name": "Saved gauge",
"gtype": "gauge-half",
"gstyle": "Rounded",
"title": "Data Saved",
"units": "%",
"prefix": "",
"suffix": "%",
"min": 0,
"max": 100,
"segments": [
{
"color": "#d64545",
"from": 0
},
{
"color": "#e8a23a",
"from": 40
},
{
"color": "#2f9e44",
"from": 70
}
],
"width": "4",
"height": "3",
"order": 3,
"x": 1880,
"y": 500,
"wires": []
},
{
"id": "cse_ui_chart_raw",
"type": "ui-chart",
"z": "cse_tab",
"group": "cse_ui_group_chart",
"name": "Raw write rate",
"label": "Raw field write rate",
"order": 1,
"width": "6",
"height": "4",
"chartType": "line",
"category": "topic",
"xAxisLabel": "time",
"xAxisType": "time",
"yAxisLabel": "fields/s",
"removeOlder": "15",
"removeOlderUnit": "60",
"x": 1890,
"y": 540,
"wires": [],
"showLegend": false,
"categoryType": "msg",
"xAxisProperty": "",
"xAxisPropertyType": "timestamp",
"yAxisProperty": "payload",
"yAxisPropertyType": "msg",
"action": "append",
"interpolation": "linear"
},
{
"id": "cse_ui_chart_knot",
"type": "ui-chart",
"z": "cse_tab",
"group": "cse_ui_group_chart",
"name": "Knot write rate",
"label": "CoreSync knot write rate",
"order": 2,
"width": "6",
"height": "4",
"chartType": "line",
"category": "topic",
"xAxisLabel": "time",
"xAxisType": "time",
"yAxisLabel": "knots/s",
"removeOlder": "15",
"removeOlderUnit": "60",
"x": 1890,
"y": 580,
"wires": [],
"showLegend": false,
"categoryType": "msg",
"xAxisProperty": "",
"xAxisPropertyType": "timestamp",
"yAxisProperty": "payload",
"yAxisPropertyType": "msg",
"action": "append",
"interpolation": "linear"
},
{
"id": "cse_dbg_measure_process",
"type": "debug",
"z": "cse_tab",
"name": "measurement process",
"active": false,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 850,
"y": 100,
"wires": []
},
{
"id": "cse_dbg_rm_process",
"type": "debug",
"z": "cse_tab",
"name": "rotatingMachine process",
"active": false,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 840,
"y": 360,
"wires": []
},
{
"id": "cse_dbg_rm_parent",
"type": "debug",
"z": "cse_tab",
"name": "rotatingMachine parent",
"active": false,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 840,
"y": 400,
"wires": []
},
{
"id": "cse_dbg_coresync_process",
"type": "debug",
"z": "cse_tab",
"name": "CoreSync diagnostics",
"active": true,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1410,
"y": 180,
"wires": []
},
{
"id": "cse_dbg_coresync_parent",
"type": "debug",
"z": "cse_tab",
"name": "CoreSync parent",
"active": false,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "true",
"targetType": "full",
"x": 1400,
"y": 340,
"wires": []
},
{
"id": "cse_dbg_frost_response",
"type": "debug",
"z": "cse_tab",
"name": "FROST response",
"active": false,
"tosidebar": true,
"console": false,
"tostatus": false,
"complete": "statusCode",
"targetType": "msg",
"x": 2070,
"y": 220,
"wires": []
}
]

View File

@@ -87,6 +87,13 @@ class CoreSyncDomain {
return [null, requests.length ? requests : null, null]; return [null, requests.length ? requests : null, null];
} }
if (meta.action === 'create' && statusCode >= 200 && statusCode < 300) {
state.inFlightMetadata.delete(meta.kind);
requests.push(lookupRequest(this.config, state.meta, meta.kind));
state.inFlightMetadata.add(meta.kind);
return [null, requests, null];
}
if (meta.action === 'lookup' && (statusCode === 200 || statusCode === 0)) { if (meta.action === 'lookup' && (statusCode === 200 || statusCode === 0)) {
state.inFlightMetadata.delete(meta.kind); state.inFlightMetadata.delete(meta.kind);
requests.push(createRequest(this.config, state.meta, meta.kind, state.ids)); requests.push(createRequest(this.config, state.meta, meta.kind, state.ids));
@@ -176,6 +183,8 @@ class CoreSyncDomain {
_valueScaleFor(point) { _valueScaleFor(point) {
const configured = this.config.reducer.valueScaleByType || DEFAULT_VALUE_SCALE_BY_TYPE; const configured = this.config.reducer.valueScaleByType || DEFAULT_VALUE_SCALE_BY_TYPE;
if (point.type === 'pressure' && String(point.unit || '').toLowerCase() === 'mbar') return 1000;
if (point.type === 'pressure' && String(point.unit || '').toLowerCase() === 'bar') return 10;
return configured[point.type] || this.config.reducer.valueScale || 1; return configured[point.type] || this.config.reducer.valueScale || 1;
} }

View File

@@ -87,6 +87,11 @@ function observationRequest(config, candidate, datastreamId, foiId) {
parameters: { parameters: {
reduction: 'knot', reduction: 'knot',
reductionReason: candidate.reason, reductionReason: candidate.reason,
direction: candidate.direction || 'unknown',
slope: Number.isFinite(candidate.slope) ? candidate.slope : 0,
previousPhenomenonTime: candidate.previousPhenomenonTime,
previousValue: candidate.previousValue,
interpolationHint: 'linear-or-directional-monotone-cubic',
evolvFieldKey: meta.fieldKey, evolvFieldKey: meta.fieldKey,
evolvStreamKey: meta.streamKey, evolvStreamKey: meta.streamKey,
sourceMeasurement: meta.measurement, sourceMeasurement: meta.measurement,
@@ -142,6 +147,7 @@ function createPayload(kind, meta, ids) {
}), }),
sensor: () => ({ sensor: () => ({
name: meta.sensorTag, name: meta.sensorTag,
description: `EVOLV sensor ${meta.sensorTag}`,
encodingType: 'application/json', encodingType: 'application/json',
metadata: JSON.stringify({ source: 'EVOLV', variant: meta.variant }), metadata: JSON.stringify({ source: 'EVOLV', variant: meta.variant }),
properties: { properties: {

View File

@@ -13,6 +13,16 @@ const DEFAULT_UNITS = {
efficiency: '1', efficiency: '1',
}; };
const ROTATING_MACHINE_OUTPUT_UNITS = {
pressure: 'mbar',
flow: 'm3/h',
power: 'kW',
temperature: 'C',
atmpressure: 'Pa',
ctrl: '%',
efficiency: '1',
};
function firstPresent(...values) { function firstPresent(...values) {
for (const value of values) { for (const value of values) {
if (value !== undefined && value !== null && value !== '') return value; if (value !== undefined && value !== null && value !== '') return value;
@@ -49,7 +59,7 @@ function resolveIdentity(input, options = {}) {
options.sensorTagOverride, options.sensorTagOverride,
parsed.sensorTag || tags.sensorTag || tags.sensor_tagCode || `${variant.toUpperCase()}-${thingTag}`, parsed.sensorTag || tags.sensorTag || tags.sensor_tagCode || `${variant.toUpperCase()}-${thingTag}`,
); );
const unit = sanitizeToken(input.unit, tags.unit || input.source?.unit || DEFAULT_UNITS[type] || '1'); const unit = sanitizeToken(input.unit, inferredUnitFor({ tags, source: input.source, type }) || DEFAULT_UNITS[type] || '1');
const streamKey = [thingTag, type, variant, position, sensorTag].join(':'); const streamKey = [thingTag, type, variant, position, sensorTag].join(':');
return { return {
@@ -69,8 +79,18 @@ function resolveIdentity(input, options = {}) {
}; };
} }
function inferredUnitFor({ tags = {}, source = {}, type }) {
const softwareType = String(tags.softwareType || source.softwareType || '').toLowerCase();
if (softwareType === 'rotatingmachine' && ROTATING_MACHINE_OUTPUT_UNITS[type]) {
return ROTATING_MACHINE_OUTPUT_UNITS[type];
}
return tags.unit || source.unit;
}
module.exports = { module.exports = {
DEFAULT_UNITS, DEFAULT_UNITS,
ROTATING_MACHINE_OUTPUT_UNITS,
inferredUnitFor,
parseFieldKey, parseFieldKey,
resolveIdentity, resolveIdentity,
}; };

151
src/interpolation.js Normal file
View File

@@ -0,0 +1,151 @@
'use strict';
function toMs(value) {
if (value instanceof Date) return value.getTime();
if (typeof value === 'number') return value;
return new Date(value).getTime();
}
function normalizeKnots(knots) {
return (Array.isArray(knots) ? knots : [])
.map((knot) => ({
...knot,
timeMs: toMs(knot.time ?? knot.phenomenonTime ?? knot.timestamp),
value: Number(knot.value ?? knot.result),
direction: knot.direction || knot.parameters?.direction || 'unknown',
}))
.filter((knot) => Number.isFinite(knot.timeMs) && Number.isFinite(knot.value))
.sort((a, b) => a.timeMs - b.timeMs);
}
function directionSign(direction) {
if (direction === 'rising') return 1;
if (direction === 'falling') return -1;
if (direction === 'flat') return 0;
return null;
}
function secants(knots) {
const d = [];
for (let i = 0; i < knots.length - 1; i += 1) {
const dt = knots[i + 1].timeMs - knots[i].timeMs;
d.push(dt > 0 ? (knots[i + 1].value - knots[i].value) / dt : 0);
}
return d;
}
function directionalTangents(knots) {
if (knots.length < 2) return knots.map(() => 0);
const d = secants(knots);
const m = new Array(knots.length).fill(0);
m[0] = d[0];
m[m.length - 1] = d[d.length - 1];
for (let i = 1; i < knots.length - 1; i += 1) {
if (d[i - 1] === 0 || d[i] === 0 || Math.sign(d[i - 1]) !== Math.sign(d[i])) {
m[i] = 0;
} else {
m[i] = (d[i - 1] + d[i]) / 2;
}
}
for (let i = 0; i < knots.length; i += 1) {
const sign = directionSign(knots[i].direction);
if (sign === 0) {
m[i] = 0;
} else if (sign !== null && Math.sign(m[i]) !== 0 && Math.sign(m[i]) !== sign) {
m[i] = 0;
}
}
for (let i = 0; i < d.length; i += 1) {
if (d[i] === 0) {
m[i] = 0;
m[i + 1] = 0;
continue;
}
const a = m[i] / d[i];
const b = m[i + 1] / d[i];
const sum = a * a + b * b;
if (sum > 9) {
const tau = 3 / Math.sqrt(sum);
m[i] = tau * a * d[i];
m[i + 1] = tau * b * d[i];
}
}
return m;
}
function segmentIndex(knots, timeMs) {
if (timeMs <= knots[0].timeMs) return 0;
for (let i = 0; i < knots.length - 1; i += 1) {
if (timeMs >= knots[i].timeMs && timeMs <= knots[i + 1].timeMs) return i;
}
return knots.length - 2;
}
function interpolateAt(knotsInput, time, options = {}) {
const knots = normalizeKnots(knotsInput);
if (knots.length === 0) return null;
if (knots.length === 1) return knots[0].value;
const timeMs = toMs(time);
if (!Number.isFinite(timeMs)) return null;
const i = segmentIndex(knots, timeMs);
const a = knots[i];
const b = knots[i + 1];
const h = b.timeMs - a.timeMs;
if (h <= 0) return a.value;
const t = Math.min(1, Math.max(0, (timeMs - a.timeMs) / h));
if ((options.method || 'linear') === 'linear') {
return a.value + ((b.value - a.value) * t);
}
const tangents = directionalTangents(knots);
const t2 = t * t;
const t3 = t2 * t;
const h00 = (2 * t3) - (3 * t2) + 1;
const h10 = t3 - (2 * t2) + t;
const h01 = (-2 * t3) + (3 * t2);
const h11 = t3 - t2;
return (h00 * a.value) + (h10 * h * tangents[i]) + (h01 * b.value) + (h11 * h * tangents[i + 1]);
}
function reconstructSeries(knotsInput, options = {}) {
const knots = normalizeKnots(knotsInput);
if (knots.length === 0) return [];
if (knots.length === 1) return [{ time: new Date(knots[0].timeMs).toISOString(), value: knots[0].value, reconstructed: false }];
const intervalMs = Math.max(Number(options.intervalMs) || 1000, 1);
const startMs = Number.isFinite(toMs(options.start)) ? toMs(options.start) : knots[0].timeMs;
const endMs = Number.isFinite(toMs(options.end)) ? toMs(options.end) : knots[knots.length - 1].timeMs;
const values = [];
const knotTimes = new Set(knots.map((knot) => knot.timeMs));
for (let t = startMs; t <= endMs; t += intervalMs) {
values.push({
time: new Date(t).toISOString(),
value: interpolateAt(knots, t, options),
reconstructed: !knotTimes.has(t),
});
}
if (values[values.length - 1]?.time !== new Date(endMs).toISOString()) {
values.push({
time: new Date(endMs).toISOString(),
value: interpolateAt(knots, endMs, options),
reconstructed: !knotTimes.has(endMs),
});
}
return values;
}
module.exports = {
directionalTangents,
interpolateAt,
normalizeKnots,
reconstructSeries,
};

View File

@@ -7,6 +7,7 @@ const DEFAULTS = {
maxGapMs: 300000, maxGapMs: 300000,
minDeltaTimeMs: 0, minDeltaTimeMs: 0,
minDeltaValue: 0, minDeltaValue: 0,
burstWindowMs: 0,
comparisonMode: 'angle', comparisonMode: 'angle',
relativeSlopeTolerance: 0.1, relativeSlopeTolerance: 0.1,
}; };
@@ -33,6 +34,25 @@ function relativeSlope(from, to, options) {
return ((Number(to.value) - Number(from.value)) / Math.max(Math.abs(Number(options.valueScale) || DEFAULTS.valueScale), Number.EPSILON)) / dt; return ((Number(to.value) - Number(from.value)) / Math.max(Math.abs(Number(options.valueScale) || DEFAULTS.valueScale), Number.EPSILON)) / dt;
} }
function directionFor(from, to) {
if (!from || !to) return 'unknown';
const delta = Number(to.value) - Number(from.value);
if (delta > 0) return 'rising';
if (delta < 0) return 'falling';
return 'flat';
}
function candidate(point, reason, from, to, options = {}) {
return {
point,
reason,
direction: directionFor(from, to || point),
slope: from && to ? relativeSlope(from, to, options) : 0,
previousPhenomenonTime: from?.phenomenonTime,
previousValue: from?.value,
};
}
class PointReducer { class PointReducer {
constructor(options = {}) { constructor(options = {}) {
this.options = { ...DEFAULTS, ...options }; this.options = { ...DEFAULTS, ...options };
@@ -47,27 +67,42 @@ class PointReducer {
if (!this.anchor) { if (!this.anchor) {
this.anchor = point; this.anchor = point;
this.previous = point; this.previous = point;
output.push({ point, reason: 'first' }); output.push(candidate(point, 'first', null, point, this.options));
return output; return output;
} }
const minDeltaTimeMs = Number(this.options.minDeltaTimeMs) || 0; const minDeltaTimeMs = Number(this.options.minDeltaTimeMs) || 0;
const minDeltaValue = Number(this.options.minDeltaValue) || 0; const minDeltaValue = Number(this.options.minDeltaValue) || 0;
const burstWindowMs = Number(this.options.burstWindowMs) || 0;
const dtFromPrevious = toMs(point) - toMs(this.previous); const dtFromPrevious = toMs(point) - toMs(this.previous);
const dvFromPrevious = Math.abs(Number(point.value) - Number(this.previous.value)); const dvFromPrevious = Math.abs(Number(point.value) - Number(this.previous.value));
if (dtFromPrevious < minDeltaTimeMs && dvFromPrevious < minDeltaValue) { if (dtFromPrevious < minDeltaTimeMs && dvFromPrevious < minDeltaValue) {
this.previous = point; this.previous = point;
return output; return output;
} }
// Sub-tick burst: when two samples arrive within burstWindowMs, treat them
// as the same wall-clock observation. Replace previous with the latest value
// but skip slope analysis (dt this small produces unreliable angle estimates
// that trip angle-change on every tick).
if (burstWindowMs > 0 && dtFromPrevious >= 0 && dtFromPrevious < burstWindowMs) {
this.previous = point;
return output;
}
const maxGapMs = Number(this.options.maxGapMs) || 0; const maxGapMs = Number(this.options.maxGapMs) || 0;
if (maxGapMs > 0 && this.previous !== this.anchor && toMs(point) - toMs(this.anchor) >= maxGapMs) { if (maxGapMs > 0 && this.previous !== this.anchor && toMs(point) - toMs(this.anchor) >= maxGapMs) {
output.push({ point: this.previous, reason: 'max-gap' }); output.push(candidate(this.previous, 'max-gap', this.anchor, this.previous, this.options));
this.anchor = this.previous; this.anchor = this.previous;
} }
if (this.previous !== this.anchor && this._changedDirection(point)) { if (this.previous !== this.anchor && this._changedDirection(point)) {
output.push({ point: this.previous, reason: this.options.comparisonMode === 'relative-slope' ? 'slope-change' : 'angle-change' }); output.push(candidate(
this.previous,
this.options.comparisonMode === 'relative-slope' ? 'slope-change' : 'angle-change',
this.anchor,
this.previous,
this.options,
));
this.anchor = this.previous; this.anchor = this.previous;
} }
@@ -78,8 +113,9 @@ class PointReducer {
flush(reason = 'flush') { flush(reason = 'flush') {
if (this.previous && this.previous !== this.anchor) { if (this.previous && this.previous !== this.anchor) {
const point = this.previous; const point = this.previous;
const kept = candidate(point, reason, this.anchor, point, this.options);
this.anchor = point; this.anchor = point;
return [{ point, reason }]; return [kept];
} }
return []; return [];
} }
@@ -103,4 +139,5 @@ module.exports = {
PointReducer, PointReducer,
angleDeg, angleDeg,
angleDiff, angleDiff,
directionFor,
}; };

View File

@@ -1,8 +1,12 @@
const assert = require('node:assert/strict'); const assert = require('node:assert/strict');
const fs = require('node:fs');
const path = require('node:path');
const { normalizeInput } = require('../../src/normalizer'); const { normalizeInput, normalizeOne } = require('../../src/normalizer');
const { PointReducer } = require('../../src/reducer'); const { PointReducer } = require('../../src/reducer');
const { CoreSyncDomain } = require('../../src/coreSyncDomain'); const { CoreSyncDomain } = require('../../src/coreSyncDomain');
const { interpolateAt, reconstructSeries } = require('../../src/interpolation');
const { createRequest } = require('../../src/frostRequests');
function telemetry(timestamp, value) { function telemetry(timestamp, value) {
return { return {
@@ -32,6 +36,44 @@ test('normalizer maps EVOLV dbase payloads to stable stream identities', () => {
assert.equal(points[0].unit, 'Pa'); assert.equal(points[0].unit, 'Pa');
}); });
test('normalizer infers rotatingMachine field units from field type', () => {
const points = normalizeOne({
measurement: 'rotatingmachine_cse_rm_pump',
fields: {
'pressure.measured.downstream.dashboard-sim-downstream': 1700,
},
tags: {
tagcode: 'P-101',
softwareType: 'rotatingmachine',
unit: 'm3/h',
},
source: {
softwareType: 'rotatingmachine',
unit: 'm3/h',
},
timestamp: '2026-05-19T10:15:30.000Z',
});
assert.equal(points[0].unit, 'mbar');
});
test('normalizer keeps measurement unit for measurement-node fields', () => {
const points = normalizeOne({
measurement: 'FT-101',
fields: {
'flow.measured.upstream.FT-101': 42,
},
tags: {
tagcode: 'P-101',
softwareType: 'measurement',
unit: 'm3/h',
},
timestamp: '2026-05-19T10:15:30.000Z',
});
assert.equal(points[0].unit, 'm3/h');
});
test('point reducer keeps first point and previous point on angle change', () => { test('point reducer keeps first point and previous point on angle change', () => {
const reducer = new PointReducer({ const reducer = new PointReducer({
angleToleranceDeg: 5, angleToleranceDeg: 5,
@@ -87,6 +129,7 @@ test('coresync emits metadata lookup first and drains observations after resolve
assert.equal(output[1][0].url, 'http://frost.example/FROST-Server/v1.1/Datastreams(5)/Observations'); assert.equal(output[1][0].url, 'http://frost.example/FROST-Server/v1.1/Datastreams(5)/Observations');
assert.equal(output[1][0].payload.FeatureOfInterest['@iot.id'], 4); assert.equal(output[1][0].payload.FeatureOfInterest['@iot.id'], 4);
assert.equal(output[1][0].payload.parameters.reductionReason, 'first'); assert.equal(output[1][0].payload.parameters.reductionReason, 'first');
assert.equal(output[1][0].payload.parameters.interpolationHint, 'linear-or-directional-monotone-cubic');
}); });
test('pending observation queue keeps first and latest unresolved points', () => { test('pending observation queue keeps first and latest unresolved points', () => {
@@ -103,3 +146,176 @@ test('pending observation queue keeps first and latest unresolved points', () =>
assert.equal(state.pendingObservations[0].point.value, 1); assert.equal(state.pendingObservations[0].point.value, 1);
assert.equal(state.pendingObservations[1].point.value, 3); assert.equal(state.pendingObservations[1].point.value, 3);
}); });
test('successful bodyless metadata create is followed by lookup', () => {
const hub = new CoreSyncDomain({
frostBaseUrl: 'http://frost.example/FROST-Server',
serviceVersion: 'v1.1',
});
hub.handleMessage(telemetry('2026-05-19T10:00:00.000Z', 12345));
const output = hub.handleMessage({
topic: 'frost.response',
statusCode: 201,
payload: null,
_coreSync: {
kind: 'thing',
action: 'create',
streamKey: 'P-1:pressure:measured:upstream:PT-1',
},
});
assert.equal(output[1].length, 1);
assert.equal(output[1][0].topic, 'frost.metadata.lookup');
assert.equal(output[1][0]._coreSync.kind, 'thing');
});
test('sensor create payload includes FROST-required description', () => {
const point = normalizeInput(telemetry('2026-05-19T10:15:30.000Z', 12345))[0];
const request = createRequest({ frostBaseUrl: 'http://frost.example/FROST-Server', serviceVersion: 'v1.1' }, point, 'sensor', {});
assert.equal(request.payload.name, 'PT-1');
assert.equal(request.payload.description, 'EVOLV sensor PT-1');
});
test('pressure reducer scale follows pressure unit', () => {
const paHub = new CoreSyncDomain({});
const mbarHub = new CoreSyncDomain({});
const barHub = new CoreSyncDomain({});
assert.equal(paHub._valueScaleFor({ type: 'pressure', unit: 'Pa' }), 100000);
assert.equal(mbarHub._valueScaleFor({ type: 'pressure', unit: 'mbar' }), 1000);
assert.equal(barHub._valueScaleFor({ type: 'pressure', unit: 'bar' }), 10);
});
test('reducer keeps only line-drawing knots for a dense directional series', () => {
const reducer = new PointReducer({
angleToleranceDeg: 2,
timeScaleMs: 1000,
valueScale: 1,
maxGapMs: 0,
});
const kept = [];
const start = Date.parse('2026-05-19T10:00:00.000Z');
for (let i = 0; i <= 60; i += 1) {
const value = i <= 30 ? i : 60 - i;
kept.push(...reducer.offer({
time: new Date(start + (i * 1000)),
phenomenonTime: new Date(start + (i * 1000)).toISOString(),
value,
}));
}
kept.push(...reducer.flush('flush'));
assert.equal(kept.length, 3);
assert.deepEqual(kept.map((x) => x.reason), ['first', 'angle-change', 'flush']);
assert.deepEqual(kept.map((x) => x.direction), ['unknown', 'rising', 'falling']);
assert.equal(kept[1].point.value, 30);
});
test('burstWindowMs collapses sub-tick sample bursts into a single observation', () => {
// Repro for the CoreSync FROST demo bug: rotatingMachine emits two telemetry
// samples ~1ms apart per pressure-injection cycle. Without a burst guard,
// dt=1ms with even tiny dy produces near-vertical slopes that trip
// angle-change on every tick → near-zero compression. With burstWindowMs > 0
// the burst is collapsed and only real direction changes emit knots.
const baseOptions = { angleToleranceDeg: 1, timeScaleMs: 1000, valueScale: 1, maxGapMs: 0 };
const start = Date.parse('2026-05-22T10:00:00.000Z');
// Three ticks of: (steady value, then 1ms later a tiny noisy nudge), 2s apart
// — exactly the rotatingMachine telemetry pattern from the FROST demo.
const pattern = [
{ dt: 0, v: 0.200 },
{ dt: 1, v: 0.203 },
{ dt: 2000, v: 0.197 },
{ dt: 2001, v: 0.200 },
{ dt: 4000, v: 0.203 },
{ dt: 4001, v: 0.197 },
];
const noGuard = new PointReducer(baseOptions);
const noGuardKnots = pattern.flatMap((p) => noGuard.offer({
time: new Date(start + p.dt),
phenomenonTime: new Date(start + p.dt).toISOString(),
value: p.v,
}));
// Without the guard, the spurious sub-ms slopes cause the reducer to emit a
// knot on nearly every burst sample → demonstrably broken compression.
assert.ok(noGuardKnots.length >= 4, `expected ≥4 spurious knots without guard, got ${noGuardKnots.length}`);
const guarded = new PointReducer({ ...baseOptions, burstWindowMs: 10 });
const guardedKnots = pattern.flatMap((p) => guarded.offer({
time: new Date(start + p.dt),
phenomenonTime: new Date(start + p.dt).toISOString(),
value: p.v,
}));
// With burstWindowMs=10, sub-ms bursts are collapsed: only the initial 'first'
// knot and at most one real direction-change knot survive.
assert.ok(guardedKnots.length <= 2, `expected ≤2 knots with guard, got ${guardedKnots.length}`);
assert.equal(guardedKnots[0].reason, 'first');
});
test('linear reconstruction restores omitted points exactly for piecewise-linear knots', () => {
const knots = [
{ phenomenonTime: '2026-05-19T10:00:00.000Z', result: 0, direction: 'unknown' },
{ phenomenonTime: '2026-05-19T10:00:30.000Z', result: 30, direction: 'rising' },
{ phenomenonTime: '2026-05-19T10:01:00.000Z', result: 0, direction: 'falling' },
];
assert.equal(interpolateAt(knots, '2026-05-19T10:00:15.000Z', { method: 'linear' }), 15);
assert.equal(interpolateAt(knots, '2026-05-19T10:00:45.000Z', { method: 'linear' }), 15);
const series = reconstructSeries(knots, { intervalMs: 15000, method: 'linear' });
assert.deepEqual(series.map((point) => point.value), [0, 15, 30, 15, 0]);
assert.equal(series[1].reconstructed, true);
assert.equal(series[2].reconstructed, false);
});
test('directional cubic reconstruction honors knot directions without overshoot', () => {
const knots = [
{ phenomenonTime: '2026-05-19T10:00:00.000Z', result: 0, direction: 'rising' },
{ phenomenonTime: '2026-05-19T10:00:30.000Z', result: 30, direction: 'flat' },
{ phenomenonTime: '2026-05-19T10:01:00.000Z', result: 0, direction: 'falling' },
];
const series = reconstructSeries(knots, { intervalMs: 5000, method: 'directional-cubic' });
for (const point of series) {
assert.equal(point.value >= -1e-9, true);
assert.equal(point.value <= 30 + 1e-9, true);
}
assert.equal(series[0].value, 0);
assert.equal(series[6].value, 30);
assert.equal(series[12].value, 0);
});
test('FROST/Influx/Grafana example flow contains the end-to-end CoreSync path', () => {
const flowPath = path.resolve(__dirname, '../../examples/frost-influx-grafana.flow.json');
const flow = JSON.parse(fs.readFileSync(flowPath, 'utf8'));
const byId = new Map(flow.map((node) => [node.id, node]));
assert.equal(flow.some((node) => node.type === 'measurement'), true);
assert.equal(flow.some((node) => node.type === 'rotatingMachine'), true);
assert.equal(flow.some((node) => node.type === 'coresync'), true);
const core = byId.get('cse_coresync');
assert.equal(core.frostBaseUrl, 'https://sta.wbd-rd.nl/FROST-Server/');
assert.equal(core.wires[1].includes('cse_fn_frost_auth'), true);
assert.equal(core.wires[1].includes('cse_fn_knot_influx'), true);
const response = byId.get('cse_fn_frost_response');
assert.equal(response.wires[0].includes('cse_coresync'), true);
assert.equal(flow.some((node) => node.id === 'cse_fn_raw_influx'), true);
assert.equal(flow.some((node) => node.id === 'cse_http_influx_knot'), true);
});
test('CoreSync Grafana dashboard provisioning JSON is valid', () => {
const dashboardPath = path.resolve(__dirname, '../../../../docker/grafana/provisioning/dashboards/coresync-frost-demo.json');
const dashboard = JSON.parse(fs.readFileSync(dashboardPath, 'utf8'));
assert.equal(dashboard.uid, 'coresync-frost-demo');
assert.equal(dashboard.panels.length >= 3, true);
assert.equal(JSON.stringify(dashboard).includes('coresync_knots'), true);
});