| import random |
| import numpy as np |
| import threading |
| import panel as pn |
| pn.extension(template='bootstrap') |
| import holoviews as hv |
| import time |
| import pandas as pd |
| from holoviews.streams import Stream |
| hv.extension('bokeh', logo=False) |
|
|
| |
class Particle():
    """A single two-dimensional particle of a particle-swarm optimiser.

    The particle keeps its current position and velocity, the error of its
    most recent evaluation, and the best (lowest-error) position it has seen.
    All vectors are plain Python lists of length ``num_dimensions``.
    """

    def __init__(self, initial):
        """Create a particle at ``initial`` with a random velocity.

        Args:
            initial: Indexable holding at least ``num_dimensions`` (2)
                starting coordinates.
        """
        self.num_dimensions = 2
        self.initial = initial
        self.position = [initial[i] for i in range(self.num_dimensions)]
        self.velocity = [
            random.uniform(-1, 1) for _ in range(self.num_dimensions)
        ]
        # Seed the personal best with the starting position so that
        # update_velocity() can index it even before any evaluation has
        # produced a finite error.
        self.best_position = self.position[:]
        self.best_error = float('inf')
        self.error = float('inf')

    def update_velocity(self, global_best_position, max_iter, iter_count):
        """Update the velocity using time-varying acceleration coefficients.

        The cognitive coefficient falls linearly from 2.5 to 0.5 and the
        social coefficient rises from 0.5 to 2.5 as ``iter_count`` goes from
        0 to ``max_iter``.

        Args:
            global_best_position: Best position found by the whole swarm.
            max_iter: Iteration count at which the schedule reaches its end
                values.
            iter_count: Current iteration number.
        """
        c1_start, c1_end = 2.5, 0.5
        c2_start, c2_end = 0.5, 2.5
        w = 0.7298  # inertia weight

        # Clamp the schedule to [0, 1]: callers may keep iterating past
        # max_iter, and extrapolating would make c1 negative and c2 grow
        # without bound. A non-positive max_iter means "schedule finished".
        if max_iter > 0:
            progress = min(max(iter_count / max_iter, 0.0), 1.0)
        else:
            progress = 1.0

        c1 = c1_start - (c1_start - c1_end) * progress
        c2 = c2_start + (c2_end - c2_start) * progress

        for i in range(self.num_dimensions):
            r1 = random.random()
            r2 = random.random()

            cog_vel = c1 * r1 * (self.best_position[i] - self.position[i])
            social_vel = c2 * r2 * (global_best_position[i] - self.position[i])
            self.velocity[i] = w * self.velocity[i] + cog_vel + social_vel

    def update_position(self, bounds):
        """Move by the current velocity and clip each coordinate to bounds.

        Args:
            bounds: Sequence of ``(low, high)`` pairs, one per dimension.
        """
        for i in range(self.num_dimensions):
            self.position[i] = self.position[i] + self.velocity[i]

            low, high = bounds[i][0], bounds[i][1]
            if self.position[i] > high:
                self.position[i] = high
            if self.position[i] < low:
                self.position[i] = low

    def evaluate_fitness(self, number, target, function):
        """Evaluate the current position and refresh the personal best.

        Args:
            number: ``1`` scores the position with ``fitness_function``
                against ``target``; any other value scores it with
                ``cost_function`` using ``function``.
            target: Target coordinates (used when ``number == 1``).
            function: Expression passed to ``cost_function`` otherwise.
        """
        if number == 1:
            self.error = fitness_function(self.position, target)
        else:
            self.error = cost_function(self.position, function)

        if self.error < self.best_error:
            self.best_position = self.position[:]
            self.best_error = self.error

    def get_error(self):
        """Return the error of the most recent evaluation."""
        return self.error

    def get_best_pos(self):
        """Return a copy of the best position seen by this particle."""
        return self.best_position[:]

    def get_best_error(self):
        """Return the lowest error seen by this particle."""
        return self.best_error

    def get_pos(self):
        """Return a copy of the current position."""
        return self.position[:]

    def get_velocity(self):
        """Return a copy of the current velocity."""
        return self.velocity[:]
