"""
core.py
-------
Main decu classes and decorators.
"""
import os
import logging
from .config import config
from .logging import DecuLogger
from .io import write
from functools import wraps
from datetime import datetime
from collections import defaultdict
from multiprocessing import Pool, Value, Lock
if 'DISPLAY' not in os.environ:
import matplotlib
matplotlib.use('Agg')
import matplotlib.pyplot as plt
__all__ = ['Script', 'experiment', 'figure', 'run_parallel', 'DecuException']
lock = Lock()
runs = defaultdict(lambda: Value('i', 0))
[docs]class DecuException(Exception):
pass
[docs]class Script():
"""Base class for experimental computation scripts."""
data_dir = config['Script']['data_dir']
results_dir = config['Script']['results_dir']
figures_dir = config['Script']['figures_dir']
scripts_dir = config['Script']['scripts_dir']
gendata_dir = config['Script']['gendata_dir']
figure_fmt = config['Script']['figure_fmt']
def __init__(self, project_dir="", module=None):
self.start_time = datetime.now()
self.project_dir = os.getcwd() if project_dir is None else project_dir
self.module = self.__module__ if module is None else module
self.log = DecuLogger(self.start_time, project_dir, self.module)
[docs] def make_result_basename(self, exp_name, run):
return os.path.join(self.results_dir, config['Script'].subs(
'result_file', time=self.start_time, module_name=self.module,
exp_name=exp_name, run=run))
[docs]def run_parallel(exp, params):
"""Run an experiment in parallel.
For each element `p` in `params`, call `exp(*p)`. These calls are made
in parallel using multiprocessing.
Args:
exp (method): A @experiment-decorated method.
params (list): Each element is a set of arguments to call `exp` with.
Returns:
list: The result of calling `exp(*pi)` over each element of params.
"""
def init(*args):
global lock, runs
lock, runs = args
with Pool(initializer=init, initargs=(lock, runs),
maxtasksperchild=100) as pool:
results = pool.starmap(exp, params)
return results
def _get_parameters(method, param_name, args, kwargs):
"""Return the arguments passed to all experimental parameters.
All method arguments that are not param_name are treated as
experimental parameters. method is assumed to have been called as
method(*args, **kwargs).
"""
from inspect import getfullargspec
arg_values = kwargs.copy()
arg_values.update(dict(zip(getfullargspec(method).args, args)))
if param_name in arg_values:
del arg_values[param_name]
if 'self' in arg_values:
del arg_values['self']
return arg_values
[docs]def experiment(data_param=None):
"""Decorator that adds logging functionality to experiment methods.
Args:
data_param (str): Parameter treated by the method as data
input. All other parameters are treated as experimental parameters.
Returns:
func: A decorator that adds bookkeeping functionality to its
argument.
"""
def _experiment(method):
"""Decorator that adds bookkeeping functionality to its argument.
Parameters
----------
method (function): A experimental computation function.
Returns
-------
The method function, with added logging and bookkeeping
functionality.
"""
exp_name = method.__name__
cfg = config['experiment']
def exp_start_msg(run, params):
return cfg.subs('start_msg', exp_name=exp_name, run=run,
params=params)
def exp_end_msg(run, params, elapsed):
return cfg.subs('end_msg', exp_name=exp_name, params=params,
elapsed=round(elapsed, 5), run=run)
def wrote_results_msg(run, outfile, params):
return cfg.subs('write_msg', exp_name=exp_name, params=params,
outfile=outfile, run=run)
def no_result_msg(run, params):
return cfg.subs('no_result_msg', exp_name=exp_name, params=params,
run=run)
from time import time
@wraps(method)
def decorated(self, *args, **kwargs):
with lock:
decorated.run = runs[decorated].value
runs[decorated].value += 1
# Make sure the output dir exists
os.makedirs(self.results_dir, exist_ok=True)
values = _get_parameters(method, data_param, args, kwargs)
self.log.info(exp_start_msg(decorated.run, values))
start = time()
result = method(self, *args, **kwargs)
end = time()
self.log.info(exp_end_msg(decorated.run, values, end - start))
if result is not None:
basename = self.make_result_basename(exp_name, decorated.run)
write(result, basename)
self.log.info(wrote_results_msg(decorated.run, basename, values))
else:
self.log.warning(no_result_msg(decorated.run, values))
return result
return decorated
return _experiment