# Source code for pyfabil.plugins.tpm_1_6.qsfp_adapter

from __future__ import division
from builtins import str
__author__ = 'bubs'

from pyfabil.plugins.firmwareblock import FirmwareBlock
import logging
import time

from pyfabil.base.definitions import *

# Maximum number of polling iterations (1 ms sleep each) performed by
# poll_reg before it gives up and raises.
MAX_RETRY = 6000

# I/O expander line map.
#   'ch'     : bit position of the line in the byte exchanged with the expander
#   'output' : True when the line may be driven through set(); lines with
#              False are forced to 1 on every write performed by set()
IOEXP_map = {
    "LED0n":   {'ch': 1, 'output': True},
    "LED1n":   {'ch': 2, 'output': True},
    "LED2n":   {'ch': 3, 'output': True},
    "LED3n":   {'ch': 0, 'output': True},
    "ModPrsL": {'ch': 4, 'output': False},
    "IntL":    {'ch': 5, 'output': False},
    "ResetL":  {'ch': 6, 'output': True},
    "LPMode":  {'ch': 7, 'output': True},
    }

# Values of 'board.i2c.status' tested by the old-mode I2C access routines.
i2c_status_busy = 0x1
i2c_status_ack_error = 0x2

# I2C mux selection placed in the I2C command word, indexed by core_id.
itpm_cpld_i2c_mux = [1, 2]

# I2C device addresses; the access routines shift them right by one bit
# before placing them in the command word.
PHY_ioexp = 0x40  # PCF8574TS
PHY_QSFP = 0xa0


