# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2021 CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
# pylint: disable=invalid-name,protected-access,too-few-public-methods
# can't do much about these:
# pylint: disable=attribute-defined-outside-init,pointless-string-statement
# pylint: disable=too-many-lines,too-many-ancestors,too-many-public-methods
""" SKA Low CBF
Subarray device for Low.CBF
"""
import json
import os
import re
import time
from collections.abc import Iterable
from dataclasses import dataclass, field
from enum import IntEnum
from functools import partial
from queue import Queue
from threading import Lock, Thread
from typing import Any, Union
import numpy as np
from ska_tango_base import SKABaseDevice, SKASubarray
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, HealthState, ObsState
from tango import (
AttrQuality,
AttrWriteType,
Database,
DevFailed,
DeviceProxy,
DevUShort,
EventData,
EventSystemFailed,
EventType,
)
from tango.server import attribute, device_property, run
from ska_low_cbf import release
from ska_low_cbf.device_proxy import MccsDeviceProxy
from ska_low_cbf.subarray.component_manager import (
LowCbfSubarrayComponentManager,
SubarrayTangoIface,
)
# Names exported by ``from <module> import *``
__all__ = ["LowCbfSubarray", "main"]
# sometimes the default (3 sec) Tango timeout is insufficient
TANGO_TIMEOUT_MS = 10_000
# Third value of the tri-state ``delaysValid`` flag (0 = INVALID, 1 = VALID):
# the subarray delay polynomial is not used
UNUSED = 2
# NOTE(review): presumably upper bounds on station count and SPS channel
# count; neither is referenced in the visible part of this file - confirm
MAX_STATIONS = 1024
MAX_SPS_CHAN = 384
class Evt(IntEnum):
    """Encode events received from other Tango devices."""

    # Alveo (processor) allocation changes
    ALVEOS_RELEASED = 0
    ALVEOS_ASSIGNED = 1
    # NOTE: value 2 is intentionally left unassigned here
    DELAY_POLY_SUMMARY = 3
    # requests to recalculate derived percentages
    CALC_PROC_PERCENT = 4  # processorsReadyPercent
    CALC_ETHER_PERCENT = 5  # processorEthernetLockedPercent
    # health state change of a monitored device
    PROCESS_HEALTH = 6
@dataclass
class EtherPortStat:
    """Class tracking Processor and Connector Ethernet port status."""

    sw_name: Union[str, None] = None  # comes from Allocator e.g. "p4_01"
    sw_port: Union[str, None] = None  # e.g. "17/0"
    sw_proxy: Union["DeviceProxy", None] = None
    sw_dev_name: Union[str, None] = None  # e.g. "low-cbf/connector/0"
    subscription_id: Union[int, None] = None
    proc_ether_locked: bool = False
    sw_ether_locked: bool = False

    @property
    def is_subscribed(self) -> bool:
        """
        Check if we are subscribed to Connector port-up events.

        :return: True if subscribed, False otherwise
        """
        return self.subscription_id is not None

    def unsubscribe_connector(self) -> bool:
        """
        Unsubscribe from Connector port updates.

        The subscription ID is cleared after a successful unsubscribe so
        that ``is_subscribed`` reflects the new state and a repeated call
        does not try to unsubscribe the same ID twice.

        :return: True if unsubscribed, False when there was no subscription
            (or no proxy) to unsubscribe from
        """
        # Compare against None (same rule as ``is_subscribed``) so that a
        # subscription ID of 0 is not mistaken for "not subscribed".
        if self.subscription_id is None or self.sw_proxy is None:
            return False
        self.sw_proxy.unsubscribe_event(self.subscription_id)
        self.subscription_id = None
        return True
@dataclass
class ConnectorPortsUp:
    """
    Maintain port-up status for multiple connector devices.

    Each switch (connector) has its own entry in the `ports_up` dictionary.
    """

    ports_up: dict[str, list[int]] = field(default_factory=dict)
    """Port-up status for each participating switch.
    key: Tango device name (e.g. 'low-cbf/connector/0')
    value: list of integers; 1 == port is up, otherwise 0
    """

    def is_up(self, dev_name: str, port: Union[int, str]) -> bool:
        """
        Check if a port is up.

        :param dev_name: connector Tango device (e.g. 'low-cbf/connector/0')
        :param port: Port number (int) or string (e.g. '17/0'); ports are
            numbered from 1
        :return: True if the port is up, False otherwise (including unknown
            devices and port numbers outside the known range)
        :raises ValueError: if a string ``port`` does not start with an
            integer
        """
        if isinstance(port, str):
            # "17/0" -> 17 ; a string without "/" is used as a whole
            port = int(port.partition("/")[0])
        # array is 0 based, ports are 1 based
        index = port - 1
        if dev_name not in self.ports_up:
            return False
        all_ports = self.ports_up[dev_name]
        # Reject port numbers below 1 explicitly: a negative index would
        # otherwise silently read a port from the end of the list.
        if not 0 <= index < len(all_ports):
            return False
        return bool(all_ports[index])
# Tuple of exception types, in the form accepted by an ``except`` clause
TANGO_SUB_EXC = (TypeError, EventSystemFailed)
"""Exceptions proxy.subscribe_event can throw"""
class LowCbfSubarray(SKASubarray):
"""
Subarray device for Low.CBF
**Properties:**
- Device Property
ControllerAddress
- Tango address of Low.CBF Controller
- Type:'DevString'
AllocatorAddress
- Tango address of allocator device that will handle assignment of
FPGA resources
- Type:'DevString'
"""
def __init__(self, *args, **kwargs):
    """Create SPS-statistics caches and host-ready values, then initialise
    the base device.

    :param args: positional arguments forwarded to the base class
    :param kwargs: keyword arguments forwarded to the base class
    """
    # NOTE(review): these attributes are created before the base-class
    # constructor runs - presumably because base initialisation may already
    # read them; confirm before reordering.
    # SPS statistics caches (all start empty)
    self._sps_stats_nflags_cache = {}
    self._sps_stats_npkts_cache = {}
    self._sps_stats_rms_cache = {}
    self._sps_stats_vchans_cache = {}
    self._sps_stats_update_time = {}
    self._sps_stats_push_ctr = 0
    self._sps_stats_push_per_archive = 10 # default 1 in 10 archived
    # published SPS statistics arrays, empty until updated
    self._sps_stats_flags_percent = np.asarray([], dtype=np.int8)
    self._sps_stats_rms_xpol = np.asarray([], dtype=np.uint8)
    self._sps_stats_rms_ypol = np.asarray([], dtype=np.uint8)
    # JSON-encoded empty lists
    self._sps_stn_sstn = json.dumps([])
    self._sps_bm_frq = json.dumps([])
    self._proc_stats_mode = {}
    """Dict; key: processor fqdn; value: stats_mode"""
    # (value, quality) pairs returned by the *HostsResolvedPercent attributes
    self._sdp_host_ready_percent = (0, AttrQuality.ATTR_INVALID)
    self._pss_host_ready_percent = (0, AttrQuality.ATTR_INVALID)
    self._pst_host_ready_percent = (0, AttrQuality.ATTR_INVALID)
    super().__init__(*args, **kwargs)
# Health states listed from most severe (FAILED) to least severe (OK)
_health_states = (HealthState.FAILED, HealthState.DEGRADED, HealthState.OK)
"""HealthStates order by severity"""
# Device Properties
# Tango address of the Low.CBF Controller
ControllerAddress = device_property(
    dtype="DevString", default_value="low-cbf/control/0"
)
# Tango address of the Allocator device (used to build allocator proxies)
AllocatorAddress = device_property(
    dtype="DevString", default_value="low-cbf/allocator/0"
)
# Host and port that prefix Connector device names when subscribing to
# Connector health events
ConnectorDbHost = device_property(
    dtype="str", default_value="tango-databaseds"
)
ConnectorDbPort = device_property(dtype="int", default_value=10000)
@attribute(
    label="Stations",
    doc="Report station & substation membership in subarray",
)
def stations(self) -> str:
    """
    Read the ``stations`` attribute.

    :return: string form of the component manager's subarray ``stations``
        (``station`` & ``substation`` membership)
    """
    subarray = self.component_manager.subarray
    return str(subarray.stations)
@attribute(dtype=DevUShort)
def delaysValid(self) -> DevUShort:
    """Read the per-subarray delay polynomial validity value.

    Legal values are:

    * 0 - INVALID
    * 1 - VALID
    * 2 - UNUSED

    :return: delay validity status as integer
    :rtype: DevUShort
    """
    validity = self._delays_valid
    return validity
def _update_delays_valid(self, fqdn: str, new_delays: list[int]) -> None:
"Update this subarray specific delay polynomial validity status"
assert len(new_delays) >= self._subarray_id
self.logger.info(f"{new_delays}")
sub_id = self._subarray_id - 1 # subarrays are 1-based
# update this processor's contribution
self._delay_contributors[fqdn] = new_delays[sub_id]
for device, value in self._delay_contributors.items():
self.logger.info(f"{device}: {value}")
new_value = min(self._delay_contributors.values())
# pylint: disable=access-member-before-definition
if self._delays_valid != new_value:
self._delays_valid = new_value
self.push_change_event("delaysValid", new_value)
self.push_archive_event("delaysValid", new_value)
@attribute(
    label="Station Beams",
    doc="Report configuration of all Station Beams",
)
def stationBeams(self) -> str:
    """
    Read the ``stationBeams`` attribute.

    :return: string form of the subarray's ``station_beams``
    """
    subarray = self.component_manager.subarray
    return str(subarray.station_beams)
