# Source code for ska_low_cbf.subarray.subarray_device

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2021 CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
# pylint: disable=invalid-name,protected-access,too-few-public-methods
# can't do much about these:
# pylint: disable=attribute-defined-outside-init,pointless-string-statement
# pylint: disable=too-many-lines,too-many-ancestors,too-many-public-methods

""" SKA Low CBF

Subarray device for Low.CBF
"""

import json
import os
import re
import time
from collections.abc import Iterable
from dataclasses import dataclass, field
from enum import IntEnum
from functools import partial
from queue import Queue
from threading import Lock, Thread
from typing import Any, Union

import numpy as np
from ska_tango_base import SKABaseDevice, SKASubarray
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, HealthState, ObsState
from tango import (
    AttrQuality,
    AttrWriteType,
    Database,
    DevFailed,
    DeviceProxy,
    DevUShort,
    EventData,
    EventSystemFailed,
    EventType,
)
from tango.server import attribute, device_property, run

from ska_low_cbf import release
from ska_low_cbf.device_proxy import MccsDeviceProxy
from ska_low_cbf.subarray.component_manager import (
    LowCbfSubarrayComponentManager,
    SubarrayTangoIface,
)

__all__ = ["LowCbfSubarray", "main"]


# Timeout in milliseconds: the default Tango timeout (3 seconds) is
# sometimes insufficient
TANGO_TIMEOUT_MS = 10_000


# Third state next to 0 (INVALID) and 1 (VALID) of the delay polynomial
# validity value: the subarray polynomial is not used
UNUSED = 2
# NOTE(review): presumably upper bounds on station count and SPS channel
# count - confirm against where these are used
MAX_STATIONS = 1024
MAX_SPS_CHAN = 384


class Evt(IntEnum):
    """Integer codes identifying events received from other Tango devices."""

    # Alveo assignment changes
    ALVEOS_RELEASED = 0
    ALVEOS_ASSIGNED = 1
    # value 2 is not assigned to any event
    DELAY_POLY_SUMMARY = 3
    # requests to recalculate a percentage attribute
    CALC_PROC_PERCENT = 4  # processorsReadyPercent
    CALC_ETHER_PERCENT = 5  # processorEthernetLockedPercent
    PROCESS_HEALTH = 6


@dataclass
class EtherPortStat:
    """
    Class tracking Processor and Connector Ethernet port status.

    Every identifying field defaults to ``None`` and both lock flags default
    to ``False``.
    """

    sw_name: Union[str, None] = None  # comes from Allocator e.g. "p4_01"
    sw_port: Union[str, None] = None  # e.g. "17/0"
    # quoted so the annotation is never evaluated when the class is created
    sw_proxy: Union["DeviceProxy", None] = None
    sw_dev_name: Union[str, None] = None  # e.g. "low-cbf/connector/0"
    subscription_id: Union[int, None] = None
    proc_ether_locked: bool = False
    sw_ether_locked: bool = False

    @property
    def is_subscribed(self) -> bool:
        """
        Check if we are subscribed to Connector port-up events.

        :return: True if subscribed, False otherwise
        """
        return self.subscription_id is not None

    def unsubscribe_connector(self) -> bool:
        """
        Unsubscribe from Connector port updates.

        Nothing is done unless both a subscription ID and a proxy are
        present. "Present" means ``is not None``, the same test used by
        :py:attr:`is_subscribed`, so a subscription ID of 0 is honoured.
        The stored ``subscription_id`` is not cleared by this method.

        :return: True if unsubscribed, False otherwise
        """
        if self.subscription_id is None or self.sw_proxy is None:
            return False
        self.sw_proxy.unsubscribe_event(self.subscription_id)
        return True


@dataclass
class ConnectorPortsUp:
    """
    Maintain port-up status for multiple connector devices.

    Each switch (connector) has its own entry in the `ports_up` dictionary.
    """

    ports_up: dict[str, list[int]] = field(default_factory=dict)
    """Port-up status for each participating switch.

    key:   Tango device name (e.g. 'low-cbf/connector/0')
    value: list of integers; 1 == port is up, otherwise 0
    """

    def is_up(self, dev_name: str, port: Union[int, str]) -> bool:
        """
        Check if a port is up.

        Ports are numbered from 1. A port number outside
        ``1..len(port list)`` (including 0 and negative numbers) and an
        unknown device are both reported as "not up".

        :param dev_name: connector Tango device  (e.g. 'low-cbf/connector/0')
        :param port: Port number (int) or string (e.g. '17/0')
        :return: True if the port is up, False otherwise
        :raises ValueError: if a string ``port`` does not start with an
            integer
        """
        if isinstance(port, str):
            # "17/0" -> 17: only the part before the first "/" is used
            port = int(port.partition("/")[0])
        if dev_name not in self.ports_up:
            return False
        all_ports = self.ports_up[dev_name]
        # array is 0 based, ports are 1 based
        index = port - 1
        # reject negative indices explicitly: Python would otherwise wrap
        # around and report the state of a port counted from the end
        if not 0 <= index < len(all_ports):
            return False
        return bool(all_ports[index])


# NOTE(review): kept as a tuple, presumably so it can be used directly in an
# ``except TANGO_SUB_EXC:`` clause around subscribe_event - confirm at the
# call sites
TANGO_SUB_EXC = (TypeError, EventSystemFailed)
"""Exceptions proxy.subscribe_event can throw"""


