# Source code for pyfabil.plugins.tpm.beamf_simple

from __future__ import division
import socket

__author__ = 'chiello'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging


class BeamfSimple(FirmwareBlock):
    """ BeamfSimple plugin

    Thin wrapper around the ``<device>.beamf<core>.*`` registers of a board:
    it writes and reads per-antenna coefficient registers and sets the
    ``timestamp_req`` register used when applying them.
    """

    # Number of values held by one ``chNNcoeff`` register; used both to
    # validate writes and as the read length.
    _NOF_WEIGHTS = 256

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('tpm_beamf_simple')
    @maxinstances(8)
    def __init__(self, board, **kwargs):
        """ BeamfSimple initialiser

        :param board: Pointer to board instance
        :param kwargs: Must contain ``device`` (``Device.FPGA_1`` or
                       ``Device.FPGA_2``) and ``core`` (integer core index
                       used in register names)
        :raises PluginError: If ``device`` or ``core`` is missing, or if
                             ``device`` is not one of the two supported FPGAs
        """
        super(BeamfSimple, self).__init__(board)

        # The checks are performed in this order so that callers see the same
        # error for the same bad input: missing device, missing core, then
        # unsupported device.
        if 'device' not in kwargs:
            raise PluginError("BeamfSimple: Require a node instance")
        device = kwargs['device']

        if 'core' not in kwargs:
            raise PluginError("BeamfSimple: core_id required")

        # Translate the device identifier into the register-name prefix
        if device == Device.FPGA_1:
            self._device = 'fpga1'
        elif device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("BeamfSimple: Invalid device %s" % device)

        self._core = kwargs['core']

    def _weights_register(self, antenna):
        """ Build the coefficient register name for one antenna

        :param antenna: Antenna ID
        :return: Register name string
        """
        return "%s.beamf%i.ch%02dcoeff" % (self._device, self._core, antenna)

    #######################################################################################

    def initialise_core(self):
        """ Initialise BeamfSimple core

        Currently only logs which core of which device is being initialised.
        """
        logging.info("Initialising Simple Beamformer %i of %s: ", self._core, self._device)

    def download_weights(self, weights, antenna):
        """ Write beamforming weights for one antenna

        If ``weights`` does not contain exactly 256 values nothing is written
        and a warning is logged.

        :param weights: Weights array (must contain 256 values)
        :param antenna: Antenna ID
        """
        if len(weights) != self._NOF_WEIGHTS:
            # Reported as a warning so the skipped write is not hidden by
            # INFO-level filtering.
            logging.warning("BeamfSimple weights list must contain 256 values. Weights have not been written!")
            return

        self.board[self._weights_register(antenna)] = weights

    def read_weights(self, antenna):
        """ Read back beamforming weights for one antenna

        :param antenna: Antenna ID
        :return: The 256 values read from the antenna's coefficient register
        """
        return self.board.read_register(self._weights_register(antenna), self._NOF_WEIGHTS)

    def apply_weights(self, timestamp, seconds=0.2):
        """ Synchronise beamformer coefficients download

        Writes ``timestamp`` plus a delay derived from ``seconds`` to the
        ``timestamp_req`` register, then reads the register back and logs
        whether the coefficients were applied.

        :param timestamp: Timestamp value when weights will be applied
        :param seconds: Number of seconds to delay operation
        """
        # Convert the delay in seconds to timestamp units: number of 1080 ns
        # frames, divided by 256 (a shift by 8).
        # NOTE(review): the meaning of the 1080 ns frame time and of the
        # divide-by-256 is taken from the original comment -- confirm against
        # the firmware documentation.
        frame_time = 1080 * 1e-9
        delay = seconds * (1.0 / frame_time) / 256
        timestamp_delay = timestamp + int(delay)

        register = "%s.beamf%i.timestamp_req" % (self._device, self._core)
        self.board[register] = timestamp_delay

        # A read-back value greater than the value just written is reported
        # as a failure to apply the coefficients.
        if self.board[register] > timestamp_delay:
            logging.warning("BeamfSimple coefficients not applied.")
        else:
            logging.info("BeamfSimple coefficients applied.")

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise BeamfSimple

        :return: Always True
        """
        logging.info("BeamfSimple has been initialised")
        return True

    def status_check(self):
        """ Perform status check

        :return: Status (always ``Status.OK``)
        """
        logging.info("BeamfSimple : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup

        :return: Success (always True)
        """
        logging.info("BeamfSimple : Cleaning up")
        return True