Source code for pyqlearning.annealingmodel.simulated_annealing

# -*- coding: utf-8 -*-
import numpy as np
from pyqlearning.annealing_model import AnnealingModel
from pyqlearning.annealingmodel.cost_functionable import CostFunctionable


class SimulatedAnnealing(AnnealingModel):
    '''
    Simulated Annealing.

    Simulated Annealing is a probabilistic, single-solution-based search method
    inspired by the annealing process in metallurgy. Annealing is a physical
    process referred to as tempering certain alloys of metal, glass, or crystal
    by heating above the melting point, holding the temperature, and then
    cooling very slowly until the material solidifies into a perfect crystalline
    structure. The simulation of this process is known as simulated annealing.

    References:
        - Bertsimas, D., & Tsitsiklis, J. (1993). Simulated annealing. Statistical Science, 8(1), 10-15.
        - Du, K. L., & Swamy, M. N. S. (2016). Search and optimization by metaheuristics. New York City: Springer.
        - Mezard, M., & Montanari, A. (2009). Information, physics, and computation. Oxford University Press.
        - Nallusamy, R., Duraiswamy, K., Dhanalaksmi, R., & Parthiban, P. (2009). Optimization of non-linear multiple traveling salesman problem using k-means clustering, shrink wrap algorithm and meta-heuristics. International Journal of Nonlinear Science, 8(4), 480-487.
    '''

    # Number of cycles.
    __cycles_num = 200
    # Number of trials per cycle.
    __trials_per_cycle = 50
    # Number of accepted solutions.
    __accepted_sol_num = 0.0
    # Probability of accepting a worse solution at the start.
    __init_prob = 0.7
    # Probability of accepting a worse solution at the end.
    __final_prob = 0.001
    # Start position.
    __start_pos = 0
    # Initial temperature.
    __init_temp = None
    # Final temperature.
    __final_temp = None
    # User-defined cost function for optimization.
    __cost_functionable = None
    # The scalar of fractional reduction applied every cycle
    # to lower the temperature for the next cycle.
    __fractional_reduction = None
    # Range of moving in the feature map.
    __move_range = 3

    def __init__(
        self,
        cost_functionable,
        cycles_num=200,
        trials_per_cycle=50,
        accepted_sol_num=0.0,
        init_prob=0.7,
        final_prob=0.001,
        start_pos=0,
        move_range=3,
        tolerance_diff_e=None
    ):
        '''
        Initialize.

        Args:
            cost_functionable:  The object of `CostFunctionable`.
            cycles_num:         The number of annealing cycles.
            trials_per_cycle:   The number of trials per cycle.
            accepted_sol_num:   The initial number of accepted solutions.
            init_prob:          Probability of accepting a worse solution at the start.
            final_prob:         Probability of accepting a worse solution at the end.
            start_pos:          The first searched position.
            move_range:         The range of moving in the feature map.
            tolerance_diff_e:   Tolerance for the optimization.
                                When ΔE does not improve by at least `tolerance_diff_e`
                                over two consecutive iterations, annealing stops.
        '''
        if isinstance(cost_functionable, CostFunctionable):
            self.__cost_functionable = cost_functionable
        else:
            raise TypeError('The type of `cost_functionable` must be `CostFunctionable`.')

        self.__cycles_num = cycles_num
        self.__trials_per_cycle = trials_per_cycle
        self.__accepted_sol_num = accepted_sol_num + 1.0
        self.__init_prob = init_prob
        self.__final_prob = final_prob
        self.__start_pos = start_pos
        self.__tolerance_diff_e = tolerance_diff_e

        if move_range <= 1:
            move_range = 2
        self.__move_range = move_range

        # Initial temperature.
        self.__init_temp = -1.0 / np.log(self.__init_prob)
        # Final temperature.
        self.__final_temp = -1.0 / np.log(self.__final_prob)
        # Fractional reduction every cycle.
        self.__fractional_reduction = (self.__final_temp / self.__init_temp) ** (1.0 / (self.__cycles_num - 1.0))

    def __move(self, current_pos):
        '''
        Move in the feature map.

        Args:
            current_pos:    The current position.

        Returns:
            The next position.
        '''
        if self.__move_range is not None:
            next_pos = np.random.randint(current_pos - self.__move_range, current_pos + self.__move_range)
            if next_pos < 0:
                next_pos = 0
            elif next_pos >= self.var_arr.shape[0] - 1:
                next_pos = self.var_arr.shape[0] - 1
            return next_pos
        else:
            next_pos = np.random.randint(self.var_arr.shape[0] - 1)
            return next_pos
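    # The acceptance test in `annealing` below follows the Metropolis criterion:
    # a worse candidate whose cost increases by `delta_e` is accepted with probability
    #
    #     p = exp(-delta_e / (delta_e_avg * t))
    #
    # where `delta_e_avg` is the running average of `delta_e` over accepted solutions
    # and `t` is the current temperature. The temperature starts at
    # `-1.0 / log(init_prob)`, ends at `-1.0 / log(final_prob)`, and is lowered
    # geometrically by the factor `__fractional_reduction` after every cycle.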
    def annealing(self):
        '''
        Annealing.
        '''
        shape_list = list(self.var_arr.shape)
        shape_list[0] = self.__cycles_num + 1
        self.var_log_arr = np.zeros(tuple(shape_list))

        current_pos = self.__start_pos
        current_var_arr = self.var_arr[current_pos, :]
        current_cost_arr = self.__cost_functionable.compute(self.var_arr[current_pos, :])

        self.computed_cost_arr = np.zeros(self.__cycles_num + 1)
        self.computed_cost_arr[0] = current_cost_arr

        t = self.__init_temp
        delta_e_avg = 0.0

        pos_log_list = [current_pos]
        predicted_log_list = []
        for i in range(self.__cycles_num):
            # Early stopping: if the averaged ΔE has stopped improving, end the annealing.
            if isinstance(self.__tolerance_diff_e, float) and len(predicted_log_list) > 1:
                diff = abs(predicted_log_list[-1][2] - predicted_log_list[-2][2])
                if diff < self.__tolerance_diff_e:
                    break

            for j in range(self.__trials_per_cycle):
                current_pos = self.__move(current_pos)
                pos_log_list.append(current_pos)
                self.__now_dist_mat_arr = self.var_arr[current_pos, :]
                cost_arr = self.__cost_functionable.compute(self.__now_dist_mat_arr)
                delta_e = np.abs(cost_arr - current_cost_arr)

                if cost_arr > current_cost_arr:
                    # The candidate is worse: accept it with the Metropolis probability.
                    if i == 0 and j == 0:
                        delta_e_avg = delta_e
                    try:
                        p = np.exp(-delta_e / (delta_e_avg * t))
                    except ZeroDivisionError:
                        p = 0.0
                    if np.random.random() < p:
                        accept = True
                    else:
                        accept = False
                else:
                    # The candidate is better: always accept it.
                    accept = True
                    p = 0.0

                if accept is True:
                    current_var_arr = self.__now_dist_mat_arr
                    current_cost_arr = cost_arr
                    self.__accepted_sol_num = self.__accepted_sol_num + 1.0
                    # Update the running average of ΔE over accepted solutions.
                    delta_e_avg = (delta_e_avg * (self.__accepted_sol_num - 1.0) + delta_e) / self.__accepted_sol_num

                predicted_log_list.append((cost_arr, delta_e, delta_e_avg, p, int(accept)))

            self.var_log_arr[i + 1] = current_var_arr
            self.computed_cost_arr[i + 1] = current_cost_arr
            # Lower the temperature for the next cycle.
            t = t * self.__fractional_reduction

        self.predicted_log_arr = np.array(predicted_log_list)
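
A minimal usage sketch, not part of the module above. It assumes that `CostFunctionable` only requires the `compute` method called by `annealing`, and that `var_arr` can be assigned on the inherited `AnnealingModel`; the `SphereCost` class and the candidate matrix are hypothetical examples.

# Hypothetical usage sketch for `SimulatedAnnealing`.
import numpy as np
from pyqlearning.annealingmodel.cost_functionable import CostFunctionable
from pyqlearning.annealingmodel.simulated_annealing import SimulatedAnnealing


class SphereCost(CostFunctionable):
    '''Hypothetical cost function: sum of squares of one candidate row.'''

    def compute(self, x_arr):
        return np.sum(x_arr ** 2)


annealing_model = SimulatedAnnealing(
    cost_functionable=SphereCost(),
    cycles_num=100,
    trials_per_cycle=30
)
# Candidate solutions: 100 rows, each a 5-dimensional parameter vector
# (assumes `var_arr` is settable on `AnnealingModel`).
annealing_model.var_arr = np.random.uniform(low=-5.0, high=5.0, size=(100, 5))
annealing_model.annealing()

# Cost at the start position and after each annealing cycle.
print(annealing_model.computed_cost_arr)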