"""This module contains the functions to run Imaging IO benchmark."""
import logging
import os
import platform
import time
import subprocess
import re
import math
import json
import statistics
from sdpbenchmarks.utils import ParamSweeper
from sdpbenchmarks.utils import get_project_root, load_modules, submit_job, get_job_status, sweep, \
create_scheduler_conf, get_sockets_cores, exec_cmd, execute_command_on_host, \
standardise_output_data
from sdpbenchmarks.exceptions import ExecuteCommandError, ImagingIOTestError
from ._version import __version__
_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})
# Name of the benchmark (also the key of its section in the configuration dict)
BENCH_NAME = 'iotest'

# Git repo of the Imaging IO benchmark and the directory name it is cloned into
GIT_REPO = "https://gitlab.com/ska-telescope/sdp/ska-sdp-exec-iotest.git"
GIT_REPO_NAME = 'ska-sdp-exec-iotest'

# Number of facets for each image size. Each named size and its explicit
# "<image>-<facet>-<subgrid>"-style alias share the same facet count.
NUMBER_OF_FACETS = {
    'tiny': 2,
    '8k-2k-512': 2,
    'small': 9,
    '16k-8k-512': 9,
    'smallish': 25,
    '32k-8k-1k': 25,
    'medium': 25,
    '64k-16k-1k': 25,
    'large': 81,
    '96k-12k-1k': 81,
    'tremendous': 25,
    '128k-32k-2k': 25,
    'huge': 81,
    '256k-32k-2k': 81,
    '96k-24k-1k': 25,
    '128k-32k-1k': 25,
    'T05_': 1,
}

# Update the dict with new configs saved in rec-set-list.json. Entries from the
# file override the built-in ones on key collision.
DATA_DIR = os.path.join(get_project_root(), "data", BENCH_NAME)
with open(os.path.join(DATA_DIR, "rec-set-list.json"), "r") as f:
    NUMBER_OF_FACETS.update(json.load(f))

# Visibility data for each telescope config in GB
GEN_VIS_DATA = dict(lowbd2=17000, midr5=14000)
def create_bench_conf(tag, run_mode, num_nodes, rep, rec_set, vis_set, chunk_sizes):
    """Creates a dict containing the parameters for a given run

    Args:
        tag (str): Tag of the benchmark run (given at the CLI)
        run_mode: Mode(s) of the run, stored as given
        num_nodes: Number(s) of nodes, stored as given
        rep (int): Number of repetitions
        rec_set: Image size(s), stored as given
        vis_set: Antennas configuration(s), stored as given
        chunk_sizes (str or iterable of str): Chunk sizes for time and frequency
            domains. Each entry has the form ``"<time>,<freq>"``. A single string
            is treated as one entry.

    Returns:
        dict: A dict containing all the parameters. ``tag`` is wrapped in a
        one-element list, ``time_chunk`` / ``freq_chunk`` hold the parsed integers
        in input order and ``run_num`` is ``[0, ..., rep - 1]``.

    Raises:
        ValueError: If an entry of ``chunk_sizes`` is not two comma separated integers
    """
    # A lone "time,freq" string must be wrapped: iterating over it directly would
    # walk its characters and fail while unpacking.
    if isinstance(chunk_sizes, str):
        chunk_sizes = [chunk_sizes]

    time_chunks = []
    freq_chunks = []
    # Separate time and frequency chunks read from config file
    for chunk_str in chunk_sizes:
        try:
            time_chunk, freq_chunk = (int(part) for part in chunk_str.split(","))
        except ValueError as err:
            raise ValueError(
                "Invalid chunk size {!r}: expected '<time>,<freq>' with integer "
                "values".format(chunk_str)) from err
        time_chunks.append(time_chunk)
        freq_chunks.append(freq_chunk)

    # Parameter dict
    return {
        'tag': [tag],
        'run_mode': run_mode,
        'num_nodes': num_nodes,
        'rec_set': rec_set,
        'vis_set': vis_set,
        'time_chunk': time_chunks,
        'freq_chunk': freq_chunks,
        'run_num': list(range(rep)),
    }
