# Source code for pyfabil.plugins.tpm_1_6.f2f_gt

from __future__ import division
from builtins import str
from builtins import hex
from builtins import range
import socket

__author__ = 'lessju'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.plugins.tpm.ten_g_xg_core import TpmTenGCoreXg
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging


class TpmFpga2FpgaGT(TpmTenGCoreXg):
    """ TpmFpga2FpgaGT plugin

    Subclass of TpmTenGCoreXg in which the MAC/IP/port accessors and the
    core initialisation are no-ops, and which adds helpers that drive the
    ``mii_test_core<N>_*`` registers of the board.

    The methods below read ``self.board``, ``self._core``, ``self._device``,
    ``self._device_w_mii_prefix``, ``self._mii_prefix`` and
    ``self._nof_cores``. None of them is assigned in this class; they are
    presumably set up by the TpmTenGCoreXg initialiser (TODO confirm).
    """

    @compatibleboards(BoardMake.Tpm16Board)
    @friendlyname('tpm_f2f_core')
    @maxinstances(16)
    def __init__(self, board, **kwargs):
        """ TpmFpga2FpgaGT initialiser

        :param board: Pointer to board instance
        :param kwargs: Forwarded unchanged to the superclass initialiser
        """
        super(TpmFpga2FpgaGT, self).__init__(board, **kwargs)

    #######################################################################################

    def _mii_test_register(self, register, core):
        """ Build the name of an MII test register

        :param register: Register name suffix (e.g. 'rx_pkt_num')
        :param core: Core index inserted in the register name
        :return: '<device prefix>.mii_test_core<core>_<register>'
        """
        return '%s.mii_test_core%d_%s' % (self._device_w_mii_prefix, core, register)

    def _xg_udp_core_register(self, register, core):
        """ Build the name of a per-core register

        :param register: Register name suffix
        :param core: Core index inserted in the register name
        :return: '<device prefix>.core<core>_<register>'
        """
        return '%s.core%d_%s' % (self._device_w_mii_prefix, core, register)

    def initialise_core(self):
        """ Initialise 10G core

        No-op in this plugin.

        :return: None
        """
        return

    def set_src_mac(self, mac):
        """ No-op in this plugin; the argument is ignored

        :param mac: Ignored
        :return: None
        """
        return

    def get_src_mac(self):
        """ No-op in this plugin

        :return: None
        """
        return

    def set_dst_mac(self, mac):
        """ No-op in this plugin; the argument is ignored

        :param mac: Ignored
        :return: None
        """
        return

    def get_dst_mac(self):
        """ No-op in this plugin

        :return: None
        """
        return

    def set_src_ip(self, ip):
        """ No-op in this plugin; the argument is ignored

        :param ip: Ignored
        :return: None
        """
        return

    def get_src_ip(self):
        """ No-op in this plugin

        :return: None
        """
        return

    def set_dst_ip(self, ip):
        """ No-op in this plugin; the argument is ignored

        :param ip: Ignored
        :return: None
        """
        return

    def get_dst_ip(self):
        """ No-op in this plugin

        :return: None
        """
        return

    def set_src_port(self, port):
        """ No-op in this plugin; the argument is ignored

        :param port: Ignored
        :return: None
        """
        return

    def get_src_port(self):
        """ No-op in this plugin

        :return: None
        """
        return

    def set_dst_port(self, port):
        """ No-op in this plugin; the argument is ignored

        :param port: Ignored
        :return: None
        """
        return

    def get_dst_port(self):
        """ No-op in this plugin

        :return: None
        """
        return

    def get_arp_table_status(self, idx, silent_mode=True):
        """ No-op in this plugin; both arguments are ignored

        :param idx: Ignored
        :param silent_mode: Ignored
        :return: None
        """
        return

    def mii_test_reset(self):
        """ Write 0 to the 'rx_pkt_num' MII test register of this core """
        self.board[self._mii_test_register('rx_pkt_num', self._core)] = 0

    def mii_test_init(self, pkt_len, pkt_num):
        """ Program the MII test registers of this core

        The 'rx_pkt_num' register is not touched here; mii_test_reset()
        clears it.

        :param pkt_len: Packet length; the value written to the 'pkt_len'
                        register is pkt_len // 8 (presumably the register
                        counts 8-byte words - TODO confirm)
        :param pkt_num: Value written to the 'tx_pkt_num' register
        """
        core = self._core
        self.board[self._mii_test_register('byte_swap', core)] = 0x1
        self.board[self._mii_test_register('mode', core)] = 0x0
        self.board[self._mii_test_register('eth_type', core)] = 0x88888888
        self.board[self._mii_test_register('pkt_len', core)] = pkt_len // 8
        self.board[self._mii_test_register('tx_pkt_num', core)] = pkt_num

    def mii_test_mac_config(self, board):
        """ Program the source and destination MAC test registers of this core

        The low words are derived from the core index: the source is this
        core, the destination is the next core (wrapping at _nof_cores).
        An offset of 4 is added to both when the device is not "fpga1".

        :param board: Value written to both the 'src_mac_hi' and the
                      'dst_mac_hi' registers
        """
        adder = 0 if self._device == "fpga1" else 4
        core = self._core
        next_core = (core + 1) % self._nof_cores

        self.board[self._mii_test_register('src_mac_lo', core)] = core + adder
        self.board[self._mii_test_register('src_mac_hi', core)] = board
        self.board[self._mii_test_register('dst_mac_lo', core)] = next_core + adder
        self.board[self._mii_test_register('dst_mac_hi', core)] = board

    def mii_test(self, pkt_num, pkt_len=8192, show_result=True, wait_result=True):
        """ Run an MII test on this core

        :param pkt_num: Number of packets, forwarded to mii_send()
        :param pkt_len: Packet length, forwarded to mii_send()
        :param show_result: If True, call mii_test_result() afterwards
        :param wait_result: Forwarded to mii_send()
        """
        # Stop current transmission
        self.board[self._mii_test_register('tx_pkt_num', self._core)] = 0
        self.mii_send(pkt_num, pkt_len, wait_result)
        if show_result:
            self.mii_test_result()

    def mii_send(self, pkt_num, pkt_len=8192, wait_result=True):
        """ Configure the MII test registers and start transmission

        :param pkt_num: Number of packets, forwarded to mii_test_init()
        :param pkt_len: Packet length, forwarded to mii_test_init()
        :param wait_result: If True, block in mii_wait_idle() until the
                            'tx_start' register no longer reads 1
        """
        ctrl_register = '%s.regfile.eth10g_ctrl' % self._device

        # The meaning of 0xFF0 is not visible in this file
        # (presumably it enables the MII test path - TODO confirm).
        self.board[ctrl_register] = 0xFF0
        self.mii_test_init(pkt_len, pkt_num)
        self.board[self._mii_test_register('tx_start', self._core)] = 0x1

        if wait_result:
            self.mii_wait_idle()
            # NOTE(review): this file was recovered from a rendering that lost
            # its indentation. The restore below is assumed to belong to the
            # wait_result branch (mii_wait_idle() performs the same write when
            # it returns) - confirm against the repository.
            self.board[ctrl_register] = 0x0

    def mii_wait_idle(self):
        """ Poll until the 'tx_start' register of this core no longer reads 1

        The register is polled every 0.3 s and progress is logged while the
        transmission is running. On exit 0 is written to
        '<device>.regfile.eth10g_ctrl'. There is no timeout.
        """
        # Imported explicitly so the method does not depend on a wildcard
        # import happening to expose the time module.
        import time

        rx_core = self._core
        while True:
            time.sleep(0.3)
            if self.board[self._mii_test_register('tx_start', self._core)] != 1:
                break

            logging.info("Waiting for transmission...")
            received = self.board[self._mii_test_register('rx_pkt_num', rx_core)]
            logging.info("Received Packets: %s", received)

            # NOTE(review): every (fpga, core) label below is reported with
            # the values of this instance's own core registers; the loop
            # variables only affect the log text. Kept as in the original -
            # confirm whether per-core registers were intended.
            for fpga in ["fpga1", "fpga2"]:
                for core in range(self._nof_cores):
                    packets = self.board[self._mii_test_register('rx_pkt_num', rx_core)]
                    errors = self.board[self._mii_test_register('rx_error', rx_core)]
                    core_name = fpga
                    if self._mii_prefix is not None:
                        core_name += "." + self._mii_prefix
                    logging.info('%s core%d: %s error register - %d received packets',
                                 core_name, core, hex(errors), packets)

        self.board['%s.regfile.eth10g_ctrl' % self._device] = 0x0

    def mii_test_result(self, verbose=True):
        """ Read the MII test counters of this core

        :param verbose: If True, log the values that were read
        :return: List [rx_pkt_num, rx_error] with the two register values
        """
        rx_core = self._core
        packets = self.board[self._mii_test_register('rx_pkt_num', rx_core)]
        errors = self.board[self._mii_test_register('rx_error', rx_core)]
        if verbose:
            logging.info('%s core%d: %s error register - %d received packets',
                         self._device_w_mii_prefix, rx_core, hex(errors), packets)
        return [packets, errors]

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise TpmFpga2FpgaGT

        :return: True
        """
        logging.info("TpmFpga2FpgaGT has been initialised")
        return True

    def status_check(self):
        """ Perform status check

        :return: Status
        """
        logging.info("TpmFpga2FpgaGT : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup

        :return: Success
        """
        logging.info("TpmFpga2FpgaGT : Cleaning up")
        return True