# Source code for pyfabil.plugins.tpm.spead_tx_gen

from __future__ import division
__author__ = 'lessju'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
import socket
import time


class SpeadTxGen(FirmwareBlock):
    """ SpeadTxGen plugin

    Controls one ``<device>.spead_gen<core>`` register group on the board:
    enabling/disabling the generator, setting the payload length, setting the
    output data rate (through the inter-packet pause counter) and writing the
    station / TPM identifiers.
    """

    # Clock rate used by the rate arithmetic (clock cycles per second)
    _CLOCK_RATE = 200e6

    # Per-packet overhead in bytes added on top of the payload.
    # NOTE(review): presumably SPEAD header + items (9 x 8), UDP (8),
    # IPv4 (20) and Ethernet (14) headers -- confirm against the firmware.
    _HEADER_LEN = 9 * 8 + 8 + 20 + 14

    # Smallest pause count ever used; register values at or below 312 are
    # treated as this value by both get_data_rate and set_data_rate.
    _MIN_PAUSE_CNT = 313

    @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
    @friendlyname('spead_tx_gen')
    @maxinstances(8)
    def __init__(self, board, **kwargs):
        """ SpeadTxGen initialiser
        :param board: Pointer to board instance
        :param kwargs: Must contain 'device' (Device.FPGA_1 or Device.FPGA_2)
                       and 'core' (integer index used in the register name)
        :raises PluginError: If 'device' or 'core' is missing, or the device
                             is not one of the two supported FPGAs
        """
        super(SpeadTxGen, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("SpeadTxGen: Require a node instance")
        self._device = kwargs['device']

        if 'core' not in kwargs:
            raise PluginError("SpeadTxGen: core_id required")

        # Translate the device enumeration into the register-name prefix
        if self._device == Device.FPGA_1:
            self._device = 'fpga1'
        elif self._device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("SpeadTxGen: Invalid device %s" % self._device)

        self._core = kwargs['core']

    #######################################################################################

    def _register_name(self, name):
        """ Build the full register name for this generator instance
        :param name: Register name relative to the spead_gen block
        :return: '<device>.spead_gen<core>.<name>'
        """
        return '%s.spead_gen%d.%s' % (self._device, self._core, name)

    def _stop_if_enabled(self):
        """ Stop the generator if its enable register reads 1
        :return: True if the generator was enabled (and has been stopped)
        """
        if self.board[self._register_name('enable')] == 1:
            self.stop()
            return True
        return False

    def stop(self):
        """ Disable the generator and wait until it reports not running

        NOTE: the running register is polled every 100 ms with no timeout, so
        this call blocks for as long as the register keeps reading 1.
        """
        self.board[self._register_name('enable')] = 0
        running_register = self._register_name('running')
        while self.board[running_register] == 1:
            time.sleep(0.1)

    def start(self):
        """ Enable the generator """
        self.board[self._register_name('enable')] = 1

    def get_data_rate(self):
        """ Compute the data rate implied by the current register values
        :return: Data rate in Gbit/s, headers included
        """
        pause_cnt = self.board[self._register_name('packet_pause_cnt')]
        if pause_cnt < self._MIN_PAUSE_CNT:
            pause_cnt = self._MIN_PAUSE_CNT

        payload_len = self.board[self._register_name('packet_byte_size')]
        total_len = payload_len + self._HEADER_LEN

        # One packet takes the pause plus one clock cycle per 8 payload bytes
        pkt_s = self._CLOCK_RATE / (pause_cnt + payload_len / 8.0)
        return pkt_s * total_len * 8 / 1e9

    def set_data_rate(self, gbit_s):
        """ Program the inter-packet pause to approximate the requested rate

        The generator is stopped while the pause counter is written and is
        restarted afterwards if it was enabled on entry.

        :param gbit_s: Requested data rate in Gbit/s, must be greater than 0
        :raises PluginError: If the requested rate is not positive
        """
        # A zero rate would divide by zero below and a negative one would be
        # clamped to the minimum pause, i.e. silently select the maximum rate
        if gbit_s <= 0:
            raise PluginError("SpeadTxGen: Invalid data rate %s, must be greater than 0" % gbit_s)

        payload_len = self.board[self._register_name('packet_byte_size')]
        total_len = payload_len + self._HEADER_LEN

        pkt_s = gbit_s * 1e9 / (total_len * 8)
        clock_cycles_for_1_pkt = self._CLOCK_RATE // pkt_s
        clock_cycles_for_1_payload = payload_len // 8

        pause_cnt = clock_cycles_for_1_pkt - clock_cycles_for_1_payload
        if pause_cnt < self._MIN_PAUSE_CNT:
            pause_cnt = self._MIN_PAUSE_CNT

        was_enabled = self._stop_if_enabled()
        self.board[self._register_name('packet_pause_cnt')] = int(pause_cnt)
        if was_enabled:
            self.start()

    def set_payload_length(self, len):
        """ Change the payload length while keeping the current data rate

        The generator is stopped for the update and restarted afterwards if it
        was enabled on entry.

        :param len: New value for the packet_byte_size register
        """
        was_enabled = self._stop_if_enabled()

        # Read the rate before touching the length so it can be re-applied
        gbit_s = self.get_data_rate()
        self.board[self._register_name('packet_byte_size')] = len
        self.set_data_rate(gbit_s)

        # NOTE(review): presumably gives the firmware time to settle before
        # re-enabling -- confirm
        time.sleep(0.1)
        if was_enabled:
            self.start()

    def set_station_id(self, id):
        """ Write the station identifier
        :param id: Value for the id.station_id register
        """
        self.board[self._register_name('id.station_id')] = id

    def set_tpm_id(self, id):
        """ Write the TPM identifier
        :param id: Value for the id.tpm_id register
        """
        self.board[self._register_name('id.tpm_id')] = id

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise SpeadTxGen
        :return: True
        """
        logging.info("SpeadTxGen has been initialised")
        return True

    def status_check(self):
        """ Perform status check
        :return: Status
        """
        logging.info("SpeadTxGen : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success
        """
        logging.info("SpeadTxGen : Cleaning up")
        return True