from __future__ import division
from builtins import str
from builtins import hex
from builtins import range
import socket
import time
__author__ = 'lessju'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
[docs]
class TpmFortyGCoreXg(FirmwareBlock):
""" TpmFortyGCoreXg plugin """
@compatibleboards(BoardMake.TpmBoard, BoardMake.Tpm16Board)
@friendlyname('tpm_10g_core')
@maxinstances(2)
def __init__(self, board, **kwargs):
    """ Set up the plugin for one UDP core on one FPGA

    :param board: Pointer to board instance
    :param kwargs: Must provide 'device' (Device.FPGA_1 or Device.FPGA_2) and
                   'core' (number used to build the core register names)
    :raises PluginError: If 'device' or 'core' is missing or the device is invalid
    """
    super(TpmFortyGCoreXg, self).__init__(board)

    if 'device' not in kwargs:
        raise PluginError("TpmFortyGCoreXg: Require a node instance")
    self._device = kwargs['device']

    if 'core' not in kwargs:
        raise PluginError("TpmFortyGCoreXg: core_id required")

    # Replace the device enumeration with the prefix used in register names
    for candidate, prefix in ((Device.FPGA_1, 'fpga1'), (Device.FPGA_2, 'fpga2')):
        if self._device == candidate:
            self._device = prefix
            break
    else:
        raise PluginError("TpmFortyGCoreXg: Invalid device %s" % self._device)

    self._core = kwargs['core']

    # Base addresses of the TX configuration tables, indexed later by slot
    core_prefix = '%s.xg_udp.core%d' % (self._device, self._core)
    self._tx_config_ip_ba = self.board.memory_map[core_prefix + '_tx_config_ip'].address
    self._tx_config_port_ba = self.board.memory_map[core_prefix + '_tx_config_port'].address

    # Optional registers: remember whether the loaded firmware exposes them
    self._decode_error_supported = bool(
        self.board.has_register(f'{self._device}.xg_udp.phy_rx_decode_error_counter_0'))
    self._linkup_loss_cnt_supported = bool(
        self.board.has_register(f'{self._device}.xg_udp.phy_rx_linkup_loss_cnt'))
#######################################################################################
[docs]
def initialise_core(self):
    """ Initialise 40G core

    Writes the default netmask (0xFF000000) and the global configuration
    fields. The tx_disable field is cleared only when the core revision
    register reads 2 or more.
    """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    global_config = core + '_global_config'

    self.board[core + '_netmask'] = 0xFF000000
    for field, value in (('tx_use_always_0', 0),
                         ('promiscuous_mode_enable', 0),
                         ('udp_port_filter_enable', 1)):
        self.board[global_config + '.' + field] = value

    if self.board[core + '_rev'] >= 2:
        self.board[global_config + '.tx_disable'] = 0
[docs]
def tx_disable(self):
    """ Set the tx_disable field of the core global configuration

    Nothing is written when the core revision register reads less than 2.
    """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    revision = self.board[core + '_rev']
    if revision >= 2:
        self.board[core + '_global_config.tx_disable'] = 1
[docs]
def is_tx_disabled(self):
    """ Tell whether the tx_disable field of the core is set

    :return: True if the core revision is at least 2 and tx_disable reads 1,
             False otherwise
    """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    if not self.board[core + '_rev'] >= 2:
        # The field is only consulted from revision 2 onwards
        return False
    return bool(self.board[core + '_global_config.tx_disable'] == 1)
[docs]
def get_number_of_arp_table_entries(self):
    """ Get the number of ARP table entries of the core

    :return: Value of the nof_arp_table_entries feature field when the core
             revision is at least 2, otherwise 4
    """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    revision = self.board[core + '_rev']
    if revision >= 2:
        return self.board[core + '_features.nof_arp_table_entries']
    return 4
[docs]
def check_errors(self):
    """ Check the error indicators of the link

    Reads the vl_aligned status flag, the RX CRC error counter of this core
    and, for each of the 4 lanes, the BIP error counter (plus the decode
    error counter when the firmware provides it).

    :return: True if vl_aligned reads 0 or any counter is non-zero, False otherwise
    """
    device = self._device

    # Every register is read unconditionally, so gather the individual
    # results first and combine them once at the end.
    faults = [
        self.board['%s.xg_udp.phy_status.vl_aligned' % device] == 0,
        self.board['%s.xg_udp.core%d_rx_crc_error' % (device, self._core)] != 0,
    ]
    for lane in range(4):
        faults.append(self.board['%s.xg_udp.phy_rx_bip_error_counter_%d' % (device, lane)] != 0)
        if self._decode_error_supported:
            faults.append(self.board['%s.xg_udp.phy_rx_decode_error_counter_%d' % (device, lane)] != 0)
    return any(faults)
