B3.1 + B3.2 + B3.3: ChildRouter fan-out + commandRegistry 'none' + UnitPolicy dual-shape

B3.1 ChildRouter per-listener fan-out (drop emit monkey-patch):
  Partial-filter subscriptions enumerate every concrete
  <type>.measured.<position> event name (cartesian product over
  the canonical POSITIONS list + 19 KNOWN_TYPES) and register a
  plain emitter.on() per combo. Multi-parent semantics are trivial:
  each ChildRouter's listeners are independent. Drop the wrap/unwrap
  bookkeeping in tearDown. ChildRouter.js 184→164 lines.

B3.2 commandRegistry 'none' + description:
  Add 'none' to payloadSchema.type — handler still fires; logs warn
  if msg.payload is non-empty (catches accidental passes). Add
  optional `description` field per descriptor; surfaced via .list()
  so wikiGen can render per-topic effect text.
  commandRegistry.js 157→164 lines. 23/23 tests pass.

B3.3 UnitPolicy dual-shape:
  policy.canonical/output/curve are now BOTH callable methods AND
  frozen property bags. policy.canonical('flow') === 'm3/s' and
  policy.canonical.flow === 'm3/s' both work. Property bags are
  frozen (assign/delete/redefine throw in strict). Drops the
  _unitView workaround in MGC + rotatingMachine specificClass.
  UnitPolicy.js 149→163 lines, 15/15 tests pass.

CONTRACTS.md §4 + §6 updated.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
znetsixe
2026-05-11 17:13:15 +02:00
parent ff9aec8702
commit f11754635b
6 changed files with 255 additions and 88 deletions

View File

