# -*- coding: utf-8 -*-
#
# Copyright (c) 2022 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
# we use dynamic attributes that confuse pylint...
# pylint: disable=attribute-defined-outside-init
"""
HBM Packet Controller ICL (abstraction)
"""
import bisect
import math
import time
import typing
import warnings
import dpkt.pcap as pcap
import numpy as np
from ska_low_cbf_fpga import (
DISCOVER_PROPERTIES,
FpgaPeripheral,
IclField,
IclFpgaField,
str_from_int_bytes,
)
from ska_low_cbf_sw_cnic.icl.ptp_scheduler import (
TIMESTAMP_BITS,
ptp_ts_from_float,
unix_ts_from_ptp,
)
from ska_low_cbf_sw_cnic.util.pcap import eth_from_sll, get_reader, get_writer
# Every size in this section is a number of bytes.
ETHERNET_PREAMBLE_SIZE = 7
ETHERNET_SFD_SIZE = 1  # Start Frame Delimiter
ETHERNET_FCS_SIZE = 4  # Frame Check Sequence
ETHERNET_IPG_SIZE = 12  # Inter-Packet Gap, configured in CNIC firmware
# Bytes added around every packet, on top of the packet itself
ETHERNET_OVERHEAD_SIZE = sum(
    (
        ETHERNET_PREAMBLE_SIZE,
        ETHERNET_SFD_SIZE,
        ETHERNET_FCS_SIZE,
        ETHERNET_IPG_SIZE,
    )
)
AXI_TRANSACTION_SIZE = 4096
BEAT_SIZE = 64
MEM_ALIGN_SIZE = 64  # data in HBM aligned to multiples of this
TIMESTAMP_SIZE = TIMESTAMP_BITS // 8  # bytes
PACKET_LEN_SIZE = 2  # bytes used to store length of packet in HBM metadata
# Per-packet metadata record: the timestamp as individual bytes, followed by
# the packet length as a big-endian unsigned integer.
hbm_metadata = np.dtype(
    [
        ("timestamp", np.uint8, TIMESTAMP_SIZE),
        ("packet_size", ">u" + str(PACKET_LEN_SIZE)),
    ]
)
METADATA_SIZE = hbm_metadata.itemsize
"""total bytes of metadata stored in HBM for each packet"""
def _get_padded_size(data_size: int) -> int:
    """
    Round a size up to the next multiple of ``MEM_ALIGN_SIZE``.

    :param data_size: bytes
    :return: ``data_size`` itself when it is already a multiple of
        ``MEM_ALIGN_SIZE``, otherwise the next larger multiple
    """
    overshoot = data_size % MEM_ALIGN_SIZE
    if not overshoot:
        # already aligned, no padding required
        return data_size
    return data_size + (MEM_ALIGN_SIZE - overshoot)
def _gap_from_rate(packet_size: int, rate: float, burst_size: int = 1) -> int:
    """
    Work out the burst period (start-to-start "gap") for a target data rate.

    :param packet_size: bytes
    :param rate: Gigabits per second
    :param burst_size: number of packets in a burst
    :return: period in nanoseconds, rounded up to a whole number
    """
    # Each packet occupies its own bytes plus the Ethernet overhead
    bits_on_wire = (packet_size + ETHERNET_OVERHEAD_SIZE) * 8
    # Packets per second needed to reach the requested rate
    packets_per_second = (rate * 1e9) / bits_on_wire
    # Time for one burst of packets, in nanoseconds
    burst_period_ns = 1e9 * burst_size / packets_per_second
    return math.ceil(burst_period_ns)
