欢迎fork我的github:https://github.com/zhaoyu611/DeepLearningTutorialForChinese
最近在学习git,所以正好趁这个机会,把学习到的知识实践一下。看完DeepLearning的原理,有了大体的了解,但是对于theano的代码,还是自己撸一遍印象更深,所以照着deeplearning.net上的代码重新写了一遍,注释部分是原文翻译和自己的理解。感兴趣的小伙伴可以一起完成这个工作哦~ 有问题欢迎联系我。Email: zhaoyuafeu@gmail.com,QQ: 3062984605
'''
时间:2016.8.4
作者:赵雨
E-mail: zhaoyuafeu@gmail.com
说明:针对deeplearning.net中SdA部分的翻译
'''
"""
本教程使用Theano进行栈式自编码(SdA)。
SdA的基础是自编码器,该理论是Bengio等人在2007年提出的。
自编码器输入为x,并映射到隐含层 y = f_{\theta}(x) = s(Wx+b)
其中参数是\theta={W,b}。然后将隐层输出y映射输出重构向量z\in [0,1]^d
映射函数为z = g_{\theta'}(y) = s(W'y + b')。权重矩阵 W'可以由W' = W^T
得到,W'和W称为tied weights。网络的训练目标是最小化重构误差(x和z之间的误差)。
对于降噪自编码的训练,首先将x corrupted为\tilde{x},\tilde{x}是x的破损形式,
破损函数是随机映射。随后采用与之前相同方法计算y(y = s(W\tilde{x} + b)
并且 z = s(W'y + b') )。重构误差是计算z和uncorrupted x的误差,即:
计算两者的交叉熵:
- \sum_{k=1}^d[ x_k \log z_k + (1-x_k) \log( 1-z_k)]
参考文献:
- P. Vincent, H. Larochelle, Y. Bengio, P.A. Manzagol: Extracting and
Composing Robust Features with Denoising Autoencoders, ICML'08, 1096-1103,
2008
- Y. Bengio, P. Lamblin, D. Popovici, H. Larochelle: Greedy Layer-Wise
Training of Deep Networks, Advances in Neural Information Processing
Systems 19, 2007
"""
import cPickle
import gzip
import os
import sys
import time
import timeit

import numpy
import theano
import theano.tensor as T
from theano.tensor.shared_randomstreams import RandomStreams

