Files
EVOLV/examples/pumpingstation-complete-example/build_flow.py
Rene De Ren aec90cc8e7
Some checks failed
CI / lint-and-test (push) Has been cancelled
fix: stopLevel hysteresis works — bump rotatingMachine + MGC
Pump-shutdown deadlock fix split across two submodules:

- rotatingMachine@8f9150e: shutdown sequence clears state.delayedMove
  so the abort-and-return-to-operational path doesn't auto-pickup the
  queued setpoint and re-engage the pump.
- machineGroupControl@ea2857f: turnOffAllMachines clears MGC's
  _delayedCall and serializes per-pump shutdown so PS's 2 s tick loop
  can't interrupt an in-flight shutdown.

Live verification on pumpingstation-complete-example demo: basin now
shuts pumps off at stopLevel cleanly, reverses to fill, completes the
hysteresis cycle.

Also disable the trends page in the demo flow (build_flow.py + regen
flow.json). FlowFuse ui-chart's per-series server-side history buffer
(7 charts × ~20 series × 3600-point retention) was saturating the
Node-RED event loop at 129% CPU, making the dashboard freeze on every
click. Trends remain available — just disabled by default; flip the
ui_page_trends "d" key to false to re-enable.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-09 18:18:11 +02:00

1911 lines
79 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Generate the multi-tab Node-RED flow for the
'pumpingstation-complete-example' end-to-end demo.
Stack
-----
- 1 pumpingStation (basin model, levelbased control)
- 1 machineGroupControl (orchestrates the 3 pumps)
- 3 rotatingMachine pumps
- 12 measurement nodes (4 per pump: upstream P, downstream P, flow, power)
- All EVOLV node port-1 telemetry routed to InfluxDB via http request
- FlowFuse dashboard (realtime + 1h trends; the trends page is emitted disabled by default)
- Grafana dashboard (realtime gauges + historic graphs)
Tabs
----
Tab 1 Process Plant EVOLV nodes only — pumps, MGC, PS, measurements,
per-node output formatters and per-pump physics
feeders that drive the measurement nodes from live
plant state.
Tab 2 Dashboard UI only ui-* widgets. No business logic.
Tab 3 Demo Drivers inflow generator (Constant / Sine / Diurnal / Storm
scenarios chosen by buttons; baseline set by slider).
Tab 4 Setup & Init one-shot deploy-time injects (MGC scaling/mode,
pumps mode = auto).
Tab 5 Telemetry collects port-1 InfluxDB payloads from every EVOLV
node, converts to line protocol, POSTs to InfluxDB.
Cross-tab wiring is via NAMED link-out / link-in pairs only.
To regenerate:
python3 build_flow.py > flow.json
"""
import json
import sys
# ---------------------------------------------------------------------------
# Tab IDs
# ---------------------------------------------------------------------------
# TAB_PROCESS is used below both as the id of the tab node itself and as the
# "z" (owning tab) of every node placed on it. The other ids are presumably
# used the same way by builders further down the file.
TAB_PROCESS = "tab_process"
TAB_UI = "tab_ui"
TAB_DRIVERS = "tab_drivers"
TAB_SETUP = "tab_setup"
TAB_TLM = "tab_telemetry"
# ---------------------------------------------------------------------------
# Spacing constants
# ---------------------------------------------------------------------------
# LANE_X: editor x-coordinates of the layout columns; builders index into it
# (LANE_X[0] .. LANE_X[5]) instead of hard-coding pixel positions.
LANE_X = [120, 380, 640, 900, 1160, 1420]
# ROW: not referenced in the process-tab builder — presumably the vertical
# pitch used by other tabs (TODO confirm).
ROW = 80
# SECTION_GAP: vertical distance added between consecutive sections of a tab.
SECTION_GAP = 220
# Maps a node's positionVsParent value to its "positionIcon" field.
# NOTE(review): every value is an empty string here — these may be icon
# glyphs that were lost when the file was rendered/extracted; confirm against
# the repository copy before relying on them.
POSITION_ICON = {
    "upstream": "",
    "downstream": "",
    "atEquipment": "",
}
# ---------------------------------------------------------------------------
# Cross-tab link channels — the wiring contract
# ---------------------------------------------------------------------------
# Each value is used as the "name" of a link-out / link-in pair. "cmd:*"
# channels carry commands towards the plant, "evt:*" channels carry formatted
# state away from it.
CH_INFLOW_BASELINE = "cmd:inflow-baseline" # m³/h baseline (slider)
CH_INFLOW_SCENARIO = "cmd:inflow-scenario" # 'constant' | 'sine' | 'diurnal' | 'storm'
CH_QIN = "cmd:q_in" # m³/s, generator → PS
CH_QD = "cmd:Qd" # m³/h, slider → PS (manual mode only)
CH_PS_MODE = "cmd:ps-mode" # 'levelbased' | 'manual'
CH_STATION_START = "cmd:station-startup"
CH_STATION_STOP = "cmd:station-shutdown"
CH_STATION_ESTOP = "cmd:station-estop"
# Per-pump channels, keyed by the pump node id listed in PUMPS.
CH_PUMP_SETPOINT = {"pump_a": "cmd:setpoint-A",
                    "pump_b": "cmd:setpoint-B",
                    "pump_c": "cmd:setpoint-C"}
CH_PUMP_SEQUENCE = {"pump_a": "cmd:pump-A-seq",
                    "pump_b": "cmd:pump-B-seq",
                    "pump_c": "cmd:pump-C-seq"}
CH_PUMP_EVT = {"pump_a": "evt:pump-A",
               "pump_b": "evt:pump-B",
               "pump_c": "evt:pump-C"}
CH_MGC_EVT = "evt:mgc"
CH_PS_EVT = "evt:ps"
CH_INFLOW_EVT = "evt:inflow"
# Shared telemetry channel: every port-1 link-out on the process tab targets
# the single link-in "lin_tlm".
CH_TLM = "evt:tlm"
# Pump node ids (also the keys of the per-pump dicts above) and their
# human-readable labels. The second word of each label ("A", "B", "C") is
# reused to build sensor names and asset tag numbers.
PUMPS = ["pump_a", "pump_b", "pump_c"]
PUMP_LABELS = {"pump_a": "Pump A", "pump_b": "Pump B", "pump_c": "Pump C"}
# Node ids of the machineGroupControl and pumpingStation nodes.
MGC_ID = "mgc_pumps"
PS_ID = "ps_basin"
# Basin geometry — single source of truth.
# Realistic wet-well wastewater pumping station — pumps are oversized
# ~5× nominal inflow for storm tolerance. Sized so:
# - nominal inflow ~25 m³/h refills the dead-band [stopLvl, startLvl]
# (~6.25 m³) in ~15 min while pumps are off
# - one pump at minimum stable flow (~99 m³/h) drains the same band in
# ~5 min once engaged via the stopLevel Schmitt trigger
# - storm inflow ~250 m³/h pushes percControl up the ramp until all 3
# pumps are engaged at high flow (combined max ≈ 681 m³/h)
# surfaceArea = 50 / 4 = 12.5 m²; band volume = 12.5 × 0.5 = 6.25 m³
# BASIN_VOLUME is also interpolated into the PS formatter's JavaScript as
# MAX_VOL to compute the fill percentage. Units are presumably m³ / m, per the
# sizing notes above.
BASIN_VOLUME = 50.0
BASIN_HEIGHT = 4.0
OUTFLOW_LEVEL = 0.3
OVERFLOW_LEVEL = 3.8
# ---------------------------------------------------------------------------
# Generic node-builder helpers
# ---------------------------------------------------------------------------
def comment(node_id, tab, x, y, name, info=""):
    """Return a Node-RED ``comment`` node dict.

    The node lives on tab *tab* at editor position (*x*, *y*); *name* is the
    visible caption and *info* the optional long description. Comment nodes
    have no outputs, so ``wires`` is always empty.
    """
    node = dict(id=node_id, type="comment", z=tab, name=name, info=info)
    node.update(x=x, y=y, wires=[])
    return node
def inject(node_id, tab, x, y, name, topic, payload, payload_type="str",
           once=False, repeat="", once_delay="0.5", wires=None):
    """Return a Node-RED ``inject`` node dict.

    Args:
        node_id, tab, x, y, name: node identity, owning tab and position.
        topic: value stored as the node's ``topic``.
        payload: payload value. Node-RED stores inject payloads as text, so
            it is serialised here: strings and numbers via ``str()``, while
            dicts, lists, tuples and booleans go through ``json.dumps`` so
            the stored text is valid JSON (``{"a": 1}`` / ``true``) rather
            than a Python repr (``{'a': 1}`` / ``True``).
        payload_type: Node-RED type tag for the payload ("str", "num",
            "json", "bool", ...).
        once: fire once after deploy.
        repeat: repeat interval, stored as given.
        once_delay: delay before the ``once`` shot, stored as given.
        wires: ids wired to the single output; ``None`` means unwired.

    Returns:
        dict: the node definition, ready for ``json.dump``.
    """
    if isinstance(payload, (dict, list, tuple, bool)):
        # str() would emit Python syntax here, which a "json"-typed inject
        # cannot parse.
        payload_text = json.dumps(payload)
    else:
        payload_text = str(payload)
    return {
        "id": node_id, "type": "inject", "z": tab, "name": name,
        "props": [
            {"p": "topic", "vt": "str"},
            {"p": "payload", "v": payload_text, "vt": payload_type},
        ],
        "topic": topic, "payload": payload_text, "payloadType": payload_type,
        "repeat": repeat, "crontab": "",
        "once": once, "onceDelay": once_delay,
        "x": x, "y": y, "wires": [wires or []],
    }
def function_node(node_id, tab, x, y, name, code, outputs=1, wires=None):
    """Return a Node-RED ``function`` node dict running the JavaScript *code*.

    *outputs* is the number of output ports. *wires* is a list with one list
    of target ids per port; when omitted, every port is left unwired.
    """
    if wires is None:
        port_wires = [[] for _ in range(outputs)]
    else:
        port_wires = wires
    node = {"id": node_id, "type": "function", "z": tab, "name": name}
    node["func"] = code
    node["outputs"] = outputs
    node.update({"noerr": 0, "initialize": "", "finalize": "", "libs": []})
    node["x"] = x
    node["y"] = y
    node["wires"] = port_wires
    return node
def link_out(node_id, tab, x, y, channel_name, target_in_ids):
    """Return a ``link out`` node dict named after *channel_name*.

    *target_in_ids* is any iterable of ``link in`` node ids; it is copied
    into a fresh list so the caller's sequence is never shared with the node.
    """
    targets = [*target_in_ids]
    node = {"id": node_id, "type": "link out", "z": tab, "name": channel_name}
    node["mode"] = "link"
    node["links"] = targets
    node.update(x=x, y=y, wires=[])
    return node
def link_in(node_id, tab, x, y, channel_name, source_out_ids, downstream):
    """Return a ``link in`` node dict named after *channel_name*.

    *source_out_ids* lists the ``link out`` nodes feeding this one (copied
    into a new list). *downstream* is the list of node ids wired to the single
    output; a falsy value leaves the output unwired.
    """
    output_targets = downstream if downstream else []
    return dict(
        [
            ("id", node_id),
            ("type", "link in"),
            ("z", tab),
            ("name", channel_name),
            ("links", [*source_out_ids]),
            ("x", x),
            ("y", y),
            ("wires", [output_targets]),
        ]
    )
def debug_node(node_id, tab, x, y, name, target="payload",
               target_type="msg", active=False):
    """Return a sidebar-only ``debug`` node dict.

    *target* / *target_type* select what is shown (stored as ``complete`` and
    ``targetType``); *active* is the initial enabled state. Console and
    status output are always switched off.
    """
    sinks = {"tosidebar": True, "console": False, "tostatus": False}
    return {
        "id": node_id,
        "type": "debug",
        "z": tab,
        "name": name,
        "active": active,
        **sinks,
        "complete": target,
        "targetType": target_type,
        "x": x,
        "y": y,
        "wires": [],
    }
# ---------------------------------------------------------------------------
# Dashboard scaffolding
# ---------------------------------------------------------------------------
def dashboard_scaffold():
    """Return the FlowFuse dashboard config nodes.

    The list holds, in order: the ``ui-base``, the ``ui-theme``, the
    "Realtime" page and the "Trends — 1 hour" page. The trends page carries
    ``"d": True``, so it is emitted disabled.
    """
    def page(page_id, title, path, icon, order):
        # Both pages share the base, theme, grid layout and a single
        # 12-column breakpoint.
        return {
            "id": page_id, "type": "ui-page",
            "name": title, "ui": "ui_base",
            "path": path, "icon": icon,
            "layout": "grid", "theme": "ui_theme",
            "breakpoints": [{"name": "Default", "px": "0", "cols": "12"}],
            "order": order, "className": "",
        }

    ui_base = {
        "id": "ui_base", "type": "ui-base", "name": "EVOLV Pumping",
        "path": "/dashboard", "appIcon": "",
        "includeClientData": True,
        "acceptsClientConfig": ["ui-notification", "ui-control"],
        "showPathInSidebar": True, "headerContent": "page",
        "navigationStyle": "default", "titleBarStyle": "default",
    }

    palette = {
        "surface": "#ffffff", "primary": "#0c99d9",
        "bgPage": "#f4f6fa", "groupBg": "#ffffff",
        "groupOutline": "#cccccc",
    }
    metrics = {
        "density": "default", "pagePadding": "12px",
        "groupGap": "12px", "groupBorderRadius": "6px",
        "widgetGap": "8px",
    }
    ui_theme = {
        "id": "ui_theme", "type": "ui-theme", "name": "EVOLV Theme",
        "colors": palette,
        "sizes": metrics,
    }

    realtime = page("ui_page_realtime", "Realtime", "/realtime", "speed", 1)
    trends = page("ui_page_trends", "Trends — 1 hour", "/trends",
                  "show_chart", 2)
    trends["d"] = True  # emitted disabled

    return [ui_base, ui_theme, realtime, trends]
def ui_group(group_id, name, page_id, width=6, order=1):
    """Return a ``ui-group`` config node dict attached to page *page_id*.

    *width* is stored as a string; the height is fixed at "1". The group is
    always titled, enabled and visible.
    """
    group = {"id": group_id, "type": "ui-group", "name": name, "page": page_id}
    group["width"] = str(width)
    group["height"] = "1"
    group["order"] = order
    group.update(showTitle=True, className="", groupType="default")
    group.update(disabled=False, visible=True)
    return group
def ui_text(node_id, tab, x, y, group, name, label, fmt, layout="row-spread"):
    """Return a ``ui-text`` widget dict.

    *fmt* is stored as the widget's ``format`` and *layout* as its label/value
    arrangement. Size is "0" x "0"; custom styling is switched off, with the
    14 px / black defaults recorded alongside. The widget has no outputs.
    """
    identity = {"id": node_id, "type": "ui-text", "z": tab, "group": group}
    sizing = {"order": 1, "width": "0", "height": "0"}
    content = {"name": name, "label": label, "format": fmt, "layout": layout}
    styling = {"style": False, "font": "", "fontSize": 14, "color": "#000000"}
    placement = {"x": x, "y": y, "wires": []}
    return {**identity, **sizing, **content, **styling, **placement}
def ui_button(node_id, tab, x, y, group, name, label, payload, payload_type,
              topic, color="#0c99d9", icon="play_arrow", wires=None):
    """Return a ``ui-button`` widget dict.

    *payload* / *payload_type* and *topic* are stored as the message
    properties the button is configured with. *color* is stored as the
    background colour (the text colour is fixed white); *icon* is placed to
    the left of the label. *wires* lists the ids wired to the single output.
    """
    output = wires or []
    button = {"id": node_id, "type": "ui-button", "z": tab, "group": group}
    button.update({"name": name, "label": label})
    button.update({"order": 1, "width": "0", "height": "0"})
    button.update({"tooltip": "", "color": "#ffffff", "bgcolor": color})
    button.update({"className": "", "icon": icon, "iconPosition": "left"})
    button.update({"payload": payload, "payloadType": payload_type})
    button.update({"topic": topic, "topicType": "str", "buttonType": "default"})
    button.update({"x": x, "y": y, "wires": [output]})
    return button
def ui_slider(node_id, tab, x, y, group, name, label, mn, mx, step=1.0,
              topic="", wires=None):
    """Return a ``ui-slider`` widget dict.

    *mn*, *mx* and *step* are stored as strings. The slider is configured
    with ``outs`` set to "end" and ``passthru`` enabled. *wires* lists the
    ids wired to the single output.
    """
    limits = {"min": str(mn), "max": str(mx), "step": str(step)}
    appearance = {
        "showLabel": True, "showValue": True, "labelPosition": "top",
        "valuePosition": "left", "thumbLabel": False,
        "iconStart": "", "iconEnd": "",
    }
    slider = {
        "id": node_id, "type": "ui-slider", "z": tab, "group": group,
        "name": name, "label": label, "tooltip": "", "order": 1,
        "width": "0", "height": "0", "passthru": True, "outs": "end",
        "topic": topic, "topicType": "str",
    }
    slider.update(limits)
    slider.update(appearance)
    slider["x"] = x
    slider["y"] = y
    slider["wires"] = [wires or []]
    return slider
def ui_switch(node_id, tab, x, y, group, name, label, on_value, off_value,
              topic, wires=None):
    """Return a ``ui-switch`` widget dict.

    *on_value* / *off_value* are stored as the string payloads for the two
    positions, each with its own fixed icon and colour. *wires* lists the ids
    wired to the single output.
    """
    on_side = {
        "onvalue": on_value, "onvalueType": "str",
        "onicon": "auto_mode", "oncolor": "#0c99d9",
    }
    off_side = {
        "offvalue": off_value, "offvalueType": "str",
        "officon": "back_hand", "offcolor": "#888888",
    }
    return {
        "id": node_id, "type": "ui-switch", "z": tab, "group": group,
        "name": name, "label": label, "tooltip": "", "order": 1,
        "width": "0", "height": "0", "passthru": True, "decouple": "false",
        "topic": topic, "topicType": "str",
        "style": "", "className": "", "evaluate": "true",
        **on_side,
        **off_side,
        "x": x, "y": y, "wires": [wires if wires else []],
    }
def ui_chart(node_id, tab, x, y, group, name, label,
             width=12, height=6,
             remove_older="60", remove_older_unit="60",
             remove_older_points="1800",
             y_axis_label="", ymin=None, ymax=None, order=1,
             interpolation="linear"):
    """Return a FlowFuse ``ui-chart`` line-chart widget dict.

    The series is taken from ``msg.topic`` and the y value from
    ``msg.payload``, on a time x-axis, with new points appended.

    *remove_older*, *remove_older_unit* and *remove_older_points* are stored
    as strings in the matching ``removeOlder*`` fields. *ymin* / *ymax* are
    stringified, or left as "" when ``None``. *width* and *height* are
    coerced to ``int``.
    """
    def axis_limit(value):
        # An unset limit is stored as an empty string.
        return "" if value is None else str(value)

    series_palette = [
        "#0095FF", "#FF0000", "#FF7F0E", "#2CA02C",
        "#A347E1", "#D62728", "#FF9896", "#9467BD", "#C5B0D5",
    ]

    chart = {
        "id": node_id, "type": "ui-chart", "z": tab, "group": group,
        "name": name, "label": label, "order": order,
        "chartType": "line",
        "interpolation": interpolation,
        "category": "topic", "categoryType": "msg",
    }
    chart.update({
        "xAxisLabel": "", "xAxisType": "time",
        "xAxisProperty": "", "xAxisPropertyType": "timestamp",
        "xAxisFormat": "", "xAxisFormatType": "auto",
        "xmin": "", "xmax": "",
    })
    chart.update({
        "yAxisLabel": y_axis_label,
        "yAxisProperty": "payload", "yAxisPropertyType": "msg",
        "ymin": axis_limit(ymin),
        "ymax": axis_limit(ymax),
    })
    chart.update({
        "removeOlder": str(remove_older),
        "removeOlderUnit": str(remove_older_unit),
        "removeOlderPoints": str(remove_older_points),
    })
    chart.update({
        "action": "append",
        "stackSeries": False,
        "pointShape": "circle", "pointRadius": 4,
        "showLegend": True,
        "bins": 10,
        "colors": series_palette,
        "textColor": ["#666666"], "textColorDefault": True,
        "gridColor": ["#e5e5e5"], "gridColorDefault": True,
    })
    chart.update({
        "width": int(width), "height": int(height), "className": "",
        "x": x, "y": y, "wires": [[]],
    })
    return chart
def ui_gauge(node_id, tab, x, y, group, name, title, units, mn, mx,
             segments, gtype="gauge-34", suffix="", icon="",
             width=3, height=3, order=1):
    """Return a ``ui-gauge`` widget dict with the "Rounded" style.

    *mn*, *mx*, *segments*, *width* and *height* are stored exactly as given
    (no string conversion). The gauge has no outputs.
    """
    gauge = {"id": node_id, "type": "ui-gauge", "z": tab, "group": group}
    gauge.update(name=name, gtype=gtype, gstyle="Rounded")
    gauge.update(title=title, units=units, prefix="", suffix=suffix)
    gauge["min"] = mn
    gauge["max"] = mx
    gauge["segments"] = segments
    gauge.update(width=width, height=height, order=order)
    gauge.update(icon=icon, sizeGauge=20, sizeGap=2, sizeSegments=10)
    gauge.update(x=x, y=y, wires=[])
    return gauge
# ---------------------------------------------------------------------------
# Tab 1 — PROCESS PLANT
# ---------------------------------------------------------------------------
def build_process_tab():
    """Build every node on the 'Process Plant' tab.

    The tab is laid out top to bottom as: one section per pump in ``PUMPS``
    (4 measurement nodes, the rotatingMachine, its dashboard formatter, its
    physics feeder and its command link-ins), the machineGroupControl, the
    pumpingStation, the mode broadcast, the station-wide commands and the
    setup feeders. Each section is produced by a private helper; this entry
    point only computes the vertical offsets and concatenates the results in
    that order.

    Returns:
        list[dict]: the tab node followed by all nodes on the tab.
    """
    section_pitch = SECTION_GAP + 60
    nodes = [
        {
            "id": TAB_PROCESS, "type": "tab",
            "label": "🏭 Process Plant",
            "disabled": False,
            "info": (
                "EVOLV plant model: 3 rotatingMachines (each with 4 measurement "
                "nodes — upstream P, downstream P, flow, power), MGC, PS.\n\n"
                "Per pump there is a 'physics' function node that consumes the "
                "pump's own port-0 stream PLUS PS port-0 (basin level) and "
                "drives all 4 measurement nodes with physically-coupled values "
                "(upstream P from basin head; downstream P from pump state + "
                "flow; flow/power mirror predicted with Gaussian noise). This "
                "lives on this tab so the plant model is self-contained.\n\n"
                "All cross-tab wires use named link-in / link-out channels."
            ),
        },
        comment("c_process_title", TAB_PROCESS, LANE_X[2], 20,
                "🏭 PROCESS PLANT — EVOLV nodes + per-pump physics feeders",
                ""),
    ]

    # ---------------- Per-pump rows ----------------
    for i, pump in enumerate(PUMPS):
        nodes.extend(_pt_pump_section(pump, 80 + i * section_pitch))

    # ---------------- MGC / PS ----------------
    y_mgc = 80 + len(PUMPS) * section_pitch
    nodes.extend(_pt_mgc_section(y_mgc))
    y_ps = y_mgc + section_pitch
    nodes.extend(_pt_ps_section(y_ps))

    # ---------------- Commands & setup ----------------
    y_mode = y_ps + SECTION_GAP
    nodes.extend(_pt_mode_broadcast(y_mode))
    y_station = y_mode + 200
    nodes.extend(_pt_station_commands(y_station))
    nodes.extend(_pt_setup_feeders(y_station + 280))
    return nodes


# Measurement nodes attached to every pump:
# (id suffix, short label, positionVsParent, unit, assetType, supplier, model)
_PT_PUMP_SENSORS = (
    ("u", "Up", "upstream", "mbar",
     "pressure", "vega", "vega-pressure-10"),
    ("d", "Dn", "downstream", "mbar",
     "pressure", "vega", "vega-pressure-10"),
    ("f", "Flow", "downstream", "m3/h",
     "flow", "endress", "endress-promag-50"),
    ("p", "Pwr", "atEquipment", "kW",
     "power", "siemens", "siemens-sentron-pac4200"),
)

# (o_min, o_max) per assetType; any other type falls back to the power range.
_PT_SENSOR_RANGE = {
    "pressure": (0, 4000),
    "flow": (0, 250),
    "power": (0, 30),
}


def _pt_fan_js(var, count):
    """Return a JS ``return [...]`` statement repeating *var* on *count* outputs."""
    return "return [" + ", ".join([var] * count) + "];"


def _pt_merge_throttle_js(throttle_comment):
    """Return the JS preamble shared by the three dashboard formatters.

    It merges the incoming payload into the cached snapshot ``c``, drops the
    message when the previous emit was less than 1000 ms ago, and defines the
    prefix lookup ``find``. *throttle_comment* is the JS comment block
    (newline-terminated) placed above the throttle.
    """
    return (
        "const p = msg.payload || {};\n"
        "const c = context.get('c') || {};\n"
        "Object.assign(c, p);\n"
        "context.set('c', c);\n"
        + throttle_comment +
        "const now = Date.now();\n"
        "const last = context.get('_lastEmit') || 0;\n"
        "if (now - last < 1000) return null;\n"
        "context.set('_lastEmit', now);\n"
        "function find(prefix) {\n"
        " for (const k in c) { if (k.indexOf(prefix) === 0) return c[k]; }\n"
        " return null;\n"
        "}\n"
    )


def _pt_pump_section(pump, y_section):
    """Return the nodes of one pump's section, whose top edge is *y_section*."""
    label = PUMP_LABELS[pump]
    tag = label.split()[1]  # "Pump A" -> "A"
    nodes = [comment(
        f"c_{pump}", TAB_PROCESS, LANE_X[2], y_section,
        f"── {label} ── (pump + 4 sensors + physics feeder)",
        "Up/Dn pressure + flow + power sensors register as children of "
        "the pump. The physics_<pump> function takes the pump's own "
        "port-0 stream and PS port-0 (basin level) and drives all 4 "
        "sensors with physically-coupled values."
    )]

    # ---- 4 measurement nodes (driven via msg.topic='measurement') ----
    for j, (suffix, lbl, pos, unit, asset_type, supplier, model) in enumerate(
            _PT_PUMP_SENSORS):
        mid = f"meas_{pump}_{suffix}"
        y_sensor = y_section + 40 + j * 35
        o_min, o_max = _PT_SENSOR_RANGE.get(asset_type, (0, 30))
        nodes.append({
            "id": mid, "type": "measurement", "z": TAB_PROCESS,
            "name": f"{tag}-{lbl}",
            "mode": "analog", "channels": "[]",
            "scaling": False,
            "i_min": 0, "i_max": 1, "i_offset": 0,
            "o_min": o_min, "o_max": o_max,
            "simulator": False,
            "smooth_method": "mean", "count": "3",
            "processOutputFormat": "process",
            "dbaseOutputFormat": "influxdb",
            "uuid": f"sensor-{pump}-{suffix}",
            "supplier": supplier, "category": "sensor",
            "assetType": asset_type, "model": model,
            "unit": unit,
            "assetTagNumber": f"{tag}-{suffix.upper()}",
            "enableLog": False, "logLevel": "warn",
            "tickIntervalMs": 2000,
            "positionVsParent": pos,
            "positionIcon": POSITION_ICON.get(pos, ""),
            "hasDistance": False, "distance": 0, "distanceUnit": "m",
            "distanceDescription": "",
            "x": LANE_X[1], "y": y_sensor,
            # Port 0 unused, port 1 → telemetry, port 2 → pump (registerChild)
            "wires": [[], [f"lout_tlm_{mid}"], [pump]],
        })
        nodes.append(link_out(
            f"lout_tlm_{mid}", TAB_PROCESS, LANE_X[1] + 200, y_sensor,
            CH_TLM, target_in_ids=["lin_tlm"],
        ))

    # ---- The pump itself ----
    nodes.append({
        "id": pump, "type": "rotatingMachine", "z": TAB_PROCESS,
        "name": label,
        # speed (movement units/s). The state machine doesn't auto-
        # return to 'operational' after a routine abort (avoids a
        # bounce loop), so any setpoint that arrives while still
        # accelerating gets deferred via delayedMove. With MGC
        # retargeting every PS tick (2 s) and a 0..100 position
        # range, speed must be high enough that the movement
        # finishes inside one tick — otherwise the FSM gets parked
        # in 'accelerating' and the badge stops advancing. 200 u/s
        # gives a worst-case 0..100 traversal of 0.5 s, well inside
        # the 2 s window.
        "speed": "200",
        "startup": "2", "warmup": "1", "shutdown": "2", "cooldown": "1",
        "movementMode": "staticspeed",
        "machineCurve": "",
        "uuid": f"pump-{pump}",
        "supplier": "hidrostal", "category": "pump",
        "assetType": "pump-centrifugal",
        "model": "hidrostal-H05K-S03R",
        "unit": "m3/h",
        "curvePressureUnit": "mbar", "curveFlowUnit": "m3/h",
        "curvePowerUnit": "kW", "curveControlUnit": "%",
        "enableLog": False, "logLevel": "warn",
        "tickIntervalMs": 2000,
        "positionVsParent": "atEquipment",
        "positionIcon": POSITION_ICON["atEquipment"],
        "hasDistance": False, "distance": 0, "distanceUnit": "m",
        "distanceDescription": "",
        "x": LANE_X[3], "y": y_section + 90,
        # Port 0 → formatter + physics feeder, port 1 → telemetry,
        # port 2 → MGC.
        "wires": [
            [f"format_{pump}", f"physics_{pump}"],
            [f"lout_tlm_{pump}"],
            [MGC_ID],
        ],
    })
    nodes.append(link_out(
        f"lout_tlm_{pump}", TAB_PROCESS, LANE_X[3], y_section + 130,
        CH_TLM, target_in_ids=["lin_tlm"],
    ))

    # ---- Per-pump output formatter (for dashboard) ----
    nodes.append(function_node(
        f"format_{pump}", TAB_PROCESS, LANE_X[4], y_section + 90,
        f"format {label} port 0",
        _pt_merge_throttle_js(
            "// Throttle dashboard fan-out to ≤ 1 Hz. The pump emits on\n"
            "// every state change (multiple per sec while cycling); the\n"
            "// dashboard doesn't need that resolution and the websocket\n"
            "// fan-out chokes the browser.\n"
        ) +
        "const flow = find('flow.predicted.downstream.');\n"
        "const power = find('power.predicted.atequipment.');\n"
        "const ctrl = find('ctrl.predicted.atequipment.');\n"
        "const pUp = find('pressure.measured.upstream.');\n"
        "const pDn = find('pressure.measured.downstream.');\n"
        "msg.payload = {\n"
        " state: c.state || 'idle',\n"
        " mode: c.mode || 'auto',\n"
        " ctrl: ctrl != null ? Number(ctrl ).toFixed(1) + '%' : 'n/a',\n"
        " flow: flow != null ? Number(flow ).toFixed(1) + ' m³/h' : 'n/a',\n"
        " power: power != null ? Number(power).toFixed(2) + ' kW' : 'n/a',\n"
        " pUp: pUp != null ? Number(pUp ).toFixed(0) + ' mbar' : 'n/a',\n"
        " pDn: pDn != null ? Number(pDn ).toFixed(0) + ' mbar' : 'n/a',\n"
        " ctrlNum: ctrl != null ? Number(ctrl ) : null,\n"
        " flowNum: flow != null ? Number(flow ) : null,\n"
        " powerNum: power != null ? Number(power) : null,\n"
        " pUpNum: pUp != null ? Number(pUp ) : null,\n"
        " pDnNum: pDn != null ? Number(pDn ) : null,\n"
        " // Pump is moving water any time it's between startup and shutdown, not\n"
        " // just during steady operational. accelerate/decelerate/warmup count.\n"
        " isRunning: ['operational','starting','warmingup','accelerating','decelerating','stopping'].includes(c.state),\n"
        "};\n"
        "return msg;",
        outputs=1, wires=[[f"lout_evt_{pump}"]],
    ))
    nodes.append(link_out(
        f"lout_evt_{pump}", TAB_PROCESS, LANE_X[5], y_section + 90,
        CH_PUMP_EVT[pump],
        target_in_ids=[f"lin_evt_{pump}_dash"],
    ))

    # ---- Physics feeder ----
    # One output per sensor, in the same u/d/f/p order as _PT_PUMP_SENSORS.
    nodes.append(function_node(
        f"physics_{pump}", TAB_PROCESS, LANE_X[4], y_section + 160,
        f"physics {label} → 4 sensors",
        _physics_code(pump.split("_", 1)[1]),
        outputs=4,
        wires=[
            [f"meas_{pump}_u"],
            [f"meas_{pump}_d"],
            [f"meas_{pump}_f"],
            [f"meas_{pump}_p"],
        ],
    ))

    # ---- Setpoint slider link-in ----
    nodes.append(link_in(
        f"lin_setpoint_{pump}", TAB_PROCESS, LANE_X[0], y_section + 60,
        CH_PUMP_SETPOINT[pump],
        source_out_ids=[f"lout_setpoint_{pump}_dash"],
        downstream=[f"build_setpoint_{pump}"],
    ))
    nodes.append(function_node(
        f"build_setpoint_{pump}", TAB_PROCESS,
        LANE_X[1] + 220, y_section + 60,
        f"build setpoint cmd ({label})",
        "msg.topic = 'execMovement';\n"
        "msg.payload = { source: 'GUI', action: 'execMovement', "
        "setpoint: Number(msg.payload) };\n"
        "return msg;",
        outputs=1, wires=[[pump]],
    ))

    # ---- Per-pump start/stop link-in ----
    nodes.append(link_in(
        f"lin_seq_{pump}", TAB_PROCESS, LANE_X[0], y_section + 110,
        CH_PUMP_SEQUENCE[pump],
        source_out_ids=[f"lout_seq_{pump}_dash"],
        downstream=[pump],
    ))
    return nodes


