1. 程式人生 > >深度學習——卷積神經網路在tensorflow框架下的應用案例

深度學習——卷積神經網路在tensorflow框架下的應用案例

一、簡單的卷積神經網路的小應用

tensorflow框架下構建訓練一個簡單的3層卷積神經網路實現分類問題

(一)資料集介紹——SIGNS Datasets

教電腦破譯手語,在白色的牆壁前拍照,得到以下資料集。

現在的任務是建立一個演算法,使有語音障礙的人與不懂手語的人交流。

訓練集:1080張圖片,每張圖片大小為:64*64*3,表示數字0至5,每個數字的圖片為180張。

測試集:120張圖片,每張圖片大小為:64*64*3,表示數字0至5,每個數字的圖片為20張。

每一張圖片是64*64畫素,3表示RGB的3個顏色通道。



載入資料集函式、隨機生成mini_batch函式以及獨熱編碼的函式放在檔案cnn_utils.py中,具體程式碼如下:

#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:ZhengzhengLiu

import math
import numpy as np
import h5py
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.python.framework import ops

#載入資料集
def load_dataset():
    train_dataset = h5py.File('datasets/train_signs.h5', "r")
    train_set_x_orig = np.array(train_dataset["train_set_x"][:])  # your train set features
    train_set_y_orig = np.array(train_dataset["train_set_y"][:])  # your train set labels

    test_dataset = h5py.File('datasets/test_signs.h5', "r")
    test_set_x_orig = np.array(test_dataset["test_set_x"][:])  # your test set features
    test_set_y_orig = np.array(test_dataset["test_set_y"][:])  # your test set labels

    classes = np.array(test_dataset["list_classes"][:])  # the list of classes

    train_set_y_orig = train_set_y_orig.reshape((1, train_set_y_orig.shape[0]))
    test_set_y_orig = test_set_y_orig.reshape((1, test_set_y_orig.shape[0]))

    return train_set_x_orig, train_set_y_orig, test_set_x_orig, test_set_y_orig, classes

#隨機生成大小為64的mini_batch
def random_mini_batches(X, Y, mini_batch_size=64, seed=0):
    """
    Creates a list of random minibatches from (X, Y)

    Arguments:
    X -- input data, of shape (input size, number of examples) (m, Hi, Wi, Ci)
    Y -- true "label" vector (containing 0 if cat, 1 if non-cat), of shape (1, number of examples) (m, n_y)
    mini_batch_size - size of the mini-batches, integer
    seed -- this is only for the purpose of grading, so that you're "random minibatches are the same as ours.

    Returns:
    mini_batches -- list of synchronous (mini_batch_X, mini_batch_Y)
    """

    m = X.shape[0]  # number of training examples
    mini_batches = []
    np.random.seed(seed)

    # Step 1: Shuffle (X, Y)
    permutation = list(np.random.permutation(m))
    shuffled_X = X[permutation, :, :, :]
    shuffled_Y = Y[permutation, :]

    # Step 2: Partition (shuffled_X, shuffled_Y). Minus the end case.
    num_complete_minibatches = math.floor(
        m / mini_batch_size)  # number of mini batches of size mini_batch_size in your partitionning
    for k in range(0, num_complete_minibatches):
        mini_batch_X = shuffled_X[k * mini_batch_size: k * mini_batch_size + mini_batch_size, :, :, :]
        mini_batch_Y = shuffled_Y[k * mini_batch_size: k * mini_batch_size + mini_batch_size, :]
        mini_batch = (mini_batch_X, mini_batch_Y)
        mini_batches.append(mini_batch)

    # Handling the end case (last mini-batch < mini_batch_size)
    if m % mini_batch_size != 0:
        mini_batch_X = shuffled_X[num_complete_minibatches * mini_batch_size: m, :, :, :]
        mini_batch_Y = shuffled_Y[num_complete_minibatches * mini_batch_size: m, :]
        mini_batch = (mini_batch_X, mini_batch_Y)
        mini_batches.append(mini_batch)

    return mini_batches

