from __future__ import division
from builtins import range
from math import fabs, cos, sin
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
import logging
import random
__author__ = 'chiello'
[docs]
class TpmPatternGenerator(FirmwareBlock):
""" Pattern generator
"""
@compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
@friendlyname('tpm_pattern_generator')
@maxinstances(2)
def __init__(self, board, **kwargs):
""" TpmPatternGenerator initialiser
:param board: Pointer to board instance
"""
super(TpmPatternGenerator, self).__init__(board)
if 'device' not in list(kwargs.keys()):
raise PluginError("TpmPatternGenerator: Require a node instance")
self._device = kwargs['device']
if self._device == Device.FPGA_1:
self._device = 'fpga1'
elif self._device == Device.FPGA_2:
self._device = 'fpga2'
else:
raise PluginError("TpmPatternGenerator: Invalid device %s" % self._device)
############################################################################
[docs]
def check_stage(self, stage):
if stage not in ["jesd", "channel", "beamf"]:
logging.info("TpmPatternGenerator : Not supported stage name!")
return False
else:
return True
[docs]
def initialise(self, nof_inputs_per_fpga=16):
""" Initialise Pattern Generator with default incremental pattern
"""
default_pattern = list(range(1024))
self.set_pattern(default_pattern, "jesd")
self.set_pattern(self.channelize_pattern(default_pattern), "channel")
self.set_pattern(default_pattern, "beamf")
# set default pattern shift
self.board['%s.pattern_gen.jesd_left_shift' % self._device] = 0x0
self.board['%s.pattern_gen.channel_left_shift' % self._device] = 0x4
self.board['%s.pattern_gen.beamf_left_shift' % self._device] = 0x4
if self._device == "fpga1":
signal_offset = 0
else:
signal_offset = nof_inputs_per_fpga
signal_adder = []
for n in range(16):
signal_adder += [4 * (n + signal_offset)] * 4
self.set_signal_adder(signal_adder, "jesd")
self.set_signal_adder(signal_adder, "channel")
self.set_signal_adder(signal_adder, "beamf")
# set stream locked control bit
self.board['%s.pattern_gen.jesd_ctrl' % self._device] = 0x2
self.board['%s.pattern_gen.channel_ctrl' % self._device] = 0x2
self.board['%s.pattern_gen.beamf_ctrl' % self._device] = 0x2
return True
[docs]
def channelize_pattern(self, pattern):
""" Change the frequency channel order to match che channelizer output
:param pattern: pattern buffer, frequency channel in increasing order
"""
tmp = [0]*len(pattern)
half = len(pattern) // 2
for n in range(half // 2):
tmp[4*n] = pattern[2*n]
tmp[4*n+1] = pattern[2*n+1]
tmp[4*n+2] = pattern[-(1+2*n+1)]
tmp[4*n+3] = pattern[-(1+2*n)]
return tmp
[docs]
def set_pattern(self, buff, stage="channel"):
""" Write pattern in FPGA internal BRAM buffer
:param buff: pattern buffer, each element represents an output value
:param stage: stage where write the pattern: jesd, channel or beamf
"""
if not self.check_stage(stage):
return -1
else:
ba = self.board.memory_map['%s.pattern_gen.%s_data' % (self._device, stage)].address
if len(buff) > 1024:
logging.info("TpmPatternGenerator : Pattern buffer is too large, resizing to 1024 samples.")
buff = buff[0:1024]
dat = 0
for n in range(len(buff)):
dat = dat | ((buff[n] & 0xFF) << (8 * (n % 4)))
if n % 4 == 3:
self.board[ba + (n // 4) * 4] = dat
dat = 0
[docs]
def set_random_pattern(self, stage="channel", seed=0):
""" Write a randmon pattern in FPGA internal BRAM buffer
:param stage: stage where write the pattern: jesd, channel or beamf
:param seed: seed for random number generator generator
"""
random.seed(seed)
buff = [0]*1024
for n in range(1024):
buff[n] = random.randint(0, 255)
if not self.check_stage(stage):
return -1
else:
ba = self.board.memory_map['%s.pattern_gen.%s_data' % (self._device, stage)].address
if len(buff) > 1024:
logging.info("TpmPatternGenerator : Pattern buffer is too large, resizing to 1024 samples.")
buff = buff[0:1024]
dat = 0
for n in range(len(buff)):
dat = dat | ((buff[n] & 0xFF) << (8 * (n % 4)))
if n % 4 == 3:
self.board[ba + 4 * n // 4] = dat
dat = 0
[docs]
def get_pattern(self, stage="channel"):
""" Read pattern from FPGA internal BRAM buffer
:param stage: stage where write the pattern: jesd, channel or beamf
"""
ba = self.board.memory_map['%s.pattern_gen.%s_data' % (self._device, stage)].address
mem = self.board.read_address(ba, 256)
pattern = [0]*1024
for n in range(256):
pattern[n * 4 + 0] = (mem[n] & 0xFF) >> 0
pattern[n * 4 + 1] = (mem[n] & 0xFF00) >> 8
pattern[n * 4 + 2] = (mem[n] & 0xFF0000) >> 16
pattern[n * 4 + 3] = (mem[n] & 0xFF000000) >> 24
return pattern
[docs]
def set_signal_adder(self, adder_list, stage):
""" Set signal adder. For each signal its pattern is constructed by adding a value to the pattern of input 0.
:param adder_list: list of 64 adder (one adder per each signal and muxed value = 16*4)
:param stage: stage where write the pattern: jesd, channel or beamf
"""
if len(adder_list) != 64:
logging.info("TpmPatternGenerator : adder_list must contain 64 elements!")
return -1
elif not self.check_stage(stage):
return -1
else:
for n in range(64):
self.board['%s.pattern_gen.%s_signal_adder_%i' % (self._device, stage, n)] = adder_list[n]
[docs]
def clear_signal_adder(self, stage):
""" Clear signal adder. Set to zero the signal adder of each signal.
:param stage: stage where write clear to signal adders
"""
for n in range(64):
self.board['%s.pattern_gen.%s_signal_adder_%i' % (self._device, stage, n)] = 0
[docs]
def start_pattern(self, stage):
""" Start pattern.
:param stage: stage where start the pattern: jesd, channel or beamf
"""
if not self.check_stage(stage):
return -1
else:
self.board['%s.pattern_gen.%s_ctrl' % (self._device, stage)] = 0x3
[docs]
def stop_pattern(self, stage):
""" Stop pattern.
:param stage: stage where stop the pattern: jesd, channel or beamf
"""
if not self.check_stage(stage):
return -1
else:
self.board['%s.pattern_gen.%s_ctrl' % (self._device, stage)] = 0x2
[docs]
def status_check(self):
logging.info("TpmPatternGenerator : Checking status")
return Status.OK
[docs]
def clean_up(self):
""" Perform cleanup
:return: Success
"""
logging.info("TpmPatternGenerator : Cleaning up")
return True