def _pt_mgc_section(y_mgc):
    """Return the machineGroupControl node, its formatter and its link-outs."""
    return [
        comment("c_mgc", TAB_PROCESS, LANE_X[2], y_mgc,
                "── MGC ── (orchestrates the 3 pumps via optimalcontrol)",
                ""),
        {
            "id": MGC_ID, "type": "machineGroupControl", "z": TAB_PROCESS,
            "name": "MGC — Pump Group",
            "uuid": "mgc-pump-group",
            "category": "controller",
            "assetType": "machinegroupcontrol",
            "model": "default", "unit": "m3/h", "supplier": "evolv",
            # NOTE(review): the only node on this tab with logging enabled
            # at debug level — confirm this is intended and not a leftover.
            "enableLog": True, "logLevel": "debug",
            "tickIntervalMs": 2000,
            "positionVsParent": "atEquipment",
            "positionIcon": POSITION_ICON["atEquipment"],
            "hasDistance": False, "distance": 0, "distanceUnit": "m",
            "distanceDescription": "",
            "processOutputFormat": "process", "dbaseOutputFormat": "influxdb",
            "x": LANE_X[3], "y": y_mgc + 80,
            # Port 0 → formatter, port 1 → telemetry, port 2 → PS.
            "wires": [
                ["format_mgc"],
                ["lout_tlm_mgc"],
                [PS_ID],
            ],
        },
        link_out(
            "lout_tlm_mgc", TAB_PROCESS, LANE_X[3], y_mgc + 120,
            CH_TLM, target_in_ids=["lin_tlm"],
        ),
        function_node(
            "format_mgc", TAB_PROCESS, LANE_X[4], y_mgc + 80,
            "format MGC port 0",
            _pt_merge_throttle_js(
                "// Throttle: MGC fires on every distribution change.\n"
            ) +
            "const totalFlow = find('flow.predicted.atequipment.') ?? "
            "find('downstream_predicted_flow');\n"
            "const totalPower = find('power.predicted.atequipment.') ?? "
            "find('atEquipment_predicted_power');\n"
            "const eff = find('efficiency.predicted.atequipment.');\n"
            "msg.payload = {\n"
            " totalFlow: totalFlow != null ? Number(totalFlow ).toFixed(1) + ' m³/h' : 'n/a',\n"
            " totalPower: totalPower != null ? Number(totalPower).toFixed(2) + ' kW' : 'n/a',\n"
            " efficiency: eff != null ? Number(eff).toFixed(3) : 'n/a',\n"
            " totalFlowNum: totalFlow != null ? Number(totalFlow ) : null,\n"
            " totalPowerNum: totalPower != null ? Number(totalPower) : null,\n"
            " efficiencyNum: eff != null ? Number(eff) : null,\n"
            "};\n"
            "return msg;",
            outputs=1, wires=[["lout_evt_mgc"]],
        ),
        link_out(
            "lout_evt_mgc", TAB_PROCESS, LANE_X[5], y_mgc + 80,
            CH_MGC_EVT, target_in_ids=["lin_evt_mgc_dash"],
        ),
    ]


