# A small DNN implemented from scratch, with several activation functions and live plotting; it is
# well suited for building an intuition of how a neural network fits a function.
# The code was mainly tested with sigmoid and relu; sin and a Gaussian were also tried. All of them can
# fit the target well, but the initial weights must be chosen with some care, otherwise training is very hard.
# Original author: 易瑜  Email: 296721135@qq.com  Corrections are welcome; if you repost this, please credit the author and the source.
import numpy as np
import matplotlib.pyplot as plt
import math
import random
class Activation:                       # subclasses must implement the functions below
    def __init__(self):
        pass
    # Initialising the weights: wx + b = w(x + b/w) = w(x + h) -> h = b/w. w sets the scaling of the
    # function along x, and h sets where the scaled function is shifted to along x.
    # Initialisation is not simply a random process. Our tests show that when fitting with s-shaped
    # functions the function must be scaled appropriately and the offsets initialised so that the units
    # are spread evenly over the whole input range, otherwise training is very hard.
    # For relu-like functions it is enough to set w to +1 or -1 and only vary the initial offset.
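    # Example (illustrative numbers, not from the tests above): with w = 4 and b = -8,
    # sigmoid(4*x - 8) = sigmoid(4*(x - 2)), i.e. the curve is compressed by a factor of 4 along x
    # and its midpoint sits at x = -h = -b/w = 2.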
    def initWeight(self, cell):
        for i in range(len(cell.w)):
            cell.w[i] = random.uniform(0.99, 1)
        cell.b = random.uniform(-1, 1)
    def activation_fun(self, x):        # activation function
        raise NotImplementedError("")
    def activation_deri_fun(self, cell):  # derivative
        raise NotImplementedError("")
    # Weight update. The computed derivative is
    #   △loss/△w = deri                                  (1)
    # If we choose
    #   △w = -speed * deri                               (2)
    # then substituting (2) into (1) gives
    #   △loss = deri * △w = -speed * deri * deri, so the loss always moves towards smaller values.
    # This is not the only possible update rule: any rule that actually decreases the loss is valid in
    # theory. For example, as long as deri is non-zero we could choose
    #   △w = -speed / deri                               (3)
    # Substituting (3) into (1) gives △loss = -speed, i.e. the loss decreases by a fixed amount per step.
    # Strategy (3) has problems of its own: the derivative deri only holds in a small neighbourhood of
    # the current w, so the range of △w must be limited.
    # This is only meant as a starting point; there are many gradient-descent strategies, see for example:
    # http://www.360doc.com/content/16/1121/12/22755525_608221032.shtml
    def updateDeltaWeight(self, deri, speed, cell, loss, coefficient):
        return -speed * deri
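    # A rough sketch (not part of the original class) of the fixed-step strategy (3) above: a subclass
    # could override updateDeltaWeight along these lines, clamping the step because the derivative is
    # only valid near the current w (the 0.1 limit is an arbitrary illustrative value).
    # def updateDeltaWeight(self, deri, speed, cell, loss, coefficient):
    #     if abs(deri) < 0.000001:          # gradient too small, skip the update
    #         return 0.0
    #     delta = -speed / deri             # aims for a fixed decrease of the loss per step
    #     maxDelta = 0.1                    # limit the step size
    #     if abs(delta) > maxDelta:
    #         delta = maxDelta if delta > 0 else -maxDelta
    #     return delta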
############################################################### x^2 — the gradient explodes easily, but with a modified update strategy it can still fit some functions
class ActivationXX(Activation):
    def activation_fun(self, x):        # activation function
        return x * x
    def activation_deri_fun(self, cell):  # derivative
        return 2 * cell.sum
############################################################### V-shaped function
class ActivationAbsolute(Activation):
    def activation_fun(self, x):        # activation function
        return abs(x)
    def activation_deri_fun(self, cell):  # derivative
        return -1.0 if cell.sum < 0.0 else 1.0
############################################################### sinc function
class ActivationSinc(Activation):
    def activation_fun(self, x):        # activation function
        return 1.0 if x == 0.0 else math.sin(x) / x
    def activation_deri_fun(self, cell):  # derivative
        x = cell.sum
        return 0.0 if x == 0.0 else math.cos(x) / x - math.sin(x) / (x * x)
class ActivationTanh(Activation):
    def activation_fun(self, x):        # activation function
        return math.tanh(x)
    def activation_deri_fun(self, cell):  # derivative
        return 1 - cell.out * cell.out
class ActivationRelu(Activation):
    def initWeight(self, cell):         # initialise the weights
        for i in range(len(cell.w)):
            cell.w[i] = random.choice([1., -1.])
        cell.b = random.uniform(-3, 3)
    def activation_fun(self, x):        # activation function
        return max(0.0, x)
    def activation_deri_fun(self, cell):  # derivative
        return 0.0 if cell.sum <= 0. else 1.0
class ActivationLeakyRelu(Activation):
    def activation_fun(self, x):        # activation function
        return x if x > 0.0 else 0.01 * x
    def activation_deri_fun(self, cell):  # derivative
        return 0.01 if cell.sum <= 0 else 1.0
class ActivationStep(Activation):       # ___|~~~~~~, 0 to 1
    def activation_fun(self, x):        # activation function
        return 1.0 if x >= 0 else 0.0
    def activation_deri_fun(self, cell):  # derivative
        return 0.0
class ActivationSignum(Activation):     # ___|~~~~~~, -1 to 1
    def activation_fun(self, x):        # activation function
        return 1.0 if x >= 0 else -1.0
    def activation_deri_fun(self, cell):  # derivative
        return 0.0
class ActivationSoftPlus(Activation):   # ln(1 + e^x)
    def activation_fun(self, x):        # activation function
        return math.log(1 + math.exp(x))
    def activation_deri_fun(self, cell):  # derivative: d/dx ln(1 + e^x) = 1/(1 + e^-x)
        return 1 / (1 + math.exp(-cell.sum))
class ActivationLecunTanh(Activation):  # LeCun tanh
    def activation_fun(self, x):        # activation function
        return 1.7519 * math.tanh(2 * x / 3)
    def activation_deri_fun(self, cell):  # derivative
        return 1.7519 * 2 * (1 - cell.out * cell.out / (1.7519 * 1.7519)) / 3
class ActivationHardTanh(Activation):   # ____/~~~~~~~~~
    def activation_fun(self, x):        # activation function
        return 1 if x > 1.0 else (-1 if x < -1.0 else x)
    def activation_deri_fun(self, cell):  # derivative
        return 1 if abs(cell.sum) < 1.0 else 0
class ActivationArcTan(Activation):     # arctan
    def activation_fun(self, x):        # activation function
        return math.atan(x)
    def activation_deri_fun(self, cell):  # derivative
        return 1 / (cell.sum * cell.sum + 1)
class ActivationSoftsign(Activation):   # x/(1 + |x|)
    def activation_fun(self, x):        # activation function
        return x / (1 + abs(x))
    def activation_deri_fun(self, cell):  # derivative
        return 1 / ((1 + abs(cell.sum)) * (1 + abs(cell.sum)))
############################################################### sigmoid
class ActivationSigmoid(Activation):
    def __init__(self):
        super().__init__()
    def initWeight(self, cell):         # initialise the weights
        for i in range(len(cell.w)):
            cell.w[i] = 3 * random.uniform(0.99, 1)
        cell.b = 8 * random.uniform(-1, 1)
    def activation_fun(self, x):        # activation function
        try:
            return 1 / (1 + math.exp(-x))
        except OverflowError:
            if x < 0.0:
                return 0
            else:
                return 1
    def activation_deri_fun(self, cell):  # derivative: out = 1/(1 + e^-x), so d(out)/dx = out * (1 - out)
        return cell.out * (1 - cell.out)
    # def updateDeltaWeight(self, deri, speed, cell, loss, coefficient):   # this update strategy seems to converge a little faster
    #     sigmoidDri = abs(cell.out * (1 - cell.out))
    #     if (sigmoidDri) < 0.1:        # gradient too small, skip the update
    #         return 0.0
    #     coefficient = abs(coefficient)
    #     coefficient = max(coefficient, 0.1)
    #     maxDelta = (0.3 / coefficient) * sigmoidDri   # a single step must not change x too much
    #
    #     if abs(deri) > 0.000001:
    #         delta = (speed / deri) * loss
    #     else:
    #         return 0.0
    #     if abs(delta) > maxDelta:
    #         delta = maxDelta if delta > 0 else -maxDelta
    #     return -delta
############################################################### Gaussian (normal distribution)
class ActivationNormal(Activation):
    def __init__(self):
        super().__init__()
    def initWeight(self, cell):         # initialise the weights
        for i in range(len(cell.w)):
            cell.w[i] = random.uniform(0.99, 1)
        cell.b = random.uniform(-1, 1)
    def activation_fun(self, x):        # activation function
        return math.exp(-x * x)
    def activation_deri_fun(self, cell):  # derivative
        return -cell.out * 2 * cell.sum
############################################################### tanh(x/2) function
class ActivationTanhHalf(Activation):   # (1 - e^-x)/(1 + e^-x) = tanh(x/2)
    def activation_fun(self, x):        # activation function
        return (1 - math.exp(-x)) / (1 + math.exp(-x))
    def activation_deri_fun(self, cell):  # derivative: d/dx tanh(x/2) = 0.5 * (1 - out * out)
        return 0.5 * (1 - cell.out * cell.out)
############################################################### loglog function
class ActivationLogLog(Activation):     # 1 - e^(-e^x)
    def activation_fun(self, x):        # activation function
        return 1 - math.exp(-math.exp(x))
    def activation_deri_fun(self, cell):  # derivative: e^x * e^(-e^x) = e^sum * (1 - out)
        return math.exp(cell.sum) * (1 - cell.out)
############################################################### cos function
class ActivationCos(Activation):
    def activation_fun(self, x):        # activation function
        return math.cos(x)
    def activation_deri_fun(self, cell):  # derivative
        return -math.sin(cell.sum)
############################################################### sin function
class ActivationSin(Activation):
    def activation_fun(self, x):        # activation function
        return math.sin(x)
    def activation_deri_fun(self, cell):  # derivative
        return math.cos(cell.sum)
############################################################### linear function
class ActivationLiner(Activation):
    def initWeight(self, cell):         # initialise the weights
        for i in range(len(cell.w)):
            cell.w[i] = random.choice([1., -1.])   # (1*random.uniform(-1,1))
        cell.b = random.uniform(0, 0.1)
    def activation_fun(self, x):        # activation function
        return x
    def activation_deri_fun(self, cell):  # derivative
        return 1
    # def updateDeltaWeight(self, deri, speed, cell, loss, coefficient):
    #     return 0.   # temporarily forced to 0 for testing
class Cell:
    def __init__(self, activation):
        self._activation = activation
        self.inputCell = None
        self.sum = 0.0
        self.out = 0.0
        self.error = 0.0
    def setInputCells(self, inputCell):
        self.inputCell = inputCell
        self.w = [0 for i in range(len(inputCell))]
        self.delta_w = [0 for i in range(len(inputCell))]
        self.b = 0.0
        self.delta_b = 0.0
        if (self._activation):
            self._activation.initWeight(self)
    def caculateOut(self):              # compute the output of this cell
        sum = 0.0
        i = 0
        for cell in self.inputCell:
            sum += self.w[i] * cell.out
            i += 1
        sum += self.b
        self.sum = sum
        self.out = self._activation.activation_fun(sum)
    def updateWeight(self, speed, loss):
        if self.inputCell:
            i = 0
            outDeri = self.error * self._activation.activation_deri_fun(self)
            for cell in self.inputCell:
                deri = cell.out * outDeri
                self.delta_w[i] = self._activation.updateDeltaWeight(deri, speed, self, loss, cell.out)
                self.w[i] += self.delta_w[i]
                i += 1
            deri = outDeri
            self.delta_b = self._activation.updateDeltaWeight(deri, speed, self, loss, 1)
            self.b += self.delta_b
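# How Cell.updateWeight applies the chain rule (a short annotation with illustrative numbers):
#   error          = d(loss)/d(out)                      -- filled in by the loss class / backPropagation
#   outDeri        = error * f'(sum) = d(loss)/d(sum)
#   d(loss)/d(w_i) = outDeri * input_i      and      d(loss)/d(b) = outDeri
# For example, a sigmoid cell with out = 0.8, error = 0.5 and one input of 2.0 gives
#   outDeri = 0.5 * 0.8 * (1 - 0.8) = 0.08,  d(loss)/d(w) = 0.16,  d(loss)/d(b) = 0.08.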
class Layer:
    def __init__(self, lastLayer=None, cellNum=1, activation=None):
        self._lastLayer = lastLayer
        self._cellNum = cellNum
        self.cells = [Cell(activation) for i in range(cellNum)]
        self._nextLayer = None
        if lastLayer:
            lastLayer._nextLayer = self
            for cell in self.cells:
                cell.setInputCells(lastLayer.cells)
    def _forward(self):                 # called from the first layer
        nextLayer = self._nextLayer
        while nextLayer:
            for cell in nextLayer.cells:
                cell.caculateOut()
            nextLayer = nextLayer._nextLayer
    def setInputAndForward(self, x):    # only called on the first layer
        for i in range(len(self.cells)):
            self.cells[i].out = x[i]
        self._forward()
    def backPropagation(self, speed, loss):   # called on the last layer, works towards the front
        currLayer = self
        lastLayer = self._lastLayer
        while lastLayer:                # compute every error
            for lastLayerCell in lastLayer.cells:
                lastLayerCell.error = 0.0
            for currLayercell in currLayer.cells:
                deri = currLayercell._activation.activation_deri_fun(currLayercell) * currLayercell.error
                for j in range(len(lastLayer.cells)):
                    lastLayerCell = lastLayer.cells[j]
                    lastLayerCell.error += currLayercell.w[j] * deri
            currLayer = lastLayer
            lastLayer = lastLayer._lastLayer
        while currLayer:                # update the weights
            for currLayercell in currLayer.cells:
                currLayercell.updateWeight(speed, loss)
            currLayer = currLayer._nextLayer
class Loss:
    def __init__(self, layer):
        self._layer = layer
        pass
    def minimize(self, expect, speed):
        raise NotImplementedError("")
class LossL2(Loss):
    def __init__(self, layer):
        super().__init__(layer)
        if (len(layer.cells) != 1):
            raise (Exception("last layer should have only one cell!"))
    def minimize(self, expect, speed):  # the L2 loss is (out - expect)^2 and its derivative is 2*(out - expect)
        loss = (self._layer.cells[0].out - expect) * (self._layer.cells[0].out - expect)
        self._layer.cells[0].error = 2 * (self._layer.cells[0].out - expect)
        self._layer.backPropagation(speed, loss)
class LossEntropy(Loss):                # normally only meaningful when the previous layer is a sigmoid
    def minimize(self, expect, speed):  # the loss is -(expect*ln(out) + (1 - expect)*ln(1 - out)); its derivative is
                                        # -(expect/out - (1 - expect)/(1 - out)) = (out - expect)/((1 - out)*out);
                                        # because the error term contains a division, the values easily overflow the float range
        loss = -(expect * math.log(self._layer.cells[0].out) + (1 - expect) * math.log(1 - self._layer.cells[0].out))
        self._layer.cells[0].error = (self._layer.cells[0].out - expect) / (self._layer.cells[0].out * (1 - self._layer.cells[0].out))
        self._layer.backPropagation(speed, loss)
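# Minimal sketch (not part of the original test below) of pairing LossEntropy with a sigmoid output
# layer, for targets squashed into (0, 1); the layer sizes are arbitrary illustrative values:
# inp   = Layer(None, 1, None)
# hide  = Layer(inp, 50, ActivationSigmoid())
# out   = Layer(hide, 1, ActivationSigmoid())
# loss2 = LossEntropy(out)
# inp.setInputAndForward([0.5])
# loss2.minimize(0.3, 0.0001)           # expect = 0.3, speed = 0.0001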
if __name__ == "__main__":
    hideCellNum = 200
    speed = 0.00001                     # do not underestimate this speed: if it is too large the gradients explode very easily; try speed = 1 with the Relu training, for example
    inputLayer = Layer(None, 1, None)
    hideLayer1 = Layer(inputLayer, hideCellNum, ActivationRelu())
    # hideLayer2 = Layer(hideLayer1, hideCellNum, ActivationRelu())   # we can just as well build a deeper network
    # hideLayer3 = Layer(hideLayer2, hideCellNum, ActivationRelu())
    outputLayer = Layer(hideLayer1, 1, ActivationLiner())
    loss = LossL2(outputLayer)
    x = np.linspace(-3, 3, 40)          # the input range has to be matched with the weight initialisation of the chosen activation
    orig_y = 20 * np.sin(1 * x) + 2.9 * (x - 3) * x   # adjust the coefficient inside sin() to control the period and amplitude of the target
    y = orig_y                          # 1/(1 + np.exp(-orig_y))  # if the last layer is a sigmoid, squash the target with a sigmoid here as well; with a Liner last layer use the raw values
    _y = np.array([0.0 for i in range(len(y))])   # never write _y = y: _y and y would then share the same storage, so changing _y also changes y; _y = np.array(y) gives _y independent storage
    hideOutY = [np.array(_y) for i in range(hideCellNum + 1)]
    hideDeltaWeightY = [np.array(_y) for i in range(hideCellNum)]
    hideDeltaBiasY = [np.array(_y) for i in range(hideCellNum)]
    outWeightY = [np.array(_y) for i in range(hideCellNum)]
    outDeltaWeightY = [np.array(_y) for i in range(hideCellNum)]
    plt.close()                         # clf() clears the figure, cla() clears the axes, close() closes the window
    plt.grid(True)                      # show a grid
    plt.ion()                           # interactive mode on
    plt.figure(1)                       # create figure 1
    ax1 = plt.subplot(221)              # sub-plot 1
    ax2 = plt.subplot(222)              # sub-plot 2
    ax3 = plt.subplot(223)              # sub-plot 3
    ax4 = plt.subplot(224)              # sub-plot 4
    # ax.axis("equal")                  # keep the x/y axes at the same scale when drawing
    for i in range(10000):
        for t in range(len(x)):
            inputLayer.setInputAndForward([x[t]])
            loss.minimize(y[t], speed)
            for j in range(len(hideLayer1.cells)):
                hideOutY[j][t] = hideLayer1.cells[j].out * outputLayer.cells[0].w[j]
                hideDeltaWeightY[j][t] = hideLayer1.cells[j].delta_w[0]
                hideDeltaBiasY[j][t] = hideLayer1.cells[j].delta_b
                outDeltaWeightY[j][t] = outputLayer.cells[0].delta_w[j]
                outWeightY[j][t] = outputLayer.cells[0].w[j]
            hideOutY[hideCellNum][t] = outputLayer.cells[0].b
            _y[t] = outputLayer.cells[0].out
        for epoch in range(30):
            # t = int(random.uniform(0,1)*10000000)%len(x)
            for t in range(len(x)):
                inputLayer.setInputAndForward([x[t]])
                loss.minimize(y[t], speed)
                if (epoch == 1):        # change to True to record and redraw on every epoch
                    inputLayer.setInputAndForward([x[t]])
                    for j in range(len(hideLayer1.cells)):
                        hideDeltaWeightY[j][t] = hideLayer1.cells[j].delta_w[0]
                        hideDeltaBiasY[j][t] = hideLayer1.cells[j].delta_b
                        outDeltaWeightY[j][t] = outputLayer.cells[0].delta_w[j]
                        outWeightY[j][t] = outputLayer.cells[0].w[j]
                    for n in range(len(x)):
                        inputLayer.setInputAndForward([x[n]])
                        for j in range(len(hideLayer1.cells)):
                            hideOutY[j][n] = hideLayer1.cells[j].out * outputLayer.cells[0].w[j]
                        hideOutY[hideCellNum][n] = outputLayer.cells[0].b
                        _y[n] = outputLayer.cells[0].sum
                    ax1.clear()
                    ax1.set_title('target and train result')   # the target, the network output, and each hidden cell's output multiplied by its output-layer weight
                    ax2.clear()
                    ax2.set_title('hide layer △w')
                    ax3.clear()
                    ax3.set_title('hide layer △b')
                    ax4.clear()
                    ax4.set_title('target layer △w')
                    for j in range(len(hideOutY)):
                        ax1.plot(x, hideOutY[j])
                    ax1.plot(x, orig_y)
                    ax1.plot(x, _y)
                    ax1.plot([x[t], x[t]], [np.min(_y[t]), np.max(y[t])])   # vertical segment at the current sample, from the network output to the target
                    for j in range(len(hideDeltaWeightY)):
                        ax2.plot(x, hideDeltaWeightY[j])
                        ax3.plot(x, hideDeltaBiasY[j])
                        # ax4.plot(x, outWeightY[j])
                        ax4.plot(x, outDeltaWeightY[j])
                    ax2.plot([x[t], x[t]], [np.min(hideDeltaWeightY), np.max(hideDeltaWeightY)])
                    ax3.plot([x[t], x[t]], [np.min(hideDeltaBiasY), np.max(hideDeltaBiasY)])
                    plt.pause(0.1)
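    # Optional (not part of the original script): if the training loop above is shortened enough to
    # finish, turn interactive mode off and keep the final figure open; assumes a GUI matplotlib backend.
    plt.ioff()
    plt.show()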