# -*- coding: utf-8 -*-
#
# This file is part of the SKA Low CBF project
#
# Copyright (c) 2021, CSIRO
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
# pylint: disable=invalid-name,protected-access,too-few-public-methods
""" SKA Low CBF
LowCbfAllocator is responsible for assignment of the key processing hardware
items that will exist in Low.CBF to Subarrays.
"""
import json
import os
import time
from ska_tango_base import SKABaseDevice
from ska_tango_base.commands import FastCommand, ResultCode
from tango import AttrWriteType, Database, DebugIt
from tango.server import attribute, command, device_property, run
from ska_low_cbf import release
from ska_low_cbf.allocator.arp_replies import ArpReplies
# from ska_low_cbf.allocator.capabilities import Capabilities
from ska_low_cbf.allocator.arp_subscriber import ArpSubscriber
from ska_low_cbf.allocator.component_manager import AllocatorComponentManager
from ska_low_cbf.allocator.default_connections import default_connection_list
from ska_low_cbf.allocator.resources import Resources
from ska_low_cbf.device_proxy import MccsDeviceProxy
__all__ = ["LowCbfAllocator", "main"]
# TODO - need some better scheme to map the IDs used by the allocator to the
# TANGO FQDNs
# NOTE(review): alveo_id_fqdn is not referenced anywhere else in the visible
# part of this file - confirm whether it is still needed.
alveo_id_fqdn = {1: "low-cbf/processor/0.0.0"}
# TODO this config should be a device property, overridden by Helm chart
# Array sizing passed to Resources() when the device initialises.
array_config = {
"stations": 512,
"channels": 384,
}
class LowCbfAllocator(
SKABaseDevice
): # pylint: disable=too-many-public-methods
"""
Allocator is responsible for assignment of the key processing hardware
items that will exist in Low.CBF to Subarrays.
"""
# Properties (value in database, loaded by helm chart)
# Optional cabling list, one string per physical connection, e.g.
# 'switch=p4_01 port=48/0 speed=100 alveo=XFL1ZIN0F4RO'
hardware_connections = device_property(
dtype=("DevString",),
mandatory=False, # if absent or empty, default_connection_list() is used
doc="List of P4 switch to Alveo and I/O hardware cabling",
)
# Optional Tango address of the Connector device; when set, device init
# subscribes to its "arp_replies" attribute and the Get*Resolved / GetSPSPercent
# commands become active.
ConnectorAddress = device_property(
dtype="DevString",
mandatory=False, # if not present, do not subscribe to ARP replies
doc="Connector Tango device address. ARP replies will be subscribed to if an address is given.",
)
# Attributes
@attribute(
    dtype="DevULong",
    label="Allocation Version Counter",
    doc="Increments for each subarray resourcing change",
)
def allocationVersionCounter(self):
    """Read handler for the ``allocationVersionCounter`` attribute.

    :return: the device's current allocation version counter
    """
    counter = self._allocation_version_counter
    return counter
@attribute(
    dtype=("DevULong",),
    max_dim_x=512,
    label="Processor Update List",
    polling_period=3000,
    doc=(
        "Array of Processor configuration versions, "
        "one element per Processor, in the same order as processorIDs"
    ),
)
def processorUpdate(self):
    """Read handler for the ``processorUpdate`` attribute.

    :return: the stored list of Processor configuration versions
    """
    versions = self._processor_update
    return versions
@attribute(
    dtype=("DevULong",),
    max_dim_x=512,
    label="Connector Update List",
    polling_period=3000,
    doc=(
        "Array of Connector configuration versions, one element per "
        "Connector, in the same order as connectorIDs"
    ),
)
def connectorUpdate(self):
    """Read handler for the ``connectorUpdate`` attribute.

    :return: the stored list of Connector configuration versions
    """
    versions = self._connector_update
    return versions
@attribute(
    dtype=("DevString",),
    max_dim_x=512,
    label="Connector Identifiers",
    polling_period=3000,
    doc=(
        "Array of Connector identifiers, in the same order as "
        "connectorUpdate"
    ),
)
def connectorIDs(self):
    """Read handler for the ``connectorIDs`` attribute.

    :return: the stored list of Connector identifiers
    """
    identifiers = self._connector_ids
    return identifiers
@attribute(
    dtype=("DevString",),
    max_dim_x=512,
    label="Processor Identifiers",
    polling_period=3000,
    doc=(
        "Array of Processor identifiers, in the same order as "
        "processorUpdate"
    ),
)
def processorIDs(self):
    """Read handler for the ``processorIDs`` attribute.

    :return: the stored list of Processor identifiers
    """
    identifiers = self._processor_ids
    return identifiers
@attribute(
    label="P4 resource table",
    doc="The P4 resource table as JSON string.",
)
def resourceTableP4(self) -> str:
    """Read handler for the P4 resource table attribute.

    :return: the resource manager's P4 table, serialised as a JSON string
    """
    return json.dumps(self._resource_mgr.get_p4s_as_table())
@attribute(label="processor device fqdn")
def procDevFqdn(self) -> str:
    """Read handler for the Alveo-serial to Tango-FQDN mapping.

    :return: JSON string of a dictionary keyed by Alveo serial number with
        the processor Tango device FQDN as value, e.g.

    .. code:: python

        {
            "XFL1XCRTUC22": "low-cbf/processor/0.0.0",
            "XFL1TJCHM3ON": "low-cbf/processor/0.0.1"
        }
    """
    fqdn_by_serial = self._proc_dev_fqdn
    return json.dumps(fqdn_by_serial)
