from __future__ import division
from builtins import str
from builtins import hex
from builtins import range
import socket
__author__ = 'lessju'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.plugins.tpm.ten_g_xg_core import TpmTenGCoreXg
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
class TpmFpga2FpgaGT(TpmTenGCoreXg):
    """ TpmFpga2FpgaGT plugin

    Specialisation of TpmTenGCoreXg in which every network configuration
    accessor (MAC, IP, port, ARP table) is replaced by a stub that returns
    None, while the MII test helpers are implemented on top of the
    ``mii_test_core<N>_*`` registers.
    """

    @compatibleboards(BoardMake.Tpm16Board)
    @friendlyname('tpm_f2f_core')
    @maxinstances(16)
    def __init__(self, board, **kwargs):
        """ TpmFpga2FpgaGT initialiser
        :param board: Pointer to board instance
        :param kwargs: Forwarded unchanged to the TpmTenGCoreXg initialiser
        """
        super(TpmFpga2FpgaGT, self).__init__(board, **kwargs)

    #######################################################################################

    def _mii_test_register(self, register, core, prefix=None):
        """ Build the name of an MII test register
        :param register: Register suffix, e.g. 'rx_pkt_num'
        :param core: Core index embedded in the register name
        :param prefix: Register path prefix; defaults to this instance's
                       ``_device_w_mii_prefix`` when None
        :return: Register name string
        """
        if prefix is None:
            prefix = self._device_w_mii_prefix
        return '%s.mii_test_core%d_%s' % (prefix, core, register)

    def _xg_udp_core_register(self, register, core):
        """ Build the name of a per-core register
        :param register: Register suffix
        :param core: Core index embedded in the register name
        :return: Register name string
        """
        return '%s.core%d_%s' % (self._device_w_mii_prefix, core, register)

    # The accessors below are deliberate stubs: they perform no register
    # access and return None.
    # NOTE(review): presumably they mask the TpmTenGCoreXg implementations
    # because this link has no MAC/IP/UDP configuration - confirm.

    def initialise_core(self):
        """ Initialise 10G core (stub, does nothing) """
        return

    def set_src_mac(self, mac):
        """ Stub: ignores ``mac`` and returns None """
        return

    def get_src_mac(self):
        """ Stub: returns None """
        return

    def set_dst_mac(self, mac):
        """ Stub: ignores ``mac`` and returns None """
        return

    def get_dst_mac(self):
        """ Stub: returns None """
        return

    def set_src_ip(self, ip):
        """ Stub: ignores ``ip`` and returns None """
        return

    def get_src_ip(self):
        """ Stub: returns None """
        return

    def set_dst_ip(self, ip):
        """ Stub: ignores ``ip`` and returns None """
        return

    def get_dst_ip(self):
        """ Stub: returns None """
        return

    def set_src_port(self, port):
        """ Stub: ignores ``port`` and returns None """
        return

    def get_src_port(self):
        """ Stub: returns None """
        return

    def set_dst_port(self, port):
        """ Stub: ignores ``port`` and returns None """
        return

    def get_dst_port(self):
        """ Stub: returns None """
        return

    def get_arp_table_status(self, idx, silent_mode=True):
        """ Stub: ignores its arguments and returns None """
        return

    def mii_test_reset(self):
        """ Write 0 to the rx_pkt_num register of this core """
        self.board[self._mii_test_register('rx_pkt_num', self._core)] = 0

    def mii_test_init(self, pkt_len, pkt_num):
        """ Program the MII test registers of this core
        :param pkt_len: Packet length; written to the register floor-divided
                        by 8 (presumably bytes to 64-bit words - TODO confirm)
        :param pkt_num: Value written to the tx_pkt_num register
        """
        core = self._core
        self.board[self._mii_test_register('byte_swap', core)] = 0x1
        self.board[self._mii_test_register('mode', core)] = 0x0
        self.board[self._mii_test_register('eth_type', core)] = 0x88888888
        self.board[self._mii_test_register('pkt_len', core)] = pkt_len // 8
        self.board[self._mii_test_register('tx_pkt_num', core)] = pkt_num

    def mii_test_mac_config(self, board):
        """ Program source and destination MAC registers for the MII test

        The low words are derived from the core index plus a per-device
        offset (0 on "fpga1", 4 otherwise); the destination targets the next
        core, wrapping at ``_nof_cores``. Both high words are set to
        ``board``.

        :param board: Value written to src_mac_hi and dst_mac_hi
        """
        adder = 0 if self._device == "fpga1" else 4
        core = self._core
        next_core = (core + 1) % self._nof_cores
        self.board[self._mii_test_register('src_mac_lo', core)] = core + adder
        self.board[self._mii_test_register('src_mac_hi', core)] = board
        self.board[self._mii_test_register('dst_mac_lo', core)] = next_core + adder
        self.board[self._mii_test_register('dst_mac_hi', core)] = board

    def mii_test(self, pkt_num, pkt_len=8192, show_result=True, wait_result=True):
        """ Run an MII test on this core
        :param pkt_num: Number of packets to transmit
        :param pkt_len: Packet length passed to mii_send
        :param show_result: If True, call mii_test_result afterwards
        :param wait_result: If True, block until transmission has finished
        """
        # stop current transmission
        self.board[self._mii_test_register('tx_pkt_num', self._core)] = 0
        self.mii_send(pkt_num, pkt_len, wait_result)
        if show_result:
            self.mii_test_result()

    def mii_send(self, pkt_num, pkt_len=8192, wait_result=True):
        """ Configure the MII test and start transmitting
        :param pkt_num: Number of packets to transmit
        :param pkt_len: Packet length passed to mii_test_init
        :param wait_result: If True, call mii_wait_idle before returning
        """
        ctrl_register = '%s.regfile.eth10g_ctrl' % self._device
        # NOTE(review): meaning of 0xFF0 is not visible here - confirm
        # against the firmware register map.
        self.board[ctrl_register] = 0xFF0
        self.mii_test_init(pkt_len, pkt_num)
        self.board[self._mii_test_register('tx_start', self._core)] = 0x1
        if wait_result:
            self.mii_wait_idle()
        # Written unconditionally, so the control register is also cleared
        # when wait_result is False.
        self.board[ctrl_register] = 0x0

    def mii_wait_idle(self):
        """ Poll tx_start every 0.3 s until it no longer reads 1

        While the transmission is in progress, the received packet count of
        this core is logged, followed by the rx_pkt_num / rx_error registers
        of every core on both FPGAs. Once idle, the eth10g_ctrl register of
        this device is cleared.

        NOTE(review): there is no timeout; this blocks for as long as
        tx_start reads 1.
        """
        while True:
            time.sleep(0.3)
            if self.board[self._mii_test_register('tx_start', self._core)] != 1:
                break
            logging.info("Waiting for transmission...")
            received = self.board[self._mii_test_register('rx_pkt_num', self._core)]
            logging.info("Received Packets: %s", received)
            for fpga in ("fpga1", "fpga2"):
                core_name = fpga
                if self._mii_prefix is not None:
                    core_name += "." + self._mii_prefix
                for core in range(self._nof_cores):
                    # Read the registers of the core being reported, so the
                    # logged counters match the logged label.
                    rx_pkt_num = self.board[self._mii_test_register('rx_pkt_num', core, core_name)]
                    rx_error = self.board[self._mii_test_register('rx_error', core, core_name)]
                    logging.info('%s core%d: %s error register - %d received packets',
                                 core_name, core, hex(rx_error), rx_pkt_num)
        self.board['%s.regfile.eth10g_ctrl' % self._device] = 0x0

    def mii_test_result(self, verbose=True):
        """ Read the MII test counters of this core
        :param verbose: If True, log the counters
        :return: List [rx_pkt_num, rx_error]
        """
        core = self._core
        rx_pkt_num = self.board[self._mii_test_register('rx_pkt_num', core)]
        rx_error = self.board[self._mii_test_register('rx_error', core)]
        if verbose:
            logging.info('%s core%d: %s error register - %d received packets',
                         self._device_w_mii_prefix, core, hex(rx_error), rx_pkt_num)
        return [rx_pkt_num, rx_error]

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise TpmFpga2FpgaGT
        :return: True
        """
        logging.info("TpmFpga2FpgaGT has been initialised")
        return True

    def status_check(self):
        """ Perform status check
        :return: Status
        """
        logging.info("TpmFpga2FpgaGT : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success
        """
        logging.info("TpmFpga2FpgaGT : Cleaning up")
        return True