Source code for sdpbenchmarks.sdpbenchmarkengine

"""This module runs the SDP benchmark codes"""

import os
import logging
import time
import datetime
import shutil
import tarfile

from sdpbenchmarks import imagingiobench
from sdpbenchmarks.utils import pull_image, load_modules, get_project_root
from sdpbenchmarks.exceptions import ExportError, BenchmarkError
from ._version import __version__

_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})


class SdpBenchmarkEngine:
    """SDP BENCHMARKS ENGINE"""

    # Required compilers for benchmarks
    REQ_COMPILERS = {
        'iotest': 'mpicc',
    }

    # Required libraries for benchmarks
    REQ_DEPENDENCIES = {
        'iotest': ['git-lfs', 'h5cc', 'fftw-wisdom', 'cmake'],
    }

    # Required disk space (in GB) for all benchmarks
    DISK_THRESHOLD = 1.0

    def __init__(self, config=None):
        """Initialize setup"""
        self._bench_queue = config['global']['benchmarks'].copy()
        self.selected_benchmarks = config['global']['benchmarks'].copy()
        self._config = config['global']
        self._config_full = config
        self._extra = {}
        self._result = {}
        self.failures = []
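    # Illustrative sketch only (not part of the original module): the constructor above
    # assumes a config dict with a 'global' section, a 'modules' section and one section
    # per benchmark. The key names below are taken from how this class reads the config;
    # the concrete values are hypothetical.
    #
    #     config = {
    #         'global': {
    #             'benchmarks': ['iotest'],
    #             'run_mode': ['singularity'],      # 'singularity' and/or 'bare-metal'
    #             'submit_job': False,
    #             'work_dir': '/path/to/work',
    #             'scratch_dir': '/path/to/scratch',
    #             'export': True,
    #             'tag': 'my-run',
    #         },
    #         'modules': {'dep_modules': []},
    #         'iotest': {
    #             'image': 'library://<org>/<image>',   # hypothetical image URI
    #             'work_dir': '/path/to/work/iotest',
    #             'out_dir': '/path/to/work/iotest/out',
    #         },
    #     }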
    def start(self):
        """Entrypoint for suite."""
        _log.info("Starting SDP Benchmark Engine")
        self._extra['start_time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        if self.pre_flight():
            _log.info("Pre-flight checks passed successfully.")
            self.run()
        else:
            _log.error("Pre-flight checks failed.")
            raise Exception("Pre-flight checks failed")
    def pre_flight(self):
        """Perform pre-flight checks."""
        _log.info("Running pre-flight checks")
        # Avoid executing commands if they are not valid run modes.
        # This avoids injections through the configuration file.
        checks = []

        # Check if SLURM or OAR is available when running in job submission mode
        if self._config['submit_job']:
            slurm = shutil.which('sbatch')
            oar = shutil.which('oarsub')
            if slurm is None and oar is None:
                _log.error("Neither SLURM nor OAR scheduler found. Please run in interactive mode")
                checks.append(1)

        # Check if valid modes are specified in config file
        for run_mode in self._config['run_mode']:
            if run_mode not in ['singularity', 'bare-metal']:
                _log.error("Invalid run mode specified: %s.", self._config['run_mode'])
                checks.append(1)

        # Load dependencies
        module_list = self._config_full['modules']['dep_modules']
        load_modules(module_list)

        # Check if required compilers, dependencies and container images are available
        for bench in self._config['benchmarks']:
            if "singularity" in self._config['run_mode']:
                _log.info("Singularity mode is specified. Checking for required compilers")
                # Search if run mode is installed
                system_runmode = shutil.which('singularity')
                if system_runmode is not None:
                    self._config_full['global']['singularity_path'] = system_runmode
                    _log.info(" - singularity executable found: %s.", system_runmode)
                else:
                    _log.error(" - singularity is not installed in the system")
                    checks.append(1)

                # Check if the image is pullable and get the absolute path of the singularity image
                self._config_full[bench]['image_path'] = os.path.abspath(os.path.join(
                    self._config_full[bench]['work_dir'], bench + ".sif"))
                if not os.path.isfile(self._config_full[bench]['image_path']):
                    return_code = pull_image(self._config_full[bench]['image'], "singularity",
                                             self._config_full[bench]['image_path'])
                    if not return_code:
                        _log.info(" - singularity image file pulled: %s",
                                  self._config_full[bench]['image_path'])
                    else:
                        _log.error("Cannot pull the singularity image file.")
                        checks.append(1)

            if "bare-metal" in self._config['run_mode']:
                _log.info("Bare-metal mode is specified. Checking for required compilers")
                compiler = self.REQ_COMPILERS[bench]
                # Search if compiler is installed
                system_compiler = shutil.which(compiler)
                if system_compiler is not None:
                    _log.info(" - %s compiler found: %s.", compiler, system_compiler)
                else:
                    _log.error(" - %s is not installed in the system.", compiler)
                    checks.append(1)

                dependencies = self.REQ_DEPENDENCIES[bench]
                # Check if all dependencies are installed
                for dependency in dependencies:
                    # system_bin = which(dependency, module_list)
                    system_bin = shutil.which(dependency)
                    if system_bin is not None:
                        _log.debug(" - %s is found: %s.", dependency, system_bin)
                    else:
                        _log.error(" - %s is not installed in the system.", dependency)
                        checks.append(1)

        _log.info(" - Checking provided work dirs exist...")
        os.makedirs(self._config['work_dir'], exist_ok=True)

        _log.info(" - Checking if run_dir has enough space...")
        disk_stats = shutil.disk_usage(self._config['work_dir'])
        disk_space_gb = round(disk_stats.free * (10 ** -9), 2)
        _log.debug("Calculated disk space: %s GB", disk_space_gb)
        if disk_space_gb <= self.DISK_THRESHOLD:
            _log.error("Not enough disk space on %s, free: %s GB, required: %s GB",
                       self._config['work_dir'], disk_space_gb, self.DISK_THRESHOLD)
            checks.append(1)

        _log.info(" - Checking if scratch_dir exists...")
        os.makedirs(self._config['scratch_dir'], exist_ok=True)

        _log.info(" - Checking if scratch_dir has enough space...")
        disk_stats = shutil.disk_usage(self._config['scratch_dir'])
        disk_space_gb = round(disk_stats.free * (10 ** -9), 2)
        self._config_full['global']['avail_scratch_disk_space'] = disk_space_gb

        return not any(checks)
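    # Illustrative sketch only (not part of the original module): the free-space guard in
    # pre_flight() converts free bytes to GB with a decimal factor (1 GB = 10**9 bytes) and
    # compares the result against DISK_THRESHOLD. Standalone, the same check looks like this:
    #
    #     import shutil
    #     free_gb = round(shutil.disk_usage('/path/to/work').free * (10 ** -9), 2)
    #     if free_gb <= SdpBenchmarkEngine.DISK_THRESHOLD:
    #         raise RuntimeError(f"Not enough disk space: {free_gb} GB free")
    #
    # '/path/to/work' and the RuntimeError are hypothetical; the engine itself logs the
    # error and records it in its checks list instead of raising.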
    def run(self):
        """Run each benchmark remaining in _bench_queue"""
        # Check if there are still benchmarks to run
        for bench2run in self._bench_queue:
            self._extra['start_time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
            if bench2run == 'iotest':
                # Prepare IO bench
                if imagingiobench.prepare_iotest(self._config_full) == 0 and \
                        imagingiobench.check_iotest_arguments(self._config_full) == 0:
                    return_code = imagingiobench.run_iotest(self._config_full)
                    if return_code != 0:
                        self.failures.append(bench2run)
                        _log.warning("Imaging IO bench exited with failed runs")
                    else:
                        _log.info("Terminated Imaging IO bench with success")
                else:
                    _log.error("Skipping Imaging IO bench due to failed installation")

        self.cleanup()
        self._extra['end_time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
    def cleanup(self):
        """Run the cleanup phase - collect the results from each benchmark"""
        root = get_project_root()
        results_dir = os.path.join(root, 'results')
        if not os.path.isdir(results_dir):
            os.makedirs(results_dir, exist_ok=True)

        # Get results from each benchmark
        if self._config['export']:
            for bench in self.selected_benchmarks:
                try:
                    work_dir = self._config_full['global']['work_dir']
                    result_dir = self._config_full[bench]['out_dir']
                    shutil.copy(os.path.join(work_dir, 'ska_sdp_benchmarks.log'), result_dir)
                    shutil.copy(os.path.join(work_dir, 'run_config.yml'), result_dir)
                    outfile = os.path.join(results_dir, '{}_{}_{}.tar.gz'.format(
                        self._config['tag'], bench,
                        datetime.datetime.now().strftime("%Y-%m-%d_%H%M")))
                    _log.info("Exporting *.json, *.log, *.stdout, *.stderr from %s...", result_dir)
                    with tarfile.open(outfile, 'w:gz') as archive:
                        # Respect the tree hierarchy on compressing
                        for dir_path, _, files in os.walk(result_dir):
                            for name in files:
                                if name.endswith(('.json', '.log', '.out', '.yml',
                                                  '.slurm', '.oar')):
                                    file_path = os.path.join(dir_path, name)
                                    archive.add(
                                        file_path,
                                        arcname=os.path.relpath(
                                            file_path, os.path.dirname(result_dir)))
                except Exception as err:
                    _log.exception('Skipping %s because of %s', bench, err)
                    raise ExportError("Exporting results failed") from err

        # Check for workload errors
        if len(self.failures) == len(self.selected_benchmarks):
            _log.exception('All benchmarks failed!', exc_info=False)
            raise BenchmarkError("All benchmarks failed")
        elif len(self.failures) > 0:
            _log.exception("%s failed. Please check the logs.", ", ".join(self.failures),
                           exc_info=False)
            raise BenchmarkError("There are failed benchmarks")
        else:
            _log.info("Successfully completed all requested benchmarks")
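# Minimal usage sketch (not part of the original module), assuming a config dict shaped
# like the one sketched after __init__ above:
#
#     from sdpbenchmarks.sdpbenchmarkengine import SdpBenchmarkEngine
#
#     engine = SdpBenchmarkEngine(config=config)
#     engine.start()   # runs pre_flight(), then run(), which in turn calls cleanup()
#
# start() raises if the pre-flight checks fail, and cleanup() raises BenchmarkError when
# one or more of the requested benchmarks fail.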