def _pt_ps_section(y_ps):
    """Return the pumpingStation node with its inputs, formatter and fan-out."""
    pump_count = len(PUMPS)
    return [
        comment("c_ps", TAB_PROCESS, LANE_X[2], y_ps,
                "── Pumping Station ── (basin model, levelbased control)", ""),
        link_in(
            "lin_qin_at_ps", TAB_PROCESS, LANE_X[0], y_ps + 40,
            CH_QIN, source_out_ids=["lout_qin_drivers"],
            downstream=[PS_ID],
        ),
        link_in(
            "lin_qd_at_ps", TAB_PROCESS, LANE_X[0], y_ps + 80,
            CH_QD, source_out_ids=["lout_qd_dash"],
            downstream=["qd_to_ps_wrap"],
        ),
        function_node(
            "qd_to_ps_wrap", TAB_PROCESS, LANE_X[1], y_ps + 80,
            "wrap slider → PS Qd",
            "msg.topic = 'Qd';\n"
            "return msg;",
            outputs=1, wires=[[PS_ID]],
        ),
        link_in(
            "lin_ps_mode_at_ps", TAB_PROCESS, LANE_X[0], y_ps + 120,
            CH_PS_MODE, source_out_ids=["lout_ps_mode_dash"],
            downstream=[PS_ID],
        ),
        {
            "id": PS_ID, "type": "pumpingStation", "z": TAB_PROCESS,
            "name": "Pumping Station",
            "uuid": "ps-basin-1",
            "category": "station", "assetType": "pumpingstation",
            "model": "default", "unit": "m3/s", "supplier": "evolv",
            "enableLog": False, "logLevel": "warn",
            "tickIntervalMs": 2000,
            "positionVsParent": "atEquipment",
            "positionIcon": POSITION_ICON["atEquipment"],
            "hasDistance": False, "distance": 0, "distanceUnit": "m",
            "distanceDescription": "",
            "processOutputFormat": "process", "dbaseOutputFormat": "influxdb",
            "controlMode": "levelbased",
            "basinVolume": BASIN_VOLUME,
            "basinHeight": BASIN_HEIGHT,
            # inflowLevel = top of inlet pipe (geometry) AND foot of the
            # demand ramp (control). Setting it equal to maxLevel collapses
            # the ramp to a step function — the runtime cycles 0/100 % every
            # tick AND the editor's level-mode preview hides the diagonal
            # line (mode-preview.js refuses to draw a degenerate ramp).
            "inflowLevel": 2.5,
            "outflowLevel": OUTFLOW_LEVEL,
            "overflowLevel": OVERFLOW_LEVEL,
            "inletPipeDiameter": 0.3,
            "outletPipeDiameter": 0.3,
            "minLevel": 0.5,
            # startLevel — ramp foot AND rising-edge engage point. Demand
            # scales 0..100 % over [startLevel, maxLevel].
            "startLevel": 2.5,
            # stopLevel — falling-edge disengage point. While engaged AND
            # level < startLevel (basin draining through the dead band), PS
            # emits the keep-alive percControl below so MGC keeps a single
            # pump running until level reaches stopLevel.
            "stopLevel": 2.0,
            # deadZoneKeepAlivePercent — % sent to MGC while engaged in the
            # dead band [stopLevel, startLevel). Mapped by MGC's normalized
            # scaling to flow.min — i.e., a single pump at minimum stable
            # speed. 1 % is small enough to round to flow.min.
            "deadZoneKeepAlivePercent": 1,
            "maxLevel": 3.5,
            "refHeight": "NAP",
            "minHeightBasedOn": "outlet",
            "basinBottomRef": 0,
            "staticHead": 12,
            "maxDischargeHead": 24,
            "pipelineLength": 80,
            "defaultFluid": "wastewater",
            "temperatureReferenceDegC": 15,
            "maxInflowRate": 200,
            "enableDryRunProtection": True,
            "enableOverfillProtection": True,
            "dryRunThresholdPercent": 5,
            "overfillThresholdPercent": 95,
            "timeleftToFullOrEmptyThresholdSeconds": 0,
            "x": LANE_X[3], "y": y_ps + 80,
            # Port 0 → formatter + physics fan-out, port 1 → telemetry.
            "wires": [
                ["format_ps", "ps_to_physics"],
                ["lout_tlm_ps"],
            ],
        },
        link_out(
            "lout_tlm_ps", TAB_PROCESS, LANE_X[3], y_ps + 120,
            CH_TLM, target_in_ids=["lin_tlm"],
        ),
        # One output per pump, each wired to that pump's physics feeder. The
        # message is tagged from: 'ps' so the feeder can tell it apart from
        # the pump's own stream.
        function_node(
            "ps_to_physics", TAB_PROCESS, LANE_X[4], y_ps + 130,
            f"ps → fan basin level to {pump_count} physics feeders",
            "const out = { from: 'ps', payload: msg.payload };\n"
            + _pt_fan_js("out", pump_count),
            outputs=pump_count,
            wires=[[f"physics_{pump}"] for pump in PUMPS],
        ),
        function_node(
            "format_ps", TAB_PROCESS, LANE_X[4], y_ps + 80,
            "format PS port 0",
            _pt_merge_throttle_js(
                "// Throttle: PS emits frequently in levelbased mode.\n"
            ) +
            f"const MAX_VOL = {BASIN_VOLUME};\n"
            "const lvl = find('level.predicted.');\n"
            "const vol = find('volume.predicted.');\n"
            "const qIn = find('flow.predicted.in.');\n"
            "const qOut = find('flow.predicted.out.');\n"
            "const netFlowRate = find('netFlowRate.predicted.');\n"
            "const fillPct = vol != null\n"
            " ? Math.min(100, Math.max(0, Math.round(Number(vol) / MAX_VOL * 100)))\n"
            " : null;\n"
            "const netM3h = netFlowRate != null ? Number(netFlowRate) * 3600 : null;\n"
            "const seconds = (c.timeleft != null && Number.isFinite(Number(c.timeleft)))\n"
            " ? Number(c.timeleft) : null;\n"
            "const timeStr = seconds != null\n"
            " ? (seconds > 60 ? Math.round(seconds/60) + ' min'\n"
            " : Math.round(seconds) + ' s')\n"
            " : 'n/a';\n"
            "msg.payload = {\n"
            " direction: c.direction || 'steady',\n"
            " level: lvl != null ? Number(lvl).toFixed(2) + ' m' : 'n/a',\n"
            # NOTE(review): the volume string has an empty unit suffix while
            # every sibling field carries its unit — confirm whether ' m³'
            # was intended (or lost) before changing the dashboard text.
            " volume: vol != null ? Number(vol).toFixed(1) + '' : 'n/a',\n"
            " fillPct: fillPct != null ? fillPct + '%' : 'n/a',\n"
            " netFlow: netM3h != null ? netM3h.toFixed(0) + ' m³/h' : 'n/a',\n"
            " timeLeft: timeStr,\n"
            " qIn: qIn != null ? (Number(qIn ) * 3600).toFixed(0) + ' m³/h' : 'n/a',\n"
            " qOut: qOut != null ? (Number(qOut) * 3600).toFixed(0) + ' m³/h' : 'n/a',\n"
            " levelNum: lvl != null ? Number(lvl) : null,\n"
            " volumeNum: vol != null ? Number(vol) : null,\n"
            " fillPctNum: fillPct,\n"
            " netFlowNum: netM3h,\n"
            " percControl: c.percControl != null ? Number(c.percControl) : null,\n"
            " qInNum: qIn != null ? Number(qIn ) * 3600 : null,\n"
            " qOutNum: qOut != null ? Number(qOut) * 3600 : null,\n"
            " safetyState: c.safetyState || 'normal',\n"
            "};\n"
            "return msg;",
            outputs=1, wires=[["lout_evt_ps"]],
        ),
        link_out(
            "lout_evt_ps", TAB_PROCESS, LANE_X[5], y_ps + 80,
            CH_PS_EVT, target_in_ids=["lin_evt_ps_dash"],
        ),
    ]


