Source code for pyfabil.plugins.tpm.forty_g_xg_core

from __future__ import division
from builtins import str
from builtins import hex
from builtins import range
import socket
import time

__author__ = 'lessju'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging


[docs] class TpmFortyGCoreXg(FirmwareBlock): """ TpmFortyGCoreXg plugin """ @compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board) @friendlyname('tpm_10g_core') @maxinstances(2) def __init__(self, board, **kwargs): """ TpmFortyGCoreXg initialiser :param board: Pointer to board instance """ super(TpmFortyGCoreXg, self).__init__(board) if 'device' not in list(kwargs.keys()): raise PluginError("TpmFortyGCoreXg: Require a node instance") self._device = kwargs['device'] if 'core' not in list(kwargs.keys()): raise PluginError("TpmFortyGCoreXg: core_id required") if self._device == Device.FPGA_1: self._device = 'fpga1' elif self._device == Device.FPGA_2: self._device = 'fpga2' else: raise PluginError("TpmFortyGCoreXg: Invalid device %s" % self._device) self._core = kwargs['core'] self._tx_config_ip_ba = self.board.memory_map['%s.xg_udp.core%d_tx_config_ip' % (self._device, self._core)].address self._tx_config_port_ba = self.board.memory_map['%s.xg_udp.core%d_tx_config_port' % (self._device, self._core)].address if self.board.has_register(f'{self._device}.xg_udp.phy_rx_decode_error_counter_0'): self._decode_error_supported = True else: self._decode_error_supported = False if self.board.has_register(f'{self._device}.xg_udp.phy_rx_linkup_loss_cnt'): self._linkup_loss_cnt_supported = True else: self._linkup_loss_cnt_supported = False #######################################################################################
[docs] def initialise_core(self): """ Initialise 40G core """ self.board['%s.xg_udp.core%d_netmask' % (self._device, self._core)] = 0xFF000000 self.board['%s.xg_udp.core%d_global_config.tx_use_always_0' % (self._device, self._core)] = 0 self.board['%s.xg_udp.core%d_global_config.promiscuous_mode_enable' % (self._device, self._core)] = 0 self.board['%s.xg_udp.core%d_global_config.udp_port_filter_enable' % (self._device, self._core)] = 1 if self.board['%s.xg_udp.core%d_rev' % (self._device, self._core)] >= 2: self.board['%s.xg_udp.core%d_global_config.tx_disable' % (self._device, self._core)] = 0 return
[docs] def tx_disable(self): if self.board['%s.xg_udp.core%d_rev' % (self._device, self._core)] >= 2: self.board['%s.xg_udp.core%d_global_config.tx_disable' % (self._device, self._core)] = 1 return
[docs] def is_tx_disabled(self): if self.board['%s.xg_udp.core%d_rev' % (self._device, self._core)] >= 2: rd = self.board['%s.xg_udp.core%d_global_config.tx_disable' % (self._device, self._core)] if rd == 1: return True return False
[docs] def get_number_of_arp_table_entries(self): if self.board['%s.xg_udp.core%d_rev' % (self._device, self._core)] >= 2: return self.board['%s.xg_udp.core%d_features.nof_arp_table_entries' % (self._device, self._core)] return 4
[docs] def check_errors(self): errors = False if self.board['%s.xg_udp.phy_status.vl_aligned' % self._device] == 0: errors = True if self.board['%s.xg_udp.core%d_rx_crc_error' % (self._device, self._core)] != 0: errors = True for lane in range(4): if self.board['%s.xg_udp.phy_rx_bip_error_counter_%d' % (self._device, lane)] != 0: errors = True if self._decode_error_supported: if self.board['%s.xg_udp.phy_rx_decode_error_counter_%d' % (self._device, lane)] != 0: errors = True return errors
[docs] def get_bip_error_count(self): bip_error = {} for lane in range(4): bip_error[f'lane{lane}'] = self.board[f'{self._device}.xg_udp.phy_rx_bip_error_counter_{lane}'] return bip_error
[docs] def get_decode_error_count(self): if not self._decode_error_supported: logging.warning("40G decode error reporting is not supported.") return decode_error = {} for lane in range(4): decode_error[f'lane{lane}'] = self.board[f'{self._device}.xg_udp.phy_rx_decode_error_counter_{lane}'] return decode_error
[docs] def get_crc_error_count(self): return self.board[f'{self._device}.xg_udp.core{self._core}_rx_crc_error']
[docs] def get_packet_counter(self): counters = { "rx_received": self.board[f'{self._device}.xg_udp.core{self._core}_rx_received_packets_msb'] << 32 + self.board[f'{self._device}.xg_udp.core{self._core}_rx_received_packets_lsb'], "rx_forwarded": self.board[f'{self._device}.xg_udp.core{self._core}_rx_forwarded_packets_msb'] << 32 + self.board[f'{self._device}.xg_udp.core{self._core}_rx_forwarded_packets_lsb'], "tx_transmitted": self.board[f'{self._device}.xg_udp.core{self._core}_tx_transmitted_packets_msb'] << 32 + self.board[f'{self._device}.xg_udp.core{self._core}_tx_transmitted_packets_lsb'] } return counters
[docs] def reset_errors(self): self.board['%s.xg_udp.phy_rx_bip_error_counter_rst' % self._device] = 0xF self.board['%s.xg_udp.phy_rx_bip_error_counter_rst' % self._device] = 0x0 if self._linkup_loss_cnt_supported: self.board['%s.xg_udp.phy_rx_linkup_loss_cnt' % self._device] = 0x0 self.board['%s.xg_udp.core%d_rx_crc_error' % (self._device, self._core)] = 0 if self._decode_error_supported: self.board['%s.xg_udp.phy_rx_decode_error_counter_rst' % self._device] = 0xF self.board['%s.xg_udp.phy_rx_decode_error_counter_rst' % self._device] = 0x0
[docs] def check_linkup_loss_cnt(self, show_result=True): if self._linkup_loss_cnt_supported: count = self.board[f'{self._device}.xg_udp.phy_rx_linkup_loss_cnt'] if show_result: logging.info(f'{self._device.upper()} linkup loss count {count}') return count
[docs] def reset_core(self, timeout=10): for n in range(timeout): self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 1 self.board['%s.xg_udp.phy_ctrl.rst' % self._device] = 0 errors = False for _ in range(100): if self.board['%s.xg_udp.phy_status.vl_aligned' % self._device] == 1: time.sleep(3) self.reset_errors() time.sleep(0.5) if not self.check_errors(): logging.info("TpmFortyGCoreXg: %s link is up!" % self._device) return True else: errors = True break time.sleep(0.1) logging.info("TpmFortyGCoreXg: %s link reset, attempt %d, errors: %s" % (self._device, n, str(errors))) # Give up after timeout number of attempts logging.warning("TpmFortyGCoreXg: %s link is down!" % self._device) return False
[docs] def set_src_mac(self, mac): """ Set source MAC address :param mac: MAC address """ self.board['%s.xg_udp.core%d_mac_lsb' % (self._device, self._core)] = mac & 0xFFFFFFFF self.board['%s.xg_udp.core%d_mac_msb' % (self._device, self._core)] = (mac >> 32) & 0xFFFFFFFF
[docs] def get_src_mac(self): """ Get source MAC address """ lower = self.board['%s.xg_udp.core%d_mac_lsb' % (self._device, self._core)] upper = self.board['%s.xg_udp.core%d_mac_msb' % (self._device, self._core)] return upper << 32 | lower
[docs] def set_dst_mac(self, mac): """ Set destination MAC address :param mac: MAC address """ return
[docs] def get_dst_mac(self): """ Get destination MAC address """ return 0
[docs] def set_src_ip(self, ip): """ Set source IP address :param ip: IP address """ try: if type(ip) is not int: ip = struct.unpack("!L", socket.inet_aton(ip))[0] self.board['%s.xg_udp.core%d_ip' % (self._device, self._core)] = ip except: raise PluginError("TpmFortyGCoreXg: Could not set source IP " + str(ip))
[docs] def get_src_ip(self): """ Get source IP address """ return self.board['%s.xg_udp.core%d_ip' % (self._device, self._core)]
[docs] def set_dst_ip(self, ip, slot=0): """ Set source IP address :param ip: IP address :param slot: entry in the tx config table """ try: if type(ip) is not int: ip = struct.unpack("!L", socket.inet_aton(ip))[0] self.board[self._tx_config_ip_ba + 4 * slot] = ip except: raise PluginError("TpmFortyGCoreXg: Could not set destination IP " + str(ip))
[docs] def get_dst_ip(self, slot=0): """ Get destination ip :param slot: entry in the tx config table """ return self.board[self._tx_config_ip_ba + 4 * slot]
[docs] def set_src_port(self, port, slot=0): """ Set source IP address :param port: Port :param slot: entry in the tx config table """ rd = self.board[self._tx_config_port_ba + 4 * slot] val = (rd & 0xFFFF0000) | (port & 0xFFFF) self.board[self._tx_config_port_ba + 4 * slot] = val return
[docs] def get_src_port(self, slot=0): """ Get source IP address :param port: Port :param slot: entry in the tx config table """ rd = self.board[self._tx_config_port_ba + 4 * slot] return rd & 0xFFFF
[docs] def set_dst_port(self, port, slot=0): """ Set source IP address :param port: Port :param slot: entry in the core tx config table """ rd = self.board[self._tx_config_port_ba + 4 * slot] val = ((port << 16) & 0xFFFF0000) | (rd & 0xFFFF) self.board[self._tx_config_port_ba + 4 * slot] = val return
[docs] def get_dst_port(self, slot=0): """ Set source IP address :param port: Port :param slot: entry in the core tx config table """ rd = self.board[self._tx_config_port_ba + 4 * slot] return (rd >> 16) & 0xFFFF
[docs] def set_netmask(self, netmask=0xFF000000): """ Set netmask :param netmask: Netmask """ self.board['%s.xg_udp.core%d_netmask' % (self._device, self._core)] = netmask self.renew_arp_table() return
[docs] def get_netmask(self): """ Set netmask :return Netmask """ return self.board['%s.xg_udp.core%d_netmask' % (self._device, self._core)]
[docs] def set_gateway_ip(self, gateway=0): """ Set gateway :param gateway: Gateway """ self.board['%s.xg_udp.core%d_gateway' % (self._device, self._core)] = gateway self.renew_arp_table() return
[docs] def get_gateway_ip(self): """ Get gateway :return gateway """ return self.board['%s.xg_udp.core%d_gateway' % (self._device, self._core)]
[docs] def renew_arp_table(self): """ Renew ARP table. This will be called after netmask or gateway are written to the registers, if there are entries that forward packets to the gateway, """ nof_entries = self.get_number_of_arp_table_entries() - 2 netmask = self.get_netmask() for n in range(nof_entries): status, _ = self.get_arp_table_status(n) if status & 0x1 == 0x1: dst_ip = self.get_dst_ip(n) src_ip = self.get_src_ip() if ((dst_ip & netmask) ^ src_ip) != 0: self.set_dst_ip(dst_ip, n) return
[docs] def get_arp_table_status(self, idx, silent_mode=True): self.board['%s.xg_udp.core%d_arp_table_read_pointer' % (self._device, self._core)] = idx rd = self.board['%s.xg_udp.core%d_arp_table_status' % (self._device, self._core)] mac_lsb = self.board['%s.xg_udp.core%d_arp_table_mac_lsb' % (self._device, self._core)] mac_msb = self.board['%s.xg_udp.core%d_arp_table_mac_msb' % (self._device, self._core)] mac = (mac_msb << 32) + mac_lsb if not silent_mode: txt = "\n\nvalid: " + str(rd & 0x1) + "\n" txt += "renewing: " + str((rd & 0x2) >> 1) + "\n" txt += "mac_resolved: " + str((rd & 0x4) >> 2) + "\n" txt += "mac: " + hex(mac) + "\n" logging.info(txt) return rd, mac
[docs] def set_rx_port_filter(self, port, slot): self.board['%s.xg_udp.core%d_global_config.udp_port_filter_enable' % (self._device, self._core)] = 1 self.board['%s.xg_udp.core%d_rx_config_%d.enable' % (self._device, self._core, slot)] = 1 self.board['%s.xg_udp.core%d_rx_config_%d.rx_dst_port' % (self._device, self._core, slot)] = port
[docs] def test_stop(self): self.board['%s.xg_udp.core%d_test_ctrl_0.tx_start' % (self._device, self._core)] = 0 self.board['%s.xg_udp.core%d_test_ctrl_0.rx_start' % (self._device, self._core)] = 0
[docs] def test_start_tx(self, dst_ip=None, ipg=4, packet_len=8192): self.board['%s.xg_udp.core%d_test_ctrl_0.tx_start' % (self._device, self._core)] = 0 attempt = 0 if dst_ip is not None: self.set_dst_ip(dst_ip) while (self.get_arp_table_status(0)[0] & 0x4) == 0: time.sleep(0.01) attempt += 1 if attempt * 0.01 == 2: logging.error("TpmFortyGCoreXg %s Core%d: Not possible to resolve ARP. Test not started!" % (self._device.upper(), self._core)) return 1 self.board['%s.xg_udp.core%d_test_ctrl_1.ipg' % (self._device, self._core)] = ipg self.board['%s.xg_udp.core%d_test_ctrl_1.pkt_len' % (self._device, self._core)] = packet_len self.board['%s.xg_udp.core%d_test_ctrl_0.tx_start' % (self._device, self._core)] = 1 return 0
[docs] def test_start_rx(self, single_packet_mode=False): self.board['%s.xg_udp.core%d_rx_crc_error' % (self._device, self._core)] = 0 if single_packet_mode: self.board['%s.xg_udp.core%d_test_ctrl_0.rx_single_pkt_check' % (self._device, self._core)] = 1 else: self.board['%s.xg_udp.core%d_test_ctrl_0.rx_single_pkt_check' % (self._device, self._core)] = 0 self.board['%s.xg_udp.core%d_test_ctrl_0.rx_start' % (self._device, self._core)] = 0 self.board['%s.xg_udp.core%d_test_ctrl_0.rx_start' % (self._device, self._core)] = 1
[docs] def test_check_result(self): print("Test Error detected: %d" % self.board['%s.xg_udp.core%d_test_status.error' % (self._device, self._core)]) print("Test Error counter: %d" % self.board['%s.xg_udp.core%d_test_error_cnt' % (self._device, self._core)]) print("RX CRC Error counter: %d" % self.board['%s.xg_udp.core%d_rx_crc_error' % (self._device, self._core)]) print("TX packet counter: %d" % self.board['%s.xg_udp.core%d_test_tx_pkt_cnt' % (self._device, self._core)]) print("RX packet counter: %d" % self.board['%s.xg_udp.core%d_test_rx_pkt_cnt' % (self._device, self._core)])
##################### Superclass method implementations #################################
[docs] def initialise(self): """ Initialise TpmFortyGCoreXg """ logging.info("TpmFortyGCoreXg has been initialised") return True
[docs] def status_check(self): """ Perform status check :return: Status """ logging.info("TpmFortyGCoreXg: Checking status") return Status.OK
[docs] def clean_up(self): """ Perform cleanup :return: Success """ logging.info("TpmFortyGCoreXg: Cleaning up") return True