# Source code for ska_low_cbf.allocator.allocator_device

# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2021, CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
# pylint: disable=invalid-name,protected-access,too-few-public-methods

""" SKA Low CBF

LowCbfAllocator is responsible for assignment of the key processing hardware
items that will exist in Low.CBF to Subarrays.
"""

import json
import os
import time

from ska_tango_base import SKABaseDevice
from ska_tango_base.commands import FastCommand, ResultCode
from tango import AttrWriteType, Database, DebugIt
from tango.server import attribute, command, device_property, run

from ska_low_cbf import release
from ska_low_cbf.allocator.arp_replies import ArpReplies

# from ska_low_cbf.allocator.capabilities import Capabilities
from ska_low_cbf.allocator.arp_subscriber import ArpSubscriber
from ska_low_cbf.allocator.component_manager import AllocatorComponentManager
from ska_low_cbf.allocator.default_connections import default_connection_list
from ska_low_cbf.allocator.resources import Resources
from ska_low_cbf.device_proxy import MccsDeviceProxy

__all__ = ["LowCbfAllocator", "main"]


# TODO - need some better scheme to map the IDs used by the allocator to the
#  TANGO FQDNs
# Maps an allocator-side Alveo ID to the FQDN of a Processor Tango device.
# NOTE(review): this name is not referenced anywhere else in this module -
# confirm whether it is still needed.
alveo_id_fqdn = {1: "low-cbf/processor/0.0.0"}

# TODO this config should be a device property, overridden by Helm chart
# Array dimensions handed to ``Resources`` when the device initialises
# (see ``LowCbfAllocator.InitCommand.do``).
array_config = {
    "stations": 512,
    "channels": 384,
}


