Source code for pyfabil.plugins.tpm.adc_power_meter

from __future__ import division
from builtins import range
from math import fabs, cos, sin, ceil, log, sqrt
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
import logging
import time

__author__ = 'gcomoretto'


[docs] class AdcPowerMeter(FirmwareBlock): """ Total power meter and RFI detector The total power meter measures the broadband total power level for the input signals. Total power integration time is programmable Integration time is expressed in seconds, total power is returned in ADC counts squared, RMS in ADC counts Specific methods: initialise(): Initialises everything. Device left with: 10 ms integration time, TP running, no RFI removing set_intTime(intTime) Sets integration time. Does not modify running status Integration time in seconds (min. 2 us, max 0.05 s) start_IntTime(intTime) Stops integration, changes integration time and restarts it start_TP(), stop_TP() Starts and stops TP. Synchronous start not yet supported enable_RFI(mask), disable_RFI() enables/disables RFI flagging. Mask selects input signals that flag data wait_TpReady() If TP is running, waits for data to become ready. Else returns False read_TpData() Reads the TP data if ready, else returns empty list wait_TpData() Waits for data to become ready and returns them. Returns empty list if TP not running get_RmsAmplitude() Same of wait_TpData(), but returns RMS amplitude instead of power read_RfiData(): Returns number of RFI affected frames """ @compatibleboards(BoardMake.TpmBoard,BoardMake.Tpm16Board) @friendlyname('adc_power_meter') @maxinstances(2) def __init__(self, board, fsample=800e6, samples_per_frame=864, **kwargs): """ AdcPowerMeter initialiser :param board: Pointer to board instance """ super(AdcPowerMeter, self).__init__(board) if 'device' not in list(kwargs.keys()): raise PluginError("AdcPowerMeter: Require a node instance") self._device = kwargs['device'] if self._device == Device.FPGA_1: self._device = 'fpga1' elif self._device == Device.FPGA_2: self._device = 'fpga2' else: raise PluginError("TpmTotalPower: Invalid device %s" % self._device) # sampling frequency self._fsample = fsample # hertz self._frameLen = samples_per_frame # samples per tick self._minDiscard = 2 # Min. number of discarded bits self.intTime = 0.01 # seconds self.discardBits = 0 # bits discarded self.initialise() #########################################################################
[docs] def initialise(self): """ Initialise AdcPowerMeter""" logging.debug("Initialising ADC power meter %s: " % (self._device,)) # Reset core self.board["%s.adc_power_meter.control.reset " % self._device] = 0x1 self.board['%s.adc_power_meter.control' % self._device] = 0x0 self.disable_RFI() self.stop_TP() self.set_intTime(self.intTime) self.start_TP() return True
############################################################################ # Set integration time # Compute number of frames in given integration time and associated # number of bits to discard ############################################################################
[docs] def set_intTime(self, integrationTime): """ Set integration time and discard bits. Integration time in seconds """ integration_ticks = round(integrationTime * self._fsample / self._frameLen) if integration_ticks < 1: integration_ticks = 1 if integration_ticks > 2**16 - 1: integration_ticks = 2**16 - 1 self.intTime = integration_ticks * self._frameLen / self._fsample nBits = int(ceil(log(integrationTime * self._fsample) / log(4.0) - 15.5 + 8)) - self._minDiscard if nBits > 7: nBits = 7 if nBits < 0: nBits = 0 self.discardBits = nBits self.board["%s.adc_power_meter.integration_time" % self._device] = int(integration_ticks - 1) self.board["%s.adc_power_meter.control.discard_bits" % self._device] = self.discardBits return True
[docs] def start_intTime(self, intTime): self.stop_TP() self.set_intTime(intTime) self.start_TP() return True
############################################################################ # Start/stop total power detector ############################################################################
[docs] def start_TP(self, sync_time=None): """ Starts total power detector at predefined frame time """ self.board["%s.adc_power_meter.control.reset " % self._device] = 0 self.board["%s.adc_power_meter.control.tp_run" % self._device] = 1 return True
#
[docs] def stop_TP(self): """ Stops total power detector""" self.board["%s.adc_power_meter.control.tp_run" % self._device] = 0 return True
############################################################################ # Enable and disable RFI flagging ############################################################################
[docs] def disable_RFI(self): self.enable_RFI(0) return True
[docs] def enable_RFI(self, mask=0xffff): self.board["%s.adc_power_meter.control.rfi_enable" % self._device] = mask return True
[docs] def read_data(self): norm = (4.0 ** self.discardBits) * (2 ** self._minDiscard) / (self._fsample * self.intTime) tp = self.board.read_register("%s.adc_power_meter.tp_counts_0" % self._device, 16) for i in range(16): tp[i] = tp[i] * norm return tp
############################################################################ # Read total power data if available # Returns empty array if no power is available ############################################################################
[docs] def read_TpData(self): """ Read total power data if available Returns empty array if no power is available Output is in units of ADC units squared """ ready = self.board["%s.adc_power_meter.status.ready" % self._device] if ready == 1: tp = self.read_data() self.board["%s.adc_power_meter.control.clear_ready" % self._device] = 1 self.board["%s.adc_power_meter.control.clear_ready" % self._device] = 0 else: logging.warning("AdcPowerMeter : %s read_TpData not ready. " % self._device) tp = [] return tp
# Wait for total power data to be ready
[docs] def wait_TpReady(self): # Check that the TP function is enabled if self.board["%s.adc_power_meter.control.reset " % self._device] == 1: return False if self.board["%s.adc_power_meter.control.tp_run" % self._device] == 0: return False # Wait at most for one integration time (plus 1.5 ms) # to avoid locking if something does not work #timeout = round(self.intTime / 0.001 + 1.5) timeout = 1.0 / 0.001 while self.board["%s.adc_power_meter.status.ready" % self._device] == 0: time.sleep(0.001) if timeout < 0: return False timeout = timeout - 1 return True
# Get total power data waiting for it to be available
[docs] def wait_TpData(self): """ Wait TP data to be available and return it Return False if TP detector is not programmed Output is in units of ADC units squared """ if self.wait_TpReady(): return self.read_TpData() else: logging.warning("AdcPowerMeter : %s wait_TpData not ready. " % self._device) return []
# Wait for data to be available and returns RMS amplitude
[docs] def get_RmsAmplitude(self, sync=True): if sync: read_method = self.read_TpData else: read_method = self.read_data if sync: if not self.wait_TpReady(): logging.warning("AdcPowerMeter : %s get_RmsAmplitude failed. " % self._device) return [] tp = read_method() for i in range(len(tp)): tp[i] = sqrt(tp[i]) return tp
############################################################################ # Read RFI data. Does not check availability ############################################################################
[docs] def read_RfiData(self): tp = self.board.read_register("%s.adc_power_meter.rfi_0" % self._device, 16) return tp
############################################################################
[docs] def status_check(self): logging.info("AdcPowerMeter : Checking status") return Status.OK
[docs] def clean_up(self): """ Perform cleanup :return: Success """ logging.info("AdcPowerMeter : Cleaning up") return True