def _pt_mode_broadcast(y_mode):
    """Return the link-in and fan-out that send a setMode message to every pump."""
    pump_count = len(PUMPS)
    return [
        comment("c_mode_bcast", TAB_PROCESS, LANE_X[2], y_mode,
                "── Mode broadcast ──", ""),
        link_in(
            "lin_mode", TAB_PROCESS, LANE_X[0], y_mode + 60,
            "cmd:mode",
            source_out_ids=["lout_mode_setup"],
            downstream=["fanout_mode"],
        ),
        function_node(
            "fanout_mode", TAB_PROCESS, LANE_X[1] + 220, y_mode + 60,
            f"fan setMode → {pump_count} pumps",
            "msg.topic = 'setMode';\n"
            + _pt_fan_js("msg", pump_count),
            outputs=pump_count, wires=[[pump] for pump in PUMPS],
        ),
    ]


def _pt_station_commands(y_station):
    """Return the start / stop / e-stop link-ins, each fanned out to every pump."""
    pump_count = len(PUMPS)
    nodes = [comment("c_station_cmds", TAB_PROCESS, LANE_X[2], y_station,
                     "── Station-wide commands ──", "")]
    commands = [
        (CH_STATION_START, "lin_station_start",
         "fan_station_start", "startup"),
        (CH_STATION_STOP, "lin_station_stop",
         "fan_station_stop", "shutdown"),
        (CH_STATION_ESTOP, "lin_station_estop",
         "fan_station_estop", "emergency stop"),
    ]
    for k, (chan, link_id, fn_name, label_suffix) in enumerate(commands):
        y = y_station + 60 + k * 60
        # The source link-out id is derived from the channel name, e.g.
        # "cmd:station-startup" → "lout_cmd_station_startup_dash".
        slug = chan.replace(":", "_").replace("-", "_")
        nodes.append(link_in(
            link_id, TAB_PROCESS, LANE_X[0], y, chan,
            source_out_ids=[f"lout_{slug}_dash"],
            downstream=[fn_name],
        ))
        nodes.append(function_node(
            fn_name, TAB_PROCESS, LANE_X[1] + 220, y,
            f"fan {label_suffix} → {pump_count} pumps",
            _pt_fan_js("msg", pump_count),
            outputs=pump_count, wires=[[pump] for pump in PUMPS],
        ))
    return nodes


def _pt_setup_feeders(y_setup_in):
    """Return the link-ins that carry setup messages to MGC and PS."""
    return [
        comment("c_setup_at_mgc", TAB_PROCESS, LANE_X[2], y_setup_in,
                "── Setup feeders ──", ""),
        link_in(
            "lin_setup_at_mgc", TAB_PROCESS, LANE_X[0], y_setup_in + 60,
            "setup:to-mgc",
            source_out_ids=["lout_setup_to_mgc"],
            downstream=[MGC_ID],
        ),
        link_in(
            "lin_setup_calibrate_ps", TAB_PROCESS, LANE_X[0], y_setup_in + 120,
            "setup:calibrate-ps",
            source_out_ids=["lout_setup_calibrate"],
            downstream=[PS_ID],
        ),
    ]
def _physics_code(pump_letter):
    """Return the JavaScript body of the physics feeder for one pump.

    Pumps installed in parallel hang off common suction and discharge
    headers, so they all experience the same differential pressure.  The
    generated script therefore stores this pump's predicted flow in the
    Node-RED `flow` context under ``pump_flow_<pump_letter>``, adds up the
    ``a``/``b``/``c`` entries to obtain the manifold flow, and computes a
    single discharge-header pressure from that total.  Each feeder adds its
    own sensor noise on top of the shared suction and discharge values.

    ``pump_letter`` is interpolated verbatim into the flow-context key
    names.  The script returns four ``measurement`` messages in the order
    upstream pressure, downstream pressure, flow, power.
    """
    # Small JS utilities: prefix lookup in a snapshot object and a
    # sum-of-12-uniforms approximation of Gaussian noise.
    js_helpers = [
        "const c = context.get('c') || {};",
        "function find(o, prefix) {",
        " for (const k in o) { if (k.indexOf(prefix) === 0) return o[k]; }",
        " return null;",
        "}",
        "function gauss(sigma) {",
        " let s = 0;",
        " for (let i = 0; i < 12; i++) s += Math.random();",
        " return (s - 6) * sigma;",
        "}",
        "",
    ]

    # Messages tagged `from: 'ps'` only refresh the cached basin level.
    js_station_update = [
        "if (msg.from === 'ps') {",
        " const psSnap = c.ps || {};",
        " Object.assign(psSnap, msg.payload || {});",
        " c.ps = psSnap;",
        " const lvl = find(psSnap, 'level.predicted.atequipment.')",
        " ?? find(psSnap, 'level.measured.atequipment.');",
        " if (lvl != null) c.basinLevel = Number(lvl);",
        " context.set('c', c);",
        " return null;",
        "}",
        "",
    ]

    # Everything else is a pump update: merge it, then rate-limit output.
    js_pump_update = [
        "const pumpSnap = c.pump || {};",
        "Object.assign(pumpSnap, msg.payload || {});",
        "c.pump = pumpSnap;",
        "context.set('c', c);",
        "// Throttle: 1 Hz sensor updates are plenty for the demo; the",
        "// pump emits on every state change (5+/sec while cycling).",
        "const _now = Date.now();",
        "const _last = context.get('_lastEmit') || 0;",
        "if (_now - _last < 1000) return null;",
        "context.set('_lastEmit', _now);",
        "",
    ]

    js_pump_state = [
        "const state = pumpSnap.state || 'idle';",
        "// 'isRunning' = the rotor is spinning (any non-idle, non-cooled state).",
        "// MGC retargets flow on every tick, so the pump spends most of its",
        "// time in 'accelerating' or 'decelerating', not 'operational'. Those",
        "// transient states are still moving water — flow/power sensors must",
        "// publish non-zero values during them or the measurement nodes go",
        "// quiet (formatMsg skips emits on no-diff).",
        "const isRunning = ['operational','starting','warmingup','accelerating','decelerating','stopping'].includes(state);",
        "// 'pumpFlow' (not 'flow') — `flow` is the Node-RED flow-context object.",
        "const pumpFlow = Number(find(pumpSnap, 'flow.predicted.downstream.'));",
        "const pumpPower = Number(find(pumpSnap, 'power.predicted.atequipment.'));",
        "const basinLevel = c.basinLevel != null ? Number(c.basinLevel) : 0;",
        "",
    ]

    js_manifold_flow = [
        "// Publish this pump's contribution to the flow-context shared",
        "// header so the other physics feeders can compute total flow.",
        f"flow.set('pump_flow_{pump_letter}', isRunning && Number.isFinite(pumpFlow) ? pumpFlow : 0);",
        f"flow.set('pump_flow_{pump_letter}_state', state);",
        "const flowA = Number(flow.get('pump_flow_a') || 0);",
        "const flowB = Number(flow.get('pump_flow_b') || 0);",
        "const flowC = Number(flow.get('pump_flow_c') || 0);",
        "const totalFlow = flowA + flowB + flowC;",
        "",
    ]

    js_pressures = [
        # Hydrostatic head → mbar.
        #   Pa   = rho * g * h = 9810 * h   (rho=1000, g=9.81)
        #   mbar = Pa / 100    = 98.1 * h
        f"const HEAD_M = Math.max(0, basinLevel - {OUTFLOW_LEVEL});",
        "// Suction (basin) header pressure — same physical value for all",
        "// pumps; per-pump sensor noise added independently.",
        "const p_upstream_clean = 98.1 * HEAD_M;",
        "let p_upstream = Math.max(0, p_upstream_clean + gauss(2.5));",
        "",
        "// Discharge (header) pressure — driven by TOTAL flow leaving the",
        "// manifold, NOT this pump's individual flow. Static head 12 m",
        "// + quadratic system curve scaled so totalFlow=300 m³/h gives",
        "// ~full dynamic head.",
        "const STATIC_MBAR = 12 * 98.1;",
        "const DYN_MBAR_MAX = 12 * 98.1;",
        "const TOTAL_FLOW_MAX = 300;",
        "const ratio = Math.min(1, totalFlow / TOTAL_FLOW_MAX);",
        "const p_downstream_header = STATIC_MBAR + ratio * ratio * DYN_MBAR_MAX;",
        "// Publish the clean header value to flow context so the MGC's",
        "// header-pressure measurement child can read it.",
        "flow.set('header_p_downstream', p_downstream_header);",
        "flow.set('header_p_upstream', p_upstream_clean);",
        "// Per-pump downstream sensor: header value with local sensor noise.",
        "let p_downstream = Math.max(0, p_downstream_header + gauss(8));",
        "",
    ]

    js_flow_and_power = [
        "const flowMeas = (isRunning && Number.isFinite(pumpFlow))",
        " ? Math.max(0, pumpFlow + gauss(Math.max(0.5, pumpFlow * 0.01)))",
        " : 0;",
        "",
        "const powerMeas = (isRunning && Number.isFinite(pumpPower))",
        " ? Math.max(0, pumpPower + gauss(Math.max(0.05, pumpPower * 0.005)))",
        " : 0;",
        "",
    ]

    js_outputs = [
        "return [",
        " { topic: 'measurement', payload: p_upstream },",
        " { topic: 'measurement', payload: p_downstream },",
        " { topic: 'measurement', payload: flowMeas },",
        " { topic: 'measurement', payload: powerMeas },",
        "];",
    ]

    script_lines = (
        js_helpers
        + js_station_update
        + js_pump_update
        + js_pump_state
        + js_manifold_flow
        + js_pressures
        + js_flow_and_power
        + js_outputs
    )
    # Every source line, including the last, is newline-terminated.
    return "\n".join(script_lines) + "\n"
