Source code for EDAspy.timeseries.TransformationsFeatureSelection
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import warnings
def __normalize__(array):
    """
    Normalize the values of the array so that they add up to one.

    :param array: sequence of numbers to normalize.
    :return: list with every value divided by the total of ``array``. If the total is zero, a list of zeros
        with the same length as ``array`` is returned instead.
    :rtype: list
    """
    # The total is computed once instead of once per element, which made the function quadratic.
    total = sum(array)
    if total == 0.0:
        return [0] * len(array)
    return [value / total for value in array]
class TransformationsFSEDA:
    """
    Estimation of Distribution Algorithm that uses a Dirichlet distribution to select among the different time series
    transformations that best improve the cost function to optimize.

    Attributes:
    --------------------
    generation: pandas DataFrame
        Last generation of the algorithm.
    best_MAE: float
        Best cost found.
    best_ind: list
        Names (variable + transformation) of the best individual found. Empty string until ``run`` finds one.
    historic_best: list
        List of the best cost of every iteration executed by ``run``.
    output_plot: str
        File name where the progress figure is saved. No figure is saved while it is the empty string.
    size_gen: int
        Parameter set by user. Number of the individuals in each generation.
    max_it: int
        Parameter set by user. Maximum number of iterations of the algorithm.
    dead_it: int
        Parameter set by user. Number of iterations after which, if no improvement reached, the algorithm finishes.
    vector: pandas DataFrame
        When initialized, parameters set by the user. When finished, statistics learned by the algorithm.
    cost_function:
        Set by user. Cost function set to optimize.
    """

    # Class-level defaults. ``historic_best`` and ``generation`` are replaced by per-instance objects in
    # ``__init__`` so that instances never share mutable state.
    generation = pd.DataFrame()
    output_plot = ''
    historic_best = []
    best_MAE = 99999999999
    best_ind = ''

    def __init__(self, max_it, dead_it, size_gen, alpha, vector, array_transformations, cost_function):
        """
        Constructor of the class.

        :param max_it: Maximum number of iterations of the EDA.
        :type max_it: int
        :param dead_it: Number of iterations without improvement after which the algorithm finishes.
        :type dead_it: int
        :param size_gen: Number of individuals per generation.
        :type size_gen: int
        :param alpha: percentage (over 1) of the generation to be selected to reproduce the next generation.
        :type alpha: float
        :param vector: initial statistics. Its columns are the variable names and its first row holds the
            probability of including each variable. The object is updated in place while the algorithm runs.
        :type vector: pandas DataFrame
        :param array_transformations: names of the transformations to select among. ``'basic'`` (no
            transformation) is always added as the first option.
        :type array_transformations: list
        :param cost_function: cost function to calculate the cost of the individual. Receives a list of names, and
            returns a float
        :type cost_function: callable function
        :raises Exception: cost function is not callable
        """
        warnings.warn(
            "This implementation will be deprecated in following versions of EDAspy, and adapted to the rest "
            "of EDAs implementations", PendingDeprecationWarning
        )

        self.max_it = max_it
        self.size_gen = size_gen
        self.alpha = alpha
        self.vector = vector
        self.dead_it = dead_it

        self.trunc_size = int(size_gen * alpha)
        self.array_transformations = ['basic'] + list(array_transformations)
        self.variables = list(vector.columns)

        # Per-instance state: a list defined only at class level would be shared by all the instances.
        self.historic_best = []
        self.generation = pd.DataFrame()

        # check if cost_function is real
        if not callable(cost_function):
            raise Exception('ERROR setting cost function. The cost function must be a callable function')
        self.cost_function = cost_function

    def __initialize_dirichlet__(self):
        """
        Initialization of the transformation database. Associate a key to each transformation.

        Builds ``self.dirichlet`` (rows: transformations, columns: variables) with the same probability for every
        transformation, and ``self.keys``, the dictionary {transformation: key}.
        """
        n_transformations = len(self.array_transformations)
        self.dirichlet = pd.DataFrame(
            1 / n_transformations,
            index=pd.Index(self.array_transformations, name='transformation'),
            columns=self.variables,
        )

        # Keys start at 2, so 0 is free to mean "variable not selected" in an individual.
        self.keys = {name: position + 2 for position, name in enumerate(self.array_transformations)}

    def __new_individual__(self):
        """
        Creates a new individual.

        :return: dictionary {variable_name: number}, where the number is 0 if the variable is not selected, or the
            key of the sampled transformation otherwise.
        :rtype: dict
        """
        samples = np.random.uniform(low=0, high=1, size=len(self.variables))

        individual = {}
        for var, sample in zip(self.variables, samples):
            # Strict comparison: the uniform sample can be exactly 0, and a variable whose probability is 0 must
            # never be selected.
            individual[var] = 1 if float(self.vector[var].iloc[0]) > sample else 0

        # Every selected variable gets the key of a transformation sampled from its Dirichlet column.
        for var in self.variables:
            if individual[var] == 1:
                column = self.dirichlet[var]
                selection = np.random.choice(list(column.index), 1, p=list(column.values))
                individual[var] = int(self.keys[selection[0]])

        return individual

    def new_generation(self):
        """
        Creates a new generation of individuals. Updates the generation DataFrame.

        ``size_gen`` individuals are sampled and the duplicated ones are dropped afterwards, to not calculate their
        cost more than once. The generation can therefore hold fewer than ``size_gen`` individuals, but the
        sampling always terminates.
        """
        rows = []
        while len(rows) < self.size_gen:
            rows.append(self.__new_individual__())

        # Built in one go: DataFrame.append is not available in current versions of pandas.
        gen = pd.DataFrame(rows, columns=self.variables)
        self.generation = gen.drop_duplicates().reset_index(drop=True)

    def __getKeysByValue__(self, value_2_find):
        """
        Get a list of keys from dictionary which has the given value

        :param value_2_find: value to find in the dictionary
        :return: list of keys which match with value_2_find in the dictionary {transformation: key}
        :rtype: list
        """
        return [name for name, key in self.keys.items() if key == value_2_find]

    def __check_individual__(self, individual):
        """
        Check the cost of the individual in the cost function.

        :param individual: mapping {variable_name: number} of the respective individual
        :type individual: dict
        :return: cost of the individual calculated by cost function
        :rtype: float
        """
        selected = []  # names of the variables included
        for var in self.variables:
            # 0 means that the variable is not part of the selection
            if individual[var] == 0:
                continue

            transformation = str(self.__getKeysByValue__(individual[var])[0])
            if transformation == 'basic':
                selected.append(var)  # name
            else:
                selected.append(var + transformation)  # name + name_transformation

        return self.cost_function(selected)

    def check_generation(self):
        """
        Check the cost of each individual of the generation in the cost function. The costs are stored in the
        'MAE' column of the generation.

        :raises Exception: the cost of an individual could not be calculated. The original error is chained.
        """
        costs = []
        # Only the variable columns are read, so an existing 'MAE' column does not affect the individuals.
        for label, individual in self.generation[self.variables].iterrows():
            try:
                costs.append(self.__check_individual__(individual))
            except Exception as err:
                raise Exception(
                    'ERROR: something went wrong calculating the cost of the individual: \n', str(label)
                ) from err

        self.generation['MAE'] = costs

    def individuals_selection(self):
        """
        Selection of the best individuals to mutate the next generation: keeps the ``trunc_size`` individuals with
        the lowest 'MAE'.
        """
        self.generation = self.generation.nsmallest(self.trunc_size, 'MAE')

    def update_vector_probabilities(self):
        """
        Re-build the vector of statistics based on the selection of the best individuals of the generation.

        For every variable, ``vector`` receives the fraction of individuals that include it, and its column of
        ``dirichlet`` receives the normalized frequency of every transformation.
        """
        n_individuals = len(self.generation)

        for var in self.variables:
            values = list(self.generation[var].values)

            # fraction of individuals in which the variable is not selected (value 0)
            excluded = values.count(0) / n_individuals if n_individuals else 0
            self.vector[var] = 1 - excluded  # probability of being chosen

            # frequency of each transformation, normalized so that the column adds up to one
            frequencies = [
                values.count(int(self.keys[trans])) / n_individuals if n_individuals else 0
                for trans in self.dirichlet.index
            ]
            self.dirichlet[var] = __normalize__(frequencies)

    def __plot__(self):
        """
        Save a figure in the filename location with the EDA progress.
        output_plot must be overwritten previously; nothing is done while it is the empty string.
        """
        if self.output_plot == '':
            return

        iteration = list(range(len(self.historic_best)))
        figure = plt.figure(figsize=(12, 8))
        plt.plot(iteration, self.historic_best)
        plt.title('EDA progression')
        plt.xlabel('iteration')
        plt.ylabel('MAE in model')
        plt.savefig(self.output_plot)
        plt.close(figure)  # release the figure once it is saved

    def run(self, output=True):
        """
        Algorithm run execution

        :param output: If True then an output is printed in each iteration. Otherwise, not
        :type output: bool
        :return: best_individual, best MAE found
        :rtype: list, float
        """
        convergence = 0
        self.__initialize_dirichlet__()

        for i in range(self.max_it):
            self.new_generation()
            self.check_generation()
            self.individuals_selection()
            self.update_vector_probabilities()

            best_mae_local = self.generation['MAE'].min()
            # first individual that reaches the best cost of this iteration
            best = self.generation.loc[self.generation['MAE'].idxmin()]

            # format: name + 'name_transformation'
            # NOTE(review): unlike __check_individual__, the 'basic' suffix is kept in the reported names. This is
            # preserved because callers may rely on the returned names.
            best_ind_local = [
                var + str(self.__getKeysByValue__(int(best[var]))[0])
                for var in self.variables
                if int(best[var]) != 0
            ]

            self.historic_best.append(best_mae_local)  # save MAE

            # update best of model
            if self.best_MAE > best_mae_local:
                self.best_MAE = best_mae_local
                self.best_ind = best_ind_local
                convergence = 0
            else:
                convergence = convergence + 1
                if convergence == self.dead_it:
                    self.__plot__()  # save the fig of the progression
                    return self.best_ind, self.best_MAE

            if output:
                print('[iteration:', i, ']', best_mae_local)

        self.__plot__()  # save the fig of the progression
        return self.best_ind, self.best_MAE