from logistic_sgd import LogisticRegression, load_data
from mlp import HiddenLayer
from dA import dA
class SdA(object):
    """Stacked denoising auto-encoder (SdA).

    The model is a stack of denoising auto-encoders (dAs): the hidden layer
    of the dA at layer ``i`` is the input of the dA at layer ``i + 1``.  The
    first dA receives the SdA input and the hidden layer of the last dA is
    the SdA output.  After pre-training, the SdA is run like an ordinary
    MLP; the dAs are only used to initialise the weights.
    """

    def __init__(self, numpy_rng, theano_rng=None, n_ins=784,
                 hidden_layers_sizes=[500, 500], n_outs=10,
                 corruption_levels=[0.1, 0.1]):
        """Build a network with a configurable number of hidden layers.

        :param numpy_rng: numpy.random.RandomState used to initialise the
            weights.
        :param theano_rng: theano.tensor.shared_randomstreams.RandomStreams;
            when falsy, one is created from a seed drawn from ``numpy_rng``.
        :param n_ins: int, dimension of the SdA input.
        :param hidden_layers_sizes: list of ints, size of each hidden layer;
            must contain at least one element.
        :param n_outs: int, dimension of the network output.
        :param corruption_levels: list of floats, corruption level per
            layer.  Not read by this constructor; the corruption level is
            passed when a pre-training function is called.
        """
        # NOTE: the list defaults are kept for interface compatibility;
        # they are only read here, never mutated.
        self.sigmoid_layers = []
        self.dA_layers = []
        self.params = []
        self.n_layers = len(hidden_layers_sizes)

        assert self.n_layers > 0

        if not theano_rng:
            theano_rng = RandomStreams(numpy_rng.randint(2 ** 30))

        # Symbolic inputs: a matrix of examples and a vector of int labels.
        self.x = T.matrix('x')
        self.y = T.ivector('y')

        for i in range(self.n_layers):
            # The first layer is fed by the SdA input; every later layer is
            # fed by the output of the sigmoid layer directly below it.
            if i == 0:
                input_size = n_ins
                layer_input = self.x
            else:
                input_size = hidden_layers_sizes[i - 1]
                layer_input = self.sigmoid_layers[-1].output

            sigmoid_layer = HiddenLayer(rng=numpy_rng,
                                        input=layer_input,
                                        n_in=input_size,
                                        n_out=hidden_layers_sizes[i],
                                        activation=T.nnet.sigmoid)
            self.sigmoid_layers.append(sigmoid_layer)
            # Only the sigmoid layers' parameters (and, below, the logistic
            # layer's) are collected for fine-tuning.
            self.params.extend(sigmoid_layer.params)

            # The dA is handed the very same W and b objects as the sigmoid
            # layer, so pre-training the dA is expected to move the sigmoid
            # layer's parameters as well.
            dA_layer = dA(numpy_rng=numpy_rng,
                          theano_rng=theano_rng,
                          input=layer_input,
                          n_visible=input_size,
                          n_hidden=hidden_layers_sizes[i],
                          W=sigmoid_layer.W,
                          bhid=sigmoid_layer.b)
            self.dA_layers.append(dA_layer)

        # Logistic-regression layer on top of the last hidden layer.
        self.logLayer = LogisticRegression(
            input=self.sigmoid_layers[-1].output,
            n_in=hidden_layers_sizes[-1],
            n_out=n_outs)
        self.params.extend(self.logLayer.params)

        # Symbolic fine-tuning cost and error expression, both w.r.t. self.y.
        self.finetune_cost = self.logLayer.negative_log_likelihood(self.y)
        self.errors = self.logLayer.errors(self.y)

    def pretraining_function(self, train_set_x, batch_size):
        """Return one compiled pre-training function per dA layer.

        Each returned function takes the minibatch ``index`` plus two
        optional keyword inputs, ``corruption`` (default 0.2) and ``lr``
        (default 0.1), performs one training step of the corresponding dA
        on that minibatch and returns its cost.

        :param train_set_x: shared variable holding the training examples
            used to train the dAs.
        :param batch_size: int, size of a minibatch.
        :return: list of compiled Theano functions, in layer order.
        """
        index = T.lscalar('index')
        corruption_level = T.scalar('corruption')
        learning_rate = T.scalar('lr')

        batch_begin = index * batch_size
        batch_end = batch_begin + batch_size

        pretrain_fns = []
        for da_layer in self.dA_layers:
            cost, updates = da_layer.get_cost_updates(corruption_level,
                                                      learning_rate)
            # theano.In(..., value=...) supplies the default for an optional
            # input; it replaces the deprecated theano.Param(..., default=...).
            fn = theano.function(
                inputs=[index,
                        theano.In(corruption_level, value=0.2),
                        theano.In(learning_rate, value=0.1)],
                outputs=cost,
                updates=updates,
                givens={self.x: train_set_x[batch_begin:batch_end]})
            pretrain_fns.append(fn)
        return pretrain_fns

    def build_finetune_functions(self, datasets, batch_size, learning_rate):
        """Build the fine-tuning, validation and test functions.

        :param datasets: sequence of three ``(features, labels)`` pairs of
            shared variables, in the order train, valid, test.
        :param batch_size: int, size of a minibatch.
        :param learning_rate: float, learning rate used during fine-tuning.
        :return: tuple ``(train_fn, valid_score, test_score)``.
            ``train_fn(index)`` performs one fine-tuning step on minibatch
            ``index`` and returns the cost; ``valid_score()`` and
            ``test_score()`` return the list of per-minibatch errors over
            the validation and test sets respectively.
        """
        (train_set_x, train_set_y) = datasets[0]
        (valid_set_x, valid_set_y) = datasets[1]
        (test_set_x, test_set_y) = datasets[2]

        # Explicit floor division keeps the batch counts integral; a
        # trailing partial batch is ignored.
        n_valid_batches = (
            valid_set_x.get_value(borrow=True).shape[0] // batch_size)
        n_test_batches = (
            test_set_x.get_value(borrow=True).shape[0] // batch_size)

        index = T.lscalar('index')
        batch_begin = index * batch_size
        batch_end = (index + 1) * batch_size

        # Plain gradient-descent update for every collected parameter.
        gparams = T.grad(self.finetune_cost, self.params)
        updates = [(param, param - gparam * learning_rate)
                   for param, gparam in zip(self.params, gparams)]

        train_fn = theano.function(
            inputs=[index],
            outputs=self.finetune_cost,
            updates=updates,
            givens={
                self.x: train_set_x[batch_begin:batch_end],
                self.y: train_set_y[batch_begin:batch_end]},
            name='train')

        test_score_i = theano.function(
            [index],
            self.errors,
            givens={
                self.x: test_set_x[batch_begin:batch_end],
                self.y: test_set_y[batch_begin:batch_end]},
            name='test')

        valid_score_i = theano.function(
            [index],
            self.errors,
            givens={
                self.x: valid_set_x[batch_begin:batch_end],
                self.y: valid_set_y[batch_begin:batch_end]},
            name='valid')

        def valid_score():
            """Return the error of every validation minibatch."""
            return [valid_score_i(i) for i in range(n_valid_batches)]

        def test_score():
            """Return the error of every test minibatch."""
            return [test_score_i(i) for i in range(n_test_batches)]

        return train_fn, valid_score, test_score
def test_SdA(finetune_lr=0.1, pretraining_epochs=15,
             pretrain_lr=0.001, training_epochs=1000,
             dataset='./data/mnist.pkl.gz', batch_size=1):
    """Pre-train and fine-tune a stacked denoising auto-encoder.

    The default dataset path points at MNIST.  Progress is printed to
    stdout and the elapsed time of each phase is written to stderr.

    :param finetune_lr: float, learning rate of the fine-tuning phase.
    :param pretraining_epochs: int, number of pre-training epochs per layer.
    :param pretrain_lr: float, learning rate of the pre-training phase.
    :param training_epochs: int, maximum number of fine-tuning epochs.
    :param dataset: string, path of the dataset passed to ``load_data``.
    :param batch_size: int, size of a minibatch.
    :return: None.
    """
    datasets = load_data(dataset)
    train_set_x, _ = datasets[0]

    # Floor division: a trailing partial batch is ignored.
    n_train_batches = train_set_x.get_value(borrow=True).shape[0] // batch_size

    numpy_rng = numpy.random.RandomState(89677)
    print('...building the model')
    sda = SdA(numpy_rng=numpy_rng, n_ins=28 * 28,
              hidden_layers_sizes=[1000, 1000, 1000], n_outs=10)

    # ---------------------------------------------------------------- #
    # Pre-training: train the dAs one layer at a time.
    # ---------------------------------------------------------------- #
    print('...getting the pretraining functions')
    pretraining_fns = sda.pretraining_function(train_set_x=train_set_x,
                                               batch_size=batch_size)
    print('... pre-training the model')
    # Wall-clock timer; time.clock() measured CPU time on Unix and is
    # unavailable in recent Python versions.
    start_time = timeit.default_timer()
    corruption_levels = [0.1, 0.2, 0.3]
    for i in range(sda.n_layers):
        for epoch in range(pretraining_epochs):
            costs = [pretraining_fns[i](index=batch_index,
                                        corruption=corruption_levels[i],
                                        lr=pretrain_lr)
                     for batch_index in range(n_train_batches)]
            # Label and value are printed on one line.
            print('Pre-training layer %i, epoch %d, cost %f'
                  % (i, epoch, numpy.mean(costs)))
    end_time = timeit.default_timer()
    sys.stderr.write(
        'The pretraining code for file %s ran for %0.2fm\n'
        % (os.path.split(__file__)[1], (end_time - start_time) / 60.))

    # ---------------------------------------------------------------- #
    # Fine-tuning with early stopping.
    # ---------------------------------------------------------------- #
    print('...getting the finetuning functions')
    train_fn, validate_model, test_model = sda.build_finetune_functions(
        datasets=datasets, batch_size=batch_size,
        learning_rate=finetune_lr)
    print('... finetuning the model')

    # Early-stopping parameters: train for at least `patience` iterations;
    # when validation improves by more than the threshold, extend patience
    # to `patience_increase` times the current iteration.
    patience = 10 * n_train_batches
    patience_increase = 2.0
    improvement_threshold = 0.995
    # Validate at least once per epoch, and before patience can run out.
    validation_frequency = min(n_train_batches, patience // 2)

    best_validation_loss = numpy.inf
    test_score = 0.
    start_time = timeit.default_timer()
    done_looping = False
    epoch = 0
    while (epoch < training_epochs) and (not done_looping):
        epoch = epoch + 1
        for minibatch_index in range(n_train_batches):
            train_fn(minibatch_index)
            iteration = (epoch - 1) * n_train_batches + minibatch_index

            if (iteration + 1) % validation_frequency == 0:
                validation_losses = validate_model()
                this_validation_loss = numpy.mean(validation_losses)
                print('epoch %i, minibatch %i/%i, validation error %f %%'
                      % (epoch, minibatch_index + 1, n_train_batches,
                         this_validation_loss * 100))

                if this_validation_loss < best_validation_loss:
                    # Only a significant improvement extends patience.
                    if (this_validation_loss <
                            best_validation_loss * improvement_threshold):
                        patience = max(patience,
                                       iteration * patience_increase)
                    best_validation_loss = this_validation_loss

                    # Evaluate the new best model on the test set.
                    test_losses = test_model()
                    test_score = numpy.mean(test_losses)
                    print(('epoch %i, minibath %i/%i, test error of '
                           'best model %f %%')
                          % (epoch, minibatch_index + 1,
                             n_train_batches, test_score * 100.))

            if patience <= iteration:
                done_looping = True
                break

    end_time = timeit.default_timer()
    # The validation loss is a fraction; scale it to a percentage
    # (the previous code applied `% 100`, i.e. modulo, by mistake).
    print(('optimization complete with best validation score of %f %%,'
           'with test performance %f %%')
          % (best_validation_loss * 100., test_score * 100.))
    sys.stderr.write(
        'The training code for file %s ran for %0.2fm\n'
        % (os.path.split(__file__)[1], (end_time - start_time) / 60.))
# Script entry point: run the SdA experiment with its default settings.
if __name__ == "__main__":
    test_SdA()
实验配置:ThunderBot笔记本,I7,1T机械硬盘+128SSD,N卡Geforce GTX 980。实验数据集:MNIST。实验用时:预训练过程686.75 minutes,微调过程362.87 minutes。最优训练结果:验证集错误率1.37 %(程序原始输出为0.0137 %,是打印时把“乘以100”误写成“对100取模”所致),测试集错误率1.2600 %。