from __future__ import division
from builtins import range
__author__ = 'chiello'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import numpy as np
import logging
import time
class AdcPowerMeterSimple(FirmwareBlock):
    """ AdcPowerMeter plugin

    Requests an accumulated power reading from the ``adc_power_meter``
    register block of one FPGA and converts it into one RMS amplitude
    per signal.
    """

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('adc_power_meter')
    @maxinstances(2)
    def __init__(self, board, fsample=800e6, samples_per_frame=864, integration_time=0.01, **kwargs):
        """ AdcPowerMeter initialiser
        :param board: Pointer to board instance
        :param fsample: Sampling frequency in Hz; its reciprocal is kept as the sample period
        :param samples_per_frame: Number of samples in one frame
        :param integration_time: Integration time in seconds
        :param kwargs: Must contain 'device', set to Device.FPGA_1 or Device.FPGA_2
        :raises PluginError: If 'device' is missing or is not one of the two FPGAs
        """
        super(AdcPowerMeterSimple, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("AdcPowerMeter: Require a node instance")
        self._device = kwargs['device']

        # Replace the Device enumerator with the prefix used in register names
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("AdcPowerMeter: Invalid device %s" % self._device)

        self._fsample = fsample
        self._sample_period = 1.0 / fsample
        self._nof_signals = 16
        self._frame_length = samples_per_frame
        self._integration_time = integration_time
        self._integration_length = self.get_integration_length(integration_time)

        self.initialise()

    #######################################################################################

    def get_integration_length(self, integration_time):
        """ Convert an integration time into a whole number of frames
        :param integration_time: Integration time in seconds
        :return: Number of complete frames fitting in integration_time (rounded down)
        """
        frame_period = self._sample_period * self._frame_length
        return int(integration_time // frame_period)

    def get_RmsAmplitude(self):
        """ Request a power reading and return the RMS amplitude of each signal

        Writes 1 to the ``control.req`` register and polls it in 1 ms steps,
        for a number of steps equal to twice the integration time expressed
        in milliseconds. Once the register reads back 0, two 32-bit words per
        signal are read starting at ``power_0`` (even index: low word, odd
        index: high word), the 64-bit sum is floor-divided by
        integration_length * frame_length and the square root is taken.

        NOTE(review): presumably the firmware clears ``control.req`` when the
        accumulated values are ready - confirm against the firmware docs.
        NOTE(review): the floor division discards the fractional part of the
        mean power before the square root; kept to preserve existing results.

        :return: List of floats, one per signal, or an empty list if the
                 request was not acknowledged within the polling budget
        :raises ZeroDivisionError: If integration_length * frame_length is 0
        """
        request_register = '%s.adc_power_meter.control.req' % self._device
        poll_period = 0.001  # seconds between two polls of the request flag

        self.board[request_register] = 0x1

        timeout = 2 * self._integration_time
        timeout_cnt = timeout / poll_period
        while timeout_cnt > 0:
            if self.board[request_register] == 0:
                data = self.board.read_register("%s.adc_power_meter.power_0" % self._device,
                                                self._nof_signals * 2)
                # Same divisor for every signal, so compute it once
                nof_samples = self._integration_length * self._frame_length
                # int() makes the 32-bit shift safe even if the words come
                # back as fixed-width integers rather than Python ints
                mean_power = [((int(data[2 * n + 1]) << 32) + int(data[2 * n])) // nof_samples
                              for n in range(self._nof_signals)]
                return np.sqrt(np.array(mean_power, dtype=np.uint64)).tolist()
            time.sleep(poll_period)
            timeout_cnt -= 1

        logging.info("ADC power meter simple %s is not receiving data", self._device)
        return []

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise AdcPowerMeter
        :return: True once configure() has returned
        """
        # configure() is not defined in this class; presumably it is provided
        # by FirmwareBlock or a subclass - TODO confirm
        self.configure()
        logging.debug("AdcPowerMeter has been initialised")
        return True

    def status_check(self):
        """ Perform status check
        :return: Status
        """
        logging.debug("AdcPowerMeter : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success
        """
        logging.debug("AdcPowerMeter : Cleaning up")
        return True