from __future__ import division
import socket
__author__ = 'chiello'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
class BeamfSimple(FirmwareBlock):
    """ BeamfSimple plugin

    Thin wrapper around the board registers named
    ``<device>.beamf<core>.*``: per-antenna coefficient registers
    (``chNNcoeff``) and the ``timestamp_req`` register.
    """

    # Number of coefficient values a single antenna register block holds;
    # download_weights() refuses lists of any other length and
    # read_weights() always reads this many values.
    _NOF_WEIGHTS = 256

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('tpm_beamf_simple')
    @maxinstances(8)
    def __init__(self, board, **kwargs):
        """ BeamfSimple initialiser
        :param board: Pointer to board instance
        :param kwargs: Must contain ``device`` (Device.FPGA_1 or
                       Device.FPGA_2) and ``core`` (beamformer core index,
                       used as an integer in register names)
        :raises PluginError: If ``device`` or ``core`` is missing, or if
                             ``device`` is not one of the two supported FPGAs
        """
        super(BeamfSimple, self).__init__(board)

        # Checks are kept in the original order so callers see the same
        # error for the same malformed call.
        if 'device' not in kwargs:
            raise PluginError("BeamfSimple: Require a node instance")
        if 'core' not in kwargs:
            raise PluginError("BeamfSimple: core_id required")

        # Translate the Device value into the prefix used in register names.
        device = kwargs['device']
        if device == Device.FPGA_1:
            self._device = 'fpga1'
        elif device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            # 1-tuple so that a tuple-valued argument cannot break formatting
            raise PluginError("BeamfSimple: Invalid device %s" % (device,))

        self._core = kwargs['core']

    #######################################################################################

    def _register(self, name):
        """ Build the full register name for this beamformer core
        :param name: Register name relative to ``<device>.beamf<core>``
        :return: Full register name string
        """
        return "%s.beamf%i.%s" % (self._device, self._core, name)

    def _coeff_register(self, antenna):
        """ Build the coefficient register name for one antenna
        :param antenna: Antenna ID (formatted as a two-digit integer)
        :return: Full register name string
        """
        return "%s.beamf%i.ch%02dcoeff" % (self._device, self._core, antenna)

    def initialise_core(self):
        """ Initialise BeamfSimple core

        Currently only logs which core/device is being initialised.
        """
        logging.info("Initialising Simple Beamformer %i of %s: ", self._core, self._device)

    def download_weights(self, weights, antenna):
        """ Write beamforming weights for one antenna
        :param weights: Weights array, must contain exactly 256 values
        :param antenna: Antenna ID

        If the list does not contain 256 values nothing is written; a
        warning is logged and the method returns without raising.
        """
        if len(weights) != self._NOF_WEIGHTS:
            # Best-effort behaviour is kept (no exception), but the skipped
            # write is reported at WARNING level so it is not lost among
            # routine INFO messages.
            logging.warning("BeamfSimple weights list must contain 256 values. Weights have not been written!")
            return
        self.board[self._coeff_register(antenna)] = weights

    def read_weights(self, antenna):
        """ Read back the beamforming weights of one antenna
        :param antenna: Antenna ID
        :return: The 256 values read from the antenna's coefficient register
        """
        return self.board.read_register(self._coeff_register(antenna), self._NOF_WEIGHTS)

    def apply_weights(self, timestamp, seconds=0.2):
        """ Synchronise beamformer coefficients download
        :param timestamp: Timestamp value when weights will be applied
        :param seconds: Number of seconds to delay operation

        Writes ``timestamp`` plus a delay to the ``timestamp_req`` register,
        reads the register back once and logs whether the coefficients were
        applied.
        """
        # Set arm timestamp
        # delay = number of frames to delay * frame time (shift by 8)
        # NOTE(review): 1080e-9 is presumably the frame time in seconds and
        # the division by 256 the "shift by 8" mentioned above - confirm
        # against the firmware documentation.
        delay = seconds * (1.0 / (1080 * 1e-9)) / 256
        timestamp_delay = timestamp + int(delay)

        register = self._register("timestamp_req")
        self.board[register] = timestamp_delay

        # A readback greater than the value just written is reported as
        # "not applied".
        if self.board[register] > timestamp_delay:
            logging.warning("BeamfSimple coefficients not applied.")
        else:
            logging.info("BeamfSimple coefficients applied.")

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise BeamfSimple
        :return: Always True
        """
        logging.info("BeamfSimple has been initialised")
        return True

    def status_check(self):
        """ Perform status check
        :return: Status (always Status.OK)
        """
        logging.info("BeamfSimple : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success (always True)
        """
        logging.info("BeamfSimple : Cleaning up")
        return True