# Source code for vd_datagen

# -*- coding: utf-8 -*-
#
# Copyright (c) 2023 CSIRO Space and Astronomy.
#
# Distributed under the terms of the CSIRO Open Source Software Licence
# Agreement. See LICENSE for more info.
"""
Virtual Digitiser Data Generation ICL (abstraction)
"""
import json
import typing
from dataclasses import dataclass, field
from typing import List

import numpy as np
from ska_low_cbf_fpga import DISCOVER_ALL, ArgsFpgaDriver, FpgaPeripheral, IclField
from ska_low_cbf_fpga.args_fpga import WORD_SIZE, ArgsWordType
from ska_low_cbf_fpga.args_map import ArgsFieldInfo

from ska_low_cbf_sw_cnic.util.vd_datagen import datagen_config

# Number of registers (words) describing one source in the configuration table.
SOURCE_CONFIG_WORDS = 16
# Number of streams a configuration buffer is sized for.
MAX_CONFIG_STREAMS = 1024
SOURCES_PER_STREAM = 8  # 4 per polarisation
# Number of source slots in one configuration buffer.
MAX_CONFIG_SOURCES = MAX_CONFIG_STREAMS * SOURCES_PER_STREAM
DELAY_POLY_COEFFS = 6
"""Number of Delay Polynomial Coefficients"""
CONFIG_BUF_WORDS = SOURCE_CONFIG_WORDS * MAX_CONFIG_SOURCES
"""
Size of a full configuration buffer.
Also the offset for second half of the double-buffer.
"""


def null_polynomial():
    """
    Create an all-zero delay polynomial.

    :return: float64 array holding DELAY_POLY_COEFFS zeros
    """
    return np.zeros(shape=DELAY_POLY_COEFFS, dtype=np.float64)
