Source code for pyfabil.plugins.tpm.station_beamf
from __future__ import division
from builtins import range
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
import logging
import time
__author__ = 'gcomoretto'
[docs]
class StationBeamformer(FirmwareBlock):
"""
Ring (station) beamformer
"""
@compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
@friendlyname('station_beamf')
@maxinstances(2)
def __init__(self, board, **kwargs):
""" StationBeamformer initialiser
:param board: Pointer to board instance
"""
super(StationBeamformer, self).__init__(board)
if 'device' not in list(kwargs.keys()):
raise PluginError("StationBeamformer: Require a node instance")
self._device = kwargs['device']
if self._device == Device.FPGA_1:
self._device = 'fpga1'
elif self._device == Device.FPGA_2:
self._device = 'fpga2'
else:
raise PluginError(f"StationBeamformer: Invalid device {self._device}")
# Number of channels, beams, etc
self.max_nof_chans = 384
self.nof_chans = 256 # 128 channels/FPGA for now (256 total)
self.beam_table = 48 * [0]
self.freq_table = list(range(64, 512 - 64, 8))
self.subarray_table = 64 * [1]
self.subarray_channel_table = list(range(0, 512, 8))
self.subarray_beam_table = 64 * [0]
self.substation_table = 64 * [0]
self.aperture_table = 64 * [0]
self.scan_id_table = 64 * [0]
self.station_id = 1
# Lots of magic numbers. To be tuned for best performance
# Timing
# TPM to TPM frame time is enough to send all packets, for
# the maximum number of channels, in 90% of the time required
# to receive them
# CSP frame time is a bit less than twice, as sample size is halved
self.clock_frequency = 237.037e6
self.tpm_frame_time = 1080e-9 * 2048 / self.max_nof_chans * 0.9
self.csp_frame_time = 1080e-9 * 2048 / self.max_nof_chans * 0.9 * 1.85
# Corner turner block length in CSP blocks
# 1 means that CSP blocks cycle through all channels
# without further corner turning
self.int_block_length = 1
self.int_block_ovl = 0
self.ddr_timeout = 1500 # DDR timeout in clock cycles
self.csp_scaling = 4 # CSP scaling in bits
if self.board[f'{self._device}.beamf_ring.control.first_tile'] == 1:
self.first_tile = True
else:
self.first_tile = False
if self.board[f'{self._device}.beamf_ring.control.last_tile'] == 1:
self.last_tile = True
else:
self.last_tile = False
############################################################################
# Defines if a tile is first, last, both or intermediate.
############################################################################
[docs]
def set_first_last_tile(self, is_first, is_last):
"""
Defines if a tile is first, last, both or intermediate
One, and only one tile must be first, and last, in a chain
A tile can be both (one tile chain), or none
:param isFirst: Tile is first in the beamformer chain
:param isLast: Tile is first in the beamformer chain
"""
if self.is_running():
return False
self.board[self._device + '.beamf_ring.control.reset'] = 1
if is_first:
self.board[self._device+'.beamf_ring.control.first_tile'] = 1
self.first_tile = True
else:
self.board[self._device+'.beamf_ring.control.first_tile'] = 0
self.first_tile = False
if is_last:
self.board[self._device+'.beamf_ring.control.last_tile'] = 1
self.last_tile = True
else:
self.board[self._device+'.beamf_ring.control.last_tile'] = 0
self.last_tile = False
return True
############################################################################
# Initialize
# Resets hardware.
############################################################################
[docs]
def initialise_beamf(self):
"""
Initialise Station Beamformer
"""
if self.is_running():
return False
# Reset beamformer
self.board[self._device+'.beamf_ring.control.reset'] = 1
#self.board[self._device+'.beamf_ring.control.reset'] = 0
# set start and stop time in the past, to stop the beamforming
self.board[self._device+'.beamf_ring.start_frame'] = 2
self.board[self._device+'.beamf_ring.last_frame'] = 1
self._program_timing()
self.board[self._device+'.beamf_ring.ch_n'] = self.nof_chans
self.board[self._device+'.beamf_ring.csp_scaling'] = self.csp_scaling
# reset errors
self.board[self._device+'.beamf_ring.control.error_rst'] = 1
self.board[self._device+'.beamf_ring.control.error_rst'] = 0
logging.info("StationBeamformer has been initialised")
return True
############################################################################
# Private method to program the channel table
# from the values stored in the object
# Valid only for the last tile
############################################################################
def _program_channels(self):
"""
Private method to program the channel table
from the values stored in the object
Valid only for the last tile
"""
if self.is_running():
return False
self.board[self._device+'.beamf_ring.ch_n'] = self.nof_chans
if not self.last_tile:
return False
freq_beam_table = [0] * (self.nof_chans // 8)
subarray_table = [0] * (self.nof_chans // 8)
for i in range(self.nof_chans // 8):
freq_beam_table[i] = (
(self.subarray_channel_table[i] << (9+6)) +
(self.subarray_beam_table[i] << 9) +
self.freq_table[i]
)
subarray_table[i] = (
(self.substation_table[i] << 6) +
self.subarray_table[i]
)
self.board[self._device+'.beamf_ring.ch_n'] = self.nof_chans
self.board[self._device+'.beamf_ring.freq_beam_tab'] = freq_beam_table
if len(self.board.find_register(self._device+'.beamf_ring.subarray_tab')) > 0:
self.board[self._device+'.beamf_ring.subarray_tab'] = subarray_table
self.board[self._device+'.beamf_ring.scan_id_tab'] = self.scan_id_table[0:self.nof_chans]
self.board[self._device+'.beamf_ring.frame_id.antenna_index'] = self.aperture_table[0]
return True
############################################################################
# Define the channel table, for last tile
# Obsolete, legacy code, should be deleted.
# Replaced by define_channel_table()
############################################################################
[docs]
def defineChannelTable(self, region_array):
"""
Set frequency regions
(legacy version, note the CamelCase name).
Regions are defined in a 2-d array, for a maximum of 16 regions.
Each element in the array defines a region, with
the form [start_ch, nof_ch, beam_index]
- start_ch: region starting channel (currently must be a
multiple of 2, LS bit discarded)
- nof_ch: size of the region: must be multiple of 8 chans
- beam_index: beam used for this region, range [0:8)
Total number of channels must be <= 384
The routine computes the arrays beam_index, region_off, region_sel,
and the total number of channels nof_chans, and programs it in the HW
"""
region_idx = 0
self.beam_table = 64 * [0]
self.freq_table = 64 * [0]
for region in region_array:
start_ch = region[0]
reg_length = region[1] & 0X1F8
if reg_length > 384:
raise PluginError(
f"StationBeamformer: Invalid region length in {self._device}")
end_ch = start_ch + reg_length
if start_ch < 0 or end_ch > 512:
raise PluginError(
f"StationBeamformer: Invalid region position in {self._device}")
reg_length = reg_length // 8
if region_idx + reg_length > 64:
raise PluginError(
f"StationBeamformer: too many channels specified in {self._device}")
self.beam_table[region_idx:(region_idx + reg_length)] = \
[region[2]] * reg_length
self.freq_table[region_idx:(region_idx + reg_length)] = \
list(range(start_ch, end_ch, 8))
region_idx = region_idx + reg_length
self.nof_chans = region_idx * 8
return self._program_channels()
############################################################################
# Define the channel table, for last tile. New version, old kept for legacy
############################################################################
[docs]
def define_channel_table(self, region_array):
"""
Set frequency regions.
Regions are defined in a 2-d array, for a maximum of 16 regions.
Each element in the array defines a region, with the form:
[start_ch, nof_ch, beam_index,
<optional>
subarray_id, subarray_logical_ch, aperture_id, substation_id]
0: start_ch: region starting channel (currently must be a
multiple of 2, LS bit discarded)
1: nof_ch: size of the region: must be multiple of 8 chans
2: beam_index: subarray beam used for this region, range [0:48)
3: subarray_id: ID of the subarray [1:48]
4: subarray_logical_channel: Logical channel in the subarray
it is the same for all (sub)stations in the subarray
Defaults to station logical channel
5: subarray_beam_id: ID of the subarray beam
Defaults to beam index
6: substation_ID: ID of the substation
Defaults to 0 (no substation)
7: aperture_id: ID of the aperture (station*100+substation?)
Defaults to
Total number of channels must be <= 384
The routine computes the arrays beam_index, region_off, region_sel,
and the total number of channels nof_chans,
and programs it in the hardware.
Optional parameters are placeholders for firmware supporting
more than 1 subarray. Current firmware supports only one subarray
and substation, so corresponding IDs must be the same in each row
:param region_array: bidimensional array, one row for each
spectral region, 3 or 8 items long
:return: True if OK
:raises PluginError: if parameters are illegal
"""
region_idx = 0
self.beam_table = 64 * [0]
self.freq_table = 64 * [0]
self.subarray_table = 64 * [0]
self.subarray_channel_table = list(range(0,512,8))
self.subarray_beam_table = 64 * [0]
self.substation_table = 64 * [1]
self.aperture_table = 64 * [0]
if len(region_array[0]) == 8: # full table
full_table = True # Assume all regions have the same
subarray_id = region_array[0][3]
substation_id = region_array[0][6]
aperture_id = region_array[0][7]
else: # Partial table. Use default IDs
full_table = False
subarray_id = 1 # only subarray 1 is assumed
substation_id = 0
aperture_id = self.station_id*100
self.beam_table = 64 * [0]
self.freq_table = 64 * [0]
for region in region_array:
start_ch = region[0] & 0x1fe
nof_chans = region[1] & 0X1F8
if nof_chans > 384:
raise PluginError(
f"StationBeamformer: Invalid region length in {self._device}")
end_ch = start_ch + nof_chans
if start_ch < 0 or end_ch > 512:
raise PluginError(
f"StationBeamformer: Invalid region position in {self._device}")
reg_length = nof_chans // 8
if region_idx + reg_length > 64:
raise PluginError(
f"StationBeamformer: too many channels specified in {self._device}")
self.beam_table[region_idx:(region_idx + reg_length)] = \
[region[2]] * reg_length
self.freq_table[region_idx:(region_idx + reg_length)] = \
list(range(start_ch, end_ch, 8))
if full_table:
subarray_id = region[3] & 0x3f # limited to 6 bit
start_subarray_ch = region[4] & 0x1f8
subarray_beam_id = region[5] & 0xff
substation_id = region[6] & 0x3f # limited to 6 bit
aperture_id = region[7] & 0xffff
else:
subarray_beam_id = region[2] # equal to the station beam
start_subarray_ch = region_idx # equal to the logical channel
self.subarray_table[region_idx:(region_idx + reg_length)] = \
[subarray_id] * reg_length
self.subarray_channel_table[region_idx:(region_idx + reg_length)] =\
list(range(
start_subarray_ch,
start_subarray_ch + nof_chans,
8))
self.subarray_beam_table[region_idx:(region_idx + reg_length)] = \
[subarray_beam_id] * reg_length
self.substation_table[region_idx:(region_idx + reg_length)] = \
[substation_id] * reg_length
self.aperture_table[region_idx:(region_idx + reg_length)] = \
[aperture_id] * reg_length
region_idx = region_idx + reg_length
self.nof_chans = region_idx * 8
return self._program_channels()
############################################################################
def _program_timing(self):
"""
Private method to set the timing registers
from constants set during initialization (or modified afterwards)
:return: False if the beamformer is running, True if OK
"""
if self.is_running():
return False
self.board[self._device+'.beamf_ring.frame_rate.first_tile'] = \
int(round(self.tpm_frame_time * self.clock_frequency))
self.board[self._device+'.beamf_ring.frame_rate.last_tile'] = \
int(round(self.csp_frame_time * self.clock_frequency))
self.board[self._device+'.beamf_ring.timeout'] = self.ddr_timeout
self.board[self._device+'.beamf_ring.frame_timing.int_block_len'] = \
self.int_block_length
self.board[self._device+'.beamf_ring.frame_timing.int_block_ovl'] = \
self.int_block_ovl
return True
############################################################################
# Define the SPEAD header, for last tile
# With subarray enabled firmware, most values are specified in set_regions
# Only stationId and refEpoch are specified here.
############################################################################
[docs]
def define_spead_header(self, stationId, subarrayId=0, apertureId=0, refEpoch=-1, startTime=0):
"""
Define_spead_header() used to define SPEAD header for last tile
requires stationId, subarrayId and apertureId from LMC
Only stationId is require
:param stationId: ID of the station: 1-512
:param subarrayId: ID of the subarray. can be overrided by defineChannelTable
:param apertureId: ID of the aperture.
:param refEpoch: Reference peoch. -1 (default) uses value already i
defined in set_epoch()
:param startTime: (in seconds): offset from frame time, default 0
:return: True if OK
"""
if self.is_running():
return False
self.board[self._device+'.beamf_ring.frame_id.station_id'] = stationId
self.station_id = stationId
if apertureId == 0:
apertureId = self.aperture_table[0]
else:
self.aperture_table = [apertureId]*64
self.board[self._device+'.beamf_ring.frame_id.antenna_index'] = apertureId
if subarrayId == 0:
subarrayId = self.subarray_table[0]
else:
self.subarray_table = [subarrayId]*64
#self.board[self._device+'.beamf_ring.frame_id.sub_array_id'] = subarrayId
if refEpoch != -1:
self.set_epoch(refEpoch)
if self.board.memory_map.has_register(self._device + '.beamf_ring.start_time'):
self.board[self._device + '.beamf_ring.start_time'] = \
int(startTime * 1e9) & 0xffffffff
return True
############################################################################
# Set the Unix epoch in seconds since Unix reference time
############################################################################
[docs]
def set_epoch(self, epoch):
"""
Set the Unix epoch in seconds since Unix reference time
:param epoch: Unix time for reference time (TPM synch time) 48 bit int
:return: True if OK
"""
epoch = epoch & 0xffffffffff
self.board[self._device+'.beamf_ring.ref_epoch_lo'] = epoch & 0xffffffff
self.board[self._device+'.beamf_ring.ref_epoch_hi'] = (epoch >> 32) & 0xff
return True
############################################################################
# Get the channel table
############################################################################
[docs]
def get_channel_table(self):
"""
Returns a table with the following entries for each 8-channel block:
0: start physical channel (64-440)
1: beam_index: subarray beam used for this region, range [0:48)
2: subarray_id: ID of the subarray [1:48]
Here is the same for all channels
3: subarray_logical_channel: Logical channel in the subarray
Here equal to the station logical channel
4: subarray_beam_id: ID of the subarray beam
5: substation_id: ID of the substation
6: aperture_id: ID of the aperture (station*100+substation?)
:return: Nx7 table with one row every 8 channels
"""
nof_blocks = self.nof_chans//8
table = []
for block in range(nof_blocks):
table.append([
self.freq_table[block],
self.beam_table[block],
self.subarray_table[block],
self.subarray_channel_table[block],
self.subarray_beam_table[block],
self.substation_table[block],
self.aperture_table[block]])
return table
############################################################################
# Set output rounding for CSP
############################################################################
[docs]
def set_csp_rounding(self, rounding):
"""
Sets the number of bits rounded off before sending the result to the CSP.
For white noise it should be log2(sqrt(nof_antennas)),
i.e. 4 for 256 antennas
:param rounding: either scalar or list, of number of bits rounded off
before sending the result to the CSP. If list, only 1st element
is used. In future firmware, one value per channel.
:return: True if OK
"""
if type(rounding) == list:
round_int = rounding[0]
else:
round_int = rounding
if round_int < 0:
round_int = 0
if round_int > 7:
round_int = 7
self.csp_rounding = round_int
self.board[self._device+'.beamf_ring.csp_scaling'] = round_int
return True
############################################################################
# Return current frame
############################################################################
[docs]
def current_frame(self):
"""
Current frame as seen by the station beamformer
:return: current frame, in units of 256 ADC frames (276,48 us)
"""
return self.board[self._device+'.beamf_ring.current_frame']
############################################################################
# Start the beamformer
############################################################################
[docs]
def start(self, time=0, duration=-1, scan_id=0, mask=0xffffffffffff):
"""
starts an integration
The integration is specified in units of 256 ADC frames
from start_frame (included) to stop_frame (excluded).
Default for stop_frame is -1 = "forever"
:param time: first frame (as seen by current_frame) in integration
:param duration: Integration duration, in frames. If -1, forever
:param mask: chanell groups to start, default all (unsupported by FW)
:return: True if OK, False if not possible (integration already active)
"""
if self.is_running():
return False
# Reset beamformer to be sure everythig is OK
#self.board[self._device+'.beamf_ring.control.reset'] = 1
#self.board[self._device+'.beamf_ring.control.reset'] = 0
# Program scan_id
bit_mask = 1
for i in range(64):
if bit_mask & mask != 0:
self.scan_id_table[i] = scan_id
bit_mask = bit_mask << 1
if len(self.board.find_register(self._device+'.beamf_ring.scan_id_tab')) > 0:
self.board[self._device+'.beamf_ring.scan_id_tab'] = self.scan_id_table[0:self.nof_chans]
# Program start and stop times
if time == 0:
time = self.current_frame() + 48
time &= 0xfffffff8
if duration == -1:
end_time = 0xffffffff
else:
duration &= 0xfffffff8
end_time = time + duration # +1
self.board[self._device+'.beamf_ring.start_frame'] = time
self.board[self._device+'.beamf_ring.last_frame'] = end_time
self.board[self._device + '.beamf_ring.control.reset'] = 0
return True
############################################################################
[docs]
def is_running(self):
"""
Check if the beamformer is still running.
Compares current frame to programmed start and last frame
:return: True if beamformer is running
"""
start_frame = self.board[self._device+'.beamf_ring.start_frame']
last_frame = self.board[self._device+'.beamf_ring.last_frame']
if start_frame < last_frame and last_frame > self.current_frame():
return True
else:
return False
############################################################################
[docs]
def abort(self):
"""
Stop the beamformer
:return: True if OK
"""
self.board[self._device+'.beamf_ring.last_frame'] = 1
self.board[self._device+'.beamf_ring.start_frame'] = 2
return True
############################################################################
# Report errors
############################################################################
[docs]
def report_errors(self):
"""
:return: error flags
"""
# Error Flags
self.errors = (self.board[f'{self._device}.beamf_ring.errors']) & 0x3FF
return self.errors
[docs]
def clear_errors(self):
"""
Clear frame errors and general error flag
"""
self.board[f'{self._device}.beamf_ring.control.error_rst'] = 1
self.board[f'{self._device}.beamf_ring.control.error_rst'] = 0
return
[docs]
def check_ddr_parity_error_counter(self):
return self.board[f'{self._device}.beamf_ring.errors.ddr_parity_error_cnt']
############################################################################
# Some default methods
############################################################################
[docs]
def status_check(self):
"""
Perform status check
Checks if framing errors are present
:return: Status
"""
logging.info("StationBeamformer: Checking status")
self.frame_errors = 0 # Removed from firmware
self.errors = (self.board[self._device+'.beamf_ring.error']) & 0x3FF
if self.frame_errors == 0:
return Status.OK
logging.info(f"StationBeamformer: Frame errors {self.frame_errors}")
return Status.Error
[docs]
def clean_up(self):
"""
Perform cleanup
:return: Success
"""
logging.info("StationBeamformer : Cleaning up")
return True