def __init__(self, *args, **kwargs):
    """Create the device.

    ``_arp_subscriber`` is set before the base-class constructor runs
    because ``InitCommand.do`` may call ``subscribe_to_connector``, which
    reads that field.
    """
    # Must exist before the SKA base class gets a chance to run InitCommand.do
    self._arp_subscriber = None
    super().__init__(*args, **kwargs)
# General methods
def create_component_manager(self):
    """Build the component manager for this device.

    :return: an ``AllocatorComponentManager`` wired to this device's logger
        and its communication/component state callbacks
    """
    manager = AllocatorComponentManager(
        logger=self.logger,
        communication_state_callback=self._communication_state_changed,
        component_state_callback=self._component_state_changed,
    )
    return manager
def always_executed_hook(self):
    """Hook run before any TANGO command is executed.

    Intentionally does nothing for this device.
    """
def delete_device(self):
    """Hook to delete resources allocated in init_device.

    This method is called by the device destructor and by the device Init
    command. The only resource released here is the ARP-reply subscription
    that device initialisation (or ``SubscribeToConnector``) may have
    created; the next ``subscribe_to_connector`` call makes a fresh one.

    Cleanup is best-effort: a failure to unsubscribe is logged as a warning
    and never propagated, so teardown always completes.
    """
    subscriber = getattr(self, "_arp_subscriber", None)
    if subscriber is None:
        return
    try:
        subscriber.unsubscribe()
    except Exception as ex:  # pylint: disable=broad-except
        self.logger.warning("ARP unsubscribe failed on delete: %s", ex)
    finally:
        # Forget the subscriber whatever happened
        self._arp_subscriber = None
@attribute
def internal_alveo(self) -> str:
    """
    Read the internal state of all Processor devices that are configured
    with firmware (no firmware, no entry here)

    :return: JSON string containing a dictionary

      * key = alveo serial number;
      * value = dict containing keys ``fw``, ``regs``.
        ``fw`` value is a dict with ``personality`` and ``url`` keys
        (each strings).
    """
    return json.dumps(self._resource_mgr.get_internal_repr())
@attribute
def internal_subarray(self) -> str:
    """
    Read the internal state of all configured subarrays
    (no configuration, no entry in dict)

    :return: JSON string containing a dictionary

      * key = ``subarray_id`` number
      * value = subarray definition (also a dict)
    """
    return json.dumps(self._resource_mgr.get_subarray_repr())
@attribute
def p4_stn_routes(self) -> str:
    """
    Read the route info needed by P4 switches to route SPS packets

    :return: the resource manager's station routes as a JSON string
    """
    return json.dumps(self._resource_mgr.get_stn_routes())
# READ_WRITE so that PyTango attaches write_alveo_firmware_image_names;
# with a bare @attribute the attribute is read-only and that writer is
# never reachable.
@attribute(access=AttrWriteType.READ_WRITE)
def alveo_firmware_image_names(self: SKABaseDevice) -> str:
    """
    Read the firmware to run on each alveo.

    :return: JSON string - dict of alveo firmware entries, taken from the
        resource manager's ``fw_names_with_test_overrides``
    """
    fw_names = self._resource_mgr.fw_names_with_test_overrides
    return json.dumps(fw_names)
def write_alveo_firmware_image_names(
    self: SKABaseDevice, value: str
) -> None:
    """
    Set up or clear a firmware renaming for test

    PyTango looks this method up by its ``write_<attribute name>`` name
    when the attribute is declared writable.

    :param value: JSON string encoding a Dictionary with

      * key=alveo_id
      * value=firmware_name
    :return: None
    """
    name_overrides = json.loads(value)
    self._resource_mgr.fw_names_with_test_overrides = name_overrides
    # Read the value back so the event carries what the resource manager
    # now reports rather than what the client sent
    new_names = self._resource_mgr.fw_names_with_test_overrides
    self.push_change_event(
        "alveo_firmware_image_names", json.dumps(new_names)
    )
@attribute
def stats_alveo(self) -> str:
    """
    Read the list of alveo statistics.

    Reports Alveo details, usage by subarrays and any unused capacities.

    :return: JSON string containing a list of Alveo details
    """
    return json.dumps(self._resource_mgr.get_alveo_list())
@attribute
def sdp_routes(self) -> str:  # Deprecated TODO remove
    """Read the SDP routes for each switch.

    A non-empty cached value is returned as-is; otherwise the routes are
    computed from the current ARP replies (the cache is not filled here).

    :return: JSON string dictionary

      * key = switch_id or "arp_rq"
      * value = routes or arp_list
    """
    cached = self._sdp_route_cache
    if not cached:
        fresh = self._resource_mgr.get_sdp_routes(
            self._sdp_arp_replies, self.logger
        )
        return json.dumps(fresh)
    return json.dumps(cached)
@attribute
def switch_routes(self) -> str:
    """Read the routes for each switch, computing and caching them on first
    use.

    :return: JSON string dictionary

      * key = switch_id
      * value = list [ (ip, sw_port), ]
    """
    if self._sw_route_cache is not None:
        return json.dumps(self._sw_route_cache)
    # Nothing cached yet: build the routes from the current ARP replies
    self._sw_route_cache = self._resource_mgr.get_sw_routes(
        self._sdp_arp_replies, self.logger
    )
    return json.dumps(self._sw_route_cache)
