1. 程式人生 > >【TensorFlow實戰】用Python實現自編碼器

【TensorFlow實戰】用Python實現自編碼器

程式碼:

import numpy as np
import sklearn.preprocessing as prep
import tensorflow as tf
from tensorflow.examples.tutorials.mnist import input_data

##均勻分佈的Xaiver初始化器
def xavier_init(fan_in, fan_out, constant = 1):
    low = -constant * np.sqrt(6.0 / (fan_in + fan_out))
    high = constant * np.sqrt(6.0 / (fan_in + fan_out))
    return tf.random_uniform((fan_in,fan_out),
                             minval = low,maxval=high,
                             dtype = tf.float32)

##定義一個去噪自編碼的class
class AdditiveGaussianNoiseAutoencoder(object):
    def __init__(self,n_input,n_hidden,transfer_function = tf.nn.softplus,
                 optimizer = tf.train.AdamOptimizer(),scale=0.1):
        self.n_input = n_input
        self.n_hidden = n_hidden
        self.transfer = transfer_function
        self.scale = tf.placeholder(tf.float32)
        self.training_scale = scale
        network_wegihts = self._initialize_weights()
        self.weights =network_wegihts
#接下來開始定義網路結構
        #為輸入x建立一個維度為n_input的placeholder
        self.x = tf.placeholder(tf.float32,[None,self.n_input])
        #簡歷一個能提取特徵的隱含層
        self.hidden = self.transfer(tf.add(tf.matmul(
            # 將x加上 噪聲
            self.x+scale * tf.random_normal((n_input,)),
            self.weights['w1']),self.weights['b1']))
        self.reconstruction = tf.add(tf.matmul(self.hidden,
                                               self.weights['w2']),
                                     self.weights['b2'])

##接下來定義自編碼器的損失函式,這裡直接使用平方誤差作為cost
#用tf.subtact計算輸出和輸入只差
        #使用tf.pow求差的平方,最後使用reduce_sum求和即可得到平方誤差
        self.cost = 0.5 * tf.reduce_sum(tf.pow(tf.subtract(
            self.reconstruction,self.x),2.0))
        #在定義訓練操作為優化器self.optimizer對損失self.cost進行優化
        self.optimizer = optimizer.minimize(self.cost)
        #最後建立session,並且初始化自編碼器的全部模型引數
        init = tf.global_variables_initializer()
        self.sess = tf.Session()
        self.sess.run(init)

#下面再來看一下引數初始化函式_initialize_weights
    def _initialize_weights(self):
        #先建立一個名為all_weights的字典dict
        all_weight = dict()
        #然後將w1,b1,w2,b2全部存入其中
        all_weight['w1']=tf.Variable(xavier_init(self.n_input,
                                                 self.n_hidden))
        all_weight['b1'] = tf.Variable(tf.zeros([self.n_hidden],
                                                dtype = tf.float32))
        all_weight['w2'] = tf.Variable(tf.zeros([self.n_hidden,
                                                 self.n_input],
                                                dtype = tf.float32))
        all_weight['b2'] = tf.Variable(tf.zeros([self.n_input],
                                                dtype = tf.float32))
        #最後染回all_weights
        return all_weight
    #我們定義計算損失cost及執行一步訓練的函式partial_fit
    #函式裡只需要讓Session執行兩個計算圖的節點,分別是算是cost和訓練過程optimizer
    #輸入的feed_dict包括輸入資料x以及噪聲的稀疏scale
    #函式partial_fit做的就是用一個batch資料進行訓練並且返回當前的損失cost
    def partial_fit(self,X):
        cost ,opt = self.sess.run((self.cost,self.optimizer),
            feed_dict={self.x: X, self.scale: self.training_scale})

        return cost
    #我們也只需要一個只求損失cost的函式calc_total_cost
    #這裡就只讓Session執行一個計算圖節點self.cost
    #傳入的引數和前面的partial_fit一直
    #這個函式是在自編碼器訓練完畢之後,在測試集上對模型效能進行評測會用到的
    #它不會像partial_fit那樣出發訓練操作
    def calc_total_cost(self,X):
        return self.sess.run(self.cost,feed_dict = {self.x:X,
                self.scale:self.training_scale})
    #我們還定義了transform函式,它範湖自編碼器隱含層的輸出結果
    #目的是提供一個藉口來獲取抽象後的特徵,自編碼器的隱含層的最主要功能就
    #是學習出資料中的高階特徵
    def transform(self,X):
        return self.sess.run(self.hidden,feed_dict = {self.x:X,
       self.scale:self.training_scale})
    #我們再定義generate函式,它將隱含層的輸出結果作為輸入,通過之後
    #的重建層就提取到的高階特徵復原為原始資料。
    #這個藉口和前面的transform正好將整個自編碼器拆分為兩個部門
    #這裡的generate介面是後半部門,將高階特徵復原成原始資料的步驟
    def generate(self,hidden = None):
        if hidden is None:
            hidden  = np.random.normal(size = self.weights["b1"])
        return self.sess.run(self.reconstruction,
                             feed_dict = {self.hidden:hidden})
    #接下來定義reconstruct函式,它整體執行一遍復原過程,包括提取高階特徵
    #和通過高階特徵復原資料,即包括transform和generate涼快
    #輸入資料是原資料,輸出資料是復原後的資料
    def reconstruct(self,X):
        return self.sess.run(self.reconstruction,feed_dict ={self.x:X,
                self.scale:self.training_scale})
    #這裡的getWeights函式作用his獲取隱含層的權重w1
    def getWegiths(self):
        return self.sess.run(self.weights['w1'])
    #getBiases函式則是獲取隱含層的偏置係數b1
    def getBiases(self):
        return self.sess.run(self.weights['b1'])
    #至此,去噪自編碼器的class就全部定義玩了,包括神經網路的設計
    #權重的初始化