class LowCbfSubarray(SKASubarray):
    """
    Subarray device for Low.CBF

    **Properties:**

    - Device Property
        ControllerAddress
            - Tango address of Low.CBF Controller
            - Type:'DevString'
        AllocatorAddress
            - Tango address of allocator device that will handle assignment of
            FPGA resources
            - Type:'DevString'
    """

    def __init__(self, *args, **kwargs):
        """
        Set up cached attribute state, then run the base-class initialiser.

        All instance attributes below are created before
        ``super().__init__`` is called.

        :param args: positional arguments passed through to the base class
        :param kwargs: keyword arguments passed through to the base class
        """
        empty_json_list = json.dumps([])
        percent_not_known = (0, AttrQuality.ATTR_INVALID)

        # SPS statistics caches
        self._sps_stats_nflags_cache = {}
        self._sps_stats_npkts_cache = {}
        self._sps_stats_rms_cache = {}
        self._sps_stats_vchans_cache = {}
        self._sps_stats_update_time = {}

        # SPS statistics event bookkeeping
        self._sps_stats_push_ctr = 0
        self._sps_stats_push_per_archive = 10  # default 1 in 10 archived

        # SPS statistics values, empty until populated
        self._sps_stats_flags_percent = np.asarray([], dtype=np.int8)
        self._sps_stats_rms_xpol = np.asarray([], dtype=np.uint8)
        self._sps_stats_rms_ypol = np.asarray([], dtype=np.uint8)
        self._sps_stn_sstn = empty_json_list
        self._sps_bm_frq = empty_json_list

        # Dict; key: processor fqdn; value: stats_mode
        self._proc_stats_mode = {}

        # (value, quality) pairs; quality is invalid until a value arrives
        self._sdp_host_ready_percent = percent_not_known
        self._pss_host_ready_percent = percent_not_known
        self._pst_host_ready_percent = percent_not_known

        super().__init__(*args, **kwargs)

    # Health states listed by severity: FAILED first, OK last
    _health_states = (HealthState.FAILED, HealthState.DEGRADED, HealthState.OK)
    """HealthStates order by severity"""

    # Device Properties
    # Tango address of the Low.CBF Controller
    ControllerAddress = device_property(
        dtype="DevString", default_value="low-cbf/control/0"
    )

    # Tango address of the allocator device that handles assignment of
    # FPGA resources
    AllocatorAddress = device_property(
        dtype="DevString", default_value="low-cbf/allocator/0"
    )

    # NOTE(review): presumably host and port of the Tango database used to
    # reach Connector devices - confirm where these properties are read
    ConnectorDbHost = device_property(
        dtype="str", default_value="tango-databaseds"
    )
    ConnectorDbPort = device_property(dtype="int", default_value=10000)

    @attribute(
        label="Stations",
        doc="Report station & substation membership in subarray",
    )
    def stations(self) -> str:
        """
        Read the ``stations`` attribute.

        :return: string form of the ``stations`` held by the component
            manager's subarray
        """
        membership = self.component_manager.subarray.stations
        return str(membership)

    @attribute(dtype=DevUShort)
    def delaysValid(self) -> DevUShort:
        """Get per-subarray delay polynomial validity value.

        Legal values are:

        * 0 - INVALID
        * 1 - VALID
        * 2 - UNUSED

        :return: delay validity status as integer
        :rtype: DevUShort
        """
        # NOTE(review): no ``doc=`` is passed to the decorator, so PyTango
        # presumably publishes the docstring above as the attribute
        # description - confirm before rewording it.
        # The value itself is maintained by _update_delays_valid().
        return self._delays_valid

    def _update_delays_valid(self, fqdn: str, new_delays: list[int]) -> None:
        """
        Merge one device's report into this subarray's delay validity.

        The entry for this subarray is taken from ``new_delays`` and stored
        as the contribution of ``fqdn``. The subarray's overall value is the
        minimum over all stored contributions. Change and archive events
        for ``delaysValid`` are pushed only when that value changes.

        :param fqdn: name of the device the report came from
        :param new_delays: validity values, one per subarray
        """
        assert len(new_delays) >= self._subarray_id
        self.logger.info(f"{new_delays}")

        # subarrays are 1-based, the report is indexed from 0
        own_index = self._subarray_id - 1
        contributions = self._delay_contributors
        contributions[fqdn] = new_delays[own_index]
        for contributor, validity in contributions.items():
            self.logger.info(f"{contributor}: {validity}")

        combined = min(contributions.values())
        # pylint: disable=access-member-before-definition
        if self._delays_valid == combined:
            return
        self._delays_valid = combined
        self.push_change_event("delaysValid", combined)
        self.push_archive_event("delaysValid", combined)

    @attribute(
        label="Station Beams",
        doc="Report configuration of all Station Beams",
    )
    def stationBeams(self) -> str:
        """
        Read the ``stationBeams`` attribute.

        :return: string form of the ``station_beams`` held by the component
            manager's subarray
        """
        beams = self.component_manager.subarray.station_beams
        return str(beams)

    @attribute(
        dtype="DevString",
        label="Pulsar Search Beams",
        doc=(
            "Each Pulsar Search Beam is associated with one Station Beam, and "
            "has additional configuration parameters including a delay "
            "polynomial source (supplied via Configure)"
        ),
    )
    def pssBeams(self) -> str:
        """
        Read the ``pssBeams`` attribute.

        :return: string form of the ``pss_beams`` held by the component
            manager's subarray
        """
        search_beams = self.component_manager.subarray.pss_beams
        return str(search_beams)

    @attribute(
        label="Pulsar Timing Beams",
        doc=(
            "Each Pulsar Timing Beam is associated with one Station Beam, and "
            "has additional configuration parameters including a delay "
            "polynomial source (supplied via Configure)"
        ),
    )
    def pstBeams(self) -> str:
        """
        Read the ``pstBeams`` attribute.

        :return: string form of the ``pst_beams`` held by the component
            manager's subarray
        """
        timing_beams = self.component_manager.subarray.pst_beams
        return str(timing_beams)

    @attribute(
        label="Zoom window IDs",
        doc="Report Zooms by window ID",
    )
    def zooms(self) -> str:
        """
        Read the ``zooms`` attribute.

        :return: string form of the ``zooms`` held by the component
            manager's subarray
        """
        zoom_windows = self.component_manager.subarray.zooms
        return str(zoom_windows)

    # We override SKA-Tango-BASE adminMode attribute to prevent it being
    # set to OFFLINE while subarray is resourced. This change means that
    # a subarray can have resources only in adminMode ONLINE or ENGINEERING
    @attribute(
        # dtype=AdminMode,  ... FIXME Sphinx doesn't like this
        # NOTE(review): memorized/hw_memorized presumably make Tango store
        # the last written value and write it back at start-up - confirm
        # against the PyTango attribute documentation
        memorized=True,
        hw_memorized=True,
        access=AttrWriteType.READ_WRITE,
    )
    def adminMode(self: SKABaseDevice) -> AdminMode:
        """
        Read the Admin Mode of the device.

        It may interpret the current device condition and condition of all
        managed devices to set this. Most possibly an aggregate attribute.

        :return: Admin Mode of the device
        """
        # NOTE(review): no ``doc=`` is passed to the decorator, so the
        # docstring above is presumably what PyTango publishes as the
        # attribute description - confirm before rewording it.
        # Writes are handled by write_adminMode().
        return self._admin_mode

# NOTE(review): this is a method of ``LowCbfSubarray``; the class-body
# indentation and the line breaks were lost on this line of the file.
def write_adminMode(self: SKABaseDevice, value: AdminMode) -> None:
    """
    Set the Admin Mode of the device.

    Overide of ska-tango-base to prevent being set offline while resourced

    :param value: Admin Mode of the device.
    :raises AssertionError: if the current adminMode is ONLINE or
        ENGINEERING and obsState is neither IDLE nor EMPTY
    :raises ValueError: for unknown adminMode
    """
    # Block adminMode change if subarray active but obsState is not EMPTY
    # or IDLE. IDLE also allowed - subarrays have empty assignresources.
    # Allowing IDLE allows PTC1 to pass. PTC1 changes AdminMode in IDLE
    blocked = self._admin_mode in (
        AdminMode.ONLINE,
        AdminMode.ENGINEERING,
    ) and not self._is_obs_state((ObsState.IDLE, ObsState.EMPTY))
    assert not blocked, "Can't change adminMode when obsState not EMPTY"
    # normalise to the enum before anything else uses the value
    value = AdminMode(value)
    # health tracking is adjusted before the mode transition is performed
    self._adjust_health(value)
    if value == AdminMode.NOT_FITTED:
        self.admin_mode_model.perform_action("to_notfitted")
    elif value == AdminMode.OFFLINE:
        # should health monitoring stop here?
        self.admin_mode_model.perform_action("to_offline")
        self.component_manager.stop_communicating()
    elif value == AdminMode.ENGINEERING:
        self.admin_mode_model.perform_action("to_engineering")
        self.component_manager.start_communicating()
        # propagate healthState in case it was updated while OFFLINE
        self._recalculate_health()
    elif value == AdminMode.ONLINE:
        self.admin_mode_model.perform_action("to_online")
        self.component_manager.start_communicating()
        self._recalculate_health()
    elif value == AdminMode.RESERVED:
        self.admin_mode_model.perform_action("to_reserved")
    else:
        raise ValueError(f"Unknown adminMode {value}")
