from builtins import range
__author__ = 'lessju'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
import time
from copy import copy
[docs]
class PreAdu(FirmwareBlock):
""" AdcPowerMeter plugin """
@compatibleboards(BoardMake.TpmBoard,BoardMake.Tpm16Board)
@friendlyname('tpm_preadu')
@maxinstances(2)
def __init__(self, board, **kwargs):
""" AdcPowerMeter initialiser
:param board: Pointer to board instance
"""
super(PreAdu, self).__init__(board)
if 'preadu_id' not in list(kwargs.keys()):
raise PluginError("PreAdu: preadu_id required")
self._preadu_id = kwargs['preadu_id']
# Passband filter values
self._HIGH_PASSBAND = 0x3
self._LOW_PASSBAND = 0x5
# Define a filter per channel in order to be able to enable and disable
# RF output from the channel
self._nof_channels = 16
self.channel_filters = [0x0] * self._nof_channels
self._passband = 0x0
# Define cpld register addresses
self._cpld_reg_addr = [0x70000000, 0x70000004, 0x70000008, 0x7000000C]
self._cpld_preadu_cntl_addr = 0x70000010
# Define cpld register constants
self._nof_cpld_regs = len(self._cpld_reg_addr)
self._cpld_subword_width = 8
self._nof_cpld_subwords = int(32 / self._cpld_subword_width)
# Read current configuration
self.read_configuration()
# Check if preadu is present, sets self.is_present
self.check_present()
#######################################################################################
[docs]
def select_low_passband(self):
""" Select low passband """
self._passband = 0x5
logging.debug("preADU selected low passband!")
[docs]
def select_high_passband(self):
""" Select high passband """
self._passband = 0x3
logging.debug("preADU selected high passband!")
[docs]
def disable_channels(self, channels=None):
""" Disable channel output
:param channels: Input channel to disable (None for all channels) """
if channels is None:
channels = list(range(self._nof_channels))
for channel in channels:
if not 0 <= channel <= self._nof_channels:
logging.warning(f"Cannot disable channel {channel}, invalid channel")
self.channel_filters[channel] = 0x0
logging.debug(f"preADU channel {channel} has been disabled")
[docs]
def enable_channels(self, channels=None):
""" Enable channel output
:param channels: Input channels to enable (None for all channels)"""
if channels is None:
channels = list(range(self._nof_channels))
for channel in channels:
if not 0 <= channel <= self._nof_channels:
logging.warning(f"Cannot enable channel {channel}, invalid channel")
self.channel_filters[channel] = self._passband
logging.debug(f"preADU channel {channel} has been enabled")
[docs]
def set_attenuation(self, attenuation, channels=None):
""" Set attenuation level for a particular channel
:param attenuation: Value of attenuation
:param channels: Preadu channels (None if to be applied to all channels """
if attenuation % 1.0 != 0:
logging.warning(f"TPM 1.2 PreADU attenuation precision of 1.00 supported. Value {attenuation} specified will be rounded to {round(attenuation)}")
attenuation = int(round(attenuation))
if channels is None:
channels = list(range(self._nof_channels))
# Sanity checks
if not 0 <= attenuation < 2 ** 5:
logging.warning(f"Cannot set PREADU attenuation {attenuation}, out of range")
return
for channel in channels:
if not 0 <= channel <= self._nof_channels:
logging.warning(f"Cannot set attenuation for channel {channel}, invalid channel")
# Apply attenuation with currently selected filter
# Set Passband to chan_att[2:0]
chan_att = self.set_bits(input=0, bit_range=(0, 2), value=self._passband)
# Set attenuation to chan_att[7:3]
chan_att = self.set_bits(input=chan_att, bit_range=(3, 7), value=attenuation)
self.channel_filters[channel] = chan_att
logging.debug(f"preADU channel {channel} attenuation has been set to {hex(chan_att)}")
[docs]
def get_attenuation(self):
"""
Returns the current attenuation value.
Stored in channel_filters[7:3].
return integer
"""
output = [None] * self._nof_channels
for channel_index in range(self._nof_channels):
output[channel_index] = float(self.get_bits(input=self.channel_filters[channel_index], bit_range=(3, 7)))
return output
[docs]
def get_passband(self):
"""
Returns the current passband value.
Stored in channel_filters[2:0].
return integer
"""
output = [None] * self._nof_channels
for channel_index in range(self._nof_channels):
output[channel_index] = self.get_bits(input=self.channel_filters[channel_index], bit_range=(0, 2))
return output
[docs]
def powered_on(self):
reg = self.board['board.regfile.frontend_3v5_en']
return self.get_bit(input=reg, bit=self._preadu_id) > 0
[docs]
def switch_on(self):
""" Switch preadu on """
# Switch on preadu
self.board['board.regfile.frontend_3v5_en'] = self.set_bit(input=self.board['board.regfile.frontend_3v5_en'],
bit=self._preadu_id,
value=1)
time.sleep(0.2)
# Check that preadu has been switched on properly
if self.get_bit(input=self.board['board.regfile.fe_cal_status'], bit=self._preadu_id) == 0:
logging.warning(f"Error! preADU power is not high: {self.board['board.regfile.fe_cal_status'] & 0x3}")
return
logging.debug("preADU power on done!")
# Read and load preadu configuration and
self.eep_read()
self.write_configuration()
[docs]
def switch_off(self):
""" Switch preadu off """
# sets the correct bit corresponding to the preADU id to 0, without effecting any of the other bits.
self.board['board.regfile.frontend_3v5_en'] = self.set_bit(input=self.board['board.regfile.fe_cal_status'],
bit=self._preadu_id,
value=0)
time.sleep(0.2)
# Check that preadu has been switched off properly
if self.get_bit(input=self.board['board.regfile.fe_cal_status'], bit=self._preadu_id) == 0:
logging.debug("preADU power off done!")
else:
logging.warning(f"Error! preADU power is not low: {self.board['board.regfile.fe_cal_status'] & 0x3}")
[docs]
def write_configuration(self, update_preadu_reg=True):
""" Write configuration to preadu
WARNING if update_preadu_reg is false, the preadu shift registers and attenuation registers out of sync
"""
# Each channel occupies 8-bit subword within a 32-bit word (one cpld shift register)
for register_num, addr in enumerate(self._cpld_reg_addr):
config = 0
for subword_num in range(self._nof_cpld_subwords):
flat_channel_number = register_num * self._nof_cpld_subwords + subword_num
from_bit = self._cpld_subword_width * subword_num
to_bit = self._cpld_subword_width * (subword_num + 1) - 1
value_to_be_written = self.bit_reverse(self.channel_filters[flat_channel_number])
config = self.set_bits(input=config, bit_range=(from_bit, to_bit), value=value_to_be_written)
self.board[addr] = config
logging.debug(f"cpld shift register {register_num} has been set to {hex(config)}")
cpld_preadu_cntl_value = 0
# Shifts the cpld shift register to the preadu shift register
cpld_preadu_cntl_value = self.set_bit(input=cpld_preadu_cntl_value, bit=0, value=1)
if update_preadu_reg:
# And updates the preADU attenuation register with the values in the preadu shift register
cpld_preadu_cntl_value = self.set_bit(input=cpld_preadu_cntl_value, bit=1, value=1)
# To the correct preADU
cpld_preadu_cntl_value = self.set_bit(input=cpld_preadu_cntl_value, bit=2, value=self._preadu_id)
# set preADU cntl register on the cpld to the correct value
self.board[self._cpld_preadu_cntl_addr] = cpld_preadu_cntl_value
# wait for the shifting of registers to have occurred
while self.get_bit(input=self.board[self._cpld_preadu_cntl_addr], bit=0) != 0:
time.sleep(0.01)
logging.debug(f"preADU registers have been written to")
[docs]
def read_configuration(self):
""" Read configuration from preadu """
cpld_preadu_cntl_value = 0
# Shifts the cpld shift register to the preadu shift register,
# and the preadu shift register to the cpld shift register
cpld_preadu_cntl_value = self.set_bit(input=cpld_preadu_cntl_value, bit=0, value=1)
# of the correct preADU
cpld_preadu_cntl_value = self.set_bit(input=cpld_preadu_cntl_value, bit=2, value=self._preadu_id)
# set preADU cntl register on the cpld to the correct value
self.board[self._cpld_preadu_cntl_addr] = cpld_preadu_cntl_value
# wait for the shifting of registers to have occurred
while self.get_bit(input=self.board[self._cpld_preadu_cntl_addr], bit=0) != 0:
time.sleep(0.01)
# Each channel occupies 8-bit within a 32-bit word
for register_num, addr in enumerate(self._cpld_reg_addr):
config = self.board[addr]
# Extract channel values
for subword_num in range(self._nof_cpld_subwords):
flat_channel_number = register_num * self._nof_cpld_subwords + subword_num
from_bit = self._cpld_subword_width * subword_num
to_bit = self._cpld_subword_width * (subword_num + 1) - 1
subword_value = self.get_bits(input=config, bit_range=(from_bit, to_bit))
self.channel_filters[flat_channel_number] = self.bit_reverse(subword_value)
logging.debug(f"channel_filters[{flat_channel_number}] has been set to {hex(self.bit_reverse(subword_value))}")
# set preADU cntl register on the cpld, to switch back the cpld and preadu shift registers
self.board[self._cpld_preadu_cntl_addr] = cpld_preadu_cntl_value
# wait for the shifting of registers to have occurred
while self.get_bit(input=self.board[self._cpld_preadu_cntl_addr], bit=0) != 0:
time.sleep(0.01)
# NOTE: rewrite of method removed to test on AAVS2
# def eep_write(self):
# """ Write current configuration to non-volatile memory """
# start_index = 0x10 + (16 * self._preadu_id)
# for register_num in range(self._nof_cpld_regs):
# config = 0
# for subword_num in range(self._nof_cpld_subwords):
# flat_channel_number = register_num * self._nof_cpld_subwords + subword_num
# from_bit = self._cpld_subword_width * subword_num
# to_bit = self._cpld_subword_width * (subword_num + 1) - 1
# config = self.set_bits(input=config, bit_range=(from_bit, to_bit), value=self.channel_filters[flat_channel_number])
# eep_index = start_index + self._nof_cpld_subwords * register_num
# self.board.tpm_cpld.eep_wr32(eep_index, config)
# logging.debug(f"eep memory {register_num} has been set to {hex(config)}")
# NOTE: untested - not convinced this works as required. Peter - 03/07/2023
[docs]
def eep_write(self):
""" Write current configuration to non-volatile memory """
self.board.tpm_cpld.eep_wr32(0x10 + (16 * self._preadu_id),
self.bit_reverse(self.channel_filters[0] +
(self.channel_filters[1] << 8) +
(self.channel_filters[2] << 16) +
(self.channel_filters[3] << 24)))
self.board.tpm_cpld.eep_wr32(0x10 + (16 * self._preadu_id) + 4,
self.bit_reverse(self.channel_filters[4] +
(self.channel_filters[5] << 8) +
(self.channel_filters[6] << 16) +
(self.channel_filters[7] << 24)))
self.board.tpm_cpld.eep_wr32(0x10 + (16 * self._preadu_id) + 8,
self.bit_reverse(self.channel_filters[8] +
(self.channel_filters[9] << 8) +
(self.channel_filters[10] << 16) +
(self.channel_filters[11] << 24)))
self.board.tpm_cpld.eep_wr32(0x10 + (16 * self._preadu_id) + 12,
self.bit_reverse(self.channel_filters[12] +
(self.channel_filters[13] << 8) +
(self.channel_filters[14] << 16) +
(self.channel_filters[15] << 24)))
# NOTE: rewrite of method removed to test on AAVS2
# def eep_read(self):
# """ Read configuration from non-volatile memory"""
# start_index = 0x10 + (16 * self._preadu_id)
# for register_num in range(self._nof_cpld_regs):
# eep_index = start_index + self._nof_cpld_subwords * register_num
# config = self.bit_reverse(self.board.tpm_cpld.eep_rd32(eep_index))
# logging.debug(f"read {hex(config)}] from eeep memory")
# for subword_num in range(self._nof_cpld_subwords):
# flat_channel_number = register_num * self._nof_cpld_subwords + subword_num
# from_bit = self._cpld_subword_width * subword_num
# to_bit = self._cpld_subword_width * (subword_num + 1) - 1
# self.channel_filters[flat_channel_number] = self.get_bits(input=config, bit_range=(from_bit, to_bit))
# logging.debug(f"channel_filters[{flat_channel_number}] has been set to {hex(self.get_bits(input=config, bit_range=(from_bit, to_bit)))}")
# NOTE: untested - not convinced this works as required. Peter - 03/07/2023
[docs]
def eep_read(self):
""" Read configuration from non-volatile memory"""
start_index = 0x10 + (16 * self._preadu_id)
config0 = self.bit_reverse(self.board.tpm_cpld.eep_rd32(start_index))
config1 = self.bit_reverse(self.board.tpm_cpld.eep_rd32(start_index + 4))
config2 = self.bit_reverse(self.board.tpm_cpld.eep_rd32(start_index + 8))
config3 = self.bit_reverse(self.board.tpm_cpld.eep_rd32(start_index + 12))
# Extract channel values
self.channel_filters[0] = config0 & 0xFF
self.channel_filters[1] = (config0 >> 8) & 0xFF
self.channel_filters[2] = (config0 >> 16) & 0xFF
self.channel_filters[3] = (config0 >> 24) & 0xFF
self.channel_filters[4] = config1 & 0xFF
self.channel_filters[5] = (config1 >> 8) & 0xFF
self.channel_filters[6] = (config1 >> 16) & 0xFF
self.channel_filters[7] = (config1 >> 24) & 0xFF
self.channel_filters[8] = config2 & 0xFF
self.channel_filters[9] = (config2 >> 8) & 0xFF
self.channel_filters[10] = (config2 >> 16) & 0xFF
self.channel_filters[11] = (config2 >> 24) & 0xFF
self.channel_filters[12] = config3 & 0xFF
self.channel_filters[13] = (config3 >> 8) & 0xFF
self.channel_filters[14] = (config3 >> 16) & 0xFF
self.channel_filters[15] = (config3 >> 24) & 0xFF
[docs]
def check_present(self):
"""
Detects if a preADU is present.
Sets attribute "is_present".
A gain of 21 is written to all channels and read back.
If the first gain value read back is 21, then a preADU
is present.
The starting configuration of the software gains and
preADU hardware gains are copied and restored at the
end of the method.
"""
if not self.powered_on():
logging.warning(f"Preadu powered OFF, unable to check if present")
self.is_present = False
return
# Save starting configuration
starting_configuration_sw = copy(self.channel_filters)
self.read_configuration()
starting_configuration_hw = copy(self.channel_filters)
self.set_attenuation(21)
self.write_configuration(update_preadu_reg=False)
self.read_configuration()
returned_value = self.get_attenuation()[0]
preadu_detected = True if returned_value == 21 else False
if not preadu_detected:
logging.debug(f"got: {returned_value}, expected: 21, assuming preADU is not present")
self.is_present = preadu_detected
# Write back starting configuration
self.channel_filters = starting_configuration_hw
self.write_configuration(update_preadu_reg=False)
self.channel_filters = starting_configuration_sw
return
[docs]
@staticmethod
def bit_reverse(value):
""" Reverse bits in value
:param value: Value to bit reverse """
return int("%d" % (int('{:08b}'.format(value)[::-1], 2)))
[docs]
@staticmethod
def set_bit(input, bit, value):
"""
Sets the specified bit in input with a value.
:param input: value before set bit.
:param bit: position of bit to set.
:param value: value of bit in returned value
"""
if value == 1:
return input | (1 << bit)
else:
return input & ~(1 << bit)
[docs]
@staticmethod
def get_bit(input, bit):
"""
Gets the specified bit in input.
:param input: value to get bit.
:param bit: position of bit.
"""
return (input & (1 << bit)) >> bit
[docs]
def set_bits(self, input, bit_range, value):
"""
Sets the specified bits in input with value.
:param input: value before set bits.
:param bit_range: position of bits to set (from,to).
:param value: value of bits in returned value
"""
output = input
for input_index, output_index in enumerate(range(bit_range[0], bit_range[1] + 1)):
output = self.set_bit(input=output, bit=output_index, value=self.get_bit(input=value, bit=input_index))
return output
[docs]
def get_bits(self, input, bit_range):
"""
Gets the specified bits in an input.
:param input: value to get bit.
:param bit_range: position of bits to get (from,to).
"""
output = 0
for index in range(bit_range[0], bit_range[1] + 1):
output = self.set_bit(input=output, bit=index - bit_range[0], value=self.get_bit(input=input, bit=index))
return output
##################### Superclass method implementations #################################
[docs]
def initialise(self):
""" Initialise AdcPowerMeter """
logging.info("PreAdu has been initialised")
return True
[docs]
def status_check(self):
""" Perform status check
:return: Status
"""
logging.info("PreAdu: Checking status")
return Status.OK
[docs]
def clean_up(self):
""" Perform cleanup
:return: Success
"""
logging.info("PreAdu: Cleaning up")
return True