[docs]
def get_bip_error_count(self):
    """ Read the BIP error counter of each of the 4 lanes

    :return: Dictionary mapping 'lane0' .. 'lane3' to the counter values
    """
    return {
        f'lane{lane}': self.board[f'{self._device}.xg_udp.phy_rx_bip_error_counter_{lane}']
        for lane in range(4)
    }
[docs]
def get_decode_error_count(self):
    """ Read the decode error counter of each of the 4 lanes

    :return: Dictionary mapping 'lane0' .. 'lane3' to the counter values, or
             None (after logging a warning) when the firmware does not
             provide the decode error counters
    """
    if self._decode_error_supported:
        return {
            f'lane{lane}': self.board[f'{self._device}.xg_udp.phy_rx_decode_error_counter_{lane}']
            for lane in range(4)
        }
    logging.warning("40G decode error reporting is not supported.")
    return None
[docs]
def get_crc_error_count(self):
    """ Read the RX CRC error counter of this core

    :return: Value of the rx_crc_error register
    """
    register = f'{self._device}.xg_udp.core{self._core}_rx_crc_error'
    return self.board[register]
[docs]
def get_packet_counter(self):
    """ Read the packet counters of this core

    Each counter is exposed as a pair of registers with `_msb` and `_lsb`
    suffixes, combined here as (msb << 32) | lsb.

    :return: Dictionary with keys "rx_received", "rx_forwarded" and
             "tx_transmitted" holding the combined counter values
    """
    prefix = f'{self._device}.xg_udp.core{self._core}'
    counters = {}
    for key, register in (("rx_received", "rx_received_packets"),
                          ("rx_forwarded", "rx_forwarded_packets"),
                          ("tx_transmitted", "tx_transmitted_packets")):
        msb = self.board[f'{prefix}_{register}_msb']
        lsb = self.board[f'{prefix}_{register}_lsb']
        # Parentheses are required: `+` binds tighter than `<<`, so the
        # unparenthesised `msb << 32 + lsb` shifts msb by (32 + lsb) bits.
        counters[key] = (msb << 32) | lsb
    return counters
[docs]
def reset_errors(self):
    """ Clear the error counters of the link

    Pulses the BIP error counter reset field (0xF then 0x0), zeroes the link-up
    loss counter when the firmware provides it, zeroes the RX CRC error
    counter of this core and, when decode error counters are available,
    pulses their reset field as well.
    """
    device = self._device

    bip_reset = '%s.xg_udp.phy_rx_bip_error_counter_rst' % device
    for value in (0xF, 0x0):
        self.board[bip_reset] = value

    if self._linkup_loss_cnt_supported:
        self.board['%s.xg_udp.phy_rx_linkup_loss_cnt' % device] = 0x0

    self.board['%s.xg_udp.core%d_rx_crc_error' % (device, self._core)] = 0

    if self._decode_error_supported:
        decode_reset = '%s.xg_udp.phy_rx_decode_error_counter_rst' % device
        for value in (0xF, 0x0):
            self.board[decode_reset] = value
[docs]
def check_linkup_loss_cnt(self, show_result=True):
    """ Read the link-up loss counter

    :param show_result: When True, the count is also written to the log
    :return: Counter value, or None when the firmware does not provide the counter
    """
    if not self._linkup_loss_cnt_supported:
        return None
    count = self.board[f'{self._device}.xg_udp.phy_rx_linkup_loss_cnt']
    if show_result:
        logging.info(f'{self._device.upper()} linkup loss count {count}')
    return count
