# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
Virtual Digitiser ICL (abstraction)
"""
import json
import math
import typing
import warnings
from dataclasses import dataclass
from datetime import datetime
from enum import IntEnum
from typing import Sequence
import numpy as np
from ska_low_cbf_fpga import DISCOVER_ALL, FpgaPeripheral, IclField
from ska_low_cbf_fpga.args_fpga import ArgsWordType
from ska_low_cbf_sw_cnic.icl.ptp_scheduler import TIME_STR_FORMAT
[docs]class SpeadVersion(IntEnum):
"""
Supported SPEAD packet versions.
For details on packet contents, see `CNIC firmware docs <https://developer.skao.int/projects/ska-low-cbf-fw-cnic/en/latest>`_
or `SPS v3 ECP <https://confluence.skatelescope.org/display/CMI/ECP-230134+-+Update+time+fields+in+the+LOW+CBF+-+SPS+ICD>`_.
"""
v2 = 2
v3 = 3
[docs]@dataclass
class VDChannelConfig:
"""Virtual Digitiser Channel Configuration"""
scan: int
beam: int
frequency: int
substation: int
subarray: int
station: int
[docs] def as_registers(self, spead_version: int = SpeadVersion.v2) -> np.ndarray:
"""
Render as np array, ready to write to FPGA data RAM.
Note: for v2 SPS SPEAD format only.
"""
scan_id_low = self.scan & 0x0_FFFF_FFFF
scan_id_high = (
(self.scan >> 32) & 0x0_FFFF if spead_version == SpeadVersion.v3 else 0
)
return np.array(
[
scan_id_low,
self.beam,
self.frequency,
self.substation,
self.subarray,
self.station,
scan_id_high,
],
dtype=ArgsWordType,
)
[docs] @classmethod
def from_registers(
cls, registers: np.ndarray, spead_version: int = SpeadVersion.v2
):
"""
Create a VDChannelConfig object from a row of config table registers.
Note: for v2 SPS SPEAD format only.
:param registers: a slice of CONFIG_ROW_WORDS registers
:return: VDChannelConfig object to match supplied registers
"""
scan_id = int(registers[0])
if spead_version == SpeadVersion.v3:
# apply the high portion of scan ID
scan_id |= int(0x0_FFFF & registers[6]) << 32
return cls(
scan=scan_id,
beam=int(registers[1]),
frequency=int(registers[2]),
substation=int(registers[3]),
subarray=int(registers[4]),
station=int(registers[5]),
)
def __iter__(self):
# allows VDChannelConfig to be converted to JSON
for key in dir(self):
if not key.startswith("_"):
value = getattr(self, key)
if not callable(value):
yield key, value
# Each row of the config table is this many words
# (note: not equal to the number of parameters in a channel config)
CONFIG_ROW_WORDS = 8
r"""
====== ======================= =============== ==================
Word v1 v2 v3 (proposed)
====== ======================= =============== ==================
0 Frequency Hz low 32b Scan ID Scan ID low 32b
1 Beam ID Beam ID Beam ID
2 Frequency ID Frequency ID Frequency ID
3 Substation ID Substation ID Substation ID
4 Subarray ID Subarray ID Subarray ID
5 Station ID Station ID Station ID
6 Frequency Hz high 16b [unused] Scan ID high 16b
7 [unused] [unused] [unused]
====== ======================= =============== ==================
"""
MAX_CONFIG_ROWS = 4096
CONFIG_EMPTY = 65535 # special startup value for 'number of valid lines'
MAX_BURST_ENABLE_BIT = 1 << 31
"""Top bit of Max. Bursts register used as enable bit."""
SPS_PACKET_PERIOD = 2048 * 1080e-9
"""SPS packet period in seconds."""
def _split_into_two(x: int) -> (int, int):
"""
Split a number into two 32-bit register values
Returns: High word, Low word
"""
return (x >> 32), x & 0xFFFF_FFFF
def _time_between_streams(n_streams: int) -> int:
"""
Calculate required time delay between packets, given number of SPEAD streams.
We want a data rate of ~30Gbps as expected from SPS for 576 streams (505 cycles),
need to run faster for more streams. Equation based on 100 cycles @ 1728 streams.
:param n_streams: number of SPEAD streams
:returns: time delay between packets in FPGA clock cycles (300MHz)
"""
return max(round(505 - 0.3515625 * max(n_streams - 576, 0)), 0)
[docs]class VirtualDigitiser(FpgaPeripheral):
_user_attributes = {DISCOVER_ALL}
_not_user_attributes = {
"max_number_of_vd_bursts", # bitfield decoded in our properties
"no_of_packets_this_config_l", # lower/upper values merged in property
"no_of_packets_this_config_u",
"no_of_sps_packets_l",
"no_of_sps_packets_u",
"packet_counter",
"packet_counter_u",
"unix_epoch_time_l",
"unix_epoch_time_u",
"timestamp_l",
"timestamp_u",
"data",
"number_of_valid_lines_in_vd_ram",
"sps_packet_version_select",
}
# No methods should be driven by control system directly
_user_methods = None
@property
def packet_count(self) -> IclField[int]:
"""Get combined value of packet_counter & packet_counter_u."""
total_packets = (self.packet_counter_u.value << 32) | self.packet_counter.value
return IclField(
description="SPS packet counter", type_=int, value=total_packets
)
@property
def sps_packets(self) -> IclField[int]:
total_packets = (
self.no_of_sps_packets_u.value << 32
) | self.no_of_sps_packets_l.value
return IclField(
description="Total SPS Packets Sent",
type_=int,
value=total_packets,
)
@property
def session_packets(self) -> IclField[int]:
"""Get number of packets sent since VD last enabled."""
total_packets = (
self.no_of_packets_this_config_u.value << 32
) | self.no_of_packets_this_config_l.value
return IclField(
description="Total packets sent this session (i.e. since enabled)",
type_=int,
value=total_packets,
)
@property
def epoch(self) -> IclField[str]:
timestamp = (self.unix_epoch_time_u.value << 32) | self.unix_epoch_time_l.value
return IclField(
description="SPEAD epoch for SPS protocol v1/v2",
type_=str,
value=datetime.fromtimestamp(timestamp).strftime(TIME_STR_FORMAT),
)
@property
def timestamp(self) -> IclField[int]:
timestamp = (self.timestamp_u.value << 32) | self.timestamp_l.value
return IclField(
description="SPEAD timestamp (10s of ns) for SPS protocol v1/v2",
type_=int,
value=timestamp,
)
@property
def configured_channels(self) -> IclField[int]:
"""Number of configured SPEAD channels (1 means 1)."""
# register is "table size minus one"
n_lines = int(self.number_of_valid_lines_in_vd_ram)
if n_lines == CONFIG_EMPTY:
n_channels = 0
else:
n_channels = n_lines + 1
return IclField(
description="Configured SPEAD channels",
type_=int,
value=n_channels,
)
[docs] def get_channel_config(self, n: int) -> VDChannelConfig:
"""
Get one SPEAD channel's configuration.
:param n: zero-indexed channel number (row in data table)
:raises RuntimeError: if not configured
"""
if n >= self.configured_channels:
raise RuntimeError(f"Channel {n} is not configured.")
if n >= MAX_CONFIG_ROWS:
raise RuntimeError(f"Channel {n} is beyond table limit.")
reg_slice = slice(n * CONFIG_ROW_WORDS, (n + 1) * CONFIG_ROW_WORDS)
return VDChannelConfig.from_registers(
self.data[reg_slice], self.sps_packet_version.value
)
@property
def configuration(self) -> IclField[str]:
"""Get the current configuration (JSON)."""
configs = [
self.get_channel_config(n) for n in range(self.configured_channels.value)
]
return IclField(type_=str, value=json.dumps(configs, default=dict))
@property
def last_used_datagen_buffer(self) -> IclField[int]:
"""
The last used (i.e. idle) Data Generator config buffer. The buffer is
an internal interface between the DataGenerator and packetiser
vd_buffer_gen_status is a 4-bit wide FPGA register
b0: last buffer written 0
b1: last buffer written 1
b2: buffer 0 generation in progress
b3: buffer 1 generation in progress
:returns: -1 if neither has been used
"""
status = self.vd_buffer_gen_status.value
last = -1
if status & 1:
last = 0
elif status & 2:
last = 1
return IclField(
description="Last used VD Datagen config buffer",
type_=int,
value=last,
)
# It might have been nice to combine the two max_bursts properties,
# using 'None' for 'infinite', but I have a feeling it would break Tango.
@property
def max_bursts_enabled(self) -> IclField[bool]:
"""Output burst limit active?"""
enabled = False
if hasattr(self, "max_number_of_vd_bursts"):
enabled = bool(self.max_number_of_vd_bursts.value & MAX_BURST_ENABLE_BIT)
return IclField(
type_=bool, value=enabled, description="Is VD burst limit enabled?"
)
@property
def max_bursts(self) -> IclField[int]:
"""Output burst limit value."""
n = 0
if hasattr(self, "max_number_of_vd_bursts"):
n = self.max_number_of_vd_bursts.value & (~MAX_BURST_ENABLE_BIT)
return IclField(type_=int, value=n, description="VD output burst limit")
[docs] def set_max_bursts(self, max_bursts: typing.Optional[int]) -> None:
"""
Set the output burst limit.
:param max_bursts: Maximum VD output bursts. Use ``None`` to run forever.
"""
if not hasattr(self, "max_number_of_vd_bursts") and max_bursts is not None:
raise NotImplementedError("Firmware does not support maximum burst setting")
if max_bursts is None:
self.max_number_of_vd_bursts = 0
else:
self.max_number_of_vd_bursts = max_bursts | MAX_BURST_ENABLE_BIT
[docs] def set_time_between_packets_and_bursts(self, n_streams: int) -> None:
"""
Configure VD packet delays.
:param n_streams: Number of SPEAD streams
"""
self.time_between_channel_bursts = 2_211_840 # Matches SPS packet rate [ns]
self.time_between_packets = _time_between_streams(n_streams)
@property
def sps_packet_version(self) -> IclField[int]:
"""SPS packet version in use (1 means v1)."""
fpga_value = self.sps_packet_version_select.value
return IclField(
type_=int, value=fpga_value + 1, description="SPS SPEAD protocol version"
)
@sps_packet_version.setter
def sps_packet_version(self, version) -> None:
"""
Select SPS packet version to use.
:param version: packet/protocol version - 1 means v1
"""
if version not in (SpeadVersion.v2, SpeadVersion.v3):
warnings.warn(
f"Using v{version}, but SW only handles v2 and v3 properly. "
"Here there be dragons!"
)
self.sps_packet_version_select = version - 1