# Source code for pyfabil.plugins.tpm.antenna_buffer

from __future__ import division
from builtins import range
__author__ = 'chiello'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import numpy as np
import logging
import time


class AntennaBuffer(FirmwareBlock):
    """ AntennaBuffer plugin

    Wraps the ``<device>.antenna_buffer`` register block of one FPGA: input
    selection, DDR buffer allocation, scheduling of writes into the DDR buffer
    and triggering of the read-out.
    """

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('tpm_antenna_buffer')
    @maxinstances(2)
    def __init__(self, board, samples_per_frame=864, **kwargs):
        """ AntennaBuffer initialiser
        :param board: Pointer to board instance
        :param samples_per_frame: Number of ADC samples in a frame
        :param kwargs: Must contain 'device' (Device.FPGA_1 or Device.FPGA_2)
        :raises PluginError: If 'device' is missing or is not a supported FPGA
        """
        super(AntennaBuffer, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("AntennaBuffer: Require a node instance")
        self._device = kwargs['device']

        # Translate the Device value into the register-name prefix used below
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("AntennaBuffer: Invalid device %s" % self._device)

        # number of buffered antennas
        self._nof_antenna = 2
        # number of ADC samples in a frame
        self._samples_per_frame = samples_per_frame
        # number of frames in one multiframe
        self._nof_frame_in_multiframe = 256
        # number of buffered multiframe, calculated during buffer allocation
        self._nof_ddr_multiframe = 0
        # ddr_multiframe_byte_size = nof_samples * pols * nof_antenna * nof_frames, 1 byte per sample
        self._ddr_multiframe_byte_size = (self._samples_per_frame * 2 * self._nof_antenna *
                                          self._nof_frame_in_multiframe)

    #######################################################################################

    def _reg_name(self, name):
        """ Build the full name of an antenna buffer register on this plugin's FPGA
        :param name: Register name relative to '<device>.antenna_buffer'
        :return: Full register name string
        """
        return '%s.antenna_buffer.%s' % (self._device, name)

    def set_download(self, mode="1g", payload_length=1024):
        """ Select the transmit path and payload length used for the download
        :param mode: "1g" or "1G" writes 0 to control.tx_demux, any other value writes 1
        :param payload_length: Value written to the payload_length register
        """
        # NOTE(review): every mode other than "1g"/"1G" selects demux value 1,
        # there is no validation of the mode string.
        tx_demux = 0 if mode in ("1g", "1G") else 1
        self.board[self._reg_name('control.tx_demux')] = tx_demux
        self.board[self._reg_name('payload_length')] = payload_length

    def select_antenna(self, antenna_0, antenna_1, antenna_2=0, antenna_3=0):
        """ Program the four antenna selection fields of the input_sel register
        :param antenna_0: Value written to sel_antenna_id_0
        :param antenna_1: Value written to sel_antenna_id_1
        :param antenna_2: Value written to sel_antenna_id_2
        :param antenna_3: Value written to sel_antenna_id_3
        """
        self.board[self._reg_name('input_sel.sel_antenna_id_0')] = antenna_0
        self.board[self._reg_name('input_sel.sel_antenna_id_1')] = antenna_1
        self.board[self._reg_name('input_sel.sel_antenna_id_2')] = antenna_2
        self.board[self._reg_name('input_sel.sel_antenna_id_3')] = antenna_3

    def configure_ddr_buffer(self, ddr_start_byte_address, byte_size):
        """ Allocate the DDR region used by the antenna buffer
        The usable length is rounded down to a whole number of multiframes.
        :param ddr_start_byte_address: Start address in bytes, must be a multiple of 64
        :param byte_size: Size in bytes made available to the buffer
        :return: Number of bytes actually used, or -1 if the arguments are rejected
        """
        if ddr_start_byte_address % 64 != 0:
            # Report the offending address (the original message printed the device name here)
            logging.error("AntennaBuffer: ddr_start_address %s must be aligned to 64 bytes boundary"
                          % ddr_start_byte_address)
            return -1
        if byte_size < self._ddr_multiframe_byte_size:
            logging.error("AntennaBuffer: allocated buffer size must be larger than %d bytes"
                          % self._ddr_multiframe_byte_size)
            return -1

        self._nof_ddr_multiframe = byte_size // self._ddr_multiframe_byte_size
        ddr_byte_length = self._nof_ddr_multiframe * self._ddr_multiframe_byte_size

        # NOTE(review): the start address is written in units of 8 bytes while the
        # length is written as a count of 64-byte words minus one - confirm both
        # against the firmware register map.
        self.board[self._reg_name('ddr_write_start_addr')] = ddr_start_byte_address // 8
        self.board[self._reg_name('ddr_write_length')] = ddr_byte_length // 64 - 1
        return ddr_byte_length

    def buffer_write(self, start_time=-1, delay=256, continuous_mode=False):
        """ Schedule a write of antenna data into the DDR buffer
        :param start_time: Timestamp at which writing starts, -1 means current timestamp + delay
        :param delay: Offset added to the current timestamp when start_time is -1
        :param continuous_mode: If True the stop frame is written as 0,
                                otherwise as start_time + number of buffered multiframes
        :return: True if the write was armed before start_time, False otherwise
        """
        if self.board[self._reg_name('control.start_read')] == 1:
            logging.info("AntennaBuffer: Still reading from DDR buffer, not possible to write yet.")
            return False
        if self._nof_ddr_multiframe == 0:
            logging.info("AntennaBuffer: DDR buffer %s not configured, not writing" % self._device)
            return False

        self.board[self._reg_name('control.ddr_pointer_reset')] = 1

        # NOTE(review): the timestamp is always read from fpga1, also when this
        # instance drives fpga2 - presumably both FPGAs share the same timestamp, confirm.
        t0 = self.board["fpga1.pps_manager.timestamp_read_val"]
        if start_time == -1:
            start_time = t0 + delay
        elif t0 > start_time:
            logging.error("AntennaBuffer: %s antenna buffer write failed, requested start_time passed"
                          % self._device)
            return False

        if continuous_mode:
            stop_time = 0
        else:
            stop_time = start_time + self._nof_ddr_multiframe

        self.board[self._reg_name('start_frame_write')] = start_time
        self.board[self._reg_name('stop_frame_write')] = stop_time
        # Pulse frame_write_update (1 then 0) after programming the start/stop frames
        self.board[self._reg_name('control.frame_write_update')] = 1
        self.board[self._reg_name('control.frame_write_update')] = 0

        # Check that the registers were programmed before the start timestamp went by
        t1 = self.board["fpga1.pps_manager.timestamp_read_val"]
        if t1 >= start_time:
            logging.error("AntennaBuffer: %s antenna buffer write failed, write buffer activated after "
                          "start frame timestamp: t1 >= start_time" % self._device)
            return False
        return True

    def one_shot_buffer_read(self):
        """ Start reading out the DDR buffer once writing has finished
        :return: False if the current timestamp is still below stop_frame_write,
                 True once the read has been started
        """
        current_time = self.board["fpga1.pps_manager.timestamp_read_val"]
        if current_time < self.board[self._reg_name('stop_frame_write')]:
            logging.info("AntennaBuffer: Still writing into DDR buffer, not possible to read yet.")
            return False

        # NOTE(review): this wait has no timeout
        while self.board[self._reg_name('write_status.busy')] == 1:
            logging.info("AntennaBuffer: Waiting for busy bit to go low...")
            time.sleep(0.1)
        logging.info("AntennaBuffer: Write busy bit is low, start reading from DDR...")

        # Mirror the write region into the read registers
        self.board[self._reg_name('ddr_read_start_addr')] = self.board[self._reg_name('ddr_write_start_addr')]
        # NOTE(review): high address is programmed as write start address - 8, confirm with firmware
        self.board[self._reg_name('ddr_read_high_addr')] = self.board[self._reg_name('ddr_write_start_addr')] - 8
        self.board[self._reg_name('ddr_read_length')] = self.board[self._reg_name('ddr_write_length')]
        self.board[self._reg_name('spead_timestamp')] = self.board[self._reg_name('first_frame')]
        self.board[self._reg_name('control.start_read')] = 1
        return True

    def one_shot(self, start_time=-1, delay=256):
        """ Write the DDR buffer once and read it out as soon as the write is done
        :param start_time: Timestamp at which writing starts, -1 means current timestamp + delay
        :param delay: Offset added to the current timestamp when start_time is -1
        :return: True once the read has been started, False if the write could not be scheduled
        """
        # Do not trigger a read-out when the write was never armed, the DDR
        # content and the stop_frame_write register would be stale.
        if not self.buffer_write(start_time, delay, continuous_mode=False):
            logging.error("AntennaBuffer: %s one shot aborted, buffer write was not scheduled" % self._device)
            return False
        while not self.one_shot_buffer_read():
            time.sleep(0.1)
        return True

    def stop_now(self):
        """ Stop an ongoing write and reprogram the start/stop frame registers """
        self.board[self._reg_name('control.stop_now')] = 1
        # NOTE(review): this wait has no timeout
        while self.board[self._reg_name('write_status.busy')] == 1:
            logging.info("AntennaBuffer: Waiting for busy bit to go low...")
            time.sleep(0.1)
        # NOTE(review): start frame above stop frame, presumably so that no new write is triggered - confirm
        self.board[self._reg_name('start_frame_write')] = 0xFFFFFFFF
        self.board[self._reg_name('stop_frame_write')] = 0xFFFFFFFE
        self.board[self._reg_name('control.frame_write_update')] = 1
        self.board[self._reg_name('control.frame_write_update')] = 0
        self.board[self._reg_name('control.stop_now')] = 0

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise AntennaBuffer
        :return: True
        """
        logging.debug("AntennaBuffer has been initialised")
        return True

    def status_check(self):
        """ Perform status check
        :return: Status
        """
        logging.debug("AntennaBuffer : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success
        """
        logging.debug("AntennaBuffer : Cleaning up")
        return True