[docs]
def reset_core(self, timeout=10):
    """ Reset the PHY until the link comes up without errors

    On every attempt the PHY reset field is pulsed and vl_aligned is polled up
    to 100 times, 0.1 s apart. Once aligned, the error counters are cleared
    and checked after a settling delay.

    :param timeout: Maximum number of reset attempts
    :return: True as soon as the link is aligned and error free, False if
             every attempt failed
    """
    reset_reg = '%s.xg_udp.phy_ctrl.rst' % self._device
    aligned_reg = '%s.xg_udp.phy_status.vl_aligned' % self._device

    for attempt in range(timeout):
        # Pulse the reset
        self.board[reset_reg] = 1
        self.board[reset_reg] = 0

        errors = False
        for _ in range(100):
            if self.board[aligned_reg] != 1:
                # Not aligned yet, poll again shortly
                time.sleep(0.1)
                continue

            # Aligned: let the link settle, then look at fresh counters
            time.sleep(3)
            self.reset_errors()
            time.sleep(0.5)
            if not self.check_errors():
                logging.info("TpmFortyGCoreXg: %s link is up!" % self._device)
                return True

            # Aligned but with errors: go for another reset
            errors = True
            break

        logging.info("TpmFortyGCoreXg: %s link reset, attempt %d, errors: %s" % (self._device, attempt, str(errors)))

    # Give up after timeout number of attempts
    logging.warning("TpmFortyGCoreXg: %s link is down!" % self._device)
    return False
[docs]
def is_link_up(self):
    """ Tell whether the link is up

    :return: True if the vl_aligned status flag reads 1, False otherwise
    """
    aligned = self.board['%s.xg_udp.phy_status.vl_aligned' % self._device]
    return bool(aligned == 1)
[docs]
def set_src_mac(self, mac):
    """ Set source MAC address

    The lower 32 bits go to the mac_lsb register, the following 32 bits to mac_msb.

    :param mac: MAC address as an integer
    """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    word_mask = 0xFFFFFFFF
    self.board[core + '_mac_lsb'] = mac & word_mask
    self.board[core + '_mac_msb'] = (mac >> 32) & word_mask
[docs]
def get_src_mac(self):
    """ Get source MAC address

    :return: MAC address as an integer, built from the mac_msb and mac_lsb registers
    """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    low_word = self.board[core + '_mac_lsb']
    high_word = self.board[core + '_mac_msb']
    return (high_word << 32) | low_word
[docs]
def set_dst_mac(self, mac):
    """ Set destination MAC address

    This implementation does nothing and the argument is ignored.

    :param mac: MAC address
    """
    return None
[docs]
def get_dst_mac(self):
    """ Get destination MAC address

    :return: Always 0 in this implementation
    """
    placeholder_mac = 0
    return placeholder_mac
[docs]
def set_src_ip(self, ip):
    """ Set source IP address

    :param ip: IP address, either as an integer or as a string accepted by
               socket.inet_aton (e.g. "10.0.0.1")
    :raises PluginError: If the address cannot be converted or written
    """
    try:
        if not isinstance(ip, int):
            # inet_aton yields the 4 address bytes in network (big-endian) order
            ip = int.from_bytes(socket.inet_aton(ip), "big")
        self.board['%s.xg_udp.core%d_ip' % (self._device, self._core)] = ip
    except Exception as err:
        # Keep reporting every failure as PluginError, but no longer trap
        # KeyboardInterrupt/SystemExit and keep the original cause attached.
        raise PluginError("TpmFortyGCoreXg: Could not set source IP " + str(ip)) from err
[docs]
def get_src_ip(self):
    """ Get source IP address

    :return: Value of the core ip register
    """
    register = '%s.xg_udp.core%d_ip' % (self._device, self._core)
    return self.board[register]
[docs]
def set_dst_ip(self, ip, slot=0):
    """ Set destination IP address

    :param ip: IP address, either as an integer or as a string accepted by
               socket.inet_aton (e.g. "10.0.0.1")
    :param slot: entry in the tx config table
    :raises PluginError: If the address cannot be converted or written
    """
    try:
        if not isinstance(ip, int):
            # inet_aton yields the 4 address bytes in network (big-endian) order
            ip = int.from_bytes(socket.inet_aton(ip), "big")
        # Table entries are addressed 4 apart, starting at the table base address
        self.board[self._tx_config_ip_ba + 4 * slot] = ip
    except Exception as err:
        # Keep reporting every failure as PluginError, but no longer trap
        # KeyboardInterrupt/SystemExit and keep the original cause attached.
        raise PluginError("TpmFortyGCoreXg: Could not set destination IP " + str(ip)) from err
[docs]
def get_dst_ip(self, slot=0):
    """ Get destination IP address

    :param slot: entry in the tx config table
    :return: Value stored for that entry
    """
    entry_address = self._tx_config_ip_ba + 4 * slot
    return self.board[entry_address]