|
|
| |
def fitness_function(particle_position, target):
    """Return the squared Euclidean distance from a position to the target.

    Coordinates are paired positionally; every pair that both sequences
    provide contributes to the sum, so the usual two-dimensional case gives
    ``(tx - px)**2 + (ty - py)**2``.

    Args:
        particle_position: Numeric coordinates of the particle.
        target: Target coordinates; each is converted with ``float``.

    Returns:
        The sum of squared per-coordinate differences.
    """
    return sum(
        (float(goal) - coordinate) ** 2
        for goal, coordinate in zip(target, particle_position)
    )
|
|
| |
| import sympy as sp |
|
|
# Parsed expressions keyed by their source string. The optimiser calls
# cost_function once per particle per iteration with the same string, so
# parsing it only once avoids repeated sympify() work.
_PARSED_COST_FUNCTIONS = {}


def cost_function(particle_position, function_str):
    """Evaluate a two-variable expression at the particle's position.

    Args:
        particle_position: Indexable whose first two entries are substituted
            for the symbols ``x`` and ``y``.
        function_str: Expression in ``x`` and ``y`` accepted by
            ``sympy.sympify``.

    Returns:
        The result of substituting the position into the parsed expression.
    """
    x, y = sp.symbols('x y')

    # NOTE: sympify() evaluates its input with eval(); only pass expression
    # strings that come from a trusted source.
    if isinstance(function_str, str):
        function = _PARSED_COST_FUNCTIONS.get(function_str)
        if function is None:
            function = sp.sympify(function_str)
            _PARSED_COST_FUNCTIONS[function_str] = function
    else:
        # Non-string inputs may be unhashable, so they bypass the cache.
        function = sp.sympify(function_str)

    return function.subs({x: particle_position[0], y: particle_position[1]})