@dataclass
class Source:
    """
    Configuration of one Virtual Digitiser data generator source.

    A Source can be packed into a row of the configuration table with
    :py:meth:`as_registers` and recovered with :py:meth:`from_registers`.
    Iterating over a Source yields ``(name, value)`` pairs, so ``dict(source)``
    gives a JSON-friendly dictionary.
    """

    tone: bool
    """True = Sinusoid, False = Noise"""
    channel_frequency: float
    """SPS SPEAD channel centre frequency (GHz)"""
    scale: int
    """Scale factor: [-16384,16384] for sinusoids, [-1020,1020] for noise"""
    delay_polynomial: typing.Sized = field(default_factory=null_polynomial)
    seed: int = field(default=0)
    """RNG seed, applies when tone=False"""
    fine_frequency: int = field(default=0)
    """in steps of 28.257Hz, applies when tone=True"""

    def __post_init__(self):
        """
        Validate input.

        :raises ValueError: if delay_polynomial does not contain exactly
            DELAY_POLY_COEFFS coefficients
        """
        if len(self.delay_polynomial) != DELAY_POLY_COEFFS:
            raise ValueError(f"Supply exactly {DELAY_POLY_COEFFS} coefficients")
        # normalise whatever sequence we were given to float64 values
        self.delay_polynomial = np.float64(self.delay_polynomial)

    def __eq__(self, other):
        """
        Test Equality.

        All fields are compared; the delay polynomials are compared
        element-wise. Comparison with an object that is not a Source is left
        to the other operand (``NotImplemented``) rather than raising
        AttributeError.
        """
        if not isinstance(other, Source):
            return NotImplemented
        # TODO we could ignore the unused one of seed/fine_frequency
        return (
            self.tone == other.tone
            and self.channel_frequency == other.channel_frequency
            and self.scale == other.scale
            and (self.delay_polynomial == other.delay_polynomial).all()
            and self.seed == other.seed
            and self.fine_frequency == other.fine_frequency
        )

    def as_registers(self):
        """
        Pack this Source into one row of the configuration table.

        Byte layout of the row:

        * 0-47: delay polynomial (6 x float64)
        * 48-55: channel frequency (float64)
        * 56-57: mode word - bit 15 set for sinusoid, low 15 bits hold
          fine_frequency (tone) or seed (noise)
        * 58-59: scale, as a 16 bit two's complement value
        * 60-63: unused (zero)

        NOTE(review): values are packed in host byte order, and
        from_registers decodes scale from bits 16-31 of the word starting at
        byte 56, so the two agree on little-endian hosts - confirm if other
        hosts matter.

        :return: the packed row, viewed as ArgsWordType
        """
        # in register order: delay_polynomial, channel_frequency, seed (==frequency),
        # tone, scale
        data = np.zeros(SOURCE_CONFIG_WORDS * WORD_SIZE, dtype=np.uint8)
        data[0:48] = np.float64(self.delay_polynomial).view(np.uint8)
        data[48:56] = np.array(self.channel_frequency, dtype=np.float64, ndmin=1).view(
            np.uint8
        )
        fifteen_bit_mask = 0b111_1111_1111_1111
        if self.tone:
            mode_config = self.fine_frequency & fifteen_bit_mask
            mode_config |= 2**15  # set the sinusoid mode bit
        else:
            mode_config = self.seed & fifteen_bit_mask
        data[56:58] = np.array(mode_config, dtype=np.uint16, ndmin=1).view(np.uint8)
        # Negative scales are legal. Reduce to the 16 bit two's complement bit
        # pattern first: handing a negative Python int straight to a uint16
        # array raises OverflowError on NumPy >= 2.0.
        scale_bits = int(self.scale) & 0xFFFF
        data[58:60] = np.array(scale_bits, dtype=np.uint16, ndmin=1).view(np.uint8)
        # bytes 60-63 are unused
        return data.view(ArgsWordType)

    @classmethod
    def from_registers(cls, registers: np.ndarray):
        """
        Create a Source object from a row of registers in the configuration table.

        :param registers: a slice of SOURCE_CONFIG_WORDS registers
        :return: Source object derived from supplied registers
        """
        registers = registers.view(np.uint64)
        poly = registers[0:6].view(np.float64)
        freq = registers[6].view(np.float64)
        mode_config = int(registers[7]) & 0xFFFF_FFFF  # only low 32 bits used
        # convert scale from unsigned to signed
        # then to Python int type so we can later convert Source to JSON
        scale = int(np.uint16(mode_config >> 16).astype(np.int16))
        tone = bool(mode_config & (1 << 15))
        fifteen_bit_mask = 0b111_1111_1111_1111
        low_bits = mode_config & fifteen_bit_mask

        if not tone:
            # noise source: the low 15 bits are the RNG seed
            return cls(
                tone=False,
                channel_frequency=freq,
                scale=scale,
                delay_polynomial=poly,
                seed=low_bits,
            )

        # sinusoid: decode 15 bit two's complement value
        sign_bit = 1 << 14
        fine_freq = (low_bits & (sign_bit - 1)) - (low_bits & sign_bit)
        return cls(
            tone=True,
            channel_frequency=freq,
            scale=scale,
            delay_polynomial=poly,
            fine_frequency=fine_freq,
        )

    def __iter__(self):
        """
        Yield ``(name, value)`` pairs for every public, non-callable attribute.

        Names come from ``dir(self)``, so they are produced in sorted order.
        """
        # this function is used to convert a Source object to a dict
        # we convert the delay_polynomial to a list
        # so the dict can then be converted to JSON (numpy arrays cannot)
        for key in dir(self):
            if key.startswith("_"):
                continue
            value = getattr(self, key)
            if callable(value):
                continue
            if key == "delay_polynomial":
                yield key, value.tolist()
            else:
                yield key, value
# Placeholder entry: a noise source with zero scale and an all-zero delay
# polynomial. pad_source_list uses it to fill unused configuration slots.
NULL_SOURCE = Source(
    scale=0,
    tone=False,
    channel_frequency=0.1,
    delay_polynomial=null_polynomial(),
)
def pad_source_list(sources: List[Source], desired_length: int):
    """
    Pad a list of Sources with NULL_SOURCE entries up to a desired length.

    The supplied list is never modified. When padding is needed a new list is
    returned; when the list already has the desired length it is returned
    as-is.

    :param sources: Source objects to be padded
    :param desired_length: number of entries wanted in the result
    :return: list of exactly ``desired_length`` Sources
    :raises ValueError: if more than ``desired_length`` Sources are supplied
    """
    if len(sources) == desired_length:
        return sources
    if len(sources) > desired_length:
        raise ValueError(f"Too many Sources ({len(sources)} > {desired_length})")
    pad_length = desired_length - len(sources)
    # scale=0 is the important setting here
    # Build a new list rather than extending the caller's list in place.
    return list(sources) + [NULL_SOURCE] * pad_length
