Source code for l2l.utils.environment

from l2l.utils.trajectory import Trajectory
import logging
from l2l.utils.runner import Runner

logger = logging.getLogger("utils.environment")


[docs] class Environment: """ The Environment class takes the place of the pypet Environment and provides the required functionality to execute the inner loop. Based on the pypet environment concept: https://github.com/SmokinCaterpillar/pypet """ def __init__(self, *args, **keyword_args): """ Initializes an Environment :param args: arguments passed to the environment initialization :param keyword_args: arguments by keyword. Relevant keywords are trajectory and filename. The trajectory object holds individual parameters and history per generation of the exploration process. """ if 'trajectory' in keyword_args: self.trajectory = Trajectory(name=keyword_args['trajectory'], debug = keyword_args['debug'], stop_run = keyword_args['stop_run'], timeout=keyword_args['timeout']) if 'checkpoint' in keyword_args: self.trajectory = keyword_args["checkpoint"] self.trajectory.is_loaded = True if 'filename' in keyword_args: self.filename = keyword_args['filename'] self.postprocessing = None self.multiprocessing = True if 'multiprocessing' in keyword_args: self.multiprocessing = keyword_args['multiprocessing'] self.run_id = 0 self.enable_logging()
[docs] def run(self): """ Runs all generations of the optimizees using the runner. """ result = {} logger.info(f"Environment run starting Runner for n iterations: {self.trajectory.par['n_iteration']}") runner = Runner(self.trajectory, self.trajectory.par['n_iteration']+self.trajectory.individual.generation) for it in range(self.trajectory.individual.generation, self.trajectory.par['n_iteration']+self.trajectory.individual.generation): if self.multiprocessing: # Multiprocessing is done through the runner result[it] = [] logger.info(f"Iteration: {it+1}/{self.trajectory.par['n_iteration']}") # execute run try: result[it] = runner.run(self.trajectory,it) except Exception as e: if self.logging: logger.exception("Error launching run: " + str(e.__cause__)) raise e # Add results to the trajectory self.trajectory.results.f_add_result_to_group("all_results", it, result[it]) self.trajectory.current_results = result[it] # Update trajectory file runner.dump_traj(self.trajectory) # Perform the postprocessing step in order to generate the new parameter set self.postprocessing(self.trajectory, result[it]) runner.close_workers()
[docs] def add_postprocessing(self, func): """ Function to add a postprocessing step :param func: the function which performs the postprocessing. Postprocessing is the step where the results are assessed in order to produce a new set of parameters for the next generation. """ self.postprocessing = func
[docs] def enable_logging(self): """ Function to enable logging TODO think about removing this. """ self.logging = True
[docs] def disable_logging(self): """ Function to enable logging """ self.logging = False