from __future__ import division
from builtins import str
from builtins import hex
from builtins import range
import socket
__author__ = 'lessju'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
[docs]
class TpmTenGCoreXg(FirmwareBlock):
""" TpmTenGCoreXg plugin """
@compatibleboards(BoardMake.TpmBoard,BoardMake.Tpm16Board)
@friendlyname('tpm_10g_core')
@maxinstances(8)
def __init__(self, board, **kwargs):
""" TpmTenGCoreXg initialiser
:param board: Pointer to board instance
"""
super(TpmTenGCoreXg, self).__init__(board)
if 'device' not in list(kwargs.keys()):
raise PluginError("TpmTenGCoreXg: Require a node instance")
self._device = kwargs['device']
if 'core' not in list(kwargs.keys()):
raise PluginError("TpmTenGCoreXg: core_id required")
if 'nof_cores' not in kwargs.keys():
self._nof_cores = 4
else:
self._nof_cores = kwargs['nof_cores']
if self._device == Device.FPGA_1:
self._device = 'fpga1'
elif self._device == Device.FPGA_2:
self._device = 'fpga2'
else:
raise PluginError("TpmTenGCoreXg: Invalid device %s" % self._device)
if 'mii_prefix' not in kwargs.keys():
self._device_w_mii_prefix = self._device + "." + "xg_udp"
else:
self._mii_prefix = kwargs['mii_prefix']
self._device_w_mii_prefix = self._device + "." + self._mii_prefix
self._core = kwargs['core']
self._tx_config_port_ba = self.board.memory_map[
'%s.xg_udp.core%d_tx_config_port' % (self._device, self._core)].address
#######################################################################################
def _mii_test_register(self,register, core):
return '%s.mii_test_core%d_%s' % (self._device_w_mii_prefix, core, register)
def _xg_udp_core_register(self,register, core):
return '%s.core%d_%s' % (self._device_w_mii_prefix, core, register)
[docs]
def initialise_core(self):
""" Initialise 10G core """
#if int(self._core) == 0:
# self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 0
# self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 1
# self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 0
self.board['%s.xg_udp.core%d_global_config.tx_use_always_0' % (self._device, self._core)] = 1
self.board['%s.xg_udp.core%d_global_config.promiscuous_mode_enable' % (self._device, self._core)] = 0
return
[docs]
def reset_core(self):
linkup_value = 0
for n in range(self._nof_cores):
linkup_value = (linkup_value << 4) + 0xF
for n in range(10):
self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 1
self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 0
for m in range(100):
if self.board['%s.xg_udp.phy_status.rx_sync_lock' % self._device] == int(2**self._nof_cores - 1):
return True
time.sleep(0.1)
logging.info("TpmTenGCoreXg: %s link reset, attempt %d" % (self._device, n))
logging.error("TpmTenGCoreXg: %s link is no up" % self._device)
return False
[docs]
def set_src_mac(self, mac):
""" Set source MAC address
:param mac: MAC address
"""
self.board[self._xg_udp_core_register('mac_lsb', self._core)] = mac & 0xFFFFFFFF
self.board[self._xg_udp_core_register('mac_msb', self._core)] = (mac >> 32) & 0xFFFFFFFF
self.board[self._mii_test_register('src_mac_lo', self._core)] = mac & 0xFFFFFFFF
self.board[self._mii_test_register('src_mac_hi', self._core)] = (mac >> 32) & 0xFFFFFFFF
[docs]
def get_src_mac(self):
""" Get source MAC address """
lower = self.board[self._xg_udp_core_register('mac_lsb', self._core)]
upper = self.board[self._xg_udp_core_register('mac_msb', self._core)]
return upper << 32 | lower
[docs]
def set_dst_mac(self, mac):
""" Set destination MAC address
:param mac: MAC address
"""
return
[docs]
def get_dst_mac(self):
""" Get destination MAC address """
return 0
[docs]
def set_src_ip(self, ip):
""" Set source IP address
:param ip: IP address
"""
try:
if type(ip) is not int:
ip = struct.unpack("!L", socket.inet_aton(ip))[0]
self.board['%s.xg_udp.core%d_ip' % (self._device, self._core)] = ip
self.board['%s.xg_udp.core%d_netmask' % (self._device, self._core)] = 0xFF000000
except:
raise PluginError("Tpm10GCoreXg: Could not set source IP")
[docs]
def get_src_ip(self):
""" Get source IP address """
return self.board['%s.xg_udp.core%d_ip' % (self._device, self._core)]
[docs]
def set_dst_ip(self, ip):
""" Set source IP address
:param ip: IP address
"""
try:
if type(ip) is not int:
ip = struct.unpack("!L", socket.inet_aton(ip))[0]
self.board['%s.xg_udp.core%d_tx_config_ip' % (self._device, self._core)] = ip
except:
raise PluginError("Tpm10GCoreXg: Could not set destination IP %s" % ip)
[docs]
def get_dst_ip(self):
""" Get destination ip"""
return self.board['%s.xg_udp.core%d_tx_config_ip' % (self._device, self._core)][0]
[docs]
def set_src_port(self, port, slot=0):
""" Set source IP address
:param port: Port
:param slot: entry in the tx config table
"""
rd = self.board[self._tx_config_port_ba + 4 * slot]
val = (rd & 0xFFFF0000) | (port & 0xFFFF)
self.board[self._tx_config_port_ba + 4 * slot] = val
return
[docs]
def get_src_port(self, slot=0):
""" Get source IP address
:param port: Port
:param slot: entry in the tx config table
"""
rd = self.board[self._tx_config_port_ba + 4 * slot]
return rd & 0xFFFF
[docs]
def set_dst_port(self, port, slot=0):
""" Set source IP address
:param port: Port
:param slot: entry in the core tx config table
"""
rd = self.board[self._tx_config_port_ba + 4 * slot]
val = ((port << 16) & 0xFFFF0000) | (rd & 0xFFFF)
self.board[self._tx_config_port_ba + 4 * slot] = val
return
[docs]
def get_dst_port(self, slot=0):
""" Set source IP address
:param port: Port
:param slot: entry in the core tx config table
"""
rd = self.board[self._tx_config_port_ba + 4 * slot]
return (rd >> 16) & 0xFFFF
[docs]
def get_arp_table_status(self, idx, silent_mode=True):
self.board['%s.xg_udp.core%d_arp_table_read_pointer' % (self._device, self._core)] = idx
rd = self.board['%s.xg_udp.core%d_arp_table_status' % (self._device, self._core)]
mac_lsb = self.board['%s.xg_udp.core%d_arp_table_mac_lsb' % (self._device, self._core)]
mac_msb = self.board['%s.xg_udp.core%d_arp_table_mac_msb' % (self._device, self._core)]
if silent_mode == False:
txt = "\n\nvalid: " + str(rd & 0x1) + "\n"
txt += "renewing: " + str((rd & 0x2) >> 1) + "\n"
txt += "mac_resolved: " + str((rd & 0x4) >> 2) + "\n"
txt += "mac: " + hex((mac_msb << 32) + mac_lsb) + "\n"
logging.info(txt)
return rd
[docs]
def mii_test_reset(self):
self.board[self._mii_test_register('rx_pkt_num', self._core)] = 0
[docs]
def mii_test_init(self, pkt_len, pkt_num, rx_core):
self.board[self._mii_test_register('byte_swap', self._core)] = 0x1
self.board[self._mii_test_register('mode', self._core)] = 0x0
self.board[self._mii_test_register('eth_type', self._core)] = 0x88888888
self.board[self._mii_test_register('pkt_len', self._core)] = pkt_len // 8
self.board[self._mii_test_register('tx_pkt_num', self._core)] = pkt_num
#self.board[self._mii_test_register('rx_pkt_num', rx_core)] = 0
[docs]
def mii_test_mac_config(self, board):
if self._device == "fpga1":
adder = 0
else:
adder = 4
self.board[self._mii_test_register('src_mac_lo', self._core)] = self._core + adder
self.board[self._mii_test_register('src_mac_hi', self._core)] = board
self.board[self._mii_test_register('dst_mac_lo', self._core)] = ((self._core + 1) % self._nof_cores) + adder
self.board[self._mii_test_register('dst_mac_hi', self._core)] = board
[docs]
def mii_test(self, pkt_num, pkt_len=8192, show_result=True, wait_result=True):
self.board[self._mii_test_register('tx_pkt_num', self._core)] = 0 # stop current transmission
self.mii_send(pkt_num, pkt_len, wait_result)
if show_result:
self.mii_test_result()
[docs]
def mii_send(self, pkt_num, pkt_len=8192, wait_result=True):
rx_core = (self._core + 1) % self._nof_cores
self.board['%s.regfile.eth10g_ctrl' % self._device] = 0xF
self.mii_test_init(pkt_len, pkt_num, rx_core)
self.board[self._mii_test_register('tx_start', self._core)] = 0x1
if wait_result:
self.mii_wait_idle()
self.board['%s.regfile.eth10g_ctrl' % self._device] = 0x0
[docs]
def mii_wait_idle(self):
rx_core = (self._core + 1) % self._nof_cores
wait = 1
while wait == 1:
time.sleep(0.3)
rd = self.board[self._mii_test_register('tx_start', self._core)]
if rd == 1:
logging.info("Waiting for transmission...")
rd = self.board[self._mii_test_register('rx_pkt_num', rx_core)]
logging.info("Received Packets: " + str(rd))
for f in ["fpga1","fpga2"]:
for c in range(self._nof_cores):
rd1 = self.board[self._mii_test_register('rx_pkt_num', rx_core)]
rd2 = self.board[self._mii_test_register('rx_error', rx_core)]
_core_name=f
if self._mii_prefix is not None:
_core_name += "."+self._mii_prefix
logging.info('%s core%d: %s error register - %d received packets' % (_core_name, c, hex(rd2), rd1))
else:
wait = 0
self.board['%s.regfile.eth10g_ctrl' % self._device] = 0x0
[docs]
def mii_test_result(self, verbose = True):
rx_core = (self._core + 1) % self._nof_cores
rd1 = self.board[self._mii_test_register('rx_pkt_num', rx_core)]
rd2 = self.board[self._mii_test_register('rx_error', rx_core)]
if verbose:
logging.info('%s core%d: %s error register - %d received packets' % (self._device_w_mii_prefix, rx_core, hex(rd2), rd1))
return [rd1, rd2]
##################### Superclass method implementations #################################
[docs]
def initialise(self):
""" Initialise TpmTenGCoreXg """
logging.info("TpmTenGCoreXg has been initialised")
return True
[docs]
def status_check(self):
""" Perform status check
:return: Status
"""
logging.info("TpmTenGCoreXg : Checking status")
return Status.OK
[docs]
def clean_up(self):
""" Perform cleanup
:return: Success
"""
logging.info("TpmTenGCoreXg : Cleaning up")
return True