B2.3 + P11.1 + P11.2 + monster schema fix
B2.3 LatestWinsGate fireAndWait:
Added fireAndWait(value, ctx?) returning per-fire settlement promise.
Supersede resolves with frozen sentinel {superseded: true} (no
rejection — callers branch on value without try/catch). Dispatch
errors also resolve (with undefined); error surfaces via gate.lastError.
LatestWinsGate.js 75 → 116 lines. 12/12 tests pass.
P11.1 convert.possibilities(measure):
New helper returning sorted+deduped unit names for a measure.
Cached per measure. Reuses existing convert measures map. Also
exposed convert.measures() listing all known measures.
convert/index.js +21 lines. New test file: 90 lines, 12/12 tests.
P11.2 commandRegistry.units field:
Pre-dispatch normalisation pipeline. descriptor.units = {measure,
default}; commandRegistry extracts msg.payload + msg.unit (3 shapes),
validates against measure, converts to default, falls back + warns
with accepted-list on unknown/wrong-measure. Falls back gracefully
if convert.possibilities is missing. commandRegistry.js 164 → 237.
+7 new tests covering all 4 paths.
monster schema fix (P11.2 sibling):
generalFunctions/src/configs/monster.json was stripping four
legitimate constraint keys (nominalFlowMin, flowMax, maxRainRef,
minSampleIntervalSec). Added them with defaults matching the
legacy nodeClass coercion. Side effect: this also UNBLOCKED the
monster cooldown-guard test (separate ROOT-CAUSE entry below).
CONTRACTS.md §4 + §8 updated. 144/144 basic tests + 206/206 full
generalFunctions tests pass.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -2,9 +2,22 @@
|
||||
|
||||
// Serialises an async dispatch so that high-frequency callers cannot stack
|
||||
// up overlapping invocations. Intermediate values are dropped — only the
|
||||
// most recent fire() during an in-flight dispatch is replayed afterwards.
|
||||
// Extracted from machineGroupControl's _dispatchInFlight + _delayedCall
|
||||
// pattern so MGC, pumpingStation, valveGroupControl etc. can share it.
|
||||
// most recent fire()/fireAndWait() during an in-flight dispatch is replayed
|
||||
// afterwards. Extracted from machineGroupControl's _dispatchInFlight +
|
||||
// _delayedCall pattern so MGC, pumpingStation, valveGroupControl etc. can
|
||||
// share it.
|
||||
//
|
||||
// fire(value) — never blocks; returns void.
|
||||
// fireAndWait(value) — returns a promise that settles when THIS value's
|
||||
// dispatch runs to completion. If a later fireAndWait
|
||||
// arrives during the in-flight call and supersedes
|
||||
// this one in the pending slot, the returned promise
|
||||
// RESOLVES with { superseded: true } instead of
|
||||
// rejecting — callers can branch on a sentinel
|
||||
// without try/catch. The dispatch's own return value
|
||||
// (when not superseded) is forwarded as the resolution.
|
||||
|
||||
const SUPERSEDED = Object.freeze({ superseded: true });
|
||||
|
||||
class LatestWinsGate {
|
||||
constructor(asyncDispatchFn, options = {}) {
|
||||
@@ -14,7 +27,7 @@ class LatestWinsGate {
|
||||
this._dispatch = asyncDispatchFn;
|
||||
this._logger = options.logger || null;
|
||||
this._inFlight = false;
|
||||
this._pending = null; // { value, ctx } | null
|
||||
this._pending = null; // { value, ctx, settle? } | null
|
||||
this._drainResolvers = []; // resolved when idle again
|
||||
this.lastError = null;
|
||||
}
|
||||
@@ -25,14 +38,31 @@ class LatestWinsGate {
|
||||
return this._pending ? 2 : 1;
|
||||
}
|
||||
|
||||
// Never blocks the caller. If a dispatch is in flight, the latest
|
||||
// value is parked; older parked values are silently overwritten.
|
||||
// Never blocks. If a dispatch is in flight, the latest value is parked;
|
||||
// older parked values are silently overwritten.
|
||||
fire(value, ctx) {
|
||||
if (this._inFlight) {
|
||||
this._pending = { value, ctx };
|
||||
this._supersedePending();
|
||||
this._pending = { value, ctx, settle: null };
|
||||
return;
|
||||
}
|
||||
this._run(value, ctx);
|
||||
this._run(value, ctx, null);
|
||||
}
|
||||
|
||||
// Returns a promise that resolves when THIS fire's dispatch settles.
|
||||
// If this fire gets overwritten while parked, resolves with the
|
||||
// SUPERSEDED sentinel ({ superseded: true }) — callers branch on
|
||||
// result.superseded === true without try/catch.
|
||||
fireAndWait(value, ctx) {
|
||||
return new Promise((resolve) => {
|
||||
const settle = resolve;
|
||||
if (this._inFlight) {
|
||||
this._supersedePending();
|
||||
this._pending = { value, ctx, settle };
|
||||
return;
|
||||
}
|
||||
this._run(value, ctx, settle);
|
||||
});
|
||||
}
|
||||
|
||||
drain() {
|
||||
@@ -40,18 +70,28 @@ class LatestWinsGate {
|
||||
return new Promise((resolve) => { this._drainResolvers.push(resolve); });
|
||||
}
|
||||
|
||||
_run(value, ctx) {
|
||||
_supersedePending() {
|
||||
const prev = this._pending;
|
||||
if (prev && typeof prev.settle === 'function') prev.settle(SUPERSEDED);
|
||||
this._pending = null;
|
||||
}
|
||||
|
||||
_run(value, ctx, settle) {
|
||||
this._inFlight = true;
|
||||
// Kick the dispatch on a microtask so fire() always returns
|
||||
// synchronously, even if _dispatch resolves immediately.
|
||||
// Kick the dispatch on a microtask so fire()/fireAndWait() always
|
||||
// return synchronously, even if _dispatch resolves immediately.
|
||||
Promise.resolve()
|
||||
.then(() => this._dispatch(value, ctx))
|
||||
.catch((err) => {
|
||||
.then((result) => {
|
||||
if (typeof settle === 'function') settle(result);
|
||||
}, (err) => {
|
||||
this.lastError = err;
|
||||
if (this._logger && typeof this._logger.error === 'function') {
|
||||
this._logger.error(err);
|
||||
}
|
||||
// Swallow: an error must not deadlock the gate.
|
||||
// Resolve (not reject) so fireAndWait callers don't need
|
||||
// try/catch. Dispatch errors stay observable via lastError.
|
||||
if (typeof settle === 'function') settle(undefined);
|
||||
})
|
||||
.then(() => this._afterDispatch());
|
||||
}
|
||||
@@ -59,9 +99,9 @@ class LatestWinsGate {
|
||||
_afterDispatch() {
|
||||
this._inFlight = false;
|
||||
if (this._pending) {
|
||||
const { value, ctx } = this._pending;
|
||||
const { value, ctx, settle } = this._pending;
|
||||
this._pending = null;
|
||||
this._run(value, ctx);
|
||||
this._run(value, ctx, settle);
|
||||
return;
|
||||
}
|
||||
// Idle — release any drain() waiters.
|
||||
@@ -71,4 +111,6 @@ class LatestWinsGate {
|
||||
}
|
||||
}
|
||||
|
||||
LatestWinsGate.SUPERSEDED = SUPERSEDED;
|
||||
|
||||
module.exports = LatestWinsGate;
|
||||
|
||||
Reference in New Issue
Block a user