#獨熱編碼
def convert_to_one_hot(Y, C):
    Y = np.eye(C)[Y.reshape(-1)].T
    return Y
(二)建立佔位符

首先,為輸入影象 X 和 輸出 Y 建立佔位符,在後面執行session會話的時候傳遞訓練集資料。

為靈活性選擇訓練集數量,在建立佔位符時可以先用 None 來表示訓練集的數目,其中,

輸入影象 X 的維度大小為[None, n_H0, n_W0, n_C0] 和 輸出 Y的維度大小為[None, n_y]

#建立佔位符
def create_placeholder(n_H0,n_W0,n_C0,n_y):
    """
    param :
    n_H0 -- 標量,輸入影象的高度
    n_W0 -- 標量,輸入影象的寬度
    n_C0 -- 標量,輸入影象的通道數目
    n_y -- 標量,分類的數目

    return:
    X -- 輸入影象的佔位符,維度大小為:[None, n_H0, n_W0, n_C0] 型別為:"float"
    Y -- 輸入分類標籤的佔位符,維度大小為:[None, n_y]  型別為:"float"
    """
    X = tf.placeholder(dtype=tf.float32,shape=(None,n_H0,n_W0,n_C0),name="X")
    Y = tf.placeholder(dtype=tf.float32,shape=(None,n_y),name="Y")

    return X,Y

X,Y = create_placeholder(64,64,3,6)
print("X="+str(X))
print("Y="+str(Y))

#執行結果:
X=Tensor("X:0", shape=(?, 64, 64, 3), dtype=float32)
Y=Tensor("Y:0", shape=(?, 6), dtype=float32)
(三)用tensorflow初始化引數

利用tf.contrib.layers.xavier_initializer(seed=0)初始化引數W1、W2:權重/過濾器filter。不用擔心偏向,

後面tensorflow函tf.nn.bias_add()會涉及到,即只需為後面用到的conv2d函式初始化權重。另外,tensorflow會

自動地為全連線層部分初始化層數。每一組過濾器filter的維度如下,即初始化權重W的維度順序是[1,2,3,4]:


#初始化引數
def initilize_parameters():
    """
    用tensorflow初始化權重引數構建卷積神經網路,維度為:
    W1 : [4, 4, 3, 8]    W2 : [2, 2, 8, 16]

    return:
    parameters -- 字典形式的張量,包含W1和W2
    """
    w1 = tf.get_variable(name="w1",shape=(4,4,3,8),dtype=tf.float32,initializer=tf.contrib.layers.xavier_initializer(seed=0))
    w2 = tf.get_variable(name="w2",shape=(2,2,8,16),dtype=tf.float32,initializer=tf.contrib.layers.xavier_initializer(seed=0))

    parameters = {"w1":w1,"w2":w2}

    return parameters

tf.reset_default_graph()
with tf.Session() as sess:
    parameters = initilize_parameters()
    init = tf.global_variables_initializer()
    sess.run(init)
    print("w1="+str(parameters["w1"].eval()[1,1,1]))
    print("w2="+str(parameters["w2"].eval()[1,1,1]))

#執行結果:
w1=[-0.0752826   0.08046506 -0.18115364  0.01793462 -0.11417466 -0.15131985
 -0.1336818  -0.06460937]
w2=[ 0.18771225  0.02089775  0.00491112 -0.12298661 -0.04200333  0.13979024
  0.05375856 -0.10530782 -0.15856427  0.19796973  0.04295498 -0.0471608
  0.01342118  0.21189791  0.1855194  -0.13519925]
(四)在tensorflow中應用前向傳播

卷積部分tensorflow自帶的函式如下:

1、tf.nn.conv2d(X,W1, strides = [1,s,s,1], padding = 'SAME')

 給出輸入 X 和一組權重引數/過濾器filter W1,引數strides = [1,f,f,1]代表輸入每一個維度

 (m, n_H_prev, n_W_prev, n_C_prev)上的步長。