def check_iotest_arguments(conf):
    """Checks the arguments passed in the config file

    Every problem found is logged as an error; the function keeps checking the
    remaining options instead of stopping at the first failure.

    Args:
        conf (dict): A dict containing configuration.

    Returns:
        int: Error code 0 OK, 1 Not OK
    """
    bench_args = conf[BENCH_NAME]['iotest_args']
    # Accepted options, built once rather than on every loop iteration.
    # NOTE(review): 'T04' and 'T05' are accepted here, but get_num_processes looks
    # the rec-set up in NUMBER_OF_FACETS -- confirm rec-set-list.json defines them.
    known_rec_sets = ('T04', 'T05', 'T05_', *NUMBER_OF_FACETS)
    known_vis_sets = ('vlaa', 'lowbd2', 'lowbd2-core', 'lowr3', 'midr5', 'askap', 'lofar')
    failed = False

    # Check if provided rec-set is in available options
    for rec_set in bench_args['rec_set']:
        if rec_set not in known_rec_sets:
            _log.error("Unknown option for --rec-set=%s", rec_set)
            failed = True

    for vis_set in bench_args['vis_set']:
        # Check if provided visibility set is in available options
        if vis_set not in known_vis_sets:
            _log.error("Unknown option for --vis-set=%s", vis_set)
            failed = True
        # Check SKA1 LOW or MID is specified, check if scratch space has enough data to write
        elif vis_set in ["lowbd2", "midr5"] and bench_args['write_vis']:
            required = GEN_VIS_DATA[vis_set]
            available = conf['global']['avail_scratch_disk_space']
            if available < required:
                _log.error("Not enough disk space on %s, free: %s GB, required: %s GB",
                           conf['global']['scratch_dir'], available, required)
                failed = True
            else:
                _log.warning(
                    "You have chosen to run %s configuration. Based on chunk size generated "
                    "visibility data can be more than %d GB.", vis_set, required)

    _log.info("Imaging IO bench arguments check finished.")
    return 1 if failed else 0
def compile_imaging_iotest(conf):
    """Clones the Imaging IO test from Git and builds it inside the work directory

    Any previous clone found in the work directory is removed first. The build
    itself (clone, ``git lfs pull``, ``cmake`` and ``make``) is executed as one
    chained shell command.

    Args:
        conf (dict): A dict containing configuration.

    Returns:
        int: 0 once the build command has completed

    Raises:
        ImagingIOTestError: The clone/build command raised ExecuteCommandError
    """
    install_dir = conf[BENCH_NAME]['work_dir']
    _log.info('Attempting to install Imaging IO Bench.')

    # Directory that hosts the Imaging IO bench installation
    exec_cmd('mkdir -p {}'.format(install_dir))

    _log.info("Compiling Imaging IO test...")

    # Dependencies have to be loaded before the build starts
    load_modules(conf['modules']['dep_modules'])

    # Start from a clean slate: drop any previous clone
    repo_dir = os.path.join(install_dir, GIT_REPO_NAME)
    if os.path.isdir(repo_dir):
        exec_cmd("rm -rf {}".format(repo_dir))

    # Individual stages of the install command, chained with '&&'
    src_dir = os.path.join(repo_dir, 'src')
    build_dir = os.path.join(repo_dir, 'build')
    stages = [
        "cd {} && ".format(install_dir),
        "git clone --depth=1 {} && ".format(GIT_REPO),
        "cd {} && git lfs pull origin".format(src_dir),
        "&& mkdir -p {0} && cd {0} && cmake -DCMAKE_C_STANDARD_LIBRARIES='-lm' ../ "
        "&& make -j2".format(build_dir),
    ]
    install_cmd = " ".join(stages)
    _log.debug("Command string executed to install Imaging IO bench code is %s", install_cmd)

    try:
        build_result = exec_cmd(install_cmd)
    except ExecuteCommandError as err:
        _log.exception("Installing IO Bench Failed.")
        raise ImagingIOTestError("Installing IO Bench Failed.") from err
    else:
        _log.debug("Output from compiling Imaging IO test is %s",
                   build_result.stdout.strip("\n"))

    _log.info('Installation of Imaging IO Bench was successful.')
    return 0