#接下來依然使用TensorFlow提供的讀取示例資料的函式載入MNIST資料集
mnist = input_data.read_data_sets("MNIST_data/", False, one_hot=True)

def standard_scale(X_train,X_test):
    preprocessor = prep.StandardScaler().fit(X_train)
    X_train = preprocessor.transform(X_train)
    X_test = preprocessor.transform(X_test)
    return X_train,X_test
#在定義一個獲取隨機block資料的函式:取一個從0到len(data)-batch_size
#之間的隨機證書,再以這個隨機數作為block的起始位置,然後順序渠道一個batch_size的資料
#需要注意的是,這屬於不放回抽樣,可以提高資料的利用效率
def get_random_block_from_date(data,batch_size):
    start_index = np.random.randint(0,len(data)-batch_size)
    return data[start_index:(start_index+batch_size)]
#使用之前定義的standard_scale函式對訓練集,測試集進行標準化變換
X_train,X_test = standard_scale(mnist.train.images,mnist.test.images)
#接下來定義幾個常用引數,總訓練書樣本數,最大的訓練的輪數(epoch)為20
#batch_size設為128,並設定每隔一輪 epoch 就顯示一次損失cost
n_samples = int(mnist.train.num_examples)
training_epochs = 20
batch_size = 128
display_step = 1

#建立一個AGN自編碼器的示例,定義模型輸入節點數n_input為784
#自編碼器的隱含層節點數n_hidden為200
#隱含層的啟用函式transfer_function為softplus
#優化器optimizer為Adam且學習速率為0.001,同時將噪聲的稀疏scale設為0.01
autoencoder = AdditiveGaussianNoiseAutoencoder(n_input=784,
            n_hidden= 200,
            transfer_function=tf.nn.softplus,
            optimizer=tf.train.AdamOptimizer(learning_rate=0.001),
            scale=0.1)
#下面開始訓練過程,在每一輪迴圈開始時,我們將平均損失avg_cost設為0
#並且計算總共需要的batch數,注意這裡使用的是不放回抽樣,所以並不能保證
#每個樣本都被抽到並且參加訓練。
#然後在每一次迴圈中,先使用get_random_block_from_data函式隨機抽到一個block
#然後使用成員函式partial_fit訓練這個batch的資料並且計算當前的cost
#最後將當前的cost正和島avg_cost中。
#在每一輪的迭代之後,顯示當前的迭代數和這一輪迭代的平均cost
#我們在第一輪迭代時,cost大約是19000,在最後一輪迭代時,cost大約為7000
#再接著訓練cost也很難繼續降低了
for epoch in range(training_epochs):
    avg_cost = 0.
    total_batch  = int(n_samples/batch_size)
    for i in range(total_batch):
        batch_xs = get_random_block_from_date(X_train,batch_size)

        cost = autoencoder.partial_fit(batch_xs)
        avg_cost += cost / n_samples*batch_size
    if epoch % display_step ==0:
        print("Epoch:",'%04d' % (epoch + 1),"cost=",
              "{:.9f}".format(avg_cost))

#最後對訓練完的模型進行效能測試
#這裡使用之前定義的成員函式cal_total_cost對測試機X_tst進行測試
print("Total cost:" + str(autoencoder.calc_total_cost(X_test)))

結果輸出:
Epoch: 0001 cost= 18638.076740909
Epoch: 0002 cost= 13241.218212500
Epoch: 0003 cost= 10876.398575000
Epoch: 0004 cost= 10911.524663636
Epoch: 0005 cost= 9740.812732386
Epoch: 0006 cost= 10174.114690341
Epoch: 0007 cost= 9456.718578409
Epoch: 0008 cost= 9387.242152841
Epoch: 0009 cost= 9165.076931818
Epoch: 0010 cost= 9166.083271591
Epoch: 0011 cost= 8543.039659091
Epoch: 0012 cost= 8987.161418182
Epoch: 0013 cost= 8467.093871023
Epoch: 0014 cost= 8143.928632955
Epoch: 0015 cost= 8076.231632955
Epoch: 0016 cost= 8216.031440909
Epoch: 0017 cost= 8471.644106818
Epoch: 0018 cost= 8246.171704545
Epoch: 0019 cost= 8256.087601136
Epoch: 0020 cost= 8158.641851136
Total cost:671721.44