@attribute
def ip_to_resolve(self) -> str:
    """
    Read the IP addresses to resolve by ARP, building and caching the value
    on first use.

    :return: JSON dictionary
        {"hosts": [ip_addr, ], "ports": [(sw_id, port), ]}
    """
    if self._ip_for_arp is not None:
        return json.dumps(self._ip_for_arp)
    manager = self._resource_mgr
    self._ip_for_arp = {
        "hosts": manager.get_hosts_for_arp(),
        "ports": manager.get_swports_for_arp(),
    }
    return json.dumps(self._ip_for_arp)
@attribute
def startup_time(self) -> int:
    """
    Read the Unix timestamp (whole seconds) recorded when the device was
    initialised.
    """
    started_at = self._deploy_secs
    return started_at
@attribute(access=AttrWriteType.READ_WRITE)
def internalAlveoLimits(self) -> str:
    """
    Read the current per-alveo override limits.

    :return: the resource manager's alveo limits as a JSON string
    """
    limits = self._resource_mgr.get_alveo_limits()
    return json.dumps(limits)
def write_internalAlveoLimits(self, limit_json):
    """
    Expert level attribute: Change per-alveo allocation limits

    Use empty dictionary to reset to defaults. EG current FPGA maximums are

    .. code:: python

        {
            "pst": {
                "vch": 900,  # Virtual chans per pipeline (bf clock freq)
                "sps_ch": 1024,  # SPS chan per pipeline (packetizer limit)
            },
            "pss": {
                "vch": 960,  # VCH per pipeline (intermittent fails @ 1020)
                "sps_ch": 128  # SPS chans per pipeline (packetizer limit)
            },
            "vis": {
                "vch": 1024,  # Virtual Channels into FPGA
                "hbm": 606,  # VChans in one matrix correlator's (MxC) HBM
                "bli": 131328  # Baselines (=512*513/2) in one MxC
            }
        }

    :param limit_json: limit dictionary as JSON.
    :raises ValueError: if the JSON is invalid, is not a dictionary, or
        holds any value that is not itself a dictionary
    """
    limit_dict = json.loads(limit_json)
    # Check we are given a dict of dicts
    if not isinstance(limit_dict, dict):
        raise ValueError("Expecting dictionary for limits")
    # Check contents of dict (if any) are also dicts
    if not all(isinstance(itm, dict) for itm in limit_dict.values()):
        raise ValueError("Expecting limit dict to contain dicts")
    self._resource_mgr.set_alveo_limits(limit_dict)
@command(dtype_in=str, dtype_out=None, doc_in="subscribe to ARP replies")
def SubscribeToConnector(self, argin: str):
    """
    Temporary command that makes the Allocator subscribe to events from a
    Tango device whose attribute provides ARP reply info.

    Malformed requests are logged as errors and otherwise ignored.

    :param argin: A JSON string containing Tango device name and attribute
        name, eg ``{"dev": "low-cbf/connector/0", "attr": "sdp_arp_reply"}``
    """
    try:
        request = json.loads(argin)
    except ValueError as ex:
        self.logger.error("Bad command argument: %s", ex)
        return
    if not isinstance(request, dict):
        self.logger.error("Expected dictionary, got: %s", request)
    elif "dev" not in request:
        self.logger.error("Missing 'dev' key, got: %s", request)
    elif "attr" not in request:
        self.logger.error("missing 'attr' key, got: %s", request)
    else:
        self.subscribe_to_connector(request)
@command(dtype_in=None, dtype_out=None)
def UnsubscribeFromConnector(self):
    """
    Temporary command that makes the Allocator drop its subscription to the
    ARP events sent by the connector. Does nothing if there is none.
    """
    subscriber = self._arp_subscriber
    if subscriber is None:
        return
    subscriber.unsubscribe()
    self._arp_subscriber = None
def subscribe_to_connector(self, names_dict):
    """
    Subscribe to an ARP reply attribute of a connector device.

    An existing subscriber is unsubscribed and reused; otherwise a new
    ``ArpSubscriber`` is created. Events go to ``sdp_arp_rply_evt``.

    :param names_dict: dict with ``"dev"`` (Tango device name) and
        ``"attr"`` (attribute name) keys
    """
    device = names_dict["dev"]
    attr = names_dict["attr"]
    self.logger.info("subscribe dev=%s, attr=%s", device, attr)
    if self._arp_subscriber is not None:
        # drop the previous subscription before re-using the subscriber
        self._arp_subscriber.unsubscribe()
    else:
        self._arp_subscriber = ArpSubscriber(self.logger)
    self._arp_subscriber.subscribe(device, attr, self.sdp_arp_rply_evt)
