Source code for controller.controller_device

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2024 CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.

""" SKA Low CBF

Sub-element controller device for Low.CBF
"""
import json
import os
import threading
from collections import deque
from enum import IntEnum
from threading import Condition, Thread
from typing import List, Union

import tango
from ska_tango_base import SKABaseDevice, SKAController
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, HealthState
from tango import AttrQuality, AttrWriteType, Database, DevFailed
from tango.server import attribute, device_property, run

from ska_low_cbf import release
from ska_low_cbf.controller.component_manager import (
    LowCbfControllerComponentManager,
)
from ska_low_cbf.controller.controller import SearchBeamBandwidthMode
from ska_low_cbf.device_proxy import MccsDeviceProxy
from ska_low_cbf.events import EventManager

# Tango naming conventions clash with Python conventions...
# pylint: disable=invalid-name,protected-access,too-few-public-methods
# pylint: disable=pointless-string-statement


# Public API of this module: the Tango device class and the server entry point
__all__ = ["LowCbfController", "main"]


class Evt(IntEnum):
    """Kinds of Allocator-originated event queued for later processing.

    Being an ``IntEnum``, each member also compares equal to its integer code.
    """

    #: a processor has registered with the Allocator
    PROC_REGISTERED = 0
    #: a processor has been assigned to a subarray
    PROC_ALLOCATED = PROC_REGISTERED + 1