|
|
| |
class Interactive_PSO():
    """State and main loop of one interactively controlled particle swarm.

    ``x_axis`` and ``y_axis`` hold the current coordinates of every particle
    so that plotting callbacks can read them while ``swarm_initialization``
    runs on a worker thread.
    """

    def __init__(self):
        """Set up default parameters and random starting positions."""
        self._running = False
        self.max_iter = 500
        self.num_particles = 25
        self.initial = [5, 5]
        self.bounds = [(-500, 500), (-500, 500)]
        self.x_axis = []
        self.y_axis = []
        self.target = [5] * 2
        self.global_best_error = float('inf')
        self.update_particles_position_lists_with_random_values()
        self.global_best_position = [0, 0]

    def swarm_initialization(self, number, max_iter):
        """Run the optimisation loop until converged or terminated.

        The loop continues while the running flag is set and the global best
        error is above ``0.00001``. After every iteration the particle
        coordinates are written back into ``x_axis`` / ``y_axis``.

        Args:
            number: ``1`` to chase ``self.target``; any other value to
                minimise the expression selected in ``function_select``.
            max_iter: Iteration count over which the particles' acceleration
                coefficients are scheduled.
        """
        # The flag lives at module level and is consumed by the periodic
        # stream callback; without this declaration the assignment below
        # would only create an unused local.
        global update_table

        try:
            self.global_best_position = [0, 0]
            self.global_best_error = float('inf')
            self.gamma = 0.0001
            function = function_select.value

            swarm = [
                Particle([self.x_axis[i], self.y_axis[i]])
                for i in range(self.num_particles)
            ]

            iter_count = 0
            while self._running:
                if self.global_best_error <= 0.00001:
                    break

                for particle in swarm:
                    particle.evaluate_fitness(number, self.target, function)

                    if particle.get_error() < self.global_best_error:
                        self.global_best_position = particle.get_best_pos()
                        self.global_best_error = particle.get_best_error()

                # The loop may outlive max_iter; hold the coefficient
                # schedule at its final value instead of extrapolating.
                schedule_step = min(iter_count, max_iter)
                for j, particle in enumerate(swarm):
                    particle.update_velocity(
                        self.global_best_position, max_iter, schedule_step
                    )
                    particle.update_position(self.bounds)
                    current = particle.get_pos()
                    self.x_axis[j] = current[0]
                    self.y_axis[j] = current[1]

                time.sleep(0.05)

                iter_count += 1

            update_table = True
            hv.streams.Stream.trigger(table_dmap.streams)

            self.initial = self.global_best_position
        finally:
            # Always clear the flag so a failure on the worker thread does
            # not leave the swarm permanently marked as running.
            self._running = False

        print('Best Position:', self.global_best_position)
        print('Best Error:', self.global_best_error)
        print('Function:', function)

    def terminate(self):
        """Ask the optimisation loop to stop after its current iteration."""
        self._running = False

    def starting(self):
        """Mark the swarm as running."""
        self._running = True

    def isrunning(self):
        """Return whether the swarm is marked as running."""
        return self._running

    def get_num_particles(self):
        """Return the configured number of particles."""
        return self.num_particles

    def update_num_particles(self, new_value):
        """Set the configured number of particles."""
        self.num_particles = new_value

    def get_xaxis(self):
        """Return a copy of the particles' x coordinates."""
        return self.x_axis[:]

    def get_yaxis(self):
        """Return a copy of the particles' y coordinates."""
        return self.y_axis[:]

    def set_target(self, x, y):
        """Set the target position to ``[x, y]``."""
        self.target = [x, y]

    def get_target(self):
        """Return a copy of the target position."""
        return self.target[:]

    def update_particles_position_lists(self, updated_num_particles):
        """Resize both coordinate lists in place to a new particle count.

        Growing pads each list with copies of its first element, so it needs
        a non-empty list; shrinking drops entries from the end.

        Args:
            updated_num_particles: Desired number of entries per list.
        """
        keep = max(updated_num_particles, 0)
        for axis in (self.x_axis, self.y_axis):
            missing = keep - len(axis)
            if missing > 0:
                axis.extend([axis[0]] * missing)
            else:
                del axis[keep:]

    def update_particles_position_lists_with_random_values(self):
        """Replace both coordinate lists with distinct random integers.

        Values are sampled without replacement from ``range(low, high)`` of
        the matching entry in ``self.bounds``.
        """
        (x_low, x_high), (y_low, y_high) = self.bounds
        self.x_axis = random.sample(
            range(int(x_low), int(x_high)), self.num_particles
        )
        self.y_axis = random.sample(
            range(int(y_low), int(y_high)), self.num_particles
        )
|
|
# Swarm driven by the "find target" button (plot 2).
pso_swarm = Interactive_PSO()
# Swarm driven by the "start computation" button (plot 1).
pso_computation_swarm = Interactive_PSO()

# Module-level flag checked by trigger_streams() to decide whether the
# results table needs to be refreshed.
update_table = False
|
|
| |
def start_finding_the_target():
    """Run the target-hunting swarm (mode ``1``) with its own ``max_iter``."""
    iteration_budget = pso_swarm.max_iter
    pso_swarm.swarm_initialization(1, iteration_budget)
|
|
| |
def start_computation():
    """Run the computation swarm (mode ``2``) with its own ``max_iter``."""
    iteration_budget = pso_computation_swarm.max_iter
    pso_computation_swarm.swarm_initialization(2, iteration_budget)
|
|
| |
def create_target_element(x, y):
    """Stop the hunting swarm and return the marker for the target.

    Args:
        x: Tapped x coordinate, or ``None`` when no value is available.
        y: Tapped y coordinate, or ``None`` when no value is available.

    Returns:
        An ``hv.Points`` element labelled ``'Target'`` drawn as a red
        triangle.
    """
    pso_swarm.terminate()
    if x is not None and y is not None:
        pso_swarm.set_target(x, y)
    else:
        # An incomplete tap must neither overwrite the target with None nor
        # be handed to hv.Points; draw the marker at the stored target.
        x, y = pso_swarm.get_target()
    return hv.Points((x, y, 1), label='Target').opts(color='red', marker='^', size=10)
