Source code for l2l.utils.experiment

import logging.config
import os
import pickle
import shutil

from l2l.utils.environment import Environment

from l2l.logging_tools import create_shared_logger_data, configure_loggers
from l2l.paths import Paths
import l2l.utils.runner as runner


class Experiment(object):
    """
    Convenience wrapper that sets up, runs and finishes an L2L simulation.

    Typical usage is ``prepare_experiment`` -> ``run_experiment`` ->
    ``end_experiment``. The attributes ``paths``, ``env`` and ``traj`` stay
    ``None`` until ``prepare_experiment`` has been called.
    """

    def __init__(self, root_dir_path):
        """
        Stores the results location and initialises the experiment state.

        For an example see `L2L/bin/l2l-template.py`

        :param root_dir_path: str, Path to the results folder. Accepts relative
            paths (stored as an absolute path). The folder itself is created
            later by :meth:`prepare_experiment` if it does not exist.
        """
        self.root_dir_path = os.path.abspath(root_dir_path)
        self.logger = logging.getLogger('utils.experiment')
        self.paths = None
        self.env = None
        self.traj = None
        self.optimizee = None
        self.optimizer = None

    def prepare_experiment(self, **kwargs):
        """
        Prepare the experiment by creating the results folder, the
        environment, the loggers and the runner parameters.

        :param kwargs: optional dictionary, contains
            - name: str, name of the run, Default: L2L-run
            - trajectory_name: str, name of the trajectory,
              Default: trajectory
            - checkpoint: object, trajectory object; ``None`` is treated
              like "no checkpoint given"
            - log_stdout: bool, if stdout should be sent to logs,
              Default: False
            - runner_params: dict, User specified parameters for the runner.
              See notes section for default runner parameters. A user
              supplied ``exec`` value gets the quoted path of
              ``run_optimizee.py`` appended.
            - multiprocessing, bool, enable multiprocessing, Default: True
            - debug, bool, enable verbose mode to get more detailed logs for
              debugging, Default: False
            - stop_run, bool, when an error occurs the execution is stopped,
              Default: False
            - timeout, bool, stops execution after 2 hours if it is not
              finished by then, Default: True
            - overwrite, bool, specifies whether existing files should be
              overwritten, Default: False
        :return traj, trajectory object
        :return all_runner_params, dict, a dictionary with all parameters
            for the runner given by the user and default ones
        :raises Exception: if the output directory already exists and
            ``overwrite`` is not set
        :notes
            Default runner parameters are:
            - srun: ""
            - exec: 'python3 "<self.paths.simulation_path>/run_optimizee.py"'
            - max_workers: 32
            - work_path: self.paths.root_dir_path,
            - paths_obj: self.paths
        """
        name = kwargs.get('name', 'L2L-run')

        if not os.path.isdir(self.root_dir_path):
            # makedirs also creates missing parent folders; exist_ok guards
            # against another process creating the folder in the meantime.
            os.makedirs(self.root_dir_path, exist_ok=True)
            print('Created a folder at {}'.format(self.root_dir_path))

        checkpoint = kwargs.get('checkpoint')
        if checkpoint is not None:
            self.traj = checkpoint
            trajectory_name = self.traj._name
        else:
            trajectory_name = kwargs.get('trajectory_name', 'trajectory')

        self.paths = Paths(name, {},
                           root_dir_path=self.root_dir_path,
                           suffix="-" + trajectory_name)

        overwrite = kwargs.get('overwrite', False)
        if os.path.isdir(self.paths.output_dir_path):
            if not overwrite:
                raise Exception(
                    "There are already exsiting outputfiles in this "
                    "directory. Please change the path specification.")
            # Only the ready-file markers of a previous run are removed.
            ready_dir = os.path.join(self.paths.output_dir_path,
                                     'simulation/ready_files')
            if os.path.isdir(ready_dir):
                shutil.rmtree(ready_dir)

        print("All output logs can be found in directory ",
              self.paths.logs_path)

        # Create an environment that handles running our simulation.
        # Both variants share every argument except the trajectory source.
        env_kwargs = dict(
            filename=self.paths.output_dir_path,
            file_title='{} data'.format(name),
            comment='{} data'.format(name),
            add_time=True,
            automatic_storing=True,
            log_stdout=kwargs.get('log_stdout', False),  # Sends stdout to logs
            multiprocessing=kwargs.get('multiprocessing', True),
            debug=kwargs.get('debug', False),
            stop_run=kwargs.get('stop_run', False),
            timeout=kwargs.get('timeout', True),
        )
        if self.traj:
            self.env = Environment(checkpoint=self.traj, **env_kwargs)
        else:
            self.env = Environment(trajectory=trajectory_name, **env_kwargs)

        # Get the trajectory from the environment
        self.traj = self.env.trajectory

        create_shared_logger_data(
            logger_names=['optimizers', 'utils'],
            log_levels=['INFO', 'INFO'],
            log_to_consoles=[True, True],
            sim_name=name,
            log_directory=self.paths.logs_path)
        configure_loggers()

        optimizee_script = os.path.join(self.paths.simulation_path,
                                        "run_optimizee.py")
        default_runner_params = {
            "srun": "",
            "exec": 'python3 "' + optimizee_script + '"',
            "max_workers": 32,
            "work_path": self.paths.root_dir_path,
            "paths_obj": self.paths,
        }
        user_runner_params = kwargs.get('runner_params') or {}

        # Will contain all runner parameters
        all_runner_params = {}
        self.traj.f_add_parameter_group("runner_params",
                                        "Contains runner parameters")

        # User supplied parameters are added to the trajectory first
        for k, v in user_runner_params.items():
            if k == "exec":
                # The user only provides the interpreter/launcher part; the
                # script to execute is always run_optimizee.py.
                v = v + ' "' + optimizee_script + '"'
            self.traj.f_add_parameter_to_group("runner_params", k, v)
            all_runner_params[k] = v

        # Default parameters are added if they are not already set by the user
        for k, v in default_runner_params.items():
            if k in user_runner_params:
                continue
            self.traj.f_add_parameter_to_group("runner_params", k, v)
            all_runner_params[k] = v
            if user_runner_params and k == "max_workers":
                self.logger.info(
                    "No parameter 'max_workers' given to runner. "
                    "Using default value %s.", v)

        print('Runner parameters used: {}'.format(all_runner_params))
        return self.traj, all_runner_params

    def run_experiment(self, optimizer, optimizee,
                       optimizer_parameters=None, optimizee_parameters=None):
        """
        Runs the simulation with all parameter combinations

        Optimizee and optimizer object are required as well as their
        parameters as namedtuples. The parameters are only written to the
        log here.

        :param optimizee: optimizee object
        :param optimizee_parameters: Namedtuple, optional, parameters of the
            optimizee
        :param optimizer: optimizer object
        :param optimizer_parameters: Namedtuple, optional, parameters of the
            optimizer
        :raises ValueError: if the individual created by the optimizee
            contains an integer value
        """
        ind = optimizee.create_individual()
        for key in ind:
            # NOTE: bool is a subclass of int, so boolean values are
            # rejected by this check as well.
            if isinstance(ind[key], int):
                raise ValueError('Parameter of type integer is not allowed')

        self.optimizee = optimizee
        self.optimizer = optimizer
        self.logger.info("Optimizee parameters: %s", optimizee_parameters)
        self.logger.info("Optimizer parameters: %s", optimizer_parameters)
        runner.prepare_optimizee(optimizee, self.paths.simulation_path)

        # Add post processing
        self.env.add_postprocessing(optimizer.post_process)

        # Run the simulation
        self.env.run()

    def end_experiment(self, optimizer):
        """
        Ends the experiment and disables the logging

        :param optimizer: optimizer object
        :return traj, trajectory object
        :return path, Path object
        """
        # Outer-loop optimizer end
        optimizer.end(self.traj)
        # Finally disable logging and close all log-files
        self.env.disable_logging()
        return self.traj, self.paths

    def load_trajectory(self, traj_path):
        """
        Loads a trajectory from a given file

        :param traj_path: path to the trajectory file
        :return traj: trajectory object
        """
        # NOTE(security): pickle can execute arbitrary code while loading;
        # only load trajectory files from trusted sources.
        with open(traj_path, "rb") as traj_file:
            return pickle.load(traj_file)