class LowCbfAllocator(
    SKABaseDevice
):  # pylint: disable=too-many-public-methods
    """
    Tango device that assigns Low.CBF processing hardware to Subarrays.

    Allocator is responsible for assignment of the key processing hardware
    items that will exist in Low.CBF to Subarrays. Resource bookkeeping is
    delegated to a ``Resources`` instance created by ``InitCommand``.
    """

    # Properties (value in database, loaded by helm chart)
    hardware_connections = device_property(
        dtype=("DevString",),
        # if absent or empty, InitCommand falls back to the built-in
        # default_connection_list()
        mandatory=False,
        doc="List of P4 switch to Alveo and I/O hardware cabling",
    )

    ConnectorAddress = device_property(
        dtype="DevString",
        mandatory=False,  # if not present, do not subscribe to ARP replies
        doc="Connector Tango device address. ARP replies will be subscribed to if an address is given.",
    )

    # Attributes
    @attribute(
        dtype="DevULong",
        label="Allocation Version Counter",
        doc="Increments for each subarray resourcing change",
    )
    def allocationVersionCounter(self):
        """Read callback for the ``allocationVersionCounter`` attribute.

        :return: the value currently held in
            ``self._allocation_version_counter``
        """
        return self._allocation_version_counter

    @attribute(
        dtype=("DevULong",),
        max_dim_x=512,
        label="Processor Update List",
        polling_period=3000,
        doc=(
            "Array of Processor configuration versions, "
            "one element per Processor, in the same order as processorIDs"
        ),
    )
    def processorUpdate(self):
        """Read callback for the polled ``processorUpdate`` attribute.

        :return: the sequence currently held in ``self._processor_update``
        """
        return self._processor_update

    @attribute(
        dtype=("DevULong",),
        max_dim_x=512,
        label="Connector Update List",
        polling_period=3000,
        doc=(
            "Array of Connector configuration versions, one element per "
            "Connector, in the same order as connectorIDs"
        ),
    )
    def connectorUpdate(self):
        """Read callback for the polled ``connectorUpdate`` attribute.

        :return: the sequence currently held in ``self._connector_update``
        """
        return self._connector_update

    @attribute(
        dtype=("DevString",),
        max_dim_x=512,
        label="Connector Identifiers",
        polling_period=3000,
        doc=(
            "Array of Connector identifiers, in the same order as "
            "connectorUpdate"
        ),
    )
    def connectorIDs(self):
        """Read callback for the polled ``connectorIDs`` attribute.

        :return: the sequence currently held in ``self._connector_ids``
        """
        return self._connector_ids

    @attribute(
        dtype=("DevString",),
        max_dim_x=512,
        label="Processor Identifiers",
        polling_period=3000,
        doc=(
            "Array of Processor identifiers, in the same order as "
            "processorUpdate"
        ),
    )
    def processorIDs(self):
        """Read callback for the polled ``processorIDs`` attribute.

        :return: the sequence currently held in ``self._processor_ids``
        """
        return self._processor_ids

    @attribute(
        label="P4 resource table",
        doc="The P4 resource table as JSON string.",
    )
    def resourceTableP4(self) -> str:
        """Serialise the resource manager's P4 table for Tango clients.

        :return: JSON string encoding whatever
            ``Resources.get_p4s_as_table()`` returns
        """
        return json.dumps(self._resource_mgr.get_p4s_as_table())

    @attribute(label="processor device fqdn")
    def procDevFqdn(self) -> str:
        """Publish the Alveo-serial to Processor-FQDN mapping as JSON.

        The mapping is filled in as processors register themselves, e.g.

        .. code:: python

          {
             "XFL1XCRTUC22": "low-cbf/processor/0.0.0",
             "XFL1TJCHM3ON": "low-cbf/processor/0.0.1"
          }

        :return: JSON string representation of the dictionary
        """
        serial_to_fqdn = self._proc_dev_fqdn
        return json.dumps(serial_to_fqdn)

    def __init__(self, *args, **kwargs):
        """Create the device.

        ``_arp_subscriber`` has to exist before the base-class constructor
        runs, because that constructor may execute ``InitCommand.do``, which
        can call ``subscribe_to_connector`` (it reads ``_arp_subscriber``).
        """
        self._arp_subscriber = None
        # SKA base class can now run InitCommand.do
        super().__init__(*args, **kwargs)

    # General methods
    def create_component_manager(self):
        """Build the component manager used by the SKA base device.

        :return: an ``AllocatorComponentManager`` wired to this device's
            logger and state-change callbacks
        """
        manager = AllocatorComponentManager(
            logger=self.logger,
            communication_state_callback=self._communication_state_changed,
            component_state_callback=self._component_state_changed,
        )
        return manager

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed.

        Intentionally a no-op for this device.
        """

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.

        Currently a no-op: nothing is released here.
        """

    @attribute
    def internal_alveo(self) -> str:
        """
        Report the internal state of every Processor device that has been
        configured with firmware (processors without firmware are omitted).

        :return: JSON string containing a dictionary

                 * key = alveo serial number;
                 * value = dict containing keys ``fw``, ``regs``.

                 ``fw`` value is a dict with ``personality`` and ``url`` keys
                 (each strings).
        """
        return json.dumps(self._resource_mgr.get_internal_repr())

    @attribute
    def internal_subarray(self) -> str:
        """
        Report the internal state of every configured subarray
        (unconfigured subarrays have no entry).

        :return: JSON string containing a dictionary

                 * key = ``subarray_id`` number
                 * value = subarray definition (also a dict)
        """
        return json.dumps(self._resource_mgr.get_subarray_repr())

    @attribute
    def p4_stn_routes(self) -> str:
        """
        Report the routing information P4 switches need for SPS packets.

        :return: JSON string encoding ``Resources.get_stn_routes()``
        """
        return json.dumps(self._resource_mgr.get_stn_routes())

    # READ_WRITE so that Tango attaches ``write_alveo_firmware_image_names``
    # (looked up by name) as the write handler; a bare ``@attribute`` is
    # read-only and would leave that method unreachable from clients.
    @attribute(access=AttrWriteType.READ_WRITE)
    def alveo_firmware_image_names(self: SKABaseDevice) -> str:
        """
        Read the firmware to run on each alveo.

        The value includes any test overrides previously written to this
        attribute.

        :return: JSON string - dict of alveo firmware entries
        """
        fw_names = self._resource_mgr.fw_names_with_test_overrides
        return json.dumps(fw_names)

