Source code for sdpbenchmarks.imagingiobench

"""This module contains the functions to run Imaging IO benchmark."""

import logging
import os
import platform
import time
import subprocess
import re
import math
import json
import statistics

from sdpbenchmarks.utils import ParamSweeper
from sdpbenchmarks.utils import get_project_root, load_modules, submit_job, get_job_status, sweep, \
    create_scheduler_conf, get_sockets_cores, exec_cmd, execute_command_on_host, \
    standardise_output_data
from sdpbenchmarks.exceptions import ExecuteCommandError, ImagingIOTestError

from ._version import __version__

_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})

# Name of the benchmark
BENCH_NAME = 'iotest'

# Git repo of the Imaging IO benchmark
GIT_REPO = "https://gitlab.com/ska-telescope/sdp/ska-sdp-exec-iotest.git"
GIT_REPO_NAME = 'ska-sdp-exec-iotest'

# Number of facets for each image size
NUMBER_OF_FACETS = {
    **dict.fromkeys(['tiny', '8k-2k-512'], 2),
    **dict.fromkeys(['small', '16k-8k-512'], 9),
    **dict.fromkeys(['smallish', '32k-8k-1k'], 25),
    **dict.fromkeys(['medium', '64k-16k-1k'], 25),
    **dict.fromkeys(['large', '96k-12k-1k'], 81),
    **dict.fromkeys(['tremendous', '128k-32k-2k'], 25),
    **dict.fromkeys(['huge', '256k-32k-2k'], 81),
    '96k-24k-1k': 25,
    '128k-32k-1k': 25,
    'T05_': 1
}
# Update the dict with new configs saved in rec-set-list.json
DATA_DIR = os.path.join(get_project_root(), "data", BENCH_NAME)
with open(os.path.join(DATA_DIR, "rec-set-list.json"), "r") as f:
    NUMBER_OF_FACETS.update(json.load(f))
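# Illustrative sketch (hypothetical entry, not taken from the real file): rec-set-list.json is
# expected to map additional --rec-set names to their facet counts, mirroring the shape of
# NUMBER_OF_FACETS above, e.g.
#
#     {"192k-32k-2k": 49}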

# Visibility data for each telescope config in GB
GEN_VIS_DATA = {
    'lowbd2': 17000,
    'midr5': 14000
}


def create_bench_conf(tag, run_mode, num_nodes, rep, rec_set, vis_set, chunk_sizes):
    """Creates a dict containing the parameters for a given run

    Args:
        tag (str): Tag of the benchmark run (given at the CLI)
        run_mode (str): Mode of the run
        num_nodes (int): Number of nodes
        rep (int): Number of repetitions
        rec_set (str): Image size
        vis_set (str): Antennas configuration
        chunk_sizes (str): Chunk sizes for time and frequency domains

    Returns:
        dict: A dict containing all the parameters
    """
    time_chunks = []
    freq_chunks = []
    # Separate time and frequency chunks read from config file
    for chunk_str in chunk_sizes:
        time_chunk, freq_chunk = chunk_str.split(",")
        time_chunks.append(int(time_chunk))
        freq_chunks.append(int(freq_chunk))
    # Parameter dict
    param_conf = {'tag': [tag],
                  'run_mode': run_mode,
                  'num_nodes': num_nodes,
                  'rec_set': rec_set,
                  'vis_set': vis_set,
                  'time_chunk': time_chunks,
                  'freq_chunk': freq_chunks,
                  'run_num': list(range(rep))}
    return param_conf
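
# Illustrative sketch (hypothetical values) of how create_bench_conf expands the
# "time,frequency" chunk strings into separate sweep lists:
#
#     create_bench_conf("demo", ["bare-metal"], [2], 2, ["tiny"], ["lowbd2"], ["64,128"])
#     # -> {'tag': ['demo'], 'run_mode': ['bare-metal'], 'num_nodes': [2],
#     #     'rec_set': ['tiny'], 'vis_set': ['lowbd2'],
#     #     'time_chunk': [64], 'freq_chunk': [128], 'run_num': [0, 1]}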


def check_iotest_arguments(conf):
    """Checks the arguments passed in the config file

    Args:
        conf (dict): A dict containing configuration.

    Returns:
        int: Error code 0 OK, 1 Not OK
    """
    checks = []
    bench_args = conf[BENCH_NAME]['iotest_args']
    # Check if provided rec-set is in available options
    for rec_set in bench_args['rec_set']:
        if rec_set not in ['T04', 'T05', 'T05_'] + list(NUMBER_OF_FACETS.keys()):
            _log.exception("Unknown option for --rec-set=%s", rec_set, exc_info=False)
            checks.append(1)
    # Check if provided visibility set is in available options
    for vis_set in bench_args['vis_set']:
        if vis_set not in ['vlaa', 'lowbd2', 'lowbd2-core', 'lowr3', 'midr5', 'askap', 'lofar']:
            _log.exception("Unknown option for --vis-set=%s", vis_set, exc_info=False)
            checks.append(1)
        # If SKA1 LOW or MID is specified and visibilities are written, check that the scratch
        # space has enough room for the data
        elif vis_set in ["lowbd2", "midr5"] and bench_args['write_vis']:
            if conf['global']['avail_scratch_disk_space'] < GEN_VIS_DATA[vis_set]:
                _log.exception("Not enough disk space on %s, free: %s GB, required: %s GB",
                               conf['global']['scratch_dir'],
                               conf['global']['avail_scratch_disk_space'],
                               GEN_VIS_DATA[vis_set], exc_info=False)
                checks.append(1)
            else:
                _log.warning(
                    "You have chosen to run the %s configuration. Based on chunk size, the "
                    "generated visibility data can be more than %d GB.",
                    vis_set, GEN_VIS_DATA[vis_set])
    check = 1 if any(checks) else 0
    _log.info("Imaging IO bench arguments check finished.")
    return check
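
# Illustrative sketch (hypothetical values) of the configuration fields the check above relies
# on; BENCH_NAME resolves to 'iotest':
#
#     conf = {
#         'global': {'scratch_dir': '/scratch', 'avail_scratch_disk_space': 20000},
#         'iotest': {'iotest_args': {'rec_set': ['small'], 'vis_set': ['lowbd2'],
#                                    'write_vis': True}},
#     }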


def compile_imaging_iotest(conf):
    """Compiles Imaging IO test by cloning the code from Git

    Args:
        conf (dict): A dict containing configuration.

    Returns:
        int: 0 OK, 1 Not OK

    Raises:
        ImagingIOTestError: An error occurred during compiling of the code
    """
    work_dir = conf[BENCH_NAME]['work_dir']
    _log.info('Attempting to install Imaging IO Bench.')
    # Make directory to install Imaging IO bench
    cmd_str = 'mkdir -p {}'.format(work_dir)
    exec_cmd(cmd_str)
    # Compile Imaging IO bench from source
    _log.info("Compiling Imaging IO test...")
    # Load dependencies
    module_list = conf['modules']['dep_modules']
    load_modules(module_list)
    # Purge previous installs
    if os.path.isdir(os.path.join(work_dir, GIT_REPO_NAME)):
        exec_cmd("rm -rf {}".format(os.path.join(work_dir, GIT_REPO_NAME)))
    # Command string to be executed
    cmd_str = " ".join(["cd {} && ".format(work_dir),
                        "git clone --depth=1 {} && ".format(GIT_REPO),
                        "cd {} && git lfs pull origin".format(
                            os.path.join(work_dir, GIT_REPO_NAME, 'src')),
                        "&& mkdir -p {0} && cd {0} && cmake -DCMAKE_C_STANDARD_LIBRARIES='-lm' ../ "
                        "&& make -j2".format(os.path.join(work_dir, GIT_REPO_NAME, 'build'))])
    _log.debug("Command string executed to install Imaging IO bench code is %s", cmd_str)
    try:
        compile_iotest = exec_cmd(cmd_str)
        _log.debug("Output from compiling Imaging IO test is %s",
                   compile_iotest.stdout.strip("\n"))
    except ExecuteCommandError as err:
        _log.exception("Installing IO Bench Failed.")
        raise ImagingIOTestError("Installing IO Bench Failed.") from err
    _log.info('Installation of Imaging IO Bench was successful.')
    return 0


def prepare_iotest(conf):
    """Prepare IO Imaging Benchmark installation.

    Args:
        conf (dict): A dict containing configuration.

    Returns:
        int: 0 OK, 1 Not OK
    """
    _log.info("Checking if IO imaging benchmark is installed.")
    make_new_install = False
    check = 0
    # Check if bare metal mode is specified in config
    if 'bare-metal' in conf['global']['run_mode']:
        # The directory where sources are placed
        conf[BENCH_NAME]['run_dir'] = os.path.join(conf[BENCH_NAME]['work_dir'],
                                                   GIT_REPO_NAME, 'build')
        # If the sources are already there, check if there is a working executable
        if os.path.isfile(os.path.join(conf[BENCH_NAME]['run_dir'], "iotest")):
            _log.info("Found executable of IO imaging benchmark in the system")
        else:
            _log.info("No existing installation found in the system.")
            _log.debug("No installation found at %s", conf[BENCH_NAME]['run_dir'])
            make_new_install = True
    # If a new installation is required, run install script
    if make_new_install:
        check = compile_imaging_iotest(conf)
    return check


def get_mpi_args(conf, num_nodes, num_omp_threads, num_processes):
    """Extract all MPI specific arguments and form a string

    Args:
        conf (dict): A dict containing the config parameters of the benchmark
        num_nodes (int): Number of nodes
        num_omp_threads (int): Number of OpenMP threads
        num_processes (int): Number of MPI processes

    Returns:
        str: A string containing all MPI related arguments
    """
    # Override the hostfile value with the path to the hostfile.
    # This is used for Grid'5000 bulk reservations.
    if not conf['global']['submit_job']:
        if 'host_file' in conf['mpi'].keys():
            cmd_str = "uniq $OAR_NODEFILE | head -n {} > {}".format(
                num_nodes, os.path.join(conf[BENCH_NAME]['work_dir'], "host_file"))
            subprocess.run(cmd_str, shell=True, check=True)
            conf['mpi']['host_file'] = "--machinefile {}".format(
                os.path.join(conf[BENCH_NAME]['work_dir'], "host_file"))

    # Get all the MPI arguments
    mpi_args = conf['mpi']['mpi_startup']
    for key, arg in conf['mpi'].items():
        if key != 'mpi_startup':
            mpi_args = " ".join([mpi_args, arg])

    # Join all MPI arguments to form one string
    if conf['mpi']['mpi_startup'] == "mpirun":
        mpi_args = " ".join([mpi_args,
                             "--mca mpi_yield_when_idle 1",
                             "--mca mca_base_env_list "
                             "'I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0;"
                             "I_MPI_PIN_DOMAIN=omp:compact;"
                             "OMP_PROC_BIND=true;"
                             "OMP_PLACES=sockets;"
                             "OMP_NUM_THREADS={}'".format(num_omp_threads),
                             "--tag-output --timestamp-output --map-by node",
                             "-np {} ".format(num_processes)])
    elif conf['mpi']['mpi_startup'] == "srun":
        mpi_args = " ".join(["export OMPI_MCA_mpi_yield_when_idle=1 "
                             "I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 "
                             "I_MPI_PIN_DOMAIN=omp:compact "
                             "OMP_PROC_BIND=true "
                             "OMP_PLACES=sockets "
                             "OMP_NUM_THREADS={} &&".format(num_omp_threads),
                             mpi_args,
                             "--distribution=cyclic:cyclic:fcyclic "
                             "--cpu-bind=verbose,sockets "
                             "--overcommit --label "
                             "--nodes={} --ntasks={}".format(num_nodes, num_processes),
                             "--cpus-per-task={}".format(num_omp_threads)])
    elif conf['mpi']['mpi_startup'] == "ccc_mprun":
        mpi_args = " ".join(["export OMPI_MCA_mpi_yield_when_idle=1 "
                             "I_MPI_JOB_RESPECT_PROCESS_PLACEMENT=0 "
                             "I_MPI_PIN_DOMAIN=omp:compact "
                             "OMP_PROC_BIND=true "
                             "OMP_PLACES=sockets "
                             "OMP_NUM_THREADS={} &&".format(num_omp_threads),
                             mpi_args,
                             "-E '--distribution=cyclic:cyclic:fcyclic "
                             "--overcommit --label' "
                             "-n {}".format(num_processes),
                             "-c {}".format(num_omp_threads)])
    _log.debug("MPI options provided are %s", mpi_args)
    return mpi_args
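
# Illustrative sketch (assumed layout, hypothetical values): apart from 'mpi_startup', every
# entry of conf['mpi'] is appended verbatim to the launcher command in get_mpi_args above, so a
# configuration could look like
#
#     conf['mpi'] = {
#         'mpi_startup': 'srun',                            # or 'mpirun' / 'ccc_mprun'
#         'host_file': '--machinefile /path/to/host_file',  # optional; rewritten on Grid'5000
#     }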


def get_telescope_config_settings(param):
    """Returns the telescope related configuration settings

    Args:
        param (dict): Parameter dict with all relevant arguments for the run

    Returns:
        str: String with time, frequency and w-stacking arguments
    """
    arg_str = ""
    if param['vis_set'] == "lowbd2":
        # SKA1 LOW setting
        arg_str += " --time -460:460/1024/{}".format(param['time_chunk'])
        arg_str += " --freq 260e6:300e6/8192/{}".format(param['freq_chunk'])
    elif param['vis_set'] == "midr5":
        # SKA1 Mid setting
        arg_str += " --time -290:290/4096/{}".format(param['time_chunk'])
        arg_str += " --freq 0.35e9:0.4725e9/11264/{}".format(param['freq_chunk'])
    if param['vis_set'] in ["lowbd2", "midr5"]:
        arg_str += " --target-err=1e-5 --margin=0"
    return arg_str
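
# Illustrative sketch (hypothetical chunk sizes) of the argument string produced above for the
# SKA1 LOW configuration:
#
#     get_telescope_config_settings({'vis_set': 'lowbd2', 'time_chunk': 64, 'freq_chunk': 128})
#     # -> ' --time -460:460/1024/64 --freq 260e6:300e6/8192/128 --target-err=1e-5 --margin=0'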


def get_num_processes(conf, rec_set, num_nodes):
    """Estimates the numbers of producers, streamers and OpenMP threads.

    Args:
        conf (dict): A dict containing configuration settings
        rec_set (str): Image parameters
        num_nodes (int): Number of nodes

    Returns:
        tuple: Total CPU cores (physical only) on all nodes combined, threads per core,
            number of OpenMP threads, number of producers, number of MPI processes
    """
    # Get number of sockets, cores (physical) per node and threads (per core) on compute nodes
    num_sockets, num_cores, num_threads = get_sockets_cores(conf)
    # Total number of cores on all nodes combined
    num_cores *= num_nodes
    # Number of facet workers is the number of facets + 1
    num_facet_workers = NUMBER_OF_FACETS[rec_set] + 1
    # Number of streamers is the number of NUMA nodes
    num_streamer_workers = num_nodes * num_sockets
    # Total number of MPI processes
    num_processes = num_facet_workers + num_streamer_workers
    # Estimate the number of threads based on physical CPU core count
    num_omp_threads = math.ceil(num_cores / num_streamer_workers)
    _log.info("Total number of nodes, MPI processes and OpenMP threads are %d, %d and %d",
              num_nodes, num_processes, num_omp_threads)
    _log.debug("Total number of streamers is %d and of producers is %d",
               num_processes - num_facet_workers, num_facet_workers)
    return num_cores, num_threads, num_omp_threads, num_facet_workers, num_processes
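
# Illustrative sketch of the process-count arithmetic above (hypothetical hardware: 2 nodes
# with 2 sockets and 32 physical cores each, as reported by get_sockets_cores):
#
#     rec_set 'small' -> 9 facets -> num_facet_workers = 9 + 1         = 10
#     num_streamer_workers                             = 2 nodes * 2   = 4
#     num_processes                                    = 10 + 4        = 14
#     num_omp_threads                                  = ceil(64 / 4)  = 16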


def get_command_to_execute_bench(conf, param):
    """Forms the command string to be executed

    Args:
        conf (dict): A dictionary containing configuration parameters
        param (dict): A parameter dict with all relevant arguments for the run

    Returns:
        str: Complete command string
    """
    bench_args = conf[BENCH_NAME]['iotest_args']
    # Get number of streamers, producers and OpenMP threads
    num_cores, num_threads, num_omp_threads, num_facet_workers, num_processes = \
        get_num_processes(conf, param['rec_set'], param['num_nodes'])
    param['num_cores'] = num_cores
    param['num_threads'] = num_threads
    param['num_omp_threads'] = num_omp_threads
    param['num_facet_workers'] = num_facet_workers
    param['num_processes'] = num_processes

    # Get time and frequency related arguments based on telescope configuration
    arg_str = get_telescope_config_settings(param)

    # Get all MPI related arguments
    mpi_args = get_mpi_args(conf, param['num_nodes'], param['num_omp_threads'],
                            param['num_processes'])

    # Make a scratch dir for this specific run. This avoids problems with stray files from
    # previous runs
    os.makedirs(os.path.join(conf['global']['scratch_dir'], param['run_prefix']), exist_ok=True)

    # iotest arguments
    bench_args_str = " ".join(["--facet-workers={}".format(num_facet_workers),
                               "--rec-set={} --vis-set={}".format(param['rec_set'],
                                                                  param['vis_set']),
                               "{}".format(arg_str),
                               "{}".format(bench_args['other_args'])])

    # Split the argument string and save the arguments to the param dict
    dict_bench_args = dict((o if len(o) > 1 else (o[0], True)
                            for o in (arg.split("=") for arg in bench_args_str.split())))
    param['bench_args'] = dict_bench_args

    if param['run_mode'] == "bare-metal":
        cmd_str = "{} ./iotest {}".format(mpi_args, bench_args_str)
        if not conf['global']['submit_job']:
            cmd_str = " ".join(["cd {} &&".format(conf[BENCH_NAME]['run_dir']), cmd_str])
    elif param['run_mode'] == "singularity":
        cmd_str = " ".join(["{}".format(mpi_args),
                            "{}".format(conf['global']['singularity_path']),
                            "run --env OMP_NUM_THREADS={}".format(num_omp_threads),
                            "--bind {}".format(os.path.join(conf['global']['scratch_dir'],
                                                            param['run_prefix'])),
                            "--app {}".format(conf[BENCH_NAME]['image_app']),
                            "{} {}".format(conf[BENCH_NAME]['image_path'], bench_args_str)])

    # If writing visibilities, make a job dependency mandatory. We don't want multiple jobs
    # running at the same time and writing data to the parallel file system concurrently
    if bench_args['write_vis']:
        if conf['global']['submit_job']:
            conf['scheduler']['job_dependency'] = "afterany"
        cmd_str = " ".join([cmd_str,
                            "--writer-count={}".format(bench_args['writer_threads']),
                            os.path.join(conf['global']['scratch_dir'], param['run_prefix'],
                                         "out%d.h5"),
                            "&& rm -rf",
                            os.path.join(conf['global']['scratch_dir'], param['run_prefix'],
                                         'out*.h5')])
    param['cmd_str'] = cmd_str
    return cmd_str
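
# Illustrative sketch (hypothetical argument string) of how bench_args_str is turned into
# param['bench_args'] inside get_command_to_execute_bench above: '--key=value' tokens become
# ('--key', 'value') pairs and bare tokens without '=' map to True:
#
#     '--facet-workers=10 --rec-set=small --vis-set=lowbd2 --batch'
#     # -> {'--facet-workers': '10', '--rec-set': 'small',
#     #     '--vis-set': 'lowbd2', '--batch': True}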


def run_iotest(conf):
    """Run Imaging IO test.

    Args:
        conf (dict): A dict containing configuration.

    Returns:
        int: < 0 all runs failed, = 0 all runs passed, > 0 some runs passed

    Raises:
        Exception: An error occurred in building the parameter sweep, running or submitting jobs
    """
    result_dir = conf[BENCH_NAME]['result_dir']
    json_dir = conf[BENCH_NAME]['json_dir']
    _log.debug("Configuration in use for IO benchmark: %s", conf)

    # Create a config dict for the runs
    param_conf = create_bench_conf(conf['global']['tag'], conf['global']['run_mode'],
                                   conf['global']['num_nodes'], conf['global']['repetitions'],
                                   conf[BENCH_NAME]['iotest_args']['rec_set'],
                                   conf[BENCH_NAME]['iotest_args']['vis_set'],
                                   conf[BENCH_NAME]['iotest_args']['chunk_sizes'])

    # Sweep the whole parameter space
    try:
        params = sweep(param_conf)
    except Exception as err:
        _log.exception("Failed to build parameter sweep")
        raise Exception("Failed to build parameter sweep") from err

    sweeper = ParamSweeper(persistence_dir=conf['global']['per_dir'], params=params,
                           randomise=conf['global']['randomise'])
    _log.info("%d runs remaining\n", len(sweeper.get_remaining()))

    # Initialise the job ID
    job_id = -1

    # Get all sweeps in the parameter space until none are left
    while True:
        param = sweeper.get_next()
        if not param:
            _log.info("No more runs.")
            break
        if conf['global']['ignore_rem']:
            _log.info("Ignoring run %s", param['run_prefix'])
            sweeper.ignore(param)
            continue
        _log.info("Current run %s", param['run_prefix'])
        try:
            # This function forms the command string to be executed
            cmd_str = get_command_to_execute_bench(conf, param)
            _log.debug("Command to execute is %s", cmd_str)
            if conf['global']['submit_job']:
                scheduler_conf = create_scheduler_conf(conf, param, BENCH_NAME)
                job_id = submit_job(scheduler_conf, job_id)
                param['job_id'] = job_id
                sweeper.submit(param)
            else:
                out_file = os.path.join(result_dir, '{}.out'.format(param['run_prefix']))
                execute_command_on_host(param['cmd_str'], out_file)
                sweeper.done(param)
            _log.info("Run/job submission %s done.\n", param['run_prefix'])
        except Exception:
            sweeper.skip(param)
            _log.exception("Run/job submission %s skipped.\n", param['run_prefix'])

    # Yield for a short time to give the batch scheduler some time to preprocess all the jobs
    time.sleep(3)

    # Extract all metrics data from the stdout files
    for param in sweeper.get_submitted() + sweeper.get_done():
        if conf['global']['submit_job']:
            job_status = get_job_status(conf['scheduler'], param['job_id'])
        else:
            job_status = "SUCCESS"
        if job_status == "SUCCESS":
            try:
                metrics = extract_metrics(os.path.join(result_dir,
                                                       '{}.out'.format(param['run_prefix'])),
                                          conf['mpi']['mpi_startup'])
                print_key_stats(param['run_prefix'], metrics)
                sdp_std_output = standardise_output_data(BENCH_NAME, conf, param, metrics)
                with open(os.path.join(json_dir,
                                       "{}.json".format(param['run_prefix'])), "w") as out_file:
                    json.dump(sdp_std_output, out_file, indent=4)
                sweeper.done(param)
            except KeyError as err:
                # sweeper.skip(param)
                _log.exception("Error in parsing %s run due to %s", param['run_prefix'], err)
        elif job_status == "FAIL":
            sweeper.skip(param)
            _log.warning("Run %s failed. Check the job log files.", param['run_prefix'])

    if len(sweeper.get_done()) == len(sweeper.get_sweeps()):
        _log.info("All runs completed and metrics extracted!")
        return 0
    elif 0 < len(sweeper.get_skipped()) <= len(sweeper.get_sweeps()):
        _log.error("Some or all jobs skipped during submission/execution.")
        return 1
    else:
        _log.info("All current jobs submitted successfully. Rerun the script after job "
                  "completion to extract metrics.")
        return 0


def extract_metrics(filename, mpi_startup):
    """Extract data transfer metrics from benchmark output

    Args:
        filename (str): Name of the stdout file to parse
        mpi_startup (str): Startup command of MPI (mpirun or srun)

    Returns:
        dict: A dict containing streamer, producer and writer metrics
    """
    # Note: Picked up from Peter's repo.
    rank_output = {}
    out_file = open(filename, 'r')
    lines = out_file.readlines()

    # Segregate output based on ranks
    if mpi_startup == "mpirun":
        out_tag = re.compile(r"(\[[\d,]*\])(<\w*>:| )")
        for line in lines:
            elems = out_tag.split(line)
            if len(elems) > 3:
                rank = int(re.findall(r'^\[[0-9]+,([0-9]+)\]*$', elems[1])[0])
                if rank not in rank_output:
                    rank_output[rank] = []
                rank_output[rank].append(elems[-1])
    elif mpi_startup in ["srun", "ccc_mprun"]:
        out_tag = re.compile(r"([0-9 ]+): (.*)")
        for line in lines:
            elems = out_tag.split(line)
            if len(elems) > 1:
                try:
                    rank = int(elems[1])
                    if rank not in rank_output:
                        rank_output[rank] = []
                    rank_output[rank].append(elems[2] + elems[3])
                except ValueError:
                    pass
    out_file.close()

    # Determine roles
    producers = {}
    producers_pid = {}
    streamers = {}
    streamers_pid = {}
    writers = {}
    writer_ids = {}
    role_tag = re.compile(r"(.*) role: (\w*) (\d*)")
    write_tag = re.compile(r"Writer (\d*):")

    # The stdout of producers, streamers and writers is added
    # to their corresponding dicts
    for rank, output in rank_output.items():
        for out in output:
            match = role_tag.match(out)
            if match:
                pid = match.group(1).split(" ")[0]
                role = match.group(2)
                role_id = int(match.group(3))
                if role == 'Producer':
                    producers[role_id] = output
                    producers_pid[role_id] = pid
                elif role == 'Streamer':
                    streamers[role_id] = output
                    streamers_pid[role_id] = pid
                else:
                    _log.warning("Unknown role of %s: %s", rank, role)
                continue
            match = write_tag.match(out)
            if match:
                writer_id = int(match.group(1))
                if writer_id not in writers:
                    writers[writer_id] = []
                    writer_ids[writer_id] = writer_id
                writers[writer_id].append(out)

    # Extract timings using regex
    stream_extracts = [
        ("Create [s]", re.compile(r"^done in ([\d\.+-e]*)s")),
        ("Stream [s]", re.compile(r"^Streamed for ([\d\.+-e]*)s")),
        ("Received [GB]", re.compile(r"^Received ([\d\.+-e]*) GB")),
        ("Receiver Wait [s]", re.compile(r"^Receiver: Wait: ([\d\.+-e]*)s")),
        ("Worker Wait [s]", re.compile(r"^Worker: Wait: ([\d\.+-e]*)s")),
        ("Worker FFT [s]", re.compile(r"^Worker: .*FFT: ([\d\.+-e]*)s")),
        ("Worker Degrid [s]", re.compile(r"^Worker: .*Degrid: ([\d\.+-e]*)s")),
        ("Worker Idle [s]", re.compile(r"^Worker: .*Idle: ([\d\.+-e]*)s")),
        ("Degrid [Gflops]", re.compile(r"^ degrid ([\d\.+-e]*)")),
        ("Degrid rate [Gflop/s]", re.compile(r"^ degrid.*\(([\d\.+-e]*)")),
        ("Degrid data rate [GB/s]", re.compile(r"^ degrid.*\(.* ([\d\.+-e]*) GB/s,")),
        ("Visibilities", re.compile(r"^ degrid.*\(.*s, ([\d\.+-e]*)/")),
        ("Visibilities data size [GB]", re.compile(r"^ degrid.*\(.* ([\d\.+-e]*) GB,")),
        ("Visibility chunks", re.compile(r"^ degrid.*\(.*s,.*s, ([\d\.+-e]*) chunks")),
        ("Vis RMSE", re.compile(r"^Vis accuracy: RMSE ([\d\.+\-e]*)")),
        ("Vis worst", re.compile(r"^Vis accuracy: .*worst ([\d\.+-e]*)")),
        ("Vis samples", re.compile(r"^Vis accuracy: .*\(([\d\.+-e]*) samples\)")),
        ("Grid RMSE", re.compile(r"^Grid accuracy: RMSE ([\d\.+\-e]*)")),
        ("Grid worst", re.compile(r"^Grid accuracy: .*worst ([\d\.+-e]*)")),
        ("Grid samples", re.compile(r"^Grid accuracy: .*\(([\d\.+-e]*) samples\)")),
        ("Grid wmax RMSE", re.compile(r"^Grid wmax accuracy: RMSE ([\d\.+\-e]*)")),
        ("Grid wmax worst", re.compile(r"^Grid wmax accuracy: .*worst ([\d\.+-e]*)")),
        ("Grid wmax samples", re.compile(r"^Grid wmax accuracy: .*\(([\d\.+-e]*) samples\)")),
        ("FFT flops [Gflops]", re.compile(r"^ FFTs ([\d\.+-e]*) [gG]flop")),
    ]

    if len(writers) == 0:
        writer_extracts = []
    else:
        writer_extracts = [
            ("Written data [GB]", re.compile(r"^Writer \d*: .*?written ([\d\.+-e]*) GB")),
            ("Rewritten data [GB]", re.compile(r"^Writer \d*: .*rewritten ([\d\.+-e]*) GB")),
            ("Written rate [GB/s]", re.compile(r"^Writer \d*: .*rate ([\d\.+-e]*) GB/s")),
            ("Writer Wait [s]", re.compile(r"^Writer \d*: Wait: ([\d\.+-e]*)s")),
            ("Writer Read [s]", re.compile(r"^Writer \d*: .*Read: ([\d\.+-e]*)s")),
            ("Writer Write [s]", re.compile(r"^Writer \d*: .*Write: ([\d\.+-e]*)s")),
        ]

    producer_extracts = [
        ("PF1 [s]", re.compile(r"^PF1: ([\d\.+-e]*)")),
        ("PF1 ops [Gflop]", re.compile(r"^PF1: [\d\.+-e]* s \(([\d\.+-e]*) Gflop")),
        ("FT1 [s]", re.compile(r"^PF1:.*FT1: ([\d\.+-e]*)")),
        ("FT1 ops [Gflop]", re.compile(r"^PF1:.*FT1: [\d\.+-e]* s \(([\d\.+-e]*) Gflop")),
        ("ES1 [s]", re.compile(r"^PF1:.*ES1: ([\d\.+-e]*)")),
        ("ES1 ops [Gflop]", re.compile(r"^PF1:.*ES1: [\d\.+-e]* s \(([\d\.+-e]*) Gflop")),
        ("PF2 [s]", re.compile(r"^PF2: ([\d\.+-e]*)")),
        ("PF2 ops [Gflop]", re.compile(r"^PF2: [\d\.+-e]* s \(([\d\.+-e]*) Gflop")),
        ("FT2 [s]", re.compile(r"^PF2:.*FT2: ([\d\.+-e]*)")),
        ("FT2 ops [Gflop]", re.compile(r"^PF2:.*FT2: [\d\.+-e]* s \(([\d\.+-e]*) Gflop")),
        ("ES2 [s]", re.compile(r"^PF2:.*ES2: ([\d\.+-e]*)")),
        ("ES2 ops [Gflop]", re.compile(r"^PF2:.*ES2: [\d\.+-e]* s \(([\d\.+-e]*) Gflop")),
        ("Fill [s]", re.compile(r"^fill: ([\d\.+-e]*)")),
        ("Assign [s]", re.compile(r"^fill:.*assign wait: ([\d\.+-e]*)")),
        ("Slot [s]", re.compile(r"^fill:.*slot wait: ([\d\.+-e]*)")),
        ("Send [s]", re.compile(r"^fill:.*send: ([\d\.+-e]*)")),
        ("Idle [s]", re.compile(r"^fill:.*idle: ([\d\.+-e]*)")),
    ]

    # This dict contains all the stats about the different processes
    iotest_stats = {}
    producer_stats = {}
    writer_stats = {}
    streamer_stats = {}

    for _, output in producers.items():
        for out in output:
            for name, prod_tag in producer_extracts:
                prod_match = prod_tag.match(out)
                if prod_match:
                    if name not in producer_stats:
                        producer_stats[name] = []
                    producer_stats[name].append(float(prod_match.group(1)))
    producer_stats['nodes'] = list(set(producers_pid.values()))
    iotest_stats['Producer'] = producer_stats

    for _, output in streamers.items():
        for out in output:
            for name, streamer_tag in stream_extracts:
                streamer_match = streamer_tag.match(out)
                if streamer_match:
                    if name not in streamer_stats:
                        streamer_stats[name] = []
                    streamer_stats[name].append(float(streamer_match.group(1)))
    streamer_stats['nodes'] = list(set(streamers_pid.values()))
    iotest_stats['Streamer'] = streamer_stats

    for _, output in writers.items():
        for out in output:
            for name, writer_tag in writer_extracts:
                writer_match = writer_tag.match(out)
                if writer_match:
                    if name not in writer_stats:
                        writer_stats[name] = []
                    writer_stats[name].append(float(writer_match.group(1)))
    writer_stats['nodes'] = list(set(streamers_pid.values()))
    iotest_stats['Writer'] = writer_stats

    if "Written data [GB]" in iotest_stats['Writer'].keys():
        iotest_stats['Writer']['Avg. IO BW [GB/s]'] = \
            sum(iotest_stats['Writer']['Written data [GB]']) / \
            (sum(iotest_stats['Streamer']['Stream [s]']) /
             len(iotest_stats['Streamer']['Stream [s]']))

    # node_list = list(set(streamer_stats['nodes'] + producer_stats['nodes']))
    return iotest_stats