2、tf.nn.max_pool(A, ksize = [1,f,f,1], strides = [1,s,s,1], padding = 'SAME')

 對於輸入 A,該函式使用 (f*f) 大小的過濾器和 (s*s) 大小的步長對於每一個視窗執行最大池化。

3、tf.nn.relu(Z1)

 計算Z1的ReLU函式。

4、tf.contrib.layers.flatten(P)

 對於輸入的引數 P,該函式的作用是將每一個樣本轉換成一維的向量同時保持批大小(batch_size)。

它返回一個維度大小為 [batch_size, k] 的伸長轉換後的張量。

5、tf.contrib.layers.fully_connected(F, num_outputs)

給出伸長轉換後的輸入引數 F,返回一個應用全連線層的輸出。該函式中全連線層會自動初始化

圖中的權重,並在訓練模型時繼續訓練它們。因此,初始化引數時不需要初始化這些權重。

建立一個前向傳播函式,包含的模型如下:

CONV2D -> RELU -> MAXPOOL -> CONV2D -> RELU -> MAXPOOL -> FLATTEN -> FULLYCONNECTED

具體的,我們將為所有步驟使用以下引數:

(步長為 8 ,採用“SAME”卷積,filters,大小:8*8)(步長為 1 ,用“SAME”卷積,16個filters,大小:2*2*8)
 - ReLU
- Max pool: Use a 4 by 4 filter size and a 4 by 4 stride, padding is "SAME"
(步長為 4 ,採用“SAME”卷積,filters,大小:4*4)
- Flatten the previous output.
 - FULLYCONNECTED (FC) layer:
對全連線層不使用非線性啟用函式,輸出層有6個神經單元的softmax。在tensorflow中,softmax 和 目標函式(cost function)合併在一個函式裡面,用另外一個函式計算目標函式值cost
#前向傳播
def forward_propagation(X,parameters):
    """
    前向傳播的模型:
    CONV2D -> RELU -> MAXPOOL -> CONV2D -> RELU -> MAXPOOL -> FLATTEN -> FULLYCONNECTED
    param :
    X -- 佔位符的輸入資料集,維度大小為(樣本數目,輸入大小)
    parameters -- python字典包含引數 "W1", "W2",維度大小為:初始化時的維度

    return:
    Z3 -- 最後線性單元部分的輸出
    """

    W1 = parameters["w1"]
    W2 = parameters["w2"]

    # CONV2D: stride of 1, padding 'SAME',8個filters:4*4*3
    Z1 = tf.nn.conv2d(X,filter=W1,strides=(1,1,1,1),padding="SAME")
    # RELU
    A1 = tf.nn.relu(Z1)
    # MAXPOOL: window 8x8, sride 8, padding 'SAME'
    P1 = tf.nn.max_pool(A1,ksize=(1,8,8,1),strides=(1,1,1,1),padding="SAME")
    # CONV2D:  stride 1, padding 'SAME',16個filters:2*2*8
    Z2 = tf.nn.conv2d(P1,filter=W2,strides=(1,1,1,1),padding="SAME")
    # RELU
    A2 = tf.nn.relu(Z2)
    # MAXPOOL: window 4x4, sride 4, padding 'SAME'
    P2 = tf.nn.max_pool(A2,ksize=(1,4,4,1),strides=(1,4,4,1),padding="SAME")
    # FLATTEN
    P2 = tf.contrib.layers.flatten(inputs=P2)
    # FULLY-CONNECTED (沒有使用非線性啟用)
    # 輸出層有 6 個神經單元.
    Z3 = tf.contrib.layers.fully_connected(P2,6,activation_fn = None)

    return Z3

tf.reset_default_graph()
with tf.Session() as sess:
    np.random.seed(1)
    X,Y = create_placeholder(64,64,3,6)
    parameters = initilize_parameters()
    Z3 = forward_propagation(X,parameters)
    init = tf.global_variables_initializer()
    sess.run(init)
    a = sess.run(Z3,feed_dict={X:np.random.randn(2,64,64,3),Y:np.random.randn(2,6)})
    print("Z3="+str(a))

