Source code for pyfabil.plugins.tpm.beamf_fd

from __future__ import division
from builtins import range
from math import fabs, cos, sin
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
import logging

__author__ = 'cbelli'

# Exclusive upper bound on the station-beam index accepted by
# BeamfFD.set_regions(). BeamfFD.set_delay() additionally rejects any beam
# index >= 8 ("only 8 beams supported by this firmware").
MAX_BEAMS = 48  # Maximum number of beams

class BeamfFD(FirmwareBlock):
    """ Frequency Domain Beamformer plugin

    Drives the ``beamf_fd`` (regions and delays) and ``beamf_cal``
    (calibration) register groups of one FPGA. Software copies of the region
    tables (``region_sel``, ``region_off``, ``beam_index``) are kept on the
    instance so that :meth:`compute_calibration_coefs` can map each output
    channel back to a beam and a frequency channel.
    """

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('beamf_fd')
    @maxinstances(2)
    def __init__(self, board, **kwargs):
        """ BeamformerFD initialiser

        :param board: Pointer to board instance
        :param kwargs: must contain ``device`` (``Device.FPGA_1`` or
                       ``Device.FPGA_2``)
        :raises PluginError: if ``device`` is missing or is not one of the
                             two supported FPGAs
        """
        super(BeamfFD, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("BeamfFD: Require a node instance")
        self._device = kwargs['device']

        # Translate the device enum into the register-name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("BeamfFD: Invalid device %s" % self._device)

        # Software copies of the region tables
        self.beam_index = [0] * 64
        self.region_off = [0] * 64
        self.region_sel = [0] * 64
        self.nof_chans = 384  # True number of channels
        self.nof_hw_chans = 384  # FIXED number of chans in output packet.

        # Number of independent channel blocks: size of the region_off table,
        # falling back to 16 when the register cannot be queried.
        # Only Exception is caught so Ctrl-C / SystemExit still propagate.
        try:
            self.nof_regions = self.board.find_register("fpga1.beamf_fd.region_off")[0].size
        except Exception:
            self.nof_regions = 16

        # Calibration quantities
        self.cal_curve = self._default_cal_curve()
        self.antenna_tapering = [1.0] * 8
        self.beam_angle = [0.0] * 8
        # Rotation matrix
        self.rot_matrix = self._default_rot_matrix()

        # The fixed-point scale of the calibration coefficients depends on
        # whether this feature register can be read.
        try:
            self.board['fpga1.dsp_regfile.feature.tile_beamformer_implemented']
            self.scale = 2048
        except Exception:
            self.scale = 128

    @staticmethod
    def _default_cal_curve():
        """ Build the default calibration curve, indexed [beam][antenna][channel]

        8 beams x 8 antennas x 512 channels, each entry being the identity
        matrix [1, 0, 0, 1] as complex values. Every nested list is a distinct
        object: building the table with ``[...] * 8`` would make all beams
        share one antenna list, so assigning ``cal_curve[beam][antenna]``
        would change that antenna for every beam.
        """
        return [[[[complex(1.0), complex(0.0), complex(0.0), complex(1.0)]
                  for _ in range(512)]
                 for _ in range(8)]
                for _ in range(8)]

    @staticmethod
    def _default_rot_matrix():
        """ Build one independent identity rotation matrix per beam (8 beams) """
        return [[1.0, 0.0, 0.0, 1.0] for _ in range(8)]

    def initialise_beamf(self):
        """ Initialise Frequency Domain Beamformer

        Resets the beamformer, clears the region and delay tables in the
        hardware, restores all calibration quantities to their defaults,
        then loads the default calibration and forces a bank switch.
        """
        dev = self._device

        # Reset beamformer
        self.board['%s.beamf_fd.control.reset' % dev] = 1
        self.board['%s.beamf_fd.control.reset' % dev] = 0

        # Load Delay
        self.board['%s.beamf_fd.control.load_delay' % dev] = 0
        # Load Immediate Delay
        self.board['%s.beamf_fd.control.load_delay_immediate' % dev] = 0

        # Number of processed channels: 384/8
        self.nof_chans = 384
        self.nof_hw_chans = 384
        self.board['%s.beamf_fd.nof_chans' % dev] = self.nof_hw_chans // 8

        # Test point select
        self.board['%s.beamf_fd.tp_sel' % dev] = 0
        # Load time as frame number
        self.board['%s.beamf_fd.load_time' % dev] = 0

        # Clear the region tables. The software copies are reset together
        # with the hardware ones: compute_calibration_coefs() below iterates
        # over all 384 channels and would otherwise use offsets left over
        # from an earlier set_regions() call.
        # Region select table
        self.region_sel = [0] * 64
        self.board['%s.beamf_fd.region_sel' % dev] = self.region_sel
        # Region offset table
        self.region_off = [0] * self.nof_regions
        self.board['%s.beamf_fd.region_off' % dev] = self.region_off
        # Beam index table
        self.beam_index = [0] * self.nof_regions
        self.board['%s.beamf_fd.beam_index' % dev] = self.beam_index

        # Antenna delay and delay rate from beam X antenna Y
        self.board['%s.beamf_fd.delay_0_0' % dev] = [0] * 8

        # Default calibration quantities
        self.cal_curve = self._default_cal_curve()
        self.antenna_tapering = [1.0] * 8
        self.beam_angle = [0.0] * 8
        # Rotation matrix
        self.rot_matrix = self._default_rot_matrix()

        self.compute_calibration_coefs()
        self.switch_calibration_bank(force=True)

        logging.info("BeamfFD has been initialised")

    def current_frame(self):
        """ Return current frame processed by the tile beamformer

        :returns: current frame number, in units of 256 ADC frames (276.48 us)
        """
        return self.board[self._device + '.beamf_cal.current_time']

    # Set regions. The method computes the total number of channels and the
    # arrays and writes in the registers
    def set_regions(self, region_array):
        """ Set frequency regions.

        Regions are defined in a 2-d array, for a maximum of 16 (48) regions.
        Actual limit defined in hardware by the size of the region_off table.
        Each element in the array defines a region, with
        the form [start_ch, nof_ch, beam_index]

        - start_ch:    region starting channel (currently must be a
                       multiple of 2, LS bit discarded)
        - nof_ch:      size of the region: must be multiple of 8 chans
                       (lower bits are masked off)
        - beam_index:  beam used for this region, range [0:MAX_BEAMS)

        Total number of channels must be <= 384

        The routine computes the arrays beam_index, region_off, region_sel,
        and the total number of channels nof_chans, and programs it in the HW.
        Nothing is stored or written if any region is invalid.

        :param region_array: Bidimensional array of regions
        :raises PluginError: if there are more regions than the hardware
                             table holds, or a region has an invalid length,
                             position or beam index, or the total exceeds
                             384 channels
        """
        region_start = 0
        beam_index = [0] * self.nof_regions
        region_off = [0] * self.nof_regions
        region_sel = [0] * 64

        for region_idx, region in enumerate(region_array):
            # Report table overflow as a PluginError rather than letting the
            # table assignments below fail with an IndexError
            if region_idx >= self.nof_regions:
                raise PluginError(
                    f"BeamfFD: Too many regions in {self._device}, "
                    f"maximum is {self.nof_regions}")

            start_ch = region[0]
            reg_length = region[1] & 0x1F8  # keep multiples of 8 only
            beam = region[2]

            if reg_length > 384:
                raise PluginError(f"BeamfFD: Invalid region length {reg_length} in {self._device}")
            if start_ch < 0 or start_ch + reg_length > 512:
                raise PluginError(f"BeamfFD: Invalid region position in {self._device}")
            if beam >= MAX_BEAMS or beam < 0:
                raise PluginError(f"BeamfFD: Invalid beam index in {self._device}")
            # Must be checked before region_sel is filled: it has 64 entries
            # and an over-long total would index past its end
            if region_start + reg_length > 384:
                raise PluginError(f"BeamfFD: Invalid number of channels in {self._device}")

            beam_index[region_idx] = beam
            # Offset between input (frequency) channel and output channel
            region_off[region_idx] = start_ch - region_start
            # One region_sel entry per block of 8 output channels
            first_block = region_start // 8
            nof_blocks = reg_length // 8
            region_sel[first_block:first_block + nof_blocks] = [region_idx] * nof_blocks

            region_start += reg_length

        self.region_sel = region_sel
        self.region_off = region_off
        self.beam_index = beam_index
        # Patch: changing nof_chans on the run desynchronizes the two FPGAs
        # Hardware nof.chans MUST stay constant
        self.nof_chans = region_start

        self.board[self._device + '.beamf_fd.region_sel'] = self.region_sel
        self.board[self._device + '.beamf_fd.region_off'] = self.region_off
        self.board[self._device + '.beamf_fd.beam_index'] = self.beam_index

    # Set delay for a beam
    def set_delay(self, delay_array, beam_index):
        """ The method specifies the delay in seconds and the delay rate in
        seconds/seconds.

        The delay_array specifies the delay and delay rate for each antenna.
        beam_index specifies which beam is described (range 0:7).
        Delay is updated inside the delay engine at the time specified
        by method load_delay.

        Each antenna is packed into one word: the scaled delay in the upper
        20 bits and the scaled delay rate in the lower 12 bits, both as
        two's complement values.

        :param delay_array: bidimensional array [8,2]. Each row contains
                            2 elements, as delay (in s) and delay rate
                            (in s/s). One delay is specified for both
                            polarization in each antenna
        :param beam_index: hardware station beam to program. Range 0:8
                           (0:48 in future firmware)
        :raises PluginError: if the beam index is out of range, or a scaled
                             delay / delay rate does not fit its field
        """
        if beam_index >= MAX_BEAMS or beam_index < 0:
            raise PluginError(f"BeamfFD: Invalid Beam Index in {self._device}")
        elif beam_index >= 8:
            raise PluginError("BeamfFD: only 8 beams supported by this firmware")

        delay_x_y = []
        for j in range(8):
            delay_hw = int(round(delay_array[j][0] * (2 ** 23 / 1280e-9)))
            rate_hw = int(round(delay_array[j][1] * ((1024. * 1080e-9 * 2 ** 37) / 1280e-9)))
            # Signed fields: 20 bits hold [-2**19, 2**19), 12 bits hold
            # [-2**11, 2**11). The positive limit is exclusive because
            # +2**19 (or +2**11) would be masked to the same bit pattern as
            # the most negative value.
            if not (-2 ** 19 <= delay_hw < 2 ** 19) or not (-2 ** 11 <= rate_hw < 2 ** 11):
                raise PluginError(f"BeamfFD: Invalid Delay - Delay rate in {self._device}")
            delay_x_y.append(((delay_hw & 0xFFFFF) << 12) | (rate_hw & 0xFFF))

        self.board['%s.beamf_fd.delay_%d_0' % (self._device, beam_index)] = delay_x_y

    # loads the delay
    def load_delay(self, load_time=0, beam_mask=0xffffffffffff):
        """ Transfers the delay to the delay computing hardware at the
        prescribed frame number (load_time).

        If load_time = 0 transfers the delay immediately.

        :param load_time: Prescribed load time (frame number). 0 = immediate
        :param beam_mask: beams to be activated. Default: all.
                          Unsupported in this FW (the value is ignored)
        """
        if load_time == 0:
            # Pulse the immediate-load control bit
            self.board[self._device + '.beamf_fd.control.load_delay_immediate'] = 1
            self.board[self._device + '.beamf_fd.control.load_delay_immediate'] = 0
        else:
            # Program the load time, then pulse the timed-load control bit
            self.board[self._device + '.beamf_fd.load_time'] = load_time
            self.board[self._device + '.beamf_fd.control.load_delay'] = 1
            self.board[self._device + '.beamf_fd.control.load_delay'] = 0

    # Calibration
    def load_calibration(self, antenna, calibration_coefs):
        """ Loads calibration coefficients.

        calibration_coefs is a bidimensional complex array of the form
        calibration_coefs[channel, polarization], with each element
        representing a normalized coefficient, with (1.0, 0.0) the normal,
        expected response for an ideal antenna.
        Channel is the index specifying the channels at the beamformer output,
        i.e. considering only those channels actually processed and beam
        assignments.
        The polarization index ranges from 0 to 3.

        0: X polarization direct element
        1: X->Y polarization cross element
        2: Y->X polarization cross element
        3: Y polarization direct element

        The calibration coefficients may include any rotation matrix (e.g.
        the parallactic angle), but do not include the geometric delay.

        Each coefficient is multiplied by ``self.scale``, rounded, and packed
        as imaginary part (upper 16 bits) and real part (lower 16 bits).
        Values that do not fit in 16 bits are masked, not saturated.

        :param antenna: Antenna number. Integer in range 0:8
        :param calibration_coefs: Calibration coefficients. array [384, 4]
        """
        scale = self.scale
        for pol in range(4):
            cal_data = [
                ((int(round(coefs[pol].imag * scale)) & 0xFFFF) << 16)
                | (int(round(coefs[pol].real * scale)) & 0xFFFF)
                for coefs in calibration_coefs
            ]
            # Select the (antenna, polarization element) block, then write it
            self.board['%s.beamf_cal.block_sel' % self._device] = antenna * 4 + pol
            self.board['%s.beamf_cal.cal_data' % self._device] = cal_data

    def switch_calibration_bank(self, time=0, force=False):
        """ Switch the calibration bank.

        To be performed after all calibration quantities have been updated
        on both FPGAs.

        :param time: Time to perform bank switch (frame number).
                     Default 0 = current time + 64 frames.
                     Ignored when force is set
        :param force: Force immediate switch
        :return: bank used after bank switch (0 or 1)
        """
        current_bank = self.board['%s.beamf_cal.control.cal_table_bank' % self._device]
        new_bank = 1 - current_bank
        self.board['%s.beamf_cal.control.cal_table_bank' % self._device] = new_bank

        if force:
            # Pulse the switch control bit
            self.board['%s.beamf_cal.control.sw_table_bank' % self._device] = 1
            self.board['%s.beamf_cal.control.sw_table_bank' % self._device] = 0
        else:
            if time == 0:
                time = self.board['%s.beamf_cal.current_time' % self._device] + 64
            self.board['%s.beamf_cal.load_time' % self._device] = time

        return new_bank

    def load_cal_curve(self, antenna, beam, cal_coefficients):
        """ Store the calibration curve of one antenna for one beam.

        Calibration curve is specified for 512 frequency channels over the
        whole (0-400) MHz range. Calibration for unused frequency regions
        may assume any value (e.g. 0).
        Default (at initialization) is 1.0 for diagonal terms and 0.0 for
        cross diagonal terms.
        Nothing is written to the hardware until compute_calibration_coefs()
        is called.

        Obsolete, does not support subarrays, kept for backward compatibility

        :param antenna: Antenna number, range 0:8
        :param beam: Beam number, range 0:8
        :param cal_coefficients: array [512, 4] of coefficients
        """
        self.cal_curve[beam][antenna] = cal_coefficients

    def load_antenna_tapering(self, tapering_coefs):
        """ Store the antenna tapering coefficients.

        Tapering_coefs is a vector of 8 values, one per antenna.
        Default (at initialization) is 1.0

        Obsolete, does not support subarrays, kept for backward compatibility

        :param tapering_coefs: vector of 8 tapering values
        """
        self.antenna_tapering = tapering_coefs

    def load_beam_angle(self, angle_coefs):
        """ Store the rotation angle of each beam and derive its matrix.

        Angle_coefs is an array of one element per beam, specifying a rotation
        angle, in radians, for the specified beam. The rotation is the same
        for all antennas. Default is 0 (no rotation).
        A positive pi/4 value transfers the X polarization to the Y
        polarization. The rotation is applied after regular calibration.
        Rot_matrix[beam] then contains the rotation matrix of each beam,
        stored as [cos, -sin, sin, cos].

        Obsolete, does not support subarrays, kept for backward compatibility

        :param angle_coefs: array of 8 angles in radians, one per beam
        """
        self.beam_angle = angle_coefs
        for beam in range(8):
            cosine = cos(self.beam_angle[beam])
            sine = sin(self.beam_angle[beam])
            self.rot_matrix[beam] = [cosine, -sine, sine, cosine]

    def compute_calibration_coefs(self):
        """ Compute the calibration coefficients and load them in the hardware.

        To be used after load_cal_curve(), load_antenna_tapering() and
        load_beam_angle.
        For every antenna and every output channel, the region tables give
        the beam and the frequency channel; the tapered calibration curve
        entry is combined with the rotation matrix of that beam and the
        result is passed to load_calibration().

        Obsolete, does not support subarrays, kept for backward compatibility
        """
        for ant in range(8):
            taper = self.antenna_tapering[ant]
            cal_coefficients = []
            for ch in range(self.nof_chans):
                # region_sel has one entry per block of 8 output channels
                region = self.region_sel[ch // 8]
                beam = self.beam_index[region]
                freq = ch + self.region_off[region]
                coef_curve = self.cal_curve[beam][ant][freq]
                rot = self.rot_matrix[beam]

                # Apply antenna tapering to the calibration curve entry
                cc_0 = coef_curve[0] * taper
                cc_1 = coef_curve[1] * taper
                cc_2 = coef_curve[2] * taper
                cc_3 = coef_curve[3] * taper

                # Combine with the rotation matrix of this beam
                rc_0 = rot[0] * cc_0 + rot[2] * cc_1
                rc_1 = rot[1] * cc_0 + rot[3] * cc_1
                rc_2 = rot[0] * cc_2 + rot[2] * cc_3
                rc_3 = rot[1] * cc_2 + rot[3] * cc_3

                cal_coefficients.append([rc_0, rc_1, rc_2, rc_3])
            self.load_calibration(ant, cal_coefficients)

    def check_errors(self):
        """ Check the beamformer error flags.

        Reads the tlast_not_aligned, fifo_write and fifo_read error registers.

        :return: True if none of the error flags is set, False otherwise
        """
        tlast_error = self.board[f'{self._device}.beamf_fd.errors.tlast_not_aligned']
        fifo_write_error = self.board[f'{self._device}.beamf_fd.errors.fifo_write']
        fifo_read_error = self.board[f'{self._device}.beamf_fd.errors.fifo_read']
        return not any([tlast_error, fifo_write_error, fifo_read_error])

    def clear_errors(self):
        """ Write 1 to the beamformer errors_rst register """
        self.board[f'{self._device}.beamf_fd.errors.errors_rst'] = 1