class HbmPacketController(FpgaPeripheral):
    """
    ICL abstraction of the HbmPacketController FPGA Peripheral
    """

    _user_attributes = {
        # Every property defined by this class is exposed
        DISCOVER_PROPERTIES,
        # Registers listed here could be useful for GUIs, CI scripts,
        # or debugging
        # TODO - add legacy_rate_sel when we increment to 1.6
        # "legacy_rate_sel",
        "rx_complete",
        "rx_enable_capture",
        "rx_hbm_1_end_addr",
        "rx_hbm_2_end_addr",
        "rx_hbm_3_end_addr",
        "rx_hbm_4_end_addr",
        "rx_packet_size",
        "rx_packet_size_abs",
        "rx_packets_to_capture",
        "tx_burst_gap",
        "tx_complete",
        "tx_enable",
        "tx_packet_type",
        "tx_running",
    }

    # Tell control system about Read-Only FPGA registers:
    # each one gets its own field configuration with user writes disabled.
    _field_config = {
        read_only_register: IclFpgaField(user_write=False)
        for read_only_register in (
            "tx_running",
            "tx_looping",
            "tx_loop_count",
            "tx_axi_transaction_count",
            "tx_burst_count",
            "tx_packet_count_hi",
            "tx_packet_count_lo",
            "tx_packets_to_mac_hi",
            "tx_packets_to_mac_lo",
            "tx_complete",
            "debug_tx_current_hbm_rd_addr",
            "debug_tx_current_hbm_rd_buffer",
            "debug_tx_total_packet_beat_count",
            "debug_tx_packet_beat_count",
            "debug_tx_burst_packet_count",
            "debug_rd_fsm_debug",
            "debug_output_fsm_debug",
            "debug_input_fsm_debug",
            "debug_fifo_datacount",
            "debug_capture_filter_target",
            "debug_capture_filter_non_target",
            "ns_total_time",
            "ns_burst_timer",
            "rx_hbm_1_end_addr",
            "rx_hbm_2_end_addr",
            "rx_hbm_3_end_addr",
            "rx_hbm_4_end_addr",
            "rx_complete",
            "rx_packet_count_hi",
            "rx_packet_count_lo",
        )
    }
def __init__(self, *args, **kwargs) -> None:
    """
    Initialise the peripheral and build the virtual address map of the
    HBM buffers.

    All arguments are passed straight through to the parent class.
    """
    super().__init__(*args, **kwargs)
    self._loaded_pcap: str = ""
    """Filename of the pcap file loaded to HBM"""
    self._loaded_pcap_packets: int = 0
    """Number of packets loaded into HBM"""
    self._dumped_pcap: str = ""
    """Filename of last pcap file dumped from HBM (updated when complete)"""

    # there is no nice interface for finding the buffer sizes,
    # so go via the driver's memory configuration
    self._fpga_interface = self._personality.driver
    # the first shared buffer is the ARGS interchange, the rest are used here
    hbm_sizes = [hbm.size for hbm in self._fpga_interface._mem_config[1:]]
    # running total gives each buffer's virtual end address,
    # e.g. [1000, 1000, 1000] => [1000, 2000, 3000];
    # a leading zero is then the first buffer's start address:
    # [0, 1000, 2000, 3000]
    self._buffer_offsets = np.insert(np.cumsum(hbm_sizes), 0, 0)
    """Virtual addresses of start/end of each HBM buffer
    (Note: n+1 elements, last element is end of last buffer)"""
@property
def tx_packet_count(self) -> IclField[int]:
    """Get 64-bit total Tx packet count"""
    # The count is held in two registers that are read one after the other
    # (high word first) and then joined into a single value.
    upper_word = self.tx_packet_count_hi.value
    lower_word = self.tx_packet_count_lo.value
    return IclField(
        description="Transmitted Packet Count",
        type_=int,
        value=(upper_word << 32) | lower_word,
    )
@property
def tx_packets_to_mac(self) -> IclField[int]:
    """Get 64-bit total Tx packets to MAC count"""
    # The count is held in two registers that are read one after the other
    # (high word first) and then joined into a single value.
    upper_word = self.tx_packets_to_mac_hi.value
    lower_word = self.tx_packets_to_mac_lo.value
    return IclField(
        description="Transmitted to MAC Packet Count",
        type_=int,
        value=(upper_word << 32) | lower_word,
    )
@property
def rx_packet_count(self) -> IclField[int]:
    """Get 64-bit total Rx packet count"""
    # The count is held in two registers that are read one after the other
    # (high word first) and then joined into a single value.
    upper_word = self.rx_packet_count_hi.value
    lower_word = self.rx_packet_count_lo.value
    return IclField(
        description="Received Packet Count",
        type_=int,
        value=(upper_word << 32) | lower_word,
    )
