import logging
import multiprocessing
import re
import shutil
import subprocess, os, time
from temci.utils.settings import Settings, SettingsError
from temci.utils.util import has_root_privileges
from temci.utils.typecheck import *
import typing as t
# Mount point and names of the cpusets that are created and managed by the CPUSet class below.
CPUSET_DIR = '/cpuset' # type: str
""" Location that the cpu set pseudo file system is mounted at """
NEW_ROOT_SET = 'bench.root' # type: str
""" Name of the new root cpu set that contains most of the processes of the original root set """
BENCH_SET = 'temci.set' # type: str
""" Name of the base cpu set used by temci for benchmarking purposes """
CONTROLLER_SUB_BENCH_SET = 'temci.set.controller' # type: str
""" Name of the cpu set used by the temci control process """
SUB_BENCH_SET = 'temci.set.{}' # type: str
""" Format of cpu sub set names for benchmarking """
class CPUSet:
    """
    Partitions the cpu cores of the system into cpusets (see `man cpuset`) for benchmarking.

    The cpusets are modified via the cset tool, which requires root privileges.
    If root privileges are missing, a warning is logged and the public methods become no-ops.
    """

    def __init__(self, active: bool = True, base_core_number: t.Optional[int] = None,
                 parallel: t.Optional[int] = None, sub_core_number: t.Optional[int] = None,
                 temci_in_base_set: bool = True):
        """
        Initializes the cpu sets and determines the number of parallel programs (parallel_number variable).

        :param active: are cpu sets actually used?
        :param base_core_number: number of cpu cores for the base (remaining part of the) system
        :param parallel: 0: benchmark sequential, > 0: benchmark parallel with n instances, -1: determine n automatically
        :param sub_core_number: number of cpu cores per parallel running program
        :param temci_in_base_set: place temci in the same cpu set as the rest of the system?
        :raises ValueError: if the passed parameters don't work together on the current platform
        :raises EnvironmentError: if the environment can't be setup properly (e.g. no root privileges)
        """
        self.active = active and has_root_privileges()  # type: bool
        """ Are cpu sets actually used? """
        self.base_core_number = Settings().default(base_core_number, "run/cpuset/base_core_number")  # type: int
        """ Number of cpu cores for the base (remaining part of the) system """
        self.parallel = Settings().default(parallel, "run/cpuset/parallel")  # type: int
        """ 0: benchmark sequential, > 0: benchmark parallel with n instances, -1: determine n automatically """
        self.sub_core_number = Settings().default(sub_core_number, "run/cpuset/sub_core_number")  # type: int
        """ Number of cpu cores per parallel running program """
        # Only ask cset if cpu sets are really used (i.e. root privileges are present),
        # and fall back to the plain core count if the root set could not be read.
        root_cpus = self._cpus_of_set("") if self.active else None
        self.av_cores = len(root_cpus) if root_cpus else multiprocessing.cpu_count()  # type: int
        """ Number of available cpu cores """
        self.parallel_number = 0  # type: int
        """ Number of used parallel instances, zero if the benchmarking is done sequentially """
        self.temci_in_base_set = Settings().default(temci_in_base_set, "run/cpuset/temci_in_base_set")
        """ Place temci in the same cpu set as the rest of the system? """
        if self.parallel != 0:
            max_parallel = self._number_of_parallel_sets(self.base_core_number, True, self.sub_core_number)
            if self.parallel == -1:
                self.parallel_number = max_parallel
            else:
                self.parallel_number = self.parallel
                if self.parallel > max_parallel and self.active:
                    raise ValueError("Invalid values for base_core_number and sub_core_number "
                                     "on system with just {} cores. Note: The benchmark controller "
                                     "needs a cpuset too.".format(self.av_cores))
            # the base system gets every core that is neither used by a sub set nor by the controller
            self.base_core_number = self.av_cores - self.sub_core_number * self.parallel_number - 1
        if not self.active:
            if active and not has_root_privileges():
                logging.warning("CPUSet functionality is disabled because root privileges are missing.")
            return
        logging.info("Initialize CPUSet")
        typecheck(self.base_core_number, PositiveInt())
        typecheck(self.parallel_number, NaturalNumber())
        self.own_sets = [SUB_BENCH_SET.format(i) for i in range(0, self.parallel_number)] \
            + [CONTROLLER_SUB_BENCH_SET, NEW_ROOT_SET, BENCH_SET]
        try:
            self._init_cpuset()
        except BaseException:
            # also covers KeyboardInterrupt: never leave a half configured system behind
            logging.error("Forced teardown of CPUSet")
            self.teardown()
            raise
        logging.info("Finished initializing CPUSet")

    def move_process_to_set(self, pid: int, set_id: int):
        """
        Moves the process with the passed id to the parallel sub cpuset with the passed id.
        Tears all cpusets down and re-raises if anything goes wrong.

        :param pid: passed process id
        :param set_id: passed parallel sub cpuset id
        """
        if not self.active:
            return
        try:
            typecheck(pid, Int())
            typecheck(set_id, Int(range=range(0, self.parallel_number)))
            self._move_process_to_set(SUB_BENCH_SET.format(set_id), pid)
        except BaseException:
            logging.error("Forced teardown of CPUSet")
            self.teardown()
            raise

    def get_sub_set(self, set_id: int) -> str:
        """ Gets the name of the benchmarking cpu set with the given id / number (starting at zero). """
        if self.parallel == 0:
            # sequential benchmarking runs in the controller set
            return CONTROLLER_SUB_BENCH_SET
        if self.active:
            typecheck(set_id, Int(range=range(0, self.parallel_number)))
        return SUB_BENCH_SET.format(set_id)

    def teardown(self):
        """
        Tears the created cpusets down and makes the system usable again.
        Errors while deleting a single set are ignored (best effort).
        """
        if not self.active:
            return
        for set_name in self.own_sets:
            try:
                self._delete_set(set_name)
            except EnvironmentError as ex:
                # e.g. the set was never created; keep on deleting the remaining sets
                logging.debug("Ignored error while deleting cpuset %s: %s", set_name, ex)

    def _number_of_parallel_sets(self, base_core_number: int, parallel: bool, sub_core_number: int) -> int:
        """
        Calculates the number of possible parallel sets.

        :param base_core_number: number of cpu cores reserved for the base system
        :param parallel: is the benchmarking done in parallel? If not, the result is always 1
        :param sub_core_number: number of cpu cores per parallel running program
        :raises ValueError: if the cores don't suffice for a single sub set (only if cpu sets are active)
        """
        typecheck([base_core_number, parallel, sub_core_number], List(Int()))
        # temci needs a core of its own if it doesn't live in the base set
        controller_cores = 0 if self.temci_in_base_set else 1
        if base_core_number + controller_cores + sub_core_number > self.av_cores and self.active:
            raise ValueError("Invalid values for base_core_number and sub_core_number "
                             "on system with just {} cores.{}".format(self.av_cores,
                                                                      ""
                                                                      if self.temci_in_base_set
                                                                      else "Note: temci needs a cpuset too."))
        av_cores_for_par = self.av_cores - base_core_number - controller_cores
        if parallel:
            return av_cores_for_par // sub_core_number
        return 1

    def _init_cpuset(self):
        """
        Mounts the cpuset pseudo filesystem at ``CPUSET_DIR`` and creates the necessary cpusets.

        :raises EnvironmentError: if mounting or one of the cset calls fails
        """
        if not os.path.exists(CPUSET_DIR + "/cgroup.procs"):
            if not os.path.exists(CPUSET_DIR):
                os.mkdir(CPUSET_DIR)
            proc = subprocess.Popen(["mount", "-t", "cpuset", "none", CPUSET_DIR],
                                    stdout=subprocess.DEVNULL,
                                    stderr=subprocess.PIPE,
                                    universal_newlines=True)
            _, err = proc.communicate()
            if proc.returncode != 0:
                raise EnvironmentError(
                    "Cannot mount {}. Probably you're not in root mode or you've already "
                    "mounted cpuset elsewhere. {}".format(CPUSET_DIR, err))
        av_cpus = self._get_av_cpus()
        base = self.base_core_number
        self._create_cpuset(NEW_ROOT_SET, av_cpus[0:base])
        logging.info("Move all processes to new root cpuset")
        self._move_all_to_new_root()
        if self.parallel == 0:  # just use all available cores, as the benchmarked program also runs in it
            self._create_cpuset(CONTROLLER_SUB_BENCH_SET, av_cpus[base:self.av_cores])
        else:
            # the controller gets exactly one core: the first one after the base cores
            self._create_cpuset(CONTROLLER_SUB_BENCH_SET, av_cpus[base:base + 1])
        self._move_process_to_set(NEW_ROOT_SET if self.temci_in_base_set else CONTROLLER_SUB_BENCH_SET)
        for i in range(0, self.parallel_number):
            start = base + (0 if self.temci_in_base_set else 1) + (i * self.sub_core_number)
            self._create_cpuset(SUB_BENCH_SET.format(i), av_cpus[start:start + self.sub_core_number])

    @staticmethod
    def _parse_cpu_spec(spec: str) -> t.List[int]:
        """
        Turns a cpu list like ``0-3``, ``1,3,5``, ``7`` or ``0-3,8-11`` into the list of cpu numbers.
        """
        cpus = []  # type: t.List[int]
        for part in spec.split(","):
            part = part.strip()
            if not part:
                continue
            if "-" in part:
                start, end = map(int, part.split("-"))
                cpus.extend(range(start, end + 1))
            else:
                cpus.append(int(part))
        return cpus

    def _cpus_of_set(self, name: str) -> t.Optional[t.List[int]]:
        """
        Gets all cpu cores that are assigned to the set with the passed name.

        :return: list of cpu numbers or None if the set doesn't exist
        """
        name = self._relname(name)
        if not self._has_set(name):
            return None
        res = self._cset("set {}".format(name))
        # the fourth line of the output describes the set itself, its second column is the cpu list
        columns = res.split("\n")[3].split()
        return self._parse_cpu_spec(columns[1])

    def _get_av_cpus(self) -> t.List[int]:
        """ Gets the available cpu cores (the cores of the root cpu set) """
        return self._cpus_of_set("")

    def _ints_to_str(self, ints: t.List[int]) -> str:
        """ Turns a list of integers into a comma separated string """
        return ",".join(map(str, ints))

    def _has_set(self, name: str):
        """ Does the set with the given name exist? """
        name = self._relname(name)
        return name + " " in self._cset("set -rl")

    def _delete_set(self, name: str):
        """ Delete the set with the given name (recursively and even if it still contains processes) """
        self._cset("set -r --force -d %s" % name)

    def _move_all_to_new_root(self, name: str = 'root', _count: int = 100):
        """
        Move all process from the root cpu set into the ``NEW_ROOT_SET``

        :param name: name of the root cpu set
        :param _count: maximum cpu set tree depth
        """
        cpus = self._get_av_cpus()[0:self.base_core_number]
        self._set_cpu_affinity_of_set(name, cpus)
        if _count > 0:
            for child in self._child_sets(name):
                if len(child) > 1:
                    try:
                        self._move_all_to_new_root(child, _count - 1)
                    except EnvironmentError as err:
                        # best effort: a child set that can't be emptied doesn't stop the rest
                        logging.debug("Ignored error while moving processes of cpuset %s: %s", child, err)
        self._move_processes(name, NEW_ROOT_SET)

    def _move_processes(self, from_set: str, to_set: str):
        """
        Move all processes from the first to the second cpuset.
        Only some kernel threads are left behind.

        :param from_set: name of the first cpuset
        :param to_set: name of the second cpuset
        """
        from_set, to_set = (self._relname(from_set), self._relname(to_set))
        self._cset("proc --move --kthread --force --threads --fromset %s --toset %s" % (from_set, to_set))

    def _move_process_to_set(self, cpuset: str, pid: t.Optional[int] = None):
        """
        Move the process with the given id into the passed cpu set.

        :param cpuset: name of the passed cpu set
        :param pid: id of the process to move, default is the own process
        """
        if pid is None:
            # resolved at call time, a default of os.getpid() would be frozen at import time
            pid = os.getpid()
        self._cset("proc --move --force --pid %d --threads %s" % (pid, cpuset))

    def _absname(self, relname: str):
        """ Get the absolute set name for the given relative """
        if "/" in relname:
            return relname
        res = self._cset("set %s" % relname)
        # the path is the eighth column of the last output line
        return res.split("\n")[-1].split()[7]

    def _relname(self, absname: str):
        """ Get the relative set name for the given absolute """
        if "/" not in absname:
            return absname
        return absname.split("/")[-1]

    def _child_sets(self, name: str) -> t.List[str]:
        """
        Get the list of child sets for the set with the given name.
        The list may contain empty strings (for empty output lines).
        """
        name = self._relname(name)
        res = self._cset("set %s" % name)
        # the first four lines are the header and the description of the set itself
        return [line.strip().split(" ")[0] for line in res.split("\n")[4:]]

    def _create_cpuset(self, name: str, cpus: t.List[int]):
        """ Create the cpuset with the given name (and all its parent sets) and assign the given cpu cores to it """
        typecheck(cpus, List(Int()))
        cpu_range = self._ints_to_str(cpus)
        path = []
        for part in name.split("/"):
            path.append(part)
            self._cset("set --cpu {} {} ".format(cpu_range, "/".join(path)))

    def _set_cpu_affinity_of_set(self, set: str, cpus: t.List[int]):
        """ Set the cpu affinity for all processes that belong to the given set (errors are ignored) """
        if set == "root":
            set = ""
        app = "cgroup.procs" if set == "" else set + "/cgroup.procs"
        with open(CPUSET_DIR + "/" + app, "r") as f:
            pid_lines = f.readlines()
        for line in pid_lines:
            try:
                self._set_cpu_affinity(int(line.strip()), cpus)
            except EnvironmentError as err:
                # best effort, e.g. the process might have terminated in the mean time
                logging.debug("Ignored error while setting the cpu affinity: %s", err)

    def _set_cpu_affinity(self, pid: int, cpus: t.List[int]):
        """
        Set the cpu affinity for the given process to the given cpu cores

        :return: output of the executed command
        :raises EnvironmentError: if the executed shell command fails
        """
        # NOTE(review): the exit status of the shell is the one of the trailing `nice`,
        # so failures of taskset itself presumably go unnoticed here - confirm that this is intended
        cmd = "taskset --all-tasks --cpu-list -p {} {}; nice".format(self._ints_to_str(cpus), pid)
        proc = subprocess.Popen(["/bin/sh", "-c", cmd],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        out, err = proc.communicate()
        if proc.returncode != 0:
            raise EnvironmentError(
                "taskset error (cmd = '{}'): ".format(cmd) + str(err) + str(out)
            )
        return str(out)

    def _cset(self, argument: str):
        """
        Execute the passed argument with the cset tool.

        :param argument: passed argument for the tool
        :return: output of executing the combined command
        :raises EnvironmentError: if something goes wrong
        """
        cmd = ["/bin/sh", "-c", "python3 -c 'import cpuset.main; print(cpuset.main.main())' " + argument]
        proc = subprocess.Popen(cmd,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        out, err = proc.communicate()
        # a negative return code (killed by a signal) is a failure too
        if proc.returncode != 0:
            raise EnvironmentError(
                "Error with cset tool. "
                " More specific error (cmd = 'cset {}'): ".format(argument) + str(err) + str(out)
            )
        return str(out)