#執行結果:
Z3=[[ 1.51075494  0.22193617 -1.84866357 -1.52119231 -0.01560354  0.82328957]
 [ 1.52116859  0.54076135 -1.3488152  -1.2703172   0.43205178  0.65169412]]
(五)計算目標函式
計算目標函式,應用的兩個函式如下:
1、tf.nn.softmax_cross_entropy_with_logits(logits = Z3, labels = Y)
計算softmax的交叉熵。
2、tf.reduce_mean
通過張量的維數計算元素的平均值。用它來將所有樣本的損失結合以得到總的cost。
#計算成本函式
def compute_cost(Z3,Y):
    """
    param :
    Z3 -- 前向傳播最後一個線性單元的輸出,維度大小為:(樣本數目,6)
    Y -- 佔位符,真實分類標籤的向量,維度與Z3相同

    return:
    cost -- 目標值cost的向量
    """
    cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=Z3,labels=Y))

    return cost

tf.reset_default_graph()
with tf.Session() as sess:
    np.random.seed(1)
    X, Y = create_placeholder(64, 64, 3, 6)
    parameters = initilize_parameters()
    Z3 = forward_propagation(X, parameters)
    cost = compute_cost(Z3,Y)
    init = tf.global_variables_initializer()
    sess.run(init)
    a = sess.run(cost,feed_dict={X:np.random.randn(4,64,64,3),Y:np.random.randn(4,6)})
    print("cost="+str(a))

#執行結果:
cost=1.4205
(六)模型構建
1、建立佔位符
2、初始化引數
3、前向傳播
4、計算目標函式
5、建立優化器
最後,建立一個會話並對 num_epochs 執行一個 for 迴圈,得到 mini-batches 批量樣本,然後用每個小批量樣本對目標函式進行優化。
#!/usr/bin/env python
# -*- coding:utf-8 -*-
# Author:ZhengzhengLiu

import math
import numpy as np
import h5py
import matplotlib.pyplot as plt
import scipy
from PIL import Image
from scipy import ndimage
import tensorflow as tf
from tensorflow.python.framework import ops
from cnn_utils import *

np.random.seed(1)

#載入資料集
X_train_orig,Y_train_orig,X_test_orig,Y_test_orig,classes = load_dataset()

#標準化特徵值,使得數值在0至1之間
X_train = X_train_orig/255
X_test = X_test_orig/255
Y_train = convert_to_one_hot(Y_train_orig,6).T
Y_test = convert_to_one_hot(Y_test_orig,6).T

print("Number of training examples = "+str(X_train.shape[0]))
print("Number of testing examples = "+str(X_test.shape[0]))
print("X_train shape:"+str(X_train.shape))
print("Y_train shape:"+str(Y_train.shape))
print("X_test shape:"+str(X_test.shape))
print("Y_test shape:"+str(Y_test.shape))

#建立佔位符
def create_placeholder(n_H0,n_W0,n_C0,n_y):
    """
    param :
    n_H0 -- 標量,輸入影象的高度
    n_W0 -- 標量,輸入影象的寬度
    n_C0 -- 標量,輸入影象的通道數目
    n_y -- 標量,分類的數目

    return:
    X -- 輸入影象的佔位符,維度大小為:[None, n_H0, n_W0, n_C0] 型別為:"float"
    Y -- 輸入分類標籤的佔位符,維度大小為:[None, n_y]  型別為:"float"
    """
    X = tf.placeholder(dtype=tf.float32,shape=(None,n_H0,n_W0,n_C0),name="X")
    Y = tf.placeholder(dtype=tf.float32,shape=(None,n_y),name="Y")

    return X,Y

X,Y = create_placeholder(64,64,3,6)
print("X="+str(X))
print("Y="+str(Y))

