"""
This module consists of the abstract run worker pool class and several implementations.
"""
import re
import time
import humanfriendly
from temci.utils.util import has_root_privileges, parse_timespan
from ..utils.typecheck import *
from ..utils.settings import Settings
from .run_driver import RunProgramBlock, BenchmarkingResultBlock, AbstractRunDriver, RunDriverRegistry
from queue import Queue, Empty
from .cpuset import CPUSet
import logging, threading, subprocess, shlex, os, tempfile
import typing as t
# Each item is a tuple of (benchmarked run program block, its benchmarking result, id of the block).
ResultGenerator = t.Iterator[t.Tuple[RunProgramBlock, BenchmarkingResultBlock, int]]
""" Return type of the run worker pool ``results`` method """
class AbstractRunWorkerPool:
    """
    An abstract run worker pool that just deals with the hyper threading setting.

    It sets up the submit and result queues, the run driver and the optional time limit.
    ``submit`` and ``results`` raise a NotImplementedError here and have to be overridden.
    """

    def __init__(self, run_driver_name: str = None, end_time: float = -1):
        """
        Create an instance.

        If the setting ``run/disable_hyper_threading`` is set, the hyper threaded cores are
        turned off (this requires root privileges, otherwise only a warning is logged).

        :param run_driver_name: name of the used run driver, if None the one configured in the settings is used
        :param end_time: point in time (compared with ``time.time()``) at which the benchmarking should end,
                         -1 means that there is no time limit
        """
        self.submit_queue = Queue()  # type: Queue
        """ Queue for submitted but not benchmarked run program blocks """
        self.result_queue = Queue()  # type: Queue
        """
        Queue of benchmarking results.
        The queue items are tuples consisting of
        the benchmarked block, the benchmarking result and the
        blocks id.
        """
        self.parallel_number = 1  # type: int
        """ Number of instances in which the benchmarks takes place in parallel """
        run_driver_name = run_driver_name or RunDriverRegistry.get_used()
        self.run_driver = RunDriverRegistry().get_for_name(run_driver_name)  # type: AbstractRunDriver
        """ Used run driver instance """
        self.cpuset = None  # type: CPUSet
        """ Used cpu set instance """
        self.end_time = end_time
        """ End time of the benchmarking (compared with ``time.time()``), -1 for no limit """
        self._disabled_ht_cores = []  # type: t.List[int]
        if Settings()["run/disable_hyper_threading"]:
            if not has_root_privileges():
                logging.warning("Can't disable hyper threading as root privileges are missing")
                return
            self._disabled_ht_cores = self.disable_hyper_threading()

    def submit(self, block: RunProgramBlock, id: int, runs: int):
        """
        Submits the passed block for "runs" times benchmarking.
        It also sets the blocks is_enqueued property to True.

        :param block: passed run program block
        :param id: id of the passed block
        :param runs: number of individual benchmarking runs
        :raises NotImplementedError: always, this method has to be overridden
        """
        raise NotImplementedError()

    def results(self, expected_num: int) \
            -> ResultGenerator:
        """
        A generator for all available benchmarking results.
        The items of this generator are tuples consisting of
        the benchmarked block, the benchmarking result and the
        blocks id.

        :param expected_num: expected number of results
        :raises NotImplementedError: always, this method has to be overridden
        """
        raise NotImplementedError()

    def teardown(self):
        """
        Turns the hyper threaded cores that were disabled by this pool back on
        (only if the setting ``run/disable_hyper_threading`` is set).
        This should be called if all benchmarking with this pool is finished.
        """
        if Settings()["run/disable_hyper_threading"]:
            self.enable_hyper_threading(self._disabled_ht_cores)

    def time_left(self) -> float:
        """
        Returns the number of seconds until ``end_time`` is reached, never less than 0.

        Does not work properly if self.end_time == -1: it returns 0 in this case although
        there is no time limit. Use ``has_time_left`` to check whether benchmarking may go on.
        """
        return max(self.end_time - time.time(), 0)

    def has_time_left(self) -> bool:
        """
        Is there no time limit (``end_time`` is -1) or is the end time not yet reached?
        """
        return self.end_time == -1 or self.time_left() > 0

    @classmethod
    def get_hyper_threading_cores(cls) -> t.List[int]:
        """
        Parses ``/proc/cpuinfo`` and returns the ids of the logical cpus that are treated
        as hyper threading siblings: every logical cpu of a core id except the first one seen.
        An empty list is returned if the hyper threading check fails.

        Adapted from http://unix.stackexchange.com/a/223322
        """
        logical_cpus = set()  # type: t.Set[int]
        physical_cpus = set()  # type: t.Set[int]
        cores = {}  # type: t.Dict[int, t.List[int]]
        cpu = None  # id of the most recently seen "processor" entry
        # the context manager closes the file handle deterministically
        with open('/proc/cpuinfo') as cpuinfo:
            for line in cpuinfo:
                if line.startswith('processor'):
                    cpu = int(line.split()[2])
                    logical_cpus.add(cpu)
                elif line.startswith('physical id'):
                    physical_cpus.add(int(line.split()[3]))
                elif line.startswith('core id'):
                    # NOTE(review): cores are keyed by their core id only, so equal core ids
                    # of different physical cpus end up in the same list - confirm that this is intended
                    cores.setdefault(int(line.split()[3]), []).append(cpu)
        # hyper threading is assumed if there are exactly two logical cpus per core
        if (len(cores) * len(physical_cpus)) * 2 != len(logical_cpus):
            return []
        ht_cores = []  # type: t.List[int]
        for siblings in cores.values():
            # the first logical cpu of each core stays online
            ht_cores.extend(siblings[1:])
        return ht_cores

    def next_block_timeout(self) -> float:
        """
        Returns the timeout for benchmarking the next block.

        It combines the setting ``run/max_block_time`` (values not greater than -1 are
        treated as "no per block limit") with the time that is left until ``end_time``.

        :return: 0 if no time is left, -1 if there is neither a per block nor an overall limit,
                 otherwise the smaller one of the applicable limits (never negative)
        """
        timeout = parse_timespan(Settings()["run/max_block_time"])
        if not self.has_time_left():
            return 0
        unlimited = self.end_time == -1
        if timeout > -1:
            if unlimited:
                return max(timeout, 0)
            return max(min(self.time_left(), timeout), 0)
        return -1 if unlimited else max(self.time_left(), 0)

    @classmethod
    def disable_hyper_threading(cls) -> t.List[int]:
        """
        Turns the hyper threaded cores off, does nothing without root privileges.

        :return: ids of the cores that were turned off (empty without root privileges)
        """
        if has_root_privileges():
            cores = cls.get_hyper_threading_cores()
            cls._set_status_of_ht_cores(cores, 0)
            return cores
        return []

    @classmethod
    def enable_hyper_threading(cls, disabled_cores: t.List[int]):
        """
        Turns the passed cores on again, does nothing without root privileges.

        :param disabled_cores: ids of the cores to turn on
        """
        if has_root_privileges():
            cls._set_status_of_ht_cores(disabled_cores, 1)

    @classmethod
    def _set_status_of_ht_cores(cls, ht_cores: t.List[int], online_status: int):
        """
        Writes the passed status into the ``online`` file of each passed core using a shell.

        :param ht_cores: ids of the cores to modify, nothing is done if this list is empty
        :param online_status: 1 to turn the cores on, 0 to turn them off
        :raises EnvironmentError: if the shell exits with a non zero return code
        """
        if not ht_cores:
            return
        arg = "\n".join("echo {} > /sys/devices/system/cpu/cpu{}/online"
                        .format(online_status, core_id) for core_id in ht_cores)
        proc = subprocess.Popen(["/bin/sh", "-c", arg],
                                stdout=subprocess.PIPE,
                                stderr=subprocess.PIPE,
                                universal_newlines=True)
        _, err = proc.communicate()
        # a negative return code (shell terminated by a signal) is a failure too
        if proc.returncode != 0:
            action = "enabling" if online_status else "disabling"
            raise EnvironmentError("Error while {} the hyper threaded cores: {}".format(action, err))