|
|
| |
def _swarm_scatter(swarm):
    """Build the scatter element showing the current positions of ``swarm``.

    Each point gets a fresh random ``z`` value that is used only to colour
    it.
    """
    x_axis = swarm.get_xaxis()
    y_axis = swarm.get_yaxis()
    data = (x_axis, y_axis, np.random.random(size=len(x_axis)))
    scatter = hv.Scatter(data, vdims=['y_axis', 'z'])
    scatter.opts(size=8, color='z', cmap='Coolwarm_r')
    return scatter


def update():
    """Return the scatter element for the target-hunting swarm."""
    return _swarm_scatter(pso_swarm)


def computational_update():
    """Return the scatter element for the computation swarm."""
    return _swarm_scatter(pso_computation_swarm)
|
|
| |
def update_num_particles_event(event):
    """Apply the slider's particle count to both swarms and redraw them.

    Does nothing when the slider already matches the hunting swarm's
    particle count. Otherwise both swarms are stopped, resized and given new
    random positions.

    Args:
        event: Click event supplied by the button; not used.
    """
    updated_num_particles = population_slider.value
    if updated_num_particles == pso_swarm.get_num_particles():
        return

    pso_swarm.terminate()
    pso_computation_swarm.terminate()
    # Give the worker loops time to notice the stop request before their
    # position lists are replaced.
    time.sleep(1)

    # Both swarms are re-seeded with random positions, so resizing the old
    # position lists first would be wasted work.
    pso_swarm.update_num_particles(updated_num_particles)
    pso_computation_swarm.update_num_particles(updated_num_particles)
    pso_computation_swarm.update_particles_position_lists_with_random_values()
    pso_swarm.update_particles_position_lists_with_random_values()

    hv.streams.Stream.trigger(pso_scatter1.streams)
    hv.streams.Stream.trigger(pso_scatter.streams)
| |
| |
def trigger_streams():
    """Periodic callback that refreshes the plots and, if flagged, the table."""
    global update_table

    # Redraw both scatter plots.
    for scatter_map in (pso_scatter, pso_scatter1):
        hv.streams.Stream.trigger(scatter_map.streams)

    # Refresh the results table only when a run has flagged it.
    if update_table:
        update_table = False
        hv.streams.Stream.trigger(table_dmap.streams)

    # NOTE(review): the original indentation was lost; the two statements
    # below are assumed to belong to this function's top level (they cannot
    # be module-level because `tap` is defined later) -- confirm.
    current_target = pso_swarm.get_target()
    tap.event(x=current_target[0], y=current_target[1])

    time.sleep(0.05)
|
|
| |
def hunting_button_event(event):
    """Start the target-hunting swarm on a new thread unless it is running.

    Args:
        event: Click event supplied by the button; not used.
    """
    if pso_swarm.isrunning():
        return
    pso_swarm.starting()
    worker = threading.Thread(target=start_finding_the_target)
    worker.start()
|
|
| |
def computation_button_event(event):
    """Start the computation swarm on a new thread unless it is running.

    Args:
        event: Click event supplied by the button; not used.
    """
    if pso_computation_swarm.isrunning():
        return
    pso_computation_swarm.starting()
    worker = threading.Thread(target=start_computation)
    worker.start()
|
|
def table():
    """Return a one-row table with the computation swarm's best position.

    Both coordinates are rounded to the nearest integer before display.
    """
    best = pso_computation_swarm.global_best_position
    rounded_x = round(best[0])
    rounded_y = round(best[1])
    frame = pd.DataFrame({
        'x_position': [rounded_x],
        'y_position': [rounded_y],
    })
    return hv.Table(frame).opts(width=300, height=100)