# ---------------------------------------------------------------------------
# Tab 2 — DASHBOARD UI
# ---------------------------------------------------------------------------
def build_ui_tab():
    """Build all nodes of the "Dashboard UI" tab.

    The tab holds the FlowFuse ``ui-*`` widgets for two pages (realtime and
    trends).  Operator inputs (sliders, buttons, mode switch) are wired to
    link-out nodes; process state arrives on link-in nodes and is split by
    "dispatch" function nodes into one output per widget or chart series.

    Sections, in the order their nodes are appended:
      1. tab definition, dashboard scaffold and ui-groups
      2. inflow baseline slider + scenario buttons + live inflow text
      3. station mode switch, manual Qd slider, station-wide command buttons
      4. basin realtime texts and gauges
      5. MGC realtime texts and gauges
      6. one panel per pump in ``PUMPS`` (texts, setpoint slider, start/stop)
      7. trend charts (1 h rolling window)

    Returns:
        list: the tab definition dict followed by whatever the node helper
        functions (``comment``, ``ui_text``, ``link_in`` …) return, in the
        order listed above.
    """
    nodes = []
    nodes.append({
        "id": TAB_UI, "type": "tab",
        "label": "📊 Dashboard UI",
        "disabled": False,
        "info": (
            "All FlowFuse ui-* widgets. Two pages:\n"
            " /dashboard/realtime — gauges + per-pump status (no time history)\n"
            " /dashboard/trends — line charts, 1 hour rolling window\n\n"
            "All inputs leave via link-out; all process state arrives via link-in."
        ),
    })
    nodes += dashboard_scaffold()

    PG_RT = "ui_page_realtime"
    PG_TRENDS = "ui_page_trends"
    g_inflow = "ui_grp_inflow"
    g_station = "ui_grp_station"
    g_basin = "ui_grp_basin"
    g_mgc = "ui_grp_mgc"
    g_pump_a = "ui_grp_pump_a"
    g_pump_b = "ui_grp_pump_b"
    g_pump_c = "ui_grp_pump_c"
    g_tr_basin = "ui_grp_tr_basin"
    g_tr_demand = "ui_grp_tr_demand"
    g_tr_dq = "ui_grp_tr_dq"
    g_tr_states = "ui_grp_tr_states"
    g_tr_flow = "ui_grp_tr_flow"
    g_tr_power = "ui_grp_tr_power"
    g_tr_press = "ui_grp_tr_press"
    nodes += [
        ui_group(g_inflow, "1. Inflow (operator input)", PG_RT, width=12, order=1),
        ui_group(g_station, "2. Station Mode + Commands", PG_RT, width=12, order=2),
        ui_group(g_basin, "3. Basin Realtime", PG_RT, width=6, order=3),
        ui_group(g_mgc, "4. Pump Group (MGC)", PG_RT, width=6, order=4),
        ui_group(g_pump_a, "5a. Pump A", PG_RT, width=4, order=5),
        ui_group(g_pump_b, "5b. Pump B", PG_RT, width=4, order=6),
        ui_group(g_pump_c, "5c. Pump C", PG_RT, width=4, order=7),
        ui_group(g_tr_basin, "Basin level + fill (1h)", PG_TRENDS, width=12, order=1),
        ui_group(g_tr_demand, "Process demand — PS percControl (1h)",
                 PG_TRENDS, width=12, order=2),
        ui_group(g_tr_dq, "ΔQ = inflow − outflow (m³/h, +fill / −drain)",
                 PG_TRENDS, width=12, order=3),
        ui_group(g_tr_states, "Pump state timeline (gantt)",
                 PG_TRENDS, width=12, order=4),
        ui_group(g_tr_flow, "Inflow / Outflow / Per-pump flow (1h)",
                 PG_TRENDS, width=12, order=5),
        ui_group(g_tr_power, "Per-pump power (1h)", PG_TRENDS, width=12, order=6),
        ui_group(g_tr_press, "Per-pump pressures (1h)", PG_TRENDS, width=12, order=7),
    ]
    nodes.append(comment("c_ui_title", TAB_UI, LANE_X[2], 20,
                         "📊 DASHBOARD UI — only ui-* widgets here", ""))

    # ---------- INFLOW SECTION ----------
    y = 80
    nodes.append(comment("c_ui_inflow", TAB_UI, LANE_X[2], y,
                         "── Operator inflow input ──", ""))
    nodes.append(ui_slider(
        "ui_inflow_slider", TAB_UI, LANE_X[0], y + 40, g_inflow,
        "Inflow baseline",
        "Inflow baseline (m³/h) — scenarios modulate around this value",
        0, 250, 5.0, "inflowBaseline",
        wires=["lout_inflow_baseline"],
    ))
    nodes.append(link_out(
        "lout_inflow_baseline", TAB_UI, LANE_X[1], y + 40,
        CH_INFLOW_BASELINE, target_in_ids=["lin_inflow_baseline"],
    ))
    # (payload key, button label, colour, icon) — one button per scenario.
    SCENARIOS = [
        ("constant", "Constant", "#0c99d9", "horizontal_rule"),
        ("sine", "Sine wave", "#16a34a", "show_chart"),
        ("diurnal", "Diurnal", "#f59e0b", "schedule"),
        ("storm", "Storm", "#dc2626", "thunderstorm"),
    ]
    for k, (key, label, color, icon) in enumerate(SCENARIOS):
        ybtn = y + 100 + k * 50
        btn_id = f"btn_scn_{key}"
        wrap_id = f"wrap_scn_{key}"
        nodes.append(ui_button(
            btn_id, TAB_UI, LANE_X[0], ybtn, g_inflow,
            f"Scenario {label}", label, key, "str",
            topic="scenario", color=color, icon=icon,
            wires=[wrap_id],
        ))
        # The wrapper pins the payload to the scenario key before it leaves
        # the tab through the shared scenario link-out.
        nodes.append(function_node(
            wrap_id, TAB_UI, LANE_X[1] + 100, ybtn,
            f"build scenario {key}",
            f"msg.payload = '{key}';\n"
            "return msg;",
            outputs=1, wires=[["lout_inflow_scenario"]],
        ))
    nodes.append(link_out(
        "lout_inflow_scenario", TAB_UI, LANE_X[2], y + 100,
        CH_INFLOW_SCENARIO, target_in_ids=["lin_inflow_scenario"],
    ))
    nodes.append(link_in(
        "lin_evt_inflow", TAB_UI, LANE_X[3], y + 40,
        CH_INFLOW_EVT, source_out_ids=["lout_evt_inflow"],
        downstream=["dispatch_inflow"],
    ))
    # Outputs: scenario text, inflow text, "Inflow" series for the flow chart.
    nodes.append(function_node(
        "dispatch_inflow", TAB_UI, LANE_X[4], y + 40,
        "dispatch inflow",
        "const p = msg.payload || {};\n"
        "const ts = Date.now();\n"
        "return [\n"
        " { payload: (p.scenario || 'constant').toUpperCase() },\n"
        " { payload: p.q_h != null ? Number(p.q_h).toFixed(1) + ' m³/h' : 'n/a' },\n"
        " p.q_h != null ? { topic: 'Inflow', payload: Number(p.q_h), timestamp: ts } : null,\n"
        "];",
        outputs=3,
        wires=[["ui_inflow_scn_text"], ["ui_inflow_value_text"], ["chart_trend_flow"]],
    ))
    nodes.append(ui_text(
        "ui_inflow_scn_text", TAB_UI, LANE_X[5], y + 40, g_inflow,
        "Active scenario", "Active scenario", "{{msg.payload}}",
    ))
    nodes.append(ui_text(
        "ui_inflow_value_text", TAB_UI, LANE_X[5], y + 80, g_inflow,
        "Live inflow", "Live inflow", "{{msg.payload}}",
    ))

    # ---------- MODE + STATION COMMANDS ----------
    y = 380
    nodes.append(comment("c_ui_station", TAB_UI, LANE_X[2], y,
                         "── Mode + Station-wide buttons ──", ""))
    nodes.append(ui_switch(
        "ui_mode_toggle", TAB_UI, LANE_X[0], y + 40, g_station,
        "Station mode",
        "Station mode (Auto = level-based · Manual = slider Qd)",
        on_value="levelbased", off_value="manual", topic="changemode",
        wires=["lout_ps_mode_dash"],
    ))
    nodes.append(link_out(
        "lout_ps_mode_dash", TAB_UI, LANE_X[1], y + 40,
        CH_PS_MODE, target_in_ids=["lin_ps_mode_at_ps"],
    ))
    nodes.append(ui_slider(
        "ui_qd_slider", TAB_UI, LANE_X[0], y + 90, g_station,
        "Manual Qd",
        "Manual Qd (m³/h, manual mode only)", 0, 600, 5.0,
        "manualDemand", wires=["lout_qd_dash"],
    ))
    nodes.append(link_out(
        "lout_qd_dash", TAB_UI, LANE_X[1], y + 90,
        CH_QD, target_in_ids=["lin_qd_at_ps"],
    ))
    # Link-in node (on the process tab) that receives each station channel.
    station_link_in = {
        CH_STATION_START: "lin_station_start",
        CH_STATION_STOP: "lin_station_stop",
        CH_STATION_ESTOP: "lin_station_estop",
    }
    for k, (text, color, icon, lout_id, channel,
            wrap_code) in enumerate([
                ("Start all pumps", "#16a34a", "play_arrow",
                 "lout_cmd_station_startup_dash", CH_STATION_START,
                 "msg.topic = 'execSequence';\n"
                 "msg.payload = { source:'GUI', action:'execSequence', "
                 "parameter:'startup' };\n"
                 "return msg;"),
                ("Stop all pumps", "#ea580c", "stop",
                 "lout_cmd_station_shutdown_dash", CH_STATION_STOP,
                 "msg.topic = 'execSequence';\n"
                 "msg.payload = { source:'GUI', action:'execSequence', "
                 "parameter:'shutdown' };\n"
                 "return msg;"),
                ("EMERGENCY STOP", "#dc2626", "stop_circle",
                 "lout_cmd_station_estop_dash", CH_STATION_ESTOP,
                 "msg.topic = 'emergencystop';\n"
                 "msg.payload = { source:'GUI', action:'emergencystop' };\n"
                 "return msg;"),
            ]):
        yk = y + 150 + k * 50
        btn_id = f"btn_station_{k}"
        wrap_id = f"wrap_station_{k}"
        nodes.append(ui_button(
            btn_id, TAB_UI, LANE_X[0], yk, g_station,
            text, text, "fired", "str",
            topic=f"station_{k}", color=color, icon=icon,
            wires=[wrap_id],
        ))
        nodes.append(function_node(
            wrap_id, TAB_UI, LANE_X[1] + 100, yk,
            f"build cmd ({text})", wrap_code,
            outputs=1, wires=[[lout_id]],
        ))
        nodes.append(link_out(
            lout_id, TAB_UI, LANE_X[2], yk,
            channel,
            target_in_ids=[station_link_in[channel]],
        ))

    # ---------- BASIN REALTIME ----------
    y = 700
    nodes.append(comment("c_ui_basin", TAB_UI, LANE_X[2], y,
                         "── Basin realtime (gauges + text) ──", ""))
    nodes.append(link_in(
        "lin_evt_ps_dash", TAB_UI, LANE_X[0], y + 40,
        CH_PS_EVT, source_out_ids=["lout_evt_ps"],
        downstream=["dispatch_ps"],
    ))
    # Output order must match `wires` below: 10 text fields, 3 gauges,
    # then 5 chart series (a null entry sends nothing on that output).
    nodes.append(function_node(
        "dispatch_ps", TAB_UI, LANE_X[1], y + 40,
        "dispatch PS",
        "const p = msg.payload || {};\n"
        "const ts = Date.now();\n"
        "// ΔQ = inflow − outflow in m³/h (positive = filling).\n"
        "const dQ = (p.qInNum != null && p.qOutNum != null)\n"
        " ? p.qInNum - p.qOutNum : null;\n"
        "// Demand text formatting.\n"
        "const demandStr = p.percControl != null\n"
        " ? Number(p.percControl).toFixed(0) + '%' : 'n/a';\n"
        "return [\n"
        " { payload: String(p.direction || 'steady') },\n"
        " { payload: String(p.level || 'n/a') },\n"
        " { payload: String(p.volume || 'n/a') },\n"
        " { payload: String(p.fillPct || 'n/a') },\n"
        " { payload: String(p.netFlow || 'n/a') },\n"
        " { payload: String(p.timeLeft || 'n/a') },\n"
        " { payload: String(p.qIn || 'n/a') },\n"
        " { payload: String(p.qOut || 'n/a') },\n"
        " { payload: String(p.safetyState || 'normal') },\n"
        " { payload: demandStr },\n"
        " p.levelNum != null ? { payload: p.levelNum } : null,\n"
        " p.fillPctNum != null ? { payload: p.fillPctNum } : null,\n"
        " p.percControl != null ? { payload: p.percControl } : null,\n"
        " p.levelNum != null ? { topic: 'Basin level', payload: p.levelNum, timestamp: ts } : null,\n"
        " p.fillPctNum != null ? { topic: 'Fill %', payload: p.fillPctNum, timestamp: ts } : null,\n"
        " p.qOutNum != null ? { topic: 'Outflow', payload: p.qOutNum, timestamp: ts } : null,\n"
        " p.percControl != null ? { topic: 'PS demand', payload: p.percControl, timestamp: ts } : null,\n"
        " dQ != null ? { topic: 'ΔQ', payload: dQ, timestamp: ts } : null,\n"
        "];",
        outputs=18,
        wires=[
            ["ui_ps_direction"],
            ["ui_ps_level"],
            ["ui_ps_volume"],
            ["ui_ps_fill"],
            ["ui_ps_netflow"],
            ["ui_ps_timeleft"],
            ["ui_ps_qin"],
            ["ui_ps_qout"],
            ["ui_ps_safety"],
            ["ui_ps_demand"],
            ["gauge_basin_level"],
            ["gauge_basin_fill"],
            ["gauge_ps_demand"],
            ["chart_trend_basin"],
            ["chart_trend_basin"],
            ["chart_trend_flow"],
            ["chart_trend_demand"],
            ["chart_trend_dq"],
        ],
    ))
    nodes.append(ui_text("ui_ps_direction", TAB_UI, LANE_X[2], y + 40, g_basin,
                         "Direction", "Direction", "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_level", TAB_UI, LANE_X[2], y + 70, g_basin,
                         "Basin level", "Basin level", "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_volume", TAB_UI, LANE_X[2], y + 100, g_basin,
                         "Basin volume", "Basin volume", "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_fill", TAB_UI, LANE_X[2], y + 130, g_basin,
                         "Fill %", "Fill %", "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_netflow", TAB_UI, LANE_X[2], y + 160, g_basin,
                         "Net flow", "Net flow", "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_timeleft", TAB_UI, LANE_X[2], y + 190, g_basin,
                         "Time left", "Time to full/empty",
                         "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_qin", TAB_UI, LANE_X[2], y + 220, g_basin,
                         "Inflow", "Inflow", "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_qout", TAB_UI, LANE_X[2], y + 250, g_basin,
                         "Outflow", "Outflow", "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_safety", TAB_UI, LANE_X[2], y + 280, g_basin,
                         "Safety", "Safety state", "{{msg.payload}}"))
    nodes.append(ui_text("ui_ps_demand", TAB_UI, LANE_X[2], y + 310, g_basin,
                         "PS demand", "Process demand", "{{msg.payload}}"))
    # Gauge colour bands: each entry starts a band at its "from" value.
    LEVEL_SEGMENTS = [
        {"color": "#f44336", "from": 0},
        {"color": "#ff9800", "from": 1.0},
        {"color": "#2196f3", "from": 2.0},
        {"color": "#ff9800", "from": 3.5},
        {"color": "#f44336", "from": 3.8},
    ]
    FILL_SEGMENTS = [
        {"color": "#f44336", "from": 0},
        {"color": "#ff9800", "from": 10},
        {"color": "#4caf50", "from": 30},
        {"color": "#ff9800", "from": 80},
        {"color": "#f44336", "from": 95},
    ]
    nodes.append(ui_gauge(
        "gauge_basin_level", TAB_UI, LANE_X[3], y + 40, g_basin,
        "Basin level gauge", "Level", "m", 0, BASIN_HEIGHT,
        LEVEL_SEGMENTS, gtype="gauge-tank", suffix=" m",
        width=3, height=4, order=10,
    ))
    nodes.append(ui_gauge(
        "gauge_basin_fill", TAB_UI, LANE_X[3], y + 100, g_basin,
        "Basin fill gauge", "Fill", "%", 0, 100,
        FILL_SEGMENTS, gtype="gauge-34", suffix="%",
        icon="water_drop", width=3, height=4, order=11,
    ))
    # PS process demand gauge — shows the % command PS sends to MGC.
    DEMAND_SEGMENTS = [
        {"color": "#cccccc", "from": 0},
        {"color": "#0c99d9", "from": 5},
        {"color": "#16a34a", "from": 30},
        {"color": "#f59e0b", "from": 70},
        {"color": "#dc2626", "from": 95},
    ]
    nodes.append(ui_gauge(
        "gauge_ps_demand", TAB_UI, LANE_X[3], y + 160, g_basin,
        "PS demand gauge", "PS demand", "%", 0, 100,
        DEMAND_SEGMENTS, gtype="gauge-34", suffix="%",
        icon="speed", width=3, height=4, order=12,
    ))

    # ---------- MGC REALTIME ----------
    y = 1080
    nodes.append(comment("c_ui_mgc", TAB_UI, LANE_X[2], y,
                         "── MGC realtime ──", ""))
    nodes.append(link_in(
        "lin_evt_mgc_dash", TAB_UI, LANE_X[0], y + 40,
        CH_MGC_EVT, source_out_ids=["lout_evt_mgc"],
        downstream=["dispatch_mgc"],
    ))
    nodes.append(function_node(
        "dispatch_mgc", TAB_UI, LANE_X[1], y + 40,
        "dispatch MGC",
        "const p = msg.payload || {};\n"
        "return [\n"
        " { payload: String(p.totalFlow || 'n/a') },\n"
        " { payload: String(p.totalPower || 'n/a') },\n"
        " { payload: String(p.efficiency || 'n/a') },\n"
        " p.totalFlowNum != null ? { payload: p.totalFlowNum } : null,\n"
        " p.totalPowerNum != null ? { payload: p.totalPowerNum } : null,\n"
        "];",
        outputs=5,
        wires=[
            ["ui_mgc_total_flow"],
            ["ui_mgc_total_power"],
            ["ui_mgc_eff"],
            ["gauge_mgc_flow"],
            ["gauge_mgc_power"],
        ],
    ))
    nodes.append(ui_text("ui_mgc_total_flow", TAB_UI, LANE_X[2], y + 40, g_mgc,
                         "MGC total flow", "Total flow", "{{msg.payload}}"))
    nodes.append(ui_text("ui_mgc_total_power", TAB_UI, LANE_X[2], y + 70, g_mgc,
                         "MGC total power", "Total power", "{{msg.payload}}"))
    nodes.append(ui_text("ui_mgc_eff", TAB_UI, LANE_X[2], y + 100, g_mgc,
                         "MGC efficiency", "Group efficiency", "{{msg.payload}}"))
    nodes.append(ui_gauge(
        "gauge_mgc_flow", TAB_UI, LANE_X[3], y + 40, g_mgc,
        "MGC total flow gauge", "Total flow", "m³/h", 0, 600,
        [
            {"color": "#cccccc", "from": 0},
            {"color": "#0c99d9", "from": 50},
            {"color": "#16a34a", "from": 200},
            {"color": "#f59e0b", "from": 500},
        ],
        gtype="gauge-34", suffix=" m³/h",
        width=3, height=4, order=10,
    ))
    nodes.append(ui_gauge(
        "gauge_mgc_power", TAB_UI, LANE_X[3], y + 100, g_mgc,
        "MGC total power gauge", "Total power", "kW", 0, 30,
        [
            {"color": "#cccccc", "from": 0},
            {"color": "#0c99d9", "from": 1},
            {"color": "#16a34a", "from": 5},
            {"color": "#f59e0b", "from": 20},
        ],
        gtype="gauge-34", suffix=" kW",
        width=3, height=4, order=11,
    ))

    # ---------- PER-PUMP REALTIME PANELS ----------
    y_pumps_start = 1340
    # (widget label, payload field / id suffix, display format)
    PUMP_FIELDS = [
        ("State", "state", "{{msg.payload}}"),
        ("Mode", "mode", "{{msg.payload}}"),
        ("Controller %", "ctrl", "{{msg.payload}}"),
        ("Flow", "flow", "{{msg.payload}}"),
        ("Power", "power", "{{msg.payload}}"),
        ("p Upstream", "pUp", "{{msg.payload}}"),
        ("p Downstream", "pDn", "{{msg.payload}}"),
    ]
    pump_groups = {"pump_a": g_pump_a, "pump_b": g_pump_b, "pump_c": g_pump_c}
    for i, pump in enumerate(PUMPS):
        label = PUMP_LABELS[pump]
        g = pump_groups[pump]
        y_p = y_pumps_start + i * 480
        state_offset = i * 3  # A=0, B=3, C=6
        nodes.append(comment(f"c_ui_{pump}", TAB_UI, LANE_X[2], y_p,
                             f"── {label} ──", ""))
        nodes.append(link_in(
            f"lin_evt_{pump}_dash", TAB_UI, LANE_X[0], y_p + 40,
            CH_PUMP_EVT[pump],
            source_out_ids=[f"lout_evt_{pump}"],
            downstream=[f"dispatch_{pump}"],
        ))
        # 'accelerating' / 'decelerating' are plotted at the operational
        # level: the physics feeder in this file treats them as running
        # states, so letting them fall through to the idle default would
        # draw a pumping machine as stopped on the state timeline.
        dispatch_code = (
            "const p = msg.payload || {};\n"
            "const ts = Date.now();\n"
            f"const OFF = {state_offset};\n"
            "function stateNum(s) {\n"
            " switch (s) {\n"
            " case 'operational':\n"
            " case 'accelerating':\n"
            " case 'decelerating': return OFF + 2;\n"
            " case 'starting':\n"
            " case 'warmingup': return OFF + 1;\n"
            " case 'stopping': return OFF + 1.5;\n"
            " case 'coolingdown': return OFF + 0.5;\n"
            " default: return OFF;\n"
            " }\n"
            "}\n"
            "const sNum = p.state ? stateNum(p.state) : null;\n"
            "return [\n"
            " {payload: String(p.state || 'idle')},\n"
            " {payload: String(p.mode || 'auto')},\n"
            " {payload: String(p.ctrl || 'n/a')},\n"
            " {payload: String(p.flow || 'n/a')},\n"
            " {payload: String(p.power || 'n/a')},\n"
            " {payload: String(p.pUp || 'n/a')},\n"
            " {payload: String(p.pDn || 'n/a')},\n"
            " p.flowNum != null ? {topic: '" + label + "', payload: p.flowNum, timestamp: ts} : null,\n"
            " p.powerNum != null ? {topic: '" + label + "', payload: p.powerNum, timestamp: ts} : null,\n"
            " p.pUpNum != null ? {topic: '" + label + " up', payload: p.pUpNum, timestamp: ts} : null,\n"
            " p.pDnNum != null ? {topic: '" + label + " dn', payload: p.pDnNum, timestamp: ts} : null,\n"
            " sNum != null ? {topic: '" + label + " state', payload: sNum, timestamp: ts} : null,\n"
            "];"
        )
        # 7 text outputs (one per PUMP_FIELDS entry) + 5 chart series.
        nodes.append(function_node(
            f"dispatch_{pump}", TAB_UI, LANE_X[1], y_p + 40,
            f"dispatch {label}", dispatch_code,
            outputs=12,
            wires=[
                [f"ui_{pump}_{f}"] for _, f, _ in PUMP_FIELDS
            ] + [
                ["chart_trend_flow"],
                ["chart_trend_power"],
                ["chart_trend_pressure"],
                ["chart_trend_pressure"],
                ["chart_trend_states"],
            ],
        ))
        for k, (label_txt, field, fmt) in enumerate(PUMP_FIELDS):
            nodes.append(ui_text(
                f"ui_{pump}_{field}", TAB_UI, LANE_X[2], y_p + 40 + k * 30, g,
                f"{label} {label_txt}", label_txt, fmt,
            ))
        nodes.append(ui_slider(
            f"ui_{pump}_setpoint", TAB_UI, LANE_X[0], y_p + 280, g,
            f"{label} setpoint", "Setpoint % (manual mode)",
            0, 100, 5.0, f"setpoint_{pump}",
            wires=[f"lout_setpoint_{pump}_dash"],
        ))
        nodes.append(link_out(
            f"lout_setpoint_{pump}_dash", TAB_UI, LANE_X[1], y_p + 280,
            CH_PUMP_SETPOINT[pump],
            target_in_ids=[f"lin_setpoint_{pump}"],
        ))
        nodes.append(ui_button(
            f"btn_{pump}_start", TAB_UI, LANE_X[0], y_p + 330, g,
            f"{label} startup", "Startup", "fired", "str",
            topic=f"start_{pump}", color="#16a34a", icon="play_arrow",
            wires=[f"wrap_{pump}_start"],
        ))
        nodes.append(function_node(
            f"wrap_{pump}_start", TAB_UI, LANE_X[1] + 100, y_p + 330,
            f"build start ({label})",
            "msg.topic = 'execSequence';\n"
            "msg.payload = { source:'GUI', action:'execSequence', parameter:'startup' };\n"
            "return msg;",
            outputs=1, wires=[[f"lout_seq_{pump}_dash"]],
        ))
        nodes.append(ui_button(
            f"btn_{pump}_stop", TAB_UI, LANE_X[0], y_p + 380, g,
            f"{label} shutdown", "Shutdown", "fired", "str",
            topic=f"stop_{pump}", color="#ea580c", icon="stop",
            wires=[f"wrap_{pump}_stop"],
        ))
        nodes.append(function_node(
            f"wrap_{pump}_stop", TAB_UI, LANE_X[1] + 100, y_p + 380,
            f"build stop ({label})",
            "msg.topic = 'execSequence';\n"
            "msg.payload = { source:'GUI', action:'execSequence', parameter:'shutdown' };\n"
            "return msg;",
            outputs=1, wires=[[f"lout_seq_{pump}_dash"]],
        ))
        # Start and stop wrappers share one sequence link-out per pump.
        nodes.append(link_out(
            f"lout_seq_{pump}_dash", TAB_UI, LANE_X[2], y_p + 355,
            CH_PUMP_SEQUENCE[pump],
            target_in_ids=[f"lin_seq_{pump}"],
        ))

    # ---------- TREND CHARTS ----------
    y_trends = y_pumps_start + len(PUMPS) * 480 + 60
    nodes.append(comment("c_ui_trends", TAB_UI, LANE_X[2], y_trends,
                         "── Trend charts (1h rolling) ──", ""))
    nodes.append(ui_chart(
        "chart_trend_basin", TAB_UI, LANE_X[3], y_trends + 40,
        g_tr_basin,
        "Basin level + fill %", "Basin level + fill",
        width=12, height=8, y_axis_label="m / %",
        remove_older="60", remove_older_unit="60",
        remove_older_points="3600",
        order=1,
    ))
    nodes.append(ui_chart(
        "chart_trend_demand", TAB_UI, LANE_X[3], y_trends + 80,
        g_tr_demand,
        "PS process demand %", "PS demand",
        width=12, height=6, y_axis_label="%",
        remove_older="60", remove_older_unit="60",
        remove_older_points="3600",
        ymin=0, ymax=110, order=1,
    ))
    nodes.append(ui_chart(
        "chart_trend_dq", TAB_UI, LANE_X[3], y_trends + 100,
        g_tr_dq,
        "ΔQ — inflow − outflow", "ΔQ",
        width=12, height=6, y_axis_label="m³/h",
        remove_older="60", remove_older_unit="60",
        remove_older_points="3600",
        order=1,
    ))
    # State timeline: each pump has a Y-axis "track" (A=0..2, B=3..5, C=6..8)
    # with discrete values: 0/3/6 idle, 0.5/3.5/6.5 coolingdown,
    # 1/4/7 starting/warmingup, 1.5/4.5/7.5 stopping,
    # 2/5/8 operational (including accelerating/decelerating).
    # Step interpolation so transitions are sharp.
    nodes.append(ui_chart(
        "chart_trend_states", TAB_UI, LANE_X[3], y_trends + 120,
        g_tr_states,
        "Pump state timeline", "Pump states (A=0-2, B=3-5, C=6-8)",
        width=12, height=6, y_axis_label="A B C tracks",
        remove_older="60", remove_older_unit="60",
        remove_older_points="3600",
        ymin=-0.5, ymax=8.5, order=1,
        interpolation="step",
    ))
    # Placed at +160 so it no longer sits exactly on top of the state
    # timeline chart (+120) in the editor canvas.
    nodes.append(ui_chart(
        "chart_trend_flow", TAB_UI, LANE_X[3], y_trends + 160,
        g_tr_flow,
        "Inflow / Outflow / Per-pump flow", "Flows",
        width=12, height=8, y_axis_label="m³/h",
        remove_older="60", remove_older_unit="60",
        remove_older_points="3600",
        order=1,
    ))
    nodes.append(ui_chart(
        "chart_trend_power", TAB_UI, LANE_X[3], y_trends + 200,
        g_tr_power,
        "Per-pump power", "Power",
        width=12, height=8, y_axis_label="kW",
        remove_older="60", remove_older_unit="60",
        remove_older_points="3600",
        order=1,
    ))
    nodes.append(ui_chart(
        "chart_trend_pressure", TAB_UI, LANE_X[3], y_trends + 280,
        g_tr_press,
        "Per-pump up/dn pressure", "Pressure",
        width=12, height=8, y_axis_label="mbar",
        remove_older="60", remove_older_unit="60",
        remove_older_points="3600",
        order=1,
    ))
    return nodes