class RunWorkerPool(AbstractRunWorkerPool):
    """
    This run worker pool implements the sequential benchmarking of program blocks.
    """

    def __init__(self, run_driver_name: str = None, end_time: float = -1):
        """
        Create an instance.

        :param run_driver_name: name of the used run driver, if None the one configured in the settings is used
        :param end_time: end time of the benchmarking, -1 for no time limit
        """
        super().__init__(run_driver_name, end_time)
        # without the setting "run/cpuset/active" an inactive CPUSet is used
        self.cpuset = CPUSet(parallel=0) if Settings()["run/cpuset/active"] else CPUSet(active=False)
        self.parallel_number = 1

    def submit(self, block: RunProgramBlock, id: int, runs: int):
        """
        Benchmarks the passed block right away and puts the result into the result queue.
        The blocks is_enqueued property is True only while it is benchmarked.

        :param block: passed run program block
        :param id: id of the passed block
        :param runs: number of individual benchmarking runs
        """
        typecheck(block, RunProgramBlock)
        typecheck(runs, NaturalNumber())
        typecheck(id, NaturalNumber())
        block.is_enqueued = True
        try:
            result = self.run_driver.benchmark(block, runs, self.cpuset, timeout=self.next_block_timeout())
            self.result_queue.put((block, result, id))
        finally:
            # reset the flag even if the run driver raises, so the block is not marked as enqueued forever
            block.is_enqueued = False

    def results(self, expected_num: int) -> ResultGenerator:
        """
        Yields ``expected_num`` items of the result queue (blocks until each one is available).
        The items are tuples consisting of the benchmarked block, the benchmarking result and the blocks id.

        :param expected_num: expected number of results
        """
        for _ in range(expected_num):
            yield self.result_queue.get()

    def teardown(self):
        """
        Tears down the parent pool, the run driver and the cpu set (if there is one).
        This should be called if all benchmarking with this pool is finished.
        """
        super().teardown()
        self.run_driver.teardown()
        if self.cpuset is not None:
            self.cpuset.teardown()
