Source code for temci.utils.util

"""
Utility functions and classes that don't depend on the rest of the temci code base.
"""

import functools
import os
import subprocess
import typing as t
import sys
import logging

import shutil

import pytimeparse
from rainbow_logging_handler import RainbowLoggingHandler


[docs]def recursive_exec_for_leafs(data: dict, func, _path_prep = []): """ Executes the function for every leaf key (a key without any sub keys) of the data dict tree. :param data: dict tree :param func: function that gets passed the leaf key, the key path and the actual value """ if not isinstance(data, dict): return for subkey in data.keys(): if type(data[subkey]) is dict: recursive_exec_for_leafs(data[subkey], func, _path_prep=_path_prep + [subkey]) else: func(subkey, _path_prep + [subkey], data[subkey])
[docs]def has_root_privileges() -> bool: """ Has the current user root privileges? """ return does_command_succeed("head /proc/1/stack")
[docs]def has_pdflatex() -> bool: """ Is pdflatex installed? """ return does_command_succeed("pdflatex --version")
[docs]def does_command_succeed(cmd: str) -> bool: """ Does the passed command succeed (when executed by /bin/sh)? """ try: subprocess.check_call(["/bin/sh", "-c", cmd], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) except: return False return True
[docs]def warn_for_pdflatex_non_existence_once(_warned = [False]): """ Log a warning if the pdflatex isn't available, but only if this function is called the first time """ if not has_pdflatex() and not _warned[0]: logging.warning("pdflatex is not installed therefore no pdf plots are produced") _warned[0] = True
[docs]def get_cache_line_size(cache_level: int = None) -> t.Optional[int]: """ Returns the cache line size of the cache on the given level. Level 0 and 1 are actually on the same level. :param cache_level: if None the highest level cache is used :return: cache line size or none if the cache on the given level doesn't exist """ if cache_level is None: cache_level = -1 for path in os.listdir("/sys/devices/system/cpu/cpu0/cache/"): if path.startswith("index"): cache_level = max(cache_level, int(path.split("index")[1])) if cache_level == -1: return None level_dir = "/sys/devices/system/cpu/cpu0/cache/index" + str(cache_level) with open(level_dir + "/coherency_line_size") as f: return int(f.readline().strip())
[docs]def get_memory_page_size() -> int: """ Returns the size of a main memory page """ try: proc = subprocess.Popen(["/bin/sh", "-c", "getconf PAGESIZE"], stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) out, err = proc.communicate() if proc.poll() == 0: return int(out.strip()) except: pass return 4096
[docs]def get_distribution_name() -> str: """ Returns the name of the current linux distribution (requires `lsb_release` to be installed) """ return subprocess.check_output(["lsb_release", "-i", "-s"], universal_newlines=True).strip()
[docs]def get_distribution_release() -> str: """ Returns the used release of the current linux distribution (requires `lsb_release` to be installed) """ return subprocess.check_output(["lsb_release", "-r", "-s"], universal_newlines=True).strip()
[docs]def does_program_exist(program: str) -> bool: """ Does the passed program exist? """ return shutil.which(program) is not None
[docs]def on_apple_os() -> bool: """ Is the current operating system an apple OS X? """ return sys.platform == 'darwin'
[docs]class proc_wait_with_rusage: """ Each Popen object gets a field rusage """ def __enter__(self): self.rusage = None self.old_try_wait = subprocess.Popen._try_wait def try_wait(self, wait_flags): """ Copied from subprocess._try_wait""" try: (pid, sts, _u) = os.wait4(self.pid, wait_flags) self.rusage = _u # instead of: (pid, sts) = os.waitpid(self.pid, wait_flags) except ChildProcessError: # This happens if SIGCLD is set to be ignored or waiting # for child processes has otherwise been disabled for our # process. This child is dead, we can't get the status. pid = self.pid sts = 0 return (pid, sts) subprocess.Popen._try_wait = try_wait def __exit__(self, exc_type, exc_val, exc_tb): subprocess.Popen._try_wait = self.old_try_wait
[docs]def join_strs(strs: t.List[str], last_word: str = "and") -> str: """ Joins the passed strings together with ", " except for the last to strings that separated by the passed word. :param strs: strings to join :param last_word: passed word that is used between the two last strings """ if not isinstance(strs, list): strs = list(strs) if len(strs) == 1: return strs[0] elif len(strs) > 1: return " {} ".format(last_word).join([", ".join(strs[0:-1]), strs[-1]])
allow_all_imports = False # type: bool """ Allow all imports (should the can_import method return true for every module)? """
[docs]def can_import(module: str) -> bool: """ Can a module (like scipy or numpy) be imported without a severe and avoidable performance penalty? The rational behind this is that some parts of temci don't need scipy or numpy. :param module: name of the module """ if sphinx_doc(): return False if allow_all_imports: return True if module not in ["scipy", "numpy", "init"]: return True if in_standalone_mode: return False if len(sys.argv) == 1 or sys.argv[1] in ["completion", "version", "assembler"]: return False return True
in_standalone_mode = False # type: bool """ In rudimentary standalone mode (executed via run.py) """ _sphinx_doc = os.environ.get("SPHINXDOC", os.environ.get('READTHEDOCS', None)) == 'True'
[docs]def sphinx_doc() -> bool: """ Is the code only loaded to document it with sphinx? """ return _sphinx_doc
[docs]def get_doc_for_type_scheme(type_scheme: 'Type') -> str: """ Return a class documentation string for the given type scheme. Use the default_yaml method. """ return """ .. code-block:: yaml {default_yaml} """.format(default_yaml="\n ".join(type_scheme.string_representation().split("\n")))
[docs]def document(**kwargs: t.Dict[str, str]): """ Document :param kwargs: class attribute, documentation prefix """ def dec(klass): if sphinx_doc(): for k, v in kwargs.items(): klass.__doc__ += """ {} {} """.format(v, get_doc_for_type_scheme(klass.__dict__[k])) return klass return dec
[docs]class Singleton(type): """ Singleton meta class. @see http://stackoverflow.com/a/6798042 """ _instances = {} def __call__(cls, *args, **kwargs): if cls not in cls._instances: cls._instances[cls] = super(Singleton, cls).__call__(*args, **kwargs) return cls._instances[cls]
[docs]class InsertionTimeOrderedDict: """ A dictionary which's elements are sorted by their insertion time. """ def __init__(self): self._dict = {} self._keys = [] dict() def __delitem__(self, key): """ Remove the entry with the passed key """ del(self._dict[key]) del(self._keys[self._keys.index(key)]) def __getitem__(self, key): """ Get the entry with the passed key """ return self._dict[key] def __setitem__(self, key, value): """ Set the value of the item with the passed key """ if key not in self._dict: self._keys.append(key) self._dict[key] = value def __iter__(self): """ Iterate over all keys """ return self._keys.__iter__()
[docs] def values(self) -> t.List: """ Rerturns all values of this dictionary. They are sorted by their insertion time. """ return [self._dict[key] for key in self._keys]
[docs] def keys(self) -> t.List: """ Returns all keys of this dictionary. They are sorted by their insertion time. """ return self._keys
def __len__(self): """ Returns the number of items in this dictionary """ return len(self._keys)
[docs] def items(self) -> t.List[t.Tuple[t.Any, t.Any]]: return [(k, self[k]) for k in self.keys()]
[docs] @classmethod def from_list(cls, items: t.Optional[list], key_func: t.Callable[[t.Any], t.Any]) -> 'InsertionTimeOrderedDict': """ Creates an ordered dict out of a list of elements. :param items: list of elements :param key_func: function that returns a key for each passed list element :return: created ordered dict with the elements in the same order as in the passed list """ if items is None: return InsertionTimeOrderedDict() ret = InsertionTimeOrderedDict() for item in items: ret[key_func(item)] = item return ret
#formatter = logging.Formatter("[%(asctime)s] %(name)s %(levelname)s \t%(message)s") # setup `RainbowLoggingHandler` handler = RainbowLoggingHandler(sys.stderr, color_funcName=('black', 'yellow', True)) """ Colored logging handler that is used for the root logger """ handler.setFormatter(logging.Formatter("[%(asctime)s] %(message)s")) logging.getLogger().addHandler(handler)
[docs]def geom_std(values: t.List[float]) -> float: """ Calculates the geometric standard deviation for the passed values. Source: https://en.wikipedia.org/wiki/Geometric_standard_deviation """ import scipy.stats as stats import numpy as np gmean = stats.gmean(values) return np.exp(np.sqrt(np.sum([np.log(x / gmean) ** 2 for x in values]) / len(values)))
[docs]def parse_timespan(time: str) -> float: """ Parse a time span expression, see https://pypi.org/project/pytimeparse/ Supports -1 to express an infinite time span :param time: time span expression, mixture of different time units is possible :return: time span in seconds """ try: return float(time) except ValueError: return pytimeparse.parse(time) if time != "-1" else -1
[docs]@functools.lru_cache() def rusage_header() -> str: from ..setup.setup import script_relative return open(script_relative("rusage/header.c")).read().split("\"")[1]