#初始化引數
def initilize_parameters():
    """
    用tensorflow初始化權重引數構建卷積神經網路,維度為:
    W1 : [4, 4, 3, 8]    W2 : [2, 2, 8, 16]

    return:
    parameters -- 字典形式的張量,包含W1和W2
    """
    w1 = tf.get_variable(name="w1",shape=(4,4,3,8),dtype=tf.float32,initializer=tf.contrib.layers.xavier_initializer(seed=0))
    w2 = tf.get_variable(name="w2",shape=(2,2,8,16),dtype=tf.float32,initializer=tf.contrib.layers.xavier_initializer(seed=0))

    parameters = {"w1":w1,"w2":w2}

    return parameters

tf.reset_default_graph()
with tf.Session() as sess:
    parameters = initilize_parameters()
    init = tf.global_variables_initializer()
    sess.run(init)
    print("w1="+str(parameters["w1"].eval()[1,1,1]))
    print("w2="+str(parameters["w2"].eval()[1,1,1]))

#前向傳播
def forward_propagation(X,parameters):
    """
    前向傳播的模型:
    CONV2D -> RELU -> MAXPOOL -> CONV2D -> RELU -> MAXPOOL -> FLATTEN -> FULLYCONNECTED
    param :
    X -- 佔位符的輸入資料集,維度大小為(樣本數目,輸入大小)
    parameters -- python字典包含引數 "W1", "W2",維度大小為:初始化時的維度

    return:
    Z3 -- 最後線性單元部分的輸出
    """

    W1 = parameters["w1"]
    W2 = parameters["w2"]

    # CONV2D: stride of 1, padding 'SAME',8個filters:4*4*3
    Z1 = tf.nn.conv2d(X,filter=W1,strides=(1,1,1,1),padding="SAME")
    # RELU
    A1 = tf.nn.relu(Z1)
    # MAXPOOL: window 8x8, sride 8, padding 'SAME'
    P1 = tf.nn.max_pool(A1,ksize=(1,8,8,1),strides=(1,1,1,1),padding="SAME")
    # CONV2D:  stride 1, padding 'SAME',16個filters:2*2*8
    Z2 = tf.nn.conv2d(P1,filter=W2,strides=(1,1,1,1),padding="SAME")
    # RELU
    A2 = tf.nn.relu(Z2)
    # MAXPOOL: window 4x4, sride 4, padding 'SAME'
    P2 = tf.nn.max_pool(A2,ksize=(1,4,4,1),strides=(1,4,4,1),padding="SAME")
    # FLATTEN
    P2 = tf.contrib.layers.flatten(inputs=P2)
    # FULLY-CONNECTED (沒有使用非線性啟用)
    # 輸出層有 6 個神經單元.
    Z3 = tf.contrib.layers.fully_connected(P2,6,activation_fn = None)

    return Z3

tf.reset_default_graph()
with tf.Session() as sess:
    np.random.seed(1)
    X,Y = create_placeholder(64,64,3,6)
    parameters = initilize_parameters()
    Z3 = forward_propagation(X,parameters)
    init = tf.global_variables_initializer()
    sess.run(init)
    a = sess.run(Z3,feed_dict={X:np.random.randn(2,64,64,3),Y:np.random.randn(2,6)})
    print("Z3="+str(a))

#計算成本函式
def compute_cost(Z3,Y):
    """
    param :
    Z3 -- 前向傳播最後一個線性單元的輸出,維度大小為:(樣本數目,6)
    Y -- 佔位符,真實分類標籤的向量,維度與Z3相同

    return:
    cost -- 目標值cost的向量
    """
    cost = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=Z3,labels=Y))

    return cost

tf.reset_default_graph()
with tf.Session() as sess:
    np.random.seed(1)
    X, Y = create_placeholder(64, 64, 3, 6)
    parameters = initilize_parameters()
    Z3 = forward_propagation(X, parameters)
    cost = compute_cost(Z3,Y)
    init = tf.global_variables_initializer()
    sess.run(init)
    b = sess.run(cost, {X: np.random.randn(4, 64, 64, 3), Y: np.random.randn(4, 6)})
    print("cost="+str(b))