# ---------------------------------------------------------------------------
# Tab 3 — DEMO DRIVERS (inflow generator)
# ---------------------------------------------------------------------------
def build_drivers_tab():
    """Build the nodes of the "Demo Drivers" tab (the inflow generator).

    Scenario and baseline selections arrive on two link-in nodes and a 1 Hz
    inject supplies the tick; all three feed one function node.  That node
    stores scenario/baseline in its context and, on every tick, emits the
    inflow as ``q_in`` (m3/s) on output 1 and a status payload (scenario,
    baseline, q_h, q_s, ts) on output 2.  Each output leaves the tab through
    its own link-out.

    Returns:
        list: the tab definition dict followed by the tab's nodes.
    """
    tab = {
        "id": TAB_DRIVERS, "type": "tab",
        "label": "🎛️ Demo Drivers",
        "disabled": False,
        "info": (
            "Inflow generator. The operator picks a SCENARIO (Constant / Sine /"
            " Diurnal / Storm) on the dashboard and sets a BASELINE m³/h value."
            " Every second this generator emits q_in to the PS based on the "
            "active scenario + baseline.\n\n"
            "Outflow is implicit: the pumps drain the basin via MGC."
        ),
    }

    # JS for the scenario engine.  Baseline/scenario messages only update
    # context and return null; any other message (the tick) produces output.
    engine_js = "\n".join([
        "let scenario = context.get('scenario') || 'constant';",
        "let baseline = context.get('baseline');",
        "if (baseline == null) baseline = 60;",
        "",
        "if (msg.topic === 'inflowBaseline') {",
        " const v = Number(msg.payload);",
        " if (Number.isFinite(v) && v >= 0) {",
        " baseline = v;",
        " context.set('baseline', baseline);",
        " }",
        " return null;",
        "}",
        "if (msg.topic === 'scenario') {",
        " const s = String(msg.payload || '').toLowerCase();",
        " if (['constant','sine','diurnal','storm'].includes(s)) {",
        " scenario = s;",
        " context.set('scenario', scenario);",
        " }",
        " return null;",
        "}",
        "const t = Date.now() / 1000;",
        "let q_h;",
        "switch (scenario) {",
        " case 'sine': {",
        " q_h = baseline * (1 + 0.5 * Math.sin(2 * Math.PI * t / 240));",
        " break;",
        " }",
        " case 'diurnal': {",
        " q_h = baseline * (1 + 0.6 * Math.sin(2 * Math.PI * t / 480 - Math.PI/2));",
        " break;",
        " }",
        " case 'storm': {",
        " const phase = (t % 240) / 240;",
        " let factor;",
        " if (phase < 0.15) factor = 1 + (4 / 0.15) * phase;",
        " else factor = Math.max(1, 5 - (4 / 0.85) * (phase - 0.15));",
        " q_h = baseline * factor;",
        " break;",
        " }",
        " case 'constant':",
        " default:",
        " q_h = baseline;",
        "}",
        "q_h = Math.max(0, q_h);",
        "const q_s = q_h / 3600;",
        "return [",
        " { topic: 'q_in', payload: q_s, unit: 'm3/s', timestamp: Date.now() },",
        " { payload: { scenario, baseline, q_h, q_s, ts: Date.now() } },",
        "];",
    ])

    engine_id = "inflow_state"
    return [
        tab,
        comment("c_drv_title", TAB_DRIVERS, LANE_X[2], 20,
                "🎛️ DEMO DRIVERS — operator-driven inflow generator", ""),
        # Inputs: scenario + baseline (dashboard or setup tab) and the tick.
        link_in(
            "lin_inflow_scenario", TAB_DRIVERS, LANE_X[0], 100,
            CH_INFLOW_SCENARIO,
            source_out_ids=["lout_inflow_scenario", "lout_setup_inflow_scn"],
            downstream=[engine_id],
        ),
        link_in(
            "lin_inflow_baseline", TAB_DRIVERS, LANE_X[0], 140,
            CH_INFLOW_BASELINE,
            source_out_ids=["lout_inflow_baseline", "lout_setup_inflow_baseline"],
            downstream=[engine_id],
        ),
        inject(
            "inflow_tick", TAB_DRIVERS, LANE_X[0], 200,
            "tick (1 Hz)", topic="tick", payload="", payload_type="date",
            repeat="1", wires=[engine_id],
        ),
        function_node(
            engine_id, TAB_DRIVERS, LANE_X[2], 160,
            "inflow scenario engine",
            engine_js,
            outputs=2,
            wires=[["lout_qin_drivers"], ["lout_evt_inflow"]],
        ),
        # Outputs: q_in towards the pumping station, status to the dashboard.
        link_out(
            "lout_qin_drivers", TAB_DRIVERS, LANE_X[3], 140,
            CH_QIN, target_in_ids=["lin_qin_at_ps"],
        ),
        link_out(
            "lout_evt_inflow", TAB_DRIVERS, LANE_X[3], 180,
            CH_INFLOW_EVT, target_in_ids=["lin_evt_inflow"],
        ),
    ]
