# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2024 CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""SKA Low CBF.

Sub-element controller device for Low.CBF.
"""
import json
import os
import threading
from collections import deque
from enum import IntEnum
from threading import Condition, Thread
from typing import List, Union
import tango
from ska_tango_base import SKABaseDevice, SKAController
from ska_tango_base.commands import ResultCode
from ska_tango_base.control_model import AdminMode, HealthState
from tango import AttrQuality, AttrWriteType, Database, DevFailed
from tango.server import attribute, device_property, run
from ska_low_cbf import release
from ska_low_cbf.controller.component_manager import (
LowCbfControllerComponentManager,
)
from ska_low_cbf.controller.controller import SearchBeamBandwidthMode
from ska_low_cbf.device_proxy import MccsDeviceProxy
from ska_low_cbf.events import EventManager
# Tango naming conventions clash with Python conventions...
# pylint: disable=invalid-name,protected-access,too-few-public-methods
# pylint: disable=pointless-string-statement
__all__ = ["LowCbfController", "main"]
class Evt(IntEnum):
    """Kinds of events that originate from the Allocator.

    Used as the key of the single-entry dicts queued for the controller's
    event-handling thread.
    """

    # a processor registered with the Allocator
    PROC_REGISTERED = 0
    # a processor was assigned to a subarray
    PROC_ALLOCATED = 1
class LowCbfController(SKAController):
"""
Sub-element controller device for Low.CBf
**Properties:**
- Device Property
"""
ConnectorDbHost = device_property(
dtype="str", default_value="tango-databaseds"
)
ConnectorDbPort = device_property(dtype="int", default_value=10000)
# Attributes
searchBeamBandwidthMode = attribute(
dtype=SearchBeamBandwidthMode,
access=AttrWriteType.READ_WRITE,
label="Search Beam Bandwidth Mode",
doc=(
"Search Beam Bandwidth Mode is configured at sub-element level "
" and applies for all the instances of the Capability Search "
"Beams in all sub-arrays.\n\nSupported modes are listed and "
"described in Table 9-9\n\nTM can change the value of the "
"parameter Search Beam Bandwidth Mode only when all the "
"sub-arrays are IDLE."
),
)
@attribute(
dtype=("DevString",),
max_dim_x=32,
label="Subelement Subarrays",
doc="List of Low.CBF SubArray TANGO Device names.",
)
def subelementSubarrays(self):
"""
List of Low.CBF SubArray TANGO Device names.
"""
@attribute(doc="Table (JSON dict) of Low CBF device healthStates.")
def health_table(self) -> str:
"""
Read FQDNs and health states of all monitored devices.
:return: JSON string
"""
return json.dumps(
self.component_manager.controller.device_health_states
)
# FIXME - requires pytango v9.4+
# @attribute(dtype=(HealthState,), max_dim_x=512)
@attribute(dtype=(int,), max_dim_x=512)
def health_processors(self) -> List[int]:
"""Health of all Processors."""
return [
int(health)
for health in self.component_manager.controller.health_by_type[
"LowCbfProcessor"
]
]
# FIXME - pytango 9.4+
# return self.component_manager.controller.health_by_type["LowCbfProcessor"]
# FIXME - requires pytango v9.4+
# @attribute(dtype=(HealthState,), max_dim_x=32)
@attribute(dtype=(int,), max_dim_x=32)
def health_connectors(self) -> List[int]:
"""Health of all Connectors."""
return [
int(health)
for health in self.component_manager.controller.health_by_type[
"LowCbfConnector"
]
]
# FIXME - pytango 9.4+
# return self.component_manager.controller.health_by_type["LowCbfConnector"]
@attribute(dtype=str, doc="A JSON array of all Alveos")
def all_alveos(self) -> str:
"""Return JSON string listing all discovered Alveos."""
return json.dumps(list(self._all_alveos.keys()))
@attribute(dtype=str, doc="A JSON array of available Alveos.")
def available_alveos(self) -> str:
"""Return JSON string listing all available Alveos."""
return json.dumps(self._get_available_alveos())
@attribute(dtype=float, doc="Percentage of SPS links up.")
def spsLinkUpPercent(self) -> float:
"""
Return the quality attribute percent SPS.
Note that this quality attribute needs to be pulled by TMC. This is not
automatically updated.
"""
return self._get_percent_sps()
def _get_percent_sps(self) -> float:
"""Return the percentage of SPS link up"""
# Need the Allocator for quality attributes
try:
allocator_proxy = tango.DeviceProxy(self.allocator_device)
except tango.DevFailed:
return 0.0
return allocator_proxy.GetSPSPercent()
def _get_available_alveos(self) -> list[str]:
"""Return a list of Alveo serial numbers which are not allocated
to any subarrays.
"""
return [
sn
for sn in self._all_alveos.keys()
if sn not in self._unavailable_alveos
]
# Properties (value in database)
allocator_device = device_property(
dtype=("DevString",),
default_value="low-cbf/allocator/0",
doc="Tango device with allocation info",
)
# General methods
# inherited
def create_component_manager(self):
return LowCbfControllerComponentManager(
logger=self.logger,
communication_state_callback=self._communication_state_changed,
component_state_callback=self._component_state_changed,
)
def always_executed_hook(self):
"""Method always executed before any TANGO command is executed."""
def delete_device(self):
"""Hook to delete resources allocated in init_device.
This method allows for any memory or other resources allocated in the
init_device method to be released. This method is called by the device
destructor and by the device Init command.
"""
self.set_state(tango.DevState.OFF)
def subarray_event(self, fqdn, attr_name, value, quality):
""" "
Process an event received from a Subarray
Function call examples:
low-cbf/subarray/01, adminMode, 2, ATTR_VALID
low-cbf/subarray/01, State, ALARM, ATTR_VALID
"""
self.logger.info(
"Subarray Event: %s, %s, %s, %s", fqdn, attr_name, value, quality
)
n = int(fqdn.split("/")[-1])
if quality == tango.AttrQuality.ATTR_INVALID:
value = None
self.component_manager.controller._subarrays[n][attr_name] = value
# Attributes methods
[docs] def read_searchBeamBandwidthMode(self) -> SearchBeamBandwidthMode:
"""
Search Beam Bandwidth Mode is configured at sub-element level and
applies for all the instances of the Capability Search Beams in
all sub-arrays.
Supported modes are listed and described in Table 9-9
TM can change the value of the parameter Search Beam Bandwidth Mode
only when all the sub-arrays are IDLE.
:return: the searchBeamBandwidthMode attribute (SINGLE, DOUBLE)
"""
return self.component_manager.controller.search_beam_bandwidth_mode
[docs] def write_searchBeamBandwidthMode(self, value: SearchBeamBandwidthMode):
"""Set the searchBeamBandwidthMode attribute.
:param value: SINGLE, DOUBLE
"""
self.component_manager.controller.search_beam_bandwidth_mode = value
def read_subelementSubarrays(self):
"""Return the subelementSubarrays attribute."""
return self._subelement_subarrays
def _update_admin_mode(self: SKABaseDevice, admin_mode: AdminMode) -> None:
"""Override adminMode change callback."""
# pylint: disable=attribute-defined-outside-init
if admin_mode == self._admin_mode:
return # nothing to do
# healthState reporting is adminMode dependent e.g UNKNOWN while
# adminMode is OFFLINE
self._adjust_health(admin_mode)
self._admin_mode = admin_mode
for func in (self.push_change_event, self.push_archive_event):
func("adminMode", admin_mode)
if self.is_monitoring_mode():
current_health = self.component_manager.controller.health_state
if current_health != self.healthState:
self._update_health_state(current_health)
if not self._health_subscribed:
self._subscribe_to_health()
def _adjust_health(self, admin_mode: AdminMode) -> None:
"""Set ``healthState`` to UNKNOWN when switching to e.g. OFFLINE MODE
Add log entry about ``healthState`` tracking. This is called just
before ``adminMode`` change.
:param admin_mode: the ``adminMode`` we are about to transition to
"""
monitoring_now = self.is_monitoring_mode()
monitoring_next = self.is_monitoring_mode(admin_mode)
if not monitoring_now and monitoring_next:
msg = f"Entering {admin_mode.name} mode: will track healthState"
self.logger.info(msg)
# if currently in a monitoring mode but won't be after this change
# change the healthState to UNKNOWN
elif monitoring_now and not monitoring_next:
msg = f"Entering {admin_mode.name} mode: won't track healthState"
self.logger.info(msg)
self._update_health_state(HealthState.UNKNOWN)
def _subscribe_to_health(self):
"""Subscribe to healthState of all Low CBF hardware devices."""
try:
db = Database()
except DevFailed:
# unit tests won't have the DB deployed and that's ok
return
device_type = "LowCbfProcessor"
# few short aliases:
em = self._event_manager
cbk_fn = self._health_callback
add_health_device = self.component_manager.controller.add_health_device
for fqdn in db.get_device_exported_for_class(device_type):
add_health_device(device_type, fqdn)
self.logger.info(f"SUBSCRIBING to {fqdn}:healthState")
em.register_callback(cbk_fn, fqdn, "healthState")
# for Connector device(s) we need to consult it's own db
try:
db = Database(self.ConnectorDbHost, self.ConnectorDbPort)
except Exception as e:
# if we got this far this shouldn't be running as unit/CI test
self.logger.error(f"Connector DB EXCEPTION {e}")
return
device_type = "LowCbfConnector"
for device_name in db.get_device_exported_for_class(device_type):
add_health_device(device_type, device_name)
fqdn = self.conn_db_prefix + device_name
self.logger.info(f"SUBSCRIBING to {fqdn}:healthState")
em.register_callback(cbk_fn, fqdn, "healthState")
# pylint: disable=attribute-defined-outside-init
self._health_subscribed = True
def subscribe_to_allocator(self, tango_dev_name: str) -> None:
"""
Subscribe to Alveo related allocator attribute (internal_alveo,
procDevFqdn) changes.
:param tango_dev_name: eg 'low-cbf/allocator/0'
"""
alloc_proxy = MccsDeviceProxy(
tango_dev_name, self.logger, connect=False
)
for attr_name in ("internal_alveo", "procDevFqdn"):
self.logger.info(f"SUBSCRIBE to {attr_name} from {tango_dev_name}")
alloc_proxy.evt_sub_on_connect(
attr_name, self._handle_allocator_callback
)
alloc_proxy.connect(max_time=120)
def _subscribe_to_proc_admin_mode(self, proc_dict):
"""Subscribe to Processor adminMode changes so we can flag
ENGINEERING/OFFLINE Alveos as unavailable
:param proc_dict: e.g {"XFL10NIYKVEU": "low-cbf/processor/0.0.0", ...}
"""
def is_subscribed_to(fqdn):
return fqdn in self._proc_subscribed
em = self._event_manager
for fqdn in proc_dict.values():
if is_subscribed_to(fqdn):
continue
self.logger.info(f"SUBSCRIBING to {fqdn}:adminMode")
em.register_callback(self._proc_adminmode_event, fqdn, "adminMode")
self._proc_subscribed.append(fqdn)
def is_monitoring_mode(self, mode: Union[AdminMode, None] = None) -> bool:
"""Determine if we are in monitoring mode
In monitoring mode we keep track of healthState and report its
changes.
ONLINE and ENGINEERING admin modes are considered monitoring while
NOT_FITTED, OFFLINE and RESERVED are not.
See RtD https://is.gd/nCuv1q
:param mode: adminMode we are about to transition to; when not
supplied use the current ``self._admin_mode`` value instead
:return: True when in monitoring mode; False otherwise
"""
value = self._admin_mode if mode is None else mode
return value in self._admin_modes_using_health
# ----------
# Commands
# ----------
def init_command_objects(self):
"""
Initialises the command handlers for commands supported by this device.
"""
# pylint: disable=useless-super-delegation
super().init_command_objects()
class InitCommand(SKAController.InitCommand):
"""Init Command class"""
def do(self):
"""
Initialises the attributes and properties of the LowCbfController.
"""
super().do()
self._device._version_id = release.version
self._device._build_state = (
f"{release.name}, {release.version}, {release.description}"
)
self._device.component_manager.controller.search_beam_bandwidth_mode = (
SearchBeamBandwidthMode.SINGLE
)
self._device.set_change_event("healthState", True, False)
self._device.set_archive_event("healthState", True, False)
self._device._event_manager = EventManager(
self._device.logger, events=["healthState", "adminMode"]
)
self._device._callback_lock = threading.Lock()
self._device._health_subscribed = False
"""A flag indicating we already subscribed to health events"""
# Keep track of all known Alveos - {serial_nr: fqdn}
self._device._all_alveos = {}
# Keep track of Alveos (serial numbers) that can't be used
self._device._unavailable_alveos = set()
self._device._admin_modes_using_health = [AdminMode.ONLINE]
"""adminMode values for which we report changes in healthState"""
# environment variable determines if healthState is ignored
# in ENGINEERING mode
ignore_engineering = os.getenv(
"ENGINEERING_MODE_IGNORE_HEALTH", default="false"
)
if ignore_engineering.lower() != "true":
self._device._admin_modes_using_health.append(
AdminMode.ENGINEERING
)
# processor FQDNs to which we're already subscribed
self._device._proc_subscribed = []
# a queue to serialise allocator events
self._device._deque = deque()
# Synchronise producer/consumer without time.sleep()
self._device._callback_cond_var = Condition()
# thread to process callback events; needed as we subscribe to
# processor's adminMode changes but we're notified about the
# processor being available in an allocator callback - a potential
# for chained callbacks can lock up execution of this module
self._device._subelement_subarrays = ""
self._device._thread = Thread(
target=self._device._handle_queued_events
)
self._device._thread.start()
# update subscribers on attribute change:
for attrib in ("all_alveos", "available_alveos"):
self._device.set_change_event(attrib, True, False)
self._device.set_archive_event(attrib, True, False)
pre = f"{self._device.ConnectorDbHost}:{self._device.ConnectorDbPort}/"
self._device.conn_db_prefix = pre
self._device.logger.info(
f"Using Connector DB:{self._device.conn_db_prefix}"
)
self._device.subscribe_to_allocator(self._device.allocator_device)
message = "LowCbfController init complete"
self._device.logger.info(message)
return ResultCode.OK, message
# ----------
# Callbacks
# ----------
def _health_callback(self, fqdn: str, name: str, value, quality) -> None:
"""
Update an entry in the health state table & re-evaluate overall health state.
Called when a subscribed device's healthState changes.
"""
with self._callback_lock:
self.logger.info(f"FROM {fqdn} {name} val: {value} Q: {quality}")
fqdn = fqdn.removeprefix(self.conn_db_prefix)
self.component_manager.controller.update_health_state(
fqdn, value, quality == AttrQuality.ATTR_VALID
)
# updates reported only while in monitoring mode
if not self.is_monitoring_mode():
return
previous_health = self._health_state
new_health = self.component_manager.controller.health_state
if new_health != previous_health:
self._update_health_state(new_health)
def _handle_allocator_callback(
self, attr_name: str, evt_json: str, quality: AttrQuality
) -> None:
"""
Handle allocator events triggred by a new Alveo discovery or state change
(e.g. assigned to a subarray)
:param attr_name: attribute name (procDevFqdn, internal_alveo)
:param evt_json: event details as JSON string
:param quality: ATTR_VALID when OK, `see <https://is.gd/nTVEzR>`_
"""
self.logger.info(f"GOT {attr_name} val: {evt_json} Q: {quality}")
if quality != AttrQuality.ATTR_VALID:
self.logger.warning(f"Invalid quality {quality}")
return
fsp_dict = json.loads(evt_json)
match attr_name.lower():
case "procdevfqdn":
# expecting evt_json:
# {"XFL10NIYKVEU": "low-cbf/processor/0.0.0", ...}
# pylint: disable=attribute-defined-outside-init
if not fsp_dict: # ignore empty dict
return
self._all_alveos = fsp_dict
# notify subscribers of value change
alveos_str = json.dumps(list(self._all_alveos.keys()))
avail_alveos_str = json.dumps(self._get_available_alveos())
for attr, val in (
("all_alveos", alveos_str),
("available_alveos", avail_alveos_str),
):
self.push_change_event(attr, val)
self.push_archive_event(attr, val)
with self._callback_cond_var:
self._deque.append({Evt.PROC_REGISTERED: fsp_dict})
self._callback_cond_var.notify()
case "internal_alveo":
# Expecting evt_json:
# {"XFL1TJCHM3ON": {"fw": ...}, "XFL1E35JVJTQ": {"fw"..}..}
# pylint: disable=attribute-defined-outside-init
# unavailable Alveos are simply those that are already used
self._unavailable_alveos = set(fsp_dict.keys())
# notify subscribers of value change
avail_alveos_str = json.dumps(self._get_available_alveos())
self.push_change_event("available_alveos", avail_alveos_str)
self.push_archive_event("available_alveos", avail_alveos_str)
self.logger.info(
f"UNAVAILABLE ALVEOS {self._unavailable_alveos}"
)
case _:
self.logger.warning("UNKNOWN attribute")
def _proc_adminmode_event(
self, fqdn: str, name: str, admin_mode, quality: AttrQuality
) -> None:
"""Called by LowCbfProcessor when adminMode changes.
:param fqdn: Tango device name e.g. "low-cbf/processor/0.0.0"
:param name: Tango attribute name e.g. "adminMode"
:param admin_mode: AdminMode enumeration `see <https://is.gd/nCuv1q>`_
:param quality: AttrQuality enumeration `see <https://is.gd/nTVEzR>`_
"""
with self._callback_lock:
self.logger.info(f"PROC {fqdn} {name}={admin_mode} Q: {quality}")
if fqdn not in self._all_alveos.values():
self.logger.error("UNKNOWN Alveo")
return
if admin_mode == AdminMode.ONLINE:
# remove from "unavailable Alveos" list (if applicable)
for sn, dev_name in self._all_alveos.items():
if dev_name == fqdn:
if sn in self._unavailable_alveos:
self._unavailable_alveos.remove(sn)
break
else:
# no longer ONLINE, flag as unavailable
for sn, dev_name in self._all_alveos.items():
if dev_name == fqdn:
self._unavailable_alveos.add(sn)
break
self.logger.info(f"UNAVAILABLE Alveos {self._unavailable_alveos}")
def _handle_queued_events(self):
"""A thread handling queued up allocator events. Blocks on _callback
condition variable - a producer will unblock it.
"""
cv = self._callback_cond_var
while True:
# keep the condition variable context small in order to avoid
# race condition
with cv:
while len(self._deque) == 0:
cv.wait()
item = self._deque.popleft()
if not isinstance(item, dict):
self.logger.error(f"ERROR need dict, got {item}")
continue
key, val = item.popitem()
match key:
case Evt.PROC_REGISTERED:
self._subscribe_to_proc_admin_mode(val)
case _:
self.logger.warning(f"UNKNOWN event {key}")
# Run server
def main(args=None, **kwargs):
    """Entry point of the LowCbfController module.

    :param args: forwarded to ``run`` as its ``args`` keyword argument
    :param kwargs: any further keyword arguments, forwarded to ``run``
    :return: whatever ``run`` returns
    """
    device_classes = (LowCbfController,)
    return run(device_classes, args=args, **kwargs)
# Start the device server only when this module is executed as a script.
if __name__ == "__main__":
    main()