Source code for pyfabil.plugins.tpm.station_beamf

from __future__ import division
from builtins import range
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *

import logging
import time

__author__ = 'gcomoretto'


class StationBeamformer(FirmwareBlock):
    """ Ring (station) beamformer """

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('station_beamf')
    @maxinstances(2)
    def __init__(self, board, **kwargs):
        """
        StationBeamformer initialiser

        Binds the plugin to one FPGA, loads the default channel, subarray
        and timing settings, and reads the first/last tile flags back
        from the board.

        :param board: Pointer to board instance
        :param kwargs: must contain 'device', either Device.FPGA_1 or
            Device.FPGA_2
        :raises PluginError: if 'device' is missing or is not one of the
            two FPGAs
        """
        super(StationBeamformer, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("StationBeamformer: Require a node instance")

        # Translate the device enumeration into the register name prefix
        requested = kwargs['device']
        self._device = requested
        if requested == Device.FPGA_1:
            self._device = 'fpga1'
        elif requested == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError(f"StationBeamformer: Invalid device {self._device}")

        # Number of channels, beams, etc
        # The tables are indexed per block of 8 channels
        self.max_nof_chans = 384
        self.nof_chans = 256  # 128 channels/FPGA for now (256 total)
        self.beam_table = [0] * 48
        self.freq_table = list(range(64, 512 - 64, 8))
        self.subarray_table = [1] * 64
        self.subarray_channel_table = list(range(0, 512, 8))
        self.subarray_beam_table = [0] * 64
        self.substation_table = [0] * 64
        self.aperture_table = [0] * 64
        self.scan_id_table = [0] * 64
        self.station_id = 1

        # Lots of magic numbers. To be tuned for best performance
        # Timing
        # TPM to TPM frame time is enough to send all packets, for
        # the maximum number of channels, in 90% of the time required
        # to receive them
        # CSP frame time is a bit less than twice, as sample size is halved
        self.clock_frequency = 237.037e6
        self.tpm_frame_time = 1080e-9 * 2048 / self.max_nof_chans * 0.9
        self.csp_frame_time = self.tpm_frame_time * 1.85

        # Corner turner block length in CSP blocks
        # 1 means that CSP blocks cycle through all channels
        # without further corner turning
        self.int_block_length = 1
        self.int_block_ovl = 0
        self.ddr_timeout = 1500  # DDR timeout in clock cycles
        self.csp_scaling = 4  # CSP scaling in bits

        # Position of this tile in the chain, as currently set in hardware
        self.first_tile = bool(
            self.board[f'{self._device}.beamf_ring.control.first_tile'] == 1)
        self.last_tile = bool(
            self.board[f'{self._device}.beamf_ring.control.last_tile'] == 1)

    ############################################################################
    # Defines if a tile is first, last, both or intermediate.
    ############################################################################
def set_first_last_tile(self, is_first, is_last):
    """
    Defines if a tile is first, last, both or intermediate

    One, and only one, tile must be first, and one must be last, in a
    chain. A tile can be both (one tile chain), or none.
    The beamformer reset bit is set before the flags are written.

    :param is_first: Tile is first in the beamformer chain
    :param is_last: Tile is last in the beamformer chain
    :return: False if the beamformer is running (nothing is written),
        True otherwise
    """
    if self.is_running():
        return False

    registers = self.board
    registers[self._device + '.beamf_ring.control.reset'] = 1

    # Register first, cached flag second, for each of the two flags
    first_flag = bool(is_first)
    registers[self._device + '.beamf_ring.control.first_tile'] = int(first_flag)
    self.first_tile = first_flag

    last_flag = bool(is_last)
    registers[self._device + '.beamf_ring.control.last_tile'] = int(last_flag)
    self.last_tile = last_flag
    return True
############################################################################ # Initialize # Resets hardware. ############################################################################
def initialise_beamf(self):
    """
    Initialise Station Beamformer

    Puts the beamformer in reset, parks the start/last frames so that no
    integration is active, programs the timing registers, the number of
    channels and the CSP scaling, then pulses the error reset.

    :return: False if the beamformer is running (nothing is written),
        True otherwise
    """
    if self.is_running():
        return False

    dev = self._device

    # Reset beamformer, and set start and stop time in the past,
    # to stop the beamforming. The reset bit is left asserted here.
    for register, value in (
            (dev + '.beamf_ring.control.reset', 1),
            (dev + '.beamf_ring.start_frame', 2),
            (dev + '.beamf_ring.last_frame', 1)):
        self.board[register] = value

    self._program_timing()

    # Channel count and scaling, then reset errors (pulse 1 -> 0)
    for register, value in (
            (dev + '.beamf_ring.ch_n', self.nof_chans),
            (dev + '.beamf_ring.csp_scaling', self.csp_scaling),
            (dev + '.beamf_ring.control.error_rst', 1),
            (dev + '.beamf_ring.control.error_rst', 0)):
        self.board[register] = value

    logging.info("StationBeamformer has been initialised")
    return True
############################################################################ # Private method to program the channel table # from the values stored in the object # Valid only for the last tile ############################################################################ def _program_channels(self): """ Private method to program the channel table from the values stored in the object Valid only for the last tile """ if self.is_running(): return False self.board[self._device+'.beamf_ring.ch_n'] = self.nof_chans if not self.last_tile: return False freq_beam_table = [0] * (self.nof_chans // 8) subarray_table = [0] * (self.nof_chans // 8) for i in range(self.nof_chans // 8): freq_beam_table[i] = ( (self.subarray_channel_table[i] << (9+6)) + (self.subarray_beam_table[i] << 9) + self.freq_table[i] ) subarray_table[i] = ( (self.substation_table[i] << 6) + self.subarray_table[i] ) self.board[self._device+'.beamf_ring.ch_n'] = self.nof_chans self.board[self._device+'.beamf_ring.freq_beam_tab'] = freq_beam_table if len(self.board.find_register(self._device+'.beamf_ring.subarray_tab')) > 0: self.board[self._device+'.beamf_ring.subarray_tab'] = subarray_table self.board[self._device+'.beamf_ring.scan_id_tab'] = self.scan_id_table[0:self.nof_chans] self.board[self._device+'.beamf_ring.frame_id.antenna_index'] = self.aperture_table[0] return True ############################################################################ # Define the channel table, for last tile # Obsolete, legacy code, should be deleted. # Replaced by define_channel_table() ############################################################################
def defineChannelTable(self, region_array):
    """
    Set frequency regions (legacy version, note the CamelCase name).

    Regions are defined in a 2-d array. Each element in the array
    defines a region, with the form [start_ch, nof_ch, beam_index]

    - start_ch: region starting channel, must not be negative
    - nof_ch: size of the region, truncated to a multiple of 8 channels,
      at most 384, must not be negative
    - beam_index: beam used for this region

    A region must end at or before channel 512, and all regions together
    must fit in 64 blocks of 8 channels.

    The routine computes beam_table, freq_table and the total number of
    channels nof_chans, and programs them in the HW. The tables are only
    replaced when every region is valid; if a PluginError is raised the
    previous tables are left untouched.

    NOTE(review): _program_channels() builds the hardware words from
    subarray_beam_table, not from beam_table, so the beam index set here
    does not appear to reach the hardware - confirm before relying on it.

    :param region_array: bidimensional array, one row for each spectral
        region
    :return: result of _program_channels()
    :raises PluginError: if a region is illegal
    """
    max_blocks = 64
    # Build into local tables so that a rejected region cannot leave the
    # object with half-updated tables
    beam_table = max_blocks * [0]
    freq_table = max_blocks * [0]
    region_idx = 0
    for region in region_array:
        start_ch = region[0]
        reg_length = region[1] & 0X1F8
        # The sign is tested on the raw value: masking a negative number
        # yields a valid-looking positive length
        if region[1] < 0 or reg_length > 384:
            raise PluginError(
                f"StationBeamformer: Invalid region length in {self._device}")
        end_ch = start_ch + reg_length
        if start_ch < 0 or end_ch > 512:
            raise PluginError(
                f"StationBeamformer: Invalid region position in {self._device}")
        nof_blocks = reg_length // 8
        if region_idx + nof_blocks > max_blocks:
            raise PluginError(
                f"StationBeamformer: too many channels specified in {self._device}")
        blocks = slice(region_idx, region_idx + nof_blocks)
        beam_table[blocks] = [region[2]] * nof_blocks
        freq_table[blocks] = list(range(start_ch, end_ch, 8))
        region_idx += nof_blocks

    self.beam_table = beam_table
    self.freq_table = freq_table
    self.nof_chans = region_idx * 8
    return self._program_channels()
############################################################################ # Define the channel table, for last tile. New version, old kept for legacy ############################################################################
def define_channel_table(self, region_array):
    """
    Set frequency regions.

    Regions are defined in a 2-d array, one row per spectral region.
    A row holds either the first 3 or all 8 of the following items; the
    length of the first row selects the format for the whole array:

    0: start_ch: region starting channel (LS bit discarded), must not be
       negative
    1: nof_ch: size of the region, truncated to a multiple of 8
       channels, at most 384, must not be negative
    2: beam_index: station beam used for this region
    3: subarray_id: ID of the subarray (6 bit). Defaults to 1
    4: subarray_logical_channel: first logical channel of the region in
       the subarray (truncated to a multiple of 8).
       Defaults to the station logical channel
    5: subarray_beam_id: ID of the subarray beam (8 bit).
       Defaults to beam_index
    6: substation_id: ID of the substation (6 bit).
       Defaults to 0 (no substation)
    7: aperture_id: ID of the aperture (16 bit).
       Defaults to station_id * 100

    A region must end at or before channel 512, and all regions together
    must fit in 64 blocks of 8 channels.
    NOTE(review): the station limit was documented as 384 channels in
    total, but only the 64 block table size is enforced here.

    The routine computes the per-block tables and the total number of
    channels nof_chans, and programs them in the hardware. The tables
    are only replaced when every region is valid; if a PluginError is
    raised the previous tables are left untouched. An empty array
    defines zero channels.

    :param region_array: bidimensional array, one row for each spectral
        region, 3 or 8 items long
    :return: result of _program_channels()
    :raises PluginError: if parameters are illegal
    """
    max_blocks = 64
    # Defaults for unused blocks match those set in the initialiser
    beam_table = max_blocks * [0]
    freq_table = max_blocks * [0]
    subarray_table = max_blocks * [1]
    subarray_channel_table = list(range(0, 512, 8))
    subarray_beam_table = max_blocks * [0]
    substation_table = max_blocks * [0]
    aperture_table = max_blocks * [0]

    full_table = len(region_array) > 0 and len(region_array[0]) == 8
    items_required = 8 if full_table else 3

    region_idx = 0
    for region in region_array:
        if len(region) < items_required:
            raise PluginError(
                f"StationBeamformer: Invalid region definition in {self._device}")

        start_ch = region[0] & 0x1fe
        nof_chans = region[1] & 0X1F8
        # Signs are tested on the raw values: masking a negative number
        # yields a valid-looking positive one
        if region[1] < 0 or nof_chans > 384:
            raise PluginError(
                f"StationBeamformer: Invalid region length in {self._device}")
        end_ch = start_ch + nof_chans
        if region[0] < 0 or end_ch > 512:
            raise PluginError(
                f"StationBeamformer: Invalid region position in {self._device}")
        reg_length = nof_chans // 8
        if region_idx + reg_length > max_blocks:
            raise PluginError(
                f"StationBeamformer: too many channels specified in {self._device}")

        if full_table:
            subarray_id = region[3] & 0x3f  # limited to 6 bit
            start_subarray_ch = region[4] & 0x1f8
            subarray_beam_id = region[5] & 0xff
            substation_id = region[6] & 0x3f  # limited to 6 bit
            aperture_id = region[7] & 0xffff
        else:
            # Partial table. Use default IDs
            subarray_id = 1  # only subarray 1 is assumed
            substation_id = 0
            aperture_id = self.station_id * 100
            subarray_beam_id = region[2]  # equal to the station beam
            # equal to the logical channel: region_idx counts blocks of
            # 8 channels, the table holds channel numbers
            start_subarray_ch = region_idx * 8

        blocks = slice(region_idx, region_idx + reg_length)
        beam_table[blocks] = [region[2]] * reg_length
        freq_table[blocks] = list(range(start_ch, end_ch, 8))
        subarray_table[blocks] = [subarray_id] * reg_length
        subarray_channel_table[blocks] = list(range(
            start_subarray_ch, start_subarray_ch + nof_chans, 8))
        subarray_beam_table[blocks] = [subarray_beam_id] * reg_length
        substation_table[blocks] = [substation_id] * reg_length
        aperture_table[blocks] = [aperture_id] * reg_length
        region_idx += reg_length

    # Every region was accepted: commit the new tables in one go
    self.beam_table = beam_table
    self.freq_table = freq_table
    self.subarray_table = subarray_table
    self.subarray_channel_table = subarray_channel_table
    self.subarray_beam_table = subarray_beam_table
    self.substation_table = substation_table
    self.aperture_table = aperture_table
    self.nof_chans = region_idx * 8
    return self._program_channels()
############################################################################ def _program_timing(self): """ Private method to set the timing registers from constants set during initialization (or modified afterwards) :return: False if the beamformer is running, True if OK """ if self.is_running(): return False self.board[self._device+'.beamf_ring.frame_rate.first_tile'] = \ int(round(self.tpm_frame_time * self.clock_frequency)) self.board[self._device+'.beamf_ring.frame_rate.last_tile'] = \ int(round(self.csp_frame_time * self.clock_frequency)) self.board[self._device+'.beamf_ring.timeout'] = self.ddr_timeout self.board[self._device+'.beamf_ring.frame_timing.int_block_len'] = \ self.int_block_length self.board[self._device+'.beamf_ring.frame_timing.int_block_ovl'] = \ self.int_block_ovl return True ############################################################################ # Define the SPEAD header, for last tile # With subarray enabled firmware, most values are specified in set_regions # Only stationId and refEpoch are specified here. ############################################################################
def define_spead_header(self, stationId, subarrayId=0, apertureId=0,
                        refEpoch=-1, startTime=0):
    """
    Define the SPEAD header for the last tile.

    Only stationId is required. A value of 0 for subarrayId or
    apertureId keeps the value already stored in the first entry of the
    corresponding table; any other value replaces the whole table.

    :param stationId: ID of the station: 1-512
    :param subarrayId: ID of the subarray. Can be overridden by
        defineChannelTable
    :param apertureId: ID of the aperture.
    :param refEpoch: Reference epoch. -1 (default) uses the value
        already defined with set_epoch()
    :param startTime: (in seconds): offset from frame time, default 0
    :return: True if OK, False if the beamformer is running
    """
    if self.is_running():
        return False

    dev = self._device
    self.board[dev + '.beamf_ring.frame_id.station_id'] = stationId
    self.station_id = stationId

    if apertureId != 0:
        self.aperture_table = [apertureId] * 64
    else:
        apertureId = self.aperture_table[0]
    self.board[dev + '.beamf_ring.frame_id.antenna_index'] = apertureId

    if subarrayId != 0:
        self.subarray_table = [subarrayId] * 64
    else:
        # Value currently unused: the sub_array_id register write is
        # disabled in this version
        subarrayId = self.subarray_table[0]

    if refEpoch != -1:
        self.set_epoch(refEpoch)

    # The start time register is written only if the firmware has it
    start_time_register = self._device + '.beamf_ring.start_time'
    if self.board.memory_map.has_register(start_time_register):
        start_time_ns = int(startTime * 1e9) & 0xffffffff
        self.board[self._device + '.beamf_ring.start_time'] = start_time_ns
    return True
############################################################################ # Set the Unix epoch in seconds since Unix reference time ############################################################################
def set_epoch(self, epoch):
    """
    Set the Unix epoch in seconds since Unix reference time

    The 40 least significant bits of the value are kept: the lower 32
    go to ref_epoch_lo, the next 8 to ref_epoch_hi.
    NOTE(review): the parameter was documented as a 48 bit integer,
    while the masks below keep 40 bits - confirm the register width.

    :param epoch: Unix time for reference time (TPM synch time)
    :return: True if OK
    """
    truncated = epoch & 0xffffffffff
    low_word = truncated & 0xffffffff
    high_byte = (truncated >> 32) & 0xff
    self.board[self._device+'.beamf_ring.ref_epoch_lo'] = low_word
    self.board[self._device+'.beamf_ring.ref_epoch_hi'] = high_byte
    return True
############################################################################ # Get the channel table ############################################################################
def get_channel_table(self):
    """
    Returns a table with the following entries for each 8-channel block:

    0: start physical channel (64-440)
    1: beam_index: subarray beam used for this region, range [0:48)
    2: subarray_id: ID of the subarray [1:48]
    3: subarray_logical_channel: Logical channel in the subarray
    4: subarray_beam_id: ID of the subarray beam
    5: substation_id: ID of the substation
    6: aperture_id: ID of the aperture (station*100+substation?)

    The values are taken from the tables stored in the object, not read
    back from the hardware.

    :return: Nx7 table with one row every 8 channels
    """
    # Column order of the returned rows
    columns = (
        self.freq_table,
        self.beam_table,
        self.subarray_table,
        self.subarray_channel_table,
        self.subarray_beam_table,
        self.substation_table,
        self.aperture_table,
    )
    return [[column[block] for column in columns]
            for block in range(self.nof_chans // 8)]
############################################################################ # Set output rounding for CSP ############################################################################
def set_csp_rounding(self, rounding):
    """
    Sets the number of bits rounded off before sending the result to
    the CSP. For white noise it should be log2(sqrt(nof_antennas)),
    i.e. 4 for 256 antennas.

    The value is limited to the range 0 to 7, written to the csp_scaling
    register and stored in both csp_rounding and csp_scaling, so that a
    later initialise_beamf() writes the same value again instead of the
    one set at construction.

    :param rounding: either scalar or list/tuple, of number of bits
        rounded off before sending the result to the CSP. If a sequence,
        only the 1st element is used. In future firmware, one value per
        channel.
    :return: True if OK
    """
    if isinstance(rounding, (list, tuple)):
        round_int = rounding[0]
    else:
        round_int = rounding
    round_int = min(7, max(0, round_int))

    # Keep both cached copies aligned with the register content
    self.csp_rounding = round_int
    self.csp_scaling = round_int
    self.board[self._device+'.beamf_ring.csp_scaling'] = round_int
    return True
############################################################################ # Return current frame ############################################################################
def current_frame(self):
    """
    Current frame as seen by the station beamformer

    :return: current frame, in units of 256 ADC frames (276.48 us)
    """
    frame_register = self._device+'.beamf_ring.current_frame'
    return self.board[frame_register]
############################################################################ # Start the beamformer ############################################################################
def start(self, time=0, duration=-1, scan_id=0, mask=0xffffffffffff):
    """
    Starts an integration

    The integration is specified in units of 256 ADC frames, from the
    start frame (included) to the last frame (excluded). Start frame and
    duration are truncated to a multiple of 8 frames.

    :param time: first frame (as seen by current_frame) in integration.
        0 (default) means 48 frames after the current frame
    :param duration: Integration duration, in frames. If -1, forever
    :param scan_id: scan ID stored for every channel group selected by
        mask
    :param mask: channel groups to start, default all (unsupported by FW)
    :return: True if OK, False if not possible (integration already
        active)
    """
    if self.is_running():
        return False

    dev = self._device

    # Program scan_id: bit i of the mask selects entry i of the table
    for group in range(64):
        if (mask >> group) & 1:
            self.scan_id_table[group] = scan_id
    if len(self.board.find_register(dev + '.beamf_ring.scan_id_tab')) > 0:
        self.board[dev + '.beamf_ring.scan_id_tab'] = \
            self.scan_id_table[0:self.nof_chans]

    # Program start and stop times
    first_frame = time if time != 0 else self.current_frame() + 48
    first_frame &= 0xfffffff8
    if duration == -1:
        last_frame = 0xffffffff
    else:
        last_frame = first_frame + (duration & 0xfffffff8)

    self.board[dev + '.beamf_ring.start_frame'] = first_frame
    self.board[dev + '.beamf_ring.last_frame'] = last_frame
    # Release the reset once the frames are programmed
    self.board[dev + '.beamf_ring.control.reset'] = 0
    return True
############################################################################
def is_running(self):
    """
    Check if the beamformer is still running.

    Compares the current frame to the programmed start and last frame:
    the beamformer is running when the start frame precedes the last
    frame and the last frame has not been reached yet.

    :return: True if beamformer is running
    """
    first = self.board[self._device+'.beamf_ring.start_frame']
    last = self.board[self._device+'.beamf_ring.last_frame']
    # The current frame is read only when the interval is a valid one
    if first < last and last > self.current_frame():
        return True
    return False
############################################################################
def abort(self):
    """
    Stop the beamformer

    The last frame is set before the start frame (1 and 2), so that
    is_running() no longer reports an active integration.

    :return: True if OK
    """
    # Last frame first, then start frame: same write order as before
    for register, frame in (('.beamf_ring.last_frame', 1),
                            ('.beamf_ring.start_frame', 2)):
        self.board[self._device + register] = frame
    return True
############################################################################ # Report errors ############################################################################
def report_errors(self):
    """
    Read the error flags and store them in self.errors

    :return: error flags (10 least significant bits of the register)
    """
    # Error Flags
    raw_flags = self.board[f'{self._device}.beamf_ring.errors']
    self.errors = raw_flags & 0x3FF
    return self.errors
def clear_errors(self):
    """
    Clear frame errors and general error flag

    The error reset bit is pulsed: written to 1, then back to 0.
    """
    for level in (1, 0):
        self.board[f'{self._device}.beamf_ring.control.error_rst'] = level
    return
def check_ddr_parity_error_counter(self):
    """
    Read the ddr_parity_error_cnt field of the errors register

    :return: value of the field, as returned by the board
    """
    counter = self.board[f'{self._device}.beamf_ring.errors.ddr_parity_error_cnt']
    return counter
############################################################################ # Some default methods ############################################################################
def status_check(self):
    """
    Perform status check

    Reads the error flags into self.errors. Frame errors have been
    removed from the firmware, so self.frame_errors is always 0 and the
    returned status is always Status.OK; the error flags do not affect
    the returned status.

    The flags are read from the 'errors' register, the one used by
    report_errors(); if the loaded firmware does not have it, the
    previously used name 'error' is tried instead.

    :return: Status
    """
    logging.info("StationBeamformer: Checking status")
    self.frame_errors = 0  # Removed from firmware

    error_register = self._device + '.beamf_ring.errors'
    if len(self.board.find_register(error_register)) == 0:
        # NOTE(review): legacy register name, kept as a fallback -
        # confirm whether any supported firmware still uses it
        error_register = self._device + '.beamf_ring.error'
    self.errors = self.board[error_register] & 0x3FF

    if self.frame_errors == 0:
        return Status.OK
    logging.info(f"StationBeamformer: Frame errors {self.frame_errors}")
    return Status.Error
def initialise(self, **kwargs):
    """
    Default initialisation method; nothing is done for this block.

    :param kwargs: ignored
    """
    return None
def clean_up(self):
    """
    Perform cleanup

    Only a log message is produced; no register is touched.

    :return: Success
    """
    message = "StationBeamformer : Cleaning up"
    logging.info(message)
    return True