#模型構建
def model(X_train,Y_train,X_test,Y_test,learning_rate = 0.009,
          num_epochs = 100,minibatch_size = 64,print_cost = True):
    """
    用tensorflow建立一個3層的卷積神經網路
    param :
    X_train -- 訓練集影象,維度大小為 (None,64,64,3)
    Y_train -- 訓練集分類標籤,維度大小為 (None,n_y=6)
    X_test --  測試集影象,維度大小為 (None,64,64,3)
    Y_test --  測試集分類標籤,維度大小為 (None,n_y=6)
    learning_rate -- 優化器中的學習率
    num_epochs -- epochs的數目(epoch:全部樣本迭代完一次為一個epoch)
    minibatch_size -- minibatch的大小,每次選取批量樣本的數目
    print_cost -- 是否列印目標函式cost的值

    return:
    train_accuracy -- 訓練集上的準確率
    test_accuracy -- 測試集的準確率
    parameters -- 模型中學習的引數,可用於後面的預測
    """
    ops.reset_default_graph()   #在沒有tf變數重寫的情況下重新執行模型
    tf.set_random_seed(1)
    seed = 3
    (m,n_H0,n_W0,n_C0) = X_train.shape
    n_y = Y_train.shape[1]
    costs = []  #列表存放目標函式值

    #建立佔位符
    X,Y = create_placeholder(n_H0,n_W0,n_C0,n_y)

    #初始化引數
    parameters = initilize_parameters()

    #前向傳播
    Z3 = forward_propagation(X,parameters)

    #計算目標函式
    cost = compute_cost(Z3,Y)

    #建立優化器
    optm = tf.train.AdamOptimizer(learning_rate=learning_rate).minimize(cost)

    #初始化所有引數
    init = tf.global_variables_initializer()

    #建立會話並執行計算圖
    with tf.Session() as sess:
        sess.run(init)      #執行初始化
        #迴圈進行模型訓練
        for epoch in range(num_epochs):
            minibatch_cost = 0.
            num_minibatch = int(m/minibatch_size)
            seed = seed + 1
            minibatchs = random_mini_batches(X_train,Y_train,minibatch_size,seed)

            for minibatch in minibatchs:
                #選擇一個minibatch
                (minibatch_X,minibatch_Y) = minibatch
                #執行會話執行優化器和目標函式,喂minibatch的資料給(X,Y)
                _,temp_cost = sess.run([optm,cost],feed_dict={X:minibatch_X,Y:minibatch_Y})
                minibatch_cost += temp_cost/num_minibatch
            #每 5 次epoch列印目標函式值cost
            if print_cost == True and epoch % 5 == 0:
                print("Cost after epoch %i:%f"%(epoch,minibatch_cost))
            if print_cost == True and epoch % 1 == 0:
                costs.append(minibatch_cost)

        #繪製關於目標函式值cost的圖
        plt.plot(np.squeeze(costs))
        plt.ylabel("cost")
        plt.xlabel("iteration(per tens)")
        plt.title("Learning rate = "+str(learning_rate))
        plt.savefig("3層卷積神經網路cost值.png")
        plt.show()

        #計算準確的預測
        correct_predict = tf.equal(tf.argmax(Z3,1),tf.argmax(Y,1))

        #在測試集上計算準確率
        accuracy = tf.reduce_mean(tf.cast(correct_predict,tf.float32))
        print(accuracy)
        train_accuracy = accuracy.eval({X:X_train,Y:Y_train})
        test_accuracy = accuracy.eval({X:X_test,Y:Y_test})
        print("Train Acurracy :" +str(train_accuracy))
        print("Test Acurracy :" +str(test_accuracy))

        return train_accuracy,test_accuracy,parameters

_,_,parameters = model(X_train,Y_train,X_test,Y_test)