# Commands called by processors to register their presence
@command(
    dtype_in="DevString",
    doc_in="Allocation request, JSON string",
)
@DebugIt()
def InternalRegisterAlveo(self, argin):
    """
    Called by Processors to register their presence

    Records the serial-to-FQDN mapping, registers the Alveo with the
    resource manager and pushes change events for ``procDevFqdn`` and
    ``stats_alveo``. A request that is not a dictionary, or lacks a
    required key, is logged as an error and ignored.

    :param argin: Registration details as a JSON string. ``serial``,
        ``fqdn`` and ``hw`` are required; ``status`` (processor adminMode)
        is optional and defaults to 0 so old processor software still works.

    .. code:: python

        {
            "serial": "A123",
            "fqdn": "low-cbf/processor/0.1.0",
            "hw": "u55c",
            "status": 0
        }

    :return: None
    """
    proc_details = json.loads(argin)
    if not isinstance(proc_details, dict):
        self.logger.error("Expected JSON dictionary, got: %s", argin)
        return
    missing = [
        key for key in ("serial", "fqdn", "hw") if key not in proc_details
    ]
    if missing:
        # Name every absent key (the previous message never mentioned "hw")
        self.logger.error("Missing key(s) %s in %s", missing, argin)
        return
    serial_nr = proc_details["serial"]
    processor_fqdn = proc_details["fqdn"]
    hw = proc_details["hw"]
    # Allow old processor sw (which sends no status) to work
    admin_mode = proc_details.get("status", 0)
    self.logger.info(
        "register Alveo %s adminMode %s ", serial_nr, admin_mode
    )
    self._proc_dev_fqdn[serial_nr] = processor_fqdn
    self.push_change_event("procDevFqdn", json.dumps(self._proc_dev_fqdn))
    signup = {
        "serial": serial_nr,
        "hw": hw,
        "tango": processor_fqdn,
        "status": admin_mode,
    }
    (success, msg) = self._resource_mgr.alveo_registration(signup)
    if success:
        self.logger.info(msg)
    else:
        self.logger.error(msg)
    # push attribute updated value
    a_list = self._resource_mgr.get_alveo_list()
    self.push_change_event("stats_alveo", json.dumps(a_list))
# Commands called by processors to register their presence
@command(
    dtype_in="DevString",
    doc_in="Allocation request, JSON string",
)
@DebugIt()
def InternalRegisterFsp(self, argin):  # TODO remove when processor updated
    """
    Called by Processors to register their presence (legacy form)

    Only the first serial number in the list is registered, always as
    hardware type ``u55c``. A request with a missing key or an empty
    ``serial`` list is logged as an error and ignored.

    :param argin: 'DevString'
        Serial numbers and FQDN to register, JSON string
        e.g.
        {
        "serial": ["A123", "B456", "C789"],
        "fqdn": "low-cbf/processor/0.1.0",
        "status": 0
        }
    :return: None
    """
    proc_details = json.loads(argin)
    if not all(
        key in proc_details for key in ("serial", "fqdn", "status")
    ):
        # Use the device logger like the other commands, not print()
        self.logger.error(
            "DIDN'T FIND A KEY (serial/fqdn/status) in %s", argin
        )
        return
    serials = proc_details["serial"]
    if not serials:
        self.logger.error("Empty 'serial' list in %s", argin)
        return
    serial_nr = serials[0]  # The first alveo in list
    processor_fqdn = proc_details["fqdn"]
    adminmode = proc_details["status"]
    self._proc_dev_fqdn[serial_nr] = processor_fqdn
    self.push_change_event("procDevFqdn", json.dumps(self._proc_dev_fqdn))
    signup = {
        "serial": serial_nr,
        "hw": "u55c",
        "tango": processor_fqdn,
        "status": adminmode,
    }
    (success, msg) = self._resource_mgr.alveo_registration(signup)
    if success:
        self.logger.info(msg)
    else:
        self.logger.error(msg)
    # push attribute updated value
    a_list = self._resource_mgr.get_alveo_list()
    self.push_change_event("stats_alveo", json.dumps(a_list))
@command(
    dtype_in="DevString",
    doc_in="Capabilities reservation, JSON string",
    dtype_out="DevString",
    doc_out="Capabilities Available, JSON string",
)
@DebugIt()
def ReserveCapabilities(self, argin):
    """
    Called by Subarrays to request "Capabilities"
    (stations, PST/PSS beams, visibilities)

    Delegates to the command object registered under
    ``cmd_name_ReserveCapabilities``.

    :param argin: 'DevString'
        Allocation request, JSON string
    :return: 'DevString'
        success/fail as JSON string
    """
    handler = self.get_command_object("cmd_name_ReserveCapabilities")
    return handler(argin)
@command(
    dtype_in="DevString",
    doc_in="Ignored string",
    dtype_out="DevString",
    doc_out="Informational text TODO: return success as JSON",
)
@DebugIt()
def ReleaseAllCapabilities(self, argin):
    """
    Called by Subarrays to release all current "Capabilities"

    Delegates to the command object registered under
    ``cmd_name_ReleaseAllCapabilities``, which parses ``argin`` as JSON and
    reads its ``subarray_id`` key.

    :param argin: 'DevString'
        JSON string naming the subarray
    :return: Informational text string
    :rtype: DevString
    """
    handler = self.get_command_object("cmd_name_ReleaseAllCapabilities")
    return handler(argin)
@command(
    dtype_in="DevString",
    doc_in="Ignored string",
    dtype_out="DevString",
    doc_out="Informational text TODO: return success as JSON",
)
@DebugIt()
def RunScan(self, argin):
    """
    Called by Subarrays to start/stop their processors scanning

    Delegates to the command object registered under ``cmd_name_RunScan``,
    which parses ``argin`` as JSON and reads its ``subarray_id``, ``scan``
    and ``scan_id`` keys.

    :param argin: 'DevString'
        JSON string describing the scan request
    :return: 'DevString'
        Informational text string
    """
    handler = self.get_command_object("cmd_name_RunScan")
    return handler(argin)