|
|
|
|
| |
def update_function(event):
    """Reset the computation swarm after a new function is selected.

    Stops the swarm, waits one second, then gives it new random positions.

    Args:
        event: Parameter-change event from the select widget; not used.
    """
    swarm = pso_computation_swarm
    swarm.terminate()
    time.sleep(1)
    swarm.update_particles_position_lists_with_random_values()
|
|
| |
| |
# Dynamic scatter plots; each one is redrawn whenever its parameterless
# 'Next' stream is triggered.
pso_scatter = hv.DynamicMap(
    update,
    streams=[Stream.define('Next')()],
).opts(
    xlim=(-500, 500),
    ylim=(-500, 500),
    title="Plot 2 : PSO for target finding ",
)
pso_scatter1 = hv.DynamicMap(
    computational_update,
    streams=[Stream.define('Next')()],
).opts(
    xlim=(-500, 500),
    ylim=(-500, 500),
    title="Plot 1 : PSO for a mathematical computation",
)

# Tap stream initialised at the hunting swarm's current target; the target
# marker is rebuilt from its x/y values.
tap = hv.streams.SingleTap(
    x=pso_swarm.get_target()[0],
    y=pso_swarm.get_target()[1],
)
target_dmap = hv.DynamicMap(create_target_element, streams=[tap])
|
|
| |
# CSS applied to the column holding the results table.
custom_style = {
    # A hex colour takes a single leading '#'; the doubled '##' was not a
    # valid CSS colour value.
    'background': '#4287f5',
    'border': '1px solid black',
    'padding': '8px',
    'box-shadow': '5px 5px 5px #bcbcbc',
}
|
|
| |
# Results table, rebuilt whenever its 'Next' stream is triggered.
table_dmap = hv.DynamicMap(
    table,
    streams=[hv.streams.Stream.define('Next')()],
)
table_label = pn.pane.Markdown(
    "Once an optimal solution is found in plot 1 it is updated in the below table"
)

# Button that starts the target-hunting swarm.
start_hunting_button = pn.widgets.Button(
    name=' Click to find target for plot 2 ',
    width=50,
)
start_hunting_button.on_click(hunting_button_event)

# Button that starts the computation swarm.
start_finding_button = pn.widgets.Button(
    name=' Click to start computation for plot 1',
    width=50,
)
start_finding_button.on_click(computation_button_event)

# Button that applies the slider's particle count.
update_num_particles_button = pn.widgets.Button(
    name='Update number of particles',
    width=50,
)
update_num_particles_button.on_click(update_num_particles_event)

# Refresh the plots periodically (period argument: 3).
pn.state.add_periodic_callback(trigger_streams, 3)
|
|
| |
# Slider for the particle count (10-100, default 25); its value is applied
# by the "Update number of particles" button. Label typo "praticles" fixed.
population_slider = pn.widgets.IntSlider(
    name='Number of particles',
    start=10,
    end=100,
    value=25,
)
|
|
| |
# Expressions the computation swarm can minimise; changing the selection
# resets that swarm through update_function().
function_select = pn.widgets.Select(
    name='Select',
    options=[
        'x^2+(y-100)^2',
        '(x-234)^2+(y+100)^2',
        'x^3 + y^3 - 3*x*y',
        'x^2 * y^2',
    ],
)
function_select.param.watch(update_function, 'value')
|
|
| |
# Overlay the target marker on the hunting swarm's scatter plot.
plot_for_finding_the_target = pso_scatter * target_dmap

# Layout: computation plot, target-hunting plot, then the control column.
dashboard = pn.Column(
    pn.Row(
        pn.Row(pso_scatter1.opts(width=500, height=500)),
        pn.Column(plot_for_finding_the_target.opts(width=500, height=500)),
        pn.Column(
            pn.Column(table_label, table_dmap, styles=custom_style),
            start_finding_button,
            start_hunting_button,
            update_num_particles_button,
            population_slider,
            function_select,
        ),
    )
)

pn.panel(dashboard).servable(title='Swarm Particles Visualization')