Source code for pyfabil.plugins.tpm.integrator
from __future__ import division
__author__ = 'chiello'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
from math import log, ceil
import logging
import time
[docs]
class TpmIntegrator(FirmwareBlock):
""" FirmwareBlock tests class """
@compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
@friendlyname('tpm_integrator')
@maxinstances(16)
def __init__(self, board, fsample=800e6, nof_frequency_channels=512, oversampling_factor=32.0/27.0, core="", **kwargs):
""" TpmIntegrator initialiser
:param board: Pointer to board instance
"""
super(TpmIntegrator, self).__init__(board)
self._board_type = kwargs.get('board_type', 'XTPM')
if 'device' not in list(kwargs.keys()):
raise PluginError("TpmFpga: Require a node instance")
self._device = kwargs['device']
if self._device == Device.FPGA_1:
self._device = 'fpga1'
elif self._device == Device.FPGA_2:
self._device = 'fpga2'
else:
raise PluginError("TpmFpga: Invalid device %s" % self._device)
self._core = core
# sampling frequency
self._fsample = fsample
self.requested_integration_time = -1
self.actual_integration_time = -1
self.first_channel = 0
self.last_channel = nof_frequency_channels
self.time_mux_factor = 2
self.nof_frequency_channels = nof_frequency_channels
self.oversampling_factor = oversampling_factor
try:
self.board["%s.lmc_integrated_gen%s.regfile_tx_demux" % (self._device, self._core)]
self.implemented = True
except:
self.implemented = False
[docs]
def configure_parameters(self, stage, integration_time, first_channel, last_channel,
time_mux_factor, carousel_enable=0x0, download_bit_width=None, data_bit_width=None):
self.configure(stage, integration_time, first_channel, last_channel + 1,
time_mux_factor, carousel_enable, download_bit_width, data_bit_width)
[docs]
def configure(self, stage, integration_time, first_channel, last_channel,
time_mux_factor, carousel_enable=0x0, download_bit_width=None, data_bit_width=None):
if not self.implemented:
logging.warning("Integrator is not implemented.")
return
self.requested_integration_time = integration_time
self.actual_integration_time = 0
self.first_channel = first_channel
self.last_channel = last_channel
self.time_mux_factor = time_mux_factor
if download_bit_width != None:
download_bits = download_bit_width
elif stage == "beamf":
download_bits = 32
elif stage == "channel":
download_bits = 16
else:
logging.warning("Integrator %s is not implemented." % stage)
return
if data_bit_width != None:
data_bits = data_bit_width
elif stage == "beamf":
data_bits = 12
elif stage == "channel":
data_bits = 8
else:
logging.warning("Integrator %s is not implemented." % stage)
return
freq = self._fsample
# Calculate integration samples (including oversampling factor)
spectra_per_second = (freq * self.oversampling_factor) / (self.nof_frequency_channels * 2.0)
integration_length = spectra_per_second * self.requested_integration_time
integration_length = int(integration_length)
self.actual_integration_time = integration_length / spectra_per_second
min_integration_time = 50e6 / (self.nof_frequency_channels * 2.0) / spectra_per_second
max_bit_width = data_bits*2 + 1 + int(ceil(log(integration_length, 2)))
scaling_factor = 0
if max_bit_width > download_bits:
scaling_factor = max_bit_width - download_bits
if stage == "beamf":
logging.info("Setting beam integration time on %s to %.4fs" % (self._device, self.actual_integration_time))
elif stage == "channel":
logging.info("Setting channel integration time on %s to %.4fs" % (self._device, self.actual_integration_time))
# Sanity check
if self.actual_integration_time < min_integration_time:
logging.warning("Integration time %.4fs is less than the minimum integration time (%.4f). Settings "
"integration time to minimum" %
(self.actual_integration_time, min_integration_time))
integration_length = int(min_integration_time * spectra_per_second)
# Send request to TPM
self.board["%s.lmc_integrated_gen%s.%s_start_read_channel" % (self._device, self._core, stage)] = self.first_channel // self.time_mux_factor
self.board["%s.lmc_integrated_gen%s.%s_last_read_channel" % (self._device, self._core, stage)] = self.last_channel // self.time_mux_factor - 1
self.board["%s.lmc_integrated_gen%s.%s_enable" % (self._device, self._core, stage)] = 0x0
self.board["%s.lmc_integrated_gen%s.%s_scaling_factor" % (self._device, self._core, stage)] = scaling_factor
self.board["%s.lmc_integrated_gen%s.%s_integration_length" % (self._device, self._core, stage)] = integration_length
self.board["%s.lmc_integrated_gen%s.%s_carousel_enable" % (self._device, self._core, stage)] = carousel_enable
self.board["%s.lmc_integrated_gen%s.%s_enable" % (self._device, self._core, stage)] = 0x1
[docs]
def configure_download(self, download_mode, channel_payload_length, beam_payload_length):
if not self.implemented:
logging.warning("Integrator is not implemented.")
return
# Using 10G lane
if download_mode.upper() == "10G":
if channel_payload_length >= 8193 or beam_payload_length >= 8193:
logging.warning("Packet length too large for 10G")
return
self.board['%s.lmc_integrated_gen%s.regfile_tx_demux' % (self._device, self._core)] = 2
# Using dedicated 1G link
elif download_mode.upper() == "1G":
self.board['%s.lmc_integrated_gen%s.regfile_tx_demux' % (self._device, self._core)] = 1
else:
logging.warning("Supported mode are 1g, 10g")
return
self.board['%s.lmc_integrated_gen%s.regfile_channel_payload_length' % (self._device, self._core)] = channel_payload_length
self.board['%s.lmc_integrated_gen%s.regfile_beamf_payload_length' % (self._device, self._core)] = beam_payload_length
[docs]
def stop_integrated_channel_data(self):
""" Stop receiving integrated beam data from the board """
if not self.implemented:
logging.warning("Integrator is not implemented.")
return
self.board["%s.lmc_integrated_gen%s.channel_enable" % (self._device, self._core)] = 0x0
[docs]
def stop_integrated_beam_data(self):
""" Stop receiving integrated beam data from the board """
if not self.implemented:
logging.warning("Integrator is not implemented.")
return
self.board["%s.lmc_integrated_gen%s.beamf_enable" % (self._device, self._core)] = 0x0
[docs]
def stop_integrated_data(self):
""" Stop transmission of integrated data"""
if not self.implemented:
logging.warning("Integrator is not implemented.")
return
self.stop_integrated_channel_data()
self.stop_integrated_beam_data()
##################### Superclass method implementations #################################
[docs]
def initialise(self):
""" Initialise TpmIntegrator """
logging.info("TpmIntegrator has been initialised")
return True
[docs]
def status_check(self):
""" Perform status check
:return: Status
"""
logging.info("TpmIntegrator : Checking status")
return Status.OK
[docs]
def clean_up(self):
""" Perform cleanup
:return: Success
"""
logging.info("TpmIntegrator : Cleaning up")
return True