def _virtual_write(self, data: np.ndarray, address: int) -> None:
    """
    Simple virtual address mapper for writing to multiple HBM buffers.

    The HBM buffers are treated as one contiguous virtual address space
    (described by ``self._buffer_offsets``). Data that crosses a buffer
    boundary is split, with one driver write per buffer touched. Data that
    ends exactly at the end of a buffer (including the last one) is written
    with a single call. Zero-length data results in no driver call.

    ``len(data)`` is used as the number of bytes, i.e. one byte per array
    element is assumed.

    :param data: numpy array to write
    :param address: byte-based address
    :raises IndexError: if address is negative or data cannot fit at address
    """
    n_bytes = len(data)
    offsets = self._buffer_offsets
    if address < 0 or address + n_bytes > offsets[-1]:
        raise IndexError(
            f"Cannot fit {n_bytes} bytes "
            f"starting from virtual address {address}. "
            f"Buffers end at {offsets[-1]}."
        )
    # Note bisect works here because our first buffer to use is memory 1
    # (would need to add an offset if this was not the case)
    # e.g. if _buffer_offsets is [0, 1000, 2000, 3000]
    # address 50 will return 1; address 1500 will return 2
    buffer = bisect.bisect(offsets, address)
    written = 0
    while written < n_bytes:
        position = address + written
        # room left in this buffer, counted from the current position
        room = int(offsets[buffer] - position)
        chunk = min(room, n_bytes - written)
        if chunk > 0:  # zero-sized buffers are simply stepped over
            self._fpga_interface.write_memory(
                buffer,
                data[written : written + chunk],
                int(position - offsets[buffer - 1]),
            )
        written += chunk
        # anything left over continues at the start of the next buffer
        buffer += 1
def dump_pcap(self, out_filename: str, packet_size: int):
    """
    Save the packets held in HBM to a PCAP(NG) file on disk.

    The file is opened for binary writing and handed to ``_dump_pcap``,
    which does the actual work.

    :param str out_filename: file to save to
    :param int packet_size: Number of Bytes used for each packet.
        Not used by this method. # TODO - remove
    """
    self._logger.info(f"Writing to {out_filename}")
    with open(out_filename, "wb") as destination:
        self._dump_pcap(destination)
