from __future__ import division
from builtins import range
from math import fabs, cos, sin, ceil, log, sqrt
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
import logging
import time
__author__ = 'gcomoretto'
class AdcPowerMeter(FirmwareBlock):
    """ Total power meter and RFI detector

    The total power meter measures the broadband total power
    level for the input signals.
    Total power integration time is programmable.
    Integration time is expressed in seconds,
    total power is returned in ADC counts squared, RMS in ADC counts.

    Specific methods:

    initialise(): Initialises everything.
        Device left with: current integration time (10 ms after
        construction), TP running, no RFI removing
    set_intTime(intTime)
        Sets integration time. Does not modify running status.
        Integration time in seconds (documented range: min. 2 us,
        max 0.05 s); the value actually programmed is clamped to
        1 .. 2**16 - 1 frames
    start_intTime(intTime)
        Stops integration, changes integration time and restarts it
    start_TP(), stop_TP()
        Starts and stops TP. Synchronous start not yet supported
    enable_RFI(mask), disable_RFI()
        Enables/disables RFI flagging. Mask selects input signals that
        flag data
    wait_TpReady()
        If TP is running, waits for data to become ready.
        Else returns False
    read_TpData()
        Reads the TP data if ready, else returns empty list
    wait_TpData()
        Waits for data to become ready and returns them.
        Returns empty list if TP not running
    get_RmsAmplitude()
        Same as wait_TpData(), but returns RMS amplitude instead of power
    read_RfiData(): Returns number of RFI affected frames
    """

    # Number of words read from the tp_counts_0 / rfi_0 register banks.
    # NOTE(review): presumably one word per input signal - confirm.
    _NOF_COUNTERS = 16

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('adc_power_meter')
    @maxinstances(2)
    def __init__(self, board, fsample=800e6, samples_per_frame=864, **kwargs):
        """ AdcPowerMeter initialiser

        :param board: Pointer to board instance
        :param fsample: Sampling frequency, in hertz
        :param samples_per_frame: Number of samples in one frame (tick)
        :param kwargs: Must contain 'device' (Device.FPGA_1 or Device.FPGA_2)
        :raises PluginError: If 'device' is missing or is not a known FPGA
        """
        super(AdcPowerMeter, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("AdcPowerMeter: Require a node instance")
        self._device = kwargs['device']

        # Translate the device enumeration into the register name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("AdcPowerMeter: Invalid device %s" % self._device)

        self._fsample = fsample              # hertz
        self._frameLen = samples_per_frame   # samples per tick
        self._minDiscard = 2                 # Min. number of discarded bits
        self.intTime = 0.01                  # seconds
        self.discardBits = 0                 # bits discarded

        self.initialise()

    def _reg_name(self, name):
        """ Return the full register name for a power meter register.

        Building every name here keeps the keys free of stray whitespace
        (some of the original literals carried a trailing blank).

        :param name: Register name relative to '<device>.adc_power_meter'
        :return: Full register name string
        """
        return "%s.adc_power_meter.%s" % (self._device, name)

    #########################################################################
    def initialise(self):
        """ Initialise AdcPowerMeter

        Resets the core, disables RFI flagging, programs the current
        integration time and starts the total power detector.

        :return: True
        """
        logging.debug("Initialising ADC power meter %s: ", self._device)

        # Reset core: assert reset, then clear the whole control register
        self.board[self._reg_name("control.reset")] = 0x1
        self.board[self._reg_name("control")] = 0x0

        self.disable_RFI()
        self.stop_TP()
        self.set_intTime(self.intTime)
        self.start_TP()
        return True

    ############################################################################
    # Set integration time
    # Compute number of frames in given integration time and associated
    # number of bits to discard
    ############################################################################
    def set_intTime(self, integrationTime):
        """ Set integration time and discard bits.

        The requested time is rounded to a whole number of frames and
        clamped to 1 .. 2**16 - 1 frames; self.intTime is updated with the
        time actually programmed. Does not start or stop the detector.

        :param integrationTime: Requested integration time in seconds
        :return: True
        """
        max_ticks = 2 ** 16 - 1
        ticks = round(integrationTime * self._fsample / self._frameLen)
        ticks = min(max(ticks, 1), max_ticks)
        self.intTime = ticks * self._frameLen / self._fsample

        # Derive the discarded bits from the number of samples that will
        # really be accumulated (always >= one frame), not from the raw
        # request: a zero/negative request would make log() raise, and an
        # over-range request would discard more bits than needed.
        # The formula itself is unchanged from the original code.
        nof_samples = ticks * self._frameLen
        n_bits = int(ceil(log(nof_samples) / log(4.0) - 15.5 + 8)) - self._minDiscard
        self.discardBits = min(max(n_bits, 0), 7)

        # The register is written with the number of frames minus one
        self.board[self._reg_name("integration_time")] = int(ticks) - 1
        self.board[self._reg_name("control.discard_bits")] = self.discardBits
        return True

    def start_intTime(self, intTime):
        """ Stop the detector, change the integration time and restart it.

        :param intTime: Requested integration time in seconds
        :return: True
        """
        self.stop_TP()
        self.set_intTime(intTime)
        self.start_TP()
        return True

    ############################################################################
    # Start/stop total power detector
    ############################################################################
    def start_TP(self, sync_time=None):
        """ Starts total power detector.

        :param sync_time: Currently ignored (synchronous start is not
            implemented); accepted for interface compatibility
        :return: True
        """
        self.board[self._reg_name("control.reset")] = 0
        self.board[self._reg_name("control.tp_run")] = 1
        return True

    def stop_TP(self):
        """ Stops total power detector

        :return: True
        """
        self.board[self._reg_name("control.tp_run")] = 0
        return True

    ############################################################################
    # Enable and disable RFI flagging
    ############################################################################
    def disable_RFI(self):
        """ Disable RFI flagging (writes an all-zero enable mask).

        :return: True
        """
        self.enable_RFI(0)
        return True

    def enable_RFI(self, mask=0xffff):
        """ Enable RFI flagging.

        :param mask: Value written to the rfi_enable control field
        :return: True
        """
        self.board[self._reg_name("control.rfi_enable")] = mask
        return True

    def read_data(self):
        """ Read the total power counters without checking the ready flag.

        Raw counts are rescaled by the discarded bits and divided by the
        number of integrated samples (fsample * intTime).

        :return: List of normalised total power values
        """
        norm = (4.0 ** self.discardBits) * (2 ** self._minDiscard) / (self._fsample * self.intTime)
        raw = self.board.read_register(self._reg_name("tp_counts_0"), self._NOF_COUNTERS)
        return [count * norm for count in raw]

    ############################################################################
    # Read total power data if available
    # Returns empty array if no power is available
    ############################################################################
    def read_TpData(self):
        """ Read total power data if available

        Returns empty list if no power is available.
        Output is in units of ADC units squared

        :return: List of total power values, or [] if data are not ready
        """
        if self.board[self._reg_name("status.ready")] != 1:
            logging.warning("AdcPowerMeter : %s read_TpData not ready. ", self._device)
            return []

        tp = self.read_data()
        # Pulse clear_ready to acknowledge the data just read
        self.board[self._reg_name("control.clear_ready")] = 1
        self.board[self._reg_name("control.clear_ready")] = 0
        return tp

    # Wait for total power data to be ready
    def wait_TpReady(self):
        """ Wait for total power data to become ready.

        :return: True when the ready flag is set; False if the core is in
            reset, the detector is not running, or the wait times out
        """
        # Check that the TP function is enabled
        if self.board[self._reg_name("control.reset")] == 1:
            return False
        if self.board[self._reg_name("control.tp_run")] == 0:
            return False

        # Poll every millisecond with a fixed budget of about one second
        # (independent of the integration time) to avoid locking if
        # something does not work
        polls_left = 1000
        while self.board[self._reg_name("status.ready")] == 0:
            time.sleep(0.001)
            if polls_left < 0:
                return False
            polls_left -= 1
        return True

    # Get total power data waiting for it to be available
    def wait_TpData(self):
        """ Wait TP data to be available and return it

        Output is in units of ADC units squared

        :return: List of total power values, or [] if the detector is not
            running or data did not become ready
        """
        if self.wait_TpReady():
            return self.read_TpData()
        logging.warning("AdcPowerMeter : %s wait_TpData not ready. ", self._device)
        return []

    # Wait for data to be available and returns RMS amplitude
    def get_RmsAmplitude(self, sync=True):
        """ Return the RMS amplitude (square root of total power).

        :param sync: If True, wait for the ready flag and acknowledge the
            data; if False, read the counters immediately
        :return: List of RMS amplitudes in ADC counts, or [] if sync is
            True and data are not available
        """
        if sync:
            if not self.wait_TpReady():
                logging.warning("AdcPowerMeter : %s get_RmsAmplitude failed. ", self._device)
                return []
            tp = self.read_TpData()
        else:
            tp = self.read_data()
        return [sqrt(power) for power in tp]

    ############################################################################
    # Read RFI data. Does not check availability
    ############################################################################
    def read_RfiData(self):
        """ Read the RFI counters. Does not check availability.

        :return: Values read from the rfi_0 register bank
        """
        return self.board.read_register(self._reg_name("rfi_0"), self._NOF_COUNTERS)

    ############################################################################
    def status_check(self):
        """ Perform status check

        :return: Status.OK (no hardware check is performed)
        """
        logging.info("AdcPowerMeter : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup

        :return: Success
        """
        logging.info("AdcPowerMeter : Cleaning up")
        return True