import numpy as np
__author__ = 'chiello'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
class PolyFilter(FirmwareBlock):
    """ PolyFilter plugin

    Generates windowed-sinc coefficients for the polyphase filter of one FPGA
    and writes them to the board's ``<device>.poly.*`` registers.
    """

    # Cosine-sum window coefficients (a0, a1, a2, ...). The window value is
    #     w(i) = a0 - a1*cos(2*pi*i/L) + a2*cos(4*pi*i/L) - a3*cos(6*pi*i/L)
    # so a single-entry tuple describes a constant window.
    # NOTE: the "nutall" spelling is kept because callers select windows by name.
    _WINDOW_COEFFS = {
        "zeros": (0.0,),
        "rectangular": (1.0,),
        "hann": (0.5, 1 - 0.5),
        "hamming": (25.0 / 46.0, 1 - 25.0 / 46.0),
        "blackman": (7938.0 / 18608.0, 9240.0 / 18608.0, 1430.0 / 18608.0),
        "nutall": (0.355768, 0.487396, 0.144232, 0.012604),
        "blackman-nuttall": (0.3635819, 0.4891775, 0.1365995, 0.0106411),
        "blackman-harris": (0.35875, 0.48829, 0.14128, 0.01168),
    }

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('polyfilter')
    @maxinstances(2)
    def __init__(self, board, **kwargs):
        """ PolyFilter initialiser
        :param board: Pointer to board instance
        :param kwargs: Must contain 'device' (Device.FPGA_1 or Device.FPGA_2)
        :raises PluginError: If 'device' is missing or is not a supported FPGA
        """
        super(PolyFilter, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("PolyFilter: Require a node instance")
        self._device = kwargs['device']

        # Translate the device enumeration into the register-name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("PolyFilter: Invalid device %s" % self._device)

        self._available_windows = ["zeros", "rectangular", "hann", "hamming", "blackman",
                                   "nutall", "blackman-nuttall", "blackman-harris"]
        self._selected_window = "hann"
        self._bin_width_scaling = 1.0

    #######################################################################################

    def get_filter_param(self):
        """ Print the polyphase filter parameters read from the board registers """
        print("Length: %s" % (self.board['%s.poly.config1.length' % self._device]))
        print("Stages: %s" % (self.board['%s.poly.config1.stages' % self._device]))
        print("Coeff bitwidth: %s" % (self.board['%s.poly.config2.coeff_data_width' % self._device]))
        print("Time mux: %s" % (self.board['%s.poly.config1.mux' % self._device]))
        print("Coeff mux per RAM: %s" % (self.board['%s.poly.config2.coeff_mux_per_ram' % self._device]))

    def get_available_windows(self):
        """ Log the names of the supported windowing functions """
        logging.info("Available windowing functions:")
        for window in self._available_windows:
            logging.info(" " + window)

    def set_window(self, window, bin_width_scaling=1.0):
        """ Select the windowing function used by download_coeffs
        :param window: Window name, one of the available windows
        :param bin_width_scaling: Factor applied to the sinc argument in download_coeffs
        """
        if window in self._available_windows:
            self._selected_window = window
            self._bin_width_scaling = bin_width_scaling
        else:
            # An unsupported name leaves the current selection untouched
            logging.info("Window %s is not supported. Not changing current selected window type: %s",
                         window, self._selected_window)

    def get_window_val(self, idx, length):
        """ Evaluate the selected window at one sample
        :param idx: Sample index
        :param length: Window length used to normalise the index
        :return: Window value at idx
        :raises PluginError: If the selected window has no coefficient definition
        """
        coeffs = self._WINDOW_COEFFS.get(self._selected_window)
        if coeffs is None:
            raise PluginError("PolyFilter: Unsupported window %s" % self._selected_window)

        window_val = coeffs[0]
        # Cosine terms alternate in sign, starting with a subtraction
        for k, a_k in enumerate(coeffs[1:], 1):
            term = a_k * np.cos(2.0 * k * np.pi * idx / float(length))
            if k % 2:
                window_val = window_val - term
            else:
                window_val = window_val + term
        return window_val

    def download_coeffs(self, removed_stages=0, bypass=False):
        """ Compute the filter coefficients and write them to the board
        :param removed_stages: Number of stages loaded with zeros instead of coefficients
        :param bypass: If True, load a unity coefficient block instead of the windowed sinc
        """
        logging.info("Downloading coefficients to %s polyfilter. Windowing function: %s, Bin width scaling: %s",
                     self._device, self._selected_window, self._bin_width_scaling)

        length = self.board['%s.poly.config1.length' % self._device]
        nof_stages = self.board['%s.poly.config1.stages' % self._device]
        coeff_width = self.board['%s.poly.config2.coeff_data_width' % self._device]
        mux = self.board['%s.poly.config1.mux' % self._device]
        mux_per_ram = self.board['%s.poly.config2.coeff_mux_per_ram' % self._device]
        rams_per_stage = mux // mux_per_ram

        # The floating point feature register may be missing; default to fixed point
        try:
            floating_point_flag = self.board['%s.dsp_regfile.feature.use_floating_point' % self._device]
        except Exception:
            floating_point_flag = 0
        is_floating_point = floating_point_flag == 1

        active_stages = nof_stages - removed_stages
        total_taps = length * active_stages

        # Coefficients wider than 32 bits are split into several words; the
        # width is halved until it fits, doubling the word count each time.
        word_width = coeff_width
        words_per_coeff = 1
        while word_width > 32:
            word_width /= 2.0
            words_per_coeff *= 2

        full_scale = 2 ** (coeff_width - 1) - 1  # largest positive fixed-point value
        coeff = np.zeros(total_taps, dtype=int)
        if not bypass:
            for i in range(total_taps):
                window_val = self.get_window_val(i, total_taps)
                real_val = window_val * np.sinc(self._bin_width_scaling * (i - total_taps / 2.0) / float(length))
                if is_floating_point:
                    # Store the IEEE-754 single precision bit pattern as an integer
                    coeff[i] = np.asarray(real_val, dtype=np.float32).view(np.int32).item()
                else:
                    coeff[i] = int(round(real_val * full_scale))  # rescaling
        else:
            if is_floating_point:
                unity = np.asarray(1.0, dtype=np.float32).view(np.int32).item()
            else:
                unity = full_scale
            # Slice assignment also copes with an empty coefficient array
            coeff[:length] = unity

        # RAM index served by each tap within a stage.
        # Assumes length is a multiple of mux, so every RAM gets the same
        # number of taps -- TODO confirm against the firmware.
        ram_of_tap = (np.arange(length) % mux) // mux_per_ram
        words_per_ram = (length // rams_per_stage) * words_per_coeff

        for stage in range(nof_stages):
            for ram in range(rams_per_stage):
                if stage < active_stages:
                    # Stages are loaded in reverse order of the coefficient array
                    stage_base = length * (active_stages - 1 - stage)
                    ram_coeff = coeff[stage_base:stage_base + length][ram_of_tap == ram]
                    if words_per_coeff > 1:
                        ram_words = np.zeros(ram_coeff.size * words_per_coeff, dtype=int)
                        for m in range(words_per_coeff):
                            shift = (m * coeff_width) // words_per_coeff
                            ram_words[m::words_per_coeff] = ram_coeff >> shift
                    else:
                        ram_words = ram_coeff
                else:
                    # Removed stages are loaded with zeros
                    ram_words = np.zeros(words_per_ram, dtype=int)

                self.board['%s.poly.address.mux_ptr' % self._device] = ram
                self.board['%s.poly.address.stage_ptr' % self._device] = stage
                self.board['%s.poly.coeff' % self._device] = ram_words.tolist()

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise PolyFilter
        :return: Success
        """
        logging.info("PolyFilter has been initialised")
        return True

    def status_check(self):
        """ Perform status check
        :return: Status
        """
        logging.info("PolyFilter : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success
        """
        logging.info("PolyFilter : Cleaning up")
        return True