@attribute(
    dtype="DevString",
    label="Pulsar Search Beams",
    doc=(
        "Each Pulsar Search Beam is associated with one Station Beam, and "
        "has additional configuration parameters including a delay "
        "polynomial source (supplied via Configure)"
    ),
)
def pssBeams(self) -> str:
    """
    Read the ``pssBeams`` attribute.

    :return: string form of the subarray's ``pss_beams``
    """
    subarray = self.component_manager.subarray
    return str(subarray.pss_beams)
@attribute(
    label="Pulsar Timing Beams",
    doc=(
        "Each Pulsar Timing Beam is associated with one Station Beam, and "
        "has additional configuration parameters including a delay "
        "polynomial source (supplied via Configure)"
    ),
)
def pstBeams(self) -> str:
    """
    Read the ``pstBeams`` attribute.

    :return: string form of the subarray's ``pst_beams``
    """
    subarray = self.component_manager.subarray
    return str(subarray.pst_beams)
@attribute(
    label="Zoom window IDs",
    doc="Report Zooms by window ID",
)
def zooms(self) -> str:
    """
    Read the ``zooms`` attribute.

    :return: string form of the subarray's ``zooms``
    """
    subarray = self.component_manager.subarray
    return str(subarray.zooms)
# We override SKA-Tango-BASE adminMode attribute to prevent it being
# set to OFFLINE while subarray is resourced. This change means that
# a subarray can have resources only in adminMode ONLINE or ENGINEERING
@attribute(
    # dtype=AdminMode, ... FIXME Sphinx doesn't like this
    memorized=True,
    hw_memorized=True,
    access=AttrWriteType.READ_WRITE,
)
def adminMode(self: SKABaseDevice) -> AdminMode:
    """
    Read the Admin Mode of the device.

    :return: the currently stored Admin Mode of the device
    """
    current_mode = self._admin_mode
    return current_mode
def write_adminMode(self: SKABaseDevice, value: AdminMode) -> None:
    """
    Set the Admin Mode of the device.

    Override of ska-tango-base that prevents the mode being changed while
    the subarray is in use.

    :param value: Admin Mode of the device.
    :raises AssertionError: when the device is ONLINE or ENGINEERING and
        ``obsState`` is neither EMPTY nor IDLE
    :raises ValueError: for unknown adminMode
    """
    # Block adminMode change if subarray active but obsState is not EMPTY
    # or IDLE. IDLE also allowed - subarrays have empty assignresources.
    # Allowing IDLE allows PTC1 to pass. PTC1 changes AdminMode in IDLE
    blocked = self._admin_mode in (
        AdminMode.ONLINE,
        AdminMode.ENGINEERING,
    ) and not self._is_obs_state((ObsState.IDLE, ObsState.EMPTY))
    assert not blocked, "Can't change adminMode when obsState not EMPTY"
    # AdminMode() itself raises ValueError for values outside the enum
    value = AdminMode(value)
    # start/stop health tracking before the mode actually changes
    self._adjust_health(value)
    if value == AdminMode.NOT_FITTED:
        self.admin_mode_model.perform_action("to_notfitted")
    elif value == AdminMode.OFFLINE:
        # should health monitoring stop here?
        self.admin_mode_model.perform_action("to_offline")
        self.component_manager.stop_communicating()
    elif value == AdminMode.ENGINEERING:
        self.admin_mode_model.perform_action("to_engineering")
        self.component_manager.start_communicating()
        # propagate healthState in case it was updated while OFFLINE
        self._recalculate_health()
    elif value == AdminMode.ONLINE:
        self.admin_mode_model.perform_action("to_online")
        self.component_manager.start_communicating()
        self._recalculate_health()
    elif value == AdminMode.RESERVED:
        self.admin_mode_model.perform_action("to_reserved")
    else:
        raise ValueError(f"Unknown adminMode {value}")
def _adjust_health(self, admin_mode: AdminMode) -> None:
"""Set ``healthState`` to UNKNOWN when switching to e.g. OFFLINE MODE
Otherwise start health monitoring when in ONLINE or ENGINEERING
:param admin_mode: the ``adminMode`` we are about to transition to
"""
monitoring_now = self.is_monitoring_mode()
monitoring_next = self.is_monitoring_mode(admin_mode)
# if currently not in a monitoring mode but will be after this change
# ensure the monitoring thread is running
if not monitoring_now and monitoring_next:
msg = f"Entering {admin_mode.name} mode: will track healthState"
self.logger.info(msg)
self._start_health_monitoring()
# if currently in a monitoring mode but won't be after this change
# change the healthState to UNKNOWN
elif monitoring_now and not monitoring_next:
msg = f"Entering {admin_mode.name} mode: won't track healthState"
self.logger.info(msg)
self._update_health_state(HealthState.UNKNOWN)
def _start_health_monitoring(self):
"""Start halth monitoring thread if it's not already running but
not in case NO_HEALTH_ROLLUP enivonment variable is set.
"""
if self._no_health_rollup or self._subscribed_to_alloc:
return
self._thrd = Thread(target=self._subscribe_allocator)
self._thrd.start()
self._subscribe_to_connector()
# TBD: Taranta didn't like the name 'assignedProcessors' (:shrug:)
@attribute(dtype="str", doc="List of processors assigned to subarray")
def assigned_processors(self: SKABaseDevice) -> str:
    """
    Read the processors assigned to the subarray.

    :return: JSON string encoding the stored assigned-processor details
    """
    details = self._assigned_proc_details
    return json.dumps(details)
@attribute(
    dtype=int,
    doc="Percentage of PSS hosts resolved.",
    min_value="0",
    max_value="100",
    unit="%",
)
def pssHostsResolvedPercent(self):
    """Read the percentage of PSS hosts resolved.

    :return: tuple (value, timestamp, quality); value is 0 and quality is
        ATTR_INVALID unless the commanded ``obsState`` is READY or SCANNING
    """
    ready = self._commanded_obs_state in (
        ObsState.READY,
        ObsState.SCANNING,
    )
    percent, alloc_quality = self._pss_host_ready_percent
    if not ready:
        return 0, time.time(), AttrQuality.ATTR_INVALID
    return percent, time.time(), alloc_quality
def _get_percent_pss_resolved(self) -> tuple[int, AttrQuality]:
    """Ask the Allocator for the percentage of PSS hosts resolved.

    The Allocator is only contacted when this subarray's configuration
    contains ``search_beams``; otherwise no connection is attempted.

    :return: tuple (percentage, quality); (0, ATTR_INVALID) when this
        subarray has no ``search_beams`` configured
    """
    key = f"{self._subarray_id}"
    configuration = self._configured_subarrays
    if key not in configuration or "search_beams" not in configuration[key]:
        # nothing to query: skip the (potentially slow) allocator connect
        return 0, AttrQuality.ATTR_INVALID
    allocator_proxy = MccsDeviceProxy(
        self.AllocatorAddress, self.logger, connect=False
    )
    allocator_proxy.connect(max_time=120)
    percentage = allocator_proxy.GetPSSHostResolved(self._subarray_id)
    return int(percentage), AttrQuality.ATTR_VALID
@attribute(
    dtype=int,
    doc="Percentage of PST hosts resolved.",
    min_value="0",
    max_value="100",
    unit="%",
)
def pstHostsResolvedPercent(self):
    """Read the percentage of PST hosts resolved.

    :return: tuple (value, timestamp, quality); value is 0 and quality is
        ATTR_INVALID unless the commanded ``obsState`` is READY or SCANNING
    """
    ready = self._commanded_obs_state in (
        ObsState.READY,
        ObsState.SCANNING,
    )
    percent, alloc_quality = self._pst_host_ready_percent
    if not ready:
        return 0, time.time(), AttrQuality.ATTR_INVALID
    return percent, time.time(), alloc_quality
def _get_percent_pst_resolved(self) -> tuple[int, AttrQuality]:
    """Ask the Allocator for the percentage of PST hosts resolved.

    The Allocator is only contacted when this subarray's configuration
    contains ``timing_beams``; otherwise no connection is attempted.

    :return: tuple (percentage, quality); (0, ATTR_INVALID) when this
        subarray has no ``timing_beams`` configured
    """
    key = f"{self._subarray_id}"
    configuration = self._configured_subarrays
    if key not in configuration or "timing_beams" not in configuration[key]:
        # nothing to query: skip the (potentially slow) allocator connect
        return 0, AttrQuality.ATTR_INVALID
    allocator_proxy = MccsDeviceProxy(
        self.AllocatorAddress, self.logger, connect=False
    )
    allocator_proxy.connect(max_time=120)
    percentage = allocator_proxy.GetPSTHostResolved(self._subarray_id)
    return int(percentage), AttrQuality.ATTR_VALID
@attribute(
    dtype=int,
    doc="Percentage of SDP hosts resolved.",
    min_value="0",
    max_value="100",
    unit="%",
)
def sdpHostsResolvedPercent(self):
    """Read the percentage of SDP hosts resolved.

    :return: tuple (value, timestamp, quality); value is 0 and quality is
        ATTR_INVALID unless the commanded ``obsState`` is READY or SCANNING
    """
    ready = self._commanded_obs_state in (
        ObsState.READY,
        ObsState.SCANNING,
    )
    percent, alloc_quality = self._sdp_host_ready_percent
    if not ready:
        return 0, time.time(), AttrQuality.ATTR_INVALID
    return percent, time.time(), alloc_quality
