"""This module runs the SDP benchmark codes"""
import os
import logging
import time
import datetime
import shutil
import tarfile
from sdpbenchmarks import imagingiobench
from sdpbenchmarks.utils import pull_image, load_modules, get_project_root
from sdpbenchmarks.exceptions import ExportError, BenchmarkError
from ._version import __version__
_log = logging.LoggerAdapter(logging.getLogger(__name__), {'version': __version__})
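# Note: the LoggerAdapter merges the 'version' key into every log record, so a
# caller-configured format string can surface it, e.g. the (illustrative)
# '%(version)s %(levelname)s %(message)s'.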


class SdpBenchmarkEngine:
    """SDP benchmarks engine."""

    # Required compilers for each benchmark
    REQ_COMPILERS = {
        'iotest': 'mpicc',
    }
    # Required libraries for each benchmark
    REQ_DEPENDENCIES = {
        'iotest': ['git-lfs', 'h5cc', 'fftw-wisdom', 'cmake'],
    }
    # Required free disk space (in GB) for all benchmarks
    DISK_THRESHOLD = 1.0

    def __init__(self, config=None):
        """Initialize the engine state from the parsed configuration."""
        self._bench_queue = config['global']['benchmarks'].copy()
        self.selected_benchmarks = config['global']['benchmarks'].copy()
        self._config = config['global']
        self._config_full = config
        self._extra = {}
        self._result = {}
        self.failures = []
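
    # Illustrative shape of the configuration dict, reconstructed from the
    # keys this class reads. This is a sketch only, not the authoritative
    # schema; all values below are hypothetical placeholders:
    #
    #   {'global': {'benchmarks': ['iotest'],
    #               'run_mode': ['singularity', 'bare-metal'],
    #               'submit_job': False,
    #               'work_dir': '/path/to/work',
    #               'scratch_dir': '/path/to/scratch',
    #               'tag': 'mysite',
    #               'export': True},
    #    'modules': {'dep_modules': []},
    #    'iotest': {'image': '<container image reference>',
    #               'work_dir': '/path/to/work/iotest',
    #               'out_dir': '/path/to/work/iotest/results'}}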

    def start(self):
        """Entry point for the suite."""
        _log.info("Starting SDP Benchmark Engine")
        self._extra['start_time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
        if self.pre_flight():
            _log.info("Pre-flight checks passed successfully.")
            self.run()
        else:
            _log.error("Pre-flight checks failed.")
            raise BenchmarkError("Pre-flight checks failed")

    def pre_flight(self):
        """Perform pre-flight checks."""
        _log.info("Running pre-flight checks")
        # Avoid executing commands if they are not valid run modes.
        # This avoids injections through the configuration file.
        checks = []
        # Check that SLURM or OAR is available when running in job submission mode
        if self._config['submit_job']:
            slurm = shutil.which('sbatch')
            oar = shutil.which('oarsub')
            if slurm is None and oar is None:
                _log.error("Neither SLURM nor OAR scheduler found. Please run in "
                           "interactive mode")
                checks.append(1)
        # Check that only valid run modes are specified in the config file
        for run_mode in self._config['run_mode']:
            if run_mode not in ['singularity', 'bare-metal']:
                _log.error("Invalid run mode specified: %s.", run_mode)
                checks.append(1)
        # Load dependency modules
        module_list = self._config_full['modules']['dep_modules']
        load_modules(module_list)
        # Check that required compilers, dependencies and container images are available
        for bench in self._config['benchmarks']:
            if "singularity" in self._config['run_mode']:
                _log.info("Singularity mode is specified. Checking for singularity executable")
                # Check whether the container runtime is installed
                system_runmode = shutil.which('singularity')
                if system_runmode is not None:
                    self._config_full['global']['singularity_path'] = system_runmode
                    _log.info(" - singularity executable found: %s.", system_runmode)
                else:
                    _log.error(" - singularity is not installed in the system")
                    checks.append(1)
                # Check that the image is pullable and get the absolute path of the
                # singularity image
                self._config_full[bench]['image_path'] = os.path.abspath(os.path.join(
                    self._config_full[bench]['work_dir'], bench + ".sif"))
                if not os.path.isfile(self._config_full[bench]['image_path']):
                    return_code = pull_image(self._config_full[bench]['image'], "singularity",
                                             self._config_full[bench]['image_path'])
                    if not return_code:
                        _log.info(" - singularity image file pulled: %s",
                                  self._config_full[bench]['image_path'])
                    else:
                        _log.error("Cannot pull the singularity image file.")
                        checks.append(1)
            if "bare-metal" in self._config['run_mode']:
                _log.info("Bare-metal mode is specified. Checking for required compilers")
                compiler = self.REQ_COMPILERS[bench]
                # Check whether the compiler is installed
                system_compiler = shutil.which(compiler)
                if system_compiler is not None:
                    _log.info(" - %s compiler found: %s.", compiler, system_compiler)
                else:
                    _log.error(" - %s is not installed in the system.", compiler)
                    checks.append(1)
                dependencies = self.REQ_DEPENDENCIES[bench]
                # Check that all dependencies are installed
                for dependency in dependencies:
                    system_bin = shutil.which(dependency)
                    if system_bin is not None:
                        _log.debug(" - %s is found: %s.", dependency, system_bin)
                    else:
                        _log.error(" - %s is not installed in the system.", dependency)
                        checks.append(1)
        _log.info(" - Checking that work_dir exists...")
        os.makedirs(self._config['work_dir'], exist_ok=True)
        _log.info(" - Checking if work_dir has enough space...")
        disk_stats = shutil.disk_usage(self._config['work_dir'])
        # Convert free bytes to decimal GB (1 GB = 10**9 bytes)
        disk_space_gb = round(disk_stats.free * (10 ** -9), 2)
        _log.debug("Calculated disk space: %s GB", disk_space_gb)
        if disk_space_gb <= self.DISK_THRESHOLD:
            _log.error("Not enough disk space on %s, free: %s GB, required: %s GB",
                       self._config['work_dir'], disk_space_gb, self.DISK_THRESHOLD)
            checks.append(1)
        _log.info(" - Checking if scratch_dir exists...")
        os.makedirs(self._config['scratch_dir'], exist_ok=True)
        _log.info(" - Checking if scratch_dir has enough space...")
        disk_stats = shutil.disk_usage(self._config['scratch_dir'])
        disk_space_gb = round(disk_stats.free * (10 ** -9), 2)
        self._config_full['global']['avail_scratch_disk_space'] = disk_space_gb
        # All checks passed only if nothing was appended to the list
        return not any(checks)

    def run(self):
        """Run each benchmark in the queue in turn."""
        for bench2run in self._bench_queue:
            # Record the start time of this benchmark run
            self._extra['start_time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())
            if bench2run == 'iotest':
                # Prepare the Imaging IO bench and validate its arguments before running
                if imagingiobench.prepare_iotest(self._config_full) == 0 and \
                        imagingiobench.check_iotest_arguments(self._config_full) == 0:
                    return_code = imagingiobench.run_iotest(self._config_full)
                    if return_code != 0:
                        self.failures.append(bench2run)
                        _log.warning("Imaging IO bench exited with failed runs")
                    else:
                        _log.info("Terminated Imaging IO bench with success")
                else:
                    _log.error("Skipping Imaging IO bench due to failed installation")
        self.cleanup()
        self._extra['end_time'] = time.strftime('%Y-%m-%dT%H:%M:%SZ', time.gmtime())

    def cleanup(self):
        """Run the cleanup phase - collect the results from each benchmark."""
        root = get_project_root()
        results_dir = os.path.join(root, 'results')
        os.makedirs(results_dir, exist_ok=True)
        # Export results from each benchmark
        if self._config['export']:
            for bench in self.selected_benchmarks:
                try:
                    work_dir = self._config_full['global']['work_dir']
                    result_dir = self._config_full[bench]['out_dir']
                    shutil.copy(os.path.join(work_dir, 'ska_sdp_benchmarks.log'), result_dir)
                    shutil.copy(os.path.join(work_dir, 'run_config.yml'), result_dir)
                    outfile = os.path.join(results_dir, '{}_{}_{}.tar.gz'.format(
                        self._config['tag'], bench,
                        datetime.datetime.now().strftime("%Y-%m-%d_%H%M")))
                    _log.info("Exporting *.json, *.log, *.out, *.yml, *.slurm, *.oar "
                              "files from %s...", result_dir)
                    with tarfile.open(outfile, 'w:gz') as archive:
                        # Add each matching file, respecting the tree hierarchy
                        # below result_dir inside the archive
                        for dirpath, _, files in os.walk(result_dir):
                            for name in files:
                                if name.endswith(('.json', '.log', '.out', '.yml',
                                                  '.slurm', '.oar')):
                                    file_path = os.path.join(dirpath, name)
                                    archive.add(file_path, arcname=os.path.relpath(
                                        file_path, os.path.dirname(result_dir)))
                except Exception as err:
                    _log.exception('Skipping %s because of %s', bench, err)
                    raise ExportError("Exporting results failed") from err
        # Check for workload errors
        if len(self.failures) == len(self.selected_benchmarks):
            _log.error("All benchmarks failed!")
            raise BenchmarkError("All benchmarks failed")
        elif len(self.failures) > 0:
            _log.error("%s failed. Please check the logs.", ", ".join(self.failures))
            raise BenchmarkError("There are failed benchmarks")
        else:
            _log.info("Successfully completed all requested benchmarks")
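

# A minimal usage sketch, not part of the engine itself: it assumes PyYAML is
# installed and that a 'run_config.yml' file of the shape sketched above in
# __init__ sits in the current directory. The driver below is illustrative;
# the real entry point of the suite may differ.
if __name__ == '__main__':
    import yaml

    logging.basicConfig(level=logging.INFO)
    with open('run_config.yml') as config_file:
        run_config = yaml.safe_load(config_file)
    engine = SdpBenchmarkEngine(config=run_config)
    try:
        engine.start()
    except (BenchmarkError, ExportError):
        _log.error("Benchmark suite finished with failures")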