def _adjust_health(self, admin_mode: AdminMode) -> None: """Set ``healthState`` to UNKNOWN when switching to e.g. OFFLINE MODE Otherwise start health monitoring when in ONLINE or ENGINEERING :param admin_mode: the ``adminMode`` we are about to transition to """ monitoring_now = self.is_monitoring_mode() monitoring_next = self.is_monitoring_mode(admin_mode) # if currently not in a monitoring mode but will be after this change # ensure the monitoring thread is running if not monitoring_now and monitoring_next: msg = f"Entering {admin_mode.name} mode: will track healthState" self.logger.info(msg) self._start_health_monitoring() # if currently in a monitoring mode but won't be after this change # change the healthState to UNKNOWN elif monitoring_now and not monitoring_next: msg = f"Entering {admin_mode.name} mode: won't track healthState" self.logger.info(msg) self._update_health_state(HealthState.UNKNOWN) def _start_health_monitoring(self): """Start halth monitoring thread if it's not already running but not in case NO_HEALTH_ROLLUP enivonment variable is set. """ if self._no_health_rollup or self._subscribed_to_alloc: return self._thrd = Thread(target=self._subscribe_allocator) self._thrd.start() self._subscribe_to_connector() # TBD: Taranta didn't like the name 'assignedProcessors' (:shrug:) @attribute(dtype="str", doc="List of processors assigned to subarray") def assigned_processors(self: SKABaseDevice) -> str: """ Get a list of processors assigned to the subarray. 
:return: JSON string, a list of processor serial numbers """ return json.dumps(self._assigned_proc_details) @attribute( dtype=int, doc="Percentage of PSS hosts resolved.", min_value="0", max_value="100", unit="%", ) def pssHostsResolvedPercent(self): """Return the percentage of PSS hosts resolved.""" ready = self._commanded_obs_state in ( ObsState.READY, ObsState.SCANNING, ) value_from_alloc, qual = self._pss_host_ready_percent value = value_from_alloc if ready else 0 quality = qual if ready else AttrQuality.ATTR_INVALID return value, time.time(), quality def _get_percent_pss_resolved(self) -> (int, AttrQuality): """Return the percentage of PSS hosts resolved.""" subarray_id = self._subarray_id configuration = self._configured_subarrays allocator_proxy = MccsDeviceProxy( self.AllocatorAddress, self.logger, connect=False ) allocator_proxy.connect(max_time=120) qual = AttrQuality.ATTR_INVALID percentage = 0.0 if f"{subarray_id}" in configuration: if "search_beams" in configuration[f"{subarray_id}"]: percentage = allocator_proxy.GetPSSHostResolved(subarray_id) qual = AttrQuality.ATTR_VALID return int(percentage), qual @attribute( dtype=int, doc="Percentage of PST hosts resolved.", min_value="0", max_value="100", unit="%", ) def pstHostsResolvedPercent(self): """Return the percentage of PST hosts resolved.""" ready = self._commanded_obs_state in ( ObsState.READY, ObsState.SCANNING, ) value_from_alloc, qual = self._pst_host_ready_percent value = value_from_alloc if ready else 0 quality = qual if ready else AttrQuality.ATTR_INVALID return value, time.time(), quality def _get_percent_pst_resolved(self) -> (int, AttrQuality): """Return percentage of PST hosts resolved.""" subarray_id = self._subarray_id configuration = self._configured_subarrays allocator_proxy = MccsDeviceProxy( self.AllocatorAddress, self.logger, connect=False ) allocator_proxy.connect(max_time=120) qual = AttrQuality.ATTR_INVALID percentage = 0.0 if f"{subarray_id}" in configuration: if "timing_beams" 
in configuration[f"{subarray_id}"]: percentage = allocator_proxy.GetPSTHostResolved(subarray_id) qual = AttrQuality.ATTR_VALID return int(percentage), qual @attribute( dtype=int, doc="Percentage of SDP hosts resolved.", min_value="0", max_value="100", unit="%", ) def sdpHostsResolvedPercent(self): """Return percentage of SDP hosts resolved.""" ready = self._commanded_obs_state in ( ObsState.READY, ObsState.SCANNING, ) value_from_alloc, qual = self._sdp_host_ready_percent value = value_from_alloc if ready else 0 quality = qual if ready else AttrQuality.ATTR_INVALID return value, time.time(), quality def _get_percent_sdp_resolved(self) -> (int, AttrQuality): """Return the percentage of SDP hosts resolved.""" subarray_id = self._subarray_id configuration = self._configured_subarrays allocator_proxy = MccsDeviceProxy( self.AllocatorAddress, self.logger, connect=False ) allocator_proxy.connect(max_time=120) qual = AttrQuality.ATTR_INVALID percentage = 0.0 if f"{subarray_id}" in configuration: if ( "vis" in configuration[f"{subarray_id}"] or "coarse_zooms" in configuration[f"{subarray_id}"] ): percentage = allocator_proxy.GetSDPHostResolved(subarray_id) qual = AttrQuality.ATTR_VALID return int(percentage), qual @attribute( dtype="int", doc=("Percentage of processors ready for scan."), min_value="0", max_value="100", unit="%", ) def processorsReadyPercent( self: SKABaseDevice, ) -> tuple[int, float, AttrQuality]: """ Get a percentage of processors ready for subarray scan. The value is meaningful only when ``obsState`` is READY or SCANNING so we also return Tango attribute quality. 
:return: tuple * percentage: rounded integer in the range [0, 100], None when ATTR_INVALID * time: float * quality: AttrQuality """ # adjust Tango quality attribute depending on obsState ready = self._is_obs_state(ObsState.READY) scanning = self._is_obs_state(ObsState.SCANNING) value = self._proc_ready_percent if ready or scanning else 0 quality = ( AttrQuality.ATTR_VALID if ready or scanning else AttrQuality.ATTR_INVALID ) return value, time.time(), quality def _is_obs_state( self, state: Union[ObsState, Iterable[ObsState]] ) -> bool: """Determine if subarray ``obsState`` value is in the specified state. :param state: can be either a single state or a list of multiple states e.g. (READY, SCANNING) :return: True if ``obsState`` is in specified state, False otherwise """ if isinstance(state, Iterable): return self._obs_state in state return self._obs_state == state @attribute( dtype="int", doc="Percentage of PST Jones applied (Not yet ready).", min_value="0", max_value="100", unit="%", ) def pstJonesAppliedPercent( self: SKABaseDevice, ) -> tuple[int, float, AttrQuality]: """ Get a percentage of PST Jones applied. The value is meaningful only when ``obsState`` is READY or SCANNING so we also return Tango attribute quality. :return: tuple * percentage: rounded integer in the range [0, 100], None when ATTR_INVALID * time: float * quality: AttrQuality """ # adjust Tango quality attribute depending on obsState ready = self._is_obs_state(ObsState.READY) scanning = self._is_obs_state(ObsState.SCANNING) value = self._pst_jones_applied_percent if ready or scanning else 0 quality = ( AttrQuality.ATTR_VALID if ready or scanning else AttrQuality.ATTR_INVALID ) return value, time.time(), quality @attribute( dtype="int", doc="Percentage of PSS Jones applied (Not yet ready).", min_value="0", max_value="100", unit="%", ) def pssJonesAppliedPercent( self: SKABaseDevice, ) -> tuple[int, float, AttrQuality]: """ Get a percentage of PSS Jones applied. 
The value is meaningful only when ``obsState`` is READY or SCANNING so we also return Tango attribute quality. :return: tuple * percentage: rounded integer in the range [0, 100], None when ATTR_INVALID * time: float * quality: AttrQuality """ # adjust Tango quality attribute depending on obsState ready = self._is_obs_state(ObsState.READY) scanning = self._is_obs_state(ObsState.SCANNING) value = self._pss_jones_applied_percent if ready or scanning else 0 quality = ( AttrQuality.ATTR_VALID if ready or scanning else AttrQuality.ATTR_INVALID ) return value, time.time(), quality @attribute( dtype="int", doc="Age of applied PST Jones matrices (Not yet ready).", min_value="0", unit="seconds", ) def pstJonesAge( self: SKABaseDevice, ) -> tuple[int, float, AttrQuality]: """ Get the age of latest PST Jones applied. The value is meaningful only when ``obsState`` is READY or SCANNING so we also return Tango attribute quality. :return: tuple * age: age of the PST Jones Matrix * time: float * quality: AttrQuality """ # adjust Tango quality attribute depending on obsState ready = self._is_obs_state(ObsState.READY) scanning = self._is_obs_state(ObsState.SCANNING) value = self._pst_jones_age if ready or scanning else 0 quality = ( AttrQuality.ATTR_VALID if ready or scanning else AttrQuality.ATTR_INVALID ) return value, time.time(), quality @attribute( dtype="int", doc="Age of applied PSS Jones matrices (Not yet ready)", min_value="0", unit="seconds", ) def pssJonesAge( self: SKABaseDevice, ) -> tuple[int, float, AttrQuality]: """ Get the age of latest PSS Jones applied. The value is meaningful only when ``obsState`` is READY or SCANNING so we also return Tango attribute quality. 
:return: tuple * age: age of the PST Jones Matrix * time: float * quality: AttrQuality """ # adjust Tango quality attribute depending on obsState ready = self._is_obs_state(ObsState.READY) scanning = self._is_obs_state(ObsState.SCANNING) value = self._pss_jones_age if ready or scanning else 0 quality = ( AttrQuality.ATTR_VALID if ready or scanning else AttrQuality.ATTR_INVALID ) return value, time.time(), quality # override of CspSubElementObsDevice _component_state_changed def _component_state_changed_low( self, fault=None, power=None, resourced=None, configured=None, scanning=None, obsfault=None, # new here ): # pylint: disable=too-many-arguments """Enhanced obsState callback to allow obsState=FAULT""" # Most state is handled by calling ska-tango-base BaseComponentManager self._component_state_changed( fault=fault, power=power, resourced=resourced, configured=configured, scanning=scanning, ) # But our new state variable is handled here if obsfault is not None: if obsfault: self.obs_state_model.perform_action("component_obsfault") else: pass # (no state change needed when obsfault is cleared) # General methods def create_component_manager(self): """Create Subarray component manager""" subarray_name = self.get_name() subarray_name_parts = subarray_name.split("/") subarray_id = int(subarray_name_parts[2]) allocator = MccsDeviceProxy( self.AllocatorAddress, self.logger, connect=True ) tango_iface = SubarrayTangoIface( update_attr_sps_stats=self.sps_stats_update, get_admin_mode=self.get_admin_mode, ) return LowCbfSubarrayComponentManager( logger=self.logger, subarray_id=subarray_id, communication_state_callback=self._communication_state_changed, component_state_callback=self._component_state_changed_low, tango_iface=tango_iface, allocator=allocator, ) def get_admin_mode(self) -> int: """ callback providing access to subarray "adminMode" :return: Which adminMode subarray is in """ return self._admin_mode def always_executed_hook(self): """Method always executed before 
any TANGO command is executed.""" def delete_device(self): """Hook to delete resources allocated in init_device. This method allows for any memory or other resources allocated in the init_device method to be released. This method is called by the device destructor and by the device Init command. """ # Commands def init_command_objects(self): """ Initialises the command handlers for commands supported by this device. """ # pylint: disable=useless-super-delegation super().init_command_objects() class InitCommand(SKASubarray.InitCommand): """Init Command object""" def do(self): """ Initialises the attributes and properties of the LowCbfSubarray. """ super().do() self._device._version_id = release.version self._device._build_state = ( f"{release.name}, {release.version}, {release.description}" ) # Events to manually be pushed self._device.set_change_event("healthState", True, False) self._device._proc_sn_to_fqdn = {} """All known Processors { serial number: Processor FQDN }""" self._device._assigned_proc_details = [] """Serial numbers and routing details of Processors assigned to this subarray e.g. { "XFL10NIYKVEU"": {"name": "p4_01", "port": "17/0"}, ...} """ self._device._subscribed_sn = [] """Serial numbers of Processors we are subscribed to""" self._device._proc_subscriptions = {} """Record of processor subscription IDs (for later unsubscribe)""" self._device._subscribed_fqdn = [] """FQDNs of Processors we are subscribed to""" self._device._eth_port_stat = {} """A dictionary keeping track about Ethernet port status. key: processor fqdn; value: EtherPortStat dataclass """ self._device._conn_port = ConnectorPortsUp() """Keeps track which Connector ports are up. 
An array of integers for each port; value: 1 == port is up """ self._device._ether_locked_percent = 0 """Percentage of Ethernet ports ready for scan.""" self._device._proc_thread = None """Thread used to subscribe to processors healthState as they get assigned to this subarray""" # TBD expose this as an attribute: self._device._component_health = {} """A dictionary of health for constituent components; e.g. { "low-cbf/processor/0" : OK, "low-cbf/processor/1" : DEGRADED, "low-cbf/connector/0" : OK } """ self._device._callback_lock = Lock() """Serialise callbacks from different devices""" self._device._delays_valid = UNUSED """Overall indicator if all delays in subarray are valid""" self._device._delay_contributors = {} """FQDN: delay_valid map - how much each processor contributes to overall subarray's 'delaysValid' state""" self._device._proc_ready_percent = 0 """Percentage of processors ready for scan""" self._device._pst_jones_applied_percent = 0 """Percentage of PST jones matrices applied""" self._device._pss_jones_applied_percent = 0 """Percentage of PSS jones matrices applied""" self._device._pst_jones_age = 0 """Age of latest PST jones matrices""" self._device._pss_jones_age = 0 """Age of latest PSS jones matrices""" self._device._pss_jones_information = {} """Information for the latest PSS jones matrices""" self._device._pst_jones_information = {} """Information for the latest PST jones matrices""" self._device._configured_ok = False """Subarray configuration went ok.""" self._device._configured_subarrays = {} """Copy of internal_subarray from the Allocator.""" # A few more attributes that could be pushed on change however # at present they come from the component_manager so needs more work # eg: stations, stationbeams, pssbeams, pstbeams, zooms for attribute_name in ( "state", "status", "adminMode", "healthState", "controlMode", "simulationMode", "testMode", "obsState", "obsMode", "configurationProgress", "configurationDelayExpected", "delaysValid", 
"assigned_processors", "processorsReadyPercent", "pssHostsResolvedPercent", "pstHostsResolvedPercent", "sdpHostsResolvedPercent", "pstJonesAppliedPercent", "pssJonesAppliedPercent", "pssJonesAge", "pstJonesAge", "processorEthernetLockedPercent", ): self._device.set_change_event(attribute_name, True, False) self._device.set_archive_event(attribute_name, True, False) my_name = self._device.get_name() # is there a better way to find my_id ? self._device._subarray_id = int(my_name.split("/")[-1]) self._device.logger.info( f"{my_name} ID {self._device._subarray_id}" ) self._device._subscribed_to_alloc = False self._device._admin_modes_using_health = [AdminMode.ONLINE] value = os.getenv( "ENGINEERING_MODE_IGNORE_HEALTH", default="false" ) if value.lower() != "true": self._device._admin_modes_using_health.append( AdminMode.ENGINEERING ) self._device._event_q = Queue() eventloop_fun = self._device._tango_event_completion self._device._event_thread = Thread(target=eventloop_fun) self._device._event_thread.start() # Use env. variable to determine if health rollup is used # e.i. monitor external devices (proc, connector) health state env_var = "NO_HEALTH_ROLLUP" self._device._no_health_rollup = os.getenv(env_var) is not None message = "LowCbfSubarray init complete" self._device.logger.info(message) self._completed() return ResultCode.OK, message # this will break in v11 of ska-tango-base # (and we will get a whole new way of handling resources...) 
# device.resource_manager._key = "lowcbf" # # try: # allocator = DeviceProxy(device.AllocatorAddress) # device.subarray = Subarray(allocator, device._connect) # # dirty hack to make resource manager have a link to subarray # # object (for use in assign command) # device.resource_manager.subarray = device.subarray # # message = ( # f"LowCbfSubarray init complete" # f", using allocator {device.AllocatorAddress}" # ) # return ResultCode.OK, message # except Exception as e: # message = "Exception connecting to Allocator" # device.logger.error(f"{message}\n{e}") # return ResultCode.FAILED, message # Inherited partially-implemented commands from ska-tango-base.subarray # ==== Command ==== ==CommandObject== == method == # AssignResources "AssignResources" "assign" # ReleaseResources "ReleaseResources" "release" # Configure - replaced in ska-tango-base.csp.subarray - # ReleaseAllResources "ReleaseAllResources" "release" # Scan "Scan" "scan" # EndScan "EndScan" "end_scan" # End - replaced in ska-tango-base.csp.subarray - # Abort "Abort" abort (not submitted) # ObsReset "ObsReset" "obsreset" # Restart "Restart" "restart" # Inherited partially-implemented command from ska-tango-base:csp.subarray # ConfigureScan "ConfigureScan" # Configure - calls ConfigureScan # GoToIdle "GoToIdle" # End - calls GoToIdle def _subscribe_to_connector(self): """Subscribe to healthState attribute changes on all connectors""" if (db := self._get_conn_db()) is None: return prefix = f"{self.ConnectorDbHost}:{self.ConnectorDbPort}/" dev_names = db.get_device_exported_for_class("LowCbfConnector") for fqdn in dev_names: full_path = prefix + fqdn try: if (proxy := DeviceProxy(full_path)) is None: self.logger.warning("%s proxy FAILED", full_path) continue except DevFailed as e: self.logger.warning("SUBSCR. 
EXCEPTION %s %s", full_path, e) continue self.logger.info(f"SUBSCRIBING to {full_path}") proxy.subscribe_event( "healthState", EventType.CHANGE_EVENT, # keep format compatible with processor subscription: partial(self._health_callback, fqdn), ) def _flush_alveos(self): "Purge Alveo details on ReleaseResources event" # pylint: disable=access-member-before-definition self.logger.info("flush all Alveos") for sn in self._subscribed_sn: if sn not in self._proc_sn_to_fqdn: continue fqdn = self._proc_sn_to_fqdn[sn] # this processor's delays are no longer used but we won't get # its events as we are unsubscribing from them delays_summary = [UNUSED] * self._subarray_id self._update_delays_valid(fqdn, delays_summary) if fqdn in self._component_health: self._component_health.pop(fqdn) # unsubscribe from Connector diagnostics_port_up updates for item in self._eth_port_stat.values(): if item.unsubscribe_connector(): self.logger.info("Unsubscribed port %s", item.sw_port) self._assigned_proc_details = {} self.push_change_event("assigned_processors", "[]") self.push_archive_event("assigned_processors", "[]") self._subscribed_sn = [] self._subscribed_fqdn = [] self._eth_port_stat = {} """Keep track of processor/connector Ethernet port status""" self._proc_stats_mode = {} # Unsubscribe from all processor tango attributes serials = list(self._proc_subscriptions.keys()) for serial in serials: dev_proxy, sub_ids = self._proc_subscriptions.pop(serial) for sub_id in sub_ids: dev_proxy.unsubscribe_event(sub_id) del dev_proxy txt = f"Monitor Processor health: {str(self._subscribed_sn)}" self.logger.info(txt) def _enqueue_event( self, event: Evt, data: Union[None, Any] = None ) -> None: """Add event (and optionally associated data) to a queue to be handled by a separate thread """ # obsState change could call this before things are initialised if hasattr(self, "_event_q"): self._event_q.put((event, data)) def is_monitoring_mode(self, mode: Union[AdminMode, None] = None) -> bool: 
"""Determine if we are in monitoring mode In monitoring mode we keep track of healthState and report its changes. ONLINE and ENGINEERING admin modes are considered monitoring while NOT_FITTED, OFFLINE and RESERVED are not. See RtD https://is.gd/nCuv1q :param mode: adminMode we are about to transition to; when not supplied use the current ``self._admin_mode`` value instead :return: True when in monitoring mode; False otherwise """ value = self._admin_mode if mode is None else mode return value in self._admin_modes_using_health # ---------- # Callbacks # ---------- def _fqdn_callback(self, name: str, jsonstr: str, quality) -> None: """Gets called when processor fqdn list gets updated in Allocator""" self.logger.info(f"from ALLOCATOR {name} {jsonstr}") procs = json.loads(jsonstr) self._proc_sn_to_fqdn = procs def _internal_alveo_callback(self, name: str, jsonstr: str, quality): """Allocator calls this with information of mapping between Alveo serial number and a subarray it is allocated to""" self._delay_contributors = {} log = self.logger log.info(f"ALLOCATOR CFG {name}: {jsonstr}") cfg_dict = json.loads(jsonstr) if not cfg_dict: self._enqueue_event(Evt.ALVEOS_RELEASED) return # handle ONLY configuration relevant to THIS subarray # probably the bit: "sa_id": 1 in cfg assigned_procs = {} for k, v in cfg_dict.items(): # check expected dict keys for mandatory_key in ("regs", "switch"): if mandatory_key not in v: log.warning("MISSING %s key in %s", mandatory_key, k) return # NOTE PST and CORR have different key names sub_key_names = ("sa_id", "subarray_id") for subarray in v["regs"]: if not subarray: # skip empty register dicts representing unused subarray capacity continue if all(_ not in subarray for _ in sub_key_names): log.warning("MISSING subarray ID") continue # get the subarray number using one of possible dict keys for sub_key in sub_key_names: if sub_key in subarray: subarray_id = subarray[sub_key] break if self._subarray_id == subarray_id: # bundle Alveo S/N 
and associated switch port together evt_data = {k: v["switch"]} assigned_procs.update(evt_data) # 15-Jan-2024: formats are messy: # ------------------------------- # for CORR: # {"XFL1TJCHM3ON": { # "fw": "vis:0.0.2-main.e4b5ad79", # "regs": [ # { # "sa_id": 4, # "stns": [[345, 1], [350, 1], [352, 1], [355, 1], [431, 1], [434, 1]], # "sa_bm": [[1, 140, 0, 27648, 192, 24, 0]] # } # ]}} # for PST: # {"XFL1XCRTUC22 ": { # "fw": "pst:0.0.20-dev.7c13dd33", # "regs": [ # { # "subarray_id": 4, # "stn_bm_id": 1, # "stns": [[2, 1], [3, 1], [345, 1], [350, 1], [352, 1], [355, 1], [431, 1], [434, 1 ]], "freq_ids": [140, 141, 142, 143, 144, 145, 146, 147], "pst_bm_ids": [15]}, {}, {}]}} # publish change in assigned processors (if any) if assigned_procs != self._assigned_proc_details: self._enqueue_event(Evt.ALVEOS_ASSIGNED, assigned_procs) def _internal_subarray_callback(self, fqdn: str, jsonstr: str, quality): """Allocator event with subarray configuration details. :param jsonstr: dict with * key = ``subarray_id`` number * value = subarray definition (not used here) Warning: This is a Tango event callback. Don't do any work here """ configured_subarrays = tuple(json.loads(jsonstr).keys()) # NOTE: in transport subarray ID was converted to string config_ok = str(self._subarray_id) in configured_subarrays self.logger.info("subarray configured ok: %s", config_ok) self._configured_subarrays = configured_subarrays # update dependencies if changed # pylint: disable=access-member-before-definition if self._configured_ok != config_ok: self._configured_ok = config_ok self._enqueue_event(Evt.CALC_PROC_PERCENT) self._enqueue_event(Evt.CALC_ETHER_PERCENT) def _health_callback(self, fqdn: str, evt: EventData) -> None: """Called by LowCbfProcessor or LowCbfConnector when their health state changes. Enqueue event for processing in a separate thread. 
""" self._enqueue_event(Evt.PROCESS_HEALTH, (fqdn, evt)) def _process_health(self, fqdn: str, evt: EventData) -> None: """Process LowCbfProcessor or LowCbfConnector health state change. Will trigger processorsReadyPercent recalculation. :param fqdn: device whose healthState changed :param evt: health event details """ if self.is_evt_error(evt, "_process_health"): return name = evt.attr_value.name value = evt.attr_value.value is_processor = fqdn.find("/processor/") != -1 self.logger.info(f"HEALTH {fqdn} {name}: {value}") self._component_health[fqdn] = value # _recalculate_health ensures healthState is updated/propagated # only when subarray is ONLINE (or ENGINEERING - depending on # environment variable value) self._recalculate_health() if is_processor: self._calc_proc_scan_quality() def _recalculate_health(self): """Recalculate overall health state (across all devices) and propagate change upwards (Controller) if needed""" if not self.is_monitoring_mode(): return current_health = HealthState.UNKNOWN for hs in self._health_states: if hs in self._component_health.values(): current_health = hs break if current_health != self._health_state: self._update_health_state(current_health) def _conn_port_up_cb(self, tango_evt: EventData) -> None: """ Handle a change event for a connector's port-up status. Updates the internal record of port status for participating connectors and triggers a recalculation of the overall Ethernet locked percentage. :param tango_evt: The Tango event data Warning: This is a Tango event callback. Don't do any work here """ if self.is_evt_error(tango_evt, "_conn_port_up_cb"): return tango_dev_name = tango_evt.device.name() value = tango_evt.attr_value.value self.logger.info("%s ETHERNET %s", tango_dev_name, value) self._conn_port.ports_up[tango_dev_name] = tango_evt.attr_value.value self._enqueue_event(Evt.CALC_ETHER_PERCENT) def _calc_proc_scan_quality(self) -> None: """Reevaluate percentage of processors ready for scan. 
Do nothing if ``obsState`` is other than READY or SCANNING Side effects: * update self._proc_ready_percent, see processorsReadyPercent attribute * generate event for subscribers """ def notify_not_ready(valid: bool) -> None: """Notify subscribers processors are not ready. Even though processors are not ready (for whatever reason) subscribers need to know about the new state. :param valid: True if attr. quality is VALID, False otherwise """ if self._proc_ready_percent == 0: return self._proc_ready_percent = 0 q = AttrQuality.ATTR_VALID if valid else AttrQuality.ATTR_INVALID evt_value = (0, time.time(), q) self.push_change_event("processorsReadyPercent", *evt_value) self.push_archive_event("processorsReadyPercent", *evt_value) self.logger.info("Chk proc percent in %s", self._obs_state) if not self._configured_ok: self.logger.info("subarray not configured") notify_not_ready(valid=False) return if not self._is_obs_state( (ObsState.READY, ObsState.SCANNING, ObsState.CONFIGURING) ): notify_not_ready(valid=False) return # nothing to do proc_total_count = len(self._assigned_proc_details) if proc_total_count < 1: notify_not_ready(valid=True) return proc_ok_count = 0 for sn in self._assigned_proc_details: if (fqdn := self._proc_sn_to_fqdn.get(sn)) is None: self.logger.warning("NO fqdn for %s", sn) continue # check PROC FW loaded + is healthy if ( stats_mode := self._proc_stats_mode.get(fqdn) ) is None or not stats_mode["ready"]: continue if self._component_health.get(fqdn) == HealthState.OK: proc_ok_count += 1 new_ready_percent = round(100 * proc_ok_count / proc_total_count + 0.5) # pylint: disable=access-member-before-definition self.logger.info( "processorsReadyPerc: new:%d, old:%d, count_ok:%d; count_all: %d", new_ready_percent, self._proc_ready_percent, proc_ok_count, proc_total_count, ) if new_ready_percent != self._proc_ready_percent: self._proc_ready_percent = new_ready_percent self.push_change_event("processorsReadyPercent", new_ready_percent) 
self.push_archive_event( "processorsReadyPercent", new_ready_percent ) def _calc_ether_port_percent(self) -> None: """ Recalculate the overall percentage of locked Ethernet ports. Calculates the percentage based on both processor and switch port lock status for all assigned processors. Result is pushed as a Tango change event. """ def notify_not_locked() -> None: """Notify subscribers Ethernet ports are not ready. Even though Ethernet ports are not ready (e.g no longer READY state) subscribers need to know about the new (changed) state. """ if self._ether_locked_percent is None: return self._ether_locked_percent = None data = (0, time.time(), AttrQuality.ATTR_INVALID) self.push_change_event("processorEthernetLockedPercent", *data) self.push_archive_event("processorEthernetLockedPercent", *data) self.logger.info("Chk Ether percent in %s", self._obs_state) if not self._is_obs_state( (ObsState.READY, ObsState.SCANNING, ObsState.CONFIGURING) ): notify_not_locked() return # the number of Ethernet links we track if (count_all := len(self._eth_port_stat)) == 0: return if not self._configured_ok: notify_not_locked() return # determine if corresponding switch side Ethernet port(s) are up for stat in self._eth_port_stat.values(): dev, port = stat.sw_dev_name, stat.sw_port stat.sw_ether_locked = self._conn_port.is_up(dev, port) # this should be so rare that it's worth logging: if not stat.sw_ether_locked: self.logger.warning("SW:%s PORT:%s NOT LOCKED", dev, port) # the number of switch<-->Alveo links that are up: count_ok = sum( (int(all((i.proc_ether_locked, i.sw_ether_locked)))) for i in self._eth_port_stat.values() ) percent = round(100 * count_ok / count_all + 0.5) self.logger.info( "NEW ETHER percent: %d, count_ok:%d, count_all:%d", percent, count_ok, count_all, ) if self._ether_locked_percent != percent: self._ether_locked_percent = percent self.push_change_event("processorEthernetLockedPercent", percent) self.push_archive_event("processorEthernetLockedPercent", 
percent) def _update_obs_state(self, obs_state: ObsState) -> None: """Override SKAObsDevice class implementation as we need to know about ``obsState`` changes. """ # some Tango attributes will depend on the obsState value self._obs_state = obs_state self._enqueue_event(Evt.CALC_PROC_PERCENT) self._enqueue_event(Evt.CALC_ETHER_PERCENT) def _poly_delay_callback(self, fqdn: str, tango_evt) -> None: if self.is_evt_error(tango_evt, "_poly_delay_callback"): return value = tango_evt.attr_value.value self._enqueue_event(Evt.DELAY_POLY_SUMMARY, (fqdn, value)) # ---------- # Threads # ---------- def _subscribe_allocator(self): """A short lived thread to subscribe to Allocator attributes informing us which Alveo cards were assigned to this subarray """ alloc_proxy = MccsDeviceProxy( self.AllocatorAddress, self.logger, connect=False, ) attribute_callbacks = ( ("procDevFqdn", self._fqdn_callback), ("internal_alveo", self._internal_alveo_callback), ("internal_subarray", self._internal_subarray_callback), ("ip_to_resolve", self._update_host_resolution), ) for att, cb in attribute_callbacks: alloc_proxy.evt_sub_on_connect(att, cb) try: alloc_proxy.connect() self._subscribed_to_alloc = True except DevFailed as e: self.logger.error(f"_subscribe_allocator failed: {e}") def _update_host_resolution(self): """Up host resolution percentage.""" sdp_host_ready_percent = self._get_percent_sdp_resolved() pss_host_ready_percent = self._get_percent_pss_resolved() pst_host_ready_percent = self._get_percent_pst_resolved() if self._sdp_host_ready_percent != sdp_host_ready_percent: self._sdp_host_ready_percent = sdp_host_ready_percent self.push_change_event( "sdpHostsResolvedPercent", sdp_host_ready_percent[0] ) self.push_archive_event( "sdpHostsResolvedPercent", sdp_host_ready_percent[0] ) if self._pss_host_ready_percent != pss_host_ready_percent: self._pss_host_ready_percent = pss_host_ready_percent self.push_change_event( "pssHostsResolvedPercent", pss_host_ready_percent[0] ) 
self.push_archive_event( "pssHostsResolvedPercent", pss_host_ready_percent[0] ) if self._pst_host_ready_percent != pst_host_ready_percent: self._pst_host_ready_percent = pst_host_ready_percent self.push_change_event( "pstHostsResolvedPercent", pst_host_ready_percent[0] ) self.push_archive_event( "pstHostsResolvedPercent", pst_host_ready_percent[0] ) def _tango_event_completion(self): """A thread to decouple (nontrivial) Tango event processing from callback in which it was reported. """ while True: event, data = self._event_q.get() if event == Evt.ALVEOS_RELEASED: # resources are released: flush all references to Alveos self._flush_alveos() self._recalculate_health() self.clear_sps_stats_cache() self._calc_proc_scan_quality() self._calc_ether_port_percent() elif event == Evt.ALVEOS_ASSIGNED: # data is a dict: # {"alveo_SN": {"name": "p4_01", "port": "17/0"}, ...} self._assigned_proc_details = data # keep track of which sw. port is associated with which Alveo for sn, sw in data.items(): fqdn = self._proc_sn_to_fqdn[sn] if fqdn not in self._eth_port_stat: sw_name, port = sw["name"], sw["port"] msg = f"ASSOC {fqdn} with sw:{sw_name} port:{port}" self.logger.info(msg) self._eth_port_stat[fqdn] = EtherPortStat( sw_name, port ) proc_sn_str = json.dumps(list(data.keys())) self.push_change_event("assigned_processors", proc_sn_str) self.push_archive_event("assigned_processors", proc_sn_str) # ensure the thread handling Processor attribute subscriptions # is alive # pylint: disable=access-member-before-definition proc_count = len(self._assigned_proc_details) if proc_count > 0 and not self._proc_thread: subscription_loop = self._check_subscribe_processors self._proc_thread = Thread(target=subscription_loop) self._proc_thread.start() elif event == Evt.DELAY_POLY_SUMMARY: self._update_delays_valid(*data) elif event == Evt.CALC_PROC_PERCENT: self._calc_proc_scan_quality() # extract these into a table elif event == Evt.CALC_ETHER_PERCENT: self._calc_ether_port_percent() elif event 
== Evt.PROCESS_HEALTH: self._process_health(*data) else: self.logger.error("Unknown event %s", event) self._event_q.task_done() def _check_subscribe_processors(self): """ Update subscriptions to processor Tango attribute changes. As allocator assigns and removes processors used by this subarray, subscriptions to attrs of new processors need to be created and subscriptions to processors no longer in use deleted """ subs = ( ("subarrayDelaysSummary", self._poly_delay_callback), ("healthState", self._health_callback), # Receive SPS statistics from processors used by subarray ("stats_sps_npkts", self._update_sps_stats_npkts), ("stats_sps_nflags", self._update_sps_stats_nflags), ("stats_sps_rms", self._update_sps_stats_rms), ("stats_sps_vchans", self._update_sps_stats_vchans), ("stats_mode", self._update_stats_mode), # TODO: needs to implement those attributes in processor # ("stats_pst_jones", self._update_stats_pst_jones), # ("stats_pss_jones", self._update_stats_pss_jones), ("stats_ethernet_status", self._update_proc_ether_port), ) """Attribute name, event callback to be called with fqdn & event""" while True: time.sleep(1) assigned_procs = self._assigned_proc_details # short alias changes = False # subscribe to each newly assigned processor for sn in [ p for p in assigned_procs if p not in self._subscribed_sn ]: if sn not in self._proc_sn_to_fqdn: self.logger.info("CAN'T FIND %s", sn) continue # Get Tango proxy for new processor fqdn = self._proc_sn_to_fqdn[sn] try: new_proxy = DeviceProxy(fqdn) new_proxy.set_timeout_millis(TANGO_TIMEOUT_MS) except DevFailed: # Processors may be unresponsive if loading firmware continue # Subscribe to Tango attributes listed in subs above sub_ids = [] try: for attr_name, cbk in subs: sub_id = new_proxy.subscribe_event( attr_name, EventType.CHANGE_EVENT, partial(cbk, fqdn), stateless=True, ) sub_ids.append(sub_id) except EventSystemFailed: # Tango docs lie about this exception never occurring for sub_id in sub_ids: 
new_proxy.unsubscribe_event(sub_id) del new_proxy continue # Success with subscription self._proc_subscriptions[sn] = (new_proxy, sub_ids) self._subscribed_sn.append(sn) self._subscribed_fqdn.append(fqdn) changes = True # unsubscribe from any no-longer-used processors for sn in [ p for p in self._subscribed_sn if p not in assigned_procs ]: old_proxy, sub_ids = self._proc_subscriptions.pop(sn) for sub_id in sub_ids: old_proxy.unsubscribe_event(sub_id) # TODO can throw?? del old_proxy self._subscribed_sn.remove(sn) fqdn = self._proc_sn_to_fqdn[sn] self._subscribed_fqdn.remove(fqdn) changes = True if changes: txt = f"Monitor Processor health: {str(self._subscribed_sn)}" self.logger.info(txt) # Subscribe to Connector port_up attribute associated with proc. for sn, item in self._assigned_proc_details.items(): # _assigned_proc_details: # { "XFL10NIYKVEU"": {"name": "p4_01", "port": "17/0"}, ...} fqdn = self._proc_sn_to_fqdn[sn] if (eth_stat := self._eth_port_stat.get(fqdn)) is None: continue if eth_stat.is_subscribed: continue switch_name, port = item["name"], item["port"] proxy = self._get_switch_proxy(switch_name) if proxy is None: self.logger.warning( "Failed to get switch %s proxy", switch_name ) continue try: # Tango event can arrive really fast so be ready: self._eth_port_stat[fqdn].sw_proxy = proxy self._eth_port_stat[fqdn].sw_dev_name = proxy.name() subscr_id = proxy.subscribe_event( "diagnostics_port_up", EventType.CHANGE_EVENT, self._conn_port_up_cb, ) self._eth_port_stat[fqdn].subscription_id = subscr_id except TANGO_SUB_EXC as exc: del proxy self._eth_port_stat[fqdn].sw_proxy = None msg = f"Failed to subscribe to {switch_name} {port} {exc}" self.logger.warning(msg) continue self._eth_port_stat[fqdn].sw_proxy = proxy self._eth_port_stat[fqdn].subscription_id = subscr_id self._eth_port_stat[fqdn].sw_dev_name = proxy.name() self.logger.info("Subscribed to %s port %s", switch_name, port) def _update_sps_stats_npkts(self, fqdn: str, tango_evt) -> None: """ 
Callback receives npkts statistics from a processor. Warning: This is a tango event callback. Don't do any work here """ if self.is_evt_error(tango_evt, "_update_sps_stats_npkts"): return self._sps_stats_npkts_cache[fqdn] = tango_evt.attr_value.value def _update_sps_stats_nflags(self, fqdn: str, tango_evt) -> None: """ Callback receives nflags statistics from a processor. Warning: This is a tango event callback. Don't do any work here """ if self.is_evt_error(tango_evt, "_update_sps_stats_nflags"): return self._sps_stats_nflags_cache[fqdn] = tango_evt.attr_value.value def _update_sps_stats_rms(self, fqdn: str, tango_evt) -> None: """ Callback receives rms statistics from a processor. Warning: This is a tango event callback. Don't do any work here """ if self.is_evt_error(tango_evt, "_update_sps_stats_rms"): return np_rms = tango_evt.attr_value.value self._sps_stats_rms_cache[fqdn] = np_rms.T # fix Tango to NP order def _update_stats_mode(self, fqdn: str, tango_evt) -> None: """ Callback receives details about processor state change. Warning: This is a Tango event callback. 
Don't do any work here """ if self.is_evt_error(tango_evt, "_update_stats_mode"): return value_str = tango_evt.attr_value.value self.logger.info("stats_mode from %s: %s", fqdn, value_str) value = json.loads(value_str) self._proc_stats_mode[fqdn] = value self._enqueue_event(Evt.CALC_PROC_PERCENT) self._enqueue_event(Evt.CALC_ETHER_PERCENT) def _update_stats_pst_jones(self, fqdn: str, tango_evt) -> None: if self.is_evt_error(tango_evt, "_update_stats_pst_jones"): return value_str = tango_evt.attr_value.value self.logger.info("stats_pst_jones from %s: %s", fqdn, value_str) # TODO: update the local version of PST Jones when we are ready with processor def _update_stats_pss_jones(self, fqdn: str, tango_evt) -> None: if self.is_evt_error(tango_evt, "_update_stats_pss_jones"): return value_str = tango_evt.attr_value.value self.logger.info("stats_pss_jones from %s: %s", fqdn, value_str) # TODO: update the local version of PSS Jones when we are ready with processor def _update_sps_stats_vchans(self, fqdn: str, tango_evt) -> None: """ Callback receives vchans statistics from a processor. Warning: This is a tango event callback. Don't do any work here """ if self.is_evt_error(tango_evt, "_update_sps_stats_vchans"): return self._sps_stats_vchans_cache[fqdn] = tango_evt.attr_value.value # Processors send stats_vchans last so this evt completes an update self._sps_stats_update_time[fqdn] = time.time() # Spawn thread because this is running in a Tango event notify thread thr = Thread( target=self.component_manager.announce_sps_stats, args=( self._sps_stats_nflags_cache, self._sps_stats_npkts_cache, self._sps_stats_rms_cache, self._sps_stats_vchans_cache, self._sps_stats_update_time, ), ) thr.start() def _get_conn_db(self) -> Union[Database, None]: """ Get Tango database for Connector device. 
:return: Tango Database object, or None if the connection failed """ try: db = Database(self.ConnectorDbHost, self.ConnectorDbPort) return db except DevFailed as e: # CI/CD k8s-test won't have the connector deployed so bail out self.logger.warning(f"DB EXCEPTION {e}") return None def _get_switch_proxy(self, sw_name: str) -> Union[DeviceProxy, None]: """ Find a LowCbfConnector device that corresponds to a switch name. Iterates through all exported LowCbfConnector devices and checks their ``switchName`` attribute. :param sw_name: The name of the switch to find (e.g. 'p4_01') :return: A DeviceProxy for the connector, or None if not found """ if (db := self._get_conn_db()) is None: return None prefix = f"{self.ConnectorDbHost}:{self.ConnectorDbPort}/" dev_names = db.get_device_exported_for_class("LowCbfConnector") for dev in dev_names: fqdn = prefix + dev try: proxy = DeviceProxy(fqdn) proxy.set_timeout_millis(TANGO_TIMEOUT_MS) name_str = proxy.switchName # conn.switchName typically: '{"Name": "p4_01"}' inst_name = json.loads(name_str) if "Name" in inst_name and sw_name == inst_name["Name"]: return proxy except DevFailed: self.logger.warning("failed to get proxy for %s", fqdn) continue return None def _update_proc_ether_port(self, fqdn: str, evt: EventData) -> None: """ Handle a change event for a processor's Ethernet port status. Updates the internal state and triggers a recalculation of the overall Ethernet locked percentage. :param fqdn: FQDN of the processor device :param evt: The Tango event data Warning: This is a tango event callback. 
Don't do any work here """ if self.is_evt_error(evt, "_update_proc_ether_port"): return # we get an integer: 1 == Ether port locked, 0 == not locked status = bool(evt.attr_value.value) self.logger.info("%s ETHERNET locked: %s", fqdn, status) with self._callback_lock: self._eth_port_stat[fqdn].proc_ether_locked = status self._enqueue_event(Evt.CALC_ETHER_PERCENT) def _switch_port_to_proc_fqdn( self, name: str, port: str ) -> Union[str, None]: """ Map a switch name and port back to a processor FQDN. :param name: Switch name :param port: Switch port :return: FQDN of the processor connected to this port, or None """ for fqdn, stat in self._eth_port_stat.items(): if stat.sw_name == name and stat.sw_port == port: return fqdn return None def clear_sps_stats_cache(self): """Erase cache of SPS stats, and clear attribute values""" self._sps_stats_nflags_cache = {} self._sps_stats_npkts_cache = {} self._sps_stats_rms_cache = {} self._sps_stats_vchans_cache = {} self._sps_stats_update_time = {} self._sps_stats_push_ctr = 0 self.sps_stats_update( [], [], np.asarray([], dtype=np.uint8), np.asarray([], dtype=np.uint8), np.asarray([], dtype=np.int8), ) def sps_stats_update( self, stn_sstns, bm_freqs, np_rms_xpol, np_rms_ypol, np_flags ): # pylint: disable=too-many-arguments """Update all the SPS statistics attributes""" # save attribute value data so it can be read anytime self._sps_bm_frq = json.dumps(bm_freqs) self._sps_stn_sstn = json.dumps(stn_sstns) # Tango C order vs Numpy For_sps_bm_frqtran order for the following self._sps_stats_rms_xpol = np_rms_xpol.T self._sps_stats_rms_ypol = np_rms_ypol.T self._sps_stats_flags_percent = np_flags.T # update all the SPS-related tango attributes self.push_change_event("sps_beam_freqs", self._sps_bm_frq) self.push_change_event("sps_station_substations", self._sps_stn_sstn) self.push_change_event("sps_stats_rms_xpol", self._sps_stats_rms_xpol) self.push_change_event("sps_stats_rms_ypol", self._sps_stats_rms_ypol) self.push_change_event( 
"sps_stats_flag_percent", self._sps_stats_flags_percent ) # Archive 1 in N (=sps_stats_push_per_archive) self._sps_stats_push_ctr += 1 if self._sps_stats_push_ctr < self._sps_stats_push_per_archive: self.logger.info("SPS statistics attributes updated") return self._sps_stats_push_ctr = 0 self.push_archive_event("sps_beam_freqs", self._sps_bm_frq) self.push_archive_event("sps_station_substations", self._sps_stn_sstn) self.push_archive_event("sps_stats_rms_xpol", self._sps_stats_rms_xpol) self.push_archive_event("sps_stats_rms_ypol", self._sps_stats_rms_ypol) self.push_archive_event( "sps_stats_flag_percent", self._sps_stats_flags_percent ) self.logger.info("SPS statistics attributes updated & archived") def is_evt_error(self, tango_evt: EventData, cb_name: str) -> bool: """Utility function - check if Tango event has failed. Log warning message with error details :param tango_event: Tango event object :param cb_name: name of the callback function handling event :return: True if event failed, False otherwise """ if tango_evt.err: txt = f"{cb_name} got failed event: stack{tango_evt.errors}" self.logger.warning(txt) return True return False @attribute( dtype=((np.uint8,),), max_dim_x=MAX_STATIONS, max_dim_y=MAX_SPS_CHAN, doc="SPS Xpol RMS levels", ) def sps_stats_rms_xpol(self): """Return latest SPS X-polarisation RMS statistics :return: 2D array (stations * channels) of ``np.uint8`` """ return self._sps_stats_rms_xpol @attribute( dtype=((np.uint8,),), max_dim_x=MAX_STATIONS, max_dim_y=MAX_SPS_CHAN, doc="SPS Ypol RMS levels", ) def sps_stats_rms_ypol(self): """Return latest SPS Y-polarisation RMS statistics :return: 2D array (stations * channels) of ``np.uint8`` """ return self._sps_stats_rms_ypol @attribute( dtype=((np.uint8,),), max_dim_x=MAX_STATIONS, max_dim_y=MAX_SPS_CHAN, doc="SPS flagging percent (0-100, 0xff if no data)", ) def sps_stats_flag_percent(self): """Return latest SPS flagging statistics :return: 2D array (stations * channels) of ``np.uint8``; range: 
[0, 100]%, 255 if no data """ return self._sps_stats_flags_percent @attribute(dtype=str, doc="SPS stats (station, substation) order") def sps_station_substations(self) -> str: """Return station/substation order used for SPS stats""" return self._sps_stn_sstn @attribute(dtype=str, doc="SPS stats (beam_id, freq_id) order") def sps_beam_freqs(self) -> str: """ :return: beam/freq order used for SPS stats """ return self._sps_bm_frq @attribute( dtype=int, doc="SPS statistics archive interval: 1 in N updates", access=AttrWriteType.READ_WRITE, ) def sps_archive_interval(self) -> int: """Set/get archive interval (1 in N updates)""" return self._sps_stats_push_per_archive def write_sps_archive_interval(self, value: int): """Change archive interval""" self._sps_stats_push_per_archive = value self._sps_stats_push_ctr = value # force archive immediately @attribute( dtype=int, min_value="0", max_value="100", unit="%", doc="Percentage of Ethernet ports (switch and proc.) ready for scan", ) def processorEthernetLockedPercent(self) -> tuple[int, float, AttrQuality]: """ Get a percentage of Ethernet ports (processor/switch) ready for scan. The value is meaningful only when ``obsState`` is READY or SCANNING so we also return Tango attribute quality. :return: tuple * percentage: rounded integer in the range [0, 100], None when ATTR_INVALID * time: float * quality: AttrQuality """ ready = self._is_obs_state((ObsState.READY, ObsState.SCANNING)) value = self._ether_locked_percent if ready else 0 quality = AttrQuality.ATTR_VALID if ready else AttrQuality.ATTR_INVALID return value, time.time(), quality # Run server def main(args=None, **kwargs): """Main function of the LowCbfSubarray module.""" return run((LowCbfSubarray,), args=args, **kwargs) if __name__ == "__main__": main()