[docs]
def set_src_port(self, port, slot=0):
    """ Set source port

    The port occupies the lower 16 bits of the tx config port entry; the
    upper 16 bits are left as they were.

    :param port: Port
    :param slot: entry in the tx config table
    """
    entry_address = self._tx_config_port_ba + 4 * slot
    current = self.board[entry_address]
    self.board[entry_address] = (current & 0xFFFF0000) | (port & 0xFFFF)
[docs]
def get_src_port(self, slot=0):
    """ Get source port

    :param slot: entry in the tx config table
    :return: Lower 16 bits of the tx config port entry
    """
    entry = self.board[self._tx_config_port_ba + 4 * slot]
    source_port = entry & 0xFFFF
    return source_port
[docs]
def set_dst_port(self, port, slot=0):
    """ Set destination port

    The port occupies the upper 16 bits of the tx config port entry; the
    lower 16 bits are left as they were.

    :param port: Port
    :param slot: entry in the core tx config table
    """
    entry_address = self._tx_config_port_ba + 4 * slot
    current = self.board[entry_address]
    upper_half = (port << 16) & 0xFFFF0000
    self.board[entry_address] = upper_half | (current & 0xFFFF)
[docs]
def get_dst_port(self, slot=0):
    """ Get destination port

    :param slot: entry in the core tx config table
    :return: Upper 16 bits of the tx config port entry
    """
    entry = self.board[self._tx_config_port_ba + 4 * slot]
    destination_port = (entry >> 16) & 0xFFFF
    return destination_port
[docs]
def set_netmask(self, netmask=0xFF000000):
    """ Set netmask and renew the ARP table afterwards

    :param netmask: Netmask
    """
    register = '%s.xg_udp.core%d_netmask' % (self._device, self._core)
    self.board[register] = netmask
    # The ARP table is renewed only after the new value has been written
    self.renew_arp_table()
[docs]
def get_netmask(self):
    """ Get netmask

    :return: Value of the core netmask register
    """
    register = '%s.xg_udp.core%d_netmask' % (self._device, self._core)
    return self.board[register]
[docs]
def set_gateway_ip(self, gateway=0):
    """ Set gateway and renew the ARP table afterwards

    :param gateway: Gateway
    """
    register = '%s.xg_udp.core%d_gateway' % (self._device, self._core)
    self.board[register] = gateway
    # The ARP table is renewed only after the new value has been written
    self.renew_arp_table()
[docs]
def get_gateway_ip(self):
    """ Get gateway

    :return: Value of the core gateway register
    """
    register = '%s.xg_udp.core%d_gateway' % (self._device, self._core)
    return self.board[register]
[docs]
def renew_arp_table(self):
    """ Renew ARP table

    Called after the netmask or the gateway has been written. For every
    entry except the last two, if the entry is flagged valid and its masked
    destination IP differs from the source IP, the destination IP is written
    again into the same slot.
    """
    entry_count = self.get_number_of_arp_table_entries() - 2
    mask = self.get_netmask()
    for slot in range(entry_count):
        status, _mac = self.get_arp_table_status(slot)
        if (status & 0x1) != 0x1:
            # Entry not valid, leave it alone
            continue
        destination = self.get_dst_ip(slot)
        source = self.get_src_ip()
        # NOTE(review): the masked destination is compared with the unmasked
        # source address - confirm this is intended before changing it.
        if (destination & mask) ^ source:
            self.set_dst_ip(destination, slot)
[docs]
def get_arp_table_status(self, idx, silent_mode=True):
    """ Read status and MAC address of one ARP table entry

    The entry is selected by writing its index to the read pointer register.

    :param idx: Index of the ARP table entry
    :param silent_mode: When False, the decoded status is also written to the log
    :return: Tuple (status register value, MAC address as an integer)
    """
    table = '%s.xg_udp.core%d_arp_table' % (self._device, self._core)

    self.board[table + '_read_pointer'] = idx
    status = self.board[table + '_status']
    low_word = self.board[table + '_mac_lsb']
    high_word = self.board[table + '_mac_msb']
    mac = (high_word << 32) + low_word

    if not silent_mode:
        report = "".join([
            "\n\nvalid: ", str(status & 0x1), "\n",
            "renewing: ", str((status & 0x2) >> 1), "\n",
            "mac_resolved: ", str((status & 0x4) >> 2), "\n",
            "mac: ", hex(mac), "\n",
        ])
        logging.info(report)
    return status, mac
[docs]
def set_rx_port_filter(self, port, slot):
    """ Enable an RX port filter entry

    Turns on the global UDP port filter, then enables the given rx config
    entry and writes its destination port.

    :param port: Destination port written to the entry
    :param slot: Number of the rx config entry
    """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    rx_config = '%s_rx_config_%d' % (core, slot)

    self.board[core + '_global_config.udp_port_filter_enable'] = 1
    self.board[rx_config + '.enable'] = 1
    self.board[rx_config + '.rx_dst_port'] = port
[docs]
def test_stop(self):
    """ Stop the test: clear tx_start first, then rx_start, in test_ctrl_0 """
    control = '%s.xg_udp.core%d_test_ctrl_0' % (self._device, self._core)
    for field in ('tx_start', 'rx_start'):
        self.board[control + '.' + field] = 0
[docs]
def test_start_tx(self, dst_ip=None, ipg=4, packet_len=8192):
    """ Start the test transmitter

    When a destination IP is given it is written to slot 0 and the method
    waits for ARP table entry 0 to report mac_resolved, polling every 10 ms
    for at most 200 polls (about 2 s).

    :param dst_ip: Destination IP address, or None to skip the ARP wait
    :param ipg: Value written to the test_ctrl_1.ipg field
    :param packet_len: Value written to the test_ctrl_1.pkt_len field
    :return: 0 if the transmitter was started, 1 if ARP was not resolved in time
    """
    poll_interval = 0.01
    max_polls = 200  # 200 polls x 10 ms = 2 s
    core = '%s.xg_udp.core%d' % (self._device, self._core)

    self.board[core + '_test_ctrl_0.tx_start'] = 0

    if dst_ip is not None:
        self.set_dst_ip(dst_ip)
        polls = 0
        # Bit 2 of the ARP table status is the mac_resolved flag
        while (self.get_arp_table_status(0)[0] & 0x4) == 0:
            time.sleep(poll_interval)
            polls += 1
            # Integer bound instead of comparing a float product for
            # equality, which only terminates if the rounding is exact.
            if polls >= max_polls:
                logging.error("TpmFortyGCoreXg %s Core%d: Not possible to resolve ARP. Test not started!" % (self._device.upper(), self._core))
                return 1

    self.board[core + '_test_ctrl_1.ipg'] = ipg
    self.board[core + '_test_ctrl_1.pkt_len'] = packet_len
    self.board[core + '_test_ctrl_0.tx_start'] = 1
    return 0
[docs]
def test_start_rx(self, single_packet_mode=False):
    """ Start the test receiver

    Clears the RX CRC error counter, selects the single-packet check mode
    and writes rx_start with 0 and then 1.

    :param single_packet_mode: When true, rx_single_pkt_check is set to 1, otherwise to 0
    """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    control = core + '_test_ctrl_0'

    self.board[core + '_rx_crc_error'] = 0
    self.board[control + '.rx_single_pkt_check'] = 1 if single_packet_mode else 0
    for level in (0, 1):
        self.board[control + '.rx_start'] = level
[docs]
def test_check_result(self):
    """ Print the test status, the error counters and the packet counters of this core """
    core = '%s.xg_udp.core%d' % (self._device, self._core)
    report = (
        ("Test Error detected: %d", '_test_status.error'),
        ("Test Error counter: %d", '_test_error_cnt'),
        ("RX CRC Error counter: %d", '_rx_crc_error'),
        ("TX packet counter: %d", '_test_tx_pkt_cnt'),
        ("RX packet counter: %d", '_test_rx_pkt_cnt'),
    )
    for template, suffix in report:
        print(template % self.board[core + suffix])
##################### Superclass method implementations #################################
[docs]
def initialise(self):
    """ Initialise TpmFortyGCoreXg

    Only logs a message; no register is accessed.

    :return: Always True
    """
    message = "TpmFortyGCoreXg has been initialised"
    logging.info(message)
    return True
[docs]
def status_check(self):
    """ Perform status check

    Only logs a message; no register is accessed.

    :return: Always Status.OK
    """
    logging.info("TpmFortyGCoreXg: Checking status")
    outcome = Status.OK
    return outcome
[docs]
def clean_up(self):
    """ Perform cleanup

    Only logs a message; no register is accessed.

    :return: Always True
    """
    message = "TpmFortyGCoreXg: Cleaning up"
    logging.info(message)
    return True