Source code for pyfabil.plugins.tpm.polyfilter_ox

import numpy as np

__author__ = 'chiello'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging


[docs] class PolyFilter(FirmwareBlock): """ PolyFilter plugin """ @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board) @friendlyname('polyfilter') @maxinstances(2) def __init__(self, board, **kwargs): """ PolyFilter initialiser :param board: Pointer to board instance """ super(PolyFilter, self).__init__(board) if 'device' not in list(kwargs.keys()): raise PluginError("PolyFilter: Require a node instance") self._device = kwargs['device'] # if 'core' not in list(kwargs.keys()): # raise PluginError("PolyFilter: core_id required") if self._device == Device.FPGA_1: self._device = 'fpga1' elif self._device == Device.FPGA_2: self._device = 'fpga2' else: raise PluginError("PolyFilter: Invalid device %s" % self._device) # self._core = kwargs['core'] self._available_windows = ["zeros", "rectangular", "hann", "hamming", "blackman", "nutall", "blackman-nuttall", "blackman-harris"] self._selected_window = "hann" self._bin_width_scaling = 1.0 #######################################################################################
[docs] def get_filter_param(self): print("Length: %s" % (self.board['%s.poly.config1.length' % self._device])) print("Stages: %s" % (self.board['%s.poly.config1.stages' % self._device])) print("Coeff bitwidth: %s" % (self.board['%s.poly.config2.coeff_data_width' % self._device])) print("Time mux: %s" % (self.board['%s.poly.config1.mux' % self._device])) print("Coeff mux per RAM: %s" % (self.board['%s.poly.config2.coeff_mux_per_ram' % self._device]))
[docs] def get_available_windows(self): logging.info("Available windowing functions:") for window in self._available_windows: logging.info(" " + window)
[docs] def set_window(self, window, bin_width_scaling=1.0): if window in self._available_windows: self._selected_window = window self._bin_width_scaling = bin_width_scaling else: logging.info("Window %s is not supported. Not changing current selected window type: %s" % (window_type, self._selected_window))
[docs] def get_window_val(self, idx, length): i = idx if self._selected_window == "zeros": window_val = 0.0 elif self._selected_window == "rectangular": window_val = 1.0 elif self._selected_window == "hann": a0 = 0.5 a1 = 1 - a0 window_val = a0 - a1 * np.cos(2.0 * np.pi * i / float(length)) elif self._selected_window == "hamming": a0 = 25.0 / 46.0 a1 = 1 - a0 window_val = a0 - a1 * np.cos(2.0 * np.pi * i / float(length)) elif self._selected_window == "blackman": a0 = 7938.0 / 18608.0 a1 = 9240.0 / 18608.0 a2 = 1430.0 / 18608.0 window_val = a0 - a1 * np.cos(2.0 * np.pi * i / float(length)) + a2 * np.cos(4.0 * np.pi * i / float(length)) elif self._selected_window == "nutall": a0 = 0.355768 a1 = 0.487396 a2 = 0.144232 a3 = 0.012604 window_val = a0 - a1 * np.cos(2.0 * np.pi * i / float(length)) + a2 * np.cos(4.0 * np.pi * i / float(length)) - a3 * np.cos(6.0 * np.pi * i / float(length)) elif self._selected_window == "blackman-nuttall": a0 = 0.3635819 a1 = 0.4891775 a2 = 0.1365995 a3 = 0.0106411 window_val = a0 - a1 * np.cos(2.0 * np.pi * i / float(length)) + a2 * np.cos(4.0 * np.pi * i / float(length)) - a3 * np.cos(6.0 * np.pi * i / float(length)) elif self._selected_window == "blackman-harris": a0 = 0.35875 a1 = 0.48829 a2 = 0.14128 a3 = 0.01168 window_val = a0 - a1 * np.cos(2.0 * np.pi * i / float(length)) + a2 * np.cos(4.0 * np.pi * i / float(length)) - a3 * np.cos(6.0 * np.pi * i / float(length)) return window_val
[docs] def download_coeffs(self, removed_stages=0, bypass=False): logging.info("Downloading coefficients to %s polyfilter. Windowing function: %s, Bin width scaling: %s" % (self._device, self._selected_window, self._bin_width_scaling)) N = self.board['%s.poly.config1.length' % self._device] S = self.board['%s.poly.config1.stages' % self._device] C = self.board['%s.poly.config2.coeff_data_width' % self._device] MUX = self.board['%s.poly.config1.mux' % self._device] MUX_PER_RAM = self.board['%s.poly.config2.coeff_mux_per_ram' % self._device] NOF_RAM_PER_STAGE = int(MUX / MUX_PER_RAM) try: IS_FLOATING_POINT = self.board['%s.dsp_regfile.feature.use_floating_point' % self._device] except: IS_FLOATING_POINT = 0 stages = S - removed_stages M = N * stages base_width = C while base_width > 32: base_width /= 2 aspect_ratio_coeff = int(C / base_width) coeff = np.zeros(M, dtype=int) if not bypass: for i in range(M): window_val = self.get_window_val(i, M) real_val = window_val * np.sinc(self._bin_width_scaling * (i - M / 2.0) / float(N)) if IS_FLOATING_POINT == 1: coeff[i] = np.asarray(real_val, dtype=np.float32).view(np.int32).item() else: real_val *= (2 ** (C - 1) - 1) # rescaling coeff[i] = int(round(real_val)) else: for i in range(N): if IS_FLOATING_POINT == 1: coeff[i] = np.asarray(1.0, dtype=np.float32).view(np.int32).item() else: real_val = (2 ** (C - 1) - 1) # rescaling coeff[i] = int(round(real_val)) coeff_ram = np.zeros(int(N / NOF_RAM_PER_STAGE), dtype=int) for s in range(S): for ram in range(NOF_RAM_PER_STAGE): if s < stages: # print("ram " + str(ram)) idx = 0 for n in range(N): #if old_div((n % MUX), MUX_PER_RAM) == ram: if int((n % MUX) / MUX_PER_RAM) == ram: coeff_ram[idx] = coeff[N * (stages - 1 - s) + n] idx += 1 if aspect_ratio_coeff > 1: coeff_ram_arc = np.zeros(old_div(N, NOF_RAM_PER_STAGE) * aspect_ratio_coeff, dtype=int) for n in range(old_div(N, NOF_RAM_PER_STAGE)): for m in range(aspect_ratio_coeff): coeff_ram_arc[n * aspect_ratio_coeff + m] = coeff_ram[n] >> ( old_div(m * C, aspect_ratio_coeff)) else: coeff_ram_arc = coeff_ram else: if aspect_ratio_coeff > 1: coeff_ram_arc = np.zeros(old_div(N, NOF_RAM_PER_STAGE) * aspect_ratio_coeff, dtype=int) else: coeff_ram_arc = np.zeros(old_div(N, NOF_RAM_PER_STAGE), dtype=int) self.board['%s.poly.address.mux_ptr' % self._device] = ram self.board['%s.poly.address.stage_ptr' % self._device] = s self.board['%s.poly.coeff' % self._device] = coeff_ram_arc.tolist()
##################### Superclass method implementations #################################
[docs] def initialise(self): """ Initialise BeamfSimple """ logging.info("PolyFilter has been initialised") return True
[docs] def status_check(self): """ Perform status check :return: Status """ logging.info("PolyFilter : Checking status") return Status.OK
[docs] def clean_up(self): """ Perform cleanup :return: Success """ logging.info("PolyFilter : Cleaning up") return True