from __future__ import division
from builtins import range
from math import fabs, cos, sin
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
import logging
__author__ = 'cbelli'
# Exclusive upper bound for beam indices: BeamfFD.set_regions rejects any
# region whose beam index falls outside [0, MAX_BEAMS).
MAX_BEAMS = 48 # Maximum number of beams
class BeamfFD(FirmwareBlock):
    """ Frequency Domain Beamformer plugin

    Keeps a software mirror of the beamformer region tables (region_sel,
    region_off, beam_index) and of the calibration inputs (cal_curve,
    antenna_tapering, beam_angle, rot_matrix), and writes the corresponding
    ``<device>.beamf_fd.*`` and ``<device>.beamf_cal.*`` registers through
    ``self.board``.
    """

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('beamf_fd')
    @maxinstances(2)
    def __init__(self, board, **kwargs):
        """ BeamformerFD initialiser

        :param board: Pointer to board instance
        :param kwargs: must contain ``device`` (Device.FPGA_1 or Device.FPGA_2)
        :raises PluginError: if ``device`` is missing or is not a known FPGA
        """
        super(BeamfFD, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("BeamfFD: Require a node instance")
        self._device = kwargs['device']

        # From here on self._device holds the register-name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("BeamfFD: Invalid device %s" % self._device)

        # Software mirror of the hardware region tables
        self.beam_index = [0] * 64
        self.region_off = [0] * 64
        self.region_sel = [0] * 64

        self.nof_chans = 384     # True number of channels
        self.nof_hw_chans = 384  # FIXED number of chans in output packet.

        # Number of independent channel blocks, taken from the size of the
        # region_off table; falls back to 16 when the register cannot be
        # queried.
        # NOTE(review): the fpga1 register is probed even for an fpga2
        # instance - presumably both FPGAs carry the same firmware; confirm.
        try:
            self.nof_regions = self.board.find_register("fpga1.beamf_fd.region_off")[0].size
        except Exception:
            self.nof_regions = 16

        # Calibration quantities and rotation matrix
        self._reset_calibration_defaults()

        # The coefficient scale depends on whether the feature register
        # exists: 2048 if it can be read, 128 otherwise.
        try:
            self.board['fpga1.dsp_regfile.feature.tile_beamformer_implemented']
            self.scale = 2048
        except Exception:
            self.scale = 128

    ############################################################################
    def _reset_calibration_defaults(self):
        """ Reset the software calibration inputs to their defaults

        cal_curve is indexed [beam][antenna][channel][polarization term] with
        8 beams, 8 antennas and 512 channels. Every level is built as an
        independent list: with list multiplication all beams would share a
        single antenna list, and load_cal_curve() for one beam would
        overwrite the curve of every other beam.
        """
        self.cal_curve = [
            [
                [[complex(1.0), complex(0.0), complex(0.0), complex(1.0)]
                 for _ in range(512)]
                for _ in range(8)
            ]
            for _ in range(8)
        ]
        self.antenna_tapering = [1.0] * 8
        self.beam_angle = [0.0] * 8
        # Rotation matrix: identity for each of the 8 beams
        self.rot_matrix = [[1.0, 0.0, 0.0, 1.0] for _ in range(8)]

    def initialise_beamf(self):
        """ Initialise Frequency Domain Beamformer

        Resets the beamformer, clears the delay controls and region tables,
        restores the default calibration inputs, loads the resulting
        coefficients and forces a calibration bank switch.
        """
        prefix = '%s.beamf_fd' % self._device

        # Reset beamformer
        self.board[prefix + '.control.reset'] = 1
        self.board[prefix + '.control.reset'] = 0

        # Load Delay
        self.board[prefix + '.control.load_delay'] = 0
        # Load Immediate Delay
        self.board[prefix + '.control.load_delay_immediate'] = 0

        # Number of processed channels: the register holds 384/8
        self.nof_chans = 384
        self.nof_hw_chans = 384
        self.board[prefix + '.nof_chans'] = self.nof_hw_chans // 8

        # Test point select
        self.board[prefix + '.tp_sel'] = 0
        # Load time as frame number
        self.board[prefix + '.load_time'] = 0

        # Region tables. The software mirror is reset together with the
        # hardware: compute_calibration_coefs() below reads the mirror, and
        # tables left over from an earlier set_regions() would no longer
        # match the zeroed hardware (and can index past the 512-entry curve
        # now that nof_chans is back to 384).
        self.region_sel = [0] * 64
        self.region_off = [0] * self.nof_regions
        self.beam_index = [0] * self.nof_regions
        self.board[prefix + '.region_sel'] = self.region_sel
        self.board[prefix + '.region_off'] = self.region_off
        self.board[prefix + '.beam_index'] = self.beam_index

        # Antenna delay and delay rate from beam X antenna Y
        del_x_y_init = [0] * 8
        self.board[prefix + '.delay_0_0'] = del_x_y_init

        # Default calibration, loaded into the hardware and made active
        self._reset_calibration_defaults()
        self.compute_calibration_coefs()
        self.switch_calibration_bank(force=True)

        logging.info("BeamfFD has been initialised")

    def current_frame(self):
        """
        Return current frame processed by the tile beamformer

        :returns: current frame number, in units of 256 ADC frames (276.48 us)
        """
        return self.board[self._device + '.beamf_cal.current_time']

    # Set regions. The method computes the total number of channels and the
    # arrays and writes in the registers
    def set_regions(self, region_array):
        """ Set frequency regions.

        Regions are defined in a 2-d array, for a maximum of 16 (48) regions.
        Actual limit defined in hardware by the size of the region_off table.
        Each element in the array defines a region, with
        the form [start_ch, nof_ch, beam_index]

        - start_ch:    region starting channel (currently must be a
                       multiple of 2, LS bit discarded)
        - nof_ch:      size of the region: must be multiple of 8 chans
                       (the value is masked with 0x1F8)
        - beam_index:  beam used for this region, range [0:MAX_BEAMS)

        Total number of channels must be <= 384

        The routine computes the arrays beam_index, region_off, region_sel,
        and the total number of channels nof_chans, and programs it in the HW.
        Nothing is stored or written to the hardware if validation fails.

        :param region_array: Bidimensional array of regions
        :raises PluginError: on an invalid region length, position or beam
            index, if the total exceeds 384 channels, or if more regions
            are given than the hardware supports
        """
        region_start = 0
        beam_index = [0] * self.nof_regions
        region_off = [0] * self.nof_regions
        region_sel = [0] * 64

        for region_idx, region in enumerate(region_array):
            if region_idx >= self.nof_regions:
                raise PluginError(f"BeamfFD: Too many regions in {self._device}")

            start_ch = region[0]
            reg_length = region[1] & 0x1F8
            if reg_length > 384:
                raise PluginError(f"BeamfFD: Invalid region length {reg_length} in {self._device}")
            if start_ch < 0 or start_ch + reg_length > 512:
                raise PluginError(f"BeamfFD: Invalid region position in {self._device}")

            beam = region[2]
            if beam >= MAX_BEAMS or beam < 0:
                raise PluginError(f"BeamfFD: Invalid beam index in {self._device}")

            # Checked before region_sel is filled: an oversize total would
            # otherwise index past the 64-entry table and surface as an
            # IndexError instead of a PluginError.
            if region_start + reg_length > 384:
                raise PluginError(f"BeamfFD: Invalid number of channels in {self._device}")

            beam_index[region_idx] = beam
            region_off[region_idx] = start_ch - region_start
            first_block = region_start // 8
            for j in range(reg_length // 8):
                region_sel[first_block + j] = region_idx
            region_start += reg_length

        self.region_sel = region_sel
        self.region_off = region_off
        self.beam_index = beam_index

        # Patch: changing nof_chans on the run desynchronizes the two FPGAs
        # Hardware nof_chans MUST stay constant, only the software count moves
        self.nof_chans = region_start

        self.board[self._device + '.beamf_fd.region_sel'] = self.region_sel
        self.board[self._device + '.beamf_fd.region_off'] = self.region_off
        self.board[self._device + '.beamf_fd.beam_index'] = self.beam_index

    # Set delay for a beam
    def set_delay(self, delay_array, beam_index):
        """
        The method specifies the delay in seconds and the delay rate in seconds/seconds.
        The delay_array specifies the delay and delay rate for each antenna.
        beam_index specifies which beam is described (range 0:7).
        Delay is updated inside the delay engine at the time specified
        by method load_delay

        :param delay_array: bidimensional array [8,2]. Each row contains 2 elements,
                            as delay (in s) and delay rate (in s/s).
                            One delay is specified for both polarization in each antenna
        :param beam_index: hardware station beam to program. Range 0:8 (0:48 in future firmware)
        :raises PluginError: if beam_index is out of range, or a delay / delay
            rate does not fit its hardware field
        """
        if beam_index >= MAX_BEAMS or beam_index < 0:
            raise PluginError(f"BeamfFD: Invalid Beam Index in {self._device}")
        elif beam_index >= 8:
            raise PluginError("BeamfFD: only 8 beams supported by this firmware")

        # Conversion factors from seconds and s/s to hardware units
        delay_scale = 2 ** 23 / 1280e-9
        rate_scale = (1024. * 1080e-9 * 2 ** 37) / 1280e-9

        delay_x_y = [0] * 8
        for j in range(8):
            delay_hw = int(round(delay_array[j][0] * delay_scale))
            rate_hw = int(round(delay_array[j][1] * rate_scale))
            # The values are packed as 20-bit and 12-bit two's complement
            # fields. The upper bounds are exclusive: +2**19 (or +2**11)
            # would wrap to the most negative value once masked.
            if not (-(2 ** 19) <= delay_hw < 2 ** 19) or not (-(2 ** 11) <= rate_hw < 2 ** 11):
                raise PluginError(f"BeamfFD: Invalid Delay - Delay rate in {self._device}")
            delay_x_y[j] = ((delay_hw & 0xFFFFF) << 12) | (rate_hw & 0xFFF)

        self.board['%s.beamf_fd.delay_%d_0' % (self._device, beam_index)] = delay_x_y

    # loads the delay
    def load_delay(self, load_time=0, beam_mask=0xffffffffffff):
        """
        Transfers the delay to the delay computing hardware at the prescribed
        frame number (load_time)
        If load_time = 0 transfers the delay immediately.

        :param load_time: Prescribed load time (frame number). 0 = immediate
        :param beam_mask: beams to be activated. Default: all. unsupported in this FW
        """
        control = self._device + '.beamf_fd.control'
        if load_time == 0:
            # Pulse the immediate-load bit
            self.board[control + '.load_delay_immediate'] = 1
            self.board[control + '.load_delay_immediate'] = 0
        else:
            # Program the load frame, then pulse the timed-load bit
            self.board[self._device + '.beamf_fd.load_time'] = load_time
            self.board[control + '.load_delay'] = 1
            self.board[control + '.load_delay'] = 0

    # Calibration
    def load_calibration(self, antenna, calibration_coefs):
        """
        Loads calibration coefficients.
        calibration_coefs is a bidimensional complex array of the form
        calibration_coefs[channel, polarization], with each element representing
        a normalized coefficient, with (1.0, 0.0) the normal, expected response for
        an ideal antenna.
        Channel is the index specifying the channels at the beamformer output,
        i.e. considering only those channels actually processed and beam assignments.
        The polarization index ranges from 0 to 3.

        0: X polarization direct element
        1: X->Y polarization cross element
        2: Y->X polarization cross element
        3: Y polarization direct element

        The calibration coefficients may include any rotation matrix (e.g.
        the parallactic angle), but do not include the geometric delay.

        Each coefficient is multiplied by self.scale, rounded, and packed in
        one word: imaginary part in bits 31:16, real part in bits 15:0 (both
        masked to 16 bits).

        :param antenna: Antenna number. Integer in range 0:8
        :param calibration_coefs: Calibration coefficients. array [384, 4]
        """
        for pol in range(4):
            cal_data = [
                ((int(round(coefs[pol].imag * self.scale)) & 0xFFFF) << 16)
                | (int(round(coefs[pol].real * self.scale)) & 0xFFFF)
                for coefs in calibration_coefs
            ]
            # Select the (antenna, polarization term) block, then write it
            self.board['%s.beamf_cal.block_sel' % self._device] = antenna * 4 + pol
            self.board['%s.beamf_cal.cal_data' % self._device] = cal_data

    def switch_calibration_bank(self, time=0, force=False):
        """
        Switch the calibration bank. To be performed after all
        calibration quantities have been updated on both FPGAS

        :param time: Time to perform bank switch (frame number).
                     Default 0 = current time + 64 frames
        :param force: Force immediate switch
        :return: bank used after bank switch (0 or 1)
        """
        bank_reg = '%s.beamf_cal.control.cal_table_bank' % self._device
        new_bank = 1 - self.board[bank_reg]
        self.board[bank_reg] = new_bank

        if force:
            # Pulse the software switch bit
            self.board['%s.beamf_cal.control.sw_table_bank' % self._device] = 1
            self.board['%s.beamf_cal.control.sw_table_bank' % self._device] = 0
        else:
            if time == 0:
                time = self.board['%s.beamf_cal.current_time' % self._device] + 64
            self.board['%s.beamf_cal.load_time' % self._device] = time
        return new_bank

    def load_cal_curve(self, antenna, beam, cal_coefficients):
        """
        Calibration curve is specified for 512 frequency channels over the
        whole (0-400) MHz range.
        Calibration for unused frequency regions may assume any value (e.g. 0).
        Default (at initialization) is 1.0 for diagonal terms and 0.0 for cross
        diagonal terms.
        Only the software copy is updated; use compute_calibration_coefs()
        to load the result in the hardware.
        Obsolete, does not support subarrays, kept for backward compatibility

        :param antenna: antenna index into cal_curve[beam]
        :param beam: beam index into cal_curve
        :param cal_coefficients: curve stored as cal_curve[beam][antenna]
        """
        self.cal_curve[beam][antenna] = cal_coefficients

    def load_antenna_tapering(self, tapering_coefs):
        """
        Tapering_coefs is a vector of 8 values, one per antenna.
        Default (at initialization) is 1.0
        Obsolete, does not support subarrays, kept for backward compatibility

        :param tapering_coefs: tapering values, stored as antenna_tapering
        """
        self.antenna_tapering = tapering_coefs

    def load_beam_angle(self, angle_coefs):
        """
        Angle_coefs is an array of one element per beam, specifying a rotation
        angle, in radians, for the specified beam. The rotation is the same
        for all antennas. Default is 0 (no rotation).
        A positive pi/4 value transfers the X polarization to the Y polarization.
        The rotation is applied after regular calibration.
        Rot_matrix[beam] then contains the rotation matrix of each beam
        Obsolete, does not support subarrays, kept for backward compatibility

        :param angle_coefs: rotation angles in radians, at least 8 elements
        """
        self.beam_angle = angle_coefs
        for beam in range(8):
            cosine = cos(self.beam_angle[beam])
            sine = sin(self.beam_angle[beam])
            self.rot_matrix[beam] = [cosine, -sine, sine, cosine]

    def compute_calibration_coefs(self):
        """
        Compute the calibration coefficients and load them in the hardware.
        To be used after load_cal_curve(), load_antenna_tapering() and
        load_beam_angle
        Obsolete, does not support subarrays, kept for backward compatibility
        """
        for ant in range(8):
            tapering = self.antenna_tapering[ant]
            cal_coefficients = []
            for ch in range(self.nof_chans):
                # Map the output channel to its region, beam and the
                # frequency channel of the 512-channel calibration curve
                region = self.region_sel[ch // 8]
                beam = self.beam_index[region]
                freq = ch + self.region_off[region]
                coef_curve = self.cal_curve[beam][ant][freq]
                rot = self.rot_matrix[beam]

                # Tapered calibration terms
                cc_0 = coef_curve[0] * tapering
                cc_1 = coef_curve[1] * tapering
                cc_2 = coef_curve[2] * tapering
                cc_3 = coef_curve[3] * tapering

                # Combine with the beam rotation matrix
                cal_coefficients.append([
                    rot[0] * cc_0 + rot[2] * cc_1,
                    rot[1] * cc_0 + rot[3] * cc_1,
                    rot[0] * cc_2 + rot[2] * cc_3,
                    rot[1] * cc_2 + rot[3] * cc_3,
                ])
            self.load_calibration(ant, cal_coefficients)

    def check_errors(self):
        """
        Read the beamformer error flags (tlast_not_aligned, fifo_write,
        fifo_read).

        :return: True if none of the flags is set, False otherwise
        """
        errors = '%s.beamf_fd.errors' % self._device
        tlast_error = self.board[errors + '.tlast_not_aligned']
        fifo_write_error = self.board[errors + '.fifo_write']
        fifo_read_error = self.board[errors + '.fifo_read']
        return not any([tlast_error, fifo_write_error, fifo_read_error])

    def clear_errors(self):
        """ Write 1 to the beamformer errors_rst register """
        self.board[f'{self._device}.beamf_fd.errors.errors_rst'] = 1