from temci.utils.number import FNumber
import yaml
import copy
import os, logging
import click
from temci.utils.typecheck import Obsolete
from temci.utils.util import recursive_exec_for_leafs, Singleton, sphinx_doc
from temci.utils.typecheck import *
import multiprocessing
import typing as t
[docs]def ValidCPUCoreNumber() -> Int:
"""
Creates a Type instance that matches all valid CPU core numbers.
"""
return Int(range=range(0, multiprocessing.cpu_count()))
[docs]class SettingsError(ValueError):
""" Error raised if something with the settings goes wrong """
pass
[docs]class Settings(metaclass=Singleton):
"""
Manages the Settings.
The settings keys and sub keys are combined by a slash, e.g. "report/in".
The current settings are:
.. code: yaml
"""
config_file_name = "temci.yaml" # type: str
""" Default name of the configuration files """
type_scheme = Dict({
"settings": Str() // Description("Additional settings file")
// Default(config_file_name if os.path.exists(config_file_name) else "")
// CompletionHint(zsh=YAML_FILE_COMPLETION_HINT),
"config": Str() // Description("Alias for settings")
// Default(config_file_name if os.path.exists(config_file_name) else "")
// CompletionHint(zsh=YAML_FILE_COMPLETION_HINT),
"tmp_dir": Str() // Default("/tmp/temci") // Description("Used temporary directory"),
"log_level": ExactEither("debug", "info", "warn", "error", "quiet") // Default("info")
// Description("Logging level"),
"stats": Dict({
"properties": ListOrTuple(Str()) // Default(["all"])
// CompletionHint(zsh="(" + " ".join(["__ov-time", "cache-misses", "cycles", "task-clock",
"instructions", "branch-misses", "cache-references",
"all"])
+ ")")
// Description("Properties to use for reporting and null hypothesis tests, "
"can be regular expressions"),
"uncertainty_range": Tuple(Float(lambda x: x >= 0), Float(lambda x: x >= 0)) // Default([0.05, 0.15])
// Description("Range of p values that allow no conclusion.")
}, unknown_keys=True),
"report": Dict({
# "reporter": Str() // Default("console") // Description(),
"in": Either(Str(), ListOrTuple(Str())) // Default("run_output.yaml")
// Description("Files that contain the benchmarking results")
// CompletionHint(zsh=YAML_FILE_COMPLETION_HINT),
"excluded_properties": ListOrTuple(Str()) // Default(["__ov-time"])
// Description("Properties that aren't shown in the report."),
"exclude_invalid": BoolOrNone() // Default(True)
// Description("Exclude all data sets that contain only NaNs."),
"long_properties": BoolOrNone() // Default(False)
// Description("Replace the property names in reports with longer more descriptive versions?"),
"xkcd_like_plots": BoolOrNone() // Default(False)
// Description("Produce xkcd like plots (requires the humor sans font to be installed)"),
"number": FNumber.settings_format,
"included_blocks": ListOrTuple(Str()) // Default(["all"])
// Description("List of included run blocks (all: include all), "
"identified by their description or tag attribute, "
"can be regular expressions"),
}, unknown_keys=True),
"run": Dict({
"discarded_runs": NaturalNumber() // Description("First n runs that are discarded") // Default(1),
"min_runs": NaturalNumber() // Default(20) // Description("Minimum number of benchmarking runs"),
"max_runs": NaturalNumber() // Default(100) // Description("Maximum number of benchmarking runs"),
"max_runs_per_tag": Dict(unknown_keys=True, key_type=Str() // Description("Tag"), value_type=NaturalNumber() // Description("Max runs"))
// Default({}) // Description("Maximum runs per tag (block attribute 'tag'), min('max_runs', 'per_tag') is used"),
"min_runs_per_tag": Dict(unknown_keys=True, key_type=Str() // Description("Tag"),
value_type=NaturalNumber() // Description("Min runs"))
// Default({}) // Description(
"Minimum runs per tag (block attribute 'tag'), max('min_runs', 'per_tag') is used"),
"runs_per_tag": Dict(unknown_keys=True, key_type=Str() // Description("Tag"),
value_type=NaturalNumber() // Description("Runs"))
// Default({}) // Description(
"Runs per tag (block attribute 'tag'), max('runs', 'per_tag') is used"),
"runs": Int(lambda x: x >= -1) // Default(-1) // Description("if != -1 sets max and min runs to its value"),
"max_time": ValidTimeSpan() // Default("-1") // Description("Maximum time the whole benchmarking should take, "
"-1 == no timeout, supports normal time span expressions"), # in seconds
"max_block_time": ValidTimeSpan() // Default("-1") // Description("Maximum time one run block should take, "
"-1 == no timeout, supports normal time span expressions"),
"run_block_size": PositiveInt() // Default(1)
// Description("Number of benchmarking runs that are done together"),
"in": Str() // Default("input.exec.yaml")
// Description("Input file with the program blocks to benchmark")
// CompletionHint(zsh=YAML_FILE_COMPLETION_HINT),
"out": Str() // Default("run_output.yaml") // Description("Output file for the benchmarking results")
// CompletionHint(zsh=YAML_FILE_COMPLETION_HINT),
"store_often": Bool() // Default(False)
// Description("Store the result file after each set of blocks is benchmarked"),
"exec_plugins": Dict({
}),
"included_blocks" : ListOrTuple(Str()) // Default(["all"])
// Description("List of included run blocks (all: include all), "
"or their tag attribute "
"or their number in the file (starting with 0), "
"can be regular expressions"),
"cpuset": Dict({
"active": Bool() // Description("Use cpuset functionality?") // Default(False),
"base_core_number": ValidCPUCoreNumber()
// Description("Number of cpu cores for the base (remaining part of the) system") // Default(1),
"parallel": Int(lambda x: x >= -1) // Description("0: benchmark sequential, "
"> 0: benchmark parallel with n instances, "
"-1: determine n automatically") // Default(0),
"sub_core_number": ValidCPUCoreNumber() // Description("Number of cpu cores per parallel running program.")
// Default(1),
"temci_in_base_set": Bool() // Default(True)
// Description("place temci in the same cpu set as the rest of the system?")
}),
"disable_hyper_threading": Bool() // Default(False)
// Description("Disable the hyper threaded cores. Good for cpu bound programs."),
"show_report": Bool() // Default(True)
// Description("Print console report if log_level=info"),
"append": Bool() // Default(False)
// Description("Append to the output file instead of overwriting by adding new run data blocks"),
"shuffle": Bool() // Default(True) // Description("Randomize the order in which the program blocks are "
"benchmarked."),
"send_mail": Str() // Default("")
// Description("If not empty, recipient of a mail after the benchmarking finished."),
"discard_all_data_for_block_on_error": Bool() // Default(False)
// Description("Discard all run data for the failing program on error"),
"record_errors_in_file": Bool() // Default(True)
// Description("Record the caught errors in the run_output file"),
"no_build": Bool() // Default(False)
// Description("Do not build if build configs are present, only works if the working directory "
"of the blocks does not change"),
"only_build": Bool() // Default(False) // Description("Only build"),
"abort_after_build_error": Bool() // Default(True)
// Description("Abort after the first failing build"),
"watch": Bool() // Default(False) // Description("Show the report continuously"),
"watch_every": PositiveInt() // Default(1)
// Description("Update the screen nth run (less updates are better for benchmarks)")
}),
"build": Dict({
"in": Str() // Default("build.yaml") // Description("Input file with the program blocks to build")
// CompletionHint(zsh=YAML_FILE_COMPLETION_HINT),
"out": Str() // Default("run_config.yaml") // Description("Resulting run config file"),
"threads": PositiveInt() // Default(1) // Description("Number of simultaneous builds for a specific "
"program block, only makes sense when build_config/number > 1, "
"and if the build commands create a different binary every "
"time they are run"),
"rand": Obsolete("Removed builder randomization", "0.8") // Description("Obsolete randomization configuration")
}, unknown_keys=True),
"package": Obsolete("Removed temci package", "0.8"),
"env": Dict({"USER": Str(), "PATH": Str()}, unknown_keys=True)
// Default({"USER": "", "PATH": ""})
// Description("Environment variables for the benchmarked programs, includes the user used for "
"generated files"),
"sudo": Bool() // Default(False) // Description("Acquire sudo privileges and run benchmark programs with "
"non-sudo user. Only supported on the command line.")
}, unknown_keys=True) # type: Dict
""" Type scheme of the settings """
def __init__(self):
"""
Initializes a Settings singleton object and thereby loads the Settings files.
It loads the settings files from the app folder (config.yaml) and
the current working directory (temci.yaml) if they exist.
:raises: SettingsError if some of the settings aren't in the format described via the type_scheme class property
"""
self.prefs = copy.deepcopy(self.type_scheme.get_default()) # type: t.Dict[str, t.Any]
""" The set sonfigurations """
res = self._validate_settings_dict(self.prefs, "default settings")
if not res:
raise SettingsError(str(res))
self._setup()
[docs] def load_files(self):
""" Loads the configuration files from the current and the config directory """
self.load_from_config_dir()
self.load_from_current_dir()
self._setup()
def _setup(self):
"""
Simple setup method that checks if basic directories exist and creates them if necessary.
"""
if not os.path.exists(self.prefs["tmp_dir"]):
os.mkdir(self.prefs["tmp_dir"])
log_level = self["log_level"]
logging.Logger.disabled = log_level == "quiet"
logger = logging.getLogger()
mapping = {
"debug": logging.DEBUG,
"info": logging.INFO,
"warn": logging.WARNING,
"error": logging.ERROR,
"quiet": logging.ERROR
}
logger.setLevel(mapping[log_level])
self._update_doc()
self.apply_override_actions()
def _update_doc(self):
"""
Update the class documentation
"""
if sphinx_doc():
self.__doc__ = self.__doc__.split(".. code: yaml")[0] + """.. code: yaml
""" + "\n ".join(self.type_scheme.get_default_yaml().split("\n"))
[docs] def reset(self):
"""
Resets the current settings to the defaults.
"""
self.prefs = copy.deepcopy(self.type_scheme.get_default())
def _validate_settings_dict(self, data: t.Dict[str, t.Any], description: str = None):
"""
Check whether the passed dictionary matches the settings type scheme.
:param data: passed dictionary
:param description: short description of the passed dictionary
:return: True like object if valid, else string like object which is the error message
"""
return verbose_isinstance(data, self.type_scheme, description or "Settings")
[docs] def load_file(self, file: str):
"""
Loads the configuration from the configuration YAML file.
:param file: path to the file
:raises: SettingsError if the settings file is incorrect or doesn't exist
"""
self.prefs = self.type_scheme.get_default()
tmp = copy.deepcopy(self.prefs)
try:
with open(file, 'r') as stream:
map = yaml.safe_load(stream.read().replace("!!python/tuple", ""))
def func(key, path, value):
if value is not None or self.get_type_scheme(path).check(value):
self._set_default(path, value)
self._set(path, value)
recursive_exec_for_leafs(map, func)
except (yaml.YAMLError, IOError) as err:
self.prefs = tmp
raise SettingsError(str(err))
res = self._validate_settings_dict(self.prefs, "settings with ones from file '{}'".format(file))
if not res:
self.prefs = tmp
raise SettingsError(str(res))
self._setup()
[docs] def load_from_dict(self, config_dict: t.Dict[str, t.Any]):
"""
Load the configuration from the passed dictionary.
:param config_dict: passed configuration dictionary
"""
self.prefs = self.type_scheme.get_default()
tmp = copy.deepcopy(self.prefs)
def func(key, path, value):
self._set_default(path, value)
recursive_exec_for_leafs(config_dict, func)
res = self._validate_settings_dict(self.prefs, "settings with ones config dict")
if not res:
self.prefs = tmp
raise SettingsError(str(res))
self._setup()
[docs] def load_from_dir(self, dir: str):
"""
Load the configuration from the configuration file inside the passed directory.
:param dir: path of the directory
"""
self.load_file(os.path.join(dir, "config.yaml"))
[docs] def load_from_config_dir(self):
"""
Load the config file from the application directory (e.g. in the users home folder) if it exists.
"""
conf = os.path.join(click.get_app_dir("temci"), "config.yaml")
if os.path.exists(conf) and os.path.isfile(conf):
self.load_file(conf)
[docs] def load_from_current_dir(self):
"""
Load the configuration from the `configuration file in the current working directory if it exists.
"""
if os.path.exists(self.config_file_name) and os.path.isfile(self.config_file_name):
self.load_file(self.config_file_name)
[docs] def get(self, key: t.Union[str, t.List[str]]) -> t.Any:
"""
Get the setting with the given key.
:param key: name of the setting
:return: value of the setting
:raises: SettingsError if the setting doesn't exist
"""
if self.is_obsolete(key):
raise SettingsError("Using obsolete setting {!r}: {}".format(key, self.obsoleteness_reason(key)))
path = key.split("/") if isinstance(key, str) else key
if not self.validate_key_path(path):
raise SettingsError("No such setting {}".format(key))
data = self.prefs
for sub in path:
data = data[sub]
return data
def __getitem__(self, key: str) -> t.Any:
"""
Alias for self.get(self, key).
"""
return self.get(key)
def _set(self, path: t.List[str], value):
"""
Set the setting at the passed path.
:param path: passed key path
:param value: new value
"""
if self.is_obsolete(path):
return
tmp_pref = self.prefs
tmp_type = self.type_scheme
for key in path[0:-1]:
if key not in tmp_pref:
tmp_pref[key] = {}
tmp_type[key] = Dict(unknown_keys=True, key_type=Str())
tmp_pref = tmp_pref[key]
tmp_type = tmp_type[key]
tmp_pref[path[-1]] = value
if path[-1] not in tmp_type.data:
tmp_type[path[-1]] = Any() // Default(value)
if (path == ["config"] or path == ["settings"]) and value != "":
self.load_file(value)
self._update_doc()
[docs] def validate(self):
"""
Validate this settings object
:raises: SettingsError if the setting isn't valid
"""
self._validate_settings_dict(self.prefs)
[docs] def set(self, key: str, value, validate: bool = True, setup: bool = True):
"""
Sets the setting key to the passed new value
:param key: settings key
:param value: new value
:param validate: validate after the setting operation
:param setup: call the setup function
:raises: SettingsError if the setting isn't valid
"""
tmp = copy.deepcopy(self.prefs)
path = key.split("/")
self._set(path, value)
if validate:
res = self._validate_settings_dict(self.prefs, "settings with new setting ({}={!r})".format(key, value))
if not res:
self.prefs = tmp
raise SettingsError(str(res))
if setup:
self._setup()
def __setitem__(self, key: str, value):
"""
Alias for self.set(key, value).
"""
self.set(key, value)
[docs] def validate_key_path(self, path: t.List[str]) -> bool:
"""
Validates a path into in to the settings trees,
:param path: list of sub keys
:return: Is this key path valid?
"""
if self.is_obsolete(path):
return True
tmp = self.prefs
for item in path:
if item not in tmp:
return False
tmp = tmp[item]
return True
[docs] def has_key(self, key: str) -> bool:
""" Does the passed key exist? """
return self.validate_key_path(key.split("/"))
def _set_default(self, path: t.List[str], value):
"""
Set the default value of the setting with the passed path
:param path: passed key path
:param value: new default value
"""
self.modify_type_scheme("/".join(path), lambda t: t // Default(value))
self._update_doc()
[docs] def modify_setting(self, key: str, type_scheme: Type):
"""
Modifies the setting with the given key and adds it if it doesn't exist.
:param key: key of the setting
:param type_scheme: Type of the setting
:param default_value: default value of the setting
:raises: SettingsError if the settings domain (the key without the last element) doesn't exist
:raises: TypeError if the default value doesn't adhere the type scheme
"""
if self.is_obsolete(key):
logging.info("Using obsolete setting {!r}: {}".format(key, self.obsoleteness_reason(key)))
return
path = key.split("/")
domain = "/".join(path[:-1])
if len(path) > 1 and not self.validate_key_path(path[:-1]) \
and not isinstance(self.get(domain), dict):
raise SettingsError("Setting domain {} doesn't exist".format(domain))
tmp_typ = self.type_scheme
tmp_prefs = self.prefs
for subkey in path[:-1]:
tmp_typ = tmp_typ[subkey]
tmp_prefs = tmp_prefs[subkey]
tmp_typ[path[-1]] = type_scheme
if path[-1] in tmp_prefs:
if type_scheme.typecheck_default:
typecheck(tmp_prefs[path[-1]], type_scheme)
tmp_typ[path[-1]] = type_scheme
else:
tmp_prefs[path[-1]] = type_scheme.get_default()
[docs] def get_type_scheme(self, key: t.Union[str, t.List[str]]) -> Type:
"""
Returns the type scheme of the given key.
:param key: given key
:return: type scheme
:raises: SettingsError if the setting with the given key doesn't exist
"""
key = key.split("/") if isinstance(key, str) else key
if not self.validate_key_path(key):
raise SettingsError("Setting {} doesn't exist".format("/".join(key)))
tmp_typ = self.type_scheme
for subkey in key:
tmp_typ = tmp_typ[subkey]
return tmp_typ
[docs] def modify_type_scheme(self, key: str, modificator: t.Callable[[Type], Type]):
"""
Modifies the type scheme of the given key via a modificator function.
:param key: given key
:param modificator: gets the type scheme and returns its modified version
:raises: SettingsError if the setting with the given key doesn't exist
"""
if self.is_obsolete(key):
return
if not self.validate_key_path(key.split("/")):
raise SettingsError("Setting {} doesn't exist".format(key))
tmp_typ = self.type_scheme
subkeys = key.split("/")
for subkey in subkeys[:-1]:
tmp_typ = tmp_typ[subkey]
tmp_typ[subkeys[-1]] = modificator(tmp_typ[subkeys[-1]])
assert isinstance(tmp_typ[subkeys[-1]], Type)
[docs] def default(self, value: t.Optional[t.Any], key: str):
"""
Returns the passed value if isn't None else the settings value under the passed key.
:param value: passed value
:param key: passed settings key
"""
if value is None:
return self[key]
typecheck(value, self.get_type_scheme(key))
return value
[docs] def store_into_file(self, file_name: str, comment_out_defaults: bool = False):
"""
Stores the current settings into a yaml file with comments.
:param file_name: name of the resulting file
:param comment_out_defaults: comment out the default values
"""
with open(file_name, "w") as f:
print(self.type_scheme.get_default_yaml(defaults=self.prefs,
comment_out_defaults=comment_out_defaults), file=f)
[docs] def has_log_level(self, level: str) -> bool:
"""
Is the current log level the passed level?
:param level: passed level (in ["error", "warn", "info", "debug"])
"""
levels = ["error", "warn", "info", "debug"]
return levels.index(level) <= levels.index(self["log_level"])
[docs] def is_obsolete(self, key: t.Union[str, t.List[str]]) -> bool:
"""
Is the setting with the passed key obsolete?
:param key: key or key path
:return: obsolete setting?
"""
return self.obsoleteness_reason(key) is not None
[docs] def obsoleteness_reason(self, key: t.Union[str, t.List[str]]) -> t.Optional[Obsolete]:
"""
Returns the obsolete type object for obsolete settings
:param key: key or path
:return: object that contains information on the obsoleteness or None
"""
path = key.split("/") if isinstance(key, str) else key
tmp_type = self.type_scheme
for subkey in path[:-1]:
if tmp_type.is_obsolete(subkey):
return tmp_type.obsoleteness_reason(subkey)
if subkey not in tmp_type:
return None
if isinstance(tmp_type[subkey], Obsolete):
return tmp_type[subkey]
tmp_type = tmp_type[subkey]
if path[-1] in tmp_type and isinstance(tmp_type[path[-1]], Obsolete):
return tmp_type[subkey]
return None
[docs] def apply_override_actions(self):
""" Applies actions like overriding max_runs with runs """
if self["run/runs"] > -1:
self.set("run/max_runs", self["run/runs"], setup=False)
self.set("run/min_runs", self["run/runs"], setup=False)