CONTRACTS.md §4: full payloadSchema.type table including 'none', plus the optional description field example. Matches the B3.2 implementation. WIKI_TEMPLATE.md §5: Unit column appears with explanatory paragraph. Matches the P11.4 wikiGen output. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
26 KiB
Contracts
The exact shapes that the refactor delivers. These are the things every node converges on. Treat them as APIs.
Order: top-down — what a Node-RED user sees, what a node author writes,
what generalFunctions provides.
1. The Node-RED-visible contract per node
Every node exposes the same three Port shapes:
| Port | Direction | Carries |
|---|---|---|
| 0 | out | Process data — formatted via outputUtils.formatMsg(..., 'process') |
| 1 | out | InfluxDB telemetry — formatted via outputUtils.formatMsg(..., 'influxdb') |
| 2 | out | Registration / control plumbing |
| in | in | Commands routed by msg.topic through the commands/ registry |
Every node also publishes a per-repo CONTRACT.md listing:
- Every
msg.topicit accepts on Port 0 input, with the payload schema. - Every
topicshape it emits on Port 0/1/2. - Every event its
measurements.emitterfires for parents to subscribe. - Every position label it expects from children.
This file is generated from the node's commands/ module + a small
hand-written events section.
Topic naming — canonical from Phase 1
msg.topic always uses one of these prefixes. <noun> and <verb>
are kebab-case after the dot (set.flow-setpoint, not
set.flowSetpoint).
Inputs — topics the node accepts on Port-0 input
| Prefix | Meaning | Idempotent? | Examples |
|---|---|---|---|
set.<noun> |
Setter. Replaces a state value with the supplied payload. Repeating with the same payload does nothing extra. | Yes | set.mode, set.scaling, set.demand, set.inflow |
cmd.<verb> |
Imperative action. Triggers a transition or sequence. Repeating triggers it again (or is rejected). | No | cmd.startup, cmd.shutdown, cmd.estop, cmd.calibrate |
data.<noun> |
Bulk data input. Sensor readings, measurement values, raw streams. The node consumes them. | n/a — values flow | data.measurement, data.flow, data.pressure |
child.<verb> |
Parent/child plumbing. Registration handshakes routed via Port 2. | n/a | child.register, child.unregister |
query.<noun> |
Synchronous query. The node responds on the same msg (or a sibling output). Used for read-only debug queries from a dashboard. |
Yes (read-only) | query.curves, query.cog, query.snapshot |
Outputs — topics the node EMITS
| Prefix | Meaning | Where it appears |
|---|---|---|
evt.<noun> |
Event. A fact about something that just happened. Other nodes/dashboards subscribe to react. The node fires-and-forgets — no consumer is required. | msg.topic on Port 0 output, also fired internally on this.emitter so sibling modules can listen. |
evt.* is one-way: the node says "this happened", consumers can do
whatever they like with it. Examples: evt.state-change (state machine
moved), evt.alarm (a safety threshold tripped), evt.calibrated
(calibration completed). If you find yourself wanting to send a
command via evt.*, you actually want set.* or cmd.*.
The default measurement output (the delta-compressed payload from
outputUtils.formatMsg) keeps msg.topic = config.general.name per
the existing convention. evt.* is for additional event-shaped
emissions, not for the per-tick measurement stream.
Aliases for legacy names
Each commands/index.js declares the canonical name as topic and
lists pre-refactor names in aliases. The first time an alias fires,
the runtime logs a one-time deprecation warning. Aliases are removed
in Phase 7 after one release cycle.
Why these prefixes (the reasoning)
Today's topics mix setMode (verb-noun, no separator), q_in
(snake-case, abbreviation), Qd (PascalCase abbreviation),
changemode (lowercase joined), execSequence (verb-noun, camel).
A reader can't tell from the topic name whether it's a setter, an
action, or an event. The prefix system says it explicitly:
set.xmeans "I'm replacing the value of x". Safe to retry.cmd.xmeans "I'm asking you to do x once". Don't retry blindly.data.xmeans "here's a value I'm pushing into your stream".query.xmeans "tell me what x is right now".child.xmeans "plumbing — only the parent/child machinery cares".evt.x(output only) means "this happened, do what you want".
2. BaseNodeAdapter — the shape of every nodeClass
Lives in generalFunctions/src/nodered/BaseNodeAdapter.js. Each node's
nodeClass.js extends it.
const { BaseNodeAdapter } = require('generalFunctions');
const Domain = require('./specificClass');
const commands = require('./commands');
class nodeClass extends BaseNodeAdapter {
// The domain class to instantiate.
static DomainClass = Domain;
// The command registry — see section 4.
static commands = commands;
// Opt-in periodic tick. Default null = event-driven (domain emits
// 'output-changed' when output should refresh). Set to ms only when
// the domain genuinely needs a time-based heartbeat.
// Example reason (above the line): "needs delta-time for predicted
// volume integrator".
static tickInterval = null;
// Always-on status badge poll. Required for Node-RED's editor
// refresh. Set to 0 only in headless environments.
static statusInterval = 1000;
// Build the domain-specific config slice from the Node-RED uiConfig.
// Base config (general, asset, functionality, logging) is built by
// BaseNodeAdapter via configManager.buildConfig.
buildDomainConfig(uiConfig, nodeId) {
return {
basin: { volume: uiConfig.basinVolume, height: uiConfig.basinHeight, ... },
hydraulics: { ... },
control: { ... },
safety: { ... },
};
}
}
module.exports = nodeClass;
Lifecycle (provided by base, do not reimplement)
In order, in the constructor:
- Build merged config (
configManager.buildConfig+buildDomainConfig). - Instantiate
DomainClasswith that config; store asthis.source, also asthis.node.sourcefor sibling-node lookup. - Send Port 2 registration message (after a 100 ms delay).
- Output strategy — pick one based on
static tickInterval:tickInterval = N(ms): start a periodic timer that callsthis.source.tick?.(), then formats and sends outputs.tickInterval = null: subscribe to'output-changed'onthis.source.emitter. Whenever the domain fires that event, the adapter formats and sends outputs. In both modes,outputUtils.formatMsgdoes delta compression — a send only emits changed fields.
- Start the status loop at
static statusIntervalms:- Call
this.source.getStatusBadge()(see section 7), apply vianode.status(...).
- Call
- Attach the
inputhandler — dispatches bymsg.topicthrough the commands registry. - Attach the
closehandler — clears timers, removes child listeners, clears status.
Event-driven is the default
A domain that doesn't need time-driven math fires
this.emitter.emit('output-changed') whenever its public state shifts
(e.g. after a measurement update, a state transition, a calibration).
The base adapter pushes outputs in response. No 1 Hz polling.
A domain that DOES need time-driven math (e.g. pumpingStation
integrating predicted volume) opts into a tick. The tick runs the
time-based update; if that update changes output state, the domain
emits 'output-changed' and the same code path that handles
event-driven nodes pushes outputs.
This keeps the output pipeline single-shape regardless of which mode the domain uses.
Override hooks
A subclass may override:
| Hook | When |
|---|---|
buildDomainConfig(uiConfig, nodeId) |
Always — required. |
extraSetup() |
If a node needs custom wiring beyond the base. |
extraInputDispatch(msg, send, done) |
If commands registry can't express a topic. Avoid; prefer the registry. |
extraClose() |
Custom teardown beyond clearing intervals. |
Forbidden in subclasses
- Re-implementing the tick or status loop. Use
getOutput()/getStatusBadge()on the domain. - Calling
this.source._private. Domain exposes a public surface. - Importing from another node's
src/.
3. BaseDomain — the shape of every specificClass
Lives in generalFunctions/src/domain/BaseDomain.js. Each node's
specificClass.js extends it.
const { BaseDomain, UnitPolicy, ChildRouter } = require('generalFunctions');
class PumpingStation extends BaseDomain {
// Identifies the config in generalFunctions/src/configs/<name>.json.
static name = 'pumpingStation';
// Declarative unit policy — see section 6.
static unitPolicy = UnitPolicy.declare({
canonical: { flow: 'm3/s', pressure: 'Pa', power: 'W', temperature: 'K' },
output: { flow: 'm3/h', pressure: 'mbar', power: 'kW', temperature: 'C' },
});
// Run after BaseDomain has built emitter, config, logger, measurements,
// childRegistrationUtils. Wire concern-modules and any extra state.
configure() {
this.basin = new BasinGeometry(this.config, this.logger);
this.flowAggregator = new FlowAggregator(this.context());
this.safety = new SafetyController(this.context());
this.strategies = require('./control');
this.router = new ChildRouter(this)
.on('machinegroup', this._onMachineGroup)
.on('measurement', { type: 'pressure' }, this._onPressure)
.on('measurement', { type: 'level' }, this._onLevel);
}
// Per-tick — orchestration only, all real work is in modules.
tick() {
this.flowAggregator.update();
const safe = this.safety.evaluate();
if (safe.blocked) return;
this.strategies[this.mode]?.run(this.context());
}
// What goes on Port 0 / Port 1.
getOutput() {
return {
...this.measurements.getFlattenedOutput(),
...this.basin.snapshot(),
...this.flowAggregator.snapshot(),
};
}
// What the Node-RED status badge shows — see section 7.
// Aggregators (no clean state machine) use compose. State-machine
// nodes (rotatingMachine) use byState. Both return {fill, shape, text}.
getStatusBadge() {
const direction = this.flowAggregator.direction;
const vol = this.measurements.type('volume').variant('measured').position('atequipment').getCurrentValue('m3');
const pct = (vol / this.basin.maxVolAtOverflow * 100).toFixed(1);
const arrow = direction === 'filling' ? '⬆️' : direction === 'draining' ? '⬇️' : '⏸️';
return statusBadge.compose([
`${arrow} ${pct}%`,
`V=${vol.toFixed(2)}/${this.basin.maxVolAtOverflow.toFixed(2)} m³`,
]);
}
}
module.exports = PumpingStation;
What BaseDomain provides (do not reimplement)
The base constructor sets up:
| Property | Type | Notes |
|---|---|---|
this.emitter |
EventEmitter |
Internal events. Fire 'output-changed' here when public state shifts in event-driven nodes. |
this.configManager, this.configUtils, this.defaultConfig |
— | Wired from static name. |
this.config |
object | Validated config. |
this.logger |
logger | Named after config.general.name. |
this.measurements |
MeasurementContainer |
Built from static unitPolicy. |
this.childRegistrationUtils |
child registry | The child dict is auto-created. |
Then it calls this.configure() — your hook. Then it calls
this._init?.() if defined.
Named child accessors (registry-as-truth, readable in code)
Children live in this.child[<softwareType>][<category>] (the
registry, populated by childRegistrationUtils). For readable code,
each domain declares named getters in configure() that surface
the relevant slices:
configure() {
// Reads as: ps.machines, ps.machineGroups, ps.stations.
this.declareChildGetter('machines', 'machine');
this.declareChildGetter('machineGroups', 'machinegroup');
this.declareChildGetter('stations', 'pumpingstation');
}
declareChildGetter(name, softwareType, category?) (provided by
BaseDomain) installs a getter that flattens
this.child[softwareType] into one object keyed by child id (across
all categories) — or filters by category if given.
The registry is the source of truth; the getters keep call sites
readable. Object.values(this.machines).forEach(...) works exactly
like before; assignments like this.machines[id] = child no longer
work — registration goes through this.router (or registerChild).
Two output strategies — domain decides
| Strategy | When to pick | What domain does | What adapter does |
|---|---|---|---|
| Event-driven (default) | Domain reacts to incoming events (measurements, state changes, commands) and has no genuinely time-driven math. | Fire this.emitter.emit('output-changed') whenever the public output state shifts. |
Subscribes to 'output-changed'; on each fire, calls getOutput() and pushes the delta-compressed message. |
| Tick-driven (opt-in) | Domain has time-driven math that can't be expressed as a reaction to events (integrators, simulators, time-based thresholds). | Implement tick(). Fire 'output-changed' from inside it whenever the tick changes output state. |
Calls tick() every static tickInterval ms (set on the nodeClass subclass). Listens to 'output-changed' the same as event-driven nodes. |
Both strategies funnel into the same 'output-changed' → getOutput()
→ formatMsg → node.send pipeline. The only difference is what
fires the event.
this.context()
Returns a frozen view passed to concern-modules so they don't reach into
this. Default shape:
{
config: this.config,
logger: this.logger,
measurements: this.measurements,
emitter: this.emitter,
child: this.child,
unitPolicy: this.unitPolicy,
}
A node may override context() to add domain-specific keys (e.g.
pumpingStation adds basin).
getOutput() and getStatusBadge() are the only required methods
Everything else is configuration. If a domain can be expressed without a
custom tick() (e.g. a passive aggregator), don't define one.
4. The commands registry
Each node has src/commands/index.js that exports an array of command
descriptors:
const handlers = require('./handlers');
module.exports = [
{
topic: 'set.mode',
aliases: ['setMode', 'changemode'], // legacy names
payloadSchema: { type: 'string' },
description: 'Switch the node between auto and manual control modes.',
handler: handlers.setMode,
},
{
topic: 'cmd.startup',
aliases: ['execSequence:startup'],
payloadSchema: { type: 'object', properties: { source: { type: 'string' } } },
handler: handlers.startup,
},
{
topic: 'cmd.calibrate',
payloadSchema: { type: 'none' },
description: 'Trigger a one-shot calibration. Payload is ignored.',
handler: handlers.calibrate,
},
...
];
payloadSchema.type values
| Type | Meaning |
|---|---|
'string' |
typeof payload === 'string'. |
'number' |
typeof payload === 'number'. |
'boolean' |
typeof payload === 'boolean'. |
'object' |
Non-null object. Optional properties: { key: 'typeName' } enforces per-key typeof (missing keys allowed). |
'any' |
Anything passes. Use when the handler accepts heterogeneous payloads. |
'none' |
Trigger-only. Handler is invoked regardless of payload. If msg.payload is anything other than undefined/null, the registry logs a warn ("<topic>: payload ignored — this is a trigger-only topic") and still invokes the handler. Use for pure triggers (cmd.calibrate, cmd.estop, set.simulator, ...) — strict alternative to 'any'. |
Optional description field
A descriptor may include a free-text 1-line description string. It is surfaced by .list() (the docs surface) and consumed by wikiGen's topic-contract auto-gen. Example:
{ topic: 'cmd.calibrate', payloadSchema: { type: 'none' }, description: 'Trigger a one-shot calibration.', handler: handlers.calibrate }
Optional units field — pre-dispatch unit normalisation
A descriptor for a numeric setter / data topic may declare:
units: { measure: '<measureName>', default: '<unitAbbr>' }
measure: aconvert-recognised measure name (volumeFlowRate,pressure,power,temperature,volume,length, …).default: the unit the handler always receives. Operator-friendly (e.g.m3/h,mbar,kW,C).
Validation: if units is present, both fields must be non-empty strings. The registry throws at construction otherwise.
At dispatch time, before the handler runs and before payload-schema validation, the registry normalises the incoming msg:
- Extract value + unit. Three accepted shapes:
msg.payloadis a number →value = msg.payload,unit = msg.unit.msg.payload = { value: <number>, unit?: <string> }→ use those (falls back tomsg.unitifpayload.unitis absent).- Anything else (string, object without
value, missing payload, …) → normalisation is skipped; the handler receives the raw msg unchanged. No crash.
- Determine the unit-of-record:
- No unit supplied → silently assume
units.default. - Unit recognised + correct measure →
convert(value).from(unit).to(default). - Unit recognised but wrong measure → log
warnwith the topic, the actual measure, the expected measure, and the accepted-unit list. Fall through with the supplied value assumed to already be indefault. - Unit unrecognised → log
warnwith the topic, the unknown unit, and the accepted-unit list. Fall through with the supplied value assumed to already be indefault.
- No unit supplied → silently assume
- Rewrite the msg so the handler sees uniform inputs:
msg.payloadbecomes the normalised number inunits.default(the object form{value, unit}is flattened to a number).msg.unitis set tounits.default.
Accepted-unit lists come from convert.possibilities(measure). If that helper is unavailable, the warn falls back to (see convert docs).
The units field is surfaced by .list() (so wikiGen + query.units can render the contract) and is null for descriptors that don't declare it.
Example:
{
topic: 'set.demand',
units: { measure: 'volumeFlowRate', default: 'm3/h' },
payloadSchema: { type: 'number' },
description: 'Operator demand setpoint.',
handler: handlers.setDemand,
}
A handler is a pure function:
// handlers.js
exports.setMode = (source, msg, ctx) => {
source.setMode(msg.payload);
};
exports.startup = async (source, msg, ctx) => {
await source.handleInput(msg.payload?.source ?? 'parent', 'execSequence', 'startup');
};
The BaseNodeAdapter builds a Map<topic-or-alias, descriptor> at
construction time. Dispatch is one lookup. Aliases log a one-time
deprecation warning the first time each fires.
Why declarative?
- Auto-generates
CONTRACT.mdper node. - Lets us add cross-node static checks (no two nodes use the same
set.xfor different things). - Replaces the per-node 100-line input switch with a 5-line dispatch.
5. ChildRouter — declarative parent registration
Lives in generalFunctions/src/domain/ChildRouter.js. Built on top of
the existing childRegistrationUtils.
this.router = new ChildRouter(this)
// Register a callback when a child of a given software type registers.
.onRegister('machinegroup', (child) => this._onMachineGroupRegistered(child))
// Subscribe to a measurement event from any child of a given softwareType.
// The third arg filters by emit-side position.
.onMeasurement('measurement', { type: 'pressure', position: 'upstream' }, (data, child) => {
this._onPressure('upstream', data.value, data);
})
// Subscribe to predicted-flow events from any group/machine child.
.onPrediction('machinegroup', { type: 'flow', position: 'downstream' }, (data, child) => {
this._onPredictedFlow(child, data);
});
ChildRouter owns:
- The handler maps (
onRegister,onMeasurement,onPrediction). - Listener attachment + teardown (called from
BaseDomainon close). - Software-type alias resolution (already in
childRegistrationUtils).
Per-node registerChild boilerplate disappears. The base
childRegistrationUtils.registerChild calls this.mainClass.registerChild
which delegates to this.router.dispatchRegister(child, softwareType).
6. UnitPolicy
Lives in generalFunctions/src/domain/UnitPolicy.js. Replaces the
duplicated _buildUnitPolicy / _resolveUnitOrFallback /
_convertUnitValue in rotatingMachine and machineGroupControl.
static unitPolicy = UnitPolicy.declare({
canonical: { flow: 'm3/s', pressure: 'Pa', power: 'W', temperature: 'K' },
output: { flow: 'm3/h', pressure: 'mbar', power: 'kW', temperature: 'C' },
curve: { flow: 'm3/h', pressure: 'mbar', power: 'kW', control: '%' }, // optional
// Types whose values must always carry a unit on write.
requireUnitForTypes: ['flow', 'pressure', 'power', 'temperature'],
});
Methods on the resulting policy:
| Method | Purpose |
|---|---|
policy.canonical(type) |
Canonical unit for a measurement type. |
policy.output(type) |
Display / IO unit for a measurement type. |
policy.curve(type) |
Curve-input unit for a measurement type (returns null if no curve was declared). |
policy.resolve(candidate, expectedMeasure, fallback, label) |
Validate a user-supplied unit, fall back if invalid (logs warn). |
policy.convert(value, fromUnit, toUnit, contextLabel) |
Strict conversion. |
policy.containerOptions() |
Returns the option bag for a MeasurementContainer. |
Dual access shape (method OR frozen property bag)
canonical, output, and curve each work both as a method call AND as a
frozen own-property map. They are functions with Object.defineProperty-installed
non-writable, non-configurable own properties, frozen via Object.freeze:
policy.canonical('flow') // 'm3/s' (method)
policy.canonical.flow // 'm3/s' (property)
policy.output.pressure // 'mbar' (property)
policy.curve.control // '%' (property)
policy.canonical.flow = 'tampered'; // TypeError in strict mode
delete policy.canonical.pressure; // TypeError
Object.isFrozen(policy.canonical); // true
The property-bag form is preferred in hot paths and tight inner loops (one
lookup vs one function call). The method form is preferred when the type is
itself dynamic (policy.canonical(typeName)). Both forms are first-class
parts of the contract — call sites may use whichever reads best.
This replaces the per-node _unitView / unitPolicyView mirror that
pre-dated the dual-shape accessor — domains read this.unitPolicy directly.
BaseDomain reads static unitPolicy and passes
policy.containerOptions() straight into new MeasurementContainer(...).
7. getStatusBadge() shape
Every domain returns the standard Node-RED status object:
{
fill: 'green' | 'yellow' | 'red' | 'blue' | 'grey',
shape: 'dot' | 'ring',
text: string, // ≤ 60 chars in the Node-RED editor; aim for ≤ 50.
}
Helpers in generalFunctions/src/nodered/statusBadge.js:
const { statusBadge } = require('generalFunctions');
statusBadge.compose(['🟢 OK', `flow=${flow.toFixed(1)} m³/h`]) // joins with ' | '
statusBadge.error(message) // {fill:'red', shape:'ring', text:`⚠ ${message}`}
statusBadge.idle(label) // {fill:'blue', shape:'dot', text:`⏸️ ${label}`}
The badge is computed in domain, not in nodeClass. nodeClass just
calls this.source.getStatusBadge() once per second.
8. LatestWinsGate
Extracted from MGC's _dispatchInFlight + _delayedCall pattern. Used
anywhere a parent fires commands faster than children can absorb them.
const { LatestWinsGate } = require('generalFunctions');
this.demandGate = new LatestWinsGate(async (demand) => {
await this._dispatchDemandToChildren(demand);
});
// Fire-and-forget — never blocks. The latest demand always wins.
this.demandGate.fire(demand);
// Await the per-fire settlement.
const result = await this.demandGate.fireAndWait(demand);
if (result && result.superseded === true) {
// A later fire/fireAndWait overwrote this one in the pending slot.
}
Guarantees:
- At most one
dispatchrunning at a time per gate. - If a new value arrives while one is running, only the latest is enqueued; intermediate ones are dropped.
- After the in-flight call settles, the latest pending value fires.
fire(value) vs fireAndWait(value)
| Method | Returns | Settles when |
|---|---|---|
fire(value) |
void |
n/a — caller never awaits. |
fireAndWait(value) |
Promise<result | SUPERSEDED | undefined> |
THIS specific fire's dispatch settles. If a later fire (plain or awaited) overwrites this one in the pending slot, the returned promise resolves with the frozen sentinel LatestWinsGate.SUPERSEDED = { superseded: true }. If the dispatch itself throws, the promise still resolves (with undefined) and the error is recorded on gate.lastError — callers don't need try/catch. |
The supersede-resolves-with-sentinel choice (rather than rejecting with
'superseded') means consumers branch on a value:
const r = await gate.fireAndWait(v);
if (r && r.superseded) return; // dropped by a later fire
// ... otherwise r is the dispatch's return value
drain() remains the right tool for "wait until idle" (returns one
promise regardless of how many fires landed); fireAndWait is per-fire.
9. HealthStatus
A standardised shape for nodes that compute prediction quality / drift
(today: rotatingMachine.predictionHealth, future: MGC, pumpingStation
volume confidence).
{
level: 0 | 1 | 2 | 3, // 0 = fine, 3 = unusable
flags: string[], // machine-readable tags, e.g. 'no_pressure_input'
message: string, // single-line human summary
source: string | null, // free-text origin tag
}
Helpers compose multiple sub-statuses (e.g. flow drift + power drift + pressure init) into one node-level status.
10. Output port payload conventions
Already documented in .claude/rules/telemetry.md — kept here only as a
pointer:
- Port 0: process data, formatter chosen by
config.output.process. - Port 1: InfluxDB line-protocol, formatter chosen by
config.output.dbase. - Port 2: registration / control plumbing.
outputUtils.formatMsgdoes delta compression — only changed fields are sent. Consumers must cache + merge.