Source code for pyfabil.plugins.tpm.ten_g_xg_core

from __future__ import division
from builtins import str
from builtins import hex
from builtins import range
import socket

__author__ = 'lessju'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging


[docs] class TpmTenGCoreXg(FirmwareBlock): """ TpmTenGCoreXg plugin """ @compatibleboards(BoardMake.TpmBoard,BoardMake.Tpm16Board) @friendlyname('tpm_10g_core') @maxinstances(8) def __init__(self, board, **kwargs): """ TpmTenGCoreXg initialiser :param board: Pointer to board instance """ super(TpmTenGCoreXg, self).__init__(board) if 'device' not in list(kwargs.keys()): raise PluginError("TpmTenGCoreXg: Require a node instance") self._device = kwargs['device'] if 'core' not in list(kwargs.keys()): raise PluginError("TpmTenGCoreXg: core_id required") if 'nof_cores' not in kwargs.keys(): self._nof_cores = 4 else: self._nof_cores = kwargs['nof_cores'] if self._device == Device.FPGA_1: self._device = 'fpga1' elif self._device == Device.FPGA_2: self._device = 'fpga2' else: raise PluginError("TpmTenGCoreXg: Invalid device %s" % self._device) if 'mii_prefix' not in kwargs.keys(): self._device_w_mii_prefix = self._device + "." + "xg_udp" else: self._mii_prefix = kwargs['mii_prefix'] self._device_w_mii_prefix = self._device + "." + self._mii_prefix self._core = kwargs['core'] self._tx_config_port_ba = self.board.memory_map[ '%s.xg_udp.core%d_tx_config_port' % (self._device, self._core)].address ####################################################################################### def _mii_test_register(self,register, core): return '%s.mii_test_core%d_%s' % (self._device_w_mii_prefix, core, register) def _xg_udp_core_register(self,register, core): return '%s.core%d_%s' % (self._device_w_mii_prefix, core, register)
[docs] def initialise_core(self): """ Initialise 10G core """ #if int(self._core) == 0: # self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 0 # self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 1 # self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 0 self.board['%s.xg_udp.core%d_global_config.tx_use_always_0' % (self._device, self._core)] = 1 self.board['%s.xg_udp.core%d_global_config.promiscuous_mode_enable' % (self._device, self._core)] = 0 return
[docs] def reset_core(self): linkup_value = 0 for n in range(self._nof_cores): linkup_value = (linkup_value << 4) + 0xF for n in range(10): self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 1 self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 0 for m in range(100): if self.board['%s.xg_udp.phy_status.rx_sync_lock' % self._device] == int(2**self._nof_cores - 1): return True time.sleep(0.1) logging.info("TpmTenGCoreXg: %s link reset, attempt %d" % (self._device, n)) logging.error("TpmTenGCoreXg: %s link is no up" % self._device) return False
[docs] def set_src_mac(self, mac): """ Set source MAC address :param mac: MAC address """ self.board[self._xg_udp_core_register('mac_lsb', self._core)] = mac & 0xFFFFFFFF self.board[self._xg_udp_core_register('mac_msb', self._core)] = (mac >> 32) & 0xFFFFFFFF self.board[self._mii_test_register('src_mac_lo', self._core)] = mac & 0xFFFFFFFF self.board[self._mii_test_register('src_mac_hi', self._core)] = (mac >> 32) & 0xFFFFFFFF
[docs] def get_src_mac(self): """ Get source MAC address """ lower = self.board[self._xg_udp_core_register('mac_lsb', self._core)] upper = self.board[self._xg_udp_core_register('mac_msb', self._core)] return upper << 32 | lower
[docs] def set_dst_mac(self, mac): """ Set destination MAC address :param mac: MAC address """ return
[docs] def get_dst_mac(self): """ Get destination MAC address """ return 0
[docs] def set_src_ip(self, ip): """ Set source IP address :param ip: IP address """ try: if type(ip) is not int: ip = struct.unpack("!L", socket.inet_aton(ip))[0] self.board['%s.xg_udp.core%d_ip' % (self._device, self._core)] = ip self.board['%s.xg_udp.core%d_netmask' % (self._device, self._core)] = 0xFF000000 except: raise PluginError("Tpm10GCoreXg: Could not set source IP")
[docs] def get_src_ip(self): """ Get source IP address """ return self.board['%s.xg_udp.core%d_ip' % (self._device, self._core)]
[docs] def set_dst_ip(self, ip): """ Set source IP address :param ip: IP address """ try: if type(ip) is not int: ip = struct.unpack("!L", socket.inet_aton(ip))[0] self.board['%s.xg_udp.core%d_tx_config_ip' % (self._device, self._core)] = ip except: raise PluginError("Tpm10GCoreXg: Could not set destination IP %s" % ip)
[docs] def get_dst_ip(self): """ Get destination ip""" return self.board['%s.xg_udp.core%d_tx_config_ip' % (self._device, self._core)][0]
[docs] def set_src_port(self, port, slot=0): """ Set source IP address :param port: Port :param slot: entry in the tx config table """ rd = self.board[self._tx_config_port_ba + 4 * slot] val = (rd & 0xFFFF0000) | (port & 0xFFFF) self.board[self._tx_config_port_ba + 4 * slot] = val return
[docs] def get_src_port(self, slot=0): """ Get source IP address :param port: Port :param slot: entry in the tx config table """ rd = self.board[self._tx_config_port_ba + 4 * slot] return rd & 0xFFFF
[docs] def set_dst_port(self, port, slot=0): """ Set source IP address :param port: Port :param slot: entry in the core tx config table """ rd = self.board[self._tx_config_port_ba + 4 * slot] val = ((port << 16) & 0xFFFF0000) | (rd & 0xFFFF) self.board[self._tx_config_port_ba + 4 * slot] = val return
[docs] def get_dst_port(self, slot=0): """ Set source IP address :param port: Port :param slot: entry in the core tx config table """ rd = self.board[self._tx_config_port_ba + 4 * slot] return (rd >> 16) & 0xFFFF
[docs] def get_arp_table_status(self, idx, silent_mode=True): self.board['%s.xg_udp.core%d_arp_table_read_pointer' % (self._device, self._core)] = idx rd = self.board['%s.xg_udp.core%d_arp_table_status' % (self._device, self._core)] mac_lsb = self.board['%s.xg_udp.core%d_arp_table_mac_lsb' % (self._device, self._core)] mac_msb = self.board['%s.xg_udp.core%d_arp_table_mac_msb' % (self._device, self._core)] if silent_mode == False: txt = "\n\nvalid: " + str(rd & 0x1) + "\n" txt += "renewing: " + str((rd & 0x2) >> 1) + "\n" txt += "mac_resolved: " + str((rd & 0x4) >> 2) + "\n" txt += "mac: " + hex((mac_msb << 32) + mac_lsb) + "\n" logging.info(txt) return rd
[docs] def mii_test_reset(self): self.board[self._mii_test_register('rx_pkt_num', self._core)] = 0
[docs] def mii_test_init(self, pkt_len, pkt_num, rx_core): self.board[self._mii_test_register('byte_swap', self._core)] = 0x1 self.board[self._mii_test_register('mode', self._core)] = 0x0 self.board[self._mii_test_register('eth_type', self._core)] = 0x88888888 self.board[self._mii_test_register('pkt_len', self._core)] = pkt_len // 8 self.board[self._mii_test_register('tx_pkt_num', self._core)] = pkt_num
#self.board[self._mii_test_register('rx_pkt_num', rx_core)] = 0
[docs] def mii_test_mac_config(self, board): if self._device == "fpga1": adder = 0 else: adder = 4 self.board[self._mii_test_register('src_mac_lo', self._core)] = self._core + adder self.board[self._mii_test_register('src_mac_hi', self._core)] = board self.board[self._mii_test_register('dst_mac_lo', self._core)] = ((self._core + 1) % self._nof_cores) + adder self.board[self._mii_test_register('dst_mac_hi', self._core)] = board
[docs] def mii_test(self, pkt_num, pkt_len=8192, show_result=True, wait_result=True): self.board[self._mii_test_register('tx_pkt_num', self._core)] = 0 # stop current transmission self.mii_send(pkt_num, pkt_len, wait_result) if show_result: self.mii_test_result()
[docs] def mii_send(self, pkt_num, pkt_len=8192, wait_result=True): rx_core = (self._core + 1) % self._nof_cores self.board['%s.regfile.eth10g_ctrl' % self._device] = 0xF self.mii_test_init(pkt_len, pkt_num, rx_core) self.board[self._mii_test_register('tx_start', self._core)] = 0x1 if wait_result: self.mii_wait_idle() self.board['%s.regfile.eth10g_ctrl' % self._device] = 0x0
[docs] def mii_wait_idle(self): rx_core = (self._core + 1) % self._nof_cores wait = 1 while wait == 1: time.sleep(0.3) rd = self.board[self._mii_test_register('tx_start', self._core)] if rd == 1: logging.info("Waiting for transmission...") rd = self.board[self._mii_test_register('rx_pkt_num', rx_core)] logging.info("Received Packets: " + str(rd)) for f in ["fpga1","fpga2"]: for c in range(self._nof_cores): rd1 = self.board[self._mii_test_register('rx_pkt_num', rx_core)] rd2 = self.board[self._mii_test_register('rx_error', rx_core)] _core_name=f if self._mii_prefix is not None: _core_name += "."+self._mii_prefix logging.info('%s core%d: %s error register - %d received packets' % (_core_name, c, hex(rd2), rd1)) else: wait = 0 self.board['%s.regfile.eth10g_ctrl' % self._device] = 0x0
[docs] def mii_test_result(self, verbose = True): rx_core = (self._core + 1) % self._nof_cores rd1 = self.board[self._mii_test_register('rx_pkt_num', rx_core)] rd2 = self.board[self._mii_test_register('rx_error', rx_core)] if verbose: logging.info('%s core%d: %s error register - %d received packets' % (self._device_w_mii_prefix, rx_core, hex(rd2), rd1)) return [rd1, rd2]
##################### Superclass method implementations #################################
[docs] def initialise(self): """ Initialise TpmTenGCoreXg """ logging.info("TpmTenGCoreXg has been initialised") return True
[docs] def status_check(self): """ Perform status check :return: Status """ logging.info("TpmTenGCoreXg : Checking status") return Status.OK
[docs] def clean_up(self): """ Perform cleanup :return: Success """ logging.info("TpmTenGCoreXg : Cleaning up") return True