Source code for ska_tango_base.csp_subelement_master

# -*- coding: utf-8 -*-
#
# This file is part of the CspSubElementMaster project
#
#
#
# Distributed under the terms of the GPL license.
# See LICENSE.txt for more info.

""" CspSubElementMaster

Master device for SKA CSP Subelement.
"""
# PROTECTED REGION ID(CspSubElementMaster.additionnal_import) ENABLED START #
# Python standard library
from collections import defaultdict
# Tango imports
import tango
from tango import DebugIt, AttrWriteType
from tango.server import run, attribute, command, device_property

# SKA specific imports

from ska_tango_base import SKAMaster
from ska_tango_base.commands import ResultCode, ResponseCommand
from ska_tango_base.control_model import AdminMode
from ska_tango_base.faults import CommandError
# PROTECTED REGION END #    //  CspSubElementMaster.additionnal_import

__all__ = ["CspSubElementMaster", "main"]


class CspSubElementMaster(SKAMaster):
    """
    Master device for SKA CSP Subelement.

    **Properties:**

    - Device Property
        PowerDelayStandbyOn
            - Delay in sec between power-up stages in Standby<->On transitions.
            - Type:'DevFloat'
        PowerDelayStandbyOff
            - Delay in sec between power-up stages in Standby->Off transition.
            - Type:'DevFloat'
    """
    # PROTECTED REGION ID(CspSubElementMaster.class_variable) ENABLED START #
    # PROTECTED REGION END #    //  CspSubElementMaster.class_variable

    # -----------------
    # Device Properties
    # -----------------

    PowerDelayStandbyOn = device_property(
        dtype='DevFloat', default_value=2.0
    )

    PowerDelayStandbyOff = device_property(
        dtype='DevFloat', default_value=1.5
    )

    # ----------
    # Attributes
    # ----------

    powerDelayStandbyOn = attribute(
        dtype='DevFloat',
        access=AttrWriteType.READ_WRITE,
        label="powerDelayStandbyOn",
        unit="sec.",
        doc="Delay in sec between the power-up stages in Standby<->On transitions.",
    )
    """Device attribute."""

    powerDelayStandbyOff = attribute(
        dtype='DevFloat',
        access=AttrWriteType.READ_WRITE,
        label="powerDelayStandbyOff",
        unit="sec",
        doc="Delay in sec between the power-up stages in Standby->Off transitions.",
    )
    """Device attribute."""

    onProgress = attribute(
        dtype='DevUShort',
        label="onProgress",
        max_value=100,
        min_value=0,
        doc="Progress percentage of the command execution.",
    )
    """Device attribute."""

    onMaximumDuration = attribute(
        dtype='DevFloat',
        access=AttrWriteType.READ_WRITE,
        label="onMaximumDuration",
        unit="sec.",
        doc="The expected maximum duration (sec.) to execute the On command.",
    )
    """Device attribute."""

    onMeasuredDuration = attribute(
        dtype='DevFloat',
        label="onMeasuredDuration",
        unit="sec",
        doc="The measured time (sec) taken to execute the command.",
    )
    """Device attribute."""

    standbyProgress = attribute(
        dtype='DevUShort',
        label="standbyProgress",
        max_value=100,
        min_value=0,
        doc="Progress percentage of the command execution.",
    )
    """Device attribute."""

    standbyMaximumDuration = attribute(
        dtype='DevFloat',
        access=AttrWriteType.READ_WRITE,
        label="standbyMaximumDuration",
        unit="sec.",
        doc="The expected maximum duration (sec.) to execute the Standby command.",
    )
    """Device attribute."""

    standbyMeasuredDuration = attribute(
        dtype='DevFloat',
        label="standbyMeasuredDuration",
        unit="sec",
        doc="The measured time (sec) taken to execute the Standby command.",
    )
    """Device attribute."""

    offProgress = attribute(
        dtype='DevUShort',
        label="offProgress",
        max_value=100,
        min_value=0,
        doc="Progress percentage of the command execution.",
    )
    """Device attribute."""

    offMaximumDuration = attribute(
        dtype='DevFloat',
        access=AttrWriteType.READ_WRITE,
        label="offMaximumDuration",
        unit="sec.",
        doc="The expected maximum duration (sec.) to execute the Off command.",
    )
    """Device attribute."""

    offMeasuredDuration = attribute(
        dtype='DevFloat',
        label="offMeasuredDuration",
        unit="sec",
        doc="The measured time (sec) taken to execute the Off command.",
    )
    """Device attribute."""

    totalOutputDataRateToSdp = attribute(
        dtype='DevFloat',
        label="totalOutputDataRateToSdp",
        unit="GB/s",
        doc="Report the total link expected output data rate.",
    )
    """Device attribute."""

    loadFirmwareProgress = attribute(
        dtype='DevUShort',
        label="loadFirmwareProgress",
        max_value=100,
        min_value=0,
        doc="The command progress percentage.",
    )
    """Device attribute."""

    loadFirmwareMaximumDuration = attribute(
        dtype='DevFloat',
        access=AttrWriteType.READ_WRITE,
        label="loadFirmwareMaximumDuration",
        unit="sec",
        doc="The expected maximum duration (in sec) for command execution.",
    )
    """Device attribute."""

    loadFirmwareMeasuredDuration = attribute(
        dtype='DevFloat',
        label="loadFirmwareMeasuredDuration",
        unit="sec",
        doc="The command execution measured duration (in sec).",
    )
    """Device attribute."""

    # ---------------
    # General methods
    # ---------------

    def init_command_objects(self):
        """
        Set up the command objects.

        Calls the parent implementation first, then registers one command
        object for each of the commands defined by this class. Every command
        object is constructed with the same arguments: the device itself,
        its state model and its logger.
        """
        super().init_command_objects()

        device_args = (self, self.state_model, self.logger)
        # Registration order is kept identical to the declaration order of
        # the Tango commands further down in this class.
        for command_name, command_class in (
            ("LoadFirmware", self.LoadFirmwareCommand),
            ("PowerOnDevices", self.PowerOnDevicesCommand),
            ("PowerOffDevices", self.PowerOffDevicesCommand),
            ("ReInitDevices", self.ReInitDevicesCommand),
        ):
            self.register_command_object(
                command_name, command_class(*device_args)
            )

    class InitCommand(SKAMaster.InitCommand):
        """
        A class for the CspSubElementMaster's init_device() "command".
        """

        def do(self):
            """
            Stateless hook for device initialisation.

            Runs the parent initialisation and then creates, on the target
            device, the private state backing the attributes of this class.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            :rtype: (ResultCode, str)
            """
            super().do()

            device = self.target

            # _cmd_progress: command execution's progress percentage
            # implemented as a default dictionary:
            # keys: the command name in lower case (on, off, standby,..)
            # values: the progress percentage (default 0)
            device._cmd_progress = defaultdict(int)

            # _cmd_maximum_duration: command execution's expected maximum
            # duration (sec.)
            # implemented as a default dictionary:
            # keys: the command name in lower case (on, off, standby,..)
            # values: the expected maximum duration in sec. (default 0.0)
            device._cmd_maximum_duration = defaultdict(float)

            # _cmd_measured_duration: command execution's measured
            # duration (sec.)
            # implemented as a default dictionary:
            # keys: the command name in lower case (on, off, standby,..)
            # values: the measured execution time in sec. (default 0.0)
            device._cmd_measured_duration = defaultdict(float)

            device._total_output_rate_to_sdp = 0.0

            # initialise using defaults in device properties
            # NOTE: the "standy" spelling of these private names is kept
            # on purpose; the attribute accessors below use the same names.
            device._power_delay_standy_on = device.PowerDelayStandbyOn
            device._power_delay_standy_off = device.PowerDelayStandbyOff

            message = "CspSubElementMaster Init command completed OK"
            device.logger.info(message)
            return (ResultCode.OK, message)

    def always_executed_hook(self):
        """Method always executed before any TANGO command is executed."""
        # PROTECTED REGION ID(CspSubElementMaster.always_executed_hook) ENABLED START #
        # PROTECTED REGION END #    //  CspSubElementMaster.always_executed_hook

    def delete_device(self):
        """Hook to delete resources allocated in init_device.

        This method allows for any memory or other resources allocated in the
        init_device method to be released.  This method is called by the device
        destructor and by the device Init command.
        """
        # PROTECTED REGION ID(CspSubElementMaster.delete_device) ENABLED START #
        # PROTECTED REGION END #    //  CspSubElementMaster.delete_device

    # ------------------
    # Attributes methods
    # ------------------

    def read_powerDelayStandbyOn(self):
        # PROTECTED REGION ID(CspSubElementMaster.powerDelayStandbyOn_read) ENABLED START #
        """Return the powerDelayStandbyOn attribute."""
        return self._power_delay_standy_on
        # PROTECTED REGION END #    //  CspSubElementMaster.powerDelayStandbyOn_read

    def write_powerDelayStandbyOn(self, value):
        # PROTECTED REGION ID(CspSubElementMaster.powerDelayStandbyOn_write) ENABLED START #
        """Set the powerDelayStandbyOn attribute."""
        self._power_delay_standy_on = value
        # PROTECTED REGION END #    //  CspSubElementMaster.powerDelayStandbyOn_write

    def read_onProgress(self):
        # PROTECTED REGION ID(CspSubElementMaster.onProgress_read) ENABLED START #
        """Return the onProgress attribute."""
        return self._cmd_progress['on']
        # PROTECTED REGION END #    //  CspSubElementMaster.onProgress_read

    def read_onMaximumDuration(self):
        # PROTECTED REGION ID(CspSubElementMaster.onMaximumDuration_read) ENABLED START #
        """Return the onMaximumDuration attribute."""
        return self._cmd_maximum_duration['on']
        # PROTECTED REGION END #    //  CspSubElementMaster.onMaximumDuration_read

    def write_onMaximumDuration(self, value):
        # PROTECTED REGION ID(CspSubElementMaster.onMaximumDuration_write) ENABLED START #
        """Set the onMaximumDuration attribute."""
        self._cmd_maximum_duration['on'] = value
        # PROTECTED REGION END #    //  CspSubElementMaster.onMaximumDuration_write

    def read_onMeasuredDuration(self):
        # PROTECTED REGION ID(CspSubElementMaster.onMeasuredDuration_read) ENABLED START #
        """Return the onMeasuredDuration attribute."""
        return self._cmd_measured_duration['on']
        # PROTECTED REGION END #    //  CspSubElementMaster.onMeasuredDuration_read

    def read_standbyProgress(self):
        # PROTECTED REGION ID(CspSubElementMaster.standbyProgress_read) ENABLED START #
        """Return the standbyProgress attribute."""
        return self._cmd_progress['standby']
        # PROTECTED REGION END #    //  CspSubElementMaster.standbyProgress_read

    def read_standbyMaximumDuration(self):
        # PROTECTED REGION ID(CspSubElementMaster.standbyMaximumDuration_read) ENABLED START #
        """Return the standbyMaximumDuration attribute."""
        return self._cmd_maximum_duration['standby']
        # PROTECTED REGION END #    //  CspSubElementMaster.standbyMaximumDuration_read

    def write_standbyMaximumDuration(self, value):
        # PROTECTED REGION ID(CspSubElementMaster.standbyMaximumDuration_write) ENABLED START #
        """Set the standbyMaximumDuration attribute."""
        self._cmd_maximum_duration['standby'] = value
        # PROTECTED REGION END #    //  CspSubElementMaster.standbyMaximumDuration_write

    def read_standbyMeasuredDuration(self):
        # PROTECTED REGION ID(CspSubElementMaster.standbyMeasuredDuration_read) ENABLED START #
        """Return the standbyMeasuredDuration attribute."""
        return self._cmd_measured_duration['standby']
        # PROTECTED REGION END #    //  CspSubElementMaster.standbyMeasuredDuration_read

    def read_offProgress(self):
        # PROTECTED REGION ID(CspSubElementMaster.offProgress_read) ENABLED START #
        """Return the offProgress attribute."""
        return self._cmd_progress['off']
        # PROTECTED REGION END #    //  CspSubElementMaster.offProgress_read

    def read_offMaximumDuration(self):
        # PROTECTED REGION ID(CspSubElementMaster.offMaximumDuration_read) ENABLED START #
        """Return the offMaximumDuration attribute."""
        return self._cmd_maximum_duration['off']
        # PROTECTED REGION END #    //  CspSubElementMaster.offMaximumDuration_read

    def write_offMaximumDuration(self, value):
        # PROTECTED REGION ID(CspSubElementMaster.offMaximumDuration_write) ENABLED START #
        """Set the offMaximumDuration attribute."""
        self._cmd_maximum_duration['off'] = value
        # PROTECTED REGION END #    //  CspSubElementMaster.offMaximumDuration_write

    def read_offMeasuredDuration(self):
        # PROTECTED REGION ID(CspSubElementMaster.offMeasuredDuration_read) ENABLED START #
        """Return the offMeasuredDuration attribute."""
        return self._cmd_measured_duration['off']
        # PROTECTED REGION END #    //  CspSubElementMaster.offMeasuredDuration_read

    def read_totalOutputDataRateToSdp(self):
        # PROTECTED REGION ID(CspSubElementMaster.totalOutputDataRateToSdp_read) ENABLED START #
        """Return the totalOutputDataRateToSdp attribute."""
        return self._total_output_rate_to_sdp
        # PROTECTED REGION END #    //  CspSubElementMaster.totalOutputDataRateToSdp_read

    def read_powerDelayStandbyOff(self):
        # PROTECTED REGION ID(CspSubElementMaster.powerDelayStandbyOff_read) ENABLED START #
        """Return the powerDelayStandbyOff attribute."""
        return self._power_delay_standy_off
        # PROTECTED REGION END #    //  CspSubElementMaster.powerDelayStandbyOff_read

    def write_powerDelayStandbyOff(self, value):
        # PROTECTED REGION ID(CspSubElementMaster.powerDelayStandbyOff_write) ENABLED START #
        """Set the powerDelayStandbyOff attribute."""
        self._power_delay_standy_off = value
        # PROTECTED REGION END #    //  CspSubElementMaster.powerDelayStandbyOff_write

    def read_loadFirmwareProgress(self):
        # PROTECTED REGION ID(CspSubElementMaster.loadFirmwareProgress_read) ENABLED START #
        """Return the loadFirmwareProgress attribute."""
        return self._cmd_progress['loadfirmware']
        # PROTECTED REGION END #    //  CspSubElementMaster.loadFirmwareProgress_read

    def read_loadFirmwareMaximumDuration(self):
        # PROTECTED REGION ID(CspSubElementMaster.loadFirmwareMaximumDuration_read) ENABLED START #
        """Return the loadFirmwareMaximumDuration attribute."""
        return self._cmd_maximum_duration['loadfirmware']
        # PROTECTED REGION END #    //  CspSubElementMaster.loadFirmwareMaximumDuration_read

    def write_loadFirmwareMaximumDuration(self, value):
        # PROTECTED REGION ID(CspSubElementMaster.loadFirmwareMaximumDuration_write) ENABLED START #
        """Set the loadFirmwareMaximumDuration attribute."""
        self._cmd_maximum_duration['loadfirmware'] = value
        # PROTECTED REGION END #    //  CspSubElementMaster.loadFirmwareMaximumDuration_write

    def read_loadFirmwareMeasuredDuration(self):
        # PROTECTED REGION ID(CspSubElementMaster.loadFirmwareMeasuredDuration_read) ENABLED START #
        """Return the loadFirmwareMeasuredDuration attribute."""
        return self._cmd_measured_duration['loadfirmware']
        # PROTECTED REGION END #    //  CspSubElementMaster.loadFirmwareMeasuredDuration_read

    # --------
    # Commands
    # --------

    class LoadFirmwareCommand(ResponseCommand):
        """
        A class for the CspSubElementMaster's LoadFirmware command.
        """

        def do(self, argin):
            """
            Stateless hook for device LoadFirmware() command.

            :param argin: the command input argument; it is not used by
                this implementation.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            :rtype: (ResultCode, str)
            """
            message = "LoadFirmware command completed OK"
            return (ResultCode.OK, message)

        def check_allowed(self):
            """
            Check if the command is in the proper state (State/adminMode)
            to be executed.

            The master device has to be in OFF/MAINTENANCE to process the
            LoadFirmware command.

            :raises: ``CommandError`` if command not allowed
            :return: ``True`` if the command is allowed.
            :rtype: boolean
            """
            if (self.state_model.op_state == tango.DevState.OFF
                    and self.state_model.admin_mode == AdminMode.MAINTENANCE):
                return True
            msg = "{} not allowed in {}/{}".format(
                self.name,
                self.state_model.op_state,
                AdminMode(self.state_model.admin_mode).name,
            )
            raise CommandError(msg)

    class PowerOnDevicesCommand(ResponseCommand):
        """
        A class for the CspSubElementMaster's PowerOnDevices command.
        """

        def do(self, argin):
            """
            Stateless hook for device PowerOnDevices() command.

            :param argin: the command input argument; it is not used by
                this implementation.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            :rtype: (ResultCode, str)
            """
            message = "PowerOnDevices command completed OK"
            return (ResultCode.OK, message)

        def check_allowed(self):
            """
            Check if the command is in the proper state to be executed.

            The master device has to be in ON to process the
            PowerOnDevices command.

            :raises: ``CommandError`` if command not allowed
            :return: ``True`` if the command is allowed.
            :rtype: boolean
            """
            if self.state_model.op_state == tango.DevState.ON:
                return True
            msg = "{} not allowed in {}".format(
                self.name, self.state_model.op_state
            )
            raise CommandError(msg)

    class PowerOffDevicesCommand(ResponseCommand):
        """
        A class for the CspSubElementMaster's PowerOffDevices command.
        """

        def do(self, argin):
            """
            Stateless hook for device PowerOffDevices() command.

            :param argin: the command input argument; it is not used by
                this implementation.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            :rtype: (ResultCode, str)
            """
            message = "PowerOffDevices command completed OK"
            return (ResultCode.OK, message)

        def check_allowed(self):
            """
            Check if the command is in the proper state to be executed.

            The master device has to be in ON to process the
            PowerOffDevices command.

            :raises: ``CommandError`` if command not allowed
            :return: ``True`` if the command is allowed.
            :rtype: boolean
            """
            if self.state_model.op_state == tango.DevState.ON:
                return True
            msg = "{} not allowed in {}".format(
                self.name, self.state_model.op_state
            )
            raise CommandError(msg)

    class ReInitDevicesCommand(ResponseCommand):
        """
        A class for the CspSubElementMaster's ReInitDevices command.
        """

        def do(self, argin):
            """
            Stateless hook for device ReInitDevices() command.

            :param argin: the command input argument; it is not used by
                this implementation.

            :return: A tuple containing a return code and a string
                message indicating status. The message is for
                information purpose only.
            :rtype: (ResultCode, str)
            """
            message = "ReInitDevices command completed OK"
            return (ResultCode.OK, message)

        def check_allowed(self):
            """
            Check if the command is in the proper state to be executed.

            The master device has to be in ON to process the
            ReInitDevices command.

            :raises: ``CommandError`` if command not allowed
            :return: ``True`` if the command is allowed.
            :rtype: boolean
            """
            if self.state_model.op_state == tango.DevState.ON:
                return True
            msg = "{} not allowed in {}".format(
                self.name, self.state_model.op_state
            )
            raise CommandError(msg)

    def is_LoadFirmware_allowed(self):
        """
        Check if the LoadFirmware command is allowed in the current
        state.

        Delegates to the ``check_allowed`` method of the registered
        "LoadFirmware" command object.

        :raises: ``CommandError`` if command not allowed
        :return: ``True`` if command is allowed
        :rtype: boolean
        """
        # Named "handler" so the imported ``command`` decorator is not shadowed.
        handler = self.get_command_object("LoadFirmware")
        return handler.check_allowed()

    @command(
        dtype_in='DevVarStringArray',
        doc_in="The file name or a pointer to the filename , "
               "the list of components that use software or firmware package (file),"
               "checksum or signing",
        dtype_out='DevVarLongStringArray',
        doc_out="ReturnType, `informational message`",
    )
    @DebugIt()
    def LoadFirmware(self, argin):
        # PROTECTED REGION ID(CspSubElementMaster.LoadFirmware) ENABLED START #
        """
        Deploy new versions of software and firmware and trigger
        a restart so that a Component initializes using a newly
        deployed version.

        :param argin: A list of three strings:

            - The file name or a pointer to the filename specified as URL.
            - the list of components that use software or firmware package (file),
            - checksum or signing

            Ex: ['file://firmware.txt','test/dev/1, test/dev/2, test/dev/3',
            '918698a7fea3fa9da5996db001d33628']
        :type argin: 'DevVarStringArray'

        :return: A two-element list ``[[return_code], [message]]``: the
            return code and the status message produced by the command
            object, each wrapped in its own list. The message is for
            information purpose only.
        :rtype: 'DevVarLongStringArray'
        """
        handler = self.get_command_object("LoadFirmware")
        (return_code, message) = handler(argin)
        return [[return_code], [message]]
        # PROTECTED REGION END #    //  CspSubElementMaster.LoadFirmware

    def is_PowerOnDevices_allowed(self):
        """
        Check if the PowerOnDevices command is allowed in the current
        state.

        Delegates to the ``check_allowed`` method of the registered
        "PowerOnDevices" command object.

        :raises: ``CommandError`` if command not allowed
        :return: ``True`` if command is allowed
        :rtype: boolean
        """
        handler = self.get_command_object("PowerOnDevices")
        return handler.check_allowed()

    @command(
        dtype_in='DevVarStringArray',
        doc_in="The list of FQDNs to power-up",
        dtype_out='DevVarLongStringArray',
        doc_out="ReturnType, `informational message`",
    )
    @DebugIt()
    def PowerOnDevices(self, argin):
        # PROTECTED REGION ID(CspSubElementMaster.PowerOnDevices) ENABLED START #
        """
        Power-on a selected list of devices.

        :param argin: List of devices (FQDNs) to power-on.
        :type argin: 'DevVarStringArray'

        :return: A two-element list ``[[return_code], [message]]``: the
            return code and the status message produced by the command
            object, each wrapped in its own list. The message is for
            information purpose only.
        :rtype: 'DevVarLongStringArray'
        """
        handler = self.get_command_object("PowerOnDevices")
        (return_code, message) = handler(argin)
        return [[return_code], [message]]
        # PROTECTED REGION END #    //  CspSubElementMaster.PowerOnDevices

    def is_PowerOffDevices_allowed(self):
        """
        Check if the PowerOffDevices command is allowed in the current
        state.

        Delegates to the ``check_allowed`` method of the registered
        "PowerOffDevices" command object.

        :raises: ``CommandError`` if command not allowed
        :return: ``True`` if command is allowed
        :rtype: boolean
        """
        handler = self.get_command_object("PowerOffDevices")
        return handler.check_allowed()

    @command(
        dtype_in='DevVarStringArray',
        doc_in="List of FQDNs to power-off",
        dtype_out='DevVarLongStringArray',
        doc_out="ReturnType, `informational message`",
    )
    @DebugIt()
    def PowerOffDevices(self, argin):
        # PROTECTED REGION ID(CspSubElementMaster.PowerOffDevices) ENABLED START #
        """
        Power-off a selected list of devices.

        :param argin: List of devices (FQDNs) to power-off.
        :type argin: 'DevVarStringArray'

        :return: A two-element list ``[[return_code], [message]]``: the
            return code and the status message produced by the command
            object, each wrapped in its own list. The message is for
            information purpose only.
        :rtype: 'DevVarLongStringArray'
        """
        handler = self.get_command_object("PowerOffDevices")
        (return_code, message) = handler(argin)
        return [[return_code], [message]]
        # PROTECTED REGION END #    //  CspSubElementMaster.PowerOffDevices

    def is_ReInitDevices_allowed(self):
        """
        Check if the ReInitDevices command is allowed in the current
        state.

        Delegates to the ``check_allowed`` method of the registered
        "ReInitDevices" command object.

        :raises: ``CommandError`` if command not allowed
        :return: ``True`` if command is allowed
        :rtype: boolean
        """
        handler = self.get_command_object("ReInitDevices")
        return handler.check_allowed()

    @command(
        dtype_in='DevVarStringArray',
        doc_in="List of devices to re-initialize",
        dtype_out='DevVarLongStringArray',
        doc_out="ReturnType, `informational message`",
    )
    @DebugIt()
    def ReInitDevices(self, argin):
        # PROTECTED REGION ID(CspSubElementMaster.ReInitDevices) ENABLED START #
        """
        Reinitialize the devices passed in the input argument.
        The exact functionality may vary for different devices
        and sub-systems, each TANGO Device/Server should define
        what does ReInitDevices means.
        Ex:
        ReInitDevices FPGA -> reset
        ReInitDevices Master -> Restart
        ReInitDevices Leaf PC -> reboot

        :param argin: List of devices (FQDNs) to re-initialize.
        :type argin: 'DevVarStringArray'

        :return: A two-element list ``[[return_code], [message]]``: the
            return code and the status message produced by the command
            object, each wrapped in its own list. The message is for
            information purpose only.
        :rtype: 'DevVarLongStringArray'
        """
        handler = self.get_command_object("ReInitDevices")
        (return_code, message) = handler(argin)
        return [[return_code], [message]]
# PROTECTED REGION END #    //  CspSubElementMaster.ReInitDevices

# ----------
# Run server
# ----------


def main(args=None, **kwargs):
    """
    Entry point of the CspSubElementMaster module.

    :param args: forwarded unchanged to ``run`` as its ``args`` keyword.
    :param kwargs: any further keyword arguments, forwarded to ``run``.
    :return: whatever ``run`` returns.
    """
    # PROTECTED REGION ID(CspSubElementMaster.main) ENABLED START #
    device_classes = (CspSubElementMaster,)
    return run(device_classes, args=args, **kwargs)
    # PROTECTED REGION END #    //  CspSubElementMaster.main


if __name__ == '__main__':
    main()