def _dump_pcap(
    self,
    out_file: typing.BinaryIO,
) -> None:
    """
    Dump a PCAP(NG) file from HBM.

    Each enabled Rx buffer is read up to its ``rx_hbm_<n>_end_addr`` and
    walked record by record. A record is the metadata (padded by
    ``_get_padded_size``) followed by the packet (padded the same way).
    A record that is cut off by the end of a buffer is carried over and
    completed with the start of the next buffer. Dumping stops at the first
    enabled buffer that holds no data, or once the number of packets to dump
    (the smaller of ``rx_packet_count`` and ``rx_packets_to_capture``) has
    been written.

    On completion, summary statistics are logged and the output file name is
    remembered in ``self._dumped_pcap``.

    :param out_file: File object to write to. File type determined by extension,
        use .pcapng for next-gen.
    """
    writer = get_writer(out_file)
    # tail of the previous buffer that holds an incomplete record, if any
    carry_over = None
    n_packets = 0
    n_packets_to_dump = min(self.rx_packet_count, self.rx_packets_to_capture)
    total_bytes = 0
    padded_metadata_size = _get_padded_size(METADATA_SIZE)
    # timestamps of the first and last packets actually written
    first_ts = None
    last_ts = None
    page_size = 1 << 30  # read 1GB
    # start from 1 as our first buffer is #1
    for buffer in range(1, len(self._buffer_offsets)):
        # skipping buffers for debugging
        if not self._rx_buffer_enabled(buffer):
            self._logger.debug(f"Skipping buffer {buffer}")
            continue
        buffer_end = getattr(self, f"rx_hbm_{buffer}_end_addr").value
        self._logger.info(f"Reading {buffer_end} B from HBM buffer {buffer} ")
        if buffer_end == 0:
            # No data in this buffer,
            # so we have already processed the last packet
            break
        # WORKAROUND for weird bug when reading 2GB+ on some machines:
        # read in pages instead of one read_memory call for the whole buffer.
        # Hopefully we can remove this later.
        raw = np.empty(buffer_end, dtype=np.uint8)
        for read_start in range(0, buffer_end, page_size):
            read_end = min(read_start + page_size, buffer_end)
            raw[read_start:read_end] = self._personality.read_memory(
                buffer, read_end - read_start, read_start
            ).view(dtype=np.uint8)
            print(".", end="", flush=True)
        print("")
        # END WORKAROUND
        self._logger.info(f"Writing buffer {buffer} packets to file")
        if carry_over is not None:
            # insert tail of last buffer into head of this one
            raw = np.concatenate((carry_over, raw))
            # consumed - must not be prepended to any later buffer
            carry_over = None
        offset = 0
        while offset < raw.nbytes and n_packets < n_packets_to_dump:
            if n_packets % 100 == 0:
                # brief sleep to give the control system a chance to do things
                time.sleep(0.001)
            if offset + METADATA_SIZE > raw.nbytes:
                # metadata itself is cut off, rest is in the next buffer
                carry_over = np.copy(raw[offset:])
                break
            metadata = raw[offset : offset + METADATA_SIZE].view(dtype=hbm_metadata)
            packet_size = int(metadata["packet_size"][0])
            start = offset + padded_metadata_size
            packet_end = start + packet_size
            if packet_end > raw.nbytes:
                # packet goes into the next buffer
                carry_over = np.copy(raw[offset:])
                break
            # a packet that ends exactly at the end of the data is complete
            ptp_ts = int.from_bytes(metadata["timestamp"].tobytes(), "big")
            timestamp = unix_ts_from_ptp(ptp_ts)
            writer.writepkt(raw[start:packet_end].tobytes(), timestamp)
            if first_ts is None:
                first_ts = timestamp
            last_ts = timestamp
            n_packets += 1
            total_bytes += packet_size
            offset = start + _get_padded_size(packet_size)
    # end for each buffer loop
    if carry_over is not None:
        self._logger.warning(
            f"Discarded {carry_over.nbytes} B of incomplete packet data"
        )
    self._logger.info(
        f"Finished writing {n_packets} packets,"
        f" {str_from_int_bytes(total_bytes)}"
    )
    if first_ts is None:
        # nothing was written, so there are no timestamps to compare
        self._logger.error("Couldn't calculate duration of capture")
    else:
        duration = float(last_ts - first_ts)
        self._logger.info(f"Capture duration {duration:.9f} s")
        # guard against divide by zero
        # when PTP isn't active it marks all packets at t=0
        if duration > 0:
            line_bytes = total_bytes + n_packets * ETHERNET_OVERHEAD_SIZE
            data_rate_gbps = (8 * line_bytes / duration) / 1e9
            self._logger.info(f"Average data rate {data_rate_gbps:.3f} Gbps")
        else:
            self._logger.warning("Cannot calculate data rate")
    self._logger.info(
        (
            f"Wrote {n_packets} packets, "
            f"{str_from_int_bytes(total_bytes)} "
            f"to {out_file.name}"
        )
    )
    self._dumped_pcap = out_file.name
@property
def last_dumped_pcap(self) -> IclField[str]:
    """Name of the PCAP file most recently dumped, wrapped in an IclField"""
    filename = self._dumped_pcap
    return IclField(
        description="Last dumped PCAP file name", type_=str, value=filename
    )
@property
def loaded_pcap(self) -> IclField[str]:
    """Name of the PCAP file most recently loaded, wrapped in an IclField"""
    filename = self._loaded_pcap
    return IclField(
        description="Last loaded PCAP file name", type_=str, value=filename
    )
@property
def loaded_pcap_packets(self) -> IclField[int]:
    """Packet count of the most recent PCAP load, wrapped in an IclField"""
    packet_total = self._loaded_pcap_packets
    return IclField(
        description="Last loaded PCAP packet count", type_=int, value=packet_total
    )
