# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
Virtual Digitiser Data Generation ICL (abstraction)
"""
import json
import typing
from dataclasses import dataclass, field
from typing import List
import numpy as np
from ska_low_cbf_fpga import DISCOVER_ALL, ArgsFpgaDriver, FpgaPeripheral, IclField
from ska_low_cbf_fpga.args_fpga import WORD_SIZE, ArgsWordType
from ska_low_cbf_fpga.args_map import ArgsFieldInfo
from ska_low_cbf_sw_cnic.util.vd_datagen import datagen_config
# Number of register words that make up one Source entry in the configuration table
SOURCE_CONFIG_WORDS = 16
# Number of streams a single configuration buffer has room for
MAX_CONFIG_STREAMS = 1024
SOURCES_PER_STREAM = 8 # 4 per polarisation
# Total number of Source entries held by one configuration buffer
MAX_CONFIG_SOURCES = MAX_CONFIG_STREAMS * SOURCES_PER_STREAM
DELAY_POLY_COEFFS = 6
"""Number of Delay Polynomial Coefficients"""
# One buffer = (words per Source) x (Sources per buffer)
CONFIG_BUF_WORDS = SOURCE_CONFIG_WORDS * MAX_CONFIG_SOURCES
"""
Size of a full configuration buffer.
Also the offset for second half of the double-buffer.
"""
def null_polynomial():
    """
    Build an all-zero delay polynomial.

    :return: new float64 array holding ``DELAY_POLY_COEFFS`` zeros
    """
    return np.zeros(shape=DELAY_POLY_COEFFS, dtype=np.float64)
@dataclass
class Source:
    """
    One signal source entry of the Virtual Digitiser configuration table.

    A Source is either a sinusoid (``tone=True``) or noise (``tone=False``).
    It can be packed into, and recovered from, a row of ``SOURCE_CONFIG_WORDS``
    registers via :py:meth:`as_registers` / :py:meth:`from_registers`.
    """

    tone: bool
    """True = Sinusoid, False = Noise"""
    channel_frequency: float
    """SPS SPEAD channel centre frequency (GHz)"""
    scale: int
    """Scale factor: [-16384,16384] for sinusoids, [-1020,1020] for noise"""
    delay_polynomial: typing.Sized = field(default_factory=null_polynomial)
    """Delay polynomial, exactly ``DELAY_POLY_COEFFS`` coefficients"""
    seed: int = field(default=0)
    """RNG seed, applies when tone=False"""
    fine_frequency: int = field(default=0)
    """in steps of 28.257Hz, applies when tone=True"""

    def __post_init__(self):
        """
        Validate input and convert the delay polynomial to float64.

        :raises ValueError: if ``delay_polynomial`` does not contain exactly
            ``DELAY_POLY_COEFFS`` coefficients
        """
        if len(self.delay_polynomial) != DELAY_POLY_COEFFS:
            raise ValueError(f"Supply exactly {DELAY_POLY_COEFFS} coefficients")
        self.delay_polynomial = np.float64(self.delay_polynomial)

    def __eq__(self, other):
        """
        Test Equality.

        All fields are compared; the delay polynomials are compared element-wise.

        :return: bool, or ``NotImplemented`` when ``other`` is not a Source
        """
        if not isinstance(other, Source):
            # let Python try the reflected comparison rather than raising
            # AttributeError on a foreign object
            return NotImplemented
        # TODO we could ignore the unused one of seed/fine_frequency
        return bool(
            self.tone == other.tone
            and self.channel_frequency == other.channel_frequency
            and self.scale == other.scale
            and (self.delay_polynomial == other.delay_polynomial).all()
            and self.seed == other.seed
            and self.fine_frequency == other.fine_frequency
        )

    def as_registers(self):
        """
        Pack this Source into one row of the configuration table.

        Byte layout of the ``SOURCE_CONFIG_WORDS * WORD_SIZE`` byte row:

        * 0-47: delay polynomial (6 x float64)
        * 48-55: channel frequency (float64)
        * 56-57: bit 15 set for sinusoid mode; bits 14:0 hold the fine frequency
          (tone) or the seed (noise)
        * 58-59: scale, stored as a 16 bit two's complement value
        * 60 onwards: unused (left as zero)

        :return: the row viewed as ``ArgsWordType`` register values
        """
        data = np.zeros(SOURCE_CONFIG_WORDS * WORD_SIZE, dtype=np.uint8)
        data[0:48] = np.float64(self.delay_polynomial).view(np.uint8)
        data[48:56] = np.array(self.channel_frequency, dtype=np.float64, ndmin=1).view(
            np.uint8
        )
        fifteen_bit_mask = 0b111_1111_1111_1111
        if self.tone:
            mode_config = self.fine_frequency & fifteen_bit_mask
            mode_config |= 1 << 15  # set the sinusoid mode bit
        else:
            mode_config = self.seed & fifteen_bit_mask
        data[56:58] = np.array(mode_config, dtype=np.uint16, ndmin=1).view(np.uint8)
        # scale may be negative: reduce it to its 16 bit two's complement pattern
        # first, because newer NumPy refuses to build a uint16 from a negative
        # Python int (OverflowError) instead of wrapping it.
        scale_bits = int(self.scale) & 0xFFFF
        data[58:60] = np.array(scale_bits, dtype=np.uint16, ndmin=1).view(np.uint8)
        # bytes 60-63 are unused
        return data.view(ArgsWordType)

    @classmethod
    def from_registers(cls, registers: np.ndarray):
        """
        Create a Source object from a row of registers in the configuration table.

        :param registers: a slice of SOURCE_CONFIG_WORDS registers
        :return: Source object derived from supplied registers
        """
        registers = registers.view(np.uint64)
        poly = registers[0:6].view(np.float64)
        freq = registers[6].view(np.float64)
        mode_config = int(registers[7]) & 0xFFFF_FFFF  # only low 32 bits used
        # convert scale from unsigned to signed
        # then to Python int type so we can later convert Source to JSON
        scale = int(np.uint16(mode_config >> 16).astype(np.int16))
        tone = bool(mode_config & (1 << 15))
        fifteen_bit_mask = 0b111_1111_1111_1111
        low_bits = mode_config & fifteen_bit_mask
        if tone:
            # decode 15 bit two's complement value
            sign_bit = 1 << 14
            fine_freq = (low_bits & (sign_bit - 1)) - (low_bits & sign_bit)
            return cls(
                tone=True,
                channel_frequency=freq,
                scale=scale,
                delay_polynomial=poly,
                fine_frequency=fine_freq,
            )
        return cls(
            tone=False,
            channel_frequency=freq,
            scale=scale,
            delay_polynomial=poly,
            seed=low_bits,
        )

    def __iter__(self):
        """
        Yield ``(name, value)`` pairs so that ``dict(source)`` works.

        Every public, non-callable attribute reported by ``dir()`` is yielded
        (``dir()`` returns names in alphabetical order).
        """
        # this function is used to convert a Source object to a dict
        # we convert the delay_polynomial to a list
        # so the dict can then be converted to JSON (numpy arrays cannot)
        for key in dir(self):
            if key.startswith("_"):
                continue
            value = getattr(self, key)
            if callable(value):
                continue
            if key == "delay_polynomial":
                yield key, value.tolist()
            else:
                yield key, value
# Placeholder entry used for padding: noise mode with scale=0 and an all-zero
# delay polynomial.
NULL_SOURCE = Source(
    tone=False,
    channel_frequency=0.1,
    scale=0,
    delay_polynomial=[0] * DELAY_POLY_COEFFS,
)
def pad_source_list(sources: List[Source], desired_length: int):
    """
    Grow a list of Sources to an exact length by appending NULL_SOURCE.

    The list is extended in place; every padding slot refers to the same
    NULL_SOURCE object.

    :param sources: list to pad (modified in place)
    :param desired_length: required number of entries
    :return: the same list object that was passed in
    :raises ValueError: if the list is already longer than ``desired_length``
    """
    shortfall = desired_length - len(sources)
    if shortfall < 0:
        raise ValueError(f"Too many Sources ({len(sources)} > {desired_length})")
    if shortfall > 0:
        # scale=0 is the important setting here
        sources.extend(NULL_SOURCE for _ in range(shortfall))
    return sources
[docs]class DataGenerator(FpgaPeripheral):
"""Virtual Digitiser Data Generator configuration"""
_user_attributes = {DISCOVER_ALL}
_not_user_attributes = {
"data", # this is called "spead_shared_ram" in source YAML
# "enable_vd", # leaving this exposed for initial debugging
}
_user_methods = {"configure_from_yaml"}
def __init__(
    self,
    driver: ArgsFpgaDriver,
    map_field_info: typing.Dict[str, ArgsFieldInfo],
    **kwargs,
):
    """
    Initialise the peripheral and reset the software-side buffer state.

    :param driver: forwarded unchanged to the parent initialiser
    :param map_field_info: forwarded unchanged to the parent initialiser
    :param kwargs: forwarded unchanged to the parent initialiser
    """
    super().__init__(driver, map_field_info, **kwargs)
    # Always start with buffer 0 as the next polynomial buffer.
    # Something smarter is possible (e.g. pick the buffer that was not used
    # last, provided it is neither active nor queued), but the software cannot
    # resume delay updates after a crash anyway, so defaulting to zero ready for
    # a fresh configuration is good enough.
    # A VD running without dynamic delays will happily keep running.
    self._next_poly_buffer = 0
    self.valid_time_buf0 = self.valid_time_buf1 = 0
def read_time(self) -> float:
    """
    Get current data-generator time.

    The seconds counter is sampled before and after the nanoseconds counter;
    the reading is only accepted when both seconds samples agree.

    :return: seconds plus nanoseconds, combined into a single float (seconds)
    """
    seconds = self.current_time_s.value
    while True:
        nanoseconds = self.current_time_ns.value
        seconds_check = self.current_time_s.value
        if seconds_check == seconds:
            return seconds + nanoseconds * 1e-9
        # seconds rolled over while reading: take a fresh sample and retry
        seconds = self.current_time_s.value
@property
def configuration(self) -> IclField[str]:
    """
    Get the last written configuration (JSON).

    The buffer that is *not* ``_next_poly_buffer`` is read. If no configuration
    has been written since software start-up, you will get a silly answer
    (second half of double buffer).
    """
    most_recent = 0 if self._next_poly_buffer else 1
    sources = self._get_config_buffer(most_recent)
    # default=dict turns each Source into a dict of its fields
    as_json = json.dumps(sources, default=dict)
    return IclField(type_=str, value=as_json)
@property
def configuration0(self) -> IclField[str]:
    """Get configuration buffer 0 (JSON)."""
    # default=dict turns each Source into a dict of its fields
    as_json = json.dumps(self._get_config_buffer(0), default=dict)
    return IclField(type_=str, value=as_json)
@property
def configuration1(self) -> IclField[str]:
    """Get configuration buffer 1 (JSON)."""
    # default=dict turns each Source into a dict of its fields
    as_json = json.dumps(self._get_config_buffer(1), default=dict)
    return IclField(type_=str, value=as_json)
def _get_config_buffer(self, buffer: int) -> List[Source]:
    """
    Read one configuration buffer and decode it into Source objects.

    :param buffer: If zero, read the low half of the double buffer. Otherwise, read
        the second half.
    :return: one Source per row of the selected buffer
    """
    if buffer == 0:
        raw_words = self.data[:CONFIG_BUF_WORDS]
    else:
        # any non-zero value selects the second buffer
        raw_words = self.data[CONFIG_BUF_WORDS:]
    # one row of SOURCE_CONFIG_WORDS registers per Source
    raw_words.shape = (MAX_CONFIG_SOURCES, SOURCE_CONFIG_WORDS)
    return [Source.from_registers(row) for row in raw_words]
@property
def buf0_valid(self) -> IclField[bool]:
    """Bit 0 of ``enable_vd``, reported as a bool."""
    bit0 = self.enable_vd.value & 1
    return IclField(type_=bool, value=bool(bit0))

@buf0_valid.setter
def buf0_valid(self, value: bool) -> None:
    """
    Set or clear bit 0 of ``enable_vd`` (read-modify-write).

    :param value: True to set the bit, False to clear it
    """
    # There's probably some slick way to inject 2x IclFpgaFields instead,
    # thus making use of the underlying bitfield logic,
    # or maybe FW devs can configure it as two fields,
    # but this should work...
    current = self.enable_vd.value
    self.enable_vd = (current | 1) if value else (current & 0xFFFF_FFFE)
@property
def buf1_valid(self) -> IclField[bool]:
    """Bit 1 of ``enable_vd``, reported as a bool."""
    bit1 = self.enable_vd.value & 2
    return IclField(type_=bool, value=bool(bit1))

@buf1_valid.setter
def buf1_valid(self, value: bool) -> None:
    """
    Set or clear bit 1 of ``enable_vd`` (read-modify-write).

    :param value: True to set the bit, False to clear it
    """
    current = self.enable_vd.value
    self.enable_vd = (current | 2) if value else (current & 0xFFFF_FFFD)
def set_delay_polynomial(self, source: int, polynomial, buffer) -> None:
    """
    Set a delay polynomial for a single source.

    Inputs with fewer than ``DELAY_POLY_COEFFS`` coefficients are zero-padded;
    surplus coefficients are ignored.

    :param source: 0 indexed entry in spead_shared_ram.
    :param polynomial: coefficients [c0,c1,c2,c3,c4,c5]
        delay = c0 + c1*t + ... + c5*t^5
        units of nanoseconds per second^[0,1,2,3,4,5]
    :param buffer: 0/1 (double buffered configuration)
    """
    input_polynomial = np.float64(polynomial)
    # ensure we have the right number of coefficients
    polynomial = null_polynomial()
    poly_len = min(DELAY_POLY_COEFFS, len(input_polynomial))
    polynomial[:poly_len] = input_polynomial[:poly_len]
    # Bytes 0-47 (out of 64 per source) are the delay polynomial. 6x float64
    start = source * SOURCE_CONFIG_WORDS + (buffer * CONFIG_BUF_WORDS)
    # Floor division: slice bounds must be integers, true division would make
    # `end` a float.
    end = start + ((8 * DELAY_POLY_COEFFS) // WORD_SIZE)
    register_values = polynomial.view(ArgsWordType)
    self.data[start:end] = register_values