@command(
    dtype_in=None,
    dtype_out=int,
    doc_out="Get the percentage of SPS links up from various connector",
)
def GetSPSPercent(self):
    """Get percentage of SPS links up from all switches.

    Reads ``diagnostics_port_up`` from the Connector device and counts the
    entries belonging to the known SPS ports.

    :return: integer percentage of SPS ports reported up; 0 when no
        ``ConnectorAddress`` is configured or no SPS ports are known
    """
    if self.ConnectorAddress is None:
        return 0
    # here we need to get the various switches up status
    connector = MccsDeviceProxy(
        self.ConnectorAddress, self.logger, connect=False
    )
    connector.connect(max_time=120)
    array_of_ports_up = connector.diagnostics_port_up
    sps_ports = self._list_of_sps_ports
    self.logger.info("array_of_ports_up: %s", array_of_ports_up)
    self.logger.info("get_sps_ports: %s", sps_ports)
    if not sps_ports:
        # Nothing to measure: avoid dividing by zero below
        return 0
    # Port names look like "48/0"; the leading number is a 1-based index
    # into the connector's port-up array
    total_ports_up = sum(
        array_of_ports_up[int(port.split("/")[0]) - 1] for port in sps_ports
    )
    return int(total_ports_up / len(sps_ports) * 100)
@command(
    dtype_in=int,
    doc_in="Subarray ID",
    dtype_out=int,
    doc_out="Percentage of PSS destination hosts resolved by ARP",
)
def GetPSSHostResolved(self, argin):
    """Get percentage of a subarray's PSS destination hosts that are
    resolved by ARP.

    Hosts are the distinct ``data_host`` values of every destination of
    every beam under the subarray's ``search_beams`` entry. A host counts
    as resolved when the ARP table knows it with a MAC other than
    ``00:00:00:00:00:00``.

    :param argin: Subarray ID
    :return: integer percentage; 0 when no ``ConnectorAddress`` is
        configured, the subarray is unknown, it has no ``search_beams``
        entry, or there are no destination hosts
    """
    percent_pss = 0
    if self.ConnectorAddress is None:
        return percent_pss
    sub_key = f"{int(argin)}"
    # Round-trip through JSON so every dictionary key is a string
    subarrays = json.loads(json.dumps(self._resource_mgr.get_subarray_repr()))
    subarray_cfg = subarrays.get(sub_key)
    if subarray_cfg is None or "search_beams" not in subarray_cfg:
        return percent_pss
    hosts = {
        dest["data_host"]
        for beam in subarray_cfg["search_beams"].get("beams", [])
        for dest in beam.get("destinations", [])
    }
    self.logger.info("hosts list is %s", list(hosts))
    if not hosts:
        # No destinations configured: avoid dividing by zero
        return percent_pss
    resolved_hosts = 0
    for host in hosts:
        host_info = self._sdp_arp_replies.host_info(host)
        if host_info is not None and host_info["mac"] != "00:00:00:00:00:00":
            resolved_hosts += 1
    percent_pss = int(resolved_hosts / len(hosts) * 100)
    return percent_pss
@command(
    dtype_in=int,
    doc_in="Subarray ID",
    dtype_out=int,
    doc_out="Percentage of PST destination hosts resolved by ARP",
)
def GetPSTHostResolved(self, argin):
    """Get percentage of a subarray's PST destination hosts that are
    resolved by ARP.

    Hosts are the distinct ``data_host`` values of every destination of
    every beam under the subarray's ``timing_beams`` entry. A host counts
    as resolved when the ARP table knows it with a MAC other than
    ``00:00:00:00:00:00``.

    :param argin: Subarray ID
    :return: integer percentage; 0 when no ``ConnectorAddress`` is
        configured, the subarray is unknown, it has no ``timing_beams``
        entry, or there are no destination hosts
    """
    percent_pst = 0
    if self.ConnectorAddress is None:
        return percent_pst
    sub_key = f"{int(argin)}"
    # Round-trip through JSON so every dictionary key is a string
    subarrays = json.loads(json.dumps(self._resource_mgr.get_subarray_repr()))
    subarray_cfg = subarrays.get(sub_key)
    if subarray_cfg is None or "timing_beams" not in subarray_cfg:
        return percent_pst
    hosts = {
        dest["data_host"]
        for beam in subarray_cfg["timing_beams"].get("beams", [])
        for dest in beam.get("destinations", [])
    }
    self.logger.info("hosts list is %s", list(hosts))
    if not hosts:
        # No destinations configured: avoid dividing by zero
        return percent_pst
    resolved_hosts = 0
    for host in hosts:
        host_info = self._sdp_arp_replies.host_info(host)
        if host_info is not None and host_info["mac"] != "00:00:00:00:00:00":
            resolved_hosts += 1
    percent_pst = int(resolved_hosts / len(hosts) * 100)
    return percent_pst
@command(
    dtype_in=int,
    doc_in="Subarray ID",
    dtype_out=int,
    doc_out="Percentage of SDP destination hosts resolved by ARP",
)
def GetSDPHostResolved(self, argin):
    """Get percentage of a subarray's SDP destination hosts that are
    resolved by ARP.

    Hosts are the distinct second elements of every ``host`` entry of every
    station beam under the subarray's ``vis`` entry. A host counts as
    resolved when the ARP table knows it with a MAC other than
    ``00:00:00:00:00:00``.

    :param argin: Subarray ID
    :return: integer percentage; 0 when no ``ConnectorAddress`` is
        configured, the subarray is unknown, it has no ``vis`` entry, or
        there are no destination hosts
    """
    percent_sdp = 0
    if self.ConnectorAddress is None:
        return percent_sdp
    sub_key = f"{int(argin)}"
    # Round-trip through JSON so every dictionary key is a string
    subarrays = json.loads(json.dumps(self._resource_mgr.get_subarray_repr()))
    subarray_cfg = subarrays.get(sub_key)
    if subarray_cfg is not None and "vis" in subarray_cfg:
        hosts = {
            dest[1]
            for beam in subarray_cfg["vis"].get("stn_beams", [])
            for dest in beam.get("host", [])
        }
        self.logger.info("hosts list is %s", list(hosts))
        resolved_hosts = 0
        for host in hosts:
            host_info = self._sdp_arp_replies.host_info(host)
            if (
                host_info is not None
                and host_info["mac"] != "00:00:00:00:00:00"
            ):
                resolved_hosts += 1
        if hosts:  # no destinations configured -> stay at 0, no division
            percent_sdp = int(resolved_hosts / len(hosts) * 100)
    self.logger.info("percent_sdp %s", percent_sdp)
    return percent_sdp