[docs] def write_alveo_firmware_image_names( self: SKABaseDevice, value: str ) -> None: """ Set up or clear a firmware renaming for test :param value: JSON string encoding a Dictionary with * key=alveo_id * value=firmware_name :return: None """ name_overrides = json.loads(value) self._resource_mgr.fw_names_with_test_overrides = name_overrides new_names = self._resource_mgr.fw_names_with_test_overrides self.push_change_event( "alveo_firmware_image_names", json.dumps(new_names) )
@attribute def stats_alveo(self) -> str: """ Get list of alveo statistics. Return Alveo details, its usage by subarrays and any unused capacities that it may have. :return: JSON string containing a list of Alveo details """ a_list = self._resource_mgr.get_alveo_list() return json.dumps(a_list) @attribute def sdp_routes(self) -> str: # Deprecated TODO remove """Get list of SDP routes for each switch :return: JSON string dictionary * key = switch_id or "arp_rq" * value = routes or arp_list """ if self._sdp_route_cache: return json.dumps(self._sdp_route_cache) route_dict = self._resource_mgr.get_sdp_routes( self._sdp_arp_replies, self.logger ) return json.dumps(route_dict) @attribute def switch_routes(self) -> str: """Get list of routes for each switch :return: JSON string dictionary * key = switch_id * value = list [ (ip, sw_port), ] """ if self._sw_route_cache is None: self._sw_route_cache = self._resource_mgr.get_sw_routes( self._sdp_arp_replies, self.logger ) return json.dumps(self._sw_route_cache) @attribute def ip_to_resolve(self) -> str: """ Get list of IP addresses to resolve by ARP. :return: JSON dictinonary {"hosts": [ip_addr, ], "ports" [(sw_id, port), ]} """ if self._ip_for_arp is None: hosts = self._resource_mgr.get_hosts_for_arp() ports = self._resource_mgr.get_swports_for_arp() self._ip_for_arp = {"hosts": hosts, "ports": ports} return json.dumps(self._ip_for_arp) @attribute def startup_time(self) -> int: """ Get Unix timestamp recorded at Tango device start-up. """ return self._deploy_secs @attribute(access=AttrWriteType.READ_WRITE) def internalAlveoLimits(self) -> str: """ Read the current override limits """ return json.dumps(self._resource_mgr.get_alveo_limits())
[docs] def write_internalAlveoLimits(self, limit_json): """ Expert level attribute: Change per-alveo allocation limits Use empty dictionary to reset to defaults. EG current FPGA maximums are .. code:: python { "pst": { "vch": 900, # Virtual chans per pipeline (bf clock freq) "sps_ch": 1024, # SPS chan per pipeline (packetizer limit) }, "pss": { "vch": 960, # VCH per pipeline (intermittent fails @ 1020) "sps_ch": 128 # SPS chans per pipeline (packetizer limit) }, "vis": { "vch": 1024, # Virtual Channels into FPGA "hbm": 606, # VChans in one matrix correlator's (MxC) HBM "bli": 131328 # Baselines (=512*513/2) in one MxC } } :param limit_json: limit dictionary as JSON. """ limit_dict = json.loads(limit_json) # Check we are given a dict of dicts if not isinstance(limit_dict, dict): raise ValueError("Expecting dictionary for limits") # Check contents of dict (if any) are also dicts for itm in limit_dict.values(): if not isinstance(itm, dict): raise ValueError("Expecting limit dict to contain dicts") self._resource_mgr.set_alveo_limits(limit_dict)
@command(dtype_in=str, dtype_out=None, doc_in="subscribe to ARP replies") def SubscribeToConnector(self, argin: str): """ Temporary method called to cause Allocator to subscribe to events from a Tango device that has attributes providing ARP reply info :param argin: A JSON string containing Tango device name and attribute name. eg ``{'dev': 'low-cbf/connector/0', 'attr':'sdp_arp_reply'}`` """ try: names_dict = json.loads(argin) except ValueError as ex: self.logger.error("Bad command argument: %s", ex) return if not isinstance(names_dict, dict): self.logger.error("Expected dictionary, got: %s", names_dict) return if "dev" not in names_dict: self.logger.error("Missing 'dev' key, got: %s", names_dict) return if "attr" not in names_dict: self.logger.error("missing 'attr' key, got: %s", names_dict) return self.subscribe_to_connector(names_dict) @command(dtype_in=None, dtype_out=None) def UnsubscribeFromConnector(self): """ Temporary method called to cause Allocator to unsubscribe from ARP events sent by connector """ if self._arp_subscriber is not None: self._arp_subscriber.unsubscribe() self._arp_subscriber = None def subscribe_to_connector(self, names_dict): """ subscribe to ARP reply attribute on allocator """ dev_name = names_dict["dev"] attr_name = names_dict["attr"] self.logger.info("subscribe dev=%s, attr=%s", dev_name, attr_name) if self._arp_subscriber is None: self._arp_subscriber = ArpSubscriber(self.logger) else: self._arp_subscriber.unsubscribe() self._arp_subscriber.subscribe( dev_name, attr_name, self.sdp_arp_rply_evt ) # Commands called by processors to register their presence @command( dtype_in="DevString", doc_in="Allocation request, JSON string", ) @DebugIt() def InternalRegisterAlveo(self, argin): """ Called by Processors to register their presence :param argin: Serial numbers and FQDN to register, JSON string .. code:: python e.g. 
{ "serial": "A123", "fqdn": " "low-cbf/processor/0.1.0", "hardware": "u55c" # unused at present } :return: None """ proc_details = json.loads(argin) if not all(key in proc_details for key in ("serial", "fqdn", "hw")): self.logger.error(f"DIDN'T FIND A KEY (serial/fqdn) in {argin}") return if "status" not in proc_details: # Allow old processor sw to work proc_details["status"] = 0 serial_nr = proc_details["serial"] processor_fqdn = proc_details["fqdn"] hw = proc_details["hw"] admin_mode = proc_details["status"] self.logger.info( f"register Alveo {serial_nr} adminMode {proc_details['status']} " ) self._proc_dev_fqdn[serial_nr] = processor_fqdn self.push_change_event("procDevFqdn", json.dumps(self._proc_dev_fqdn)) signup = { "serial": serial_nr, "hw": hw, "tango": processor_fqdn, "status": admin_mode, } (success, msg) = self._resource_mgr.alveo_registration(signup) if success: self.logger.info(msg) else: self.logger.error(msg) # push attribute updated value a_list = self._resource_mgr.get_alveo_list() self.push_change_event("stats_alveo", json.dumps(a_list)) # Commands called by processors to register their presence @command( dtype_in="DevString", doc_in="Allocation request, JSON string", ) @DebugIt() def InternalRegisterFsp(self, argin): # TODO remove when processor updated """ Called by Processors to register their presence :param argin: 'DevString' Serial numbers and FQDN to register, JSON string e.g. 
{ "serial": ["A123", "B456", "C789"], "fqdn": "low-cbf/processor/0.1.0" "status": 0 } :return: None """ proc_details = json.loads(argin) if not all( key in proc_details for key in ("serial", "fqdn", "status") ): print(f"WARNING: DIDN'T FIND A KEY (serial/fqdn) in {argin}") return serial_nr = proc_details["serial"][0] # The first alveo in list processor_fqdn = proc_details["fqdn"] adminmode = proc_details["status"] self._proc_dev_fqdn[serial_nr] = processor_fqdn self.push_change_event("procDevFqdn", json.dumps(self._proc_dev_fqdn)) signup = { "serial": serial_nr, "hw": "u55c", "tango": processor_fqdn, "status": adminmode, } (success, msg) = self._resource_mgr.alveo_registration(signup) if success: self.logger.info(msg) else: self.logger.error(msg) # push attribute updated value a_list = self._resource_mgr.get_alveo_list() self.push_change_event("stats_alveo", json.dumps(a_list)) @command( dtype_in="DevString", doc_in="Capabilities reservation, JSON string", dtype_out="DevString", doc_out="Capabilities Available, JSON string", ) @DebugIt() def ReserveCapabilities(self, argin): """ Called by Subarrays to request "Capabilities" (stations, PST/PSS beams, visibilities) :param argin: 'DevString' Allocation request, JSON string :return:'DevString' success/fail as JSON string """ command_object = self.get_command_object( "cmd_name_ReserveCapabilities" ) return command_object(argin) @command( dtype_in="DevString", doc_in="Ignored string", dtype_out="DevString", doc_out="Informational text TODO: return success as JSON", ) @DebugIt() def ReleaseAllCapabilities(self, argin): """ Called by Subarrays to release all current "Capabilities" :param argin: 'DevString' Empty string (ignored at present) :return: Informational text string :rtype: DevString """ command_object = self.get_command_object( "cmd_name_ReleaseAllCapabilities" ) return command_object(argin) @command( dtype_in="DevString", doc_in="Ignored string", dtype_out="DevString", doc_out="Informational text TODO: return 
success as JSON", ) @DebugIt() def RunScan(self, argin): """ Called by Subarrays to start/stop their processors scanning" :param argin: 'DevString' Empty string (ignored at present) :return:'DevString' Informational text string """ command_object = self.get_command_object("cmd_name_RunScan") return command_object(argin) @command( dtype_in=None, dtype_out=int, doc_out="Get the percentage of SPS links up from various connector", ) def GetSPSPercent(self): """Get percentage of SPS links up from all switches.""" percent_sps = 0 if self.ConnectorAddress is not None: # here we need to get the various switches up status connector = MccsDeviceProxy( self.ConnectorAddress, self.logger, connect=False ) connector.connect(max_time=120) array_of_ports_up = connector.diagnostics_port_up self.logger.info(f"array_of_ports_up: {array_of_ports_up}") self.logger.info(f"get_sps_ports: {self._list_of_sps_ports}") total_ports_up = sum( array_of_ports_up[int(port.split("/")[0]) - 1] for port in self._list_of_sps_ports ) return int(total_ports_up / len(self._list_of_sps_ports) * 100) return percent_sps @command( dtype_in=int, doc_in="Subarray ID", dtype_out=int, doc_out="Get the percentage of SPS links up from various connector", ) def GetPSSHostResolved(self, argin): """Get percentage of SPS links up from all switches.""" percent_pss = 0 sub_id = int(argin) if self.ConnectorAddress is not None: # here we need to get the various switches up status subarray_repr = json.dumps(self._resource_mgr.get_subarray_repr()) subarray_repr_dict = json.loads(subarray_repr) if f"{sub_id}" in subarray_repr_dict: if "search_beams" in subarray_repr_dict[f"{argin}"]: hosts = list( set( [ dest["data_host"] for beam in subarray_repr_dict[f"{argin}"][ "search_beams" ].get("beams", []) for dest in beam.get("destinations", []) ] ) ) self.logger.info(f"hosts list is {hosts}") resolved_hosts = 0 for host in hosts: host_info = self._sdp_arp_replies.host_info(host) if host_info is not None: if host_info["mac"] != 
"00:00:00:00:00:00": resolved_hosts += 1 percent_pss = int(resolved_hosts / len(hosts) * 100) return percent_pss @command( dtype_in=int, doc_in="Subarray ID", dtype_out=int, doc_out="Get the percentage of SPS links up from various connector", ) def GetPSTHostResolved(self, argin): """Get percentage of SPS links up from all switches.""" percent_pst = 0 sub_id = int(argin) if self.ConnectorAddress is not None: # here we need to get the various switches up status subarray_repr = json.dumps(self._resource_mgr.get_subarray_repr()) subarray_repr_dict = json.loads(subarray_repr) if f"{sub_id}" in subarray_repr_dict: if "timing_beams" in subarray_repr_dict[f"{argin}"]: hosts = list( set( [ dest["data_host"] for beam in subarray_repr_dict[f"{argin}"][ "timing_beams" ].get("beams", []) for dest in beam.get("destinations", []) ] ) ) self.logger.info(f"hosts list is {hosts}") resolved_hosts = 0 for host in hosts: host_info = self._sdp_arp_replies.host_info(host) if host_info is not None: if host_info["mac"] != "00:00:00:00:00:00": resolved_hosts += 1 percent_pst = int(resolved_hosts / len(hosts) * 100) return percent_pst @command( dtype_in=int, doc_in="Subarray ID", dtype_out=int, doc_out="Get the percentage of SPS links up from various connector", ) def GetSDPHostResolved(self, argin): """Get percentage of SPS links up from all switches.""" percent_sdp = 0 sub_id = int(argin) if self.ConnectorAddress is not None: subarray_repr = json.dumps(self._resource_mgr.get_subarray_repr()) subarray_repr_dict = json.loads(subarray_repr) if f"{sub_id}" in subarray_repr_dict: if "vis" in subarray_repr_dict[f"{sub_id}"]: hosts = list( set( [ dest[1] for beam in subarray_repr_dict[f"{sub_id}"][ "vis" ].get("stn_beams", []) for dest in beam.get("host", []) ] ) ) self.logger.info(f"hosts list is {hosts}") resolved_hosts = 0 for host in hosts: host_info = self._sdp_arp_replies.host_info(host) if host_info is not None: if host_info["mac"] != "00:00:00:00:00:00": resolved_hosts += 1 percent_sdp = 
int(resolved_hosts / len(hosts) * 100) self.logger.info(f"percent_sdp {percent_sdp}") return percent_sdp # Command Objects def init_command_objects(self): """ Initialises the command handlers for commands supported by this device. """ super().init_command_objects() self.register_command_object( "cmd_name_ReserveCapabilities", self.ReserveCapabilitiesCommand( tango_device=self, logger=self.logger ), ) self.register_command_object( "cmd_name_ReleaseAllCapabilities", self.ReleaseAllCapabilitiesCommand( tango_device=self, logger=self.logger ), ) self.register_command_object( "cmd_name_RunScan", self.RunScanCommand(tango_device=self, logger=self.logger), ) def update_sps_routes_attr(self): """Get new attribute value and push change event""" my_routes = self._resource_mgr.get_stn_routes() self.push_change_event("p4_stn_routes", json.dumps(my_routes)) def update_internal_alveo_attr(self): """Get new attribute value and push change event""" intl_rep = self._resource_mgr.get_internal_repr(self.logger) self.push_change_event("internal_alveo", json.dumps(intl_rep)) def update_internal_subarray_attr(self): """Get new attribute value and push change event""" intl_rep = self._resource_mgr.get_subarray_repr() self.push_change_event("internal_subarray", json.dumps(intl_rep)) def update_sdp_route_attr(self): """Push change event with new attribute value""" route_dict = self._resource_mgr.get_sdp_routes( self._sdp_arp_replies, self.logger ) self._sdp_route_cache = route_dict # cache: many processor attr reads self.push_change_event("sdp_routes", json.dumps(route_dict)) def update_sw_route_attr(self): """Push change event with new 'switch_routes' attribute value.""" route_dict = self._resource_mgr.get_sw_routes( self._sdp_arp_replies, self.logger ) self._sw_route_cache = route_dict # cache: many processor attr reads self.push_change_event("switch_routes", json.dumps(route_dict)) def update_ip_for_arp(self): """Push new value for ip_to_resolve attribute""" hosts = 
self._resource_mgr.get_hosts_for_arp() ports = self._resource_mgr.get_swports_for_arp() self._ip_for_arp = {"hosts": hosts, "ports": ports} self.push_change_event("ip_to_resolve", json.dumps(self._ip_for_arp)) def sdp_arp_rply_evt(self, evt_json): """Handler for Connector SDP arp replies""" self.logger.info("Connector ARP update: %s", evt_json) evt_dict = json.loads(evt_json) # belt-and-braces insurance: should never occur if not isinstance(evt_dict, dict): self.logger.warning("Ignoring non-dict evt data: %s", evt_json) return # TODO identify which P4 is source device of event? self._sdp_arp_replies.arp_update("p4_01", evt_json) self.update_sdp_route_attr() # deprecated TODO remove self.update_sw_route_attr() class InitCommand(SKABaseDevice.InitCommand): """Init Command object""" def do(self): # pylint: disable=too-many-statements """ Initialises the attributes and properties of the LowCbfAllocator. """ super().do() self._device._version_id = release.version self._device._build_state = ( f"{release.name}, {release.version}, {release.description}" ) self._device._deploy_secs = int(time.time()) # In absence of a specified connections list, load a fake connection # list (defaults) that will allow allocator to function for tests if (self._device.hardware_connections is None) or ( len(self._device.hardware_connections) == 0 ): self.logger.info("Using built-in default connection list") cnx_list = default_connection_list() else: self.logger.info("Using connection list from helm chart") cnx_list = self._device.hardware_connections if self._device.ConnectorAddress is not None: names_dict = { "dev": self._device.ConnectorAddress, "attr": "arp_replies", } self._device.subscribe_to_connector(names_dict) # Convert connections property back into a python list # Each line is a string describing a physical connection, eg # 'switch=p4_01 port=48/0 speed=100 alveo=XFL1ZIN0F4RO' # TODO? Reuse this para for array_config? 
hw_connections = [] n_cnx = 0 self.logger.info("Hardware connections:") for line in cnx_list: connection = {} for token in line.split(" "): # Expect token form "name=value", or spaces (discard) if len(token) < 3: continue key_val = token.split("=") if len(key_val) != 2: continue connection[key_val[0]] = key_val[1] hw_connections.append(connection) n_cnx += 1 self.logger.info(" %s: %s", n_cnx, connection) # self._device.logger.info(hw_connections) # initialise resources self._device._resource_mgr = Resources( array_config, hw_connections, ) self._device._list_of_sps_ports = ( self._device._resource_mgr.get_sps_ports() ) # inialise table of sdp ARP replies self._device._sdp_arp_replies = ArpReplies(self.logger) self._device._sdp_route_cache = None self._device._sw_route_cache = None self._device._ip_for_arp = None self._device.set_change_event("adminMode", True, True) self._device.set_archive_event("adminMode", True, True) self._device.set_change_event("processorUpdate", True, True) self._device.set_change_event("connectorUpdate", True, True) self._device._proc_dev_fqdn = {} """A dictionary of processor device fully qualified device name (fqdn) with S/N as a key, like: { "XFL1VCYSXCL0": "low-cbf/processor/0.0.1", "XFL10NIYKVEU": "low-cbf/processor/0.0.0" } """ # Events to manually be pushed self._device.set_change_event("resourceTableP4", True, False) self._device.set_change_event("p4_stn_routes", True, False) self._device.set_change_event( "alveo_firmware_image_names", True, False ) self._device.set_change_event("internal_alveo", True, False) self._device.set_change_event("internal_subarray", True, False) self._device.set_change_event("stats_alveo", True, False) self._device.set_change_event("procDevFqdn", True, False) self._device.set_change_event( "sdp_routes", True, False ) # Deprecated self._device.set_change_event("switch_routes", True, False) self._device.set_change_event("ip_to_resolve", True, False) # If there is no possibility of real FPGAs # (deployment 
without any processor Tango devices), # register all known Alveo/FPGAs in connections list instead. # That will provide some resources for allocator to work with. db = Database() if ( os.getenv("ALLOW_AUTO_REGISTER_PROCESSORS") is not None and db.get_class_list("LowCbfProcessor").is_empty() ): self.logger.info( "No FPGA H/W: Auto-register all Alveo in connections table" ) self.register_known_alveos(cnx_list) # push updated attribute value with new alveo list a_list = self._device._resource_mgr.get_alveo_list() self._device.push_change_event( "stats_alveo", json.dumps(a_list) ) message = "LowCbfAllocator init complete" self._device.logger.info(message) return ResultCode.OK, message def register_known_alveos(self, cnx_list: list[str]) -> None: """ Register every alveo mentioned in the connections (ignore switch connections that don't mention alveos) :param cnx_list: list of strings, with each string describing a connection to a switch port (copy of 'hardware_connections' table from helm chart) """ processor_idx = 0 for line in cnx_list: idx = line.find("alveo=") if idx == -1: continue serial_no = line[idx + 6 :] # Assume non-existant auto-registered processors are u55c signup = {"serial": serial_no, "hw": "u55c", "tango": None} signup["status"] = 0 # assume processor adminmode ONLINE self._device._resource_mgr.alveo_registration(signup) # Add a dummy tango device for the auto-registered processor self._device._proc_dev_fqdn[ serial_no ] = f"low-cbf/processor/nodev_{processor_idx}" processor_idx += 1 class ReserveCapabilitiesCommand(FastCommand): """ReserveCapabilities Command object reserves P4/Alveo hardware capabilities for a subarray in anticipitation of a scan""" def __init__(self, tango_device, logger): super().__init__(logger) self._tango_device = tango_device def do(self, *argin, **kwargs): """Actions required to reserve capabilities for a subarray scan""" request = json.loads( argin[0] ) # expect a single JSON string argument subarray = request["subarray_id"] 
is_request_ok = True # Must have at least one station beam rq_stn_beams = request["stations"]["stn_beams"] if len(rq_stn_beams) == 0: msg = f"Subarray {subarray} has no station beams" is_request_ok = False # number of stations must be within array assembly limits n_stns = len(request["stations"]["stns"]) if n_stns > self._tango_device._resource_mgr.max_stations: is_request_ok = False msg = f"Subarray {subarray} Too many stations" res_mgr = self._tango_device._resource_mgr # use shorter name # Channels for any beam must be within capability of array assembly for beam in rq_stn_beams: if len(beam["freq_ids"]) > res_mgr.max_channels: is_request_ok = False msg = f"Subarray {subarray} Too many coarse channels" break if not is_request_ok: # Subarray configuration request did not pass checks self.logger.info(msg) rslt = { "success": False, "has_config": False, "processor_fqdns": [], "msg": msg, } return json.dumps(rslt) # Request passes checks. Try to reserve resources for it proc_fqdns = [] txt = "Get scan resources" txt += f" subarray {subarray} adminmode={request['status']}" self.logger.info(txt) (config_ok, msg) = res_mgr.cfg_scan2(request) self.logger.info(msg) # Any configure may update params, even if it fails # push change of VCT attribute self._tango_device.update_sps_routes_attr() self._tango_device.update_internal_alveo_attr() # push because configured -> subarray now has scan_status self._tango_device.update_internal_subarray_attr() self._tango_device.update_sdp_route_attr() self._tango_device.update_sw_route_attr() self._tango_device.update_ip_for_arp() alveo_serials = res_mgr.get_subarray_alveos(subarray) proc_fqdns = [ self._tango_device._proc_dev_fqdn[serial] for serial in alveo_serials ] if not config_ok: # if we don't have resources, request should be marked as bad is_request_ok = False rslt = { "success": is_request_ok, "has_config": config_ok, "processor_fqdns": proc_fqdns, "msg": msg, } return json.dumps(rslt) class 
ReleaseAllCapabilitiesCommand(FastCommand): """ReleaseCapabilities Command object releases P4/Alveo hardware capabilities that a subarray no longer needs""" def __init__(self, tango_device, logger): super().__init__(logger) self._tango_device = tango_device def do(self, argin): """Perform End command: release capabilities held by subarray""" request = json.loads(argin) subarray_id = request["subarray_id"] msg = f"Release scan resources held by subarray {subarray_id}" self.logger.info(msg) # perform the release self._tango_device._resource_mgr.cfg_end2(subarray_id) # push change of state attribute self._tango_device.update_sps_routes_attr() self._tango_device.update_internal_alveo_attr() self._tango_device.update_sw_route_attr() self._tango_device.update_ip_for_arp() # Push scan status because not configured -> no scan status self._tango_device.update_internal_subarray_attr() self._tango_device.update_sdp_route_attr() self.logger.info("End success") return msg class RunScanCommand(FastCommand): """RunScan Command object publishes whether a subarray is or is not running a scan, so that Processors can read and determine whether they should generate output """ def __init__(self, tango_device, logger): super().__init__(logger) self._tango_device = tango_device def do(self, argin): """Run/stop subarray scan by setting attributes""" request = json.loads(argin) subarray_id = request["subarray_id"] should_scan = request["scan"] scan_id = request["scan_id"] txt = "start" if should_scan else "stop" msg = f"Subarray {subarray_id} scan {txt}" self.logger.info(msg) self._tango_device._resource_mgr.enable_scan( subarray_id, should_scan, scan_id ) self._tango_device.update_internal_subarray_attr() return msg # Run server def main(args=None, **kwargs): """Main function of the LowCbfAllocator module.""" # PROTECTED REGION ID(LowCbfAllocator.main) ENABLED START # return run((LowCbfAllocator,), args=args, **kwargs) # PROTECTED REGION END # // LowCbfAllocator.main if __name__ == 
"__main__": main()