Source code for pyfabil.plugins.tpm.ten_g_core

from __future__ import division
from builtins import str
from builtins import hex
from builtins import range
import socket

__author__ = 'lessju'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging


[docs] class TpmTenGCore(FirmwareBlock): """ TpmTenGCore plugin """ @compatibleboards(BoardMake.TpmBoard) @friendlyname('tpm_10g_core') @maxinstances(8) def __init__(self, board, **kwargs): """ TpmTenGCore initialiser :param board: Pointer to board instance """ super(TpmTenGCore, self).__init__(board) if 'device' not in list(kwargs.keys()): raise PluginError("TpmTenGCore: Require a node instance") self._device = kwargs['device'] if 'core' not in list(kwargs.keys()): raise PluginError("TpmTenGCore: core_id required") if self._device == Device.FPGA_1: self._device = 'fpga1' elif self._device == Device.FPGA_2: self._device = 'fpga2' else: raise PluginError("TpmTenGCore: Invalid device %s" % self._device) self._core = kwargs['core'] #######################################################################################
[docs] def initialise_core(self): """ Initialise 10G core """ # Packet split size: 1025 self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_packet_split_size' % (self._device, self._core)] = 0x00002328 # Disabling header/trailer insert self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_hdr_tlr_inserter' % (self._device, self._core)] = 0x0000F300 # Packet Length and IP Count Values self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_ip_v4_header_1' % (self._device, self._core)] = 0xDB00001C # UDP length: 0x0008 self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_udp_length' % (self._device, self._core)] = 0x00000008 # Enabling Inter-Frame Gap self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_inter_gap_value' % (self._device, self._core)] = 0x00000001 self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_control' % (self._device, self._core)] = 0x00000001 # Disable UDP checksum self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_control.udp_checksum_zero' % (self._device, self._core)] = 0x00000001
[docs] def set_src_mac(self, mac): """ Set source MAC address :param mac: MAC address """ self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_src_mac_addr_lower' % (self._device, self._core)] = mac & 0xFFFFFFFF self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_src_mac_addr_upper' % (self._device, self._core)] = (mac >> 32) & 0xFFFFFFFF self.board[ '%s.mii_test_ic.core%d_src_mac_lo' % (self._device, self._core)] = mac & 0xFFFFFFFF self.board[ '%s.mii_test_ic.core%d_src_mac_hi' % (self._device, self._core)] = (mac >> 32) & 0xFFFFFFFF
[docs] def get_src_mac(self): """ Get source MAC address """ lower = self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_src_mac_addr_lower' % (self._device, self._core)] upper = self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_src_mac_addr_upper' % (self._device, self._core)] return upper << 32 | lower
[docs] def set_dst_mac(self, mac): """ Set destination MAC address :param mac: MAC address """ self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_dst_mac_addr_lower' % (self._device, self._core)] = mac & 0xFFFFFFFF self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_dst_mac_addr_upper' % (self._device, self._core)] = (mac >> 32) & 0xFFFFFFFF self.board[ '%s.mii_test_ic.core%d_dst_mac_lo' % (self._device, self._core)] = mac & 0xFFFFFFFF self.board[ '%s.mii_test_ic.core%d_dst_mac_hi' % (self._device, self._core)] = (mac >> 32) & 0xFFFFFFFF
[docs] def get_dst_mac(self): """ Get destination MAC address """ lower = self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_dst_mac_addr_lower' % (self._device, self._core)] upper = self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_dst_mac_addr_upper' % (self._device, self._core)] return upper << 32 | lower
[docs] def set_src_ip(self, ip): """ Set source IP address :param ip: IP address """ try: if type(ip) is not int: ip = struct.unpack("!L", socket.inet_aton(ip))[0] self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_udp_src_ip_addr' % (self._device, self._core)] = ip except: raise PluginError("Tpm10GCore: Could not set source IP")
[docs] def get_src_ip(self): """ Get source IP address """ return self.board[ '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_udp_src_ip_addr' % (self._device, self._core)]
[docs] def set_dst_ip(self, ip): """ Set source IP address :param ip: IP address """ try: if type(ip) is not int: ip = struct.unpack("!L", socket.inet_aton(ip))[0] self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_' 'udp_core_control_udp_dst_ip_addr' % (self._device, self._core)] = ip except: raise PluginError("Tpm10GCore: Could not set destination IP %s" % ip)
[docs] def get_dst_ip(self): """ Get destination ip""" return self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_' \ 'udp_core_control_udp_dst_ip_addr' % (self._device, self._core)]
[docs] def set_src_port(self, port): """ Set source IP address :param port: Port """ self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_' \ 'udp_ports.udp_src_port' % (self._device, self._core)] = port
[docs] def get_src_port(self): """ Get source IP address :param port: Port """ return self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_' \ 'udp_ports.udp_src_port' % (self._device, self._core)]
[docs] def set_dst_port(self, port): """ Set source IP address :param port: Port """ self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_' \ 'udp_ports.udp_dst_port' % (self._device, self._core)] = port
[docs] def get_dst_port(self): """ Set source IP address :param port: Port """ return self.board['%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_' \ 'udp_ports.udp_dst_port' % (self._device, self._core)]
[docs] def mii_test_init(self, pkt_len, pkt_num, rx_core): self.board['%s.mii_test_ic.core%d_byte_swap' % (self._device, self._core)] = 0x1 self.board['%s.mii_test_ic.core%d_mode' % (self._device, self._core)] = 0x0 self.board['%s.mii_test_ic.core%d_eth_type' % (self._device, self._core)] = 0x88888888 self.board['%s.mii_test_ic.core%d_pkt_len' % (self._device, self._core)] = pkt_len // 8 self.board['%s.mii_test_ic.core%d_tx_pkt_num' % (self._device, self._core)] = pkt_num self.board['%s.mii_test_ic.core%d_rx_pkt_num' % (self._device, rx_core)] = 0
[docs] def mii_test_mac_config(self, board): if self._device == "fpga1": adder = 0 else: adder = 4 self.board['%s.mii_test_ic.core%d_src_mac_lo' % (self._device, self._core)] = self._core + adder self.board['%s.mii_test_ic.core%d_src_mac_hi' % (self._device, self._core)] = board self.board['%s.mii_test_ic.core%d_dst_mac_lo' % (self._device, self._core)] = ((self._core + 1) % 4) + adder self.board['%s.mii_test_ic.core%d_dst_mac_hi' % (self._device, self._core)] = board
[docs] def mii_test(self, pkt_num, pkt_len=8192, show_result=True, wait_result=True): self.board['%s.mii_test_ic.core%d_tx_pkt_num' % (self._device, self._core)] = 0 # stop current transmission self.mii_send(pkt_num, pkt_len, wait_result) if show_result: self.mii_test_result()
[docs] def mii_send(self, pkt_num, pkt_len=8192, wait_result=True): rx_core = (self._core + 1) % 4 self.board['%s.regfile.eth10g_ctrl' % self._device] = 0xF self.mii_test_init(pkt_len, pkt_num, rx_core) self.board['%s.mii_test_ic.core%d_tx_start' % (self._device, self._core)] = 0x1 if wait_result: self.mii_wait_idle() self.board['%s.regfile.eth10g_ctrl' % self._device] = 0x0
[docs] def mii_wait_idle(self): rx_core = (self._core + 1) % 4 wait = 1 while wait == 1: time.sleep(0.3) rd = self.board['%s.mii_test_ic.core%d_tx_start' % (self._device, self._core)] if rd == 1: logging.debug("Waiting for transmission...") rd = self.board['%s.mii_test_ic.core%d_rx_pkt_num' % (self._device, rx_core)] logging.debug("Received Packets: " + str(rd)) for f in ["fpga1","fpga2"]: for c in range(4): rd = self.board['%s.mii_test_ic.core%d_rx_error' % (f, c)] logging.debug('%s core%d: %s error register' % (f, c, hex(rd))) else: wait = 0 self.board['%s.regfile.eth10g_ctrl' % self._device] = 0x0
[docs] def mii_test_result(self): rx_core = (self._core + 1) % 4 rd1 = self.board['%s.mii_test_ic.core%d_rx_pkt_num' % (self._device, rx_core)] rd2 = self.board['%s.mii_test_ic.core%d_rx_error' % (self._device, rx_core)] logging.debug('%s core%d: %s error register - %d received packets' % (self._device, rx_core, hex(rd2), rd1))
##################### Superclass method implementations #################################
[docs] def initialise(self): """ Initialise TpmTenGCore """ logging.info("TpmTenGCore has been initialised") return True
[docs] def status_check(self): """ Perform status check :return: Status """ logging.info("TpmTenGCore : Checking status") return Status.OK
[docs] def clean_up(self): """ Perform cleanup :return: Success """ logging.info("TpmTenGCore : Cleaning up") return True