class Tpm_1_6_QSFPAdapter(FirmwareBlock):
    """ Tpm_1_6_QSFPAdapter plugin

    Gives access, through the board I2C interface, to the I/O expander
    (PHY_ioexp) and to the QSFP module (PHY_QSFP) of one QSFP adapter.
    Two access paths exist, selected by ``board.i2c_old_mode``: direct use
    of the ``board.i2c.*`` registers, or a request/acknowledge exchange
    through the shadow register area at ``i2c_shadow`` that is protected by
    the ``board.lock.lock_i2c`` ticket lock.
    """

    # Old-mode status polling: number of polls and delay between them (s)
    _OLD_MODE_POLLS = 10
    _OLD_MODE_POLL_DELAY = 0.1
    # Lock acquisition: number of attempts and delay between them (s)
    _LOCK_ATTEMPTS = 150
    _LOCK_ATTEMPT_DELAY = 0.007
    # Pause after the lock has been released (s)
    _UNLOCK_DELAY = 0.020

    @compatibleboards(BoardMake.Tpm16Board)
    @friendlyname('tpm_qsfp_adapter')
    @maxinstances(2)
    def __init__(self, board, **kwargs):
        """ Tpm_1_6_QSFPAdapter initialiser
        :param board: Pointer to board instance
        :param core_id: Index of plugin instance to select QSFP_ADAPTER 0 or 1
        :raises PluginError: if core_id is missing or out of range
        """
        super(Tpm_1_6_QSFPAdapter, self).__init__(board)

        self.i2c_shadow = 0x90000270
        self.i2c_req_reg = 0x50
        self.i2c_ack_reg = 0x54
        self.mcu_fwver_reg = self.board["board.bram_cpu"]
        # Ticket currently used to hold the I2C lock, None when no lock is
        # held. It must exist from construction because poll_reg and
        # __i2c_wr8 read it.
        self.ticket_num = None

        if 'core_id' not in kwargs:
            raise PluginError("Tpm_1_6_QSFPAdapter: Require a core_id parameter")

        self._core_id = kwargs['core_id']

        if not 0 <= self._core_id < len(itpm_cpld_i2c_mux):
            raise PluginError(
                "Tpm_1_6_QSFPAdapter: core_id out of range, must be smaller than %d"
                % len(itpm_cpld_i2c_mux)
            )

        self.i2c_mux = itpm_cpld_i2c_mux[self._core_id]

    def set_passwd(self):
        """ Write the I2C password, taken from the board MAC registers

        In old mode the password registers are written and bit 16 of
        'board.i2c.password' is read back to verify acceptance; otherwise the
        values are written to the shadow area (offsets 0x3C and 0x38).

        :raises LibraryError: in old mode, if the password is not accepted
        """
        if self.board.i2c_old_mode:
            self.board["board.i2c.password"] = self.board["board.i2c.mac_hi"]
            self.board["board.i2c.password_lo"] = self.board["board.i2c.mac_lo"]
            rd = self.board["board.i2c.password"]
            if (rd & 0x10000) == 0:
                raise LibraryError("I2C/EEP password not accepted!")
        else:
            self.board[self.i2c_shadow + 0x3C] = self.board["board.i2c.mac_hi"]
            self.board[self.i2c_shadow + 0x38] = self.board["board.i2c.mac_lo"]

    def check_passwd(self):
        """ Verify that bit 16 of the shadow password word (offset 0x3C) is set

        :raises LibraryError: if the bit is not set
        """
        rd = self.board[self.i2c_shadow + 0x3C]
        if (rd & 0x10000) != 0x10000:
            logging.error("I2C/EEP password not accepted, rd %x!" % rd)
            raise LibraryError("I2C/EEP password not accepted!")

    def remove_passwd(self):
        """ Clear the I2C password registers of the active access mode """
        if self.board.i2c_old_mode:
            self.board["board.i2c.password"] = 0
            self.board["board.i2c.password_lo"] = 0
        else:
            self.board[self.i2c_shadow + 0x3C] = 0
            self.board[self.i2c_shadow + 0x38] = 0

    def poll_reg(self, reg, exit_condition, exception_val):
        """ Poll a register until it holds the expected value

        The register is read up to MAX_RETRY times, 1 ms apart. While a lock
        ticket is stored in ``ticket_num`` it is rewritten to
        'board.lock.lock_i2c' after every unsuccessful read.

        :param reg: Register address to poll
        :param exit_condition: Value that ends the polling
        :param exception_val: Message of the LibraryError raised on timeout
        :raises LibraryError: if the value is not seen within MAX_RETRY reads
        """
        for _ in range(MAX_RETRY):
            if self.board[reg] == exit_condition:
                return
            time.sleep(0.001)
            if self.ticket_num is not None:
                self.board["board.lock.lock_i2c"] = self.ticket_num
        logging.error("EEP Polling Reg timeout: %s", exception_val)
        raise LibraryError(exception_val)

    def i2c_rd8(self, phy, nof_rd_byte=1, keep_locked=False):
        """ Read from an I2C device

        :param phy: I2C address of the device (shifted right by one bit
                    before being placed in the command word)
        :param nof_rd_byte: Read byte count placed in the command word
        :param keep_locked: If True the I2C lock is kept after the access
                            (ignored in old mode)
        :return: Value read from the I2C receive register
        """
        return self.__i2c_rd8(phy, nof_rd_byte=nof_rd_byte, keep_locked=keep_locked)

    def _wait_old_mode_i2c(self, phy, op_name):
        """ Wait for an old-mode I2C command to leave the busy state

        :param phy: I2C address, used in the error messages only
        :param op_name: Name of the calling operation, used in the error messages
        :raises PluginError: if the interface is still busy after the last
                             poll, or if it reports an acknowledge error
        """
        status = i2c_status_busy
        for _ in range(self._OLD_MODE_POLLS):
            status = self.board['board.i2c.status']
            time.sleep(self._OLD_MODE_POLL_DELAY)
            if status != i2c_status_busy:
                break
        # Decide on the last status read rather than on the retry counter, so
        # that a transfer completing on the final poll is not reported as a
        # timeout.
        if status == i2c_status_busy:
            msg = "tpm_qsfp_adapter.%s: i2c timeout error (PHY %s)" % (op_name, hex(phy))
            logging.error(msg)
            raise PluginError(msg)
        if status == i2c_status_ack_error:
            msg = "tpm_qsfp_adapter.%s: i2c ack error (PHY %s)" % (op_name, hex(phy))
            logging.error(msg)
            raise PluginError(msg)

    def _acquire_i2c_lock(self, ticket_num):
        """ Take the I2C lock with the given ticket

        The ticket is written to 'board.lock.lock_i2c' and read back; the lock
        is considered taken when the read-back matches.

        :param ticket_num: Ticket obtained from 'board.lock.queue_number'
        :raises LibraryError: if the lock is not obtained after all attempts
        """
        for _ in range(self._LOCK_ATTEMPTS):
            self.board["board.lock.lock_i2c"] = ticket_num
            if self.board["board.lock.lock_i2c"] == ticket_num:
                return
            time.sleep(self._LOCK_ATTEMPT_DELAY)
        logging.error("ERROR: Can't Lock I2C interface")
        raise LibraryError("ERROR: Can't Lock I2C interface")

    def __i2c_rd8(self, phy, nof_rd_byte=1, keep_locked=False, dbg_flag=False):
        """ Read from an I2C device

        :param phy: I2C address of the device
        :param nof_rd_byte: Read byte count placed in the command word
        :param keep_locked: New mode only; keep the I2C lock (and the ticket
                            in ``ticket_num``) after the access
        :param dbg_flag: New mode only; raise right after the request has
                         been issued (debug aid)
        :return: Value of the I2C receive register
        :raises PluginError: old mode timeout or acknowledge error
        :raises LibraryError: new mode lock, handshake or password failure
        """
        add = phy >> 1
        nof_wr_byte = 0
        offset = 0
        cmd = (self.i2c_mux << 16) + (nof_rd_byte << 12) + (nof_wr_byte << 8) + add

        if self.board.i2c_old_mode:
            self.board['board.i2c.transmit'] = offset & 0xFF
            self.board['board.i2c.command'] = cmd
            self._wait_old_mode_i2c(phy, "__i2c_rd8")
            return self.board['board.i2c.receive']

        logging.debug("I2C Read: Locking I2C interface to avoid MCU access conflict ")
        # The ticket is stored before the lock is attempted so that poll_reg
        # keeps rewriting it while waiting.
        self.ticket_num = self.board["board.lock.queue_number"]
        self._acquire_i2c_lock(self.ticket_num)
        # NOTE(review): if any step below raises, the lock is left held and
        # ticket_num keeps its value - confirm that this is intended.

        if self.board[self.i2c_shadow + self.i2c_req_reg] == 1:
            logging.warning("Detected Incomplete Previous I2C operation")
            self.board[self.i2c_shadow + self.i2c_req_reg] = 0

        self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP not ready!")
        self.set_passwd()
        self.board[self.i2c_shadow + 0x4] = offset & 0xFF
        self.board[self.i2c_shadow + 0x0] = cmd
        # send request
        self.board[self.i2c_shadow + self.i2c_req_reg] = 1
        # wait req acknowledge
        logging.debug("i2c read wait req acknowledge")
        # debug flag used to stop SW for debug use
        if dbg_flag is True:
            logging.error("ERROR: DEBUG I2C interface")
            raise LibraryError("ERROR: DEBUG I2C interface")
        self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 1, "I2C/EEP request not accepted!")
        # check passwd
        logging.debug("i2c read check passwd")
        self.check_passwd()
        # check i2c op status: 0 ok, 1 busy, 2 NACK
        logging.debug("i2c read check status")
        self.poll_reg(self.i2c_shadow + 0xC, 0, "I2C/EEP busy or not acknoledge!")
        read_data = self.board[self.i2c_shadow + 0x8]
        self.board[self.i2c_shadow + self.i2c_req_reg] = 0
        logging.debug("i2c read check end operation")
        self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP operation complete not detected!")

        if keep_locked:
            # Rewrite the ticket so the lock stays with this caller
            self.board["board.lock.lock_i2c"] = self.ticket_num
        else:
            # Unlock I2C interface
            self.ticket_num = None
            self.board["board.lock.lock_i2c"] = 0
            time.sleep(self._UNLOCK_DELAY)
        return read_data

    def __i2c_wr8(self, phy, value, offset=0, already_locked=False):
        """ Write one byte to an I2C device

        :param phy: I2C address of the device
        :param value: Value to write (only the low 8 bits are used)
        :param offset: Unused, kept for interface compatibility
        :param already_locked: New mode only; the lock is already held with
                               the ticket stored in ``ticket_num`` (as left by
                               a read with keep_locked=True) and is renewed
                               instead of acquired
        :return: True on success
        :raises PluginError: old mode timeout or acknowledge error
        :raises LibraryError: new mode lock, handshake or password failure
        """
        add = phy >> 1
        nof_rd_byte = 0
        nof_wr_byte = 1
        cmd = (self.i2c_mux << 16) + (nof_rd_byte << 12) + (nof_wr_byte << 8) + add

        if self.board.i2c_old_mode:
            self.board['board.i2c.transmit'] = value & 0xFF
            self.board['board.i2c.command'] = cmd
            self._wait_old_mode_i2c(phy, "__i2c_wr8")
            return True

        if not already_locked:
            # Lock I2C interface to avoid MCU access conflict
            logging.debug("I2C Write: Locking I2C interface to avoid MCU access conflict ")
            # NOTE(review): unlike the read path this ticket is not stored in
            # ticket_num, so poll_reg does not rewrite it while waiting -
            # confirm that this is intended.
            self._acquire_i2c_lock(self.board["board.lock.queue_number"])
        elif self.ticket_num is None:
            logging.error("ERROR: Can't renew Lock I2C interface - ticket_num is None")
            raise LibraryError("ERROR: Can't renew Lock I2C interface")
        else:
            self.board["board.lock.lock_i2c"] = self.ticket_num
            old_ticket = self.board["board.lock.lock_i2c"]
            if old_ticket != self.ticket_num:
                logging.error("ERROR: Can't renew Lock I2C interface - "
                              "ticket mismatch, exp %x read %x" % (self.ticket_num, old_ticket))
                raise LibraryError("ERROR: Can't renew Lock I2C interface")

        if self.board[self.i2c_shadow + self.i2c_req_reg] == 1:
            logging.warning("Detected Incomplete Previous I2C operation")
            self.board[self.i2c_shadow + self.i2c_req_reg] = 0

        self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP not ready!")
        self.set_passwd()
        self.board[self.i2c_shadow + 0x4] = value & 0xFF
        self.board[self.i2c_shadow + 0x0] = cmd
        # send request
        self.board[self.i2c_shadow + self.i2c_req_reg] = 1
        # wait req acknowledge
        logging.debug("wait req acknowledge")
        self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 1, "I2C/EEP request not accepted!")
        # check passwd
        logging.debug("check passwd")
        self.check_passwd()
        # check i2c op status: 0 ok, 1 busy, 2 NACK
        logging.debug("check status")
        self.poll_reg(self.i2c_shadow + 0xC, 0, "I2C/EEP busy or not acknoledge!")
        self.board[self.i2c_shadow + self.i2c_req_reg] = 0
        logging.debug("check end operation")
        self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP operation complete not detected!")

        # Unlock I2C interface
        self.board["board.lock.lock_i2c"] = 0
        self.ticket_num = None
        time.sleep(self._UNLOCK_DELAY)
        return True

    def status(self, line=None):
        """ Read the I/O expander lines

        :param line: Optional name of a single line (key of IOEXP_map)
        :return: True/False for the requested line when ``line`` is a key of
                 IOEXP_map; otherwise a dict mapping every line name to 0 or 1.
                 None if the expander read returned None.
        """
        value = self.__i2c_rd8(PHY_ioexp)
        if value is None:
            return None

        if line in IOEXP_map:
            return bool((value >> IOEXP_map[line]['ch']) & 1)

        ret_value = {}
        for key in IOEXP_map:
            ret_value[key] = (value >> IOEXP_map[key]['ch']) & 1
        return ret_value

    def set(self, line, value):
        """ Drive one output line of the I/O expander

        The expander is read with the lock kept, the requested bit is
        updated, every line not marked as output is forced to 1, and the
        result is written back.

        :param line: Name of the line (key of IOEXP_map)
        :param value: The line is set to 1 if value > 0, to 0 otherwise
        :return: Result of the write (True), or None if the read returned None
        :raises PluginError: if the line is not an output
        """
        if not IOEXP_map[line]['output']:
            logging.error("tpm_qsfp_adapter.set: line %s is NOT output" % line)
            raise PluginError("tpm_qsfp_adapter.set: line %s is NOT output" % line)

        actual_value = self.__i2c_rd8(PHY_ioexp, keep_locked=True)
        if actual_value is None:
            return None

        line_mask = 1 << IOEXP_map[line]['ch']
        if value > 0:
            actual_value |= line_mask
        else:
            actual_value &= ~line_mask

        # input ports must be set to 1 even if read at 0
        for key in IOEXP_map:
            if not IOEXP_map[key]['output']:
                actual_value |= 1 << IOEXP_map[key]['ch']

        return self.__i2c_wr8(PHY_ioexp, actual_value, already_locked=True)

    def get(self, line, dbg_flag=False):
        """ Read one line of the I/O expander

        :param line: Name of the line (key of IOEXP_map)
        :param dbg_flag: Forwarded to the I2C read (debug aid)
        :return: 0 or 1
        """
        actual_value = self.__i2c_rd8(PHY_ioexp, dbg_flag=dbg_flag)
        return (actual_value >> IOEXP_map[line]['ch']) & 1

    def qsfp_read(self, offset=0, len=16):
        """ Read bytes from the QSFP module

        The read is attempted only when the "ModPrsL" expander line reads 0.
        ``offset`` is written to the module first, then ``len`` single-byte
        reads are performed.

        :param offset: Byte written to the module before reading
        :param len: Number of bytes to read
        :return: bytearray with the bytes read, or None when "ModPrsL" reads 1
        """
        if self.status("ModPrsL"):
            logging.info("QSFP %d: NA cannot dump read from QSFP", self._core_id)
            return None

        self.__i2c_wr8(PHY_QSFP, offset)
        arr = bytearray()
        for _ in range(len):
            arr.append(self.__i2c_rd8(PHY_QSFP))
        return arr

    def get_line_mapping(self):
        """ Return the I/O expander line map (IOEXP_map) """
        return IOEXP_map

    def irq_status(self):
        """ Check the 'board.regfile.eth10ge.psnt' bit of this adapter

        :return: True when bit ``core_id`` of the register is 0, else False
        """
        irq = self.board['board.regfile.eth10ge.psnt'] & (1 << self._core_id)
        return irq == 0

    def irq_clear(self):
        """ Perform a read of the I/O expander

        NOTE(review): presumably the read is what resets the expander
        interrupt output - confirm against the PCF8574 datasheet.
        """
        self.__i2c_rd8(PHY_ioexp)