from __future__ import division
from builtins import str
__author__ = 'bubs'
from pyfabil.plugins.firmwareblock import FirmwareBlock
import logging
import time
from pyfabil.base.definitions import *
MAX_RETRY = 6000
IOEXP_map = {
"LED0n": {'ch': 1, 'output': True},
"LED1n": {'ch': 2, 'output': True},
"LED2n": {'ch': 3, 'output': True},
"LED3n": {'ch': 0, 'output': True},
"ModPrsL": {'ch': 4, 'output': False},
"IntL": {'ch': 5, 'output': False},
"ResetL": {'ch': 6, 'output': True},
"LPMode": {'ch': 7, 'output': True},
}
i2c_status_busy = 0x1
i2c_status_ack_error = 0x2
itpm_cpld_i2c_mux = [1, 2]
PHY_ioexp = 0x40 # PCF8574TS
PHY_QSFP = 0xa0
[docs]
class Tpm_1_6_QSFPAdapter(FirmwareBlock):
""" Tpm_1_6_QSFPAdapter plugin """
@compatibleboards(BoardMake.Tpm16Board)
@friendlyname('tpm_qsfp_adapter')
@maxinstances(2)
def __init__(self, board, **kwargs):
""" Tpm_1_6_QSFPAdapter initialiser
:param board: Pointer to board instance
:param core_id: Index of plugin instance to select QSFP_ADAPTER 0 or 1
"""
super(Tpm_1_6_QSFPAdapter, self).__init__(board)
self.i2c_shadow = 0x90000270
self.i2c_req_reg = 0x50
self.i2c_ack_reg = 0x54
self.mcu_fwver_reg = self.board["board.bram_cpu"]
self.ticke_num = None
if 'core_id' not in list(kwargs.keys()):
raise PluginError("Tpm_1_6_QSFPAdapter: Require a core_id parameter")
self._core_id = kwargs['core_id']
if self._core_id < 0 or self._core_id > len(itpm_cpld_i2c_mux) - 1:
raise PluginError(
"Tpm_1_6_QSFPAdapter: core_id out of range, must be smaller than %d" % len(itpm_cpld_i2c_mux)
)
self.i2c_mux = itpm_cpld_i2c_mux[self._core_id]
[docs]
def set_passwd(self):
if self.board.i2c_old_mode:
rd = self.board["board.i2c.mac_hi"]
self.board["board.i2c.password"] = rd
rd = self.board["board.i2c.mac_lo"]
self.board["board.i2c.password_lo"] = rd
rd = self.board["board.i2c.password"]
if rd & 0x10000 == 0:
raise LibraryError("I2C/EEP password not accepted!")
else:
rd = self.board["board.i2c.mac_hi"]
self.board[self.i2c_shadow+0x3C] = rd
rd = self.board["board.i2c.mac_lo"]
self.board[self.i2c_shadow+0x38] = rd
[docs]
def check_passwd(self):
rd = self.board[self.i2c_shadow+0x3C]
if (rd & 0x10000) != 0x10000:
logging.error("I2C/EEP password not accepted, rd %x!" % rd)
raise LibraryError("I2C/EEP password not accepted!")
[docs]
def remove_passwd(self):
if self.board.i2c_old_mode:
self.board["board.i2c.password"] = 0
self.board["board.i2c.password_lo"] = 0
else:
self.board[self.i2c_shadow+0x3C] = 0
self.board[self.i2c_shadow+0x38] = 0
[docs]
def poll_reg(self, reg, exit_condition, exception_val):
retry = 0
while True:
if (self.board[reg]) == exit_condition:
break
else:
time.sleep(0.001)
retry = retry + 1
if self.ticket_num is not None:
self.board["board.lock.lock_i2c"] = self.ticket_num
if retry >= MAX_RETRY:
logging.error("EEP Polling Reg timeout: ", exception_val)
raise LibraryError(exception_val)
[docs]
def i2c_rd8(self, phy, nof_rd_byte=1, keep_locked=False):
self.__i2c_rd8(phy)
def __i2c_rd8(self, phy, nof_rd_byte=1, keep_locked=False, dbg_flag=False):
add = phy >> 1
nof_wr_byte = 0
offset = 0
cmd = (self.i2c_mux << 16) + (nof_rd_byte << 12) + (nof_wr_byte << 8) + add
if self.board.i2c_old_mode:
self.board['board.i2c.transmit'] = offset & 0xFF
self.board['board.i2c.command'] = cmd
status = i2c_status_busy
retry = 10
while status == i2c_status_busy:
status = self.board['board.i2c.status']
time.sleep(0.1)
retry -= 1
pass
if retry == 0:
logging.error("tpm_qsfp_adapter.__i2c_rd8: i2c timeout error (PHY %s)" % hex(phy))
raise PluginError("tpm_qsfp_adapter.__i2c_rd8: i2c timeout error (PHY %s)" % hex(phy))
return None
if status == i2c_status_ack_error:
logging.error("tpm_qsfp_adapter.__i2c_rd8: i2c ack error (PHY %s)" % hex(phy))
raise PluginError("tpm_qsfp_adapter.__i2c_rd8: i2c ack error (PHY %s)" % hex(phy))
return None
if retry == 0:
logging.error("tpm_qsfp_adapter.__i2c_rd8: i2c timeout error (PHY %s)" % hex(phy))
raise PluginError("tpm_qsfp_adapter.__i2c_rd8: i2c timeout error (PHY %s)" % hex(phy))
return None
if status == i2c_status_ack_error:
logging.error("tpm_qsfp_adapter.__i2c_rd8: i2c ack error (PHY %s)" % hex(phy))
raise PluginError("tpm_qsfp_adapter.__i2c_rd8: i2c ack error (PHY %s)" % hex(phy))
return None
return self.board['board.i2c.receive']
else:
logging.debug("I2C Read: Locking I2C interface to avoid MCU access conflict ")
locked = False
retry = 150
self.ticket_num = self.board["board.lock.queue_number"]
while retry > 0:
self.board["board.lock.lock_i2c"] = self.ticket_num
if self.board["board.lock.lock_i2c"] != self.ticket_num:
retry = retry-1
time.sleep(0.007)
else:
locked = True
break
if locked is False:
logging.error("ERROR: Can't Lock I2C interface")
raise LibraryError("ERROR: Can't Lock I2C interface")
# else:
# logging.info("I2C interface Locked")
if self.board[self.i2c_shadow + self.i2c_req_reg] == 1:
logging.warning("Detected Incomplete Previous I2C operation")
self.board[self.i2c_shadow + self.i2c_req_reg] = 0
self.poll_reg(self.i2c_shadow+self.i2c_ack_reg, 0, "I2C/EEP not ready!")
self.set_passwd()
self.board[self.i2c_shadow + 0x4] = offset & 0xFF
self.board[self.i2c_shadow + 0x0] = cmd
# send request
self.board[self.i2c_shadow+self.i2c_req_reg] = 1
# wait req acknowledge
logging.debug("i2c read wait req acknowledge")
# debug flag used to stop SW for debug use
if dbg_flag is True:
logging.error("ERROR: DEBUG I2C interface")
raise LibraryError("ERROR: DEBUG I2C interface")
self.poll_reg(self.i2c_shadow+self.i2c_ack_reg, 1, "I2C/EEP request not accepted!")
# check passwd
logging.debug("i2c read check passwd")
self.check_passwd()
# check i2c op status: 0 ok, 1 busy, 2 NACK
logging.debug("i2c read check status")
self.poll_reg(self.i2c_shadow + 0xC, 0, "I2C/EEP busy or not acknoledge!")
read_data = self.board[self.i2c_shadow + 0x8]
# self.remove_passwd()
self.board[self.i2c_shadow+self.i2c_req_reg] = 0
logging.debug("i2c read check end operation")
self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP operation complete not detected!")
# Unlock I2C interface
if not keep_locked:
self.ticket_num = None
self.board["board.lock.lock_i2c"] = 0
time.sleep(0.020)
else:
self.board["board.lock.lock_i2c"] = self.ticket_num
return read_data
def __i2c_wr8(self, phy, value, offset=0, already_locked=False):
add = phy >> 1
nof_rd_byte = 0
nof_wr_byte = 1
cmd = (self.i2c_mux << 16) + (nof_rd_byte << 12) + (nof_wr_byte << 8) + add
if self.board.i2c_old_mode:
self.board['board.i2c.transmit'] = (value & 0xFF)
self.board['board.i2c.command'] = cmd
status = i2c_status_busy
retry = 10
while status == i2c_status_busy and retry > 0:
status = self.board['board.i2c.status']
time.sleep(0.1)
retry -= 1
pass
if retry == 0:
logging.error("tpm_qsfp_adapter.__i2c_wr8: i2c timeout error (PHY %s)" % hex(phy))
raise PluginError("tpm_qsfp_adapter.__i2c_wr8: i2c timeout error (PHY %s)" % hex(phy))
return None
if status == i2c_status_ack_error:
logging.error("tpm_qsfp_adapter.__i2c_wr8: i2c ack error (PHY %s)" % hex(phy))
raise PluginError("tpm_qsfp_adapter.__i2c_wr8: i2c ack error (PHY %s)" % hex(phy))
return None
return True
else:
# Lock I2C interface to avoid MCU access conflict
if not already_locked:
logging.debug("I2C Write: Locking I2C interface to avoid MCU access conflict ")
locked = False
retry = 150
ticket_num = self.board["board.lock.queue_number"]
while retry > 0:
self.board["board.lock.lock_i2c"] = ticket_num
if self.board["board.lock.lock_i2c"] != ticket_num:
retry = retry-1
time.sleep(0.007)
else:
locked = True
break
if locked is False:
logging.error("ERROR: Can't Lock I2C interface")
raise LibraryError("ERROR: Can't Lock I2C interface")
# else:33
# logging.info("I2C interface Locked")
else:
if self.ticket_num is not None:
self.board["board.lock.lock_i2c"] = self.ticket_num
old_ticket = self.board["board.lock.lock_i2c"]
if old_ticket != self.ticket_num:
logging.error("ERROR: Can't renew Lock I2C interface - ticket_num "
"is None, exp %x read %x" % (self.ticket_num, old_ticket))
raise LibraryError("ERROR: Can't renew Lock I2C interface")
else:
logging.error("ERROR: Can't renew Lock I2C interface - ticket_num is None")
raise LibraryError("ERROR: Can't renew Lock I2C interface")
if self.board[self.i2c_shadow + self.i2c_req_reg] == 1:
logging.warning("Detected Incomplete Previous I2C operation")
self.board[self.i2c_shadow + self.i2c_req_reg] = 0
self.poll_reg(self.i2c_shadow+self.i2c_ack_reg, 0, "I2C/EEP not ready!")
self.set_passwd()
self.board[self.i2c_shadow + 0x4] = (value & 0xFF)
self.board[self.i2c_shadow + 0x0] = cmd
# send request
self.board[self.i2c_shadow + self.i2c_req_reg] = 1
# wait req acknowledge
logging.debug("wait req acknowledge")
self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 1, "I2C/EEP request not accepted!")
# check passwd
logging.debug("check passwd")
self.check_passwd()
# check i2c op status: 0 ok, 1 busy, 2 NACK
logging.debug("check status")
self.poll_reg(self.i2c_shadow + 0xC, 0, "I2C/EEP busy or not acknoledge!")
# read_data = self.board[self.i2c_shadow + 0x8]
# self.remove_passwd()
self.board[self.i2c_shadow + self.i2c_req_reg] = 0
logging.debug("check end operation")
self.poll_reg(self.i2c_shadow + self.i2c_ack_reg, 0, "I2C/EEP operation complete not detected!")
# Unlock I2C interface
self.board["board.lock.lock_i2c"] = 0
self.ticket_num = None
time.sleep(0.020)
return True
[docs]
def status(self, line=None):
# if line is None:
# logging.info("= IOEXP_map " + str(self._core_id) +" =")
value = self.__i2c_rd8(PHY_ioexp)
if value is None:
return None
# logging.info(bin(value))
ret_value = {}
for key in IOEXP_map:
i = IOEXP_map[key]['ch']
# for i in [1,2,3,0,4,5,6,7]:
if value & 2**i > 0:
ret_value[key] = 1
# if line is None:
# logging.info(key + "\t1")
# el
if line == key:
return True
else:
ret_value[key] = 0
# if line is None:
# logging.info(key + "\t0")
# el
if line == key:
return False
return ret_value
[docs]
def set(self, line, value):
if IOEXP_map[line]['output']:
actual_value = self.__i2c_rd8(PHY_ioexp, keep_locked=True)
if actual_value is None:
return None
i = IOEXP_map[line]['ch']
if value > 0:
actual_value = actual_value | 2**i
else:
actual_value = actual_value & ~(2**i)
# logging.info("IOEXP_map " + str(self._core_id) +" "+line+" "+hex(value))
# input ports must be set to 1 even if read at 0
for key in IOEXP_map:
if not IOEXP_map[key]['output']:
i = IOEXP_map[key]['ch']
actual_value = actual_value | 2**i
return self.__i2c_wr8(PHY_ioexp, actual_value, already_locked=True)
else:
logging.error("tpm_qsfp_adapter.set: line %s is NOT output" % line)
raise PluginError("tpm_qsfp_adapter.set: line %s is NOT output" % line)
[docs]
def get(self, line, dbg_flag=False):
actual_value = self.__i2c_rd8(PHY_ioexp, dbg_flag=dbg_flag)
i = IOEXP_map[line]['ch']
value = (actual_value & 2**i) >> i
# logging.info("IOEXP_map " + str(self._core_id) +" "+line+" "+hex(value))
return value
[docs]
def qsfp_read(self, offset=0, len=16):
if not self.status("ModPrsL"):
arr = bytearray()
string = ""
self.__i2c_wr8(PHY_QSFP, offset)
for i in range(len):
arr.append(self.__i2c_rd8(PHY_QSFP))
string += '{0:02x}'.format(arr[i])
# logging.info("QSFP: " + string)
return arr
else:
logging.info("QSFP %d: NA cannot dump read from QSFP")
return None
[docs]
def get_line_mapping(self):
return IOEXP_map
[docs]
def irq_status(self):
irq = self.board['board.regfile.eth10ge.psnt'] & 2**self._core_id
if irq > 0:
return False
else:
return True
[docs]
def irq_clear(self):
self.__i2c_rd8(PHY_ioexp)