class ParallelRunWorkerPool(AbstractRunWorkerPool):
    """
    This run worker pool implements the parallel benchmarking of program blocks.
    It uses a server-client-model to benchmark on different cpu cores.
    """

    def __init__(self, run_driver_name: str = None, end_time: float = -1):
        """
        Create an instance and start one benchmarking thread per parallel instance of the cpu set.

        :param run_driver_name: name of the used run driver, if None the one configured in the settings is used
        :param end_time: end time of the benchmarking, -1 for no time limit
        """
        super().__init__(run_driver_name, end_time)
        if Settings()["run/cpuset/active"]:
            self.cpuset = CPUSet()
        else:
            self.cpuset = CPUSet(active=False)
        self.parallel_number = self.cpuset.parallel_number
        logging.info("Using {} parallel processes to benchmark.".format(self.parallel_number))
        self.threads = []  # type: t.List[BenchmarkingThread]
        """ Running benchmarking threads """
        try:
            for i in range(self.parallel_number):
                thread = BenchmarkingThread(i, self, self.run_driver, self.cpuset)
                self.threads.append(thread)
                thread.start()
        except BaseException:
            # don't leave already started threads and the cpu set behind
            logging.error("Forced teardown of ParallelRunWorkerPool")
            self.teardown()
            raise

    def submit(self, block: RunProgramBlock, id: int, runs: int):
        """
        Puts the passed block into the submit queue and sets its is_enqueued property to True.
        The block is silently ignored if the end time of this pool is already reached.

        :param block: passed run program block
        :param id: id of the passed block
        :param runs: number of individual benchmarking runs
        """
        # time_left() returns 0 if there is no time limit (end_time == -1),
        # therefore has_time_left() has to be used to not drop every block in this case
        if not self.has_time_left():
            return
        typecheck(block, RunProgramBlock)
        typecheck(runs, NaturalNumber())
        typecheck(id, NaturalNumber())
        block.is_enqueued = True
        self.submit_queue.put((block, id, runs))

    def results(self, expected_num: int) -> ResultGenerator:
        """
        Yields ``expected_num`` items of the result queue (blocks until each one is available).
        The items are tuples consisting of the benchmarked block, the benchmarking result and the blocks id.

        :param expected_num: expected number of results
        """
        for _ in range(expected_num):
            yield self.result_queue.get()

    def teardown(self):
        """
        Tears down the parent pool, the cpu set and the run driver and tells the
        benchmarking threads to stop.
        This should be called if all benchmarking with this pool is finished.
        """
        try:
            super().teardown()
            self.cpuset.teardown()
            self.run_driver.teardown()
        finally:
            # always signal the threads, otherwise they would poll the submit queue forever
            # if one of the teardown steps above fails
            for thread in getattr(self, "threads", []):
                thread.stop = True
class BenchmarkingThread(threading.Thread):
    """
    A worker thread that benchmarks the blocks of its parent pool,
    this allows parallel benchmarking.
    """

    def __init__(self, id: int, pool: ParallelRunWorkerPool, driver: AbstractRunDriver, cpuset: CPUSet):
        """
        Creates an instance, the thread is not started here.

        :param id: id of this thread
        :param pool: parent run worker pool
        :param driver: used run driver instance
        :param cpuset: used CPUSet instance
        """
        super().__init__()
        self.stop = False  # type: bool
        """ Stop the run loop? """
        self.id = id  # type: int
        """ Id of this thread """
        self.pool = pool  # type: ParallelRunWorkerPool
        """ Parent run worker pool """
        self.driver = driver  # type: AbstractRunDriver
        """ Used run driver instance """
        self.cpuset = cpuset  # type: CPUSet
        """ Used CPUSet instance """

    def run(self):
        """
        The run loop: takes run program blocks out of the pool's submit queue, benchmarks them
        and puts the results into the pool's result queue.
        ``stop`` is only checked when no block arrived within one second, the loop ends
        if it is true then.
        """
        while True:
            try:
                job = self.pool.submit_queue.get(timeout=1)
            except Empty:
                if not self.stop:
                    continue
                return
            block, block_id, runs = job
            try:
                result = self._process_block(block, runs)
                self.pool.result_queue.put((block, result, block_id))
                logging.debug("Thread {set_id}: Benchmarked block {id}".format(set_id=self.id, id=block_id))
                block.is_enqueued = False
                self.pool.submit_queue.task_done()
            except BaseException:
                # the error is re-raised and thereby ends this thread
                logging.error("Forced teardown of BenchmarkingThread")
                raise

    def _process_block(self, block: RunProgramBlock, runs: int) -> BenchmarkingResultBlock:
        """
        Benchmarks the passed block with the run driver, passing the id of this thread
        and the pool's timeout for the next block.

        :param block: block to benchmark
        :param runs: number of individual benchmarking runs
        :return: result of the run driver
        """
        block_timeout = self.pool.next_block_timeout()
        return self.driver.benchmark(block, runs, self.cpuset, self.id, timeout=block_timeout)