'use strict'; // Upstream-source registration + fluid-contract reconciliation. // Sources are non-valve upstream children (rotatingMachine, MGC, PS, …) // that publish flow events and optionally a service-type contract. // VGC aggregates their contracts into one group-level view that valves // can read for compatibility checks. const SERVICE_TYPES = new Set(['gas', 'liquid']); const SOURCE_SOFTWARE_TYPES = new Set([ 'machine', 'rotatingmachine', 'machinegroup', 'machinegroupcontrol', 'pumpingstation', 'valvegroupcontrol', ]); const SOURCE_FLOW_EVENTS = [ 'flow.predicted.downstream', 'flow.predicted.atEquipment', 'flow.predicted.atequipment', 'flow.measured.downstream', 'flow.measured.atEquipment', 'flow.measured.atequipment', ]; const DEFAULT_SOURCE_SERVICE_TYPE = Object.freeze({ machine: 'liquid', rotatingmachine: 'liquid', machinegroup: 'liquid', machinegroupcontrol: 'liquid', pumpingstation: 'liquid', }); function normalizeServiceType(value) { const raw = String(value || '').trim().toLowerCase(); return SERVICE_TYPES.has(raw) ? raw : null; } function isSourceSoftwareType(softwareType) { return SOURCE_SOFTWARE_TYPES.has(String(softwareType || '').trim().toLowerCase()); } function isSourceLike(child, softwareType) { if (isSourceSoftwareType(softwareType)) return true; return typeof child?.getFluidContract === 'function'; } function extractFluidContract(child, softwareType, logger) { let contract = null; if (typeof child?.getFluidContract === 'function') { try { contract = child.getFluidContract(); } catch (error) { logger?.warn?.(`Failed to read child fluid contract: ${error.message}`); } } const status = String(contract?.status || '').trim().toLowerCase(); if (status === 'conflict') return { status: 'conflict', serviceType: null }; const fromContract = normalizeServiceType(contract?.serviceType); if (fromContract) return { status: 'resolved', serviceType: fromContract }; const direct = normalizeServiceType( child?.serviceType || child?.expectedServiceType || child?.config?.asset?.serviceType ); if (direct) return { status: 'resolved', serviceType: direct }; const fallback = DEFAULT_SOURCE_SERVICE_TYPE[String(softwareType || '').trim().toLowerCase()] || null; if (fallback) return { status: 'inferred', serviceType: fallback }; return { status: 'unknown', serviceType: null }; } function _diff(prev, next) { return ( prev.status !== next.status || prev.serviceType !== next.serviceType || prev.sourceCount !== next.sourceCount || (prev.message || '') !== (next.message || '') ); } function refreshFluidContract(vgc) { const contracts = Object.values(vgc.sources).map((s) => s?.fluidContract || null).filter(Boolean); const serviceTypes = Array.from(new Set( contracts.map((c) => normalizeServiceType(c.serviceType)).filter(Boolean) )); const hasConflict = contracts.some((c) => String(c.status || '').toLowerCase() === 'conflict'); const sourceCount = Object.keys(vgc.sources).length; let next; if (hasConflict || serviceTypes.length > 1) { next = { status: 'conflict', serviceType: null, upstreamServiceTypes: serviceTypes, sourceCount, message: `Conflicting upstream fluids detected: ${serviceTypes.join(', ') || 'unknown'}.`, }; } else if (serviceTypes.length === 1) { next = { status: 'resolved', serviceType: serviceTypes[0], upstreamServiceTypes: serviceTypes, sourceCount, message: `Upstream fluid resolved as ${serviceTypes[0]}.`, }; } else { next = { status: 'unknown', serviceType: null, upstreamServiceTypes: [], sourceCount, message: 'No upstream fluid sources registered.', }; } const prev = vgc.fluidContract || {}; vgc.fluidContract = next; if (_diff(prev, next)) vgc.emitter.emit('fluidContractChange', vgc.getFluidContract()); } function registerSource(vgc, child, positionVsParent, softwareType) { const id = child?.config?.general?.id || child?.config?.general?.name || `source-${Object.keys(vgc.sources).length + 1}`; if (vgc._sourceListeners.has(id)) unbindSource(vgc, id); child.positionVsParent = positionVsParent; vgc.sources[id] = child; bindSource(vgc, id, child); vgc.sources[id].fluidContract = extractFluidContract(child, softwareType, vgc.logger); refreshFluidContract(vgc); vgc.logger.info(`Source '${id}' (${softwareType || 'unknown'}) registered at ${positionVsParent}.`); return true; } function bindSource(vgc, sourceId, source) { const listeners = { flow: [], onFluidContractChange: null }; if (source?.measurements?.emitter?.on) { SOURCE_FLOW_EVENTS.forEach((eventName) => { const handler = (eventData = {}) => { const value = Number(eventData.value); if (!Number.isFinite(value)) return; const variant = String(eventName).split('.')[1] === 'measured' ? 'measured' : 'predicted'; const unit = eventData.unit || vgc.unitPolicy.output('flow'); vgc.updateFlow(variant, value, 'atEquipment', unit); }; source.measurements.emitter.on(eventName, handler); listeners.flow.push({ emitter: source.measurements.emitter, eventName, handler }); }); } if (source?.emitter?.on) { listeners.onFluidContractChange = () => { if (!vgc.sources[sourceId]) return; vgc.sources[sourceId].fluidContract = extractFluidContract(source, source?.config?.functionality?.softwareType, vgc.logger); refreshFluidContract(vgc); }; source.emitter.on('fluidContractChange', listeners.onFluidContractChange); } vgc._sourceListeners.set(sourceId, { source, listeners }); } function unbindSource(vgc, sourceId) { const entry = vgc._sourceListeners.get(sourceId); if (!entry) return; const { source, listeners } = entry; listeners.flow.forEach(({ emitter, eventName, handler }) => { if (typeof emitter?.off === 'function') emitter.off(eventName, handler); else if (typeof emitter?.removeListener === 'function') emitter.removeListener(eventName, handler); }); if (listeners.onFluidContractChange) { if (typeof source?.emitter?.off === 'function') source.emitter.off('fluidContractChange', listeners.onFluidContractChange); else if (typeof source?.emitter?.removeListener === 'function') source.emitter.removeListener('fluidContractChange', listeners.onFluidContractChange); } vgc._sourceListeners.delete(sourceId); } function getFluidContract(vgc) { const s = vgc.fluidContract || {}; return { status: s.status || 'unknown', serviceType: s.serviceType || null, upstreamServiceTypes: Array.isArray(s.upstreamServiceTypes) ? [...s.upstreamServiceTypes] : [], sourceCount: Number(s.sourceCount) || 0, message: s.message || '', source: 'valvegroupcontrol', }; } module.exports = { isSourceLike, isSourceSoftwareType, registerSource, unbindSource, refreshFluidContract, getFluidContract, SOURCE_SOFTWARE_TYPES, };