def prepare_iotest(conf):
    """Makes sure the IO Imaging Benchmark is installed when running on bare metal.

    When 'bare-metal' is among the configured run modes, the build directory is
    stored in ``conf[BENCH_NAME]['run_dir']`` and the benchmark is compiled if
    no ``iotest`` executable is found there. Nothing is done otherwise.

    Args:
        conf (dict): A dict containing configuration.

    Returns:
        int: 0 when nothing had to be installed, otherwise the return value of
        compile_imaging_iotest
    """
    _log.info("Checking if IO imaging benchmark is installed.")

    # Only bare metal runs need a local build
    if 'bare-metal' not in conf['global']['run_mode']:
        return 0

    # The directory where the built sources are placed
    run_dir = os.path.join(conf[BENCH_NAME]['work_dir'], GIT_REPO_NAME, 'build')
    conf[BENCH_NAME]['run_dir'] = run_dir

    # An existing executable means there is nothing left to do
    if os.path.isfile(os.path.join(run_dir, "iotest")):
        _log.info("Found executable of IO imaging benchmark in the system")
        return 0

    _log.info("No existing installation found in the system.")
    _log.debug("No installation found at %s", run_dir)

    # A new installation is required
    return compile_imaging_iotest(conf)
def get_mpi_args(conf, num_nodes, num_omp_threads, num_processes):
    """Builds the launcher part of the command line from the MPI configuration

    The result starts with ``conf['mpi']['mpi_startup']`` followed by every
    other value of ``conf['mpi']``. For the ``mpirun``, ``srun`` and
    ``ccc_mprun`` launchers, launcher specific placement options and OpenMP
    environment settings are added as well.

    When jobs are not submitted to a scheduler and ``conf['mpi']`` has a
    ``host_file`` entry, a host file is generated from ``$OAR_NODEFILE`` in the
    work directory and ``conf['mpi']['host_file']`` is overwritten with the
    matching ``--machinefile`` option.

    Args:
        conf (dict): A dict containing the config parameters of benchmark
        num_nodes (int): Number of nodes
        num_omp_threads (int): Number of OpenMP threads
        num_processes (int): Number of MPI processes

    Returns:
        str: A string containing all MPI related arguments
    """
    # Override the hostfile value with path to the hostfile
    # This is used for Grid'5000 bulk reservations.
    if not conf['global']['submit_job']:
        if 'host_file' in conf['mpi']:
            host_file_path = os.path.join(conf[BENCH_NAME]['work_dir'], "host_file")
            # Keep the first num_nodes distinct hosts of the reservation
            subprocess.run(
                "uniq $OAR_NODEFILE | head -n {} > {}".format(num_nodes, host_file_path),
                shell=True, check=True)
            conf['mpi']['host_file'] = "--machinefile {}".format(host_file_path)

    # Launcher first, then every other configured MPI argument in dict order
    launcher = conf['mpi']['mpi_startup']
    extra_args = [value for name, value in conf['mpi'].items() if name != 'mpi_startup']
    mpi_args = " ".join([launcher] + extra_args)

    if launcher == "mpirun":
        # mpirun forwards the environment through an MCA parameter
        forwarded_env = ("--mca mca_base_env_list "
                         "'I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0;"
                         "I_MPI_PIN_DOMAIN=omp:compact;"
                         "OMP_PROC_BIND=true;"
                         "OMP_PLACES=sockets;"
                         "OMP_NUM_THREADS={}'").format(num_omp_threads)
        mpi_args = " ".join([mpi_args,
                             "--mca mpi_yield_when_idle 1",
                             forwarded_env,
                             "--tag-output --timestamp-output --map-by node",
                             "-np {} ".format(num_processes)])
    elif launcher in ("srun", "ccc_mprun"):
        # Both launchers get the environment exported in front of the command
        exported_env = ("export OMPI_MCA_mpi_yield_when_idle=1 "
                        "I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 "
                        "I_MPI_PIN_DOMAIN=omp:compact "
                        "OMP_PROC_BIND=true "
                        "OMP_PLACES=sockets "
                        "OMP_NUM_THREADS={} &&").format(num_omp_threads)
        if launcher == "srun":
            placement = ("--distribution=cyclic:cyclic:fcyclic "
                         "--cpu-bind=verbose,sockets "
                         "--overcommit --label "
                         "--nodes={} --ntasks={}").format(num_nodes, num_processes)
            threads_opt = "--cpus-per-task={}".format(num_omp_threads)
        else:
            placement = ("-E '--distribution=cyclic:cyclic:fcyclic "
                         "--overcommit --label' "
                         "-n {}").format(num_processes)
            threads_opt = "-c {}".format(num_omp_threads)
        mpi_args = " ".join([exported_env, mpi_args, placement, threads_opt])

    _log.debug("MPI options provided are %s", mpi_args)
    return mpi_args
