from __future__ import division

from builtins import hex
from builtins import range
from builtins import str
import logging
import socket
import struct
import time

from pyfabil.base.definitions import *
from pyfabil.base.utils import *
from pyfabil.plugins.firmwareblock import FirmwareBlock

__author__ = 'lessju'
class TpmTenGCore(FirmwareBlock):
    """ TpmTenGCore plugin

    Register-level helper for one 10G UDP core instance on one FPGA. Every
    method reads or writes registers through ``self.board[<register name>]``;
    register names are built from the device name ('fpga1' / 'fpga2') and the
    core index supplied at construction time.
    """

    # Register name templates, filled with (device name, core index, suffix)
    _UDP_REGISTER = '%s.udp_core.udp_core_inst_%d_udp_core_control_udp_core_control_%s'
    _MII_REGISTER = '%s.mii_test_ic.core%d_%s'
    # Register name template, filled with the device name only
    _ETH10G_CTRL_REGISTER = '%s.regfile.eth10g_ctrl'

    # Core indices used by the MII test wrap around modulo this value, and the
    # error report in mii_wait_idle scans this many cores on each device
    _MII_TEST_CORES = 4
    # Devices scanned by the error report in mii_wait_idle
    _MII_TEST_DEVICES = ('fpga1', 'fpga2')

    @compatibleboards(BoardMake.TpmBoard)
    @friendlyname('tpm_10g_core')
    @maxinstances(8)
    def __init__(self, board, **kwargs):
        """ TpmTenGCore initialiser
        :param board: Pointer to board instance
        :param kwargs: Must contain 'device' (Device.FPGA_1 or Device.FPGA_2)
                       and 'core' (core index, formatted with %d into register names)
        :raises PluginError: If 'device' or 'core' is missing, or 'device' is
                             neither Device.FPGA_1 nor Device.FPGA_2
        """
        super(TpmTenGCore, self).__init__(board)

        if 'device' not in kwargs:
            raise PluginError("TpmTenGCore: Require a node instance")
        if 'core' not in kwargs:
            raise PluginError("TpmTenGCore: core_id required")

        # Translate the Device value into the prefix used in register names
        device = kwargs['device']
        if device == Device.FPGA_1:
            self._device = 'fpga1'
        elif device == Device.FPGA_2:
            self._device = 'fpga2'
        else:
            raise PluginError("TpmTenGCore: Invalid device %s" % (device,))

        self._core = kwargs['core']

    ##################### Private helpers ####################################################

    def _udp_register(self, name):
        """ Build the full name of a UDP core control register of this core
        :param name: Register suffix (e.g. 'udp_src_ip_addr')
        :return: Register name string
        """
        return self._UDP_REGISTER % (self._device, self._core, name)

    def _mii_register(self, name, core=None, device=None):
        """ Build the full name of an MII test register
        :param name: Register suffix (e.g. 'tx_start')
        :param core: Core index, defaults to this instance's core
        :param device: Device name, defaults to this instance's device
        :return: Register name string
        """
        if core is None:
            core = self._core
        if device is None:
            device = self._device
        return self._MII_REGISTER % (device, core, name)

    def _rx_core(self):
        """ Index of the core whose RX registers are used by the MII test
        :return: (core + 1) modulo the number of MII test cores
        """
        return (self._core + 1) % self._MII_TEST_CORES

    @staticmethod
    def _ip_to_int(ip):
        """ Convert an IP address to the 32-bit integer written to the register
        :param ip: Integer (returned unchanged) or dotted-quad IPv4 string
        :return: IP address as an integer (network byte order interpretation)
        """
        if isinstance(ip, int):
            return ip
        return struct.unpack("!L", socket.inet_aton(ip))[0]

    #######################################################################################

    def initialise_core(self):
        """ Initialise 10G core

        Writes a fixed set of values to the UDP core control registers.
        """
        # Packet split size: 0x2328 (9000 decimal)
        self.board[self._udp_register('packet_split_size')] = 0x00002328

        # Disabling header/trailer insert
        self.board[self._udp_register('hdr_tlr_inserter')] = 0x0000F300

        # Packet Length and IP Count Values
        self.board[self._udp_register('ip_v4_header_1')] = 0xDB00001C

        # UDP length: 0x0008
        self.board[self._udp_register('udp_length')] = 0x00000008

        # Enabling Inter-Frame Gap
        self.board[self._udp_register('inter_gap_value')] = 0x00000001
        self.board[self._udp_register('control')] = 0x00000001

        # Disable UDP checksum (sets the udp_checksum_zero field)
        self.board[self._udp_register('control.udp_checksum_zero')] = 0x00000001

    def set_src_mac(self, mac):
        """ Set source MAC address

        The address is written both to the UDP core registers and to the MII
        test registers of this core.
        :param mac: MAC address as an integer; bits 0-31 go to the lower
                    register, bits 32-63 to the upper register
        """
        lower = mac & 0xFFFFFFFF
        upper = (mac >> 32) & 0xFFFFFFFF
        self.board[self._udp_register('src_mac_addr_lower')] = lower
        self.board[self._udp_register('src_mac_addr_upper')] = upper
        self.board[self._mii_register('src_mac_lo')] = lower
        self.board[self._mii_register('src_mac_hi')] = upper

    def get_src_mac(self):
        """ Get source MAC address
        :return: (upper register << 32) | lower register, read from the UDP core
        """
        lower = self.board[self._udp_register('src_mac_addr_lower')]
        upper = self.board[self._udp_register('src_mac_addr_upper')]
        return (upper << 32) | lower

    def set_dst_mac(self, mac):
        """ Set destination MAC address

        The address is written both to the UDP core registers and to the MII
        test registers of this core.
        :param mac: MAC address as an integer; bits 0-31 go to the lower
                    register, bits 32-63 to the upper register
        """
        lower = mac & 0xFFFFFFFF
        upper = (mac >> 32) & 0xFFFFFFFF
        self.board[self._udp_register('dst_mac_addr_lower')] = lower
        self.board[self._udp_register('dst_mac_addr_upper')] = upper
        self.board[self._mii_register('dst_mac_lo')] = lower
        self.board[self._mii_register('dst_mac_hi')] = upper

    def get_dst_mac(self):
        """ Get destination MAC address
        :return: (upper register << 32) | lower register, read from the UDP core
        """
        lower = self.board[self._udp_register('dst_mac_addr_lower')]
        upper = self.board[self._udp_register('dst_mac_addr_upper')]
        return (upper << 32) | lower

    def set_src_ip(self, ip):
        """ Set source IP address
        :param ip: IP address, either an integer or a dotted-quad string
        :raises PluginError: If the address cannot be converted or written
        """
        try:
            self.board[self._udp_register('udp_src_ip_addr')] = self._ip_to_int(ip)
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate
            raise PluginError("Tpm10GCore: Could not set source IP")

    def get_src_ip(self):
        """ Get source IP address
        :return: Value of the source IP register
        """
        return self.board[self._udp_register('udp_src_ip_addr')]

    def set_dst_ip(self, ip):
        """ Set destination IP address
        :param ip: IP address, either an integer or a dotted-quad string
        :raises PluginError: If the address cannot be converted or written
        """
        try:
            self.board[self._udp_register('udp_dst_ip_addr')] = self._ip_to_int(ip)
        except Exception:
            # Not a bare except: KeyboardInterrupt/SystemExit must propagate
            raise PluginError("Tpm10GCore: Could not set destination IP %s" % (ip,))

    def get_dst_ip(self):
        """ Get destination IP address
        :return: Value of the destination IP register
        """
        return self.board[self._udp_register('udp_dst_ip_addr')]

    def set_src_port(self, port):
        """ Set source UDP port
        :param port: Port
        """
        self.board[self._udp_register('udp_ports.udp_src_port')] = port

    def get_src_port(self):
        """ Get source UDP port
        :return: Value of the source port register field
        """
        return self.board[self._udp_register('udp_ports.udp_src_port')]

    def set_dst_port(self, port):
        """ Set destination UDP port
        :param port: Port
        """
        self.board[self._udp_register('udp_ports.udp_dst_port')] = port

    def get_dst_port(self):
        """ Get destination UDP port
        :return: Value of the destination port register field
        """
        return self.board[self._udp_register('udp_ports.udp_dst_port')]

    def mii_test_init(self, pkt_len, pkt_num, rx_core):
        """ Configure the MII test registers before a transmission
        :param pkt_len: Packet length; the register receives pkt_len // 8
        :param pkt_num: Value written to this core's tx_pkt_num register
        :param rx_core: Index of the core whose rx_pkt_num register is reset to 0
        """
        self.board[self._mii_register('byte_swap')] = 0x1
        self.board[self._mii_register('mode')] = 0x0
        self.board[self._mii_register('eth_type')] = 0x88888888
        self.board[self._mii_register('pkt_len')] = pkt_len // 8
        self.board[self._mii_register('tx_pkt_num')] = pkt_num
        self.board[self._mii_register('rx_pkt_num', core=rx_core)] = 0

    def mii_test_mac_config(self, board):
        """ Write test MAC addresses to the MII test registers of this core

        The low words are derived from the core index (source) and the next
        core index modulo 4 (destination), offset by 4 when the device is not
        'fpga1'. Both high words are set to the given value.
        :param board: Value written to the src/dst MAC high registers
        """
        adder = 0 if self._device == "fpga1" else 4
        self.board[self._mii_register('src_mac_lo')] = self._core + adder
        self.board[self._mii_register('src_mac_hi')] = board
        self.board[self._mii_register('dst_mac_lo')] = self._rx_core() + adder
        self.board[self._mii_register('dst_mac_hi')] = board

    def mii_test(self, pkt_num, pkt_len=8192, show_result=True, wait_result=True):
        """ Run an MII test transmission and optionally log the result
        :param pkt_num: Number of packets, passed to mii_send
        :param pkt_len: Packet length, passed to mii_send
        :param show_result: If True, call mii_test_result after sending
        :param wait_result: If True, block until transmission has finished
        """
        # Stop current transmission
        self.board[self._mii_register('tx_pkt_num')] = 0
        self.mii_send(pkt_num, pkt_len, wait_result)
        if show_result:
            self.mii_test_result()

    def mii_send(self, pkt_num, pkt_len=8192, wait_result=True):
        """ Configure and start an MII test transmission
        :param pkt_num: Number of packets, written to tx_pkt_num
        :param pkt_len: Packet length, see mii_test_init
        :param wait_result: If True, wait with mii_wait_idle after starting
        """
        control_register = self._ETH10G_CTRL_REGISTER % self._device

        self.board[control_register] = 0xF
        self.mii_test_init(pkt_len, pkt_num, self._rx_core())
        self.board[self._mii_register('tx_start')] = 0x1
        if wait_result:
            self.mii_wait_idle()
        # NOTE(review): with wait_result=False the control register is cleared
        # right after the start request - confirm this is intended
        self.board[control_register] = 0x0

    def mii_wait_idle(self):
        """ Poll every 0.3 s until this core's tx_start register is no longer 1

        While waiting, the received packet count and the rx_error registers of
        all cores on both devices are logged at debug level. There is no
        timeout: the call blocks for as long as tx_start reads 1. The
        eth10g_ctrl register is cleared on exit.
        """
        rx_core = self._rx_core()

        while True:
            time.sleep(0.3)
            if self.board[self._mii_register('tx_start')] != 1:
                break

            logging.debug("Waiting for transmission...")
            received = self.board[self._mii_register('rx_pkt_num', core=rx_core)]
            logging.debug("Received Packets: %s", received)
            for device in self._MII_TEST_DEVICES:
                for core in range(self._MII_TEST_CORES):
                    error = self.board[self._mii_register('rx_error', core=core, device=device)]
                    logging.debug('%s core%d: %s error register', device, core, hex(error))

        self.board[self._ETH10G_CTRL_REGISTER % self._device] = 0x0

    def mii_test_result(self):
        """ Log (debug level) the received packet count and error register of the RX core """
        rx_core = self._rx_core()
        received = self.board[self._mii_register('rx_pkt_num', core=rx_core)]
        error = self.board[self._mii_register('rx_error', core=rx_core)]
        logging.debug('%s core%d: %s error register - %d received packets',
                      self._device, rx_core, hex(error), received)

    ##################### Superclass method implementations #################################

    def initialise(self):
        """ Initialise TpmTenGCore
        :return: True
        """
        logging.info("TpmTenGCore has been initialised")
        return True

    def status_check(self):
        """ Perform status check
        :return: Status
        """
        logging.info("TpmTenGCore : Checking status")
        return Status.OK

    def clean_up(self):
        """ Perform cleanup
        :return: Success
        """
        logging.info("TpmTenGCore : Cleaning up")
        return True