def _get_percent_sdp_resolved(self) -> tuple[int, AttrQuality]:
    """Ask the Allocator for the percentage of SDP hosts resolved.

    The Allocator is only contacted when this subarray's configuration
    contains ``vis`` or ``coarse_zooms``; otherwise no connection is
    attempted.

    :return: tuple (percentage, quality); (0, ATTR_INVALID) when this
        subarray has neither ``vis`` nor ``coarse_zooms`` configured
    """
    key = f"{self._subarray_id}"
    configuration = self._configured_subarrays
    if key not in configuration:
        return 0, AttrQuality.ATTR_INVALID
    subarray_cfg = configuration[key]
    if "vis" not in subarray_cfg and "coarse_zooms" not in subarray_cfg:
        # nothing to query: skip the (potentially slow) allocator connect
        return 0, AttrQuality.ATTR_INVALID
    allocator_proxy = MccsDeviceProxy(
        self.AllocatorAddress, self.logger, connect=False
    )
    allocator_proxy.connect(max_time=120)
    percentage = allocator_proxy.GetSDPHostResolved(self._subarray_id)
    return int(percentage), AttrQuality.ATTR_VALID
@attribute(
    dtype="int",
    doc=("Percentage of processors ready for scan."),
    min_value="0",
    max_value="100",
    unit="%",
)
def processorsReadyPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Read the percentage of processors ready for subarray scan.

    The value is meaningful only when ``obsState`` is READY or SCANNING,
    so the Tango attribute quality is returned along with it.

    :return: tuple

      * percentage: stored value, or 0 when quality is ATTR_INVALID
      * time: float timestamp
      * quality: ATTR_VALID in READY/SCANNING, otherwise ATTR_INVALID
    """
    if self._is_obs_state(ObsState.READY) | self._is_obs_state(
        ObsState.SCANNING
    ):
        return self._proc_ready_percent, time.time(), AttrQuality.ATTR_VALID
    return 0, time.time(), AttrQuality.ATTR_INVALID
def _is_obs_state(
self, state: Union[ObsState, Iterable[ObsState]]
) -> bool:
"""Determine if subarray ``obsState`` value is in the specified state.
:param state: can be either a single state or a list of multiple
states e.g. (READY, SCANNING)
:return: True if ``obsState`` is in specified state, False otherwise
"""
if isinstance(state, Iterable):
return self._obs_state in state
return self._obs_state == state
@attribute(
    dtype="int",
    doc="Percentage of PST Jones applied (Not yet ready).",
    min_value="0",
    max_value="100",
    unit="%",
)
def pstJonesAppliedPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Read the percentage of PST Jones applied.

    The value is meaningful only when ``obsState`` is READY or SCANNING,
    so the Tango attribute quality is returned along with it.

    :return: tuple

      * percentage: stored value, or 0 when quality is ATTR_INVALID
      * time: float timestamp
      * quality: ATTR_VALID in READY/SCANNING, otherwise ATTR_INVALID
    """
    if self._is_obs_state(ObsState.READY) | self._is_obs_state(
        ObsState.SCANNING
    ):
        return (
            self._pst_jones_applied_percent,
            time.time(),
            AttrQuality.ATTR_VALID,
        )
    return 0, time.time(), AttrQuality.ATTR_INVALID
