Source code for pyfabil.plugins.tpm.f2f

from builtins import hex
import socket

__author__ = 'chiello'

from pyfabil.plugins.firmwareblock import FirmwareBlock
from pyfabil.base.definitions import *
from pyfabil.base.utils import *
import logging
import time


[docs] class TpmFpga2Fpga(FirmwareBlock): """ TpmF2F plugin """ @compatibleboards(BoardMake.TpmBoard) @friendlyname('tpm_f2f') @maxinstances(4) def __init__(self, board, **kwargs): """ TpmF2F initialiser :param board: Pointer to board instance """ super(TpmFpga2Fpga, self).__init__(board) #if 'device' not in kwargs.keys(): # raise PluginError("TpmF2F: Require a node instance") #self._device = kwargs['device'] if 'core' not in list(kwargs.keys()): raise PluginError("TpmF2F: core_id required") self._core = kwargs['core'] self._devices = ['fpga1', 'fpga2'] self._allowed_directions = ["fpga1->fpga2", "fpga2->fpga1"] # default directions based on tile.py if self._core == 0: self._direction = "fpga2->fpga1" elif self._core == 1: self._direction = "fpga1->fpga2" else: self._direction = None #######################################################################################
[docs] def initialise_core(self, direction=None): """ Initialise TpmF2F core """ if direction is not None: if direction not in self._allowed_directions: raise PluginError('TpmF2F: direction required, fpga1->fpga2 or fpga2->fpga1') self._direction = direction if self._direction == "fpga1->fpga2": self._tx_device = 'fpga1' self._rx_device = 'fpga2' else: self._tx_device = 'fpga2' self._rx_device = 'fpga1' logging.info(f'Initialising F2F core {self._core} direction {self._direction}: ') if self.board['board.regfile.ctrl.en_ddr_vdd'] == 0x0: self.board['board.regfile.ctrl.en_ddr_vdd'] = 0x1 # power IO bank (same as DDR) time.sleep(1) # Wait for power supply loop_retries = 10 for n in list(range(loop_retries)): # Reset core # Set rst and direction_is_in register fields self.board[f'{self._tx_device}.f2f_{self._core}.ctrl'] = 0x1 self.board[f'{self._rx_device}.f2f_{self._core}.ctrl'] = 0x3 abort = 0 max_retries = 100 retries = 0 while self.board[f'{self._tx_device}.f2f_{self._core}.status'] & 0x7 != 0x0: time.sleep(0.01) retries += 1 if retries == max_retries: abort = 1 break retries = 0 if abort == 0: while self.board[f'{self._rx_device}.f2f_{self._core}.status'] & 0x7 != 0x0: time.sleep(0.01) retries += 1 if retries == max_retries: abort = 1 break retries = 0 if abort == 0: self.board[f'{self._tx_device}.f2f_{self._core}.ctrl'] = 0x0 while self.board[f'{self._tx_device}.f2f_{self._core}.status'] & 0x7 != 0x7: time.sleep(0.01) retries += 1 if retries == max_retries: abort = 1 break retries = 0 if abort == 0: self.board[f'{self._rx_device}.f2f_{self._core}.ctrl'] = 0x2 while self.board[f'{self._rx_device}.f2f_{self._core}.status'] & 0xFFF != 0x187: time.sleep(0.01) retries += 1 if retries == max_retries: abort = 1 break if abort == 0: break else: if n == loop_retries - 1: logging.info(f"F2F core {self._core} direction {self._direction}: Not possible to configure. {hex(self.board[f'{self._rx_device}.f2f_{self._core}.status'])}") return logging.info(f"F2F core {self._core} direction {self._direction}: timed out! Retry...{n} {hex(self.board[f'{self._rx_device}.f2f_{self._core}.status'])}") logging.info(f"F2F core {self._core} direction {self._direction}: status is {hex(self.board[f'{self._rx_device}.f2f_{self._core}.status'])}")
[docs] def run_test(self, duration): logging.debug("Running F2F core %d BIST " % self._core) self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x0 # disable BIST self.board['%s.f2f_%d.test_error' % (self._rx_device, self._core)] = 0x0 # clear BIST result self.board['%s.f2f_%d.ctrl.test_enable' % (self._tx_device, self._core)] = 0x1 # enable BIST TX time.sleep(0.2) self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x1 # enable BIST RX time.sleep(duration) result = self.board['%s.f2f_%d.test_error' % (self._rx_device, self._core)] logging.info("F2F core %d BIST result (0x0 means test OK): " % self._core) logging.info(result) self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x0 # disable BIST self.board['%s.f2f_%d.ctrl.test_enable' % (self._tx_device, self._core)] = 0x0 # disable BIST return result
[docs] def get_test_result(self): return self.board['%s.f2f_%d.test_error' % (self._rx_device, self._core)]
[docs] def check_pll_lock_status(self, show_result=True): status = [] for device in self._devices: lock = self.board[f'{device}.f2f_{self._core}.status.pll_locked'] > 0 count = self.board[f'{device}.f2f_{self._core}.status.pll_lock_loss_cnt'] status.append((lock, count)) if show_result: logging.info(f'{device.upper()} Core{self._core} pll lock loss count {count}') return status
[docs] def clear_pll_lock_loss_counter(self): for device in self._devices: self.board[f'{device}.f2f_{self._core}.status.pll_lock_loss_cnt_reset'] = 1 return
############################## compatibility with tpm 1.6 F2F plugin ###############################
[docs] def check_channel_up(self): ret = 0 if self.board['%s.f2f_%d.status' % (self._tx_device, self._core)] & 0x7 == 0x7 and \ self.board['%s.f2f_%d.status' % (self._rx_device, self._core)] & 0xFFF == 0x187: ret = 1 return ret
[docs] def start_tx_test(self): logging.debug("Starting F2F core %d BIST " % self._core) self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x0 # disable BIST self.board['%s.f2f_%d.test_error' % (self._rx_device, self._core)] = 0x0 # clear BIST result self.board['%s.f2f_%d.ctrl.test_enable' % (self._tx_device, self._core)] = 0x1 # enable BIST TX time.sleep(0.2)
[docs] def start_rx_test(self): self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x1 # enable BIST RX
[docs] def stop_test(self): self.board['%s.f2f_%d.ctrl.test_enable' % (self._rx_device, self._core)] = 0x0 # disable BIST self.board['%s.f2f_%d.ctrl.test_enable' % (self._tx_device, self._core)] = 0x0 # disable BIST
[docs] def set_test_pattern(self, enable, pattern): self.board['%s.f2f_%d.test_pattern' % (self._tx_device, self._core)] = pattern self.board['%s.f2f_%d.test_pattern' % (self._rx_device, self._core)] = pattern self.board['%s.f2f_%d.test_pattern_enable' % (self._tx_device, self._core)] = enable self.board['%s.f2f_%d.test_pattern_enable' % (self._rx_device, self._core)] = enable
[docs] def assert_reset(self): """ Assert reset F2F core """ self.initialise_core()
[docs] def deassert_reset(self): return
##################### Superclass method implementations #################################
[docs] def initialise(self): """ Initialise TpmF2F """ logging.info("TpmF2F has been initialised") return True
[docs] def status_check(self): """ Perform status check :return: Status """ logging.info("TpmF2F : Checking status") return Status.OK
[docs] def clean_up(self): """ Perform cleanup :return: Success """ logging.info("TpmF2F : Cleaning up") return True