# Source code for pyfabil.plugins.tpm.integrator

from __future__ import division
__author__ = 'chiello'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
from math import log, ceil
import logging

import time

class TpmIntegrator(FirmwareBlock):
    """ Firmware block plugin controlling the lmc_integrated_gen registers
    (integrated channel and beam data) of one FPGA """

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('tpm_integrator')
    @maxinstances(16)
    def __init__(self, board, fsample=800e6, nof_frequency_channels=512,
                 oversampling_factor=32.0/27.0, core="", **kwargs):
        """ TpmIntegrator initialiser
        :param board: Pointer to board instance
        :param fsample: Sampling frequency (presumably in Hz - confirm with callers)
        :param nof_frequency_channels: Number of frequency channels
        :param oversampling_factor: Oversampling factor applied when computing spectra per second
        :param core: Suffix appended to "lmc_integrated_gen" when building register names
        :param kwargs: Must contain 'device' (Device.FPGA_1 or Device.FPGA_2);
                       may contain 'board_type' (defaults to 'XTPM')
        :raises PluginError: If 'device' is missing or is not a supported FPGA
        """
        super(TpmIntegrator, self).__init__(board)

        self._board_type = kwargs.get('board_type', 'XTPM')

        if 'device' not in kwargs:
            raise PluginError("TpmFpga: Require a node instance")
        self._device = kwargs['device']

        # Translate the device enumeration into the register name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("TpmFpga: Invalid device %s" % self._device)

        self._core = core

        # sampling frequency
        self._fsample = fsample

        # Integration state; -1 marks "not configured yet"
        self.requested_integration_time = -1
        self.actual_integration_time = -1
        self.first_channel = 0
        self.last_channel = nof_frequency_channels
        self.time_mux_factor = 2
        self.nof_frequency_channels = nof_frequency_channels
        self.oversampling_factor = oversampling_factor

        # Probe one integrator register: if it cannot be read, the plugin is
        # flagged as not implemented and every method becomes a logged no-op.
        # Only ordinary errors are caught, so Ctrl-C / SystemExit still propagate.
        try:
            self.board["%s.lmc_integrated_gen%s.regfile_tx_demux" % (self._device, self._core)]
            self.implemented = True
        except Exception:
            self.implemented = False
[docs] def configure_parameters(self, stage, integration_time, first_channel, last_channel, time_mux_factor, carousel_enable=0x0, download_bit_width=None, data_bit_width=None): self.configure(stage, integration_time, first_channel, last_channel + 1, time_mux_factor, carousel_enable, download_bit_width, data_bit_width)
[docs] def configure(self, stage, integration_time, first_channel, last_channel, time_mux_factor, carousel_enable=0x0, download_bit_width=None, data_bit_width=None): if not self.implemented: logging.warning("Integrator is not implemented.") return self.requested_integration_time = integration_time self.actual_integration_time = 0 self.first_channel = first_channel self.last_channel = last_channel self.time_mux_factor = time_mux_factor if download_bit_width != None: download_bits = download_bit_width elif stage == "beamf": download_bits = 32 elif stage == "channel": download_bits = 16 else: logging.warning("Integrator %s is not implemented." % stage) return if data_bit_width != None: data_bits = data_bit_width elif stage == "beamf": data_bits = 12 elif stage == "channel": data_bits = 8 else: logging.warning("Integrator %s is not implemented." % stage) return freq = self._fsample # Calculate integration samples (including oversampling factor) spectra_per_second = (freq * self.oversampling_factor) / (self.nof_frequency_channels * 2.0) integration_length = spectra_per_second * self.requested_integration_time integration_length = int(integration_length) self.actual_integration_time = integration_length / spectra_per_second min_integration_time = 50e6 / (self.nof_frequency_channels * 2.0) / spectra_per_second max_bit_width = data_bits*2 + 1 + int(ceil(log(integration_length, 2))) scaling_factor = 0 if max_bit_width > download_bits: scaling_factor = max_bit_width - download_bits if stage == "beamf": logging.info("Setting beam integration time on %s to %.4fs" % (self._device, self.actual_integration_time)) elif stage == "channel": logging.info("Setting channel integration time on %s to %.4fs" % (self._device, self.actual_integration_time)) # Sanity check if self.actual_integration_time < min_integration_time: logging.warning("Integration time %.4fs is less than the minimum integration time (%.4f). 
Settings " "integration time to minimum" % (self.actual_integration_time, min_integration_time)) integration_length = int(min_integration_time * spectra_per_second) # Send request to TPM self.board["%s.lmc_integrated_gen%s.%s_start_read_channel" % (self._device, self._core, stage)] = self.first_channel // self.time_mux_factor self.board["%s.lmc_integrated_gen%s.%s_last_read_channel" % (self._device, self._core, stage)] = self.last_channel // self.time_mux_factor - 1 self.board["%s.lmc_integrated_gen%s.%s_enable" % (self._device, self._core, stage)] = 0x0 self.board["%s.lmc_integrated_gen%s.%s_scaling_factor" % (self._device, self._core, stage)] = scaling_factor self.board["%s.lmc_integrated_gen%s.%s_integration_length" % (self._device, self._core, stage)] = integration_length self.board["%s.lmc_integrated_gen%s.%s_carousel_enable" % (self._device, self._core, stage)] = carousel_enable self.board["%s.lmc_integrated_gen%s.%s_enable" % (self._device, self._core, stage)] = 0x1
[docs] def configure_download(self, download_mode, channel_payload_length, beam_payload_length): if not self.implemented: logging.warning("Integrator is not implemented.") return # Using 10G lane if download_mode.upper() == "10G": if channel_payload_length >= 8193 or beam_payload_length >= 8193: logging.warning("Packet length too large for 10G") return self.board['%s.lmc_integrated_gen%s.regfile_tx_demux' % (self._device, self._core)] = 2 # Using dedicated 1G link elif download_mode.upper() == "1G": self.board['%s.lmc_integrated_gen%s.regfile_tx_demux' % (self._device, self._core)] = 1 else: logging.warning("Supported mode are 1g, 10g") return self.board['%s.lmc_integrated_gen%s.regfile_channel_payload_length' % (self._device, self._core)] = channel_payload_length self.board['%s.lmc_integrated_gen%s.regfile_beamf_payload_length' % (self._device, self._core)] = beam_payload_length
[docs] def stop_integrated_channel_data(self): """ Stop receiving integrated beam data from the board """ if not self.implemented: logging.warning("Integrator is not implemented.") return self.board["%s.lmc_integrated_gen%s.channel_enable" % (self._device, self._core)] = 0x0
[docs] def stop_integrated_beam_data(self): """ Stop receiving integrated beam data from the board """ if not self.implemented: logging.warning("Integrator is not implemented.") return self.board["%s.lmc_integrated_gen%s.beamf_enable" % (self._device, self._core)] = 0x0
[docs] def stop_integrated_data(self): """ Stop transmission of integrated data""" if not self.implemented: logging.warning("Integrator is not implemented.") return self.stop_integrated_channel_data() self.stop_integrated_beam_data()
##################### Superclass method implementations #################################
[docs] def initialise(self): """ Initialise TpmIntegrator """ logging.info("TpmIntegrator has been initialised") return True
[docs] def status_check(self): """ Perform status check :return: Status """ logging.info("TpmIntegrator : Checking status") return Status.OK
[docs] def clean_up(self): """ Perform cleanup :return: Success """ logging.info("TpmIntegrator : Cleaning up") return True