def get_telescope_config_settings(param):
    """Returns the iotest arguments that depend on the telescope configuration

    Only the ``lowbd2`` (SKA1 LOW) and ``midr5`` (SKA1 Mid) visibility sets get
    dedicated settings; any other visibility set yields an empty string.

    Args:
        param (dict): Parameters of the run. ``vis_set`` is always read,
            ``time_chunk`` and ``freq_chunk`` only for the two sets above.

    Returns:
        str: String with time, frequency and w-stacking arguments
    """
    vis_set = param['vis_set']
    if vis_set == "lowbd2":
        # SKA1 LOW setting
        time_axis, freq_axis = "-460:460/1024", "260e6:300e6/8192"
    elif vis_set == "midr5":
        # SKA1 Mid setting
        time_axis, freq_axis = "-290:290/4096", "0.35e9:0.4725e9/11264"
    else:
        return ""

    # The chunk size is appended to each axis specification
    return " --time {}/{} --freq {}/{} --target-err=1e-5 --margin=0".format(
        time_axis, param['time_chunk'], freq_axis, param['freq_chunk'])
def get_num_processes(conf, rec_set, num_nodes):
    """Estimates the number of producers, streamers and OpenMP threads.

    Args:
        conf (dict): A dict containing configuration settings
        rec_set (str): image parameters, must be a key of NUMBER_OF_FACETS
        num_nodes (int): number of nodes

    Returns:
        tuple: total cpu cores (only physical) on all nodes combined, threads per each core,
        number of OpenMP threads, number of producers, number of MPI processes
    """
    # Sockets and physical cores per node, threads per core on the compute nodes
    sockets_per_node, cores_per_node, threads_per_core = get_sockets_cores(conf)

    # Physical cores of all nodes combined
    total_cores = cores_per_node * num_nodes

    # One facet worker (producer) per facet, plus one
    facet_workers = NUMBER_OF_FACETS[rec_set] + 1

    # One streamer per socket (NUMA node) of every node
    streamer_workers = num_nodes * sockets_per_node

    # Producers and streamers are all MPI processes
    total_processes = facet_workers + streamer_workers

    # Spread the physical cores over the streamers, rounding up
    omp_threads = math.ceil(total_cores / streamer_workers)

    _log.info("Total number nodes, MPI processes and OpenMP threads are %d, %d and %d",
              num_nodes, total_processes, omp_threads)
    _log.debug("Total number streamers are %d and producers are %d",
               total_processes - facet_workers, facet_workers)

    return total_cores, threads_per_core, omp_threads, facet_workers, total_processes
def get_command_to_execute_bench(conf, param):
    """Forms the command string to be executed for one run

    Besides building the command, this function

    * stores the process/thread layout, the parsed iotest arguments
      (``bench_args``) and the final command (``cmd_str``) in ``param``,
    * creates the run specific scratch directory,
    * sets ``conf['scheduler']['job_dependency']`` to ``"afterany"`` when
      visibilities are written and jobs are submitted to a scheduler.

    Args:
        conf (dict): A dictionary containing configuration parameters
        param (dict): A Parameter dict with all relevant arguments for the run

    Returns:
        str: Complete command string

    Raises:
        ValueError: If ``param['run_mode']`` is neither "bare-metal" nor "singularity"
    """
    bench_args = conf[BENCH_NAME]['iotest_args']

    # Get number of streamers, producers and OpenMP threads
    num_cores, num_threads, num_omp_threads, num_facet_workers, num_processes = get_num_processes(
        conf, param['rec_set'], param['num_nodes'])
    param['num_cores'] = num_cores
    param['num_threads'] = num_threads
    param['num_omp_threads'] = num_omp_threads
    param['num_facet_workers'] = num_facet_workers
    param['num_processes'] = num_processes

    # Get time and frequency related arguments based on telescope configuration
    arg_str = get_telescope_config_settings(param)

    # Get all MPI related arguments
    mpi_args = get_mpi_args(conf, param['num_nodes'], param['num_omp_threads'],
                            param['num_processes'])

    # Make scratch dir for this specific run. This avoids problems with stray files from
    # previous runs
    run_scratch_dir = os.path.join(conf['global']['scratch_dir'], param['run_prefix'])
    os.makedirs(run_scratch_dir, exist_ok=True)

    # iotest arguments
    bench_args_str = " ".join(["--facet-workers={}".format(num_facet_workers),
                               "--rec-set={} --vis-set={}".format(param['rec_set'],
                                                                  param['vis_set']),
                               "{}".format(arg_str),
                               "{}".format(bench_args['other_args'])])

    # Split the argument string and save the options to the param dict. Only the
    # first '=' separates option and value, so values may themselves contain '='.
    # Tokens without '=' are stored as flags with the value True.
    dict_bench_args = {}
    for token in bench_args_str.split():
        option, separator, value = token.partition("=")
        dict_bench_args[option] = value if separator else True
    param['bench_args'] = dict_bench_args

    if param['run_mode'] == "bare-metal":
        cmd_str = "{} ./iotest {}".format(mpi_args, bench_args_str)
        # Without a scheduler the command is launched from the build directory
        if not conf['global']['submit_job']:
            cmd_str = " ".join(["cd {} &&".format(conf[BENCH_NAME]['run_dir']), cmd_str])
    elif param['run_mode'] == "singularity":
        cmd_str = " ".join(["{}".format(mpi_args),
                            "{}".format(conf['global']['singularity_path']),
                            "run --env OMP_NUM_THREADS={}".format(num_omp_threads),
                            "--bind {}".format(run_scratch_dir),
                            "--app {}".format(conf[BENCH_NAME]['image_app']),
                            "{} {}".format(conf[BENCH_NAME]['image_path'],
                                           bench_args_str)])
    else:
        # Fail with a clear message instead of an unbound local variable below
        raise ValueError("Unknown run mode {!r}: expected 'bare-metal' or "
                         "'singularity'".format(param['run_mode']))

    # If writing visibility, make job dependency a must. We do not want multiple jobs running
    # concurrently and writing data to the parallel file system concurrently
    if bench_args['write_vis']:
        if conf['global']['submit_job']:
            conf['scheduler']['job_dependency'] = "afterany"
        # Write to the run's scratch directory and delete the output files afterwards
        cmd_str = " ".join([cmd_str,
                            "--writer-count={}".format(bench_args['writer_threads']),
                            os.path.join(run_scratch_dir, "out%d.h5"),
                            "&& rm -rf",
                            os.path.join(run_scratch_dir, 'out*.h5')])

    param['cmd_str'] = cmd_str
    return cmd_str