# ---------------------------------------------------------------------------
# Tab 4 — SETUP & INIT
# ---------------------------------------------------------------------------
def build_setup_tab():
    """Build the nodes of the "Setup & Init" tab.

    The tab contains inject nodes that fire once shortly after deploy
    (staggered between 1.5 s and 2.7 s) to set MGC scaling and mode, put the
    pumps in auto mode and seed the inflow baseline and scenario, plus one
    manual-only inject that recalibrates the basin level.  Every inject is
    wired to a link-out that carries the message to another tab.

    Returns:
        list: the tab definition dict followed by the tab's nodes.
    """
    def fire_once(node_id, y, name, topic, payload, payload_type, delay, target):
        # Inject in lane 0 that fires a single time `delay` seconds after deploy.
        return inject(
            node_id, TAB_SETUP, LANE_X[0], y,
            name,
            topic=topic, payload=payload, payload_type=payload_type,
            once=True, once_delay=delay,
            wires=[target],
        )

    def leave_tab(node_id, y, channel, target_in):
        # Link-out in lane 1 pointing at a single link-in on another tab.
        return link_out(
            node_id, TAB_SETUP, LANE_X[1], y,
            channel, target_in_ids=[target_in],
        )

    tab = {
        "id": TAB_SETUP, "type": "tab",
        "label": "⚙️ Setup & Init",
        "disabled": False,
        "info": (
            "One-shot deploy-time injects:\n"
            " • MGC scaling = normalized + mode = optimalcontrol\n"
            " • all pumps mode = auto\n"
            " • initial inflow baseline + scenario\n\n"
            "Disable this tab in production."
        ),
    }
    title = comment("c_setup_title", TAB_SETUP, LANE_X[2], 20,
                    "⚙️ SETUP & INIT — one-shot deploy-time injects", "")

    # Both MGC injects share one link-out.
    mgc_nodes = [
        fire_once("setup_mgc_scaling", 100, "MGC scaling = normalized",
                  "setScaling", "normalized", "str", "1.5",
                  "lout_setup_to_mgc"),
        fire_once("setup_mgc_mode", 160, "MGC mode = optimalcontrol",
                  "setMode", "optimalcontrol", "str", "1.7",
                  "lout_setup_to_mgc"),
        leave_tab("lout_setup_to_mgc", 130, "setup:to-mgc", "lin_setup_at_mgc"),
    ]

    pump_mode_nodes = [
        fire_once("setup_pumps_mode", 240, "pumps mode = auto",
                  "setMode", "auto", "str", "2.0",
                  "lout_mode_setup"),
        leave_tab("lout_mode_setup", 240, "cmd:mode", "lin_mode"),
    ]

    inflow_nodes = [
        fire_once("setup_inflow_baseline", 320,
                  "inflow baseline = 25 m³/h (nominal)",
                  "inflowBaseline", "25", "num", "2.5",
                  "lout_setup_inflow_baseline"),
        leave_tab("lout_setup_inflow_baseline", 320,
                  CH_INFLOW_BASELINE, "lin_inflow_baseline"),
        fire_once("setup_inflow_scenario", 380, "inflow scenario = sine",
                  "scenario", "sine", "str", "2.7",
                  "lout_setup_inflow_scn"),
        leave_tab("lout_setup_inflow_scn", 380,
                  CH_INFLOW_SCENARIO, "lin_inflow_scenario"),
    ]

    # Basin calibration is deliberately manual.  If it fired on every deploy
    # it would overwrite the basin level mid-cycle and restart the
    # simulation, so the operator clicks this inject's button in the editor
    # (Setup tab) only when a fresh demo run is wanted.
    calibrate_nodes = [
        inject(
            "setup_calibrate_level", TAB_SETUP, LANE_X[0], 460,
            "[manual] calibrate basin = 1.0 m (click to reset)",
            topic="calibratePredictedLevel", payload="1.0", payload_type="num",
            once=False,  # never fire on deploy
            wires=["lout_setup_calibrate"],
        ),
        leave_tab("lout_setup_calibrate", 460,
                  "setup:calibrate-ps", "lin_setup_calibrate_ps"),
    ]

    return [
        tab,
        title,
        *mgc_nodes,
        *pump_mode_nodes,
        *inflow_nodes,
        *calibrate_nodes,
    ]
