from builtins import hex
import socket
__author__ = 'chiello'
from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
import time
[docs]
class TpmFpga2Fpga(FirmwareBlock):
""" TpmF2F plugin """
@compatibleboards(BoardMake.TpmBoard)
@friendlyname('tpm_f2f')
@maxinstances(4)
def __init__(self, board, **kwargs):
""" TpmF2F initialiser
:param board: Pointer to board instance
"""
super(TpmFpga2Fpga, self).__init__(board)
#if 'device' not in kwargs.keys():
# raise PluginError("TpmF2F: Require a node instance")
#self._device = kwargs['device']
if 'core' not in list(kwargs.keys()):
raise PluginError("TpmF2F: core_id required")
self._core = kwargs['core']
self._devices = ['fpga1', 'fpga2']
self._allowed_directions = ["fpga1->fpga2", "fpga2->fpga1"]
# default directions based on tile.py
if self._core == 0:
self._direction = "fpga2->fpga1"
elif self._core == 1:
self._direction = "fpga1->fpga2"
else:
self._direction = None
#######################################################################################
[docs]
def initialise_core(self, direction=None):
""" Initialise TpmF2F core """
if direction is not None:
if direction not in self._allowed_directions:
raise PluginError('TpmF2F: direction required, fpga1->fpga2 or fpga2->fpga1')
self._direction = direction
if self._direction == "fpga1->fpga2":
self._tx_device = 'fpga1'
self._rx_device = 'fpga2'
else:
self._tx_device = 'fpga2'
self._rx_device = 'fpga1'
logging.info(f'Initialising F2F core {self._core} direction {self._direction}: ')
if self.board['board.regfile.ctrl.en_ddr_vdd'] == 0x0:
self.board['board.regfile.ctrl.en_ddr_vdd'] = 0x1 # power IO bank (same as DDR)
time.sleep(1) # Wait for power supply
loop_retries = 10
for n in list(range(loop_retries)):
# Reset core
# Set rst and direction_is_in register fields
self.board[f'{self._tx_device}.f2f_{self._core}.ctrl'] = 0x1
self.board[f'{self._rx_device}.f2f_{self._core}.ctrl'] = 0x3
abort = 0
max_retries = 100
retries = 0
while self.board[f'{self._tx_device}.f2f_{self._core}.status'] & 0x7 != 0x0:
time.sleep(0.01)
retries += 1
if retries == max_retries:
abort = 1
break
retries = 0
if abort == 0:
while self.board[f'{self._rx_device}.f2f_{self._core}.status'] & 0x7 != 0x0:
time.sleep(0.01)
retries += 1
if retries == max_retries:
abort = 1
break
retries = 0
if abort == 0:
self.board[f'{self._tx_device}.f2f_{self._core}.ctrl'] = 0x0
while self.board[f'{self._tx_device}.f2f_{self._core}.status'] & 0x7 != 0x7:
time.sleep(0.01)
retries += 1
if retries == max_retries:
abort = 1
break
retries = 0
if abort == 0:
self.board[f'{self._rx_device}.f2f_{self._core}.ctrl'] = 0x2
while self.board[f'{self._rx_device}.f2f_{self._core}.status'] & 0xFFF != 0x187:
time.sleep(0.01)
retries += 1
if retries == max_retries:
abort = 1
break
if abort == 0:
break
else:
if n == loop_retries - 1:
logging.info(f"F2F core {self._core} direction {self._direction}: Not possible to configure. {hex(self.board[f'{self._rx_device}.f2f_{self._core}.status'])}")
return
logging.info(f"F2F core {self._core} direction {self._direction}: timed out! Retry...{n} {hex(self.board[f'{self._rx_device}.f2f_{self._core}.status'])}")
logging.info(f"F2F core {self._core} direction {self._direction}: status is {hex(self.board[f'{self._rx_device}.f2f_{self._core}.status'])}")
[docs]
def run_test(self, duration):
logging.debug("Running F2F core %d BIST " % self._core)
self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x0 # disable BIST
self.board['%s.f2f_%d.test_error' % (self._rx_device, self._core)] = 0x0 # clear BIST result
self.board['%s.f2f_%d.ctrl.test_enable' % (self._tx_device, self._core)] = 0x1 # enable BIST TX
time.sleep(0.2)
self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x1 # enable BIST RX
time.sleep(duration)
result = self.board['%s.f2f_%d.test_error' % (self._rx_device, self._core)]
logging.info("F2F core %d BIST result (0x0 means test OK): " % self._core)
logging.info(result)
self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x0 # disable BIST
self.board['%s.f2f_%d.ctrl.test_enable' % (self._tx_device, self._core)] = 0x0 # disable BIST
return result
[docs]
def get_test_result(self):
return self.board['%s.f2f_%d.test_error' % (self._rx_device, self._core)]
[docs]
def check_pll_lock_status(self, show_result=True):
status = []
for device in self._devices:
lock = self.board[f'{device}.f2f_{self._core}.status.pll_locked'] > 0
count = self.board[f'{device}.f2f_{self._core}.status.pll_lock_loss_cnt']
status.append((lock, count))
if show_result:
logging.info(f'{device.upper()} Core{self._core} pll lock loss count {count}')
return status
[docs]
def clear_pll_lock_loss_counter(self):
for device in self._devices:
self.board[f'{device}.f2f_{self._core}.status.pll_lock_loss_cnt_reset'] = 1
return
############################## compatibility with tpm 1.6 F2F plugin ###############################
[docs]
def check_channel_up(self):
ret = 0
if self.board['%s.f2f_%d.status' % (self._tx_device, self._core)] & 0x7 == 0x7 and \
self.board['%s.f2f_%d.status' % (self._rx_device, self._core)] & 0xFFF == 0x187:
ret = 1
return ret
[docs]
def start_tx_test(self):
logging.debug("Starting F2F core %d BIST " % self._core)
self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x0 # disable BIST
self.board['%s.f2f_%d.test_error' % (self._rx_device, self._core)] = 0x0 # clear BIST result
self.board['%s.f2f_%d.ctrl.test_enable' % (self._tx_device, self._core)] = 0x1 # enable BIST TX
time.sleep(0.2)
[docs]
def start_rx_test(self):
self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x1 # enable BIST RX
[docs]
def stop_test(self):
self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x0 # disable BIST
self.board['%s.f2f_%d.ctrl.test_enable' % (self._tx_device, self._core)] = 0x0 # disable BIST
[docs]
def set_test_pattern(self, enable, pattern):
self.board['%s.f2f_%d.test_pattern' % (self._tx_device, self._core)] = pattern
self.board['%s.f2f_%d.test_pattern' % (self._rx_device, self._core)] = pattern
self.board['%s.f2f_%d.test_pattern_enable' % (self._tx_device, self._core)] = enable
self.board['%s.f2f_%d.test_pattern_enable' % (self._rx_device, self._core)] = enable
[docs]
def assert_reset(self):
""" Assert reset F2F core """
self.initialise_core()
[docs]
def deassert_reset(self):
return
##################### Superclass method implementations #################################
[docs]
def initialise(self):
""" Initialise TpmF2F """
logging.info("TpmF2F has been initialised")
return True
[docs]
def status_check(self):
""" Perform status check
:return: Status
"""
logging.info("TpmF2F : Checking status")
return Status.OK
[docs]
def clean_up(self):
""" Perform cleanup
:return: Success
"""
logging.info("TpmF2F : Cleaning up")
return True