def run_iotest(conf):
    """Run Imaging IO test.

    All parameter combinations are either executed on the host or submitted to
    the scheduler. Afterwards the metrics of every submitted/finished run are
    extracted from its output file, printed and written to a JSON file.

    Args:
        conf (dict): A dict containing configuration.

    Returns:
        int: 0 when every run is done, or when no run was skipped and jobs are
        still pending; 1 when at least one run was skipped

    Raises:
        Exception: An error occurred in building parameter sweep
    """
    result_dir = conf[BENCH_NAME]['result_dir']
    json_dir = conf[BENCH_NAME]['json_dir']

    _log.debug("Configuration in use for IO benchmark: %s", conf)

    # Config dict describing the parameter space of the runs
    param_conf = create_bench_conf(
        conf['global']['tag'],
        conf['global']['run_mode'],
        conf['global']['num_nodes'],
        conf['global']['repetitions'],
        conf[BENCH_NAME]['iotest_args']['rec_set'],
        conf[BENCH_NAME]['iotest_args']['vis_set'],
        conf[BENCH_NAME]['iotest_args']['chunk_sizes'],
    )

    # Expand the parameter space into individual runs
    try:
        params = sweep(param_conf)
    except Exception as err:
        _log.exception("Failed to build parameter sweep")
        raise Exception("Failed to build parameter sweep") from err

    sweeper = ParamSweeper(persistence_dir=conf['global']['per_dir'], params=params,
                           randomise=conf['global']['randomise'])
    _log.info("%d runs remaining\n", len(sweeper.get_remaining()))

    # ID of the previously submitted job, -1 before the first submission
    job_id = -1

    # Work through the parameter space until the sweeper has nothing left
    while True:
        param = sweeper.get_next()
        if not param:
            _log.info("No more runs.")
            break

        if conf['global']['ignore_rem']:
            _log.info("Ignoring run %s", param['run_prefix'])
            sweeper.ignore(param)
            continue

        _log.info("Current run %s", param['run_prefix'])
        try:
            # Command string to be executed for this run
            cmd_str = get_command_to_execute_bench(conf, param)
            _log.debug("Command to execute is %s", cmd_str)
            if conf['global']['submit_job']:
                scheduler_conf = create_scheduler_conf(conf, param, BENCH_NAME)
                job_id = submit_job(scheduler_conf, job_id)
                param['job_id'] = job_id
                sweeper.submit(param)
            else:
                out_file = os.path.join(result_dir, '{}.out'.format(param['run_prefix']))
                execute_command_on_host(param['cmd_str'], out_file)
                sweeper.done(param)
            _log.info("Run/job submission %s done.\n", param['run_prefix'])
        except Exception:
            # A failing run must not stop the remaining ones
            sweeper.skip(param)
            _log.exception("Run/job submission %s skipped.\n", param['run_prefix'])

    # We yield for small time here to give some time to batch scheduler to preprocess all the jobs.
    time.sleep(3)

    # Extract all metrics data from stdout files
    for param in sweeper.get_submitted() + sweeper.get_done():
        # Runs executed on the host have already finished at this point
        if conf['global']['submit_job']:
            job_status = get_job_status(conf['scheduler'], param['job_id'])
        else:
            job_status = "SUCCESS"

        if job_status == "FAIL":
            sweeper.skip(param)
            _log.warning("Run %s failed. Check the job log files.", param['run_prefix'])
            continue
        if job_status != "SUCCESS":
            # Any other status: leave the run untouched for a later invocation
            continue

        try:
            stdout_file = os.path.join(result_dir, '{}.out'.format(param['run_prefix']))
            metrics = extract_metrics(stdout_file, conf['mpi']['mpi_startup'])
            print_key_stats(param['run_prefix'], metrics)
            sdp_std_output = standardise_output_data(BENCH_NAME, conf, param, metrics)
            json_file = os.path.join(json_dir, "{}.json".format(param['run_prefix']))
            with open(json_file, "w") as out_file:
                json.dump(sdp_std_output, out_file, indent=4)
            sweeper.done(param)
        except KeyError as err:
            # The run is neither marked done nor skipped in this case
            _log.exception("Error in parsing %s run due to %s", param['run_prefix'], err)

    if len(sweeper.get_done()) == len(sweeper.get_sweeps()):
        _log.info("All runs completed and metrics extracted!")
        return 0

    if 0 < len(sweeper.get_skipped()) <= len(sweeper.get_sweeps()):
        _log.error("Some or all jobs skipped during submission/execution.")
        return 1

    _log.info("All current jobs submitted successfully. Rerun the script after job completion "
              "to extract metrics.")
    return 0