# ---------------------------------------------------------------------------
# Tab 5 — TELEMETRY (port 1 → InfluxDB line protocol → http POST)
# ---------------------------------------------------------------------------
def build_telemetry_tab():
    """Build the nodes of the Telemetry tab: the InfluxDB writer pipeline.

    Pipeline, in wiring order:

    1. ``lin_tlm``       link-in on ``CH_TLM``, sourced from every id
                         returned by ``_all_tlm_lout_ids()``.
    2. ``fn_tlm_to_lp``  turns one telemetry payload (``measurement`` /
                         ``tags`` / ``fields``) into one line of InfluxDB
                         line protocol; payloads without usable fields are
                         dropped.
    3. ``join_tlm``      batches lines into one newline-joined string,
                         flushed after 200 messages or 2 seconds.
    4. ``fn_tlm_post``   sets headers, URL and method for the write request.
    5. ``http_tlm``      performs the request (method taken from the msg).
    6. ``fn_tlm_count``  keeps POST / line / error counters in global
                         context and shows them as node status.

    Returns:
        list: the tab definition followed by the nodes above, in order.
    """
    nodes = []
    nodes.append({
        "id": TAB_TLM, "type": "tab",
        "label": "📈 Telemetry",
        "disabled": False,
        "info": (
            "InfluxDB writer: every EVOLV node's port-1 telemetry is fanned in "
            "via the evt:tlm link channel, converted to line protocol, and "
            "POSTed to InfluxDB v2 (org=evolv, bucket=telemetry).\n\n"
            "Pattern adapted from docker/demo-flow.json."
        ),
    })
    nodes.append(comment("c_tlm_title", TAB_TLM, LANE_X[2], 20,
                         "📈 TELEMETRY — InfluxDB writer", ""))
    nodes.append(link_in(
        "lin_tlm", TAB_TLM, LANE_X[0], 100,
        CH_TLM,
        source_out_ids=_all_tlm_lout_ids(),
        downstream=["fn_tlm_to_lp"],
    ))
    # ── Pipeline ──
    # link in → fn_tlm_to_lp (one line / msg)
    #   → join (string mode, joiner=\n, count=200 OR timeout 2 s)
    #   → fn_tlm_post (set headers/url/method)
    #   → http request → fn_tlm_count
    #
    # Line-protocol rules applied by fn_tlm_to_lp:
    #   * tag keys/values and field keys escape ',', ' ' and '='.
    #   * finite numbers and booleans are written bare; everything else is
    #     written as a quoted string with '\' and '"' backslash-escaped
    #     (backslash first, so the escapes added for quotes are not doubled).
    #   * NaN / ±Infinity are skipped: line protocol has no literal for
    #     them, and writing them as strings would clash with the numeric
    #     type the field already has in the bucket.
    nodes.append(function_node(
        "fn_tlm_to_lp", TAB_TLM, LANE_X[2], 100,
        "→ InfluxDB line protocol",
        "const p = msg.payload;\n"
        "if (!p || !p.measurement || !p.fields) return null;\n"
        "const esc = (s) => String(s)\n"
        " .replace(/,/g, '\\\\,').replace(/ /g, '\\\\ ').replace(/=/g, '\\\\=');\n"
        "const tags = Object.entries(p.tags || {})\n"
        " .filter(([k, v]) => v !== undefined && v !== null && v !== '')\n"
        " .map(([k, v]) => `${esc(k)}=${esc(v)}`).join(',');\n"
        "// Skip NaN/Infinity: written as strings they would conflict with\n"
        "// the numeric type of the field in InfluxDB.\n"
        "const fieldPairs = Object.entries(p.fields)\n"
        " .filter(([k, v]) => v !== undefined && v !== null\n"
        " && (typeof v !== 'number' || Number.isFinite(v)))\n"
        " .map(([k, v]) => {\n"
        " if (typeof v === 'number' && Number.isFinite(v)) return `${esc(k)}=${v}`;\n"
        " if (typeof v === 'boolean') return `${esc(k)}=${v}`;\n"
        " const s = String(v).replace(/\\\\/g, '\\\\\\\\').replace(/\"/g, '\\\\\"');\n"
        " return `${esc(k)}=\"${s}\"`;\n"
        " });\n"
        "if (fieldPairs.length === 0) return null;\n"
        "const ts = Date.now() * 1000000;\n"
        "msg.payload = `${esc(p.measurement)}${tags ? ',' + tags : ''} `\n"
        " + `${fieldPairs.join(',')} ${ts}`;\n"
        "// Hint the join node to fire on size or timeout.\n"
        "msg.topic = 'tlm';\n"
        "return msg;",
        outputs=1, wires=[["join_tlm"]],
    ))
    # Idiomatic Node-RED batching: join collects messages into a single
    # newline-joined string, flushed every `count` messages OR `timeout`
    # seconds, whichever fires first.
    nodes.append({
        "id": "join_tlm", "type": "join", "z": TAB_TLM,
        "name": "batch (200 lines / 2 s)",
        "mode": "custom",
        "build": "string",
        "property": "payload", "propertyType": "msg",
        "key": "topic",
        "joiner": "\\n", "joinerType": "str",
        "accumulate": False,
        "timeout": "2",
        "count": "200",
        "reduceRight": False,
        "reduceExp": "", "reduceInit": "",
        "reduceInitType": "", "reduceFixup": "",
        "x": LANE_X[3], "y": 100,
        "wires": [["fn_tlm_post"]],
    })
    nodes.append(function_node(
        "fn_tlm_post", TAB_TLM, LANE_X[3] + 200, 100,
        "wrap as InfluxDB POST",
        "// Count lines for status reporting.\n"
        "const body = String(msg.payload || '');\n"
        "const lineCount = body ? body.split('\\n').length : 0;\n"
        "if (lineCount === 0) return null;\n"
        "msg.lineCount = lineCount;\n"
        "msg.headers = {\n"
        " 'Authorization': 'Token evolv-dev-token',\n"
        " 'Content-Type': 'text/plain'\n"
        "};\n"
        "msg.url = 'http://influxdb:8086/api/v2/write?org=evolv&bucket=telemetry&precision=ns';\n"
        "msg.method = 'POST';\n"
        "return msg;",
        outputs=1, wires=[["http_tlm"]],
    ))
    # "method": "use" and the empty "url" make the http request node take
    # both from the incoming msg (set by fn_tlm_post above).
    nodes.append({
        "id": "http_tlm", "type": "http request", "z": TAB_TLM,
        "name": "Write InfluxDB",
        "method": "use", "ret": "txt", "paytoqs": "ignore",
        "url": "", "tls": "", "persist": False, "proxy": "",
        "authType": "", "senderr": False,
        "x": LANE_X[4] + 80, "y": 100,
        "wires": [["fn_tlm_count"]],
    })
    # Terminal node: updates the influx_writes / influx_lines /
    # influx_errors globals and its own status badge, then ends the flow
    # (returns null, no downstream wires).
    nodes.append(function_node(
        "fn_tlm_count", TAB_TLM, LANE_X[5], 100,
        "Count writes",
        "const lines = Number(msg.lineCount) || 0;\n"
        "const writes = (global.get('influx_writes') || 0) + 1;\n"
        "const totalLines = (global.get('influx_lines') || 0) + lines;\n"
        "global.set('influx_writes', writes);\n"
        "global.set('influx_lines', totalLines);\n"
        "const errors = global.get('influx_errors') || 0;\n"
        "if (msg.statusCode && msg.statusCode >= 400) {\n"
        " global.set('influx_errors', errors + 1);\n"
        " node.status({fill:'red', shape:'ring',\n"
        " text:`ERR ${errors+1}: ${msg.statusCode}`});\n"
        "} else {\n"
        " node.status({fill:'green', shape:'dot',\n"
        " text:`${writes} POSTs · ${totalLines} lines (${errors} err)`});\n"
        "}\n"
        "return null;",
        outputs=1, wires=[[]],
    ))
    return nodes
def _all_tlm_lout_ids():
    """Return the ids of all link-out nodes that publish on evt:tlm.

    The ids are spelled out deterministically (rather than discovered) so
    cross-tab wiring stays stable. Order: for each pump in ``PUMPS`` the
    pump's own link-out followed by its four measurement link-outs
    (suffixes ``u``, ``d``, ``f``, ``p`` — presumably upstream pressure,
    downstream pressure, flow and power), then the machine-group-control
    and pumping-station link-outs.
    """
    per_pump = []
    for name in PUMPS:
        per_pump += [
            f"lout_tlm_{name}",
            *(f"lout_tlm_meas_{name}_{kind}" for kind in "udfp"),
        ]
    return per_pump + ["lout_tlm_mgc", "lout_tlm_ps"]
# ---------------------------------------------------------------------------
# Assemble + emit
# ---------------------------------------------------------------------------
def main():
    """Concatenate every tab's nodes and print the flow as JSON on stdout.

    Tabs are emitted in a fixed order: process, UI, drivers, setup,
    telemetry. The output is indented by two spaces and terminated with a
    single newline.
    """
    tab_builders = (
        build_process_tab,
        build_ui_tab,
        build_drivers_tab,
        build_setup_tab,
        build_telemetry_tab,
    )
    flow = []
    for build in tab_builders:
        flow.extend(build())
    sys.stdout.write(json.dumps(flow, indent=2) + "\n")
# Script entry point: generate and print the flow only when this file is
# executed directly, not when it is imported as a module.
if __name__ == "__main__":
    main()