Source code for pyfabil.plugins.tpm.pattern_generator

from __future__ import division
from builtins import range
from math import fabs, cos, sin
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
import logging
import random

__author__ = 'chiello'


class TpmPatternGenerator(FirmwareBlock):
    """ Pattern generator

    Plugin driving the ``<device>.pattern_gen.*`` registers of one FPGA
    (``fpga1`` or ``fpga2``). A pattern can be loaded independently for each of
    the three stages accepted by :meth:`check_stage`: ``jesd``, ``channel`` and
    ``beamf``.

    Most methods follow the same convention: they return -1 (after logging)
    when an argument is rejected, and None on success.
    """

    # Stage names accepted by check_stage(), in the order initialise() programs them
    _STAGES = ("jesd", "channel", "beamf")
    # Maximum number of samples in a pattern buffer
    _PATTERN_LENGTH = 1024
    # Samples are 8 bit wide and packed four per word, least significant byte first
    _SAMPLES_PER_WORD = 4
    # Number of signal adder registers per stage (16 signals * 4 muxed values)
    _NOF_SIGNAL_ADDERS = 64

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('tpm_pattern_generator')
    @maxinstances(2)
    def __init__(self, board, **kwargs):
        """ TpmPatternGenerator initialiser
        :param board: Pointer to board instance
        :param kwargs: must contain 'device', either Device.FPGA_1 or Device.FPGA_2
        :raises PluginError: if 'device' is missing or is not one of the two FPGAs
        """
        super(TpmPatternGenerator, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("TpmPatternGenerator: Require a node instance")
        self._device = kwargs['device']

        # From here on self._device holds the register name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("TpmPatternGenerator: Invalid device %s" % self._device)

    def check_stage(self, stage):
        """ Check that a stage name is supported
        :param stage: stage name, expected to be jesd, channel or beamf
        :return: True if the stage is supported, otherwise False (a message is logged)
        """
        if stage in self._STAGES:
            return True
        logging.info("TpmPatternGenerator : Not supported stage name!")
        return False

    def initialise(self, nof_inputs_per_fpga=16):
        """ Initialise Pattern Generator with default incremental pattern
        :param nof_inputs_per_fpga: offset applied to the signal adders when this
                                    instance is not bound to fpga1
        :return: True
        """
        # Samples are masked to 8 bit by set_pattern, so the ramp wraps every 256 samples
        default_pattern = list(range(self._PATTERN_LENGTH))
        self.set_pattern(default_pattern, "jesd")
        self.set_pattern(self.channelize_pattern(default_pattern), "channel")
        self.set_pattern(default_pattern, "beamf")

        # set default pattern shift
        self.board['%s.pattern_gen.jesd_left_shift' % self._device] = 0x0
        self.board['%s.pattern_gen.channel_left_shift' % self._device] = 0x4
        self.board['%s.pattern_gen.beamf_left_shift' % self._device] = 0x4

        # Signals of the second FPGA are numbered after those of the first one
        signal_offset = 0 if self._device == "fpga1" else nof_inputs_per_fpga

        # One adder per signal, repeated for each of its 4 muxed values (16 * 4 = 64)
        signal_adder = []
        for n in range(16):
            signal_adder += [4 * (n + signal_offset)] * 4
        for stage in self._STAGES:
            self.set_signal_adder(signal_adder, stage)

        # set stream locked control bit
        for stage in self._STAGES:
            self.board['%s.pattern_gen.%s_ctrl' % (self._device, stage)] = 0x2

        return True

    def channelize_pattern(self, pattern):
        """ Change the frequency channel order to match the channelizer output

        Output entries are produced in groups of four: two consecutive values
        taken from the start of the pattern, followed by the two values at the
        mirrored position counted from the end, e.g.
        [0, 1, 2, 3, 4, 5, 6, 7] -> [0, 1, 6, 7, 2, 3, 4, 5].
        Only the first 4 * (len(pattern) // 4) output entries are filled, any
        remaining entry is left at 0.

        :param pattern: pattern buffer, frequency channel in increasing order
        :return: new list with the same length as pattern, the input is not modified
        """
        reordered = [0] * len(pattern)
        half = len(pattern) // 2
        for n in range(half // 2):
            low = 2 * n
            reordered[4 * n] = pattern[low]
            reordered[4 * n + 1] = pattern[low + 1]
            reordered[4 * n + 2] = pattern[-(low + 2)]
            reordered[4 * n + 3] = pattern[-(low + 1)]
        return reordered

    def set_pattern(self, buff, stage="channel"):
        """ Write pattern in FPGA internal BRAM buffer

        Each sample is truncated to 8 bit. Four samples are packed in every
        word, first sample in the least significant byte, and consecutive words
        are written 4 addresses apart starting from the <stage>_data register.
        When the number of samples is not a multiple of 4 the last word is
        padded with zeros, so that trailing samples are not silently dropped.

        :param buff: pattern buffer, each element represents an output value;
                     only the first 1024 elements are used
        :param stage: stage where write the pattern: jesd, channel or beamf
        :return: -1 if the stage is not supported, otherwise None
        """
        if not self.check_stage(stage):
            return -1

        base_address = self.board.memory_map['%s.pattern_gen.%s_data' % (self._device, stage)].address

        if len(buff) > self._PATTERN_LENGTH:
            logging.info("TpmPatternGenerator : Pattern buffer is too large, resizing to 1024 samples.")
            buff = buff[0:self._PATTERN_LENGTH]

        # offset is both the index of the first sample of the word and the
        # address offset of the word, since a word holds 4 samples
        for offset in range(0, len(buff), self._SAMPLES_PER_WORD):
            word = 0
            for lane, sample in enumerate(buff[offset:offset + self._SAMPLES_PER_WORD]):
                word = word | ((sample & 0xFF) << (8 * lane))
            self.board[base_address + offset] = word

    def set_random_pattern(self, stage="channel", seed=0):
        """ Write a random pattern in FPGA internal BRAM buffer

        1024 samples in the range [0, 255] are generated and written through
        set_pattern, so they use exactly the same memory layout. A private
        generator is used: the sequence is the one obtained after
        random.seed(seed), but the state of the global random module is left
        untouched.

        :param stage: stage where write the pattern: jesd, channel or beamf
        :param seed: seed for random number generator
        :return: -1 if the stage is not supported, otherwise None
        """
        if not self.check_stage(stage):
            return -1

        generator = random.Random(seed)
        buff = [generator.randint(0, 255) for _ in range(self._PATTERN_LENGTH)]
        return self.set_pattern(buff, stage)

    def get_pattern(self, stage="channel"):
        """ Read pattern from FPGA internal BRAM buffer

        Inverse of set_pattern: 256 words are read and every word is split in
        4 samples, least significant byte first.

        :param stage: stage where read the pattern from: jesd, channel or beamf
        :return: list of 1024 samples, or -1 if the stage is not supported
        """
        if not self.check_stage(stage):
            return -1

        base_address = self.board.memory_map['%s.pattern_gen.%s_data' % (self._device, stage)].address
        nof_words = self._PATTERN_LENGTH // self._SAMPLES_PER_WORD
        mem = self.board.read_address(base_address, nof_words)

        pattern = []
        for n in range(nof_words):
            word = mem[n]
            for shift in (0, 8, 16, 24):
                pattern.append((word & (0xFF << shift)) >> shift)
        return pattern

    def set_signal_adder(self, adder_list, stage):
        """ Set signal adder. For each signal its pattern is constructed by adding a value
        to the pattern of input 0.
        :param adder_list: list of 64 adder (one adder per each signal and muxed value = 16*4)
        :param stage: stage where write the pattern: jesd, channel or beamf
        :return: -1 if adder_list has the wrong length or the stage is not supported,
                 otherwise None
        """
        if len(adder_list) != self._NOF_SIGNAL_ADDERS:
            logging.info("TpmPatternGenerator : adder_list must contain 64 elements!")
            return -1
        if not self.check_stage(stage):
            return -1

        for n, adder in enumerate(adder_list):
            self.board['%s.pattern_gen.%s_signal_adder_%i' % (self._device, stage, n)] = adder

    def clear_signal_adder(self, stage):
        """ Clear signal adder. Set to zero the signal adder of each signal.
        :param stage: stage where clear the signal adders: jesd, channel or beamf
        :return: -1 if the stage is not supported, otherwise None
        """
        if not self.check_stage(stage):
            return -1

        for n in range(self._NOF_SIGNAL_ADDERS):
            self.board['%s.pattern_gen.%s_signal_adder_%i' % (self._device, stage, n)] = 0

    def start_pattern(self, stage):
        """ Start pattern.
        :param stage: stage where start the pattern: jesd, channel or beamf
        :return: -1 if the stage is not supported, otherwise None
        """
        if not self.check_stage(stage):
            return -1

        # 0x2 is the stream locked bit set by initialise(); bit 0 is presumably
        # the pattern enable - NOTE(review): confirm against the register map
        self.board['%s.pattern_gen.%s_ctrl' % (self._device, stage)] = 0x3

    def stop_pattern(self, stage):
        """ Stop pattern.
        :param stage: stage where stop the pattern: jesd, channel or beamf
        :return: -1 if the stage is not supported, otherwise None
        """
        if not self.check_stage(stage):
            return -1

        # Same control value written by initialise(): stream locked bit only
        self.board['%s.pattern_gen.%s_ctrl' % (self._device, stage)] = 0x2

    def status_check(self):
        """ Perform status check
        :return: Status.OK, no register is read
        """
        logging.info("TpmPatternGenerator : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success
        """
        logging.info("TpmPatternGenerator : Cleaning up")
        return True