P6: convert valveGroupControl to BaseDomain + BaseNodeAdapter + concern split
Refactor of valveGroupControl to use the platform infrastructure (BaseDomain, BaseNodeAdapter, ChildRouter, commandRegistry, statusBadge). Extracts concerns into focused modules per .claude/refactor/MODULE_SPLIT.md generic template. Tests stay green; CONTRACT.md generated; legacy aliases preserved. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
186
src/sources/fluidContract.js
Normal file
186
src/sources/fluidContract.js
Normal file
@@ -0,0 +1,186 @@
|
||||
'use strict';
|
||||
|
||||
// Upstream-source registration + fluid-contract reconciliation.
|
||||
// Sources are non-valve upstream children (rotatingMachine, MGC, PS, …)
|
||||
// that publish flow events and optionally a service-type contract.
|
||||
// VGC aggregates their contracts into one group-level view that valves
|
||||
// can read for compatibility checks.
|
||||
|
||||
const SERVICE_TYPES = new Set(['gas', 'liquid']);
|
||||
const SOURCE_SOFTWARE_TYPES = new Set([
|
||||
'machine',
|
||||
'rotatingmachine',
|
||||
'machinegroup',
|
||||
'machinegroupcontrol',
|
||||
'pumpingstation',
|
||||
'valvegroupcontrol',
|
||||
]);
|
||||
const SOURCE_FLOW_EVENTS = [
|
||||
'flow.predicted.downstream',
|
||||
'flow.predicted.atEquipment',
|
||||
'flow.predicted.atequipment',
|
||||
'flow.measured.downstream',
|
||||
'flow.measured.atEquipment',
|
||||
'flow.measured.atequipment',
|
||||
];
|
||||
const DEFAULT_SOURCE_SERVICE_TYPE = Object.freeze({
|
||||
machine: 'liquid',
|
||||
rotatingmachine: 'liquid',
|
||||
machinegroup: 'liquid',
|
||||
machinegroupcontrol: 'liquid',
|
||||
pumpingstation: 'liquid',
|
||||
});
|
||||
|
||||
function normalizeServiceType(value) {
|
||||
const raw = String(value || '').trim().toLowerCase();
|
||||
return SERVICE_TYPES.has(raw) ? raw : null;
|
||||
}
|
||||
|
||||
function isSourceSoftwareType(softwareType) {
|
||||
return SOURCE_SOFTWARE_TYPES.has(String(softwareType || '').trim().toLowerCase());
|
||||
}
|
||||
|
||||
function isSourceLike(child, softwareType) {
|
||||
if (isSourceSoftwareType(softwareType)) return true;
|
||||
return typeof child?.getFluidContract === 'function';
|
||||
}
|
||||
|
||||
function extractFluidContract(child, softwareType, logger) {
|
||||
let contract = null;
|
||||
if (typeof child?.getFluidContract === 'function') {
|
||||
try { contract = child.getFluidContract(); }
|
||||
catch (error) { logger?.warn?.(`Failed to read child fluid contract: ${error.message}`); }
|
||||
}
|
||||
const status = String(contract?.status || '').trim().toLowerCase();
|
||||
if (status === 'conflict') return { status: 'conflict', serviceType: null };
|
||||
|
||||
const fromContract = normalizeServiceType(contract?.serviceType);
|
||||
if (fromContract) return { status: 'resolved', serviceType: fromContract };
|
||||
|
||||
const direct = normalizeServiceType(
|
||||
child?.serviceType || child?.expectedServiceType || child?.config?.asset?.serviceType
|
||||
);
|
||||
if (direct) return { status: 'resolved', serviceType: direct };
|
||||
|
||||
const fallback = DEFAULT_SOURCE_SERVICE_TYPE[String(softwareType || '').trim().toLowerCase()] || null;
|
||||
if (fallback) return { status: 'inferred', serviceType: fallback };
|
||||
|
||||
return { status: 'unknown', serviceType: null };
|
||||
}
|
||||
|
||||
function _diff(prev, next) {
|
||||
return (
|
||||
prev.status !== next.status
|
||||
|| prev.serviceType !== next.serviceType
|
||||
|| prev.sourceCount !== next.sourceCount
|
||||
|| (prev.message || '') !== (next.message || '')
|
||||
);
|
||||
}
|
||||
|
||||
function refreshFluidContract(vgc) {
|
||||
const contracts = Object.values(vgc.sources).map((s) => s?.fluidContract || null).filter(Boolean);
|
||||
const serviceTypes = Array.from(new Set(
|
||||
contracts.map((c) => normalizeServiceType(c.serviceType)).filter(Boolean)
|
||||
));
|
||||
const hasConflict = contracts.some((c) => String(c.status || '').toLowerCase() === 'conflict');
|
||||
const sourceCount = Object.keys(vgc.sources).length;
|
||||
let next;
|
||||
|
||||
if (hasConflict || serviceTypes.length > 1) {
|
||||
next = {
|
||||
status: 'conflict', serviceType: null, upstreamServiceTypes: serviceTypes, sourceCount,
|
||||
message: `Conflicting upstream fluids detected: ${serviceTypes.join(', ') || 'unknown'}.`,
|
||||
};
|
||||
} else if (serviceTypes.length === 1) {
|
||||
next = {
|
||||
status: 'resolved', serviceType: serviceTypes[0], upstreamServiceTypes: serviceTypes, sourceCount,
|
||||
message: `Upstream fluid resolved as ${serviceTypes[0]}.`,
|
||||
};
|
||||
} else {
|
||||
next = {
|
||||
status: 'unknown', serviceType: null, upstreamServiceTypes: [], sourceCount,
|
||||
message: 'No upstream fluid sources registered.',
|
||||
};
|
||||
}
|
||||
|
||||
const prev = vgc.fluidContract || {};
|
||||
vgc.fluidContract = next;
|
||||
if (_diff(prev, next)) vgc.emitter.emit('fluidContractChange', vgc.getFluidContract());
|
||||
}
|
||||
|
||||
function registerSource(vgc, child, positionVsParent, softwareType) {
|
||||
const id = child?.config?.general?.id || child?.config?.general?.name || `source-${Object.keys(vgc.sources).length + 1}`;
|
||||
if (vgc._sourceListeners.has(id)) unbindSource(vgc, id);
|
||||
|
||||
child.positionVsParent = positionVsParent;
|
||||
vgc.sources[id] = child;
|
||||
bindSource(vgc, id, child);
|
||||
vgc.sources[id].fluidContract = extractFluidContract(child, softwareType, vgc.logger);
|
||||
refreshFluidContract(vgc);
|
||||
|
||||
vgc.logger.info(`Source '${id}' (${softwareType || 'unknown'}) registered at ${positionVsParent}.`);
|
||||
return true;
|
||||
}
|
||||
|
||||
function bindSource(vgc, sourceId, source) {
|
||||
const listeners = { flow: [], onFluidContractChange: null };
|
||||
if (source?.measurements?.emitter?.on) {
|
||||
SOURCE_FLOW_EVENTS.forEach((eventName) => {
|
||||
const handler = (eventData = {}) => {
|
||||
const value = Number(eventData.value);
|
||||
if (!Number.isFinite(value)) return;
|
||||
const variant = String(eventName).split('.')[1] === 'measured' ? 'measured' : 'predicted';
|
||||
const unit = eventData.unit || vgc.unitPolicy.output('flow');
|
||||
vgc.updateFlow(variant, value, 'atEquipment', unit);
|
||||
};
|
||||
source.measurements.emitter.on(eventName, handler);
|
||||
listeners.flow.push({ emitter: source.measurements.emitter, eventName, handler });
|
||||
});
|
||||
}
|
||||
if (source?.emitter?.on) {
|
||||
listeners.onFluidContractChange = () => {
|
||||
if (!vgc.sources[sourceId]) return;
|
||||
vgc.sources[sourceId].fluidContract = extractFluidContract(source, source?.config?.functionality?.softwareType, vgc.logger);
|
||||
refreshFluidContract(vgc);
|
||||
};
|
||||
source.emitter.on('fluidContractChange', listeners.onFluidContractChange);
|
||||
}
|
||||
vgc._sourceListeners.set(sourceId, { source, listeners });
|
||||
}
|
||||
|
||||
function unbindSource(vgc, sourceId) {
|
||||
const entry = vgc._sourceListeners.get(sourceId);
|
||||
if (!entry) return;
|
||||
const { source, listeners } = entry;
|
||||
listeners.flow.forEach(({ emitter, eventName, handler }) => {
|
||||
if (typeof emitter?.off === 'function') emitter.off(eventName, handler);
|
||||
else if (typeof emitter?.removeListener === 'function') emitter.removeListener(eventName, handler);
|
||||
});
|
||||
if (listeners.onFluidContractChange) {
|
||||
if (typeof source?.emitter?.off === 'function') source.emitter.off('fluidContractChange', listeners.onFluidContractChange);
|
||||
else if (typeof source?.emitter?.removeListener === 'function') source.emitter.removeListener('fluidContractChange', listeners.onFluidContractChange);
|
||||
}
|
||||
vgc._sourceListeners.delete(sourceId);
|
||||
}
|
||||
|
||||
function getFluidContract(vgc) {
|
||||
const s = vgc.fluidContract || {};
|
||||
return {
|
||||
status: s.status || 'unknown',
|
||||
serviceType: s.serviceType || null,
|
||||
upstreamServiceTypes: Array.isArray(s.upstreamServiceTypes) ? [...s.upstreamServiceTypes] : [],
|
||||
sourceCount: Number(s.sourceCount) || 0,
|
||||
message: s.message || '',
|
||||
source: 'valvegroupcontrol',
|
||||
};
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
isSourceLike,
|
||||
isSourceSoftwareType,
|
||||
registerSource,
|
||||
unbindSource,
|
||||
refreshFluidContract,
|
||||
getFluidContract,
|
||||
SOURCE_SOFTWARE_TYPES,
|
||||
};
|
||||
Reference in New Issue
Block a user