Source code for EDAspy.optimization.eda

#!/usr/bin/env python
# coding: utf-8

import numpy as np
from abc import ABC
from .eda_result import EdaResult
from .custom.probabilistic_models import ProbabilisticModel
from .custom.initialization_models import GenInit
from .utils import _parallel_apply_along_axis
from time import process_time


class EDA(ABC):

    """
    Abstract class which defines the general behaviour of the algorithms. The baseline of the EDA
    approach is defined in this object. The specific configuration is defined in the class of each
    specific algorithm.
    """

    _pm = None
    _init = None

    def __init__(self,
                 size_gen: int,
                 max_iter: int,
                 dead_iter: int,
                 n_variables: int,
                 alpha: float = 0.5,
                 elite_factor: float = 0.4,
                 disp: bool = True,
                 parallelize: bool = False,
                 init_data: np.ndarray = None,
                 w_noise: float = .5,
                 *args, **kwargs):

        self.disp = disp
        self.size_gen = size_gen
        self.max_iter = max_iter
        self.alpha = alpha
        self.n_variables = n_variables
        self.truncation_length = int(size_gen * alpha)
        self.elite_factor = elite_factor
        self.elite_length = int(size_gen * elite_factor)
        self.parallelize = parallelize

        # Gaussian white noise definition
        if (w_noise != 0) and (w_noise is not None):
            self.w_noise = w_noise
        else:
            self.w_noise = 0

        assert dead_iter <= self.max_iter, 'dead_iter must be lower than or equal to max_iter'
        self.dead_iter = dead_iter

        self.best_mae_global = 999999999999
        self.best_ind_global = np.array([0] * self.n_variables)
        self.evaluations = np.array(0)
        self.evaluations_elite = np.array(0)

        self.generation = None
        self.elite_temp = None

        if parallelize:
            self._check_generation = self._check_generation_parallel
        else:
            self._check_generation = self._check_generation_no_parallel

        # allow initializing the EDA with data
        if init_data is not None:
            assert init_data.shape[1] == n_variables, 'The inserted data shape and the number of variables do not match'
            # assert init_data.shape[0] == size_gen, 'The inserted data shape and the generation size do not match'
            self.init_data = init_data
            self._initialize_generation = self._initialize_generation_with_data
        else:
            self._initialize_generation = self._initialize_generation_with_init

    def _new_generation(self):
        # self.generation = np.concatenate([self.pm.sample(size=self.size_gen), self.elite_temp])
        self.generation = self.pm.sample(size=self.size_gen)

    def _initialize_generation_with_data(self) -> np.ndarray:
        return self.init_data

    def _initialize_generation_with_init(self) -> np.ndarray:
        return self.init.sample(size=self.size_gen)

    def _initialize_generation(self) -> np.ndarray:
        raise Exception('Not implemented function')

    def _truncation(self):
        """
        Selection of the best individuals of the current generation.
        """
        # first add the elite selection to be considered
        self.generation = np.concatenate([self.generation, self.elite_temp])
        self.evaluations = np.append(self.evaluations, self.evaluations_elite)

        # now we truncate
        ordering = self.evaluations.argsort()
        best_indices_trunc = ordering[: self.truncation_length]
        best_indices_elite = ordering[: self.elite_length]

        self.elite_temp = self.generation[best_indices_elite, :]
        self.generation = self.generation[best_indices_trunc, :]

        self.evaluations_elite = np.take(self.evaluations, best_indices_elite)
        self.evaluations = np.take(self.evaluations, best_indices_trunc)

    # check each individual of the generation
    def _check_generation(self, objective_function: callable):
        """
        Check the cost of each individual in the cost function implemented by the user, and
        update the evaluations array.
""" raise Exception('Not implemented function') def _check_generation_parallel(self, objective_function: callable): self.evaluations = _parallel_apply_along_axis(objective_function, 1, self.generation) def _check_generation_no_parallel(self, objective_function: callable): self.evaluations = np.apply_along_axis(objective_function, 1, self.generation) def _update_pm(self): """ Learn the probabilistic model from the best individuals of previous generation. """ self.pm.learn(dataset=self.generation)
    def export_settings(self) -> dict:
        """
        Export the configuration of the algorithm to an object to be loaded in another execution.

        :return: configuration dictionary.
        :rtype: dict
        """
        return {
            "size_gen": self.size_gen,
            "max_iter": self.max_iter,
            "dead_iter": self.dead_iter,
            "n_variables": self.n_variables,
            "alpha": self.alpha,
            "elite_factor": self.elite_factor,
            "disp": self.disp,
            "parallelize": self.parallelize
        }
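A short sketch of round-tripping a configuration through export_settings, assuming a concrete subclass such as UMDAc that forwards these keyword arguments to EDA.__init__:

from EDAspy.optimization import UMDAc

eda = UMDAc(size_gen=100, max_iter=50, dead_iter=20, n_variables=10)
settings = eda.export_settings()

# rebuild an equivalent optimizer, e.g. in another session
eda_clone = UMDAc(**settings)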
    def minimize(self, cost_function: callable, output_runtime: bool = True, ftol: float = 1e-8,
                 *args, **kwargs) -> EdaResult:
        """
        Minimize function to execute the EDA optimization. By default, the optimizer is designed to
        minimize a cost function; if maximization is desired, just add a minus sign to your cost function.

        :param cost_function: cost function to be optimized, which accepts an array as argument.
        :param output_runtime: True if information during runtime is desired.
        :param ftol: relative tolerance on the improvement of the best cost, used for termination.
        :return: EdaResult object with results and information.
        :rtype: EdaResult
        """
        history = []
        not_better = 0

        t1 = process_time()

        self.generation = self._initialize_generation()
        self._check_generation(cost_function)

        # select just one item to be the elite selection in the first iteration
        self.elite_temp = np.array([self.generation[0, :]])
        self.evaluations_elite = np.array([self.evaluations.item(0)])

        best_mae_local = self.best_mae_global = min(self.evaluations)
        history.append(best_mae_local)
        self.best_ind_global = self.generation[np.where(self.evaluations == best_mae_local)[0][0]]

        if self.w_noise in (0, -1):
            white_noise = 0  # noise disabled (0, or the -1 sentinel)
        else:
            white_noise = np.random.normal(0, self.w_noise, size=(self.truncation_length, self.n_variables))

        if output_runtime:
            print('IT: 0\tBest cost: ', self.best_mae_global)

        for it in range(1, self.max_iter):
            self._truncation()
            self.generation += white_noise  # add Gaussian white noise to avoid genetic drift (optional)
            self._update_pm()
            self._new_generation()
            self._check_generation(cost_function)

            best_mae_local = min(self.evaluations)
            history.append(best_mae_local)
            best_ind_local = self.generation[np.where(self.evaluations == best_mae_local)[0][0]]

            # update the best values ever found; not_better counts iterations
            # without an improvement within the relative tolerance ftol
            if best_mae_local < self.best_mae_global * (1 + ftol):
                self.best_mae_global = best_mae_local
                self.best_ind_global = best_ind_local
                not_better = 0
            else:
                not_better += 1
                if not_better == self.dead_iter:
                    break

            if output_runtime:
                print('IT: ', it, '\tBest cost: ', self.best_mae_global)

        if self.disp:
            print("\tNFEVALS = " + str(len(history) * self.size_gen) + " F = " + str(self.best_mae_global))
            print("\tX = " + str(self.best_ind_global))

        t2 = process_time()
        eda_result = EdaResult(self.best_ind_global, self.best_mae_global, len(history) * self.size_gen,
                               history, self.export_settings(), t2 - t1)
        return eda_result
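A minimal usage sketch of minimize, again assuming the UMDAc subclass; the sphere benchmark and its dimensionality are arbitrary choices for illustration:

import numpy as np
from EDAspy.optimization import UMDAc

def sphere(solution: np.ndarray) -> float:
    # convex benchmark with global minimum 0 at the origin
    return float(np.sum(solution ** 2))

eda = UMDAc(size_gen=100, max_iter=100, dead_iter=20, n_variables=10)
result = eda.minimize(cost_function=sphere, output_runtime=False)
print(result)  # EdaResult holding the best individual, best cost, history, settings and CPU time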
    @property
    def pm(self) -> ProbabilisticModel:
        """
        Returns the probabilistic model used in the EDA implementation.

        :return: probabilistic model.
        :rtype: ProbabilisticModel
        """
        return self._pm

    @pm.setter
    def pm(self, value):
        if isinstance(value, ProbabilisticModel):
            self._pm = value
        else:
            raise ValueError('The object you try to set as a probabilistic model does not extend the '
                             'class ProbabilisticModel provided by EDAspy.')

        if len(value.variables) != self.n_variables:
            raise Exception('The number of variables of the probabilistic model is not equal to the number of '
                            'variables of the EDA.')

    @property
    def init(self) -> GenInit:
        """
        Returns the initializer used in the EDA implementation.

        :return: initializer.
        :rtype: GenInit
        """
        return self._init

    @init.setter
    def init(self, value):
        if isinstance(value, GenInit):
            self._init = value
        else:
            raise ValueError('The object you try to set as an initializer does not extend the '
                             'class GenInit provided by EDAspy.')

        if value.n_variables != self.n_variables:
            raise Exception('The number of variables of the initializer is not equal to the number of '
                            'variables of the EDA.')
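The setters above validate any component swapped in before optimization. A sketch, assuming a GenInit subclass named UniformGenInit is available in custom.initialization_models (check that module for the initializers actually shipped):

from EDAspy.optimization import UMDAc
from EDAspy.optimization.custom.initialization_models import UniformGenInit  # assumed name

eda = UMDAc(size_gen=100, max_iter=50, dead_iter=20, n_variables=10)
eda.init = UniformGenInit(n_variables=10)  # the init setter checks type and n_variables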