Source code for sdpbenchmarks.utils

"""This module contains the utility functions."""

import os
import sys
import subprocess
import random
import string
import logging
import inspect
import textwrap
import platform
import re
import json
import shutil
import pathlib

from sdpbenchmarks.exceptions import ExecuteCommandError, JobSubmissionError, KeyNotFoundError, \
    JobScriptCreationError

# Make the environment modules Python initialisation importable
if 'MODULESHOME' in os.environ.keys():
    try:
        sys.path.index(os.environ['MODULESHOME'] + '/init')
    except ValueError:
        sys.path.insert(0, os.environ['MODULESHOME'] + '/init')
    try:
        from env_modules_python import module
    except ModuleNotFoundError:
        exec(open(os.environ['MODULESHOME'] + '/init/python.py').read())

from ._version import __version__

_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})

# lscpu info for several G5k clusters
G5K_CLUSTER = {
    'dahu': {
        'num_sockets': 2,
        'num_cores': 32,
        'num_threads': 2
    },
    'yeti': {
        'num_sockets': 4,
        'num_cores': 64,
        'num_threads': 2
    },
    'graphite': {
        'num_sockets': 2,
        'num_cores': 16,
        'num_threads': 2
    },
    'grimoire': {
        'num_sockets': 2,
        'num_cores': 16,
        'num_threads': 2
    },
    'grisou': {
        'num_sockets': 2,
        'num_cores': 16,
        'num_threads': 2
    },
    'gros': {
        'num_sockets': 1,
        'num_cores': 18,
        'num_threads': 2
    },
}

# Regexes to parse the job ID from the submission output of different batch schedulers
BATCH_SCHEDULER = {
    'oarsub': ["OAR", r".*OAR_JOB_ID=(\-?\d+)"],
    'sbatch': ["SLURM", r"^Submitted batch job (-?\d+)"],
    'ccc_msub': ["SLURM", r"^Submitted Batch Session (-?\d+)"]
}
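
# Illustrative example (not part of the original module): each BATCH_SCHEDULER
# entry maps a submission command to the scheduler name and a regex that extracts
# the job ID from the submission output. A minimal sketch with a hypothetical
# sbatch response string:
#
#     match = re.compile(BATCH_SCHEDULER['sbatch'][1]).match("Submitted batch job 123456")
#     job_id = int(match.group(1))  # -> 123456
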


def get_project_root():
    """Get root directory of the project

    Returns:
        str: Full path of the root directory
    """
    return pathlib.Path(__file__).parent.parent

def load_modules(module_list):
    """This function purges the existing modules and loads the given modules

    Args:
        module_list (str): List of modules to load
    """
    # Load dependencies
    _log.debug("Loading modules: %s", module_list)
    try:
        module('purge')
        module('load', *module_list.split())
    except NameError:
        pass

def which(cmd, modules):
    """This function loads the given modules and returns the path of the requested
    binary if found, or None otherwise

    Args:
        cmd (str): Name of the binary
        modules (str): Modules to load

    Returns:
        str: Path of the binary or None if not found
    """
    cmd_str = "export PATH=$HOME/.local/bin:$PATH && which {}".format(cmd)
    if modules:
        cmd_str = " ".join(["module load {} &&".format(modules), cmd_str])
    try:
        run_cmd = subprocess.run(cmd_str, shell=True, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT, check=True)
        cmd_path = run_cmd.stdout.decode("utf-8").rstrip("\n")
    except subprocess.CalledProcessError:
        return None
    return cmd_path

def pull_image(uri, container_mode, path):
    """This pulls the image from the registry. It returns an error code if the
    image cannot be pulled.

    Args:
        uri (str): URI of the image
        container_mode (str): Docker or Singularity container
        path (str): Path where the image needs to be saved (only for Singularity).
            The image will be overwritten if it already exists.

    Returns:
        int: 0 - OK, 1 - Not OK
    """
    if container_mode == "docker":
        if uri.startswith("docker://"):
            uri = uri.replace("docker://", "")
        cmd_str = "docker pull {}".format(uri)
    elif container_mode == "singularity":
        cmd_str = "singularity pull --force {} {}".format(path, uri)
    try:
        subprocess.run(cmd_str, shell=True, capture_output=True, check=True)
    except subprocess.CalledProcessError as err:
        _log.exception("Pulling the image from registry failed with error: %s", err)
        return 1
    return 0

def sweep(parameters):
    """This method accepts a dict with the possible values for each parameter and
    creates a parameter space to sweep

    Args:
        parameters (dict): A dict containing parameters and their values

    Returns:
        list: All possible combinations of the parameter space
    """
    result = [{}]
    for key, val in parameters.items():
        if len(val) == 0:
            continue
        newresult = []
        for i in result:
            if isinstance(val, dict):
                for subkey, subval in val.items():
                    for subcombs in sweep(subval):
                        subresult = i.copy()
                        subresult.update({key: subkey})
                        subresult.update(subcombs)
                        newresult.append(subresult)
            else:
                for j in val:
                    subresult = i.copy()
                    subresult.update({key: j})
                    newresult.append(subresult)
        result = newresult
    for param in result:
        param['run_prefix'] = "_".join([str(v) for v in param.values()])
    return result

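# Illustrative example (not part of the original module): sweep() expands a dict
# of parameter lists into the cross product of all values, tagging each
# combination with a generated 'run_prefix'. A minimal sketch with hypothetical
# parameters:
#
#     combos = sweep({'num_nodes': [1, 2], 'image_size': [512, 1024]})
#     # -> 4 combinations, e.g. {'num_nodes': 1, 'image_size': 512, 'run_prefix': '1_512'}
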
class ParamSweeper(object):
    """This class is inspired by the execo library
    (http://execo.gforge.inria.fr/doc/latest-stable/), except that it is a very
    simplified version of the original. The original is developed for large scale
    experiments and thread safety. Here what we are interested in is the state of
    each run, which can be tracked and remembered when launching the experiments.
    """

    def __init__(self, persistence_dir, params=None, name=None, randomise=True):
        """Initialises the class

        Args:
            persistence_dir (str): Directory where the metadata of the experiment
                is stored
            params (list): A list containing the combinations of the parameter space
            name (str): Name given to the sweeper
            randomise (bool): If True, randomises the order of the experiment runs.
                Useful to minimise cache effects in successive runs
        """
        self.__persistence_dir = persistence_dir
        self.__randomise = randomise
        try:
            os.makedirs(self.__persistence_dir)
        except os.error:
            pass
        self.__name = name
        if not self.__name:
            self.__name = os.path.basename(self.__persistence_dir)
        self.set_sweeps(params)

    def set_sweeps(self, params=None):
        """This method sets the sweeps to be performed"""
        try:
            with open(os.path.join(self.__persistence_dir, "sweeps"), 'r') as sweeps_file:
                self.__sweeps = json.load(sweeps_file)
            # Previously skipped runs are retried on the next invocation
            for key, value in self.__sweeps.items():
                if value['state'] == 'SKIPPED':
                    self.__sweeps[key]['state'] = 'TODO'
        except (OSError, ValueError):
            # No existing (or unreadable) sweeps file: start from scratch
            self.__sweeps = {}
        if params:
            for param in params:
                if 'run_prefix' not in param.keys():
                    param['run_prefix'] = "-".join([value for _, value in param.items()])
                if param['run_prefix'] not in self.__sweeps.keys():
                    self.__sweeps[param['run_prefix']] = {'config': param, 'state': 'TODO'}
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'w') as sweeps_file:
            json.dump(self.__sweeps, sweeps_file)

    def get_sweeps(self):
        """Returns the iterable of what to iterate on"""
        return self.__sweeps

    def get_remaining(self):
        """Returns the iterable of the remaining runs to iterate on"""
        return [self.__sweeps[run_prefix]['config']
                for run_prefix, value in self.__sweeps.items()
                if value['state'] in ['TODO']]

    def get_skipped(self):
        """Returns the iterable of skipped runs"""
        return [self.__sweeps[run_prefix]['config']
                for run_prefix, value in self.__sweeps.items()
                if value['state'] in ['SKIPPED']]

    def get_ignored(self):
        """Returns the iterable of ignored runs"""
        return [self.__sweeps[run_prefix]['config']
                for run_prefix, value in self.__sweeps.items()
                if value['state'] in ['IGNORED']]

    def get_submitted(self):
        """Returns the iterable of submitted jobs (when batch scheduler is used)"""
        return [self.__sweeps[run_prefix]['config']
                for run_prefix, value in self.__sweeps.items()
                if value['state'] in ['SUBMITTED']]

    def get_inprogress(self):
        """Returns the iterable of runs in progress"""
        return [self.__sweeps[run_prefix]['config']
                for run_prefix, value in self.__sweeps.items()
                if value['state'] in ['SUBMITTED', 'DONE']]

    def get_done(self):
        """Returns the iterable of finished runs"""
        return [self.__sweeps[run_prefix]['config']
                for run_prefix, value in self.__sweeps.items()
                if value['state'] in ['DONE']]

    def get_next(self):
        """Returns the next run to iterate on"""
        remaining_combs = self.get_remaining()
        num_remaining_combs = len(remaining_combs)
        if num_remaining_combs >= 1:
            if self.__randomise:
                next_comb_indx = random.randint(0, num_remaining_combs - 1)
            else:
                next_comb_indx = 0
            return remaining_combs[next_comb_indx]
        else:
            return None

    def submit(self, combination):
        """Marks the combination as submitted"""
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'r') as sweeps_file:
            self.__sweeps = json.load(sweeps_file)
        self.__sweeps[combination['run_prefix']]['state'] = 'SUBMITTED'
        self.__sweeps[combination['run_prefix']]['config'] = combination
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'w') as sweeps_file:
            json.dump(self.__sweeps, sweeps_file)

    def done(self, combination):
        """Marks the combination as done"""
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'r') as sweeps_file:
            self.__sweeps = json.load(sweeps_file)
        self.__sweeps[combination['run_prefix']]['state'] = 'DONE'
        self.__sweeps[combination['run_prefix']]['config'] = combination
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'w') as sweeps_file:
            json.dump(self.__sweeps, sweeps_file)

    def skip(self, combination):
        """Marks the combination as skipped"""
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'r') as sweeps_file:
            self.__sweeps = json.load(sweeps_file)
        self.__sweeps[combination['run_prefix']]['state'] = 'SKIPPED'
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'w') as sweeps_file:
            json.dump(self.__sweeps, sweeps_file)

    def ignore(self, combination):
        """Marks the combination as ignored"""
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'r') as sweeps_file:
            self.__sweeps = json.load(sweeps_file)
        self.__sweeps[combination['run_prefix']]['state'] = 'IGNORED'
        with open(os.path.join(self.__persistence_dir, "sweeps"), 'w') as sweeps_file:
            json.dump(self.__sweeps, sweeps_file)

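# Illustrative example (not part of the original module): a typical ParamSweeper
# lifecycle, assuming a hypothetical persistence directory. The run states are
# persisted in a 'sweeps' JSON file, so an interrupted campaign can be resumed
# where it stopped.
#
#     sweeper = ParamSweeper('/tmp/my_campaign', params=sweep({'num_nodes': [1, 2]}))
#     comb = sweeper.get_next()   # pick a TODO combination (random order by default)
#     sweeper.submit(comb)        # mark it as SUBMITTED
#     sweeper.done(comb)          # ...and as DONE once the run finishes
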
def reformat_long_string(ln_str, width=70):
    """This method reformats a command string by breaking it into multiple lines.

    Args:
        ln_str (str): Long string to break down
        width (int): Width of each line (default is 70)

    Returns:
        str: Same string in multiple lines to ease readability
    """
    ln_str = textwrap.wrap(ln_str, width=width, drop_whitespace=False,
                           subsequent_indent=' ', break_long_words=False,
                           break_on_hyphens=False)
    multi_ln_str = []
    for iline, line in enumerate(ln_str):
        if iline < len(ln_str) - 1:
            line += '\\'
        multi_ln_str.append(line)
    multi_ln_str = "\n".join(multi_ln_str)
    return multi_ln_str

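# Illustrative example (not part of the original module): reformat_long_string()
# wraps a long command and appends a trailing backslash to every line except the
# last, so the command stays valid shell when written into a job script. A
# minimal sketch with a hypothetical command:
#
#     cmd = "srun --nodes=2 --ntasks=64 ./benchmark --input data.ms --output results.json"
#     print(reformat_long_string(cmd, width=40))
#     # -> the same command split over several lines, each but the last ending in '\'
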
def get_sockets_cores(conf):
    """Returns the number of sockets and cores on the compute nodes.

    For interactive runs, lscpu can be used to grep the info. When using the script
    to submit jobs from login nodes, lscpu cannot be used and sinfo for a given
    partition is used instead.

    Args:
        conf (dict): A dict containing configuration settings

    Returns:
        list: Number of sockets on each node, number of physical cores on each node
            (num_sockets * num_cores per socket), number of threads inside each core

    Raises:
        KeyNotFoundError: An error occurred if the key is not found in the G5k dict
            that contains lscpu info for different clusters
    """
    if not conf['global']['submit_job']:
        if platform.system() == "Linux":
            num_sockets = int(exec_cmd("lscpu | grep 'Socket(s)' | awk '{print $2}'").
                              stdout.strip("\n"))
            num_cores = num_sockets * int(exec_cmd("lscpu | grep 'Core(s) per socket' | "
                                                   "awk '{print $4}'").stdout.strip("\n"))
            num_threads = int(exec_cmd("lscpu | grep 'Thread(s) per core' | awk '{print $4}'").
                              stdout.strip("\n"))
        elif platform.system() == "Darwin":
            num_sockets = int(exec_cmd("system_profiler SPHardwareDataType | "
                                       "awk 'FNR == 9{print $4}'").stdout.strip("\n"))
            num_cores = num_sockets * int(exec_cmd("system_profiler SPHardwareDataType | "
                                                   "awk 'FNR == 10{print $5}'").stdout.strip("\n"))
            if exec_cmd("system_profiler SPHardwareDataType | awk 'FNR == 13{print $3}'").\
                    stdout.strip("\n") == "Enabled":
                num_threads = 2
            else:
                num_threads = 1
    else:
        if shutil.which("sinfo") is not None:
            cmd_str = "sinfo --partition={} -o '%20R %10z' | awk 'FNR == 2 {{print $2}}'". \
                format(conf['scheduler']['partition'])
            cmd_out = exec_cmd(cmd_str).stdout.strip("\n").split(":")
            num_sockets = int(cmd_out[0])
            num_cores = num_sockets * int(cmd_out[1])
            num_threads = int(cmd_out[2])
        else:
            # Not a pretty way. Need to change by using oarnodes in the future.
            # Just defensive coding for the moment to avoid failure
            try:
                num_sockets, num_cores, num_threads = list(
                    G5K_CLUSTER[conf['scheduler']['partition']].values())
            except KeyError as err:
                _log.exception("Resource information not found")
                raise KeyNotFoundError(
                    "Resource information of {} not found".format(
                        conf['scheduler']['partition'])) from err
    _log.debug("Compute node has %d sockets, %d cores and %d threads per core",
               num_sockets, num_cores, num_threads)
    return num_sockets, num_cores, num_threads

def get_job_status(conf, job_id):
    """Returns the status of the batch job

    Args:
        conf (dict): A dict containing the configuration of the batch scheduler
        job_id (int): ID of the job

    Returns:
        str: Current status of the job
    """
    if conf['name'] == 'OAR':
        cmd_str = "oarstat -fj {} | awk 'FNR == 7 {{print $3}}'".format(job_id)
        run_cmd = subprocess.run(cmd_str, shell=True, capture_output=True, check=True)
        status = run_cmd.stdout.decode("utf-8").rstrip("\n")
        _log.debug("OAR Job %d status output: %s", job_id, status)
        if status == "Error":
            state = "FAIL"
            _log.debug("Job %d failed due to %s reason", job_id, status)
        elif status in ["Waiting", "Running", "Finishing"]:
            state = "INPROGRESS"
            _log.debug("Job %d in %s state", job_id, status)
        elif status == "Terminated":
            state = "SUCCESS"
        else:
            state = "UNKNOWN"
    elif conf['name'] in ['SLURM', 'TGCC_SLURM']:
        # squeue cannot give the historical status of the jobs. sacct can give the
        # status of running jobs and historical ones. We combine both to monitor the
        # status of submitted, running and historical jobs here.
        cmd_strs = ["squeue -j{} | awk 'FNR == 2 {{print $5}}'".format(job_id),
                    "sacct --format='JobID,state' -j {} "
                    "| awk 'FNR == 3 {{print $2}}'".format(job_id),
                    "scontrol show job {} | awk 'FNR == 4 {{print $1}}' |"
                    " tr '=' '\t' | awk '{{print $2}}'".format(job_id)]
        status = []
        for cmd_str in cmd_strs:
            run_cmd = subprocess.run(cmd_str, shell=True, capture_output=True, check=True)
            status.append(run_cmd.stdout.decode("utf-8").rstrip("\n"))
        _log.debug("SLURM Job %d status output: %s", job_id, [s for s in status if s])
        if any(s in ["FAILED", "F", "NODE_FAIL", "NF", "TIMEOUT", "TO", "CANCELLED", "CA",
                     "DEADLINE", "DL", "OUT_OF_MEMORY", "OOM", "SUSPENDED", "S",
                     "BOOT_FAIL", "BF", "STOPPED", "ST"] for s in status):
            state = "FAIL"
        elif any(s in ["PENDING", "PD", "RUNNING", "R", "PREEMPTED", "PR", "CONFIGURING",
                       "CF", "COMPLETING", "CG", "RESV_DEL_HOLD", "RD"] for s in status):
            state = "INPROGRESS"
        elif any(s in ["COMPLETED", "CD"] for s in status):
            state = "SUCCESS"
        else:
            state = "UNKNOWN ({})".format(status)
    _log.info("Job %d status: %s", job_id, state)
    return state

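# Illustrative example (not part of the original module): get_job_status() maps
# the scheduler specific states onto FAIL / INPROGRESS / SUCCESS / UNKNOWN. A
# minimal sketch, assuming a hypothetical SLURM configuration and job ID:
#
#     state = get_job_status({'name': 'SLURM'}, 123456)
#     # -> "SUCCESS" once squeue/sacct/scontrol report the job as COMPLETED
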
def write_oar_job_file(conf_oar):
    """This method writes an OAR job file to submit with oarsub

    Args:
        conf_oar (dict): A dict containing all OAR job parameters

    Returns:
        str: Name of the file

    Raises:
        JobScriptCreationError: An error occurred in the creation of the job script
    """
    oar_content = """#!/bin/bash -l
    #OAR -n sdp-benchmarks
    #OAR -l host={num_nodes},walltime={res_time}
    #OAR -p cluster='{partition}'
    #OAR -O {out_file}
    #OAR -E {out_file}

    # GENERATED FILE
    set -e

    # Purge previously loaded modules
    # and load required modules
    module purge
    {module_list}

    # Give a name to the benchmark
    BENCH_NAME={bench_name}

    # Directory where the executable is
    WORK_DIR={run_dir}

    # Output directory
    OUT_DIR={out_dir}

    # Any machine specific environment variables that need to be given.
    {env_var}

    # Change to the script directory
    cd $WORK_DIR

    echo "JobID: $OAR_JOB_ID"
    echo "Job start time: `date`"
    echo "Job nodes: $(uniq $OAR_NODEFILE | wc -l)"
    echo "Running on master node: `hostname`"
    echo "Current directory: `pwd`"

    # Job preparation scripts
    {prep_script}

    # Any additional step jobs
    {step_job}

    echo "Executing the command:"
    CMD=\"{cmd_str}\"
    echo $CMD
    eval $CMD

    echo "Job finish time: `date`"
    {wait_cmd}
    """
    script_name = os.path.join(conf_oar['script_dir'], conf_oar['run_prefix'] + ".oar")
    try:
        with open(script_name, "w") as oar_f:
            oar_f.write(inspect.cleandoc(oar_content).format(**conf_oar))
        _log.info("OAR script for run %s created.", conf_oar['run_prefix'])
    except Exception as err:
        _log.exception("OAR script for %s run cannot be created", conf_oar['run_prefix'])
        raise JobScriptCreationError("Error in generating OAR script") from err
    return script_name

def write_slurm_job_file(conf_slurm):
    """This method writes a SLURM job file to submit with sbatch

    Args:
        conf_slurm (dict): A dict containing all SLURM job parameters

    Returns:
        str: Name of the file

    Raises:
        JobScriptCreationError: An error occurred in the creation of the job script
    """
    slurm_content = """#!/bin/bash
    #SBATCH --time={res_time}
    #SBATCH --job-name=sdp-benchmarks
    #SBATCH --account={account_name}
    #SBATCH --nodes={num_nodes}
    #SBATCH --ntasks={num_resources}
    #SBATCH --partition={partition}
    #SBATCH --output={out_file}
    #SBATCH --error={out_file}
    #SBATCH --mail-type=FAIL
    #SBATCH --no-requeue
    #SBATCH --exclusive

    # GENERATED FILE
    set -e

    # Purge previously loaded modules
    # and load required modules
    module purge
    {module_list}

    # Give a name to the benchmark
    BENCH_NAME={bench_name}

    # Directory where the executable is
    WORK_DIR={run_dir}

    # Any machine specific environment variables that need to be given.
    {env_var}

    # Change to the script directory
    cd $WORK_DIR

    # Output directory
    OUT_DIR={out_dir}

    echo "JobID: $SLURM_JOB_ID"
    echo "Job start time: `date`"
    echo "Job num nodes: $SLURM_JOB_NUM_NODES"
    echo "Running on master node: `hostname`"
    echo "Current directory: `pwd`"

    # Job preparation scripts
    {prep_script}

    # Any additional step jobs
    {step_job}

    echo "Executing the main command:"
    CMD=\"{cmd_str}\"
    echo $CMD
    eval $CMD

    echo "Job finish time: `date`"
    {wait_cmd}
    """
    script_name = os.path.join(conf_slurm['script_dir'], conf_slurm['run_prefix'] + ".slurm")
    try:
        with open(script_name, "w") as slurm_f:
            slurm_f.write(inspect.cleandoc(slurm_content).format(**conf_slurm))
        _log.info("SLURM script for run %s created.", conf_slurm['run_prefix'])
    except Exception as err:
        _log.exception("SLURM script for %s run cannot be created", conf_slurm['run_prefix'])
        raise JobScriptCreationError("Error in generating SLURM script") from err
    return script_name

def write_tgcc_job_file(conf_slurm):
    """This method writes a SLURM job file for the TGCC Irene machine to submit
    with ccc_msub

    Args:
        conf_slurm (dict): A dict containing all SLURM job parameters

    Returns:
        str: Name of the file

    Raises:
        JobScriptCreationError: An error occurred in the creation of the job script
    """
    slurm_content = """#!/bin/bash
    #MSUB -T {res_time}
    #MSUB -r sdp-benchmarks
    #MSUB -A {account_name}
    #MSUB -m scratch,work,store
    #MSUB -N {num_nodes}
    #MSUB -n {num_resources}
    #MSUB -q {partition}
    #MSUB -o {out_file}
    #MSUB -e {out_file}
    #MSUB -x

    # GENERATED FILE
    # set -e

    # Purge previously loaded modules
    # and load required modules
    module purge
    {module_list}

    # Give a name to the benchmark
    BENCH_NAME={bench_name}

    # Directory where the executable is
    WORK_DIR={run_dir}

    # Output directory
    OUT_DIR={out_dir}

    # Any machine specific environment variables that need to be given.
    {env_var}

    # Change to the script directory
    cd $WORK_DIR

    echo "JobID: $SLURM_JOB_ID"
    echo "Job start time: `date`"
    echo "Job num nodes: $SLURM_JOB_NUM_NODES"
    echo "Running on master node: `hostname`"
    echo "Current directory: `pwd`"

    # Job preparation scripts
    {prep_script}

    # Any additional step jobs
    {step_job}

    echo "Executing the main command:"
    CMD=\"{cmd_str}\"
    echo $CMD
    eval $CMD

    echo "Job finish time: `date`"
    {wait_cmd}
    """
    script_name = os.path.join(conf_slurm['script_dir'], conf_slurm['run_prefix'] + ".sh")
    try:
        with open(script_name, "w") as slurm_f:
            slurm_f.write(inspect.cleandoc(slurm_content).format(**conf_slurm))
        _log.info("SLURM script for run %s created.", conf_slurm['run_prefix'])
    except Exception as err:
        _log.exception("SLURM script for %s run cannot be created", conf_slurm['run_prefix'])
        raise JobScriptCreationError("Error in generating SLURM script") from err
    return script_name

def execute_job_submission(cmd_str, run_prefix):
    """This method submits the job to the batch scheduler and returns the job ID

    Args:
        cmd_str (str): Command string to be submitted
        run_prefix (str): Prefix of the bench run

    Returns:
        int: ID of the submitted job, or raises an exception in case of failure

    Raises:
        JobSubmissionError: An error occurred during the job submission
    """
    scheduler_md = BATCH_SCHEDULER[cmd_str.split()[0]]
    job_submit_output = exec_cmd(cmd_str).stdout.rstrip("\n")
    _log.debug("Output of %s job submission: %s", scheduler_md[0], job_submit_output)
    rgx_match = re.compile(scheduler_md[1]).match(job_submit_output.replace("\n", "\t"))
    if rgx_match and int(rgx_match.group(1)) > 0:
        job_id = int(rgx_match.group(1))
        _log.info("%s successfully submitted to %s with job ID %d", run_prefix,
                  scheduler_md[0], job_id)
    else:
        _log.exception("%s error: Job submission unsuccessful for %s", scheduler_md[0],
                       run_prefix, exc_info=False)
        raise JobSubmissionError("Job submission to {} failed".format(scheduler_md[0]))
    return job_id

def submit_job(conf, job_id):
    """This method submits job to the scheduler

    Args:
        conf (dict): A dict containing all parameters needed for job submission
        job_id (int): Job ID of the previous job. In case of dependent jobs, this
            is necessary

    Returns:
        int: ID of the submitted job
    """
    conf['cmd_str'] = reformat_long_string(conf['cmd_str'], width=120)
    if conf['name'] == 'OAR':
        script_name = write_oar_job_file(conf)
        cmd_chmod = " ".join(["chmod +x", script_name])
        subprocess.run(cmd_chmod, shell=True, check=True)
        if conf['job_dependency'] and job_id > 0:
            cmd_str = " ".join(["oarsub --anterior={} -S".format(job_id), script_name])
        else:
            cmd_str = " ".join(["oarsub -S", script_name])
    elif conf['name'] == 'SLURM':
        script_name = write_slurm_job_file(conf)
        if conf['job_dependency'] and job_id > 0:
            cmd_str = " ".join(["sbatch --dependency={}:{}".format(conf['job_dependency'],
                                                                   job_id), script_name])
        else:
            cmd_str = " ".join(["sbatch", script_name])
    elif conf['name'] == 'TGCC_SLURM':
        script_name = write_tgcc_job_file(conf)
        if conf['job_dependency'] and job_id > 0:
            cmd_str = " ".join(["ccc_msub -a {}".format(job_id), script_name])
        else:
            cmd_str = " ".join(["ccc_msub", script_name])
    job_id = execute_job_submission(cmd_str, conf['run_prefix'])
    return job_id

def create_scheduler_conf(conf, param, bench_name):
    """Prepares a dict with the parameters needed to create and submit a job file

    Args:
        conf (dict): A dict containing the configuration
        param (dict): A dict containing all parameters for the run
        bench_name (str): Name of the benchmark

    Returns:
        dict: A dict with the parameters needed to submit a job file

    Raises:
        KeyNotFoundError: An error occurred while looking for a key in conf or param
    """
    try:
        scheduler_conf = {'name': conf['scheduler']['name'],
                          'res_time': conf['scheduler']['req_walltime'],
                          'account_name': conf['scheduler']['account_name'],
                          'num_nodes': param['num_nodes'],
                          'num_resources': param['num_cores'] * param['num_threads'],
                          'partition': conf['scheduler']['partition'],
                          'out_file': os.path.join(conf[bench_name]['result_dir'],
                                                   '{}.out'.format(param['run_prefix'])),
                          'job_dependency': conf['scheduler']['job_dependency'],
                          'module_list': "module load {}".format(conf['modules']['dep_modules'])
                                         if conf['modules']['dep_modules'] else "",
                          'bench_name': conf['global']['tag'],
                          'work_dir': conf[bench_name]['work_dir'],
                          'out_dir': os.path.join(conf['global']['scratch_dir'],
                                                  param['run_prefix']),
                          'run_dir': conf[bench_name]['run_dir']
                                     if param['run_mode'] == 'bare-metal'
                                     else conf[bench_name]['work_dir'],
                          'script_dir': conf[bench_name]['script_dir'],
                          'env_var': conf['scheduler']['env_var'],
                          'prep_script': conf['scheduler']['prep_script'],
                          'step_job': conf['scheduler']['step_job'],
                          'wait_cmd': "wait" if "&" in conf['scheduler']['step_job'] else "",
                          'cmd_str': param['cmd_str'],
                          'run_prefix': param['run_prefix']}
    except KeyError as err:
        raise KeyNotFoundError("Key not found in conf and/or param dict") from err
    return scheduler_conf

def standardise_output_data(bench_name, conf, param, metrics):
    """This method gathers all the data of the benchmark run into a dict that can
    be saved in JSON format. The aim is to capture all the info needed to be able
    to reproduce the run.

    Args:
        bench_name (str): Name of the benchmark
        conf (dict): A dict containing all configuration info
        param (dict): A parameter dict
        metrics (dict): Dict containing all the metric data

    Returns:
        dict: Standardised output data of the run

    Raises:
        KeyNotFoundError: An error occurred while looking for a key in conf or param
    """
    try:
        param_to_save = param.copy()
        bench_args = param_to_save['bench_args']
        bench_args['cmd_str'] = param_to_save['cmd_str']
        param_to_save.pop('bench_args')
        sdp_std_output = {
            'benchmark_name': bench_name,
            'benchmark_args': bench_args,
            'config_info': param_to_save,
            'job_scheduler_info': {**conf['scheduler'],
                                   'num_omp_threads': param['num_omp_threads'],
                                   'num_processes': param['num_processes'],
                                   'cmd_str': param['cmd_str']
                                   } if conf['global']['submit_job'] else {},
            'benchmark_metrics_info': metrics
        }
    except KeyError as err:
        raise KeyNotFoundError("Key not found in conf and/or param dict") from err
    return sdp_std_output

def log_failed_cmd_stderr_file(output):
    """This method dumps the output to a file when command execution fails

    Args:
        output (str): stdout and stderr from execution of command

    Returns:
        str: Path of the file
    """
    file_name = ''.join(random.choices(string.ascii_letters + string.digits, k=16)) + '.log'
    file_path = os.path.join("/tmp", file_name)
    with open(file_path, "w") as f_out:
        f_out.write(output)
    return file_path

def exec_cmd(cmd_str):
    """This method executes the given command

    Args:
        cmd_str (str): Command to execute

    Returns:
        A subprocess.run output with stdout, stderr and return code in the object

    Raises:
        ExecuteCommandError: An error occurred during execution of command
    """
    try:
        cmd_out = subprocess.run(cmd_str, shell=True, stdout=subprocess.PIPE,
                                 stderr=subprocess.STDOUT, check=True, encoding='utf-8')
        return cmd_out
    except subprocess.CalledProcessError as err:
        err_file_path = log_failed_cmd_stderr_file(err.stdout)
        _log.exception("Command %s execution failed", cmd_str)
        raise ExecuteCommandError("Command {} execution failed. Check {} for details".format(
            cmd_str, err_file_path)) from err

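# Illustrative example (not part of the original module): exec_cmd() returns the
# completed subprocess object with stderr merged into stdout and the output
# decoded as text. A minimal sketch:
#
#     node_name = exec_cmd("hostname").stdout.strip("\n")
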
def execute_command_on_host(cmd_str, out_file):
    """This method executes the job on the host

    Args:
        cmd_str (str): Command to execute
        out_file (str): Name of the stdout/stderr file

    Raises:
        ExecuteCommandError: An error occurred during execution of command
    """
    try:
        with open(out_file, 'w') as f_out:
            subprocess.run(cmd_str, shell=True, stdout=f_out, stderr=f_out, check=True,
                           encoding='utf-8')
    except subprocess.CalledProcessError as err:
        _log.exception("Command %s execution failed on host", cmd_str)
        raise ExecuteCommandError("Execution of command on host failed. Check {} for "
                                  "details".format(out_file)) from err