Source code for l2l.utils.runner

import os
import os.path
import pickle
import logging
import shlex, subprocess
import time
import copy
import zipfile
import sys
  
from l2l.utils.trajectory import Trajectory

# Module-level logger used by the Runner class and prepare_optimizee below.
logger = logging.getLogger("utils.runner")

class Runner():
    """
    A class used to launch the individual optimizees in parallel within the
    available computing resources. Takes care of launching, monitoring and
    ending the execution of the individuals. It generates workers which are
    assigned one or more individuals to be executed within each generation.
    It also takes care of collecting the results and relaunching individuals
    if there are runtime or logic errors associated with the execution.

    ...

    Methods
    -------
    collect_results_from_run(generation, individuals)
    run(trajectory, generation)
    produce_run_command(idx)
    launch_worker(w_id)
    launch_workers()
    close_workers()
    restart_worker(w_id)
    restart_individual(gen, idx)
    populate_free_workers(gen)
    simulate_generation(gen, n_inds)
    prepare_run_file()
    dump_traj(trajectory)
    create_zipfile(folder, filename)
    """

    def __init__(self, trajectory, iterations):
        """
        Sets up the workspace directories, writes the worker script and
        launches the workers.

        :param trajectory: A trajectory object holding the parameters to use
            in the initialization
        :param iterations: number of iterations in the optimization process
        """
        self.trajectory = trajectory
        self.iterations = iterations
        args = self.trajectory.parameters["runner_params"].params
        self.path = args['paths_obj'].simulation_path
        self.srun_command = args['srun']
        self.exec_command = args['exec']

        # Create directories for workspace
        subdirs = ['trajectories', 'results', 'individual_logs']
        self.work_paths = {sdir: os.path.join(self.path, sdir) for sdir in subdirs}
        os.makedirs(self.path, exist_ok=True)
        for work_dir in self.work_paths.values():
            os.makedirs(work_dir, exist_ok=True)

        self.optimizeepath = os.path.join(self.path, "optimizee.bin")
        self.debug = self.trajectory.debug
        self.stop_run = self.trajectory.stop_run
        self.timeout = self.trajectory.timeout

        # Bookkeeping of individuals (by index) and workers (by worker id)
        self.pending_individuals = []
        self.running_individuals = []
        self.finished_individuals = []
        self.running_workers = {}
        self.running_workers_individual_indeces = {}
        # TODO built a cleaner data structure for the workers that stores both
        # its process and the idx of the current individual
        self.idle_workers = {}
        self.outputpipes = {}
        self.inputpipes = {}
        self.worker_to_individual_map = {}

        self.n_inds = len(trajectory.individuals[0])
        self.n_workers = min(self.n_inds, args['max_workers'])
        self.prepare_run_file()
        self.launch_workers()
        # n_workers (not n_inds) is the number of processes actually started
        logger.info(f"{self.n_workers} workers launched\n")

    def collect_results_from_run(self, generation, individuals):
        """
        Collects the results generated by each individual in the generation.
        Results are, for the moment, stored in individual binary files named
        ``results_<generation>_<ind_idx>.bin`` in the 'results' work path.

        :param generation: generation id
        :param individuals: list of individuals which were executed in this
            generation; each must expose an ``ind_idx`` attribute
        :return results: a list of ``(ind_idx, result)`` tuples, one per
            individual, in the order of ``individuals``
        """
        results = []
        for ind in individuals:
            indfname = "results_%s_%s.bin" % (generation, ind.ind_idx)
            # `with` guarantees the handle is released even if unpickling fails
            with open(os.path.join(self.work_paths["results"], indfname), "rb") as handle:
                results.append((ind.ind_idx, pickle.load(handle)))
        return results

    def run(self, trajectory, generation):
        """
        Takes care of running the generation by executing run_optimizee.py in
        parallel, waiting for the execution and gathering the results.

        :param trajectory: trajectory object storing individual parameters for
            each generation
        :param generation: id of the generation
        :return results: a list of ``(ind_idx, result)`` tuples produced by the
            execution of each individual
        :raises RuntimeError: if any individual ended with a non-zero exit code
        """
        self.generation = generation
        # Dump trajectory for each optimizee run in the generation;
        # each trajectory needs an individual to get the correct generation
        trajectory.individual = self.trajectory.individuals[generation][0]
        self.dump_traj(trajectory)

        logger.info("Running generation: " + str(self.generation))
        exit_codes = self.simulate_generation(self.generation,
                                              len(trajectory.individuals[generation]))
        logger.info("Finished generation: " + str(self.generation))

        if all(exit_code == 0 for exit_code in exit_codes):
            results = self.collect_results_from_run(generation,
                                                    self.trajectory.individuals[generation])
        else:
            # not all individuals finished without error
            # (even potentially after restarting)
            raise RuntimeError(f"Generation {generation} did not finish successfully")

        # create zipfiles for Err and Out files
        self.create_zipfile(self.work_paths["individual_logs"], f"logs_generation_{generation}")
        self.close_workers()
        return results

    def produce_run_command(self, idx):
        """
        Generates the argument list used to launch a worker process.

        :param idx: the id passed as last argument to the executed command
            (launch_worker passes the worker id here).
        :return: tuple ``(args, log_files)``; ``args`` is the tokenized command
            and ``log_files`` maps 'stdout'/'stderr' to log paths in the local
            case, or is empty when srun handles the redirection itself.
        """
        log_dir = self.work_paths['individual_logs']
        if self.srun_command:
            # HPC case with slurm: srun redirects the output on its own
            log_files = {}
            # NOTE(review): the command is tokenized with shlex and later run
            # without a shell, so the trailing "&" reaches the command as a
            # literal argument instead of backgrounding it. Kept as-is.
            run_ind = (f"{self.srun_command} --output='{log_dir}/out_{idx}.log' "
                       f"--error='{log_dir}/err_{idx}.log' {self.exec_command} {idx} &")
        else:
            # local case without slurm
            log_files = {'stdout': os.path.join(log_dir, f'out_{idx}.log'),
                         'stderr': os.path.join(log_dir, f'err_{idx}.log')}
            run_ind = f"{self.exec_command} {idx}"
        args = shlex.split(run_ind)
        if self.debug:
            logger.info(f"run command: {run_ind}")
        return args, log_files

    def launch_worker(self, w_id):
        """
        This function uses the subprocess.Popen function to launch the command
        required to initialize a parallel worker. This worker will stay alive
        during the whole optimization run and will receive the id of the
        individuals it will execute each generation. It also generates the
        pipes (files in a shared file system) to communicate between the runner
        and the worker. Each worker has its own file in the path
        'individual_logs' with the workers_ prefix. The out_/err_ logs of the
        process are also stored in 'individual_logs'.

        The new process is registered in ``self.idle_workers``.

        :param w_id: the id of the worker to be launched.
        """
        log_dir = self.work_paths['individual_logs']
        outputpipename = os.path.join(log_dir, f"outputpipe_{w_id}")
        inputpipename = os.path.join(log_dir, f"inputpipe_{w_id}")

        # On a restart, release the handles of the previous incarnation of
        # this worker before the pipe files are recreated.
        for pipes in (self.outputpipes, self.inputpipes):
            stale = pipes.pop(w_id, None)
            if stale is not None:
                stale.close()

        # Create (or truncate) the files used as pipes
        with open(outputpipename, 'w+'):
            pass
        with open(inputpipename, 'w+'):
            pass
        if self.debug:
            logger.info(f"Pipe created {outputpipename}")
            logger.info(f"Pipe created {inputpipename}")

        args, log_files = self.produce_run_command(w_id)
        if log_files:
            # The child receives its own copies of the descriptors, so the
            # parent's handles can be closed right after the spawn.
            with open(log_files['stdout'], 'w') as out_log, \
                    open(log_files['stderr'], 'w') as err_log:
                process = subprocess.Popen(args, stdout=out_log, stderr=err_log)
        else:
            # NOTE(review): these pipes are never read by the runner.
            process = subprocess.Popen(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

        try:
            self.outputpipes[w_id] = open(outputpipename, 'r')
            self.inputpipes[w_id] = open(inputpipename, 'w')
        except Exception as e:
            logger.error(f"{e}")

        self.idle_workers[w_id] = process
        if self.debug:
            logger.info(f"Worker created {w_id}")

    def launch_workers(self):
        """
        Takes care of launching as many workers as allowed by the available
        computing resources (``self.n_workers``).
        """
        for w_id in range(self.n_workers):
            self.launch_worker(w_id)
        logger.info(f"All {self.n_workers} workers created")

    def close_workers(self):
        """
        Terminates the process of every worker currently registered as
        running. Idle workers and the pipe handles are not touched here.
        """
        for process in list(self.running_workers.values()):
            process.terminate()

    def restart_worker(self, w_id):
        """
        Takes care of handling the restart of a worker which failed by any
        reason, either runtime or logic.

        :param w_id: the id of the worker to be launched.
        """
        # The worker may already have been moved out of the running pool
        self.running_workers.pop(w_id, None)
        self.launch_worker(w_id)

    def restart_individual(self, gen, idx):
        """
        Takes care of handling the restart of an individual which failed by
        any reason, either runtime or logic. The individual is queued again in
        ``self.pending_individuals``.

        :param gen: the current generation
        :param idx: the id of the individual to be launched.
        """
        # TODO: implement different restart strategies depending on the optimizee.
        self.pending_individuals.append(idx)
        logger.info(f"Restarting individual {idx}")

    def populate_free_workers(self, gen):
        """
        Assigns pending individuals to idle workers until either pool is
        exhausted. Each assignment is sent to the worker as a line
        ``"<gen> <idx> 1"`` on its input pipe.

        :param gen: the current generation
        """
        while self.idle_workers and self.pending_individuals:
            w_id = next(iter(self.idle_workers))
            next_idx = self.pending_individuals.pop(0)
            pipe = self.inputpipes[w_id]
            pipe.write(f"{gen} {next_idx} 1\n")
            pipe.flush()
            self.running_workers[w_id] = self.idle_workers.pop(w_id)
            self.running_workers_individual_indeces[w_id] = next_idx
            self.running_individuals.append(next_idx)
            self.worker_to_individual_map[w_id] = next_idx
            if self.debug:
                logger.info(f"--- sent idx {next_idx} to worker {w_id}")

    def simulate_generation(self, gen, n_inds):
        """
        Distributes the n_inds individuals of a generation over the workers,
        polls the workers until every individual has finished and returns the
        exit code of each individual.

        :param gen: the current generation
        :param n_inds: the number of individuals within the generation
        :return: list of length n_inds with 0 for individuals that finished
            without error and 1 otherwise, indexed by individual id
        :raises NotImplementedError: if a failed worker cannot be restarted
        """
        self.pending_individuals = list(range(n_inds))
        self.populate_free_workers(gen=gen)

        # Wait for all individual to finish
        # Restart failed individuals
        retry = 0
        sorted_exit_codes = [1] * n_inds
        if self.debug:
            logger.info(f"All workers started running individuals for gen {gen}\n")

        logger.info(f"Reading output from gen {gen}")
        while True:
            for w_id in list(self.running_workers.keys()):
                process = self.running_workers[w_id]
                ind_idx = self.running_workers_individual_indeces[w_id]
                status_code = process.poll()
                try:
                    out = self.outputpipes[w_id].readline().replace('\n', '')
                except Exception as e:
                    logger.error(f"Exception: {e}")
                    continue

                if out == "0":
                    if self.debug:
                        logger.info(f"Individual finished without error {ind_idx}: {out}")
                    # individual finished without error
                    self.running_individuals.remove(ind_idx)
                    self.finished_individuals.append(ind_idx)
                    sorted_exit_codes[ind_idx] = 0
                    # set worker to idle
                    self.idle_workers[w_id] = self.running_workers.pop(w_id)
                # TODO error control of problematic optimizees
                elif out == "1":
                    if self.debug:
                        logger.info(f"Individual finished with error {ind_idx}: {out}. Restarting.")
                    # if stop_run is set, simulation should stop if error occurs
                    if self.stop_run:
                        logger.info(f"Individual finished with error {ind_idx}: {out}. Stop Execution.")
                        self.close_workers()
                        sys.exit(1)
                    self.running_individuals.remove(ind_idx)
                    sorted_exit_codes[ind_idx] = 1
                    # set worker to idle
                    self.idle_workers[w_id] = self.running_workers.pop(w_id)
                    # restart individual
                    self.restart_individual(gen, ind_idx)

                if status_code is None:
                    # Individual still running
                    continue
                elif status_code == 0:
                    # Process closed. The worker may have been moved to the
                    # idle pool a few lines above, so drop it from whichever
                    # pool holds it instead of assuming it is still running.
                    self.running_workers.pop(w_id, None)
                    self.idle_workers.pop(w_id, None)
                    if self.debug:
                        logger.info(f"Finished worker {w_id}: {status_code}")
                else:
                    if self.debug:
                        logger.info(f"Error status worker {w_id}: {status_code}")
                    # worker raised error
                    if self.stop_run:
                        logger.info(f"Error status worker {w_id}: {status_code}. Stop execution")
                        self.close_workers()
                        sys.exit(1)
                    # TODO depending on what kind of error restart failed worker
                    if status_code > 128 and retry < 20:  # Error spawning step, wait a bit?
                        if self.debug:
                            logger.info(f"Restarting {w_id} from error {status_code}\n retry {retry}")
                        time.sleep(4)
                        retry += 1
                        self.trajectory.retry = retry
                        self.dump_traj(self.trajectory)
                        self.restart_worker(w_id)
                        # Only re-queue the individual if its result has not
                        # already been handled in this pass.
                        if ind_idx in self.running_individuals:
                            self.restart_individual(gen, self.worker_to_individual_map[w_id])
                            self.running_individuals.remove(ind_idx)
                    else:
                        logger.error("Worker could not be initialized")
                        raise NotImplementedError("Restart failed for worker")

            # assign pending inds to free workers
            self.populate_free_workers(gen=gen)
            if not self.running_individuals and not self.pending_individuals:
                self.worker_to_individual_map = {}
                # all individuals finished
                break
            sys.stdout.flush()
            time.sleep(5)

        return sorted_exit_codes

    def prepare_run_file(self):
        """
        Writes the python run file ``run_optimizee.py`` into ``self.path``.
        The generated script is the worker loop: it waits for
        ``"<generation> <idx> <running>"`` lines on its input pipe, loads the
        optimizee and the trajectory of the generation from their binary
        files, executes the 'simulate' function of the optimizee, writes the
        result to a binary file and reports ``0`` on its output pipe.
        """
        log_dir = self.work_paths['individual_logs']
        trajpath = os.path.join(self.work_paths["trajectories"], "op_trajectory_")
        respath = os.path.join(self.work_paths['results'], "results_")
        script = (
            'import pickle\n'
            'import sys\n'
            'import gc\n'
            'import os\n'
            'import logging\n'
            'import socket\n'
            'import time\n'
            'worker_id = sys.argv[1]\n'
            'logfilename = f"' + log_dir + '/workers_{worker_id}.wlog"\n'
            'logging.basicConfig(filename=logfilename, filemode="a", level=logging.INFO)\n'
            'logger = logging.getLogger("Optimizee")\n'
            'logger.info(socket.gethostname())\n'
            'outputpipename = f"' + log_dir + '/outputpipe_{worker_id}"\n'
            'outputpipe = open(outputpipename, "wb")\n'
            'inputpipename = f"' + log_dir + '/inputpipe_{worker_id}"\n'
            'inputpipe = open(inputpipename, "r")\n'
            'running = 1\n'
            'while running:\n'
            '    try:\n'
            '        logger.info(f"Receiving")\n'
            '        params = ""\n'
            '        while not params:\n'
            '            params = inputpipe.readline()\n'
            '            time.sleep(5)\n'
            '        logger.info(f"Params received: {params}")\n'
            '        params = params.split()\n'
            '        logger.info(params)\n'
            '        generation = params[0]\n'
            '        idx = params[1]\n'
            '        running = int(params[2])\n'
            '        if not running:\n'
            '            break\n'
            '        handle_trajectory = open("' + trajpath + '"+ str(generation) + ".bin", "rb")\n'
            '        trajectory = pickle.load(handle_trajectory)\n'
            '        handle_trajectory.close()\n'
            '        handle_optimizee = open("' + self.optimizeepath + '", "rb")\n'
            '        optimizee = pickle.load(handle_optimizee)\n'
            '        handle_optimizee.close()\n\n'
            '        logger.info("Trajectory access")\n'
            '        logger.info(trajectory.individuals)\n'
            '        logger.info(trajectory.retry)\n'
            '        logger.info(len(trajectory.individuals[int(generation)]))\n'
            '        trajectory.individual = trajectory.individuals[int(generation)][int(idx)] \n'
            '        res = optimizee.simulate(trajectory)\n\n'
            '        handle_res = open("' + respath + '"+ str(generation) + "_" + str(idx) + ".bin", "wb")\n'
            '        pickle.dump(res, handle_res, pickle.HIGHEST_PROTOCOL)\n'
            '        handle_res.close()\n'
            '        del optimizee\n'
            '        outputpipe.write(f"0\\n".encode(\'ascii\'))\n'
            '        outputpipe.flush()\n'
            '        logger.info(f"Finished {idx}")\n'
            '        gc.collect()\n'
            '    except Exception as e:\n'
            '        logger.info(str(e))\n'
            '        logger.info(f"Params received in except: {params}")\n'
            '        if params == "":\n'
            '            continue\n'
            '        #else:\n'
            '        #    sys.stderr.write(b"1")\n'
            '        #    sys.stderr.flush()\n'
            'outputpipe.close()\n'
            'inputpipe.close()'
        )
        with open(os.path.join(self.path, "run_optimizee.py"), "w") as f:
            f.write(script)

    def dump_traj(self, trajectory):
        """
        Dumps trajectory files for the generation of ``trajectory.individual``.

        Two files are written into the 'trajectories' work path: the complete
        trajectory (``trajectory_<generation>.bin``) and a reduced one holding
        only the current generation (``op_trajectory_<generation>.bin``),
        which is the file the worker script loads.

        :param trajectory: object to be dumped
        """
        generation = trajectory.individual.generation
        traj_dir = self.work_paths["trajectories"]

        with open(os.path.join(traj_dir, "trajectory_%s.bin" % (generation)), 'wb') as mhandle:
            pickle.dump(trajectory, mhandle, pickle.HIGHEST_PROTOCOL)

        # Reduced trajectory carrying only what a worker needs
        tmptraj = Trajectory()
        tmptraj.retry = trajectory.retry
        tmptraj.individual = trajectory.individual
        tmptraj.individuals[generation] = trajectory.individuals[generation]
        with open(os.path.join(traj_dir, "op_trajectory_%s.bin" % (generation)), "wb") as handle:
            pickle.dump(tmptraj, handle, pickle.HIGHEST_PROTOCOL)

    def create_zipfile(self, folder, filename):
        """
        Creates a zipfile with every ``.log`` file found below ``folder`` and
        empties the log files that were included. The files themselves are
        kept (truncated), not removed.

        :param folder: path to folder containing the files
        :param filename: filename of the created zip file (without extension)
        """
        # Full path for the zip file
        zip_path = os.path.join(folder, filename + '.zip')

        # Creating the zip file in the specified folder
        with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as target:
            for root, _dirs, files in os.walk(folder):
                for file in files:
                    if not file.endswith('.log'):
                        continue
                    add = os.path.join(root, file)
                    target.write(add, os.path.relpath(add, folder))
                    # Deleting the content of the log files after zipping
                    with open(add, 'w'):
                        pass
def prepare_optimizee(optimizee, path):
    """
    Helper function used to dump the optimizee into a binary file for later
    loading during run.

    The object is pickled to ``<path>/optimizee.bin``.

    :param optimizee: the optimizee to dump into a binary file
    :param path: The path to store the optimizee.
    """
    # Serialize optimizee object so each process can run simulate on it
    # independently on the CNs
    fname = os.path.join(path, "optimizee.bin")
    # `with` closes the file even if pickling the optimizee fails
    with open(fname, "wb") as f:
        pickle.dump(optimizee, f)
    logger.info("Serialized optimizee writen to path: {}".format(fname))