def load_pcap(self, in_filename: str) -> None:
    """
    Load a PCAP(NG) file from disk to FPGA.

    Nothing is done if ``in_filename`` is the file that is already loaded.
    Otherwise the record of the previously loaded file is cleared before
    loading starts, and is only set again once loading has succeeded.
    If opening or loading the file fails, an error is logged and the
    exception is passed on to the caller.

    :param str in_filename: path to input PCAP(NG) file
    """
    if self._loaded_pcap == in_filename:
        self._logger.info(f"Won't load {in_filename} file again")
        return
    # what is in HBM is about to change, so forget the previous load
    self._loaded_pcap = ""
    self._loaded_pcap_packets = 0
    try:
        with open(in_filename, "rb") as in_file:
            self._logger.info(f"Loading from {in_filename}")
            n_loaded = self._load_pcap(in_file)
    except Exception:
        # leave some trace in case there are problems with the file
        self._logger.error(f"ERROR loading PCAP file {in_filename}")
        raise
    self._loaded_pcap_packets = n_loaded
    self._loaded_pcap = in_filename
    self._logger.info("Loading complete")
def _load_pcap(self, in_file: typing.BinaryIO) -> int:
    """
    Copy the packets of a PCAP(NG) file into HBM, one record per packet.

    A record is the metadata (timestamp and packet size, padded by
    ``_get_padded_size``) followed by the packet (padded the same way).
    Records are written back to back via ``_virtual_write``. In duplex mode
    writing starts part-way through the buffers, otherwise at virtual
    address zero. Loading stops early when a record no longer fits.

    Afterwards ``tx_axi_transactions`` is set to suit the amount of HBM
    used, and ``tx_packet_to_send`` is lowered if fewer packets were loaded
    than it asked for.

    :param in_file: input PCAP(NG) file
    :raises NotImplementedError: if the file's link-layer header type is
        neither ``DLT_EN10MB`` nor ``DLT_LINUX_SLL``
    :returns: Number of packets loaded
    """
    reader = get_reader(in_file)
    write_pointer = 0  # byte address to write to
    if self.enable_duplex:
        buffer = len(self._buffer_offsets) // 2
        write_pointer = self._buffer_offsets[buffer]
        self._logger.debug(
            f"Duplex mode, loading into HBM {buffer + 1} "
            f"(virtual address {write_pointer})"
        )
    first_address = write_pointer
    header_space = _get_padded_size(METADATA_SIZE)
    progress_step = 128 << 20  # print progress every 128MiB
    next_progress_mark = first_address
    loaded = 0

    link_layer = reader.datalink()
    if link_layer not in [pcap.DLT_EN10MB, pcap.DLT_LINUX_SLL]:
        raise NotImplementedError(f"Link-Layer Header {link_layer} not supported!")
    is_cooked = link_layer == pcap.DLT_LINUX_SLL

    for timestamp, packet in reader:
        if is_cooked:
            # convert "cooked" packet data back to standard Ethernet
            packet = eth_from_sll(packet)
        packet_size = len(packet)
        payload_end = header_space + packet_size
        record = np.zeros(
            _get_padded_size(packet_size) + header_space, dtype=np.uint8
        )
        # TODO do we need to check that it's a valid ethernet packet?
        # CNIC control metadata sits at the very start of the record
        header = record[:METADATA_SIZE].view(dtype=hbm_metadata)
        header["timestamp"] = np.frombuffer(
            ptp_ts_from_float(timestamp).to_bytes(TIMESTAMP_SIZE, "big"),
            dtype=np.uint8,
        )
        header["packet_size"] = packet_size
        # Ethernet packet contents follow the padded metadata
        record[header_space:payload_end] = np.frombuffer(packet, dtype=np.uint8)
        try:
            self._virtual_write(record, write_pointer)
        except IndexError:
            # stop if we don't have enough memory left for the packet
            self._logger.debug(
                f"Aborting load, {record.nbytes} B can't fit at"
                f" virtual address {write_pointer}"
            )
            break
        loaded += 1
        # the write pointer doubles as the DMA pointer for HBM memory:
        # move it to where the next packet's record starts.
        # TX memory structure is metadata(mod64) + packetsize(mod64)
        write_pointer += record.nbytes
        if write_pointer >= next_progress_mark:
            print(".", end="", flush=True)
            next_progress_mark += progress_step
        if loaded % 1000 == 0:
            # brief sleep to give the control system a chance to do things
            time.sleep(0.0001)

    self._logger.info(
        f"Loaded {loaded} packets, "
        f"{str_from_int_bytes(write_pointer - first_address)}"
    )
    # set AXI transaction limit to suit amount of HBM used
    # (can't be set in configure_tx because we need to parse all the packets first)
    self.tx_axi_transactions = math.ceil(
        (write_pointer - first_address) / AXI_TRANSACTION_SIZE
    )
    if loaded < self.tx_packet_to_send.value:
        self._logger.warning(
            f"Expected {self.tx_packet_to_send.value} packets but only "
            f"got {loaded} (will use {loaded})"
        )
        self.tx_packet_to_send = loaded
    return loaded
