Making an animation

Sometimes it can be useful to make an animation to check whether a set of simulations worked, or to help give a demo. This section shows an example of how to use matplotlib and the OSKAR imager from within a loop to make each frame of an animation by iterating over time samples in a Measurement Set. The script below could either be used as-is, or adapted to a more complex use case. Each frame is generated by reading slices of visibility data in Plotter._animate_func, while the remainder of the script sets up the environment using calls to functions in matplotlib.

The script has the following command-line arguments:

usage: animate_ms.py [-h] [--fov_deg FOV_DEG] [--size SIZE] [--fps FPS]
                    [--out OUT] [--title TITLE]
                    MS [MS ...]

Make an animation from one or more Measurement Sets

positional arguments:
MS                 Measurement Set path(s)

optional arguments:
-h, --help         show this help message and exit
--fov_deg FOV_DEG  Field of view to image, in degrees (default: 0.5)
--size SIZE        Image side length, in pixels (default: 256)
--fps FPS          Frames per second in output (default: 10)
--out OUT          Output filename (default: out.mp4)
--title TITLE      Overall figure title (default: )

Download animate_ms.py:

  1#!/usr/bin/env python3
  2"""
  3Generate an animation by stepping through visibility time samples.
  4"""
  5import argparse
  6import copy
  7
  8import matplotlib
  9
 10matplotlib.use("Agg")
 11# pylint: disable=wrong-import-position
 12from mpl_toolkits.axes_grid1 import make_axes_locatable
 13from matplotlib import animation
 14import matplotlib.pyplot as plt
 15import numpy
 16import oskar
 17
 18
 19# pylint: disable=too-many-instance-attributes
 20class Plotter:
 21    """Generate an animation by stepping through visibility time samples."""
 22
 23    def __init__(self):
 24        """Constructor."""
 25        self._artists = ()
 26        self._axes = None
 27        self._base_settings = {}
 28        self._fig = None
 29        self._ms_list = []
 30        self._ms_names = []
 31        self._num_frames = 0
 32        self._title = ""
 33
 34    def animate(
 35        self, imager_settings, ms_names, title="", fps=10, filename="out.mp4"
 36    ):
 37        """Function to generate the animation.
 38
 39        Args:
 40            imager_settings (dict): Base settings for OSKAR imager.
 41            ms_names (list[str]): List of Measurement Sets to image.
 42            title (str): Main figure title.
 43            fps (int): Frames-per-second.
 44            filename (str): Name of output MP4 file.
 45        """
 46        # Store arguments.
 47        self._base_settings = imager_settings
 48        self._ms_names = ms_names
 49        self._title = title
 50        self._ms_list.clear()
 51
 52        # Work out the number of frames to generate.
 53        num_images = len(self._ms_names)
 54        self._num_frames = 0
 55        for i in range(num_images):
 56            ms = oskar.MeasurementSet.open(self._ms_names[i], readonly=True)
 57            num_rows = ms.num_rows
 58            num_stations = ms.num_stations
 59            num_baselines = (num_stations * (num_stations - 1)) // 2
 60            self._num_frames = max(self._num_frames, num_rows // num_baselines)
 61            self._ms_list.append(ms)
 62
 63        # Create the plot panels.
 64        num_cols = num_images
 65        if num_cols > 4:
 66            num_cols = 4
 67        num_rows = (num_images + num_cols - 1) // num_cols
 68        panel_size = 8
 69        if num_images > 1:
 70            panel_size = 5
 71        if num_images > 3:
 72            panel_size = 4
 73        fig_size = (num_cols * panel_size, num_rows * panel_size)
 74        fig, axes = plt.subplots(
 75            nrows=num_rows, ncols=num_cols, squeeze=False, figsize=fig_size
 76        )
 77        self._fig = fig
 78        self._axes = axes.flatten()
 79
 80        # Call the animate function.
 81        anim = animation.FuncAnimation(
 82            self._fig,
 83            self._animate_func,
 84            init_func=self._init_func,
 85            frames=range(0, self._num_frames),
 86            interval=1000.0 / fps,
 87            blit=False,
 88        )
 89
 90        # Save animation.
 91        anim.save(filename, writer="ffmpeg", bitrate=3500)
 92        plt.close(fig=fig)
 93
 94    def _init_func(self):
 95        """Internal initialisation function called by FuncAnimation."""
 96        # Create an empty image.
 97        imsize = self._base_settings["image/size"]
 98        zeros = numpy.zeros((imsize, imsize))
 99        zeros[0, 0] = 1
100
101        # Create list of matplotlib artists that must be updated each frame.
102        artists = []
103
104        # Iterate plot panels.
105        for i in range(len(self._axes)):
106            ax = self._axes[i]
107            im = ax.imshow(zeros, aspect="equal", cmap="gnuplot2")
108            divider = make_axes_locatable(ax)
109            cax = divider.append_axes("right", size="5%", pad=0.05)
110            cbar = plt.colorbar(im, cax=cax)
111            ax.invert_yaxis()
112            ax.axes.xaxis.set_visible(False)
113            ax.axes.yaxis.set_visible(False)
114            if i < len(self._ms_names):
115                ax.set_title(self._ms_names[i])
116            else:
117                cbar.set_ticks([])
118                cbar.set_ticklabels([])
119            artists.append(im)
120
121        # Set figure title.
122        self._fig.suptitle(self._title, fontsize=16, y=0.95)
123
124        # Return tuple of artists to update.
125        self._artists = tuple(artists)
126        return self._artists
127
128    def _animate_func(self, frame):
129        """Internal function called per frame by FuncAnimation.
130
131        Args:
132            frame (int): Frame index.
133        """
134        # Iterate plot panels.
135        num_panels = len(self._ms_list)
136        for i in range(num_panels):
137            # Read the visibility meta data.
138            freq_start_hz = self._ms_list[i].freq_start_hz
139            freq_inc_hz = self._ms_list[i].freq_inc_hz
140            num_channels = self._ms_list[i].num_channels
141            num_stations = self._ms_list[i].num_stations
142            num_rows = self._ms_list[i].num_rows
143            num_baselines = (num_stations * (num_stations - 1)) // 2
144
145            # Read the visibility data and coordinates.
146            start_row = frame * num_baselines
147            if start_row >= num_rows or start_row + num_baselines > num_rows:
148                continue
149            (u, v, w) = self._ms_list[i].read_coords(start_row, num_baselines)
150            vis = self._ms_list[i].read_column(
151                "DATA", start_row, num_baselines
152            )
153            num_pols = vis.shape[-1]
154
155            # Create settings for the imager.
156            params = copy.deepcopy(self._base_settings)
157            settings = oskar.SettingsTree("oskar_imager")
158            settings.from_dict(params)
159
160            # Make the image for this frame.
161            print(
162                "Generating frame %d/%d, panel %d/%d"
163                % (frame + 1, self._num_frames, i + 1, num_panels)
164            )
165            imager = oskar.Imager(settings=settings)
166            imager.set_vis_frequency(freq_start_hz, freq_inc_hz, num_channels)
167            imager.update(
168                u, v, w, vis, end_channel=num_channels - 1, num_pols=num_pols
169            )
170            data = imager.finalise(return_images=1)
171
172            # Update the plot panel and colourbar.
173            self._artists[i].set_data(data["images"][0])
174            self._artists[i].autoscale()
175
176
177def main():
178    """Main function."""
179    parser = argparse.ArgumentParser(
180        description="Make an animation from one or more Measurement Sets",
181        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
182    )
183    parser.add_argument(
184        "ms_names", metavar="MS", nargs="+", help="Measurement Set path(s)"
185    )
186    parser.add_argument(
187        "--fov_deg",
188        type=float,
189        default=0.5,
190        help="Field of view to image, in degrees",
191    )
192    parser.add_argument(
193        "--size", type=int, default=256, help="Image side length, in pixels"
194    )
195    parser.add_argument(
196        "--fps", type=int, default=10, help="Frames per second in output"
197    )
198    parser.add_argument("--out", default="out.mp4", help="Output filename")
199    parser.add_argument("--title", default="", help="Overall figure title")
200    args = parser.parse_args()
201
202    # Imager settings.
203    imager_settings = {"image/fov_deg": args.fov_deg, "image/size": args.size}
204
205    # Make animation.
206    plotter = Plotter()
207    plotter.animate(
208        imager_settings, args.ms_names, args.title, args.fps, args.out
209    )
210
211
212if __name__ == "__main__":
213    main()

Example: Single-station drift scan of Galactic plane

As an example, the following OSKAR parameter file will generate a simulated Measurement Set for a 24-hour drift-scan observation of the Galactic plane using a telescope model consisting of a single 38-metre diameter SKA-Low station of 256 isotropic elements.

Download drift_scan_galaxy.ini:

[General]
app=oskar_sim_interferometer
version=2.8.0

[simulator]
double_precision=false

[sky]
healpix_fits/file=haslam_nside_128.fits
healpix_fits/min_abs_val=30.0

[observation]
mode=Drift scan
start_frequency_hz=1.0e+08
start_time_utc=2000-01-01 09:30:00.0
length=24:00:00.0
num_time_steps=96

[telescope]
input_directory=single_station.tm
pol_mode=Scalar
station_type=Isotropic beam

[interferometer]
ms_filename=drift_scan_galaxy.ms

The animation below was then produced by running the animate_ms.py script with the following command-line arguments using the output Measurement Set:

./animate_ms.py --fov_deg=180 --fps=20 --title="OSKAR drift scan test" --out=drift_scan.mp4 drift_scan_galaxy.ms