# Command Objects
def init_command_objects(self):
    """
    Register the command handlers for the commands this device adds on top
    of the base class.
    """
    super().init_command_objects()
    handlers = (
        ("cmd_name_ReserveCapabilities", self.ReserveCapabilitiesCommand),
        (
            "cmd_name_ReleaseAllCapabilities",
            self.ReleaseAllCapabilitiesCommand,
        ),
        ("cmd_name_RunScan", self.RunScanCommand),
    )
    for registered_name, handler_cls in handlers:
        self.register_command_object(
            registered_name,
            handler_cls(tango_device=self, logger=self.logger),
        )
def update_sps_routes_attr(self):
    """Push a ``p4_stn_routes`` change event carrying the current station
    routes as JSON."""
    payload = json.dumps(self._resource_mgr.get_stn_routes())
    self.push_change_event("p4_stn_routes", payload)
def update_internal_alveo_attr(self):
    """Push an ``internal_alveo`` change event carrying the current
    internal representation as JSON."""
    payload = json.dumps(self._resource_mgr.get_internal_repr(self.logger))
    self.push_change_event("internal_alveo", payload)
def update_internal_subarray_attr(self):
    """Push an ``internal_subarray`` change event carrying the current
    subarray representation as JSON."""
    payload = json.dumps(self._resource_mgr.get_subarray_repr())
    self.push_change_event("internal_subarray", payload)
def update_sdp_route_attr(self):
    """Recompute the SDP routes, cache them, then push an ``sdp_routes``
    change event with the JSON value."""
    # cache: many processor attr reads
    self._sdp_route_cache = self._resource_mgr.get_sdp_routes(
        self._sdp_arp_replies, self.logger
    )
    self.push_change_event("sdp_routes", json.dumps(self._sdp_route_cache))
def update_sw_route_attr(self):
    """Recompute the switch routes, cache them, then push a
    ``switch_routes`` change event with the JSON value."""
    # cache: many processor attr reads
    self._sw_route_cache = self._resource_mgr.get_sw_routes(
        self._sdp_arp_replies, self.logger
    )
    self.push_change_event("switch_routes", json.dumps(self._sw_route_cache))
def update_ip_for_arp(self):
    """Rebuild the cached ``ip_to_resolve`` value and push it as a change
    event."""
    manager = self._resource_mgr
    self._ip_for_arp = {
        "hosts": manager.get_hosts_for_arp(),
        "ports": manager.get_swports_for_arp(),
    }
    self.push_change_event("ip_to_resolve", json.dumps(self._ip_for_arp))
def sdp_arp_rply_evt(self, evt_json):
    """Handle an ARP-reply event from the Connector.

    Payloads that decode to a dictionary are handed (still as the original
    JSON string) to the ARP table, after which both route attributes are
    refreshed. Anything else is logged and dropped.

    :param evt_json: event payload, a JSON string
    """
    self.logger.info("Connector ARP update: %s", evt_json)
    decoded = json.loads(evt_json)
    if isinstance(decoded, dict):
        # TODO identify which P4 is source device of event?
        self._sdp_arp_replies.arp_update("p4_01", evt_json)
        self.update_sdp_route_attr()  # deprecated TODO remove
        self.update_sw_route_attr()
        return
    # belt-and-braces insurance: should never occur
    self.logger.warning("Ignoring non-dict evt data: %s", evt_json)
