# pylint: disable=too-many-arguments,unexpected-keyword-arg
# pylint: disable=too-many-instance-attributes
"""Dataclasses for RFI"""
import logging
from dataclasses import dataclass, field
import h5py
import numpy as np
LOG = logging.getLogger("rfi-logger")
@dataclass
class RFISource:
"""Data for a single RFI Source"""
id: str
source_type: str
central_frequency: float = field(metadata={"unit": "Hz"})
bandwidth: float = field(metadata={"unit": "Hz"})
polarisation: str
# for aircraft and satellite this would change with time
height: float
latitude: float = field(metadata={"unit": "degree"})
longitude: float = field(metadata={"unit": "degree"})
def _get_unit(self, field_name):
# pylint: disable=no-member
return self.__dataclass_fields__[field_name].metadata["unit"]
def __repr__(self):
return (
f"{self.__class__.__name__}("
f"id={self.id}, "
f"source_type={self.source_type}, "
# pylint: disable=line-too-long
f"central_frequency={self.central_frequency}[{self._get_unit('central_frequency')}], " # noqa: E501
f"bandwidth={self.bandwidth}[{self._get_unit('bandwidth')}], "
f"polarisation={self.polarisation}, "
f"height={self.height}, "
f"latitude={self.latitude}[{self._get_unit('latitude')}], "
f"longitude={self.longitude}[{self._get_unit('longitude')}]"
f")"
)
@dataclass
class RFISignal:
"""Data for RFI Signal"""
source_id: str
time: np.ndarray
station_id: np.ndarray # 1D array of strings of all stations
frequency: np.ndarray = field(
metadata={"unit": "Hz"}
) # 1D array of frequency samples
# 3D array: list of data per time per station
# [ [ [...], [...], [...] ], [ [...], [...], [...] ] ]
apparent_power: np.ndarray = field(metadata={"unit": "W/Hz???"})
# coordinates at SKA stations; 2D-arrays, coordinates per time
azimuth: np.ndarray = field(metadata={"unit": "degree"})
elevation: np.ndarray = field(metadata={"unit": "degree"})
distance: np.ndarray = field(metadata={"unit": "m"})
# reshape az, el, dist, into a single array:
# [[[az, el, dist], [az, el, dist]], [[az, el, dist], [az, el, dist]]]
# arr[0][0] --> first time, first station values, arr[0][1]
# --> first time, second station values, etc.
apparent_coordinates: np.ndarray
@dataclass
class DataCubePerSource:
"""DataCube for a single RFI source"""
rfi_source: RFISource = field(default_factory=RFISource)
rfi_signal: RFISignal = field(default_factory=[RFISignal])
def validate_input(self):
"""
Make sure input data belong together.
Both rfi_source and rfi_signal have to refer to the same soure_id
:return valid: bool, True if valid, False if not
message: "" if valid, corresponding error message if not valid
"""
valid = True
message = ""
if self.rfi_signal.source_id != self.rfi_source.id:
valid = False
message = (
"Mismatch between input data source ids. "
"All of the input RFISignal objects "
"have to refer to the same source "
"as the provided input source_id!"
)
return valid, message
def __post_init__(self):
valid, message = self.validate_input()
if not valid:
raise ValueError(message)
[docs]class DataCube:
"""
Class to transform RFI data and save the result in an HDF5 file.
:param times: list of time samples the simulation ran for
:param freqs: list of frequency channels the simulation ran for
:param station_ids: list of station ids that were used in the simulation
:param rmax: maximum distance of SKA station from its array centre
:param station_skip: ...
rmax and station_skip are needed for transferring information from the
propagation script to the visibility simulation script
"""
def __init__(
self,
times: list,
freqs: list,
station_ids: list,
rmax=None,
station_skip=None,
):
self.time_samples = np.array(times).astype("S")
self.frequency_channels = np.array(freqs).astype("f")
self.station_id = np.array(station_ids).astype("S")
self.source_id = np.empty(1, dtype=str).astype("S")
self.source_type = np.empty(1, dtype=str).astype("S")
self.coordinates = np.empty_like(
[None], shape=(1, len(times), len(station_ids), 3), dtype=float
)
self.signal = np.empty_like(
[None],
shape=(1, len(times), len(station_ids), len(freqs)),
dtype=complex,
)
self._number_of_sources = 0
self._rmax = (
rmax # maximum distance of each station from the array centre
)
self._station_skip = station_skip
[docs] def append_data(self, new_rfi_data: DataCubePerSource):
"""
Append data from a DataCubePerSource object to the existing arrays.
:param new_rfi_data: input DataCubePerSource object
containing RFI data for a single source
"""
self.validate_input_data(new_rfi_data)
self.source_id = np.append(
self.source_id, np.array([new_rfi_data.rfi_source.id]), axis=0
).astype("S")
self.source_type = np.append(
self.source_type,
np.array([new_rfi_data.rfi_source.source_type]),
axis=0,
).astype("S")
try:
self.signal[self._number_of_sources] = (
new_rfi_data.rfi_signal.apparent_power.astype("F")
)
self.coordinates[self._number_of_sources] = (
new_rfi_data.rfi_signal.apparent_coordinates.astype("f")
)
except IndexError:
# when a new source is added, the index of the
# array has to be increased by one first
self.coordinates = np.append(
self.coordinates, self.coordinates, axis=0
)
self.signal = np.append(self.signal, self.signal, axis=0)
self.signal[self._number_of_sources] = (
new_rfi_data.rfi_signal.apparent_power.astype("F")
)
self.coordinates[self._number_of_sources] = (
new_rfi_data.rfi_signal.apparent_coordinates.astype("f")
)
self._number_of_sources = self._number_of_sources + 1
if self.source_id[0] == b"":
self.source_id = np.delete(self.source_id, 0, axis=0)
self.source_type = np.delete(self.source_type, 0, axis=0)
def _convert_data_to_hdf5(self, hdf5_file):
for k, _ in self.__dict__.items():
if not k.startswith("_"):
hdf5_file[k] = self.__dict__[k]
# add metadata
hdf5_file["coordinates"].attrs["dimensions"] = [
"Nsources x Ntimes x Nstations x 3(az, el, dist)"
]
hdf5_file["signal"].attrs["dimensions"] = [
"Nsources x Ntimes x Nstations x Nfreqs"
]
hdf5_file["station_id"].attrs[
"max_station_distance_from_centre"
] = self._rmax
hdf5_file["station_id"].attrs["station_skip"] = self._station_skip
[docs] def export_to_hdf5(self, filename):
"""
Save transformed data to HDF5
:param filename: name of output file
"""
with h5py.File(filename, "w") as hdf5_file:
self._convert_data_to_hdf5(hdf5_file)
hdf5_file.flush()
LOG.info("HDF5 file saved with name: %s", filename)
def _get_input():
return input("Continue? (yes, no) ")
[docs]class BeamGainDataCube:
"""
Data Cube to contain Beam Gain information calculated by OSKAR.
:param ra: right ascension of observed source
:param dec: declination of observed source
:param obs_time: time of observation
:param freq_chans: array of frequency channels
:param rfi_ids: array of RFI source IDs
:param nstations: number of SKA stations
"""
def __init__(
self,
ra: float,
dec: float,
obs_time: str,
freq_chans: np.ndarray,
rfi_ids: np.ndarray,
nstations: int,
):
self.right_ascension = ra
self.declination = dec
self.observation_time = obs_time
self.freq_channels = freq_chans
self.rfi_source_ids = rfi_ids.astype("S")
self.num_stations = nstations
self._beam_gain = None
@property
def beam_gain(self):
"""Beam gain value"""
return self._beam_gain
@beam_gain.setter
def beam_gain(self, value):
"""Set beam_gain value"""
if not isinstance(value, np.ndarray):
raise TypeError(
"Provide beam gain values as numpy arrays with dimensions: "
"Nrfi_sources x Nska_stations x Nfreq_channels x 4 (Nstokes)"
)
if self._beam_gain is not None:
LOG.warning(
"Overwriting existing data in %s.beam_gain.",
self.__class__.__name__,
)
cont = _get_input()
if cont.lower() != "yes":
LOG.info("Aborting")
return
value_dimensions = value.shape
if not value_dimensions == (
len(self.rfi_source_ids),
self.num_stations,
len(self.freq_channels),
4,
):
raise ValueError(
"Input data has the wrong dimensions. It needs to match: \n"
# pylint: disable=line-too-long
f"{len(self.rfi_source_ids), len(self.freq_channels), self.num_stations, 4} \n" # noqa: E501
f"[Nrfi_sources x Nska_stations x Nfreq_channels x 4 (Nstokes)]" # noqa: E501
)
self._beam_gain = value
def _convert_data_to_hdf5(self, hdf5_file):
for k, _ in self.__dict__.items():
hdf5_file[k.lstrip("_")] = self.__dict__[k]
hdf5_file["beam_gain"].attrs["dimensions"] = [
"Nrfi_sources x Nska_stations x Nfreq_channels "
"x 4 (stokesI, stokesQ, stokesU, stokesV)"
]
[docs] def export_to_hdf5(self, filename):
"""
Save transformed data to HDF5
:param filename: name of output file
"""
with h5py.File(filename, "w") as hdf5_file:
self._convert_data_to_hdf5(hdf5_file)
hdf5_file.flush()
LOG.info("HDF5 file saved with name: %s", filename)