import copy
import random
import traceback
from temci.build.build_processor import BuildProcessor
from temci.build.builder import Builder, BuildError
from temci.utils.curses import Screen
from temci.utils.sudo_utils import chown
from temci.utils.util import join_strs, in_standalone_mode, parse_timespan
from temci.utils.mail import send_mail
from temci.utils.typecheck import *
from temci.run.run_worker_pool import RunWorkerPool, ParallelRunWorkerPool, AbstractRunWorkerPool
from temci.run.run_driver import RunProgramBlock, BenchmarkingResultBlock, RunDriverRegistry, ExecRunDriver, \
is_perf_available, filter_runs, log_program_error
import temci.run.run_driver_plugin
from temci.report.rundata import RunDataStatsHelper, RunData
from temci.utils.settings import Settings
from temci.report.report_processor import ReporterRegistry
import time, logging, humanfriendly, sys, math, os, click
import typing as t
import yaml
class RunProcessor:
"""
This class handles the coordination of the whole benchmarking process.
It is configured via the settings of the stats and run domains.
Important note: the constructor also sets up the CPU sets and plugins that can alter the system,
e.g. confine most processes to only one core. Be sure to call the ``teardown()`` or the
``benchmark()`` method to make the system usable again.
"""
def __init__(self, runs: t.List[dict] = None, append: bool = None, show_report: bool = None):
"""
Creates an instance and sets up everything.
:param runs: list of dictionaries that represent run program blocks; if None, Settings()["run/in"] is used
:param append: append to the old benchmarks if there are any in the result file?
:param show_report: show a short report after finishing the benchmarking?
"""
if runs is None:
typecheck(Settings()["run/in"], ValidYamlFileName(), value_name="run/in")
with open(Settings()["run/in"], "r") as f:
runs = yaml.safe_load(f)
self.runs = runs # type: t.List[dict]
""" List of dictionaries that represent run program blocks """
self.run_blocks = [] # type: t.List[RunProgramBlock]
""" Run program blocks for each dictionary in ``runs```"""
for (id, run) in enumerate(runs):
self.run_blocks.append(RunProgramBlock.from_dict(id, copy.deepcopy(run)))
old_blocks = self.run_blocks
self.run_blocks = filter_runs(self.run_blocks, Settings()["run/included_blocks"])
self.runs = [runs[next(i for i, o in enumerate(old_blocks) if o == b)] for b in self.run_blocks]
self.append = Settings().default(append, "run/append") # type: bool
""" Append to the old benchmarks if there are any in the result file? """
self.show_report = Settings().default(show_report, "run/show_report") # type: bool
""" Show a short report after finishing the benchmarking? """
self.stats_helper = None # type: RunDataStatsHelper
""" Used stats helper to help with measurements """
typecheck(Settings()["run/out"], FileName())
if self.append:
run_data = []
try:
if os.path.exists(Settings()["run/out"]):
with open(Settings()["run/out"], "r") as f:
run_data = yaml.safe_load(f)
self.stats_helper = RunDataStatsHelper.init_from_dicts(run_data, external=True)
for run in runs:
self.stats_helper.runs.append(RunData(attributes=run["attributes"]))
except:
self.teardown()
raise
else:
self.stats_helper = RunDataStatsHelper.init_from_dicts(copy.deepcopy(runs),
included_blocks=Settings()["run/included_blocks"])
#if Settings()["run/remote"]:
# self.pool = RemoteRunWorkerPool(Settings()["run/remote"], Settings()["run/remote_port"])
if os.path.exists(Settings()["run/out"]):
os.remove(Settings()["run/out"])
self.start_time = time.time() # type: float
""" Unix time stamp of the start of the benchmarking """
self.end_time = -1 # type: float
""" Unix time stamp of the point in time that the benchmarking can at most reach """
try:
max_time = parse_timespan(Settings()["run/max_time"])
if max_time > -1:
self.end_time = self.start_time + max_time
except:
self.teardown()
raise
self.pool = None # type: AbstractRunWorkerPool
""" Used run worker pool that abstracts the benchmarking """
if Settings()["run/cpuset/parallel"] == 0:
self.pool = RunWorkerPool(end_time=self.end_time)
else:
self.pool = ParallelRunWorkerPool(end_time=self.end_time)
self.run_block_size = Settings()["run/run_block_size"] # type: int
""" Number of benchmarking runs that are done together """
self.discarded_runs = Settings()["run/discarded_runs"] # type: int
""" First n runs that are discarded """
self.max_runs = Settings()["run/max_runs"] # type: int
""" Maximum number of benchmarking runs """
self.min_runs = Settings()["run/min_runs"] # type: int
""" Minimum number of benchmarking runs """
if self.min_runs > self.max_runs:
logging.warning("min_runs ({}) is bigger than max_runs ({}), therefore they are swapped."
.format(self.min_runs, self.max_runs))
self.min_runs, self.max_runs = self.max_runs, self.min_runs
self.shuffle = Settings()["run/shuffle"] # type: bool
""" Randomize the order in which the program blocks are benchmarked. """
self.fixed_runs = Settings()["run/runs"] != -1 # type: bool
""" Do a fixed number of benchmarking runs? """
if self.fixed_runs:
self.min_runs = self.max_runs = Settings()["run/runs"]
self.store_often = Settings()["run/store_often"] # type: bool
""" Store the result file after each set of blocks is benchmarked """
self.block_run_count = 0 # type: int
""" Number of benchmarked blocks """
self.erroneous_run_blocks = [] # type: t.List[t.Tuple[int, BenchmarkingResultBlock]]
""" List of all failing run blocks (id and results till failing) """
self.discard_all_data_for_block_on_error = Settings()["run/discard_all_data_for_block_on_error"]
self.no_build = Settings()["run/no_build"]
self.only_build = Settings()["run/only_build"]
self.abort_after_build_error = Settings()["run/abort_after_build_error"]
def _finished(self) -> bool:
if not self.pool.has_time_left():
return True
if self.fixed_runs:
return self.block_run_count >= self.max_runs
return (len(self.stats_helper.get_program_ids_to_bench()) == 0 \
or not self._can_run_next_block()) and self.maximum_of_min_runs() <= self.block_run_count
def maximum_of_min_runs(self) -> int:
""" Maximum of the per-block minimum run counts and the global minimum run count """
return max(list(block.min_runs for block in self.run_blocks) + [self.min_runs])
def maximum_of_max_runs(self) -> int:
""" Maximum of the per-block maximum run counts and the global maximum run count """
return max(list(block.max_runs for block in self.run_blocks) + [self.max_runs])
def _can_run_next_block(self) -> bool:
if not in_standalone_mode:
estimated_time = self.stats_helper.estimate_time_for_next_round(self.run_block_size,
all=self.block_run_count < self.min_runs)
to_bench_count = len(self.stats_helper.get_program_ids_to_bench())
if -1 < self.end_time < round(time.time() + estimated_time):
logging.warning("Ran too long ({}) and is therefore now aborted. "
"{} program blocks should've been benchmarked again."
.format(humanfriendly.format_timespan(time.time() + estimated_time - self.start_time),
to_bench_count))
return False
if self.block_run_count >= self.maximum_of_max_runs() and self.block_run_count >= self.maximum_of_min_runs():
logging.warning("Benchmarked program blocks too often and aborted therefore now.")
return False
return True
def build(self):
"""
Build before benchmarking; essentially calls `temci build` where necessary and modifies the run configs.
"""
if self.no_build:
return
to_build = [(i, conf) for i, conf in enumerate(self.runs) if "build_config" in conf]
if len(to_build) == 0:
return
logging.info("Start building {} block(s)".format(len(to_build)))
for i, block in to_build:
if "working_dir" not in block["build_config"]:
block["build_config"]["working_dir"] = self.run_blocks[i].data["cwd"]
try:
block = BuildProcessor.preprocess_build_blocks([block])[0]
logging.info("Build {}".format(self.run_blocks[i].description()))
block_builder = Builder(self.run_blocks[i].description(),
block["build_config"]["working_dir"],
block["build_config"]["cmd"], block["build_config"]["revision"],
block["build_config"]["number"],
block["build_config"]["base_dir"], block["build_config"]["branch"])
working_dirs = block_builder.build()
block["cwds"] = working_dirs
self.run_blocks[i].data["cwds"] = working_dirs
except BuildError as err:
self.stats_helper.add_error(i, err.error)
self.erroneous_run_blocks.append((i, BenchmarkingResultBlock(data={}, error=err, recorded_error=err.error)))
if self.abort_after_build_error:
raise err
def benchmark(self):
"""
Benchmark and teardown.
"""
if self.only_build:
return
try:
show_progress = Settings().has_log_level("info") and \
("exec" != RunDriverRegistry.get_used() or "start_stop" not in ExecRunDriver.get_used())
showed_progress_before = False
discard_label = "Make the {} discarded benchmarks".format(self.discarded_runs)
if self.fixed_runs:
label = "Benchmark {} times".format(self.max_runs)
else:
label = "Benchmark {} to {} times".format(self.min_runs, self.max_runs)
start_label = discard_label if self.discarded_runs > 0 else label
label_format = "{:32s}"
if show_progress:
def bench(run: int) -> bool:
if run < self.discarded_runs:
self._benchmarking_block_run(block_size=1, discard=True)
else:
recorded_run = run - self.discarded_runs
if self._finished() or all(b.max_runs < recorded_run for b in self.run_blocks):
return False
self._benchmarking_block_run(run=recorded_run)
return True
with click.progressbar(range(0, self.max_runs + self.discarded_runs),
label=label_format.format(start_label),
file=None if self.pool.run_driver.runs_benchmarks else open(os.devnull, 'w')) as runs:
discard_label = "Discarded benchmark {{}} out of {}".format(self.discarded_runs)
if self.fixed_runs:
label = "Benchmark {{}} out of {}".format(self.max_runs)
else:
label = "Benchmark {{}} out of {} to {}".format(self.min_runs, self.max_runs)
def alter_label(run: int):
if run < self.discarded_runs:
runs.label = label_format.format(discard_label.format(run + 1))
else:
runs.label = label_format.format(label.format(run - self.discarded_runs + 1))
runs.short_limit = 0
every = Settings()["run/watch_every"]
if Settings()["run/watch"]:
with Screen(scroll=True, keep_first_lines=1) as f:
import click._termui_impl
click._termui_impl.BEFORE_BAR = "\r"
click._termui_impl.AFTER_BAR = "\n"
runs.file = f if self.pool.run_driver.runs_benchmarks else "-"
runs._last_line = ""
def render():
f.reset()
runs._last_line = ""
runs.render_progress()
f.advance_line()
print(ReporterRegistry.get_for_name("console", self.stats_helper).report(
with_tester_results=False, to_string=True), file=f)
for run in runs:
alter_label(run)
f.enable()
render()
if run % every == 0 or True:
f.display()
f.reset()
if not bench(run):
break
f.disable()
runs.finish()
render()
f.display()
else:
alter_label(0)
for run in runs:
alter_label(run)
runs._last_line = ""
runs.render_progress()
if not bench(run):
break
runs.finish()
runs.render_progress()
else:
time_per_run = self._make_discarded_runs()
last_round_time = time.time()
if time_per_run is not None:
last_round_time -= time_per_run * self.run_block_size
run = 0
while not self._finished():
self._benchmarking_block_run(run)
run += 1
except BaseException as ex:
logging.error("Forced teardown of RunProcessor")
self.store_and_teardown()
if isinstance(ex, KeyboardInterrupt) and Settings()["log_level"] == "info" and self.block_run_count > 0\
and self.show_report:
self.print_report()
raise
if self.show_report and not Settings()["run/watch"]:
self.print_report()
self.store_and_teardown()
def _benchmarking_block_run(self, block_size: int = None, discard: bool = False, bench_all: bool = None, run: int = None):
block_size = block_size or self.run_block_size
if bench_all is None:
bench_all = self.block_run_count < self.maximum_of_min_runs()
try:
to_bench = list((i, b) for (i, b) in enumerate(self.run_blocks) if self._should_run(b, run))
if not bench_all and self.block_run_count < self.max_runs and not in_standalone_mode:
to_bench = [(i, self.run_blocks[i]) for i in self.stats_helper.get_program_ids_to_bench() if self._should_run(self.run_blocks[i], run)]
to_bench = [(i, b) for (i, b) in to_bench if self.stats_helper.runs[i] is not None and not self.stats_helper.has_error(i)]
if self.shuffle:
random.shuffle(to_bench)
if len(to_bench) == 0:
return
benched = 0
for (id, run_block) in to_bench:
if self.pool.has_time_left() > 0:
benched += 1
self.pool.submit(run_block, id, self.run_block_size)
else:
logging.warning("Ran into timeout")
break
for (block, result, id) in self.pool.results(benched):
if result.error:
self.erroneous_run_blocks.append((id, result))
if self.discard_all_data_for_block_on_error:
self.stats_helper.discard_run_data(id)
if result.recorded_error:
if not self.discard_all_data_for_block_on_error:
self.stats_helper.add_data_block(id, result.data)
self.stats_helper.add_error(id, result.recorded_error)
logging.error("Program block no. {} failed: {}".format(id, result.error))
log_program_error(result.recorded_error)
logging.debug("".join(traceback.format_exception(None, result.error, result.error.__traceback__)))
self.store_erroneous()
if isinstance(result.error, KeyboardInterrupt):
raise result.error
elif not discard:
self.stats_helper.add_data_block(id, result.data)
if not discard:
self.block_run_count += block_size
except BaseException as ex:
#self.store_and_teardown()
#logging.error("Forced teardown of RunProcessor")
raise
if not discard and self.store_often:
self.store()
def _should_run(self, block: RunProgramBlock, run: int = None) -> bool:
return run < block.max_runs if run is not None else not self.stats_helper.has_error(block.id)
def _make_discarded_runs(self) -> t.Optional[int]:
if self.discarded_runs == 0:
return None
start_time = time.time()
self._benchmarking_block_run(block_size=self.discarded_runs, discard=True, bench_all=True)
return (time.time() - start_time) / self.discarded_runs
def recorded_error(self) -> bool:
""" Did any program block fail during building or benchmarking? """
return len(self.erroneous_run_blocks) > 0
def teardown(self):
""" Teardown everything (make the system usable again) """
self.pool.teardown()
def store_and_teardown(self):
"""
Teardown everything, store the result file, print a short report and send an email
if configured to do so.
"""
self.teardown()
if not self.pool.run_driver.store_files:
return
self.store()
if len(self.stats_helper.valid_runs()) > 0 \
and all(x.benchmarks() > 0 for x in self.stats_helper.valid_runs()):
report = ""
if not in_standalone_mode:
report = ReporterRegistry.get_for_name("console", self.stats_helper)\
.report(with_tester_results=False, to_string=True)
subject = "Finished " + join_strs([repr(run.description()) for run in self.stats_helper.valid_runs()])
send_mail(Settings()["run/send_mail"], subject, report, [Settings()["run/out"]])
if self.recorded_error():
descrs = []
msgs = []
for (i, result) in self.erroneous_run_blocks:
descr = self.run_blocks[i].description()
descrs.append(descr)
msg = descr + ":\n\t" + "\n\t".join(str(result.error).split("\n"))
msgs.append(msg)
subject = "Errors while benchmarking " + join_strs(descrs)
send_mail(Settings()["run/send_mail"], subject, "\n\n".join(msgs), [Settings()["run/in"] + ".erroneous.yaml"])
def store(self):
""" Store the result file """
try:
self.stats_helper.add_property_descriptions(self.pool.run_driver.get_property_descriptions())
except (IOError, OSError) as ex:
logging.error(ex)
if (len(self.stats_helper.valid_runs()) > 0 and all(x.benchmarks() > 0 for x in self.stats_helper.valid_runs())) \
or Settings()["run/record_errors_in_file"]:
with open(Settings()["run/out"], "w") as f:
self.stats_helper.update_env_info()
f.write(yaml.dump(self.stats_helper.serialize()))
chown(f)
def store_erroneous(self):
""" Store the failing program blocks in a file ending with ``.erroneous.yaml``. """
if len(self.erroneous_run_blocks) == 0:
return
file_name = Settings()["run/in"] + ".erroneous.yaml"
try:
blocks = [self.runs[x[0]] for x in self.erroneous_run_blocks]
with open(file_name, "w") as f:
f.write(yaml.dump(blocks))
chown(f)
except IOError as err:
logging.error("Can't write erroneous program blocks to " + file_name)
def print_report(self) -> None:
""" Print a short report if possible. """
if in_standalone_mode:
return
try:
ReporterRegistry.get_for_name("console", self.stats_helper).report(with_tester_results=False)
except:
pass