@@ -7,35 +7,65 @@
*
* See CONTRACTS.md §5. Built on top of `childRegistrationUtils`, which
* already canonicalises softwareType (e.g. rotatingmachine → machine).
*
* Wildcard / partial-filter subscriptions enumerate every concrete
* `<type>.<variant>.<position>` event name the filter matches and attach a
* plain `emitter.on(...)` per combination. No emit patching — multi-parent
* stacks compose cleanly because each parent owns its own listeners.
*/
const { POSITION_VALUES } = require('../constants/positions');
// Same alias map as childRegistrationUtils. Duplicated rather than imported
// because we need to canonicalise inputs to onRegister/onMeasurement/onPrediction
// at *declaration* time (before any child has registered), so that a domain
// can write `onRegister('rotatingmachine', ...)` or `onRegister('machine', ...)`
// interchangeably and have the dispatch match.
const SOFTWARE_TYPE_ALIASES = {
rotatingmachine: 'machine',
machinegroupcontrol: 'machinegroup',
};
// Canonical measurement-type set used to enumerate position-only and
// match-everything filters. Sourced from MeasurementContainer.measureMap
// plus the EVOLV-specific synthetic types the nodes routinely emit
// (level / volumePercent / efficiency / Ncog / netFlowRate). Keep in sync
// with MeasurementContainer if new types land there.
const KNOWN_TYPES = Object.freeze([
'flow',
'pressure',
'atmPressure',
'power',
'hydraulicPower',
'reactivePower',
'apparentPower',
'temperature',
'level',
'volume',
'volumePercent',
'length',
'mass',
'energy',
'reactiveEnergy',
'efficiency',
'Ncog',
'netFlowRate',
]);
function canonicalType(rawType) {
const t = String(rawType || '').toLowerCase();
return SOFTWARE_TYPE_ALIASES[t] || t;
}
function lowerPosition(p) {
return String(p).toLowerCase();
}
class ChildRouter {
constructor(domain) {
this.domain = domain;
this.logger = domain?.logger || null;
// Subscription tables, keyed by canonical softwareType.
this._registerSubs = new Map(); // softwareType -> Array<fn>
this._measurementSubs = new Map(); // softwareType -> Array<{filter, fn}>
this._predictionSubs = new Map(); // softwareType -> Array<{filter, fn}>
// Track every emitter listener we attach so tearDown can remove them.
this._attached = [];
// Every plain emitter listener we attach, so tearDown can remove them.
this._listeners = [];
}
// ── declaration API ────────────────────────────────────────────────
@@ -60,7 +90,6 @@ class ChildRouter {
_addEventSub(table, softwareType, filter, fn, label) {
if (typeof filter === 'function' && fn === undefined) {
// Allow `onMeasurement(type, fn)` shorthand — no filter.
fn = filter;
filter = {};
}
@@ -75,10 +104,6 @@ class ChildRouter {
// ── dispatch ──────────────────────────────────────────────────────
/**
* Called by the domain's registerChild(). Runs onRegister handlers, then
* attaches measurement/prediction listeners on the child's emitter.
*/
dispatchRegister(child, softwareType) {
const key = canonicalType(softwareType);
@@ -98,51 +123,24 @@ class ChildRouter {
_attachVariantListeners(child, key, emitter, variant, table) {
const subs = table.get(key) || [];
for (const { filter, fn } of subs) {
// Build the set of (type, position) tuples this sub matches. If a filter
// omits one or both of {type, position}, we can't pre-enumerate the event
// names — fall back to a wildcard listener via `emit`-time matching.
if (filter.type && filter.position) {
const eventName = `${filter.type}.${variant}.${String(filter.position).toLowerCase()}`;
this._attach(emitter, eventName, (data) => this._invoke(fn, data, child, variant));
continue;
}
const types = filter.type ? [filter.type] : KNOWN_TYPES;
const positions = filter.position ? [lowerPosition(filter.position)] : POSITION_VALUES.map(lowerPosition);
const handlerLabel = variant === 'measured' ? 'onMeasurement' : 'onPrediction';
// Wildcard: subscribe to a generic catch-all by patching emitter.emit.
// EventEmitter has no built-in wildcard — install a one-off proxy listener
// that intercepts every emit on this emitter and filters by name.
const proxyKey = `__childRouter_proxy_${variant}__`;
if (!emitter[proxyKey]) {
const origEmit = emitter.emit.bind(emitter);
const proxies = [];
emitter[proxyKey] = proxies;
emitter.emit = (eventName, ...args) => {
const parts = String(eventName).split('.');
if (parts.length === 3 && parts[1] === variant) {
for (const p of proxies) p({ type: parts[0], position: parts[2], args });
}
return origEmit(eventName, ...args);
};
// Track the proxy install for tearDown to undo.
this._attached.push({ emitter, kind: 'proxy', variant, original: origEmit, proxyKey });
}
const proxyFn = ({ type, position, args }) => {
if (filter.type && type !== filter.type) return;
if (filter.position && position !== String(filter.position).toLowerCase()) return;
this._invoke(fn, args[0], child, variant);
};
emitter[proxyKey].push(proxyFn);
this._attached.push({ emitter, kind: 'proxyEntry', proxyKey, proxyFn });
}
}
_attach(emitter, eventName, listener) {
for (const type of types) {
for (const pos of positions) {
const eventName = `${type}.${variant}.${pos}`;
const listener = (data) => this._invoke(fn, data, child, handlerLabel);
emitter.on(eventName, listener);
this._attached.push({ emitter, kind: 'listener', eventName, listener });
this._listeners.push({ emitter, eventName, listener });
}
}
}
}
_invoke(fn, eventData, child, variant) {
_invoke(fn, eventData, child, handlerLabel) {
try { fn.call(this.domain, eventData, child); }
catch (err) { this._logHandlerError(`on${variant === 'measured' ? 'Measurement' : 'Prediction'}`, '', err); }
catch (err) { this._logHandlerError(handlerLabel, '', err); }
}
_logHandlerError(kind, key, err) {
@@ -154,31 +152,13 @@ class ChildRouter {
// ── teardown ──────────────────────────────────────────────────────
tearDown() {
// Two passes: drop concrete listeners + proxy entries first, then unwrap
// any proxies whose entry list is now empty. Order matters — restoring
// emit before clearing entries would leave dangling proxy state.
for (const rec of this._attached) {
if (rec.kind === 'listener') {
if (typeof rec.emitter.off === 'function') rec.emitter.off(rec.eventName, rec.listener);
else if (typeof rec.emitter.removeListener === 'function') rec.emitter.removeListener(rec.eventName, rec.listener);
} else if (rec.kind === 'proxyEntry') {
const proxies = rec.emitter[rec.proxyKey];
if (Array.isArray(proxies)) {
const idx = proxies.indexOf(rec.proxyFn);
if (idx >= 0) proxies.splice(idx, 1);
for (const { emitter, eventName, listener } of this._listeners) {
if (typeof emitter.off === 'function') emitter.off(eventName, listener);
else if (typeof emitter.removeListener === 'function') emitter.removeListener(eventName, listener);
}
}
}
for (const rec of this._attached) {
if (rec.kind !== 'proxy') continue;
const proxies = rec.emitter[rec.proxyKey];
if (!Array.isArray(proxies) || proxies.length === 0) {
rec.emitter.emit = rec.original;
delete rec.emitter[rec.proxyKey];
}
}
this._attached = [];
this._listeners = [];
}
}
module.exports = ChildRouter;
module.exports.KNOWN_TYPES = KNOWN_TYPES;

View File

@@ -34,6 +34,14 @@ class UnitPolicy {
this._logger = logger || null;
// Warn-once memo: same (label, candidate) pair only logs the first time.
this._warned = new Set();
// Dual-shape accessors: each of canonical/output/curve is BOTH a method
// (legacy `policy.canonical('flow')`) AND a frozen property bag
// (`policy.canonical.flow`). The function carries the frozen map's own
// properties via Object.defineProperty so consumers can pick either form.
this.canonical = makeAccessor(this._canonical);
this.output = makeAccessor(this._output);
this.curve = makeAccessor(this._curve || {});
}
static declare(spec = {}) {
@@ -51,18 +59,6 @@ class UnitPolicy {
return this;
}
canonical(type) {
return this._canonical[type] || null;
}
output(type) {
return this._output[type] || null;
}
curve(type) {
return this._curve ? (this._curve[type] || null) : null;
}
/**
* Validate a user-supplied unit string against `expectedMeasure`. On any
* mismatch return `fallback` and warn once for this (label, candidate)
@@ -137,6 +133,24 @@ function freezeShallow(obj) {
return Object.freeze({ ...(obj || {}) });
}
// Build a function that doubles as a frozen property bag. `accessor(type)`
// returns the unit for that type (legacy method shape). `accessor.flow` etc.
// return the unit directly (new property shape). Own-properties are
// non-writable, non-configurable; attempts to assign / delete / redefine
// throw in strict mode — proving the bag is genuinely frozen.
function makeAccessor(map) {
const fn = (type) => map[type] || null;
for (const key of Object.keys(map)) {
Object.defineProperty(fn, key, {
value: map[key],
writable: false,
enumerable: true,
configurable: false,
});
}
return Object.freeze(fn);
}
// Accepts either the convert-module measure family ('volumeFlowRate') or one
// of our type names ('flow') and returns the convert-module measure.
function resolveMeasure(expected) {

View File

@@ -10,7 +10,7 @@
// JSON-Schema. Anything richer belongs in the handler itself, which has
// access to logger via ctx.
const SCALAR_TYPES = new Set(['string', 'number', 'boolean', 'object', 'any']);
const SCALAR_TYPES = new Set(['string', 'number', 'boolean', 'object', 'any', 'none']);
class CommandRegistry {
constructor(commands, options = {}) {
@@ -49,6 +49,7 @@ class CommandRegistry {
topic: cmd.topic,
aliases,
payloadSchema: cmd.payloadSchema || null,
description: typeof cmd.description === 'string' ? cmd.description : null,
handler: cmd.handler,
};
this._byKey.set(cmd.topic, descriptor);
@@ -75,6 +76,7 @@ class CommandRegistry {
topic: d.topic,
aliases: d.aliases.slice(),
payloadSchema: d.payloadSchema,
description: d.description,
}));
}
@@ -119,6 +121,12 @@ class CommandRegistry {
return true;
}
if (type === 'any') return true;
if (type === 'none') {
if (payload !== undefined && payload !== null) {
log.warn?.(`${descriptor.topic}: payload ignored — this is a trigger-only topic`);
}
return true;
}
// typeof null === 'object' — explicit null fails an object schema.
if (type === 'object') {
if (payload === null || typeof payload !== 'object') {

View File

@@ -195,3 +195,74 @@ test('chainable API returns the router instance', () => {
.onPrediction('machine', { type: 'flow', position: 'downstream' }, () => {});
assert.equal(r, router);
});
test('multi-parent: two routers on the same child both receive every event and tear down independently', () => {
// Regression for the pre-2026-05-11 emit-patching stack: two parents
// subscribing partial-filter wildcards on the same child must compose
// without stacking wrappers, and either teardown order must work.
const routerA = new ChildRouter(makeDomain());
const routerB = new ChildRouter(makeDomain());
const a = []; const b = [];
routerA.onMeasurement('measurement', { type: 'pressure' },
(data) => a.push(data.value));
routerB.onMeasurement('measurement', { type: 'pressure' },
(data) => b.push(data.value));
const ch = makeChild();
routerA.dispatchRegister(ch, 'measurement');
routerB.dispatchRegister(ch, 'measurement');
emitMeasured(ch, 'pressure', 'upstream', 11);
emitMeasured(ch, 'pressure', 'downstream', 22);
assert.deepEqual(a.sort(), [11, 22]);
assert.deepEqual(b.sort(), [11, 22]);
// Tear down B first — A must continue to fire on subsequent events.
routerB.tearDown();
emitMeasured(ch, 'pressure', 'upstream', 33);
assert.deepEqual(a.sort(), [11, 22, 33]);
assert.deepEqual(b.sort(), [11, 22], 'B receives nothing after its teardown');
// Now tear down A in the reverse order; neither should fire.
routerA.tearDown();
emitMeasured(ch, 'pressure', 'upstream', 44);
assert.deepEqual(a.sort(), [11, 22, 33], 'A receives nothing after its teardown');
assert.deepEqual(b.sort(), [11, 22]);
});
test('position-only filter fans out across every known type for that position', () => {
const router = new ChildRouter(makeDomain());
const hits = [];
router.onMeasurement('measurement', { position: 'upstream' },
(data) => hits.push(data.value));
const ch = makeChild();
router.dispatchRegister(ch, 'measurement');
emitMeasured(ch, 'pressure', 'upstream', 1);
emitMeasured(ch, 'flow', 'upstream', 2);
emitMeasured(ch, 'temperature', 'upstream', 3);
emitMeasured(ch, 'pressure', 'downstream', 99); // wrong position
emitPredicted(ch, 'pressure', 'upstream', 99); // wrong variant
assert.deepEqual(hits.sort(), [1, 2, 3]);
});
test('empty filter ({}) fires for every type/position combination', () => {
const router = new ChildRouter(makeDomain());
const hits = [];
router.onMeasurement('measurement', {}, (data) => hits.push(data.value));
const ch = makeChild();
router.dispatchRegister(ch, 'measurement');
emitMeasured(ch, 'pressure', 'upstream', 1);
emitMeasured(ch, 'flow', 'downstream', 2);
emitMeasured(ch, 'level', 'atequipment', 3);
emitPredicted(ch, 'flow', 'upstream', 99); // wrong variant
assert.deepEqual(hits.sort(), [1, 2, 3]);
});

View File

@@ -34,6 +34,53 @@ test('declare returns a policy whose canonical/output match the input', () => {
assert.equal(policy.curve('control'), '%');
});
test('canonical/output/curve are also frozen property bags (dot access)', () => {
const policy = UnitPolicy.declare(baseSpec);
// Property-access form — equivalent to the method-call form above.
assert.equal(policy.canonical.flow, 'm3/s');
assert.equal(policy.canonical.pressure, 'Pa');
assert.equal(policy.output.flow, 'm3/h');
assert.equal(policy.output.temperature, 'C');
assert.equal(policy.curve.flow, 'm3/h');
assert.equal(policy.curve.control, '%');
// Method-call form keeps working alongside it.
assert.equal(policy.canonical('flow'), 'm3/s');
assert.equal(policy.output('power'), 'kW');
});
test('canonical/output/curve property bags are frozen — no assignment / delete / redefine', () => {
'use strict';
const policy = UnitPolicy.declare(baseSpec);
// Existing own-properties are non-writable.
assert.throws(() => { policy.canonical.flow = 'tampered'; }, TypeError);
// Existing own-properties are non-configurable: delete throws.
assert.throws(() => { delete policy.canonical.pressure; }, TypeError);
// Redefining an existing prop throws.
assert.throws(
() => Object.defineProperty(policy.canonical, 'flow', { value: 'tampered' }),
TypeError
);
// Object.isFrozen reports the accessor as frozen.
assert.equal(Object.isFrozen(policy.canonical), true);
assert.equal(Object.isFrozen(policy.output), true);
assert.equal(Object.isFrozen(policy.curve), true);
// Original values survive the failed attempts.
assert.equal(policy.canonical.flow, 'm3/s');
assert.equal(policy.canonical.pressure, 'Pa');
});
test('curve property bag is present (empty) even when no curve was declared', () => {
const policy = UnitPolicy.declare({
canonical: baseSpec.canonical,
output: baseSpec.output,
});
// Method form returns null for unknown types.
assert.equal(policy.curve('flow'), null);
// Property form is an empty frozen function — accessing missing keys is undefined.
assert.equal(policy.curve.flow, undefined);
assert.equal(Object.isFrozen(policy.curve), true);
});
test('declare throws when canonical or output is missing', () => {
assert.throws(() => UnitPolicy.declare({ output: {} }), /canonical/);
assert.throws(() => UnitPolicy.declare({ canonical: {} }), /output/);

View File

@@ -151,15 +151,62 @@ test('list() returns descriptors without handler functions', () => {
topic: 'set.mode',
aliases: ['setMode'],
payloadSchema: { type: 'string' },
description: null,
});
assert.deepEqual(list[1], {
topic: 'cmd.startup',
aliases: [],
payloadSchema: null,
description: null,
});
for (const d of list) assert.ok(!('handler' in d), 'handler must not be in descriptor');
});
test("payloadSchema type 'none' invokes handler with no payload and no warning", async () => {
const logger = makeLogger();
let invoked = 0;
const reg = createRegistry([{
topic: 'cmd.calibrate',
payloadSchema: { type: 'none' },
handler: () => { invoked += 1; },
}], { logger });
await reg.dispatch({ topic: 'cmd.calibrate' }, {}, {});
await reg.dispatch({ topic: 'cmd.calibrate', payload: undefined }, {}, {});
await reg.dispatch({ topic: 'cmd.calibrate', payload: null }, {}, {});
assert.equal(invoked, 3);
assert.equal(logger._calls.warn.length, 0);
});
test("payloadSchema type 'none' invokes handler with non-empty payload but logs warn", async () => {
const logger = makeLogger();
let invoked = 0;
const reg = createRegistry([{
topic: 'cmd.calibrate',
payloadSchema: { type: 'none' },
handler: () => { invoked += 1; },
}], { logger });
await reg.dispatch({ topic: 'cmd.calibrate', payload: 'ignored' }, {}, {});
await reg.dispatch({ topic: 'cmd.calibrate', payload: { a: 1 } }, {}, {});
await reg.dispatch({ topic: 'cmd.calibrate', payload: 0 }, {}, {});
assert.equal(invoked, 3);
const warns = logger._calls.warn.filter((m) => m.includes('payload ignored'));
assert.equal(warns.length, 3);
assert.ok(warns[0].includes('cmd.calibrate'));
assert.ok(warns[0].includes('trigger-only'));
});
test('list() includes description field when present', () => {
const reg = createRegistry([
{ topic: 'cmd.calibrate', payloadSchema: { type: 'none' }, description: 'Trigger calibration.', handler: () => {} },
{ topic: 'set.mode', payloadSchema: { type: 'string' }, handler: () => {} },
]);
const list = reg.list();
assert.equal(list[0].description, 'Trigger calibration.');
assert.equal(list[1].description, null);
});
test('deprecationStats reflects alias hit counts', async () => {
const logger = makeLogger();
const reg = createRegistry([{