class DataGenerator(FpgaPeripheral):
    """
    Virtual Digitiser Data Generator configuration

    The source configuration table (``data``) is treated as a double buffer:
    buffer 0 starts at word 0 and buffer 1 starts at word CONFIG_BUF_WORDS.
    Bits 0 and 1 of ``enable_vd`` are used as the "valid" flags of buffer 0
    and buffer 1 respectively.
    """

    _user_attributes = {DISCOVER_ALL}
    _not_user_attributes = {
        "data",  # this is called "spead_shared_ram" in source YAML
        # "enable_vd",  # leaving this exposed for initial debugging
    }
    _user_methods = {"configure_from_yaml"}

    def __init__(
        self,
        driver: ArgsFpgaDriver,
        map_field_info: typing.Dict[str, ArgsFieldInfo],
        **kwargs,
    ):
        """
        Create the peripheral and reset the delay polynomial book-keeping.

        :param driver: passed through to FpgaPeripheral
        :param map_field_info: passed through to FpgaPeripheral
        :param kwargs: passed through to FpgaPeripheral
        """
        super().__init__(driver, map_field_info, **kwargs)
        # The _next_poly_buffer value could be more sophisticated
        # - i.e. use the "not last used",
        # if that buffer is not currently in use or waiting to be used...
        # BUT: the software is not sophisticated enough to resume delay updates
        # after a crash anyway, so we might as well just default to zero in
        # preparation for a new configuration.
        # If VD is running without dynamic delays, it will happily continue to do so.
        self._next_poly_buffer = 0
        self.valid_time_buf0 = 0
        self.valid_time_buf1 = 0

    def read_time(self) -> float:
        """
        Get current data-generator time

        The seconds register is read before and after the nanoseconds
        register; the reads are repeated until both seconds values agree.

        :return: seconds + nanoseconds * 1e-9
        """
        secs1 = self.current_time_s.value
        while True:
            nsecs = self.current_time_ns.value
            secs2 = self.current_time_s.value
            timeval = secs1 + nsecs * 1e-9
            if secs2 == secs1:
                break
            # seconds rolled over while reading: reread
            secs1 = self.current_time_s.value
        return timeval

    @property
    def configuration(self) -> IclField[str]:
        """
        Get the last written configuration (JSON).

        If no configuration has been written since software start-up,
        you will get a silly answer (second half of double buffer).
        """
        last_buf = int(not self._next_poly_buffer)
        return self._config_json(last_buf)

    @property
    def configuration0(self) -> IclField[str]:
        """Get configuration buffer 0 (JSON)."""
        return self._config_json(0)

    @property
    def configuration1(self) -> IclField[str]:
        """Get configuration buffer 1 (JSON)."""
        return self._config_json(1)

    def _config_json(self, buffer: int) -> IclField[str]:
        """
        Read one configuration buffer and wrap it as a JSON string field.

        :param buffer: see _get_config_buffer
        """
        configs = self._get_config_buffer(buffer)
        # default=dict: each Source is converted via its (name, value) iterator
        return IclField(type_=str, value=json.dumps(configs, default=dict))

    def _get_config_buffer(self, buffer: int) -> List[Source]:
        """
        Read one configuration buffer.

        :param buffer: If zero, read the low half of the double buffer.
            Otherwise, read the second half.
        :return: one Source per row of the buffer (MAX_CONFIG_SOURCES rows)
        """
        if buffer == 0:
            config_buf = self.data[:CONFIG_BUF_WORDS]
        else:
            # not zero, read second buffer
            config_buf = self.data[CONFIG_BUF_WORDS:]
        # one row of SOURCE_CONFIG_WORDS registers per source
        rows = config_buf.reshape((MAX_CONFIG_SOURCES, SOURCE_CONFIG_WORDS))
        return [Source.from_registers(registers) for registers in rows]

    def configure(self, sources: List[Source]) -> None:
        """
        Configure all VD data generator sources.

        Note: Both buffers will be marked as invalid.
        User expected to call configure_next_delay_polynomials after calling here.

        The supplied list is not modified.

        :param sources: one source object per VD source
        :raises ValueError: if more than MAX_CONFIG_SOURCES sources are supplied
        """
        # mark both config buffers as invalid
        self.enable_vd = 0
        self.valid_time_buf0 = 0
        self.valid_time_buf1 = 0

        # ensure we zero out any unused sources
        # (pad a copy, so the caller's list keeps its original length)
        padded = pad_source_list(list(sources), MAX_CONFIG_SOURCES)
        all_registers = np.array(
            [source.as_registers() for source in padded], dtype=ArgsWordType
        ).flatten()

        # load initial configuration into both halves of the double buffer
        self.data[:CONFIG_BUF_WORDS] = all_registers
        self.data[CONFIG_BUF_WORDS:] = all_registers

    def configure_from_yaml(self, configuration: dict) -> None:
        """
        Configure the Virtual Digitiser's Data Generator.

        Both buffers are marked invalid, the register values produced by
        ``datagen_config`` are written to both halves of the double buffer,
        then buffer 0 is marked valid.

        :param configuration: dictionary created from YAML
        """
        # mark both config buffers as invalid
        self.enable_vd = 0
        self.valid_time_buf0 = 0
        self.valid_time_buf1 = 0

        # load initial configuration into both halves of the double buffer
        config_registers = datagen_config(configuration)
        # config_registers only as long as it needs to be
        # we trust it won't be too long...
        # NOTE(review): presumably assigning an array at a single index writes
        # it starting from that offset - confirm against the field implementation
        self.data[0] = config_registers
        self.data[CONFIG_BUF_WORDS] = config_registers
        self.buf0_valid = True

    @property
    def buf0_valid(self) -> IclField[bool]:
        """Buffer 0 valid flag (bit 0 of enable_vd)."""
        return IclField(type_=bool, value=bool(self.enable_vd.value & 1))

    @buf0_valid.setter
    def buf0_valid(self, value: bool) -> None:
        # There's probably some slick way to inject 2x IclFpgaFields instead,
        # thus making use of the underlying bitfield logic,
        # or maybe FW devs can configure it as two fields,
        # but this should work...
        old_value = self.enable_vd.value
        if value:
            self.enable_vd = old_value | 1
        else:
            self.enable_vd = old_value & 0xFFFF_FFFE

    @property
    def buf1_valid(self) -> IclField[bool]:
        """Buffer 1 valid flag (bit 1 of enable_vd)."""
        return IclField(type_=bool, value=bool(self.enable_vd.value & 2))

    @buf1_valid.setter
    def buf1_valid(self, value: bool) -> None:
        # read-modify-write so the other buffer's flag is left untouched
        old_value = self.enable_vd.value
        if value:
            self.enable_vd = old_value | 2
        else:
            self.enable_vd = old_value & 0xFFFF_FFFD

    def set_delay_polynomial(self, source: int, polynomial, buffer) -> None:
        """
        Set a delay polynomial for a single source.

        Fewer than DELAY_POLY_COEFFS coefficients are zero-padded; any extra
        coefficients are ignored.

        :param source: 0 indexed entry in spead_shared_ram.
        :param polynomial: coefficients [c0,c1,c2,c3,c4,c5]
            delay = c0 + c1*t + ... + c5*t^5
            units of nanoseconds per second^[0,1,2,3,4,5]
        :param buffer: 0/1 (double buffered configuration)
        """
        input_polynomial = np.float64(polynomial)
        # ensure we have the right number of coefficients
        polynomial = null_polynomial()
        poly_len = min(DELAY_POLY_COEFFS, len(input_polynomial))
        polynomial[:poly_len] = input_polynomial[:poly_len]

        # Bytes 0-47 (out of 64 per source) are the delay polynomial. 6x float64
        start = source * SOURCE_CONFIG_WORDS + (buffer * CONFIG_BUF_WORDS)
        # integer division: slice bounds must be integers, not floats
        end = start + (8 * DELAY_POLY_COEFFS) // WORD_SIZE
        register_values = polynomial.view(ArgsWordType)
        self.data[start:end] = register_values

    def configure_next_delay_polynomials(
        self,
        polynomials: List[List[float]],
        activation_time: int,
        data_start_sec: int = 0,
        data_start_ns: int = 0,
    ) -> None:
        """
        Configure all delay polynomials for this datagen instance to use at the
        next activation time.

        You must supply exactly one polynomial per source, in order.
        Beware: the number of polynomials supplied is not checked here!

        :param polynomials: one polynomial per source - see set_delay_polynomial
        :param activation_time: datagen time at which this set of polynomials will
            activate
        :param data_start_sec: vd datagen start seconds, used only after reset
        :param data_start_ns: vd_datagen start nanoseconds, used only after reset
        """
        target_buffer = self._next_poly_buffer
        for source, polynomial in enumerate(polynomials):
            self.set_delay_polynomial(source, polynomial, target_buffer)

        if target_buffer == 0:
            self.valid_time_buf0 = activation_time
            self.buf0_valid = True
        else:
            self.valid_time_buf1 = activation_time
            self.buf1_valid = True

        # ping-pong between usage of the two buffers
        self._next_poly_buffer = (target_buffer + 1) % 2

        # when there are data-gen start time params, write them to registers
        if data_start_sec != 0 or data_start_ns != 0:
            self.start_time_s = data_start_sec
            self.start_time_ns = data_start_ns