@attribute(
    dtype="int",
    doc="Percentage of PSS Jones applied (Not yet ready).",
    min_value="0",
    max_value="100",
    unit="%",
)
def pssJonesAppliedPercent(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Read the percentage of PSS Jones applied.

    The value is meaningful only when ``obsState`` is READY or SCANNING,
    so the Tango attribute quality is returned along with it.

    :return: tuple

      * percentage: stored value, or 0 when quality is ATTR_INVALID
      * time: float timestamp
      * quality: ATTR_VALID in READY/SCANNING, otherwise ATTR_INVALID
    """
    if self._is_obs_state(ObsState.READY) | self._is_obs_state(
        ObsState.SCANNING
    ):
        return (
            self._pss_jones_applied_percent,
            time.time(),
            AttrQuality.ATTR_VALID,
        )
    return 0, time.time(), AttrQuality.ATTR_INVALID
@attribute(
    dtype="int",
    doc="Age of applied PST Jones matrices (Not yet ready).",
    min_value="0",
    unit="seconds",
)
def pstJonesAge(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Read the age of the latest PST Jones applied.

    The value is meaningful only when ``obsState`` is READY or SCANNING,
    so the Tango attribute quality is returned along with it.

    :return: tuple

      * age: stored PST Jones age, or 0 when quality is ATTR_INVALID
      * time: float timestamp
      * quality: ATTR_VALID in READY/SCANNING, otherwise ATTR_INVALID
    """
    if self._is_obs_state(ObsState.READY) | self._is_obs_state(
        ObsState.SCANNING
    ):
        return self._pst_jones_age, time.time(), AttrQuality.ATTR_VALID
    return 0, time.time(), AttrQuality.ATTR_INVALID
@attribute(
    dtype="int",
    doc="Age of applied PSS Jones matrices (Not yet ready)",
    min_value="0",
    unit="seconds",
)
def pssJonesAge(
    self: SKABaseDevice,
) -> tuple[int, float, AttrQuality]:
    """
    Read the age of the latest PSS Jones applied.

    The value is meaningful only when ``obsState`` is READY or SCANNING,
    so the Tango attribute quality is returned along with it.

    :return: tuple

      * age: stored PSS Jones age, or 0 when quality is ATTR_INVALID
      * time: float timestamp
      * quality: ATTR_VALID in READY/SCANNING, otherwise ATTR_INVALID
    """
    if self._is_obs_state(ObsState.READY) | self._is_obs_state(
        ObsState.SCANNING
    ):
        return self._pss_jones_age, time.time(), AttrQuality.ATTR_VALID
    return 0, time.time(), AttrQuality.ATTR_INVALID
# override of CspSubElementObsDevice _component_state_changed
def _component_state_changed_low(
self,
fault=None,
power=None,
resourced=None,
configured=None,
scanning=None,
obsfault=None, # new here
): # pylint: disable=too-many-arguments
"""Enhanced obsState callback to allow obsState=FAULT"""
# Most state is handled by calling ska-tango-base BaseComponentManager
self._component_state_changed(
fault=fault,
power=power,
resourced=resourced,
configured=configured,
scanning=scanning,
)
# But our new state variable is handled here
if obsfault is not None:
if obsfault:
self.obs_state_model.perform_action("component_obsfault")
else:
pass # (no state change needed when obsfault is cleared)
# General methods
def create_component_manager(self):
    """Create the Subarray component manager.

    The subarray ID is taken from the third ``/``-separated field of the
    device name, and a connected Allocator proxy is handed to the manager.

    :return: a new ``LowCbfSubarrayComponentManager``
    """
    subarray_id = int(self.get_name().split("/")[2])
    allocator = MccsDeviceProxy(
        self.AllocatorAddress, self.logger, connect=True
    )
    # callbacks the component manager uses to reach back into this device
    tango_iface = SubarrayTangoIface(
        update_attr_sps_stats=self.sps_stats_update,
        get_admin_mode=self.get_admin_mode,
    )
    manager = LowCbfSubarrayComponentManager(
        logger=self.logger,
        subarray_id=subarray_id,
        communication_state_callback=self._communication_state_changed,
        component_state_callback=self._component_state_changed_low,
        tango_iface=tango_iface,
        allocator=allocator,
    )
    return manager
def get_admin_mode(self) -> int:
    """
    Callback giving access to the subarray "adminMode".

    :return: the adminMode the subarray is currently in
    """
    current_mode = self._admin_mode
    return current_mode
def always_executed_hook(self):
    """Method always executed before any TANGO command is executed.

    Intentionally empty: this device does nothing in the hook.
    """
def delete_device(self):
    """Hook to delete resources allocated in init_device.

    This method allows for any memory or other resources allocated in the
    init_device method to be released. This method is called by the device
    destructor and by the device Init command.

    The body is empty: nothing is released here.
    """
# Commands
def init_command_objects(self):
    """
    Initialises the command handlers for commands supported by this device.

    Only delegates to the base class; no additional handlers are
    registered here.
    """
    # pylint: disable=useless-super-delegation
    super().init_command_objects()
class InitCommand(SKASubarray.InitCommand):
    """Init Command object"""

    def do(self):
        """
        Initialise the attributes and properties of the LowCbfSubarray.

        Creates the device's bookkeeping attributes, enables manually
        pushed change/archive events, derives the subarray ID from the
        device name and starts the Tango event handling thread.

        :return: tuple of ``ResultCode.OK`` and a completion message
        """
        super().do()
        dev = self._device
        dev._version_id = release.version
        dev._build_state = (
            f"{release.name}, {release.version}, {release.description}"
        )
        # Events to manually be pushed
        dev.set_change_event("healthState", True, False)

        # All known Processors { serial number: Processor FQDN }
        dev._proc_sn_to_fqdn = {}
        # Serial numbers and routing details of Processors assigned to
        # this subarray, e.g.
        # { "XFL10NIYKVEU": {"name": "p4_01", "port": "17/0"}, ...}
        dev._assigned_proc_details = []
        # Serial numbers of Processors we are subscribed to
        dev._subscribed_sn = []
        # Record of processor subscription IDs (for later unsubscribe)
        dev._proc_subscriptions = {}
        # FQDNs of Processors we are subscribed to
        dev._subscribed_fqdn = []
        # Ethernet port status;
        # key: processor fqdn; value: EtherPortStat dataclass
        dev._eth_port_stat = {}
        # Keeps track of which Connector ports are up
        # (an integer per port; value 1 == port is up)
        dev._conn_port = ConnectorPortsUp()
        # Percentage of Ethernet ports ready for scan
        dev._ether_locked_percent = 0
        # Thread used to subscribe to processors healthState as they get
        # assigned to this subarray
        dev._proc_thread = None
        # TBD expose this as an attribute:
        # Health of constituent components, e.g.
        # {
        #     "low-cbf/processor/0" : OK,
        #     "low-cbf/processor/1" : DEGRADED,
        #     "low-cbf/connector/0" : OK
        # }
        dev._component_health = {}
        # Serialise callbacks from different devices
        dev._callback_lock = Lock()
        # Overall indicator if all delays in subarray are valid
        dev._delays_valid = UNUSED
        # FQDN: delay_valid map - how much each processor contributes to
        # the overall subarray 'delaysValid' state
        dev._delay_contributors = {}
        # Percentage of processors ready for scan
        dev._proc_ready_percent = 0
        # Percentage of PST / PSS jones matrices applied
        dev._pst_jones_applied_percent = 0
        dev._pss_jones_applied_percent = 0
        # Age of latest PST / PSS jones matrices
        dev._pst_jones_age = 0
        dev._pss_jones_age = 0
        # Information for the latest PSS / PST jones matrices
        dev._pss_jones_information = {}
        dev._pst_jones_information = {}
        # Subarray configuration went ok
        dev._configured_ok = False
        # Copy of internal_subarray from the Allocator
        dev._configured_subarrays = {}

        # A few more attributes that could be pushed on change however
        # at present they come from the component_manager so needs more work
        # eg: stations, stationbeams, pssbeams, pstbeams, zooms
        pushed_attributes = (
            "state",
            "status",
            "adminMode",
            "healthState",
            "controlMode",
            "simulationMode",
            "testMode",
            "obsState",
            "obsMode",
            "configurationProgress",
            "configurationDelayExpected",
            "delaysValid",
            "assigned_processors",
            "processorsReadyPercent",
            "pssHostsResolvedPercent",
            "pstHostsResolvedPercent",
            "sdpHostsResolvedPercent",
            "pstJonesAppliedPercent",
            "pssJonesAppliedPercent",
            "pssJonesAge",
            "pstJonesAge",
            "processorEthernetLockedPercent",
        )
        for attr_name in pushed_attributes:
            dev.set_change_event(attr_name, True, False)
            dev.set_archive_event(attr_name, True, False)

        # is there a better way to find my_id ?
        my_name = dev.get_name()
        dev._subarray_id = int(my_name.split("/")[-1])
        dev.logger.info(f"{my_name} ID {dev._subarray_id}")
        dev._subscribed_to_alloc = False

        # ENGINEERING mode uses health too, unless the environment says no
        dev._admin_modes_using_health = [AdminMode.ONLINE]
        ignore_health = os.getenv(
            "ENGINEERING_MODE_IGNORE_HEALTH", default="false"
        )
        if ignore_health.lower() != "true":
            dev._admin_modes_using_health.append(AdminMode.ENGINEERING)

        # queue + thread that complete Tango event handling
        dev._event_q = Queue()
        dev._event_thread = Thread(target=dev._tango_event_completion)
        dev._event_thread.start()

        # Use env. variable to determine if health rollup is used
        # i.e. monitor external devices (proc, connector) health state
        dev._no_health_rollup = os.getenv("NO_HEALTH_ROLLUP") is not None

        message = "LowCbfSubarray init complete"
        dev.logger.info(message)
        self._completed()
        return ResultCode.OK, message
# this will break in v11 of ska-tango-base
# (and we will get a whole new way of handling resources...)
# device.resource_manager._key = "lowcbf"
#
# try:
# allocator = DeviceProxy(device.AllocatorAddress)
# device.subarray = Subarray(allocator, device._connect)
# # dirty hack to make resource manager have a link to subarray
# # object (for use in assign command)
# device.resource_manager.subarray = device.subarray
#
# message = (
# f"LowCbfSubarray init complete"
# f", using allocator {device.AllocatorAddress}"
# )
# return ResultCode.OK, message
# except Exception as e:
# message = "Exception connecting to Allocator"
# device.logger.error(f"{message}\n{e}")
# return ResultCode.FAILED, message
# Inherited partially-implemented commands from ska-tango-base.subarray
# ==== Command ==== ==CommandObject== == method ==
# AssignResources "AssignResources" "assign"
# ReleaseResources "ReleaseResources" "release"
# Configure - replaced in ska-tango-base.csp.subarray -
# ReleaseAllResources "ReleaseAllResources" "release"
# Scan "Scan" "scan"
# EndScan "EndScan" "end_scan"
# End - replaced in ska-tango-base.csp.subarray -
# Abort "Abort" abort (not submitted)
# ObsReset "ObsReset" "obsreset"
# Restart "Restart" "restart"
# Inherited partially-implemented command from ska-tango-base:csp.subarray
# ConfigureScan "ConfigureScan"
# Configure - calls ConfigureScan
# GoToIdle "GoToIdle"
# End - calls GoToIdle
def _subscribe_to_connector(self):
    """Subscribe to healthState attribute changes on all connectors.

    Connector devices are looked up by class in the connector database.
    A connector whose proxy cannot be created, or whose subscription
    fails, is logged and skipped so that the remaining connectors are
    still subscribed.
    """
    if (db := self._get_conn_db()) is None:
        return
    prefix = f"{self.ConnectorDbHost}:{self.ConnectorDbPort}/"
    dev_names = db.get_device_exported_for_class("LowCbfConnector")
    for fqdn in dev_names:
        full_path = prefix + fqdn
        try:
            if (proxy := DeviceProxy(full_path)) is None:
                self.logger.warning("%s proxy FAILED", full_path)
                continue
        except DevFailed as e:
            self.logger.warning("SUBSCR. EXCEPTION %s %s", full_path, e)
            continue
        self.logger.info(f"SUBSCRIBING to {full_path}")
        try:
            proxy.subscribe_event(
                "healthState",
                EventType.CHANGE_EVENT,
                # keep format compatible with processor subscription:
                partial(self._health_callback, fqdn),
            )
        except (DevFailed, *TANGO_SUB_EXC) as e:
            # one unreachable connector must not prevent subscribing to
            # the others (same policy as for proxy creation above)
            self.logger.warning("SUBSCR. EXCEPTION %s %s", full_path, e)
def _flush_alveos(self):
"Purge Alveo details on ReleaseResources event"
# pylint: disable=access-member-before-definition
self.logger.info("flush all Alveos")
for sn in self._subscribed_sn:
if sn not in self._proc_sn_to_fqdn:
continue
fqdn = self._proc_sn_to_fqdn[sn]
# this processor's delays are no longer used but we won't get
# its events as we are unsubscribing from them
delays_summary = [UNUSED] * self._subarray_id
self._update_delays_valid(fqdn, delays_summary)
if fqdn in self._component_health:
self._component_health.pop(fqdn)
# unsubscribe from Connector diagnostics_port_up updates
for item in self._eth_port_stat.values():
if item.unsubscribe_connector():
self.logger.info("Unsubscribed port %s", item.sw_port)
self._assigned_proc_details = {}
self.push_change_event("assigned_processors", "[]")
self.push_archive_event("assigned_processors", "[]")
self._subscribed_sn = []
self._subscribed_fqdn = []
self._eth_port_stat = {}
"""Keep track of processor/connector Ethernet port status"""
self._proc_stats_mode = {}
# Unsubscribe from all processor tango attributes
serials = list(self._proc_subscriptions.keys())
for serial in serials:
dev_proxy, sub_ids = self._proc_subscriptions.pop(serial)
for sub_id in sub_ids:
dev_proxy.unsubscribe_event(sub_id)
del dev_proxy
txt = f"Monitor Processor health: {str(self._subscribed_sn)}"
self.logger.info(txt)
def _enqueue_event(
self, event: Evt, data: Union[None, Any] = None
) -> None:
"""Add event (and optionally associated data) to a queue to be handled
by a separate thread
"""
# obsState change could call this before things are initialised
if hasattr(self, "_event_q"):
self._event_q.put((event, data))
def is_monitoring_mode(self, mode: Union[AdminMode, None] = None) -> bool:
    """Tell whether an admin mode is a monitoring mode.

    In monitoring mode we keep track of healthState and report its
    changes. A mode is a monitoring mode when it is listed in
    ``_admin_modes_using_health``.

    See RtD https://is.gd/nCuv1q

    :param mode: adminMode we are about to transition to; when not
        supplied the current ``self._admin_mode`` value is used instead
    :return: True when in monitoring mode; False otherwise
    """
    if mode is None:
        mode = self._admin_mode
    return mode in self._admin_modes_using_health
# ----------
# Callbacks
# ----------
def _fqdn_callback(self, name: str, jsonstr: str, quality) -> None:
"""Gets called when processor fqdn list gets updated in Allocator"""
self.logger.info(f"from ALLOCATOR {name} {jsonstr}")
procs = json.loads(jsonstr)
self._proc_sn_to_fqdn = procs
def _internal_alveo_callback(self, name: str, jsonstr: str, quality):
"""Allocator calls this with information of mapping between Alveo
serial number and a subarray it is allocated to"""
self._delay_contributors = {}
log = self.logger
log.info(f"ALLOCATOR CFG {name}: {jsonstr}")
cfg_dict = json.loads(jsonstr)
if not cfg_dict:
self._enqueue_event(Evt.ALVEOS_RELEASED)
return
# handle ONLY configuration relevant to THIS subarray
# probably the bit: "sa_id": 1 in cfg
assigned_procs = {}
for k, v in cfg_dict.items():
# check expected dict keys
for mandatory_key in ("regs", "switch"):
if mandatory_key not in v:
log.warning("MISSING %s key in %s", mandatory_key, k)
return
# NOTE PST and CORR have different key names
sub_key_names = ("sa_id", "subarray_id")
for subarray in v["regs"]:
if not subarray:
# skip empty register dicts representing unused subarray capacity
continue
if all(_ not in subarray for _ in sub_key_names):
log.warning("MISSING subarray ID")
continue
# get the subarray number using one of possible dict keys
for sub_key in sub_key_names:
if sub_key in subarray:
subarray_id = subarray[sub_key]
break
if self._subarray_id == subarray_id:
# bundle Alveo S/N and associated switch port together
evt_data = {k: v["switch"]}
assigned_procs.update(evt_data)
# 15-Jan-2024: formats are messy:
# -------------------------------
# for CORR:
# {"XFL1TJCHM3ON": {
# "fw": "vis:0.0.2-main.e4b5ad79",
# "regs": [
# {
# "sa_id": 4,
# "stns": [[345, 1], [350, 1], [352, 1], [355, 1], [431, 1], [434, 1]],
# "sa_bm": [[1, 140, 0, 27648, 192, 24, 0]]
# }
# ]}}
# for PST:
# {"XFL1XCRTUC22 ": {
# "fw": "pst:0.0.20-dev.7c13dd33",
# "regs": [
# {
# "subarray_id": 4,
# "stn_bm_id": 1,
# "stns": [[2, 1], [3, 1], [345, 1], [350, 1], [352, 1], [355, 1], [431, 1], [434, 1 ]], "freq_ids": [140, 141, 142, 143, 144, 145, 146, 147], "pst_bm_ids": [15]}, {}, {}]}}
# publish change in assigned processors (if any)
if assigned_procs != self._assigned_proc_details:
self._enqueue_event(Evt.ALVEOS_ASSIGNED, assigned_procs)
def _internal_subarray_callback(self, fqdn: str, jsonstr: str, quality):
    """Handle the Allocator event listing configured subarrays.

    :param fqdn: first callback argument (not used here)
    :param jsonstr: JSON dict with

      * key = ``subarray_id`` number (as a string after transport)
      * value = subarray definition (not used here)
    :param quality: attribute quality (not used here)

    Warning: This is a Tango event callback. Don't do any work here
    """
    subarray_ids = tuple(json.loads(jsonstr).keys())
    # NOTE: in transport the subarray ID was converted to a string
    is_configured = str(self._subarray_id) in subarray_ids
    self.logger.info("subarray configured ok: %s", is_configured)
    self._configured_subarrays = subarray_ids
    # pylint: disable=access-member-before-definition
    if self._configured_ok == is_configured:
        return
    # dependent percentages must be re-evaluated after a change
    self._configured_ok = is_configured
    self._enqueue_event(Evt.CALC_PROC_PERCENT)
    self._enqueue_event(Evt.CALC_ETHER_PERCENT)
def _health_callback(self, fqdn: str, evt: EventData) -> None:
    """Receive a healthState change event from a monitored device.

    The event is only queued here; it is handled later by
    ``_process_health`` in the event-completion thread.

    :param fqdn: device whose healthState changed
    :param evt: health event details
    """
    payload = (fqdn, evt)
    self._enqueue_event(Evt.PROCESS_HEALTH, payload)
def _process_health(self, fqdn: str, evt: EventData) -> None:
    """Record a device's new health state and refresh dependent values.

    Failed events are ignored. For devices whose FQDN contains
    ``/processor/`` the processorsReadyPercent value is recalculated too.

    :param fqdn: device whose healthState changed
    :param evt: health event details
    """
    if self.is_evt_error(evt, "_process_health"):
        return
    reading = evt.attr_value
    name, value = reading.name, reading.value
    self.logger.info(f"HEALTH {fqdn} {name}: {value}")
    self._component_health[fqdn] = value
    # _recalculate_health only updates/propagates healthState while the
    # subarray is in a monitoring mode (see is_monitoring_mode)
    self._recalculate_health()
    if "/processor/" in fqdn:
        self._calc_proc_scan_quality()
def _recalculate_health(self):
    """Derive the overall health state from all monitored devices.

    ``self._health_states`` is scanned in order and the first state
    reported by any device wins; with no match the result is UNKNOWN.
    A changed result is passed to ``_update_health_state``. Nothing
    happens unless ``is_monitoring_mode()`` is true.
    """
    if not self.is_monitoring_mode():
        return
    reported = self._component_health.values()
    overall = next(
        (state for state in self._health_states if state in reported),
        HealthState.UNKNOWN,
    )
    if overall != self._health_state:
        self._update_health_state(overall)
def _conn_port_up_cb(self, tango_evt: EventData) -> None:
    """Receive a connector's port-up status change.

    Stores the reported value under the sending device's name and queues
    a recalculation of the Ethernet locked percentage. Failed events are
    ignored.

    :param tango_evt: The Tango event data

    Warning: This is a Tango event callback. Don't do any work here
    """
    if self.is_evt_error(tango_evt, "_conn_port_up_cb"):
        return
    connector = tango_evt.device.name()
    ports_up = tango_evt.attr_value.value
    self.logger.info("%s ETHERNET %s", connector, ports_up)
    self._conn_port.ports_up[connector] = ports_up
    self._enqueue_event(Evt.CALC_ETHER_PERCENT)
def _calc_proc_scan_quality(self) -> None:
    """Reevaluate percentage of processors ready for scan.

    A processor counts as ready when its FQDN is known, its latest
    ``stats_mode`` report has a truthy ``"ready"`` entry and its recorded
    health is ``HealthState.OK``.

    The percentage is reset to 0 (and published, if it was non-zero):

    * with INVALID quality when the subarray is not configured or
      ``obsState`` is other than READY, SCANNING or CONFIGURING
    * with VALID quality when no processors are assigned

    Otherwise the ratio of ready to assigned processors is rounded UP to
    a whole percent, so any ready processor yields a non-zero value.

    Side effects:

    * update self._proc_ready_percent, see processorsReadyPercent attribute
    * generate change and archive events for subscribers
    """

    def notify_not_ready(valid: bool) -> None:
        """Notify subscribers processors are not ready.

        Even though processors are not ready (for whatever reason)
        subscribers need to know about the new state. Nothing is pushed
        when the stored percentage is already 0.

        :param valid: True if attr. quality is VALID, False otherwise
        """
        if self._proc_ready_percent == 0:
            return
        self._proc_ready_percent = 0
        q = AttrQuality.ATTR_VALID if valid else AttrQuality.ATTR_INVALID
        evt_value = (0, time.time(), q)
        self.push_change_event("processorsReadyPercent", *evt_value)
        self.push_archive_event("processorsReadyPercent", *evt_value)

    self.logger.info("Chk proc percent in %s", self._obs_state)
    if not self._configured_ok:
        self.logger.info("subarray not configured")
        notify_not_ready(valid=False)
        return
    if not self._is_obs_state(
        (ObsState.READY, ObsState.SCANNING, ObsState.CONFIGURING)
    ):
        notify_not_ready(valid=False)
        return  # nothing to do
    proc_total_count = len(self._assigned_proc_details)
    if proc_total_count < 1:
        notify_not_ready(valid=True)
        return
    proc_ok_count = 0
    for sn in self._assigned_proc_details:
        if (fqdn := self._proc_sn_to_fqdn.get(sn)) is None:
            self.logger.warning("NO fqdn for %s", sn)
            continue
        # check PROC FW loaded + is healthy
        if (
            stats_mode := self._proc_stats_mode.get(fqdn)
        ) is None or not stats_mode["ready"]:
            continue
        if self._component_health.get(fqdn) == HealthState.OK:
            proc_ok_count += 1
    # Integer ceiling division. The former ``round(x + 0.5)`` relied on
    # round-half-to-even and so turned exact odd percentages into the
    # next integer (e.g. 3 of 4 processors gave 76 instead of 75).
    new_ready_percent = (
        100 * proc_ok_count + proc_total_count - 1
    ) // proc_total_count
    # pylint: disable=access-member-before-definition
    self.logger.info(
        "processorsReadyPerc: new:%d, old:%d, count_ok:%d; count_all: %d",
        new_ready_percent,
        self._proc_ready_percent,
        proc_ok_count,
        proc_total_count,
    )
    if new_ready_percent != self._proc_ready_percent:
        self._proc_ready_percent = new_ready_percent
        self.push_change_event("processorsReadyPercent", new_ready_percent)
        self.push_archive_event(
            "processorsReadyPercent", new_ready_percent
        )
def _calc_ether_port_percent(self) -> None:
    """Recalculate the overall percentage of locked Ethernet ports.

    A link counts as locked when both its processor side and its switch
    side report locked. The ratio of locked to tracked links is rounded
    UP to a whole percent and pushed as Tango change and archive events
    when it differs from the stored value.

    When ``obsState`` is other than READY, SCANNING or CONFIGURING, or the
    subarray is not configured, the stored value is invalidated instead.
    Nothing happens while no links are tracked.
    """

    def notify_not_locked() -> None:
        """Notify subscribers Ethernet ports are not ready.

        Even though Ethernet ports are not ready (e.g no longer READY
        state) subscribers need to know about the new (changed) state.
        Nothing is pushed when the stored value is already None.
        """
        if self._ether_locked_percent is None:
            return
        self._ether_locked_percent = None
        data = (0, time.time(), AttrQuality.ATTR_INVALID)
        self.push_change_event("processorEthernetLockedPercent", *data)
        self.push_archive_event("processorEthernetLockedPercent", *data)

    self.logger.info("Chk Ether percent in %s", self._obs_state)
    if not self._is_obs_state(
        (ObsState.READY, ObsState.SCANNING, ObsState.CONFIGURING)
    ):
        notify_not_locked()
        return
    # the number of Ethernet links we track
    if (count_all := len(self._eth_port_stat)) == 0:
        return
    if not self._configured_ok:
        notify_not_locked()
        return
    # determine if corresponding switch side Ethernet port(s) are up
    for stat in self._eth_port_stat.values():
        dev, port = stat.sw_dev_name, stat.sw_port
        stat.sw_ether_locked = self._conn_port.is_up(dev, port)
        # this should be so rare that it's worth logging:
        if not stat.sw_ether_locked:
            self.logger.warning("SW:%s PORT:%s NOT LOCKED", dev, port)
    # the number of switch<-->Alveo links that are up:
    count_ok = sum(
        (int(all((i.proc_ether_locked, i.sw_ether_locked))))
        for i in self._eth_port_stat.values()
    )
    # Integer ceiling division. The former ``round(x + 0.5)`` relied on
    # round-half-to-even and so turned exact odd percentages into the
    # next integer (e.g. 3 of 4 links gave 76 instead of 75).
    percent = (100 * count_ok + count_all - 1) // count_all
    self.logger.info(
        "NEW ETHER percent: %d, count_ok:%d, count_all:%d",
        percent,
        count_ok,
        count_all,
    )
    if self._ether_locked_percent != percent:
        self._ether_locked_percent = percent
        self.push_change_event("processorEthernetLockedPercent", percent)
        self.push_archive_event("processorEthernetLockedPercent", percent)
def _update_obs_state(self, obs_state: ObsState) -> None:
    """Remember the new ``obsState`` and refresh values that depend on it.

    Overrides the SKAObsDevice implementation because some Tango
    attributes of this device depend on the ``obsState`` value.

    :param obs_state: the new observation state
    """
    self._obs_state = obs_state
    for recalculation in (Evt.CALC_PROC_PERCENT, Evt.CALC_ETHER_PERCENT):
        self._enqueue_event(recalculation)
def _poly_delay_callback(self, fqdn: str, tango_evt) -> None:
    """Queue a processor's delay summary for later processing.

    Failed events are ignored.

    :param fqdn: device that sent the event
    :param tango_evt: the Tango event data
    """
    if not self.is_evt_error(tango_evt, "_poly_delay_callback"):
        summary = (fqdn, tango_evt.attr_value.value)
        self._enqueue_event(Evt.DELAY_POLY_SUMMARY, summary)
# ----------
# Threads
# ----------
def _subscribe_allocator(self):
    """Subscribe to the Allocator attributes this subarray depends on.

    Intended to run as a short lived thread. The callbacks are registered
    first and take effect once the proxy connects; a failed connection is
    logged and leaves ``_subscribed_to_alloc`` untouched.
    """
    allocator = MccsDeviceProxy(
        self.AllocatorAddress, self.logger, connect=False
    )
    callbacks_by_attr = {
        "procDevFqdn": self._fqdn_callback,
        "internal_alveo": self._internal_alveo_callback,
        "internal_subarray": self._internal_subarray_callback,
        "ip_to_resolve": self._update_host_resolution,
    }
    for attr_name, callback in callbacks_by_attr.items():
        allocator.evt_sub_on_connect(attr_name, callback)
    try:
        allocator.connect()
    except DevFailed as e:
        self.logger.error(f"_subscribe_allocator failed: {e}")
    else:
        self._subscribed_to_alloc = True
def _update_host_resolution(self, *_event_args):
    """Update the SDP/PSS/PST host resolution percentages.

    Each percentage is re-read and, when it differs from the cached one,
    cached and pushed as change and archive events (only element ``[0]``
    of the value is pushed).

    :param _event_args: ignored. This method is registered as the
        Allocator ``ip_to_resolve`` event callback alongside callbacks
        that receive ``(name, value, quality)``, so it must tolerate
        positional arguments; it can still be called with none.
    """
    # read all three values before pushing anything
    latest = (
        (
            "sdpHostsResolvedPercent",
            "_sdp_host_ready_percent",
            self._get_percent_sdp_resolved(),
        ),
        (
            "pssHostsResolvedPercent",
            "_pss_host_ready_percent",
            self._get_percent_pss_resolved(),
        ),
        (
            "pstHostsResolvedPercent",
            "_pst_host_ready_percent",
            self._get_percent_pst_resolved(),
        ),
    )
    for attr_name, cache_name, percent in latest:
        if getattr(self, cache_name) != percent:
            setattr(self, cache_name, percent)
            self.push_change_event(attr_name, percent[0])
            self.push_archive_event(attr_name, percent[0])
def _tango_event_completion(self):
    """A thread to decouple (nontrivial) Tango event processing from
    callback in which it was reported.

    Loops forever taking ``(event, data)`` pairs from ``self._event_q``
    and dispatching on the ``Evt`` code. A failure while handling one
    event is logged and does not stop the loop, and ``task_done()`` is
    always called, so later events are still processed.
    """
    while True:
        event, data = self._event_q.get()
        try:
            if event == Evt.ALVEOS_RELEASED:
                # resources are released: flush all references to Alveos
                self._flush_alveos()
                self._recalculate_health()
                self.clear_sps_stats_cache()
                self._calc_proc_scan_quality()
                self._calc_ether_port_percent()
            elif event == Evt.ALVEOS_ASSIGNED:
                # data is a dict:
                # {"alveo_SN": {"name": "p4_01", "port": "17/0"}, ...}
                self._assigned_proc_details = data
                # keep track of which sw. port is associated with which
                # Alveo
                for sn, sw in data.items():
                    if (fqdn := self._proc_sn_to_fqdn.get(sn)) is None:
                        # serial number not (yet) mapped to a device
                        self.logger.warning("NO fqdn for %s", sn)
                        continue
                    if fqdn not in self._eth_port_stat:
                        sw_name, port = sw["name"], sw["port"]
                        msg = f"ASSOC {fqdn} with sw:{sw_name} port:{port}"
                        self.logger.info(msg)
                        self._eth_port_stat[fqdn] = EtherPortStat(
                            sw_name, port
                        )
                proc_sn_str = json.dumps(list(data.keys()))
                self.push_change_event("assigned_processors", proc_sn_str)
                self.push_archive_event("assigned_processors", proc_sn_str)
                # ensure the thread handling Processor attribute
                # subscriptions is alive
                # pylint: disable=access-member-before-definition
                proc_count = len(self._assigned_proc_details)
                if proc_count > 0 and not self._proc_thread:
                    subscription_loop = self._check_subscribe_processors
                    self._proc_thread = Thread(target=subscription_loop)
                    self._proc_thread.start()
            elif event == Evt.DELAY_POLY_SUMMARY:
                self._update_delays_valid(*data)
            elif event == Evt.CALC_PROC_PERCENT:
                self._calc_proc_scan_quality()  # extract these into a table
            elif event == Evt.CALC_ETHER_PERCENT:
                self._calc_ether_port_percent()
            elif event == Evt.PROCESS_HEALTH:
                self._process_health(*data)
            else:
                self.logger.error("Unknown event %s", event)
        except Exception:  # pylint: disable=broad-except
            # thread boundary: one bad event must not end all processing
            self.logger.exception("Failed to process event %s", event)
        finally:
            self._event_q.task_done()
def _check_subscribe_processors(self):
    """
    Update subscriptions to processor Tango attribute changes.

    As allocator assigns and removes processors used by this subarray,
    subscriptions to attrs of new processors need to be created and
    subscriptions to processors no longer in use deleted.

    Runs forever, once per second:

    1. subscribe to every newly assigned processor whose FQDN is known
    2. unsubscribe from processors that are no longer assigned
    3. subscribe to the ``diagnostics_port_up`` attribute of the
       connector (switch) associated with each assigned processor

    Serial numbers without a known FQDN are skipped and retried on the
    next pass.
    """
    # (attribute name, event callback to be called with fqdn & event)
    subs = (
        ("subarrayDelaysSummary", self._poly_delay_callback),
        ("healthState", self._health_callback),
        # Receive SPS statistics from processors used by subarray
        ("stats_sps_npkts", self._update_sps_stats_npkts),
        ("stats_sps_nflags", self._update_sps_stats_nflags),
        ("stats_sps_rms", self._update_sps_stats_rms),
        ("stats_sps_vchans", self._update_sps_stats_vchans),
        ("stats_mode", self._update_stats_mode),
        # TODO: needs to implement those attributes in processor
        # ("stats_pst_jones", self._update_stats_pst_jones),
        # ("stats_pss_jones", self._update_stats_pss_jones),
        ("stats_ethernet_status", self._update_proc_ether_port),
    )
    while True:
        time.sleep(1)
        assigned_procs = self._assigned_proc_details  # short alias
        changes = False
        # subscribe to each newly assigned processor
        for sn in [
            p for p in assigned_procs if p not in self._subscribed_sn
        ]:
            if sn not in self._proc_sn_to_fqdn:
                self.logger.info("CAN'T FIND %s", sn)
                continue
            # Get Tango proxy for new processor
            fqdn = self._proc_sn_to_fqdn[sn]
            try:
                new_proxy = DeviceProxy(fqdn)
                new_proxy.set_timeout_millis(TANGO_TIMEOUT_MS)
            except DevFailed:
                # Processors may be unresponsive if loading firmware
                continue
            # Subscribe to Tango attributes listed in subs above
            sub_ids = []
            try:
                for attr_name, cbk in subs:
                    sub_id = new_proxy.subscribe_event(
                        attr_name,
                        EventType.CHANGE_EVENT,
                        partial(cbk, fqdn),
                        stateless=True,
                    )
                    sub_ids.append(sub_id)
            except EventSystemFailed:
                # Tango docs lie about this exception never occurring
                for sub_id in sub_ids:
                    new_proxy.unsubscribe_event(sub_id)
                del new_proxy
                continue
            # Success with subscription
            self._proc_subscriptions[sn] = (new_proxy, sub_ids)
            self._subscribed_sn.append(sn)
            self._subscribed_fqdn.append(fqdn)
            changes = True
        # unsubscribe from any no-longer-used processors
        for sn in [
            p for p in self._subscribed_sn if p not in assigned_procs
        ]:
            old_proxy, sub_ids = self._proc_subscriptions.pop(sn)
            for sub_id in sub_ids:
                try:
                    old_proxy.unsubscribe_event(sub_id)
                except (DevFailed, KeyError) as exc:
                    # keep cleaning up: a failed unsubscribe must not
                    # terminate this monitoring thread
                    self.logger.warning(
                        "Failed to unsubscribe %s event %s: %s",
                        sn,
                        sub_id,
                        exc,
                    )
            del old_proxy
            self._subscribed_sn.remove(sn)
            fqdn = self._proc_sn_to_fqdn.get(sn)
            if fqdn in self._subscribed_fqdn:
                self._subscribed_fqdn.remove(fqdn)
            changes = True
        if changes:
            txt = f"Monitor Processor health: {str(self._subscribed_sn)}"
            self.logger.info(txt)
        # Subscribe to Connector port_up attribute associated with proc.
        for sn, item in self._assigned_proc_details.items():
            # _assigned_proc_details:
            # {"XFL10NIYKVEU": {"name": "p4_01", "port": "17/0"}, ...}
            if (fqdn := self._proc_sn_to_fqdn.get(sn)) is None:
                # unknown serial number: retry on the next pass
                continue
            if (eth_stat := self._eth_port_stat.get(fqdn)) is None:
                continue
            if eth_stat.is_subscribed:
                continue
            switch_name, port = item["name"], item["port"]
            proxy = self._get_switch_proxy(switch_name)
            if proxy is None:
                self.logger.warning(
                    "Failed to get switch %s proxy", switch_name
                )
                continue
            try:
                # Tango event can arrive really fast so be ready:
                eth_stat.sw_proxy = proxy
                eth_stat.sw_dev_name = proxy.name()
                subscr_id = proxy.subscribe_event(
                    "diagnostics_port_up",
                    EventType.CHANGE_EVENT,
                    self._conn_port_up_cb,
                )
                eth_stat.subscription_id = subscr_id
            except TANGO_SUB_EXC as exc:
                del proxy
                eth_stat.sw_proxy = None
                msg = f"Failed to subscribe to {switch_name} {port} {exc}"
                self.logger.warning(msg)
                continue
            self.logger.info("Subscribed to %s port %s", switch_name, port)
def _update_sps_stats_npkts(self, fqdn: str, tango_evt) -> None:
    """Cache the npkts statistics reported by a processor.

    Failed events are ignored.

    :param fqdn: processor that sent the event (cache key)
    :param tango_evt: the Tango event data

    Warning: This is a tango event callback. Don't do any work here
    """
    if not self.is_evt_error(tango_evt, "_update_sps_stats_npkts"):
        npkts = tango_evt.attr_value.value
        self._sps_stats_npkts_cache[fqdn] = npkts
def _update_sps_stats_nflags(self, fqdn: str, tango_evt) -> None:
    """Cache the nflags statistics reported by a processor.

    Failed events are ignored.

    :param fqdn: processor that sent the event (cache key)
    :param tango_evt: the Tango event data

    Warning: This is a tango event callback. Don't do any work here
    """
    if not self.is_evt_error(tango_evt, "_update_sps_stats_nflags"):
        nflags = tango_evt.attr_value.value
        self._sps_stats_nflags_cache[fqdn] = nflags
def _update_sps_stats_rms(self, fqdn: str, tango_evt) -> None:
    """Cache the rms statistics reported by a processor.

    The value is stored transposed (Tango to NumPy axis order). Failed
    events are ignored.

    :param fqdn: processor that sent the event (cache key)
    :param tango_evt: the Tango event data

    Warning: This is a tango event callback. Don't do any work here
    """
    if not self.is_evt_error(tango_evt, "_update_sps_stats_rms"):
        rms_from_tango = tango_evt.attr_value.value
        # fix Tango to NP order
        self._sps_stats_rms_cache[fqdn] = rms_from_tango.T
def _update_stats_mode(self, fqdn: str, tango_evt) -> None:
    """Record a processor's ``stats_mode`` report (JSON text).

    The decoded value is stored per processor and both percentage
    recalculations are queued. Failed events are ignored.

    :param fqdn: processor that sent the event
    :param tango_evt: the Tango event data

    Warning: This is a Tango event callback. Don't do any work here
    """
    if self.is_evt_error(tango_evt, "_update_stats_mode"):
        return
    raw_mode = tango_evt.attr_value.value
    self.logger.info("stats_mode from %s: %s", fqdn, raw_mode)
    self._proc_stats_mode[fqdn] = json.loads(raw_mode)
    for recalculation in (Evt.CALC_PROC_PERCENT, Evt.CALC_ETHER_PERCENT):
        self._enqueue_event(recalculation)
def _update_stats_pst_jones(self, fqdn: str, tango_evt) -> None:
    """Log the PST Jones statistics reported by a processor.

    Failed events are ignored. The value is currently only logged.

    :param fqdn: processor that sent the event
    :param tango_evt: the Tango event data
    """
    if not self.is_evt_error(tango_evt, "_update_stats_pst_jones"):
        reported = tango_evt.attr_value.value
        self.logger.info("stats_pst_jones from %s: %s", fqdn, reported)
    # TODO: update the local version of PST Jones when we are ready with processor
def _update_stats_pss_jones(self, fqdn: str, tango_evt) -> None:
    """Log the PSS Jones statistics reported by a processor.

    Failed events are ignored. The value is currently only logged.

    :param fqdn: processor that sent the event
    :param tango_evt: the Tango event data
    """
    if not self.is_evt_error(tango_evt, "_update_stats_pss_jones"):
        reported = tango_evt.attr_value.value
        self.logger.info("stats_pss_jones from %s: %s", fqdn, reported)
    # TODO: update the local version of PSS Jones when we are ready with processor
def _update_sps_stats_vchans(self, fqdn: str, tango_evt) -> None:
    """Cache the vchans statistics reported by a processor and announce
    the complete SPS statistics set.

    Failed events are ignored.

    :param fqdn: processor that sent the event (cache key)
    :param tango_evt: the Tango event data

    Warning: This is a tango event callback. Don't do any work here
    """
    if self.is_evt_error(tango_evt, "_update_sps_stats_vchans"):
        return
    self._sps_stats_vchans_cache[fqdn] = tango_evt.attr_value.value
    # Processors send stats_vchans last so this evt completes an update
    self._sps_stats_update_time[fqdn] = time.time()
    caches = (
        self._sps_stats_nflags_cache,
        self._sps_stats_npkts_cache,
        self._sps_stats_rms_cache,
        self._sps_stats_vchans_cache,
        self._sps_stats_update_time,
    )
    # Spawn thread because this is running in a Tango event notify thread
    announcer = Thread(
        target=self.component_manager.announce_sps_stats, args=caches
    )
    announcer.start()
def _get_conn_db(self) -> Union[Database, None]:
    """
    Open the Tango database that holds the Connector devices.

    Uses the ``ConnectorDbHost`` / ``ConnectorDbPort`` values of this
    device.

    :return: Tango Database object, or None if the connection failed
    """
    try:
        return Database(self.ConnectorDbHost, self.ConnectorDbPort)
    except DevFailed as e:
        # CI/CD k8s-test won't have the connector deployed so bail out
        self.logger.warning(f"DB EXCEPTION {e}")
    return None
def _get_switch_proxy(self, sw_name: str) -> Union[DeviceProxy, None]:
    """
    Find the LowCbfConnector device that serves a given switch.

    Every exported LowCbfConnector device in the connector database is
    asked for its ``switchName`` attribute (JSON text); the first one
    whose ``"Name"`` entry equals ``sw_name`` is returned. Devices that
    fail with ``DevFailed`` are logged and skipped.

    :param sw_name: The name of the switch to find (e.g. 'p4_01')
    :return: A DeviceProxy for the connector, or None if not found
    """
    db = self._get_conn_db()
    if db is None:
        return None
    prefix = f"{self.ConnectorDbHost}:{self.ConnectorDbPort}/"
    for dev in db.get_device_exported_for_class("LowCbfConnector"):
        fqdn = prefix + dev
        try:
            proxy = DeviceProxy(fqdn)
            proxy.set_timeout_millis(TANGO_TIMEOUT_MS)
            # conn.switchName typically: '{"Name": "p4_01"}'
            inst_name = json.loads(proxy.switchName)
        except DevFailed:
            self.logger.warning("failed to get proxy for %s", fqdn)
            continue
        if "Name" in inst_name and inst_name["Name"] == sw_name:
            return proxy
    return None
def _update_proc_ether_port(self, fqdn: str, evt: EventData) -> None:
    """
    Receive a processor's Ethernet port status change.

    Stores the new lock state for the processor (under the callback
    lock) and queues a recalculation of the overall Ethernet locked
    percentage. Failed events are ignored.

    :param fqdn: FQDN of the processor device
    :param evt: The Tango event data

    Warning: This is a tango event callback. Don't do any work here
    """
    if self.is_evt_error(evt, "_update_proc_ether_port"):
        return
    # we get an integer: 1 == Ether port locked, 0 == not locked
    locked = bool(evt.attr_value.value)
    self.logger.info("%s ETHERNET locked: %s", fqdn, locked)
    with self._callback_lock:
        port_stat = self._eth_port_stat[fqdn]
        port_stat.proc_ether_locked = locked
    self._enqueue_event(Evt.CALC_ETHER_PERCENT)
def _switch_port_to_proc_fqdn(
    self, name: str, port: str
) -> Union[str, None]:
    """
    Look up which processor is attached to a given switch port.

    :param name: Switch name
    :param port: Switch port
    :return: FQDN of the first tracked processor connected to this
        port, or None when no tracked link matches
    """
    wanted = (name, port)
    matches = (
        fqdn
        for fqdn, stat in self._eth_port_stat.items()
        if (stat.sw_name, stat.sw_port) == wanted
    )
    return next(matches, None)
def clear_sps_stats_cache(self):
    """Drop all cached SPS statistics and publish empty attribute values.

    Every per-processor cache is replaced with a fresh empty dict, the
    archive push counter is reset and ``sps_stats_update`` is called with
    empty lists/arrays.
    """
    for cache_name in (
        "_sps_stats_nflags_cache",
        "_sps_stats_npkts_cache",
        "_sps_stats_rms_cache",
        "_sps_stats_vchans_cache",
        "_sps_stats_update_time",
    ):
        setattr(self, cache_name, {})
    self._sps_stats_push_ctr = 0
    no_rms_xpol = np.asarray([], dtype=np.uint8)
    no_rms_ypol = np.asarray([], dtype=np.uint8)
    no_flags = np.asarray([], dtype=np.int8)
    self.sps_stats_update([], [], no_rms_xpol, no_rms_ypol, no_flags)
def sps_stats_update(
    self, stn_sstns, bm_freqs, np_rms_xpol, np_rms_ypol, np_flags
):  # pylint: disable=too-many-arguments
    """Store and publish a new set of SPS statistics.

    Change events are pushed on every call; archive events only on every
    N-th call, where N is ``self._sps_stats_push_per_archive``.

    :param stn_sstns: station/substation order, stored as JSON text
    :param bm_freqs: beam/frequency order, stored as JSON text
    :param np_rms_xpol: X-pol RMS array, stored transposed
    :param np_rms_ypol: Y-pol RMS array, stored transposed
    :param np_flags: flagging array, stored transposed
    """
    # save attribute value data so it can be read anytime
    self._sps_bm_frq = json.dumps(bm_freqs)
    self._sps_stn_sstn = json.dumps(stn_sstns)
    # Tango C order vs Numpy Fortran order for the following
    self._sps_stats_rms_xpol = np_rms_xpol.T
    self._sps_stats_rms_ypol = np_rms_ypol.T
    self._sps_stats_flags_percent = np_flags.T

    published = (
        ("sps_beam_freqs", self._sps_bm_frq),
        ("sps_station_substations", self._sps_stn_sstn),
        ("sps_stats_rms_xpol", self._sps_stats_rms_xpol),
        ("sps_stats_rms_ypol", self._sps_stats_rms_ypol),
        ("sps_stats_flag_percent", self._sps_stats_flags_percent),
    )
    # update all the SPS-related tango attributes
    for attr_name, attr_value in published:
        self.push_change_event(attr_name, attr_value)

    # Archive 1 in N (=sps_stats_push_per_archive)
    self._sps_stats_push_ctr += 1
    if self._sps_stats_push_ctr >= self._sps_stats_push_per_archive:
        self._sps_stats_push_ctr = 0
        for attr_name, attr_value in published:
            self.push_archive_event(attr_name, attr_value)
        self.logger.info("SPS statistics attributes updated & archived")
    else:
        self.logger.info("SPS statistics attributes updated")
def is_evt_error(self, tango_evt: EventData, cb_name: str) -> bool:
    """Utility function - tell whether a Tango event reports a failure.

    A warning with the event's error stack is logged for failed events.

    :param tango_evt: Tango event object
    :param cb_name: name of the callback function handling event
    :return: True if event failed, False otherwise
    """
    if not tango_evt.err:
        return False
    self.logger.warning(
        f"{cb_name} got failed event: stack{tango_evt.errors}"
    )
    return True
@attribute(
    doc="SPS Xpol RMS levels",
    dtype=((np.uint8,),),
    max_dim_x=MAX_STATIONS,
    max_dim_y=MAX_SPS_CHAN,
)
def sps_stats_rms_xpol(self):
    """Read the most recently stored SPS X-polarisation RMS statistics.

    :return: 2D array (stations * channels) of ``np.uint8``
    """
    xpol_rms = self._sps_stats_rms_xpol
    return xpol_rms
@attribute(
    doc="SPS Ypol RMS levels",
    dtype=((np.uint8,),),
    max_dim_x=MAX_STATIONS,
    max_dim_y=MAX_SPS_CHAN,
)
def sps_stats_rms_ypol(self):
    """Read the most recently stored SPS Y-polarisation RMS statistics.

    :return: 2D array (stations * channels) of ``np.uint8``
    """
    ypol_rms = self._sps_stats_rms_ypol
    return ypol_rms
@attribute(
    doc="SPS flagging percent (0-100, 0xff if no data)",
    dtype=((np.uint8,),),
    max_dim_x=MAX_STATIONS,
    max_dim_y=MAX_SPS_CHAN,
)
def sps_stats_flag_percent(self):
    """Read the most recently stored SPS flagging statistics.

    :return: 2D array (stations * channels) of ``np.uint8``;
        range: [0, 100]%, 255 if no data
    """
    flag_percent = self._sps_stats_flags_percent
    return flag_percent
@attribute(
    doc="SPS stats (station, substation) order",
    dtype=str,
)
def sps_station_substations(self) -> str:
    """
    :return: station/substation order used for SPS stats (JSON text)
    """
    station_order = self._sps_stn_sstn
    return station_order
@attribute(
    doc="SPS stats (beam_id, freq_id) order",
    dtype=str,
)
def sps_beam_freqs(self) -> str:
    """
    :return: beam/freq order used for SPS stats (JSON text)
    """
    beam_freq_order = self._sps_bm_frq
    return beam_freq_order
@attribute(
    access=AttrWriteType.READ_WRITE,
    doc="SPS statistics archive interval: 1 in N updates",
    dtype=int,
)
def sps_archive_interval(self) -> int:
    """Read the archive interval.

    :return: N, where SPS statistics are archived 1 in N updates
    """
    interval = self._sps_stats_push_per_archive
    return interval
def write_sps_archive_interval(self, value: int):
    """Set the archive interval (1 in N updates).

    The push counter is preset to the new interval so that the next
    statistics update is archived immediately.

    :param value: new interval N
    """
    self._sps_stats_push_per_archive = self._sps_stats_push_ctr = value
@attribute(
    dtype=int,
    min_value="0",
    max_value="100",
    unit="%",
    doc="Percentage of Ethernet ports (switch and proc.) ready for scan",
)
def processorEthernetLockedPercent(self) -> tuple[int, float, AttrQuality]:
    """
    Get a percentage of Ethernet ports (processor/switch) ready for scan.

    The value is meaningful only when ``obsState`` is READY or SCANNING
    and a percentage has actually been calculated, so we also return
    Tango attribute quality.

    :return: tuple

      * percentage: integer in the range [0, 100]; 0 when ATTR_INVALID
      * time: float
      * quality: AttrQuality
    """
    ready = self._is_obs_state((ObsState.READY, ObsState.SCANNING))
    percent = self._ether_locked_percent
    # The stored percentage is None while no valid value is available
    # (it is reset to None when the value is invalidated). Never hand
    # None to Tango as a VALID integer; report 0/INVALID instead, the
    # same value/quality pair used for the invalidation events.
    if ready and percent is not None:
        return percent, time.time(), AttrQuality.ATTR_VALID
    return 0, time.time(), AttrQuality.ATTR_INVALID
# Run server
def main(args=None, **kwargs):
    """Run the Tango device server hosting ``LowCbfSubarray``.

    :param args: passed through to ``tango.server.run``
    :param kwargs: extra keyword arguments passed through to ``run``
    :return: whatever ``run`` returns
    """
    device_classes = (LowCbfSubarray,)
    return run(device_classes, args=args, **kwargs)
# Start the device server only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()