'use strict'; // Declarative dispatch for a node's input topics. Each node declares its // commands as an array of descriptors; the registry builds an O(1) lookup // keyed by canonical topic + alias, validates the payload against a small // shape schema, and invokes the handler. Replaces the per-node ~100-line // `switch (msg.topic)` block in nodeClass._attachInputHandler. // // Lightweight on purpose: the schema is a typeof-check ladder, not full // JSON-Schema. Anything richer belongs in the handler itself, which has // access to logger via ctx. const convert = require('../convert'); const SCALAR_TYPES = new Set(['string', 'number', 'boolean', 'object', 'any', 'none']); function _acceptedList(measure) { if (convert && typeof convert.possibilities === 'function') { const list = convert.possibilities(measure); if (Array.isArray(list) && list.length) return list.join(', '); } return '(see convert docs)'; } function _describeUnit(unit) { try { return convert().describe(unit); } catch (_) { return null; } } // A numeric scalar is a finite number, or a non-empty string that parses to a // finite number ("60", "1.5"). Node-RED inject/`change` nodes and upstream MQTT // payloads routinely arrive as strings; treating them as non-numeric here is the // gap that let values reach a handler unconverted. function _asNumber(x) { if (typeof x === 'number') return Number.isFinite(x) ? x : null; if (typeof x === 'string' && x.trim() !== '') { const n = Number(x); return Number.isFinite(n) ? n : null; } return null; } function _extractValueAndUnit(msg) { if (!msg || typeof msg !== 'object') return null; const p = msg.payload; if (p && typeof p === 'object') { const value = _asNumber(p.value); if (value === null) return null; return { value, unit: p.unit ?? msg.unit }; } const value = _asNumber(p); if (value === null) return null; return { value, unit: msg.unit }; } // Derive the dimensional measure (e.g. 'volumeFlowRate') from a unit string. // Returns null when convert doesn't recognise the unit. function _measureOf(unit) { const desc = _describeUnit(unit); return desc ? desc.measure : null; } // Command origin = which control authority issued this message (the rotatingMachine // `allowedSources` vocabulary: 'parent' = automation/parent controller, 'GUI' = // SCADA/HMI operator, 'fysical' = physical buttons). Default 'parent'. Named // `origin` on the message because `source` is already the domain instance handed // to handlers. const DEFAULT_ORIGIN = 'parent'; function _resolveOrigin(msg, descriptor) { const o = msg && typeof msg.origin === 'string' && msg.origin.trim() !== '' ? msg.origin.trim() : (descriptor.defaultOrigin || DEFAULT_ORIGIN); return o; } // allowedSources values may be a Set (post config processing, as rotatingMachine // stores them) or a plain array (raw config / other nodes). Accept both. function _setHas(coll, value) { if (!coll) return false; if (typeof coll.has === 'function') return coll.has(value); if (Array.isArray(coll)) return coll.includes(value); return false; } class CommandRegistry { constructor(commands, options = {}) { if (!Array.isArray(commands)) { throw new TypeError('CommandRegistry requires an array of command descriptors'); } this._logger = options.logger || null; this._byKey = new Map(); // topic-or-alias -> descriptor this._canonicalByAlias = new Map(); this._descriptors = []; this._deprecationCounts = new Map(); this._deprecationLogged = new Set(); for (const cmd of commands) this._register(cmd); } _register(cmd) { if (!cmd || typeof cmd.topic !== 'string' || cmd.topic.length === 0) { throw new TypeError('command descriptor requires a non-empty string topic'); } if (typeof cmd.handler !== 'function') { throw new TypeError(`command '${cmd.topic}' requires a handler function`); } if (this._byKey.has(cmd.topic)) { throw new Error(`duplicate command topic '${cmd.topic}'`); } const aliases = Array.isArray(cmd.aliases) ? cmd.aliases.slice() : []; for (const alias of aliases) { if (typeof alias !== 'string' || alias.length === 0) { throw new TypeError(`command '${cmd.topic}' has an invalid alias`); } if (this._byKey.has(alias)) { throw new Error(`alias '${alias}' for '${cmd.topic}' collides with existing topic or alias`); } } const units = this._validateUnits(cmd); const descriptor = { topic: cmd.topic, aliases, payloadSchema: cmd.payloadSchema || null, description: typeof cmd.description === 'string' ? cmd.description : null, units, gated: cmd.gated === true, defaultOrigin: typeof cmd.defaultOrigin === 'string' ? cmd.defaultOrigin : null, handler: cmd.handler, }; this._byKey.set(cmd.topic, descriptor); for (const alias of aliases) { this._byKey.set(alias, descriptor); this._canonicalByAlias.set(alias, cmd.topic); } this._descriptors.push(descriptor); } _validateUnits(cmd) { // Two ways to declare the unit, normalised to the same internal shape: // unit: 'm3/h' (preferred — measure derived) // units: { default: 'm3/h' } (measure derived) // units: { measure, default: 'm3/h' } (legacy — measure ignored, derived) // The measure is always derived from the unit so it can never drift from it. let def; if (typeof cmd.unit === 'string') def = cmd.unit; else if (cmd.units === undefined || cmd.units === null) return null; else if (typeof cmd.units === 'string') def = cmd.units; else def = cmd.units.default; if (typeof def !== 'string' || def.length === 0) { throw new TypeError( `command '${cmd.topic}' requires a unit string (unit: 'm3/h' or units: { default: 'm3/h' })`); } const measure = _measureOf(def); if (!measure) { throw new TypeError( `command '${cmd.topic}' declares unit '${def}' which convert does not recognise`); } return { measure, default: def }; } has(topic) { return typeof topic === 'string' && this._byKey.has(topic); } canonical(topic) { if (typeof topic !== 'string') return topic; return this._canonicalByAlias.get(topic) || topic; } list() { // Strip handler so callers can safely log / serialise the result // (handler functions are noisy and not contract-relevant). return this._descriptors.map((d) => ({ topic: d.topic, aliases: d.aliases.slice(), payloadSchema: d.payloadSchema, description: d.description, units: d.units ? { measure: d.units.measure, default: d.units.default } : null, })); } deprecationStats() { const out = {}; for (const [alias, count] of this._deprecationCounts) out[alias] = count; return out; } async dispatch(msg, source, ctx) { const log = this._loggerFor(ctx); const topic = msg && typeof msg.topic === 'string' ? msg.topic : null; if (!topic) { log.warn?.('commandRegistry: msg has no topic; ignoring'); return; } const descriptor = this._byKey.get(topic); if (!descriptor) { log.warn?.(`commandRegistry: unknown topic '${topic}'`); return; } if (topic !== descriptor.topic) this._noteAlias(topic, descriptor.topic, log); // Always stamp the command origin so handlers + gating can rely on it. msg.origin = _resolveOrigin(msg, descriptor); if (!this._originAllowed(descriptor, source, msg.origin, log)) return; if (descriptor.units) this._normaliseUnits(descriptor, msg, log); if (!this._validatePayload(descriptor, msg, log)) return; return descriptor.handler(source, msg, ctx); } // Mode-gated control-authority arbitration. Opt-in per command via // `gated: true`. The asset's mode (e.g. rotatingMachine's auto / // virtualControl / fysicalControl) decides which origins it accepts via // `source.config.mode.allowedSources[mode]`. Release = changing the mode. // Nodes without a mode model are advisory (allow-all) so this is inert // until a node opts in — never a silent behaviour change. _originAllowed(descriptor, source, origin, log) { if (!descriptor.gated) return true; const allowedSources = source && source.config && source.config.mode ? source.config.mode.allowedSources : null; const mode = source ? source.currentMode : undefined; if (!allowedSources || !mode) return true; // no mode model → advisory if (_setHas(allowedSources[mode], origin)) return true; log.warn?.(`${descriptor.topic}: origin '${origin}' not allowed in mode '${mode}'`); return false; } _noteAlias(alias, canonical, log) { const prev = this._deprecationCounts.get(alias) || 0; this._deprecationCounts.set(alias, prev + 1); if (this._deprecationLogged.has(alias)) return; this._deprecationLogged.add(alias); log.warn?.(`topic '${alias}' is deprecated; use '${canonical}'`); } _normaliseUnits(descriptor, msg, log) { const { measure, default: defaultUnit } = descriptor.units; const extracted = _extractValueAndUnit(msg); if (!extracted) return; // unknown shape — let payload validator handle it let { value, unit } = extracted; if (unit === undefined || unit === null || unit === '') { // No unit supplied — assume default, silent. msg.payload = value; msg.unit = defaultUnit; return; } const desc = _describeUnit(unit); if (!desc) { log.warn?.(`${descriptor.topic}: unknown unit '${unit}'. Accepted: ${_acceptedList(measure)}. Treating ${value} as ${defaultUnit}.`); msg.payload = value; msg.unit = defaultUnit; return; } if (desc.measure !== measure) { log.warn?.(`${descriptor.topic}: unit '${unit}' is ${desc.measure}, expected ${measure}. Accepted: ${_acceptedList(measure)}. Treating ${value} as ${defaultUnit}.`); msg.payload = value; msg.unit = defaultUnit; return; } try { msg.payload = convert(value).from(unit).to(defaultUnit); msg.unit = defaultUnit; } catch (err) { log.warn?.(`${descriptor.topic}: failed to convert ${value} ${unit} -> ${defaultUnit} (${err.message}). Treating as ${defaultUnit}.`); msg.payload = value; msg.unit = defaultUnit; } } _validatePayload(descriptor, msg, log) { const schema = descriptor.payloadSchema; if (!schema) return true; const payload = msg.payload; const type = schema.type || 'any'; if (!SCALAR_TYPES.has(type)) { log.warn?.(`commandRegistry: command '${descriptor.topic}' has unknown schema type '${type}'`); return true; } if (type === 'any') return true; if (type === 'none') { if (payload !== undefined && payload !== null) { log.warn?.(`${descriptor.topic}: payload ignored — this is a trigger-only topic`); } return true; } // typeof null === 'object' — explicit null fails an object schema. if (type === 'object') { if (payload === null || typeof payload !== 'object') { log.warn?.(`commandRegistry: '${descriptor.topic}' expected object payload, got ${payload === null ? 'null' : typeof payload}`); return false; } } else if (typeof payload !== type) { log.warn?.(`commandRegistry: '${descriptor.topic}' expected ${type} payload, got ${typeof payload}`); return false; } if (type === 'object' && schema.properties && typeof schema.properties === 'object') { for (const [key, expected] of Object.entries(schema.properties)) { if (!(key in payload)) continue; // missing keys allowed if (typeof payload[key] !== expected) { log.warn?.(`commandRegistry: '${descriptor.topic}' payload.${key} expected ${expected}, got ${typeof payload[key]}`); return false; } } } return true; } _loggerFor(ctx) { const candidate = (ctx && ctx.logger) || this._logger; return candidate || NOOP_LOGGER; } } const NOOP_LOGGER = { warn() {}, error() {}, info() {}, debug() {} }; function createRegistry(commands, options) { return new CommandRegistry(commands, options); } module.exports = { createRegistry, CommandRegistry };