from __future__ import division
from builtins import range
__author__ = 'chiello'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import numpy as np
import logging
import time
# NOTE: stray "[docs]" Sphinx viewcode link marker (HTML extraction residue, not code) neutralised here.
class AntennaBuffer(FirmwareBlock):
    """ AntennaBuffer plugin

    Drives the ``<device>.antenna_buffer.*`` registers of one FPGA through the
    board object: selecting the buffered antennas, scheduling a write of ADC
    frames into the DDR buffer and triggering the read-out of that buffer.
    """

    # Register used as time reference when scheduling writes.
    # NOTE(review): always read from fpga1, even when this instance drives
    # fpga2 - presumably both FPGAs share the same timestamp; confirm.
    _TIMESTAMP_REGISTER = "fpga1.pps_manager.timestamp_read_val"

    # Seconds between two polls of a status register
    _POLL_INTERVAL = 0.1

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('tpm_antenna_buffer')
    @maxinstances(2)
    def __init__(self, board, samples_per_frame=864, **kwargs):
        """ AntennaBuffer initialiser
        :param board: Pointer to board instance
        :param samples_per_frame: Number of ADC samples in a frame
        :param kwargs: Must contain 'device' (Device.FPGA_1 or Device.FPGA_2)
        :raises PluginError: If 'device' is missing or is not a supported FPGA
        """
        super(AntennaBuffer, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("AntennaBuffer: Require a node instance")

        # Translate the device enumeration into the register name prefix
        device = kwargs['device']
        if device == Device.FPGA_1:
            self._device = 'fpga1'
        elif device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("AntennaBuffer: Invalid device %s" % device)

        # number of buffered antennas
        self._nof_antenna = 2
        # number of ADC samples in a frame
        self._samples_per_frame = samples_per_frame
        # number of frames in one multiframe
        self._nof_frame_in_multiframe = 256
        # number of buffered multiframe, calculated during buffer allocation
        # NOTE(review): nothing in this class assigns a non-zero value, and
        # buffer_write refuses to run while it is 0 - presumably it is set by
        # allocation code outside this class; confirm.
        self._nof_ddr_multiframe = 0
        # ddr_multiframe_byte_size = nof_samples * pols * nof_antenna * nof_frames, 1 byte per sample
        self._ddr_multiframe_byte_size = \
            self._samples_per_frame * 2 * self._nof_antenna * self._nof_frame_in_multiframe

    #######################################################################################

    def _register(self, name):
        """ Build the full name of an antenna buffer register of this device
        :param name: Register name relative to '<device>.antenna_buffer'
        :return: Full register name
        """
        return '%s.antenna_buffer.%s' % (self._device, name)

    def _wait_write_idle(self):
        """ Block until the write busy bit of the antenna buffer is low

        NOTE(review): there is no timeout, as in the original polling loops;
        this never returns if the busy bit stays high.
        """
        while self.board[self._register('write_status.busy')] == 1:
            logging.info("AntennaBuffer: Waiting for busy bit to go low...")
            time.sleep(self._POLL_INTERVAL)

    def set_download(self, mode="1g", payload_length=1024):
        """ Configure how the buffered data is transmitted
        :param mode: "1g" or "1G" sets tx_demux to 0, any other value sets it to 1
        :param payload_length: Value written to the payload_length register
        """
        tx_demux = 0 if mode in ("1g", "1G") else 1
        self.board[self._register('control.tx_demux')] = tx_demux
        self.board[self._register('payload_length')] = payload_length

    def select_antenna(self, antenna_0, antenna_1, antenna_2=0, antenna_3=0):
        """ Select the antennas routed to the buffer inputs
        :param antenna_0: Value for input selector sel_antenna_id_0
        :param antenna_1: Value for input selector sel_antenna_id_1
        :param antenna_2: Value for input selector sel_antenna_id_2
        :param antenna_3: Value for input selector sel_antenna_id_3
        """
        selection = (antenna_0, antenna_1, antenna_2, antenna_3)
        for index, antenna in enumerate(selection):
            self.board[self._register('input_sel.sel_antenna_id_%d' % index)] = antenna

    def buffer_write(self, start_time=-1, delay=256, continuous_mode=False):
        """ Schedule a write of antenna data into the DDR buffer
        :param start_time: Timestamp at which writing starts, -1 to start
                           'delay' timestamp units after the current timestamp
        :param delay: Offset added to the current timestamp when start_time is -1
        :param continuous_mode: If True the stop frame is programmed to 0,
                                otherwise to start_time + number of buffered multiframes
        :return: True if the write was scheduled, False otherwise
        """
        if self.board[self._register('control.start_read')] == 1:
            logging.info("AntennaBuffer: Still reading from DDR buffer, not possible to write yet.")
            return False

        if self._nof_ddr_multiframe == 0:
            logging.info("AntennaBuffer: DDR buffer %s not configured, not writing", self._device)
            return False

        # NOTE(review): the pointer reset bit is set here and never written
        # back to 0 by this class - presumably self-clearing; confirm.
        self.board[self._register('control.ddr_pointer_reset')] = 1

        t0 = self.board[self._TIMESTAMP_REGISTER]
        if start_time == -1:
            start_time = t0 + delay
        elif t0 > start_time:
            logging.error("AntennaBuffer: %s antenna buffer write failed, requested start_time passed",
                          self._device)
            return False

        stop_time = 0 if continuous_mode else start_time + self._nof_ddr_multiframe

        self.board[self._register('start_frame_write')] = start_time
        self.board[self._register('stop_frame_write')] = stop_time
        # Pulse the update bit so that the new start/stop frames are taken
        self.board[self._register('control.frame_write_update')] = 1
        self.board[self._register('control.frame_write_update')] = 0

        # The request is only valid if it was activated before the start frame
        t1 = self.board[self._TIMESTAMP_REGISTER]
        if t1 >= start_time:
            logging.error("AntennaBuffer: %s antenna buffer write failed, write buffer activated "
                          "after start frame timestamp: t1 >= start_time", self._device)
            return False
        return True

    def one_shot_buffer_read(self):
        """ Start reading out the DDR buffer once the scheduled write is over
        :return: False if the current timestamp is still before the programmed
                 stop frame, True once the read has been started
        """
        current_time = self.board[self._TIMESTAMP_REGISTER]
        if current_time < self.board[self._register('stop_frame_write')]:
            logging.info("AntennaBuffer: Still writing into DDR buffer, not possible to read yet.")
            return False

        self._wait_write_idle()
        logging.info("AntennaBuffer: Write busy bit is low, start reading from DDR...")

        # Read back the region that was written. The high address is programmed
        # 8 below the start address, as in the original code.
        # NOTE(review): presumably the buffer wraps around - confirm.
        write_start_addr = self.board[self._register('ddr_write_start_addr')]
        self.board[self._register('ddr_read_start_addr')] = write_start_addr
        self.board[self._register('ddr_read_high_addr')] = write_start_addr - 8
        self.board[self._register('ddr_read_length')] = self.board[self._register('ddr_write_length')]
        self.board[self._register('spead_timestamp')] = self.board[self._register('first_frame')]
        self.board[self._register('control.start_read')] = 1
        return True

    def one_shot(self, start_time=-1, delay=256):
        """ Write the DDR buffer once, then read it out as soon as writing is over
        :param start_time: Timestamp at which writing starts, see buffer_write
        :param delay: Offset used when start_time is -1, see buffer_write
        :return: True if the read-out was started, False if the write could not
                 be scheduled
        """
        # Without a scheduled write there is nothing new to read: polling
        # one_shot_buffer_read anyway could start a read of stale buffer content
        if not self.buffer_write(start_time, delay, continuous_mode=False):
            logging.error("AntennaBuffer: %s one shot aborted, buffer write not scheduled", self._device)
            return False

        while not self.one_shot_buffer_read():
            time.sleep(self._POLL_INTERVAL)
        return True

    def stop_now(self):
        """ Stop an ongoing write and wait until the write logic is idle """
        self.board[self._register('control.stop_now')] = 1
        self._wait_write_idle()

        # Program start/stop frames to 0xFFFFFFFF / 0xFFFFFFFE (stop before
        # start) and pulse the update bit
        # NOTE(review): presumably this parks the writer so that it cannot
        # trigger again - confirm.
        self.board[self._register('start_frame_write')] = 0xFFFFFFFF
        self.board[self._register('stop_frame_write')] = 0xFFFFFFFE
        self.board[self._register('control.frame_write_update')] = 1
        self.board[self._register('control.frame_write_update')] = 0
        self.board[self._register('control.stop_now')] = 0

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise AntennaBuffer
        :return: Success
        """
        logging.debug("AntennaBuffer has been initialised")
        return True

    def status_check(self):
        """ Perform status check
        :return: Status
        """
        logging.debug("AntennaBuffer : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success
        """
        logging.debug("AntennaBuffer : Cleaning up")
        return True