Source code for scripts.residual_image_simulator

#!/usr/bin/env python3
"""Simulates and images (residual) visibilities concurrently."""

import oskar


[docs]class ResidualImageSimulator(oskar.Interferometer): """ Interferometer simulator which generates both reference and residual visibilities, optionally imaging them. This class inherits ``oskar.Interferometer``, so it requires the same settings parameters. Each visibility block can be imaged if required in the overridden :meth:`process_block()` method. To generate (and image) residual visibilities, two simulations must be run using separate instances of this class: 1. The first run generates the reference visibility data set, which must be saved to an OSKAR visibility data file. 2. The reference visibilities are then subtracted from the visibilities generated in the second run. Dirty/residual images of the visibilities can be returned for either run by specifying an imager to use when constructing the simulator. The following code shows a minimal but complete example of how this class could be used. Note that there are two separate simulators created. .. code-block:: python :linenos: import oskar # Create a 9-by-9 unit-amplitude point-source sky model # at the phase centre. # First, define the phase centre coordinates. ra0_deg = 0 dec0_deg = -30 grid_width_deg = 4.0 # Width of the source grid in degrees. sky = oskar.Sky.generate_grid(ra0_deg, dec0_deg, 9, grid_width_deg) # Create and set up an imager to make the residual images. params_img = { 'image/fov_deg': grid_width_deg + 0.5, 'image/size': 6144, 'image/fft/use_gpu': True } settings_img = oskar.SettingsTree('oskar_imager') settings_img.from_dict(params_img) imager = oskar.Imager(settings=settings_img) # Define base parameters for a simulated observation # using oskar.Interferometer. obs_length_sec = 4 * 3600.0 # 4 hours, in seconds. base_params_sim = { 'observation/start_frequency_hz': 110e6, # One channel at 110 MHz 'observation/phase_centre_ra_deg': ra0_deg, 'observation/phase_centre_dec_deg': dec0_deg, 'observation/num_time_steps': 24, # Simulate 24 correlator dumps 'observation/start_time_utc': get_start_time(ra0_deg, obs_length_sec), 'observation/length': obs_length_sec, 'interferometer/channel_bandwidth_hz': 10e3, 'interferometer/time_average_sec': 1.0, # Ignore w-components only if the sky model allows for it, # with all sources well within the imaged field of view! # (W-smearing will be disabled for all sources.) 'interferometer/ignore_w_components': True } settings_sim = oskar.SettingsTree('oskar_sim_interferometer') settings_sim.from_dict(base_params_sim) # Set the parameters for the reference simulation, including a # reference telescope model, and the output visibility file name. settings_sim['telescope/input_directory'] = '/path/to/reference_telescope_model_folder.tm' settings_sim['interferometer/oskar_vis_filename'] = 'reference_data.vis' # Run the reference simulation. # No image is made at this point, but visibilities are saved to a file. sim = ResidualImageSimulator(settings=settings_sim) sim.set_sky_model(sky) sim.run() # Set the parameters for the comparison simulation, including a new # telescope model. We don't need to save the residual visibilities, # so the output file name is blank. settings_sim['telescope/input_directory'] = '/path/to/comparison_telescope_model_folder.tm' settings_sim['interferometer/oskar_vis_filename'] = '' # Run the comparison simulation. # Note that the pre-configured imager and the filename of the # reference visibility data are both passed in the constructor. sim = ResidualImageSimulator( imager=imager, settings=settings_sim, ref_vis='reference_data.vis') sim.set_sky_model(sky) # The residual image(s) is (are) returned by the run() method. output = sim.run() image = output['images'][0] # Stokes I residual image """
[docs] def __init__(self, imager=None, settings=None, ref_vis=None): """Creates the simulator, storing a handle to the imager. Args: imager (Optional[oskar.Imager]): Imager to use. settings (Optional[oskar.SettingsTree]): Optional settings to use to set up the simulator. ref_vis (Optional[str]): Pathname of reference visibility file. """ oskar.Interferometer.__init__(self, None, settings) self._imager = imager self._ref_vis = ref_vis self._ref_handle = None self._ref_hdr = None self._ref_block = None if ref_vis: (self._ref_hdr, self._ref_handle) = oskar.VisHeader.read(ref_vis) self._ref_block = oskar.VisBlock.create_from_header(self._ref_hdr)
[docs] def finalise(self): """Called automatically by the base class at the end of run().""" oskar.Interferometer.finalise(self) if self._imager and not self.coords_only: num_images = 4 if self._imager.image_type == "Stokes" else 1 return self._imager.finalise(return_images=num_images) return None
[docs] def process_block(self, block, block_index): """Processes the visibility block. Residual visibilities are generated if appropriate by subtracting the corresponding reference visibilities. The (modified) visibility block is also sent to the imager if one was set, and written to any open visibility data files defined in the settings. Args: block (oskar.VisBlock): A handle to the block to be processed. block_index (int): The index of the visibility block. """ if not self.coords_only: if self._ref_vis: # Read the reference visibility data block. self._ref_block.read( self._ref_hdr, self._ref_handle, block_index ) # Subtract reference block from this one. block.cross_correlations()[ : ] -= self._ref_block.cross_correlations()[:] # Write (residual) block to any open files. self.write_block(block, block_index) # Update imager with (residual) block. if self._imager: self._imager.update_from_block(self.vis_header(), block)
[docs] def run(self): """Runs the interferometer simulator and imager, if set. Any images will be returned in an array accessed by the 'images' dictionary key, for example: .. code-block:: python output = sim.run() image = output['images'][0] # Stokes I image. """ # Check if imaging with uniform weighting or W-projection. need_coords_first = False if self._imager: if ( self._imager.weighting == "Uniform" or self._imager.algorithm == "W-projection" ): need_coords_first = True # Simulate coordinates first, if required. if need_coords_first: self.set_coords_only(True) oskar.Interferometer.run(self) self.set_coords_only(False) # Simulate and image the visibilities. return oskar.Interferometer.run(self)
def set_coords_only(self, value): """Calls set_coords_only() on interferometer and imager objects.""" oskar.Interferometer.set_coords_only(self, value) if self._imager: self._imager.set_coords_only(value)