def start_tx(self) -> None:
    """
    Start transmitting packets

    :raises RuntimeError: if Rx capture is enabled while duplex is off
    """
    # Which PCAP is loaded is only known to software, not stored in the FPGA,
    # so there is no "No PCAP loaded" check here:
    # we defer to the user's judgement.
    if self.rx_enable_capture:
        if not self.duplex:
            raise RuntimeError("Duplex not enabled and Rx already running!")
    # tx_enable is written with 0 and then with 1
    for level in (0, 1):
        self.tx_enable = level
def start_rx(self) -> None:
    """
    Begin receiving packets into FPGA memory.
    Call configure_rx first.

    :raises RuntimeError: if Tx is enabled while duplex is off
    """
    if self.tx_enable:
        if not self.duplex:
            raise RuntimeError("Duplex not enabled and Tx already running!")
    self.rx_enable_capture = 1
def _rx_buffer_enabled(self, buffer: int) -> bool:
    """
    Report whether an Rx buffer should be used.

    :param buffer: Buffer index, starting from 1
    :return: True = buffer in use. Always True when there is no
        ``rx_bank_enable`` register; otherwise True when the bit for this
        buffer (bit 0 for buffer 1, and so on) is clear.
    """
    # NOTE(review): a *set* bit in rx_bank_enable makes this return False,
    # which looks inverted relative to the register's name - confirm
    # against the firmware definition.
    if "rx_bank_enable" in self._fields:
        buffer_bit = 1 << (buffer - 1)
        return not bool(self.rx_bank_enable & buffer_bit)
    # the debug register does not exist, so every buffer counts as in use
    return True
def flush(self) -> None:
    """
    Flush internal FPGA Rx packet buffers into HBM.

    Does nothing when this object has no ``rx_flush_to_hbm`` attribute.

    WARNING: After doing this, you won't get any more packets until you
    run start_rx to configure a new receiving session.
    """
    # this register appeared part-way through FW v0.1.4 development...
    if not hasattr(self, "rx_flush_to_hbm"):
        return
    # the register is written with 1 and then with 0
    for level in (1, 0):
        self.rx_flush_to_hbm = level
@property
def duplex(self) -> IclField[bool]:
    """Is duplex mode active? (the value of ``enable_duplex``)"""
    return self.enable_duplex


@duplex.setter
def duplex(self, enable: bool) -> None:
    """
    Switch duplex mode on or off. Note: set this before configuring Tx or Rx!

    Tx and Rx are both disabled and put into reset, the mode is changed,
    and then both resets are cleared again. The record of the loaded PCAP
    file is cleared as well.

    :param enable: True=duplex (simultaneous Tx/Rx),
        False=simplex (one or the other)
    """
    # stop Tx and put it into reset
    self.tx_enable = False
    self.tx_reset = True
    # stop Rx capture and put it into reset
    self.rx_enable_capture = False
    self.rx_reset_capture = True

    # Tx loads to different address depending on duplex setting,
    # so whatever was loaded before no longer counts as loaded
    self._loaded_pcap = ""
    self.enable_duplex = enable

    # clear both resets again
    self.tx_reset = False
    self.rx_reset_capture = False