# -*- coding: utf-8 -*-
import taichi as ti
import numpy as np
import random
import time
ti.init()
@ti.data_oriented
class Population:
def __init__(self, min_range, max_range, dim, rounds, size, object_func):
self.min_range = ti.field(ti.f32, dim)
self.max_range = ti.field(ti.f32, dim)
self.min_range = min_range #list形式,输入各参数下界
self.max_range = max_range #list形式,输入各参数上界
self.dimension = dim #问题维数,即参数个数
self.rounds = rounds #迭代轮次
self.size = size #种群大小
self.G = 1 #代数
self.CR = 1 #交叉率
self.PI = np.pi
self.get_object_function_value = object_func #目标函数
self.AG = ti.Vector.field(dim, ti.f32, size)
self.BG = ti.Vector.field(dim, ti.f32, size)
self.best_x=ti.Vector.field(self.dimension, ti.f32, self.rounds+1)
for i in range(size):
self.AG[i] = np.array(self.max_range)-(np.array(self.max_range)-np.array(self.min_range))*random.random()
self.BG[i] = (np.array(self.max_range)-np.array(self.min_range))*random.random()+np.array(self.min_range)
self.Aobject_function_values = ti.field(ti.f32, size)
self.Bobject_function_values = ti.field(ti.f32, size)
self.AG_next_1=ti.Vector.field(self.dimension, ti.f32, self.size)
self.BG_next_1=ti.Vector.field(self.dimension, ti.f32, self.size)
self.AG_next_2=ti.Vector.field(self.dimension, ti.f32, self.size)
self.BG_next_2=ti.Vector.field(self.dimension, ti.f32, self.size)
self.XG_next=ti.Vector.field(self.dimension, ti.f32, self.size)
self.value = ti.field(ti.f32, size)
self.Gmin = ti.field(ti.f32, rounds)
@ti.kernel
def initial_AB(self): #初始化种群
for i in self.AG:
self.Aobject_function_values[i] = self.get_object_function_value(self.AG[i])
self.Bobject_function_values[i] = self.get_object_function_value(self.BG[i])
#@ti.kernel
def mutate_A(self): #A种群变异
for i in range(self.size):
r0 = 0
r1 = 0
while r0 == r1 or r0 == i or r1 == i: #如果相等,重新生成
r0 = random.randint(0, self.size-1) #生成0-size-1的随机数
r1 = random.randint(0, self.size-1)
F1=0.4*(ti.cos((self.G-1)*self.PI/self.rounds)+1)+0.1 #自适应种群A变异率
son = ti.Vector([0 for k in range(self.dimension)])
# for j in ti.static(range(self.dimension)):
# son[j] = self.AG[r0][j] + F1*(self.best_x[self.G-1][j] - self.AG[r1][j])
son=np.array(self.AG[r0])+F1*(np.array(self.best_x[self.G-1])-np.array(self.AG[r1]))
for j in ti.static(range(self.dimension)):
if son[j] > self.min_range[j] and son[j] < self.max_range[j]:
self.AG_next_1[i][j]=son[j]
else:
self.AG_next_1[i][j]=(self.max_range[j]-self.min_range[j])*random.random()+self.min_range[j]
#@ti.kernel
def mutate_B(self): #B种群变异
for i in range(self.size):
r0 = random.randint(0, self.size-1)
while r0 == i: #如果相等,重新生成
r0 = random.randint(0, self.size-1) #生成0~size-1的随机数
F2=0.5*(2-np.power(2, 1-self.rounds/self.G/self.G)) #自适应种群B变异率
son = ti.Vector([0 for k in range(self.dimension)])
son=random.random()*np.array(self.AG[r0])+F2*(np.array(self.best_x[self.G-1])-np.array(self.BG[r0]))
for j in ti.static(range(self.dimension)):
if son[j] > self.min_range[j] and son[j] < self.max_range[j]:
self.BG_next_1[i][j]=son[j]
else:
self.BG_next_1[i][j]=(self.max_range[j]-self.min_range[j])*random.random()+self.min_range[j]
#@ti.kernel
def cross_AB(self): #交叉
self.CR=0.4*(ti.cos((self.G-1)*self.PI/self.rounds-self.PI)+1) #自适应交叉率
for i in range(self.size):
s=[x for x in range(self.dimension)]
randx=random.sample(s,k=self.dimension)
for j in ti.static(range(self.dimension)):
tmp=random.random()
if tmp > self.CR and randx[0] != j:
self.AG_next_2[i][j]=self.AG[i][j]
self.BG_next_2[i][j]=self.BG[i][j]
else:
self.AG_next_2[i][j]=self.AG_next_1[i][j]
self.BG_next_2[i][j]=self.BG_next_1[i][j]
@ti.kernel
def choose(self): #选择最优解
for i in range(self.size):
Avalue = ti.Vector([0 for k in range(3)])
best = self.get_object_function_value(self.AG[i])
self.XG_next[i] = self.AG[i]
v1 = self.get_object_function_value(self.AG_next_1[i])
v2 = self.get_object_function_value(self.AG_next_2[i])
if best > v1:
best = v1
self.XG_next[i] = self.AG_next_1[i]
if best > v2:
best = v2
self.XG_next[i] = self.AG_next_2[i]
b1 = self.get_object_function_value(self.BG[i])
b2 = self.get_object_function_value(self.BG_next_1[i])
b3 = self.get_object_function_value(self.BG_next_2[i])
if best > b1:
best = b1
self.XG_next[i] = self.BG[i]
if best > b2:
best = b2
self.XG_next[i] = self.BG_next_1[i]
if best > b3:
best = b3
self.XG_next[i] = self.BG_next_2[i]
for i in range(self.size):
self.value[i]=self.get_object_function_value(self.XG_next[i])
def evolution(self): #进化
'''
******************* 初始化.**********************************************
'''
self.initial_AB()
startP = time.time()
best_value=np.inf
best_x=np.empty(shape=(self.rounds+1,self.dimension))
Abest_index = np.argmin(self.Aobject_function_values.to_numpy())
Bbest_index = np.argmin(self.Bobject_function_values.to_numpy())
if self.Aobject_function_values[Abest_index] < self.Bobject_function_values[Bbest_index]:
best_x[0] = self.AG[Abest_index].to_numpy()
else:
best_x[0] = self.BG[Bbest_index].to_numpy()
'''
******************* 开始进化.**********************************************
'''
self.G=1
while self.G <= self.rounds:
'''
******************* 变异过程.**********************************************
'''
'''
*** 种群A变异过程.****
'''
self.mutate_A()
'''
*** 种群B变异过程.****
'''
self.mutate_B()
'''
******************* 交叉过程.**********************************************
'''
self.cross_AB()
'''
******************* 选择过程.**********************************************
'''
self.choose()
'''
******************* 获取最优解.**********************************************
'''
[value_min,pos_min]=[np.min(self.value.to_numpy()),np.argmin(self.value.to_numpy())]
self.Gmin[self.G-1]=value_min
if self.G >= 2:
value_min=min(value_min,self.Gmin[self.G-2])
best_value=min(value_min,best_value)
best_x[self.G]=self.XG_next[pos_min].to_numpy()
print('Generation=',self.G,'\t','min_value= ',best_value)
# if best_value < self.accuracy: #收敛性检验
# print('The calibration took ',time.time() - startP,' seconds')
# print('CF calibration finished, please check xml file.')
# return best_x[G]
self.G=self.G+1
print('The optimization took ',time.time() - startP,' seconds')
[value_min,pos_min]=[np.min(self.Gmin.to_numpy()),np.argmin(self.Gmin.to_numpy())]
return best_x[pos_min+1] #返回最优解
@ti.func
def SCH(x) -> ti.f32:
sum = 0
for i in ti.static(range(x.n)):
sum = sum - x[i]*ti.sin(ti.sqrt(abs(x[i])))
return sum
min_bound=([-500.0 for i in range(30)])
max_bound=([500.0 for i in range(30)])
p=Population(min_bound,max_bound,30, 100, 30,SCH)
p.evolution()
各位老师好:
我在用太极编写一个启发式算法求最小值的程序,以上为代码,在运行的过程中,首先会报一系列的精度损失警告,而且该程序是并行化的,但却比我直接编写的相同的算法代码(不牵涉taichi)要慢很多(80s,8s),我检查了代码,但找不出问题在哪儿,想问下各位老师是什么原因,感谢!
精度问题是SCH那里应该是sum=0.0。性能的话我看你代码里有不少地方没用kernel,这些代码不会被并行,以及你用了不少numpy的东西,这些在跟太极数据类型转换的时候也会有性能损失
2 Likes
Hi @Ming_CHEN,
- 精度问题,确实如 @Vineyo 所说
@ti.func
def SCH(x) -> ti.f32:
sum = 0 # 精度问题出在这里
for i in ti.static(range(x.n)):
sum = sum - x[i]*ti.sin(ti.sqrt(abs(x[i])))
return sum
使用ti.kernel
和ti.func
两个修饰器修饰的函数属于Taichi scope,这部分代码会被提前编译好,函数内的类型会根据你的初始化提前确定。
sum=0
,太极编译器就认为sum
是一个i32
。 sum - x[i]*ti.sin(ti.sqrt(abs(x[i])))
会得到一个float类型的数字,然后赋值给sum
时就会导致精度缺失。
- 代码的速度
首先你可以设置ti.init(arch=ti.gpu)
,然后将计算中可以并行的部分尽可能的放到Taichi scope里处理。
还有就是你可以评估一下性能的瓶颈在哪些地方,然后做有目的的代码优化。可以参考太极图形课:here
2 Likes
好的,谢谢老师
谢谢!