def print_key_stats(run_prefix, metrics):
    """This prints the key metrics to stdout

    The streamer time is reported as mean +/- sample standard deviation. With a
    single sample the deviation is reported as 0, with no sample at all both
    values are reported as ``nan``. The writer statistics are only printed when
    ``metrics["Writer"]`` contains ``"Written data [GB]"``.

    Args:
        run_prefix (str): Prefix of the run
        metrics (dict): A dict containing stats of the Imaging IO bench. The
            ``"Streamer"`` and ``"Writer"`` entries are read.

    Raises:
        KeyError: If a required entry is missing from ``metrics``. The streamer
            and writer sections are looked up before anything is printed.
    """
    streamer = metrics["Streamer"]
    writer = metrics["Writer"]
    stream_times = streamer["Stream [s]"]

    # statistics.stdev needs at least two samples, and an empty list has no
    # first element to fall back to.
    if len(stream_times) > 1:
        mean_time = statistics.mean(stream_times)
        time_spread = statistics.stdev(stream_times)
    elif stream_times:
        mean_time, time_spread = stream_times[0], 0
    else:
        mean_time = time_spread = float("nan")

    print("\n")
    print(" *******************************************************************************")
    print(" *************************** SKA-SDP Benchmark Suite ***************************")
    print(" ************************ Key stats from Imaging IO test ***********************")
    print(" *******************************************************************************")
    print(" Run run_prefix : {}".format(run_prefix))
    print(" Mean streamer time in sec : {:.3f} +/- {:.3f}".format(mean_time, time_spread))
    print(" Total degrid rate in GFLOPS/sec : {:.3f}".format(
        sum(streamer["Degrid rate [Gflop/s]"])))
    print(" Total degrid data rate in GB/sec : {:.3f}".format(
        sum(streamer["Degrid data rate [GB/s]"])))
    if "Written data [GB]" in writer:
        print(" Total written data in GB : {:.3f}".format(
            sum(writer["Written data [GB]"])))
        print(" Total rewritten data in GB : {:.3f}".format(
            sum(writer["Rewritten data [GB]"])))
        print(" Average IO bandwidth in GB/sec : {:.3f}".format(
            writer["Avg. IO BW [GB/s]"]))
    print(" *******************************************************************************")
    print("\n")