class LowCbfController(SKAController):
    """
    Sub-element controller device for Low.CBf

    **Properties:**

    - Device Property
    """

    ConnectorDbHost = device_property(
        dtype="str", default_value="tango-databaseds"
    )
    ConnectorDbPort = device_property(dtype="int", default_value=10000)

    # Attributes
    searchBeamBandwidthMode = attribute(
        dtype=SearchBeamBandwidthMode,
        access=AttrWriteType.READ_WRITE,
        label="Search Beam Bandwidth Mode",
        doc=(
            "Search Beam Bandwidth Mode is configured at sub-element level "
            " and applies for all the instances of the Capability Search "
            "Beams in all sub-arrays.\n\nSupported modes are listed and "
            "described in Table 9-9\n\nTM can change the value of the "
            "parameter Search Beam Bandwidth Mode only when all the "
            "sub-arrays are IDLE."
        ),
    )

    @attribute(
        dtype=("DevString",),
        max_dim_x=32,
        label="Subelement Subarrays",
        doc="List of Low.CBF SubArray TANGO Device names.",
    )
    def subelementSubarrays(self):
        """
        List of Low.CBF SubArray TANGO Device names.
        """

    @attribute(doc="Table (JSON dict) of Low CBF device healthStates.")
    def health_table(self) -> str:
        """
        Read FQDNs and health states of all monitored devices.

        :return: JSON string
        """
        return json.dumps(
            self.component_manager.controller.device_health_states
        )

    # FIXME - requires pytango v9.4+
    # @attribute(dtype=(HealthState,), max_dim_x=512)
    @attribute(dtype=(int,), max_dim_x=512)
    def health_processors(self) -> List[int]:
        """Health of all Processors."""
        return [
            int(health)
            for health in self.component_manager.controller.health_by_type[
                "LowCbfProcessor"
            ]
        ]
        # FIXME - pytango 9.4+
        # return self.component_manager.controller.health_by_type["LowCbfProcessor"]

    # FIXME - requires pytango v9.4+
    # @attribute(dtype=(HealthState,), max_dim_x=32)
    @attribute(dtype=(int,), max_dim_x=32)
    def health_connectors(self) -> List[int]:
        """Health of all Connectors."""
        return [
            int(health)
            for health in self.component_manager.controller.health_by_type[
                "LowCbfConnector"
            ]
        ]
        # FIXME - pytango 9.4+
        # return self.component_manager.controller.health_by_type["LowCbfConnector"]

    @attribute(dtype=str, doc="A JSON array of all Alveos")
    def all_alveos(self) -> str:
        """Return JSON string listing all discovered Alveos."""
        return json.dumps(list(self._all_alveos.keys()))

    @attribute(dtype=str, doc="A JSON array of available Alveos.")
    def available_alveos(self) -> str:
        """Return JSON string listing all available Alveos."""
        return json.dumps(self._get_available_alveos())

    @attribute(dtype=float, doc="Percentage of SPS links up.")
    def spsLinkUpPercent(self) -> float:
        """
        Return the quality attribute percent SPS.

        Note that this quality attribute needs to be pulled by TMC. This is not
        automatically updated.
        """
        return self._get_percent_sps()

    def _get_percent_sps(self) -> float:
        """Return the percentage of SPS link up"""
        # Need the Allocator for quality attributes
        try:
            allocator_proxy = tango.DeviceProxy(self.allocator_device)
        except tango.DevFailed:
            return 0.0
        return allocator_proxy.GetSPSPercent()

    def _get_available_alveos(self) -> list[str]:
        """Return a list of Alveo serial numbers which are not allocated
        to any subarrays.
        """
        return [
            sn
            for sn in self._all_alveos.keys()
            if sn not in self._unavailable_alveos
        ]

    # Properties (value in database)
    allocator_device = device_property(
        dtype=("DevString",),
        default_value="low-cbf/allocator/0",
        doc="Tango device with allocation info",
    )

    # General methods

    # inherited
    def create_component_manager(self):
        return LowCbfControllerComponentManager(
            logger=self.logger,
            communication_state_callback=self._communication_state_changed,
            component_state_callback=self._component_state_changed,
        )

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.
        """
        self.set_state(tango.DevState.OFF)

    def subarray_event(self, fqdn, attr_name, value, quality):
        """ "
        Process an event received from a Subarray
        Function call examples:
           low-cbf/subarray/01, adminMode, 2, ATTR_VALID
           low-cbf/subarray/01, State, ALARM, ATTR_VALID
        """

        self.logger.info(
            "Subarray Event: %s, %s, %s, %s", fqdn, attr_name, value, quality
        )
        n = int(fqdn.split("/")[-1])
        if quality == tango.AttrQuality.ATTR_INVALID:
            value = None
        self.component_manager.controller._subarrays[n][attr_name] = value

    # Attributes methods
[docs] def read_searchBeamBandwidthMode(self) -> SearchBeamBandwidthMode: """ Search Beam Bandwidth Mode is configured at sub-element level and applies for all the instances of the Capability Search Beams in all sub-arrays. Supported modes are listed and described in Table 9-9 TM can change the value of the parameter Search Beam Bandwidth Mode only when all the sub-arrays are IDLE. :return: the searchBeamBandwidthMode attribute (SINGLE, DOUBLE) """ return self.component_manager.controller.search_beam_bandwidth_mode
[docs] def write_searchBeamBandwidthMode(self, value: SearchBeamBandwidthMode): """Set the searchBeamBandwidthMode attribute. :param value: SINGLE, DOUBLE """ self.component_manager.controller.search_beam_bandwidth_mode = value
def read_subelementSubarrays(self): """Return the subelementSubarrays attribute.""" return self._subelement_subarrays def _update_admin_mode(self: SKABaseDevice, admin_mode: AdminMode) -> None: """Override adminMode change callback.""" # pylint: disable=attribute-defined-outside-init if admin_mode == self._admin_mode: return # nothing to do # healthState reporting is adminMode dependent e.g UNKNOWN while # adminMode is OFFLINE self._adjust_health(admin_mode) self._admin_mode = admin_mode for func in (self.push_change_event, self.push_archive_event): func("adminMode", admin_mode) if self.is_monitoring_mode(): current_health = self.component_manager.controller.health_state if current_health != self.healthState: self._update_health_state(current_health) if not self._health_subscribed: self._subscribe_to_health() def _adjust_health(self, admin_mode: AdminMode) -> None: """Set ``healthState`` to UNKNOWN when switching to e.g. OFFLINE MODE Add log entry about ``healthState`` tracking. This is called just before ``adminMode`` change. 
:param admin_mode: the ``adminMode`` we are about to transition to """ monitoring_now = self.is_monitoring_mode() monitoring_next = self.is_monitoring_mode(admin_mode) if not monitoring_now and monitoring_next: msg = f"Entering {admin_mode.name} mode: will track healthState" self.logger.info(msg) # if currently in a monitoring mode but won't be after this change # change the healthState to UNKNOWN elif monitoring_now and not monitoring_next: msg = f"Entering {admin_mode.name} mode: won't track healthState" self.logger.info(msg) self._update_health_state(HealthState.UNKNOWN) def _subscribe_to_health(self): """Subscribe to healthState of all Low CBF hardware devices.""" try: db = Database() except DevFailed: # unit tests won't have the DB deployed and that's ok return device_type = "LowCbfProcessor" # few short aliases: em = self._event_manager cbk_fn = self._health_callback add_health_device = self.component_manager.controller.add_health_device for fqdn in db.get_device_exported_for_class(device_type): add_health_device(device_type, fqdn) self.logger.info(f"SUBSCRIBING to {fqdn}:healthState") em.register_callback(cbk_fn, fqdn, "healthState") # for Connector device(s) we need to consult it's own db try: db = Database(self.ConnectorDbHost, self.ConnectorDbPort) except Exception as e: # if we got this far this shouldn't be running as unit/CI test self.logger.error(f"Connector DB EXCEPTION {e}") return device_type = "LowCbfConnector" for device_name in db.get_device_exported_for_class(device_type): add_health_device(device_type, device_name) fqdn = self.conn_db_prefix + device_name self.logger.info(f"SUBSCRIBING to {fqdn}:healthState") em.register_callback(cbk_fn, fqdn, "healthState") # pylint: disable=attribute-defined-outside-init self._health_subscribed = True def subscribe_to_allocator(self, tango_dev_name: str) -> None: """ Subscribe to Alveo related allocator attribute (internal_alveo, procDevFqdn) changes. 
:param tango_dev_name: eg 'low-cbf/allocator/0' """ alloc_proxy = MccsDeviceProxy( tango_dev_name, self.logger, connect=False ) for attr_name in ("internal_alveo", "procDevFqdn"): self.logger.info(f"SUBSCRIBE to {attr_name} from {tango_dev_name}") alloc_proxy.evt_sub_on_connect( attr_name, self._handle_allocator_callback ) alloc_proxy.connect(max_time=120) def _subscribe_to_proc_admin_mode(self, proc_dict): """Subscribe to Processor adminMode changes so we can flag ENGINEERING/OFFLINE Alveos as unavailable :param proc_dict: e.g {"XFL10NIYKVEU": "low-cbf/processor/0.0.0", ...} """ def is_subscribed_to(fqdn): return fqdn in self._proc_subscribed em = self._event_manager for fqdn in proc_dict.values(): if is_subscribed_to(fqdn): continue self.logger.info(f"SUBSCRIBING to {fqdn}:adminMode") em.register_callback(self._proc_adminmode_event, fqdn, "adminMode") self._proc_subscribed.append(fqdn) def is_monitoring_mode(self, mode: Union[AdminMode, None] = None) -> bool: """Determine if we are in monitoring mode In monitoring mode we keep track of healthState and report its changes. ONLINE and ENGINEERING admin modes are considered monitoring while NOT_FITTED, OFFLINE and RESERVED are not. See RtD https://is.gd/nCuv1q :param mode: adminMode we are about to transition to; when not supplied use the current ``self._admin_mode`` value instead :return: True when in monitoring mode; False otherwise """ value = self._admin_mode if mode is None else mode return value in self._admin_modes_using_health # ---------- # Commands # ---------- def init_command_objects(self): """ Initialises the command handlers for commands supported by this device. """ # pylint: disable=useless-super-delegation super().init_command_objects() class InitCommand(SKAController.InitCommand): """Init Command class""" def do(self): """ Initialises the attributes and properties of the LowCbfController. 
""" super().do() self._device._version_id = release.version self._device._build_state = ( f"{release.name}, {release.version}, {release.description}" ) self._device.component_manager.controller.search_beam_bandwidth_mode = ( SearchBeamBandwidthMode.SINGLE ) self._device.set_change_event("healthState", True, False) self._device.set_archive_event("healthState", True, False) self._device._event_manager = EventManager( self._device.logger, events=["healthState", "adminMode"] ) self._device._callback_lock = threading.Lock() self._device._health_subscribed = False """A flag indicating we already subscribed to health events""" # Keep track of all known Alveos - {serial_nr: fqdn} self._device._all_alveos = {} # Keep track of Alveos (serial numbers) that can't be used self._device._unavailable_alveos = set() self._device._admin_modes_using_health = [AdminMode.ONLINE] """adminMode values for which we report changes in healthState""" # environment variable determines if healthState is ignored # in ENGINEERING mode ignore_engineering = os.getenv( "ENGINEERING_MODE_IGNORE_HEALTH", default="false" ) if ignore_engineering.lower() != "true": self._device._admin_modes_using_health.append( AdminMode.ENGINEERING ) # processor FQDNs to which we're already subscribed self._device._proc_subscribed = [] # a queue to serialise allocator events self._device._deque = deque() # Synchronise producer/consumer without time.sleep() self._device._callback_cond_var = Condition() # thread to process callback events; needed as we subscribe to # processor's adminMode changes but we're notified about the # processor being available in an allocator callback - a potential # for chained callbacks can lock up execution of this module self._device._subelement_subarrays = "" self._device._thread = Thread( target=self._device._handle_queued_events ) self._device._thread.start() # update subscribers on attribute change: for attrib in ("all_alveos", "available_alveos"): self._device.set_change_event(attrib, 
True, False) self._device.set_archive_event(attrib, True, False) pre = f"{self._device.ConnectorDbHost}:{self._device.ConnectorDbPort}/" self._device.conn_db_prefix = pre self._device.logger.info( f"Using Connector DB:{self._device.conn_db_prefix}" ) self._device.subscribe_to_allocator(self._device.allocator_device) message = "LowCbfController init complete" self._device.logger.info(message) return ResultCode.OK, message # ---------- # Callbacks # ---------- def _health_callback(self, fqdn: str, name: str, value, quality) -> None: """ Update an entry in the health state table & re-evaluate overall health state. Called when a subscribed device's healthState changes. """ with self._callback_lock: self.logger.info(f"FROM {fqdn} {name} val: {value} Q: {quality}") fqdn = fqdn.removeprefix(self.conn_db_prefix) self.component_manager.controller.update_health_state( fqdn, value, quality == AttrQuality.ATTR_VALID ) # updates reported only while in monitoring mode if not self.is_monitoring_mode(): return previous_health = self._health_state new_health = self.component_manager.controller.health_state if new_health != previous_health: self._update_health_state(new_health) def _handle_allocator_callback( self, attr_name: str, evt_json: str, quality: AttrQuality ) -> None: """ Handle allocator events triggred by a new Alveo discovery or state change (e.g. 
assigned to a subarray) :param attr_name: attribute name (procDevFqdn, internal_alveo) :param evt_json: event details as JSON string :param quality: ATTR_VALID when OK, `see <https://is.gd/nTVEzR>`_ """ self.logger.info(f"GOT {attr_name} val: {evt_json} Q: {quality}") if quality != AttrQuality.ATTR_VALID: self.logger.warning(f"Invalid quality {quality}") return fsp_dict = json.loads(evt_json) match attr_name.lower(): case "procdevfqdn": # expecting evt_json: # {"XFL10NIYKVEU": "low-cbf/processor/0.0.0", ...} # pylint: disable=attribute-defined-outside-init if not fsp_dict: # ignore empty dict return self._all_alveos = fsp_dict # notify subscribers of value change alveos_str = json.dumps(list(self._all_alveos.keys())) avail_alveos_str = json.dumps(self._get_available_alveos()) for attr, val in ( ("all_alveos", alveos_str), ("available_alveos", avail_alveos_str), ): self.push_change_event(attr, val) self.push_archive_event(attr, val) with self._callback_cond_var: self._deque.append({Evt.PROC_REGISTERED: fsp_dict}) self._callback_cond_var.notify() case "internal_alveo": # Expecting evt_json: # {"XFL1TJCHM3ON": {"fw": ...}, "XFL1E35JVJTQ": {"fw"..}..} # pylint: disable=attribute-defined-outside-init # unavailable Alveos are simply those that are already used self._unavailable_alveos = set(fsp_dict.keys()) # notify subscribers of value change avail_alveos_str = json.dumps(self._get_available_alveos()) self.push_change_event("available_alveos", avail_alveos_str) self.push_archive_event("available_alveos", avail_alveos_str) self.logger.info( f"UNAVAILABLE ALVEOS {self._unavailable_alveos}" ) case _: self.logger.warning("UNKNOWN attribute") def _proc_adminmode_event( self, fqdn: str, name: str, admin_mode, quality: AttrQuality ) -> None: """Called by LowCbfProcessor when adminMode changes. :param fqdn: Tango device name e.g. "low-cbf/processor/0.0.0" :param name: Tango attribute name e.g. 
"adminMode" :param admin_mode: AdminMode enumeration `see <https://is.gd/nCuv1q>`_ :param quality: AttrQuality enumeration `see <https://is.gd/nTVEzR>`_ """ with self._callback_lock: self.logger.info(f"PROC {fqdn} {name}={admin_mode} Q: {quality}") if fqdn not in self._all_alveos.values(): self.logger.error("UNKNOWN Alveo") return if admin_mode == AdminMode.ONLINE: # remove from "unavailable Alveos" list (if applicable) for sn, dev_name in self._all_alveos.items(): if dev_name == fqdn: if sn in self._unavailable_alveos: self._unavailable_alveos.remove(sn) break else: # no longer ONLINE, flag as unavailable for sn, dev_name in self._all_alveos.items(): if dev_name == fqdn: self._unavailable_alveos.add(sn) break self.logger.info(f"UNAVAILABLE Alveos {self._unavailable_alveos}") def _handle_queued_events(self): """A thread handling queued up allocator events. Blocks on _callback condition variable - a producer will unblock it. """ cv = self._callback_cond_var while True: # keep the condition variable context small in order to avoid # race condition with cv: while len(self._deque) == 0: cv.wait() item = self._deque.popleft() if not isinstance(item, dict): self.logger.error(f"ERROR need dict, got {item}") continue key, val = item.popitem() match key: case Evt.PROC_REGISTERED: self._subscribe_to_proc_admin_mode(val) case _: self.logger.warning(f"UNKNOWN event {key}") # Run server def main(args=None, **kwargs): """Main function of the LowCbfController module.""" return run((LowCbfController,), args=args, **kwargs) if __name__ == "__main__": main()