class InitCommand(SKABaseDevice.InitCommand):
"""Init Command object"""
def do(self):  # pylint: disable=too-many-statements
    """
    Initialises the attributes and properties of the LowCbfAllocator.

    Sets version/build info and the start-up timestamp, creates the backing
    fields read by the device attributes, parses the hardware connection
    list into the resource manager, configures change/archive events and,
    once all of that state exists, subscribes to the Connector's ARP
    replies (if ``ConnectorAddress`` is set). Optionally auto-registers
    every Alveo named in the connection list.

    :return: ``(ResultCode.OK, message)``
    """
    super().do()
    device = self._device
    device._version_id = release.version
    device._build_state = (
        f"{release.name}, {release.version}, {release.description}"
    )
    device._deploy_secs = int(time.time())

    # Backing fields of allocationVersionCounter, processorUpdate,
    # connectorUpdate, connectorIDs and processorIDs. Without these, reading
    # (or polling) those attributes raises AttributeError.
    device._allocation_version_counter = 0
    device._processor_update = []
    device._connector_update = []
    device._connector_ids = []
    device._processor_ids = []

    # In absence of a specified connections list, load a fake connection
    # list (defaults) that will allow allocator to function for tests
    if (device.hardware_connections is None) or (
        len(device.hardware_connections) == 0
    ):
        self.logger.info("Using built-in default connection list")
        cnx_list = default_connection_list()
    else:
        self.logger.info("Using connection list from helm chart")
        cnx_list = device.hardware_connections

    # Convert connections property back into a python list
    # Each line is a string describing a physical connection, eg
    # 'switch=p4_01 port=48/0 speed=100 alveo=XFL1ZIN0F4RO'
    # TODO? Reuse this para for array_config?
    hw_connections = []
    self.logger.info("Hardware connections:")
    for n_cnx, line in enumerate(cnx_list, start=1):
        connection = {}
        for token in line.split(" "):
            # Expect token form "name=value", or spaces (discard)
            if len(token) < 3:
                continue
            key_val = token.split("=")
            if len(key_val) != 2:
                continue
            connection[key_val[0]] = key_val[1]
        hw_connections.append(connection)
        self.logger.info(" %s: %s", n_cnx, connection)

    # initialise resources
    device._resource_mgr = Resources(
        array_config,
        hw_connections,
    )
    device._list_of_sps_ports = device._resource_mgr.get_sps_ports()
    # initialise table of sdp ARP replies
    device._sdp_arp_replies = ArpReplies(self.logger)
    device._sdp_route_cache = None
    device._sw_route_cache = None
    device._ip_for_arp = None
    device.set_change_event("adminMode", True, True)
    device.set_archive_event("adminMode", True, True)
    device.set_change_event("processorUpdate", True, True)
    device.set_change_event("connectorUpdate", True, True)
    # Processor device FQDN keyed by Alveo serial number, like:
    #   {
    #       "XFL1VCYSXCL0": "low-cbf/processor/0.0.1",
    #       "XFL10NIYKVEU": "low-cbf/processor/0.0.0"
    #   }
    device._proc_dev_fqdn = {}

    # Events to manually be pushed
    device.set_change_event("resourceTableP4", True, False)
    device.set_change_event("p4_stn_routes", True, False)
    device.set_change_event("alveo_firmware_image_names", True, False)
    device.set_change_event("internal_alveo", True, False)
    device.set_change_event("internal_subarray", True, False)
    device.set_change_event("stats_alveo", True, False)
    device.set_change_event("procDevFqdn", True, False)
    device.set_change_event("sdp_routes", True, False)  # Deprecated
    device.set_change_event("switch_routes", True, False)
    device.set_change_event("ip_to_resolve", True, False)

    # Subscribe only now: the event handler (sdp_arp_rply_evt) uses
    # _sdp_arp_replies and _resource_mgr and pushes change events, so an
    # event delivered during subscription must find all of that in place.
    if device.ConnectorAddress is not None:
        names_dict = {
            "dev": device.ConnectorAddress,
            "attr": "arp_replies",
        }
        device.subscribe_to_connector(names_dict)

    # If there is no possibility of real FPGAs
    # (deployment without any processor Tango devices),
    # register all known Alveo/FPGAs in connections list instead.
    # That will provide some resources for allocator to work with.
    db = Database()
    if (
        os.getenv("ALLOW_AUTO_REGISTER_PROCESSORS") is not None
        and db.get_class_list("LowCbfProcessor").is_empty()
    ):
        self.logger.info(
            "No FPGA H/W: Auto-register all Alveo in connections table"
        )
        self.register_known_alveos(cnx_list)
        # push updated attribute value with new alveo list
        a_list = device._resource_mgr.get_alveo_list()
        device.push_change_event("stats_alveo", json.dumps(a_list))

    message = "LowCbfAllocator init complete"
    device.logger.info(message)
    return ResultCode.OK, message
def register_known_alveos(self, cnx_list: list[str]) -> None:
    """
    Register every alveo mentioned in the connections
    (ignore switch connections that don't mention alveos)

    The serial number is the value of the line's ``alveo=`` token, wherever
    that token sits in the line, so later tokens and trailing whitespace
    are not absorbed into it. Each registered alveo also gets a dummy entry
    ``low-cbf/processor/nodev_<n>`` in the device's serial-to-FQDN map.

    :param cnx_list: list of strings, with
        each string describing a connection to a switch port
        (copy of 'hardware_connections' table from helm chart)
    """
    prefix = "alveo="
    processor_idx = 0
    for line in cnx_list:
        serial_no = next(
            (
                token[len(prefix):]
                for token in line.split()
                if token.startswith(prefix)
            ),
            "",
        )
        if not serial_no:
            # no alveo on this connection (or an empty serial): skip it
            continue
        # Assume non-existent auto-registered processors are u55c,
        # with processor adminmode ONLINE (status 0)
        signup = {
            "serial": serial_no,
            "hw": "u55c",
            "tango": None,
            "status": 0,
        }
        self._device._resource_mgr.alveo_registration(signup)
        # Add a dummy tango device for the auto-registered processor
        self._device._proc_dev_fqdn[
            serial_no
        ] = f"low-cbf/processor/nodev_{processor_idx}"
        processor_idx += 1
class ReserveCapabilitiesCommand(FastCommand):
"""ReserveCapabilities Command object reserves P4/Alveo hardware
capabilities for a subarray in anticipitation of a scan"""
def __init__(self, tango_device, logger):
    """Create the command handler.

    :param tango_device: the allocator device this command acts on
    :param logger: logger handed to the base command class
    """
    super().__init__(logger)
    # handle on the owning device, used by do()
    self._tango_device = tango_device
def do(self, *argin, **kwargs):
    """Reserve capabilities for a subarray scan.

    The request is sanity-checked first (at least one station beam,
    station count and per-beam channel count within the array limits). A
    request failing any check is rejected without touching the resource
    manager. Otherwise the resource manager is asked to configure the scan
    and every dependent attribute is re-published, whether or not the
    configuration succeeded.

    :param argin: a single JSON string argument, the allocation request
    :return: JSON string with keys ``success``, ``has_config``,
        ``processor_fqdns`` and ``msg``
    """
    request = json.loads(argin[0])  # expect a single JSON string argument
    subarray = request["subarray_id"]
    rq_stn_beams = request["stations"]["stn_beams"]
    device = self._tango_device
    rejection = None  # message of the last failed check, if any

    # Must have at least one station beam
    if len(rq_stn_beams) == 0:
        rejection = f"Subarray {subarray} has no station beams"
    # number of stations must be within array assembly limits
    if len(request["stations"]["stns"]) > device._resource_mgr.max_stations:
        rejection = f"Subarray {subarray} Too many stations"
    res_mgr = device._resource_mgr
    # Channels for any beam must be within capability of array assembly
    if any(
        len(beam["freq_ids"]) > res_mgr.max_channels
        for beam in rq_stn_beams
    ):
        rejection = f"Subarray {subarray} Too many coarse channels"

    if rejection is not None:
        # Subarray configuration request did not pass checks
        self.logger.info(rejection)
        return json.dumps(
            {
                "success": False,
                "has_config": False,
                "processor_fqdns": [],
                "msg": rejection,
            }
        )

    # Request passes checks. Try to reserve resources for it
    self.logger.info(
        f"Get scan resources subarray {subarray}"
        f" adminmode={request['status']}"
    )
    config_ok, msg = res_mgr.cfg_scan2(request)
    self.logger.info(msg)

    # Any configure may update params, even if it fails, so republish all
    for publish in (
        device.update_sps_routes_attr,
        device.update_internal_alveo_attr,
        # configured -> subarray now has scan_status
        device.update_internal_subarray_attr,
        device.update_sdp_route_attr,
        device.update_sw_route_attr,
        device.update_ip_for_arp,
    ):
        publish()

    fqdn_by_serial = device._proc_dev_fqdn
    proc_fqdns = [
        fqdn_by_serial[serial]
        for serial in res_mgr.get_subarray_alveos(subarray)
    ]
    # if we don't have resources, request is marked as bad
    return json.dumps(
        {
            "success": bool(config_ok),
            "has_config": config_ok,
            "processor_fqdns": proc_fqdns,
            "msg": msg,
        }
    )
class ReleaseAllCapabilitiesCommand(FastCommand):
"""ReleaseCapabilities Command object releases P4/Alveo hardware
capabilities that a subarray no longer needs"""
def __init__(self, tango_device, logger):
    """Create the command handler.

    :param tango_device: the allocator device this command acts on
    :param logger: logger handed to the base command class
    """
    super().__init__(logger)
    # handle on the owning device, used by do()
    self._tango_device = tango_device
def do(self, argin):
    """Perform End command: release capabilities held by a subarray.

    :param argin: JSON string containing a ``subarray_id`` key
    :return: informational text naming the subarray
    """
    subarray_id = json.loads(argin)["subarray_id"]
    msg = f"Release scan resources held by subarray {subarray_id}"
    self.logger.info(msg)
    device = self._tango_device
    # perform the release
    device._resource_mgr.cfg_end2(subarray_id)
    # then republish every attribute that depends on the allocation
    for publish in (
        device.update_sps_routes_attr,
        device.update_internal_alveo_attr,
        device.update_sw_route_attr,
        device.update_ip_for_arp,
        # not configured -> no scan status
        device.update_internal_subarray_attr,
        device.update_sdp_route_attr,
    ):
        publish()
    self.logger.info("End success")
    return msg
class RunScanCommand(FastCommand):
"""RunScan Command object publishes whether a subarray is or is not
running a scan, so that Processors can read and determine
whether they should generate output
"""
def __init__(self, tango_device, logger):
    """Create the command handler.

    :param tango_device: the allocator device this command acts on
    :param logger: logger handed to the base command class
    """
    super().__init__(logger)
    # handle on the owning device, used by do()
    self._tango_device = tango_device
def do(self, argin):
    """Start or stop a subarray scan and republish the subarray state.

    :param argin: JSON string with ``subarray_id``, ``scan`` (truthy to
        start, falsy to stop) and ``scan_id`` keys
    :return: informational text, "Subarray <id> scan start|stop"
    """
    request = json.loads(argin)
    subarray_id = request["subarray_id"]
    should_scan = request["scan"]
    scan_id = request["scan_id"]
    if should_scan:
        action = "start"
    else:
        action = "stop"
    msg = f"Subarray {subarray_id} scan {action}"
    self.logger.info(msg)
    device = self._tango_device
    device._resource_mgr.enable_scan(subarray_id, should_scan, scan_id)
    device.update_internal_subarray_attr()
    return msg
# Run server
def main(args=None, **kwargs):
    """Run the Tango device server hosting ``LowCbfAllocator``.

    :param args: passed through to ``tango.server.run``
    :param kwargs: extra keyword arguments for ``tango.server.run``
    :return: whatever ``tango.server.run`` returns
    """
    # PROTECTED REGION ID(LowCbfAllocator.main) ENABLED START #
    device_classes = (LowCbfAllocator,)
    return run(device_classes, args=args, **kwargs)
    # PROTECTED REGION END # // LowCbfAllocator.main
if __name__ == "__main__":
main()