TensorFlow強化學習入門(4)——深度Q網路(DQN)及其擴充套件
本文中我們將一起建立一個深度Q網路(DQN)。它基於我們系列文章中(0)的單層Q網路,如果你是強化學習的初學者,我推薦你到文末跳轉到(0)開始閱讀。儘管簡單的Q網路已經可以在簡單的問題上和Q表表現一樣出色,但是深度Q網路可以使其變得更強。要將簡單的Q網路轉化為深度Q網路,我們需要以下改進:
- 將單層的網路切換為多層卷積網路。
- 支援歷程重現(Experience Replay),使我們的網路可以通過其記憶的歷程來進行自我訓練。
- 利用第二“目標”網路來計算更新Q值。
這三點創新也使得Google DeepMind團隊的DQN agent在很多雅達利遊戲上達到超越人類水平
從Q網路到深度Q網路
改進1:卷積層
由於我們的agent要玩電子遊戲,所以它必須能像人類或其他靈長動物一樣理解螢幕上的輸出內容。與單獨考慮每個畫素的輸入不同,卷積層使網路以區域為單位來理解輸出,同時在向更高層的網路傳遞資訊時,這些區域的聯絡也可以得到保持。這和感受野的機理類似,實際上也已經有研究表明
在TensorFlow中,我們可以利用tf.contrib.layers.convolution2d
方法來快速建立一個卷積層,示例如下:
convolution_layer = tf.contrib.layers.convolution2d(inputs, num_outputs, kernel_size, stride, padding)
num_outs
: 我們使用多少個卷積核來接收上一層的資訊。kernel_size
:接收上一層資訊所用的滑動窗大小。Stride
:滑動窗邊界每次移動的畫素數。padding
:是否為影象邊界補充padding來保持輸入輸出尺寸一致,'SAME'填充,'VALID'不填充。
改進2:歷程重現
DQN的第二個主要改進就是支援歷程重現。其基本思想就是將agent的訓練歷程儲存下來,然後從中隨機抽取來訓練網路,通過這種方式我們可以使得我們的agent在任務中的表現更加穩定健壯。通過歷程的隨機抽取,我們可以確保網路只能基於當前環境的狀態進行學習,從而習得比原始訓練歷程更豐富的表示。這些歷程被儲存在形為<state, action, reward, next state>
的元組當中。歷程重現緩衝器中儲存著固定長度的最近訓練記錄,每當有新的元素進入時,最舊的一個就將被移除。當需要訓練的時候,我們只需要從緩衝器中隨機提取訓練記錄即可。我們後面將建立一個簡單的類實現記錄的儲存和重新提取。
改進3:目標網路分離
DQN的第三個主要改進,也是最獨特的一個改進,就是在訓練過程中對第二個網路的利用。第二個網路用於計算訓練過程中每個行動帶來的損失值。為什麼不直接使用一個網路來估算損失值呢?原因是訓練中的每一步都會帶來Q網路中值的變化,當我們基於不斷變化的值來調整我們的網路引數時,預估值的變化很容易失控。此時目標值和預估Q值作用產生的反饋會將不穩定轉移至網路自身。為了規避這一風險,目標網路應當被凍結,只對主Q網路做週期性的緩慢更新。通過這一手段,訓練過程可以變得更加穩定。
除了週期性地單次更新目標網路之外,我們也可以頻繁地更新網路,不過更新的幅度要小。這項技術在DeepMind的另一篇論文中有介紹,這一技術也使訓練過程更加穩定。
超越DQN
根據上面的改進,我們可以復現2014年提出的DQN。但是技術日新月異,現在已經有很多技術的效能和穩定性都已經達到、超越了2014年DeepMind提出的DQN架構的水平。在將你的DQN應用於你喜歡的雅達利遊戲之前,我建議你先在原先的網路上新增一些新特性,下面我將著重說明其中的兩個(Double DQN 和 Dueling DQN )並給出其部分程式碼實現,藉助它們我們的網路可以在更短的時間內訓練達到更優的效能。
Double DQN
Double DQN產生的直接原因是常規的DQN在給定狀態下往往會高估具有潛力的行動。如果所有的行動總是被同樣高估的,那麼這個情況也不錯,但是事實並非如此。你可以想象一個情形,次優的行動經常得到超過最優行動的Q值,此時agent將很難習得最優的策略。為了糾正這個錯誤,DDQN的作者使用了一個簡單的技巧:利用主網路選擇行動,目標網路來生成該行動的目標Q值,而不是在訓練過程中計算目標Q值的同時選擇最大Q值對應的行動。將行動選擇從目標Q值生成邏輯中抽離出來後,網路高估行動的問題基本得到了解決,訓練也更加快速和可信。下面給出DDQN更新目標值使用的等式:
Q-Target = r + γQ(s’,argmax(Q(s’,a,ϴ),ϴ’))
Dueling DQN
為了解釋Dueling DQN中網路架構變更的原因,我們首先要解釋一些額外的強化學習術語。到目前為止我們討論的Q值對應於確定情況下某種行動的優劣,可以寫作Q(s,a)
。“確定狀態下的行動”可以被拆分為兩個更細粒度的基本變數/符號來表示。第一個是價值函式V(s),它告訴我們當前狀態的優劣。第二個是決策函式(advantage function),它告訴我們和其他行動相比某一行動的優劣。我們可以將Q值視為V和A綜合後的結果,即可以表示為:
Q(s,a) =V(s) + A(a)
Dueling DQN的目標是獲得一個可以分別計算價值和決策並通過最後一層綜合得到Q值的網路。乍一看這麼做好像沒有任何意義,反正最後都要整合在一起,為什麼還要單獨再拆開呢?這麼做的好處主要體現在強化學習的agent不需要在每個時刻都同時考慮價值和決策。舉例來說:想象你在坐在公園的長椅上看日落的場景,這是十分美好的,也就是說坐在長椅上這一行為會帶來很高的收益。但是如果不考慮你當前所處的狀態的話(日落),我們不需要作出任何動作,即思考坐在長椅上這一動作的價值是沒有意義的。通過將狀態價值從其繫結的動作上分離出來後,我們可以得到更加健壯的狀態價值預估。
綜合實踐
現在我們已經習得了構建我們的DQN所需的全部技巧,下面就讓我們在實際的遊戲環境中進行測試吧!雖然上面我們說DQN經過足夠的訓練後可以學會雅達利遊戲,但是要在這些遊戲上表現良好,至少要在強大算力的計算機上訓練一天。為了做教學演示,我設計了一個我們DQN可以在比較強大的算力(我使用的是GTX970)下經過數小時訓練掌握的簡單遊戲。在這個環境下,agent將控制一個藍色方塊,目標是避開紅色方塊(分值 -1)的前提下移動至綠色方塊(分值 +1)。每個episode將以隨機生成的5x5的網格開局,agent需要在50步內得到儘可能高的分數。由於方塊的位置是隨機產生的,agent不是像FrozenLake問題中那樣簡單地習得一個固定路徑就可以了,它必須理解這些方塊的空間特徵。下面讓我們來實際嘗試一下吧!
# 譯者執行環境為jupyterlab,每個分割線對應一個程式碼塊,Python3,需要pillow庫
from __future__ import division
import gym
import numpy as np
import random
import tensorflow as tf
import tensorflow.contrib.slim as slim
import matplotlib.pyplot as plt
import scipy.misc
import os
%matplotlib inline
# --------------------------------------------------
# 載入遊戲環境
# 你可以自行調整遊戲難度(網格大小),小的網格可以使網路訓練更快,大的網格可以提升遊戲難度
from gridworld import gameEnv
env = gameEnv(partial=False, size=5)
# --------------------------------------------------
# 實現網路
class Qnetwork():
def __init__(self, h_size):
# 網路接收到遊戲傳遞出的一幀影象並將之轉化為陣列
# 之後調整大小並通過四個卷積層
self.scalarInput = tf.placeholder(shape=[None, 21168], dtype=tf.float32)
self.imageIn = tf.reshape(self.scalarInput, shape=[-1, 84, 84, 3])
self.conv1 = slim.conv2d(inputs=self.imageIn, num_outputs=32, kernel_size=[8,8], stride=[4,4], padding='VALID', biases_initializer=None)
self.conv2 = slim.conv2d(inputs=self.conv1, num_outputs=64, kernel_size=[4,4], stride=[2,2], padding='VALID', biases_initializer=None)
self.conv3 = slim.conv2d(inputs=self.conv2, num_outputs=64, kernel_size=[3,3], stride=[1,1], padding='VALID', biases_initializer=None)
self.conv4 = slim.conv2d(inputs=self.conv3, num_outputs=h_size, kernel_size=[7,7], stride=[1,1], padding='VALID', biases_initializer=None)
# 取得最後一層卷積層的輸出進行拆分,分別計算價值與決策
self.streamAC, self.streamVC = tf.split(self.conv4, 2, 3)
self.streamA = slim.flatten(self.streamAC)
self.streamV = slim.flatten(self.streamVC)
xavier_init = tf.contrib.layers.xavier_initializer()
self.AW = tf.Variable(xavier_init([h_size//2, env.actions]))
self.VW = tf.Variable(xavier_init([h_size//2, 1]))
self.Advantage = tf.matmul(self.streamA, self.AW)
self.Value = tf.matmul(self.streamV, self.VW)
# 綜合得到最終的Q值
self.Qout = self.Value + tf.subtract(self.Advantage, tf.reduce_mean(self.Advantage, axis=1, keep_dims=True))
self.predict = tf.argmax(self.Qout, 1)
# 將目標Q值和預測Q值作差平方和作為損失值
self.targetQ = tf.placeholder(shape=[None], dtype=tf.float32)
self.actions = tf.placeholder(shape=[None], dtype=tf.int32)
self.actions_onehot = tf.one_hot(self.actions, env.actions, dtype=tf.float32)
self.Q = tf.reduce_sum(tf.multiply(self.Qout, self.actions_onehot), axis=1)
self.td_error = tf.square(self.targetQ - self.Q)
self.loss = tf.reduce_mean(self.td_error)
self.trainer = tf.train.AdamOptimizer(learning_rate=0.0001)
self.updateModel = self.trainer.minimize(self.loss)
# --------------------------------------------------
# 歷程重現
# 這個類賦予了網路儲存、重取樣來進行訓練的能力
class experience_buffer():
def __init__(self, buffer_size = 50000):
self.buffer = []
self.buffer_size = buffer_size
def add(self, experience):
if len(self.buffer) + len(experience) >= self.buffer_size:
self.buffer[0:(len(experience) + len(self.buffer)) - self.buffer_size] = []
self.buffer.extend(experience)
def sample(self, size):
return np.reshape(np.array(random.sample(self.buffer, size)), [size, 5])
# --------------------------------------------------
# 用於處理遊戲返回幀的函式
def processState(states):
return np.reshape(states, [21168])
# --------------------------------------------------
# 利用主網路引數更新目標網路
def updateTargetGraph(tfVars, tau):
total_vars = len(tfVars)
op_holder = []
for idx, var in enumerate(tfVars[0: total_vars//2]):
op_holder.append(tfVars[idx+total_vars//2].assign((var.value()*tau) + ((1-tau)*tfVars[idx+total_vars//2].value())))
return op_holder
def updateTarget(op_holder,sess):
for op in op_holder:
sess.run(op)
# --------------------------------------------------
batch_size = 32 #每次訓練使用多少訓練記錄
update_freq = 4 # 多久執行一次訓練操作
y = .99 # Q 值的折算因子
startE = 1 # 隨機行動的初始概率
endE = 0.1 # 隨機行動的最低概率
annealing_steps = 10000. # startE衰減至endE所需的步驟數
num_episodes = 10000 # 網路在遊戲環境下訓練的episodes數
pre_train_steps = 10000 # 訓練開始前允許的隨機行動次數
max_epLength = 50 # episode的最大允許值
load_model = False # 是否載入儲存的模型
path = "./dqn" # 我們模型的儲存路徑
h_size = 512 # 最後一個卷積層的尺寸
tau = 0.001 # 目標網路更新至主網路的速率
# --------------------------------------------------
tf.reset_default_graph()
mainQN = Qnetwork(h_size)
targetQN = Qnetwork(h_size)
init = tf.global_variables_initializer()
saver = tf.train.Saver()
trainables = tf.trainable_variables()
targetOps = updateTargetGraph(trainables,tau)
myBuffer = experience_buffer()
# 設定隨機決策的衰減速率
e = startE
stepDrop = (startE - endE)/annealing_steps
#建立每個episode中包含所有收益和操作記錄的列表
jList = []
rList = []
total_steps = 0
#建立用於儲存模型的目錄
if not os.path.exists(path):
os.makedirs(path)
with tf.Session() as sess:
sess.run(init)
if load_model == True:
print('Loading Model...')
ckpt = tf.train.get_checkpoint_state(path)
saver.restore(sess,ckpt.model_checkpoint_path)
for i in range(num_episodes):
episodeBuffer = experience_buffer()
# 初始化環境
s = env.reset()
s = processState(s)
d = False
rAll = 0
j = 0
# Q網路
while j < max_epLength: # 如果agent移動了超過200次還沒有接觸任何方塊,停止本次訓練
j+=1
# 根據Q網路和貪心法則選取行動(有隨機行動的可能性)
if np.random.rand(1) < e or total_steps < pre_train_steps:
a = np.random.randint(0,4)
else:
a = sess.run(mainQN.predict,feed_dict={mainQN.scalarInput:[s]})[0]
s1,r,d = env.step(a)
s1 = processState(s1)
total_steps += 1
episodeBuffer.add(np.reshape(np.array([s,a,r,s1,d]),[1,5])) # 儲存訓練記錄至緩衝器
if total_steps > pre_train_steps:
if e > endE:
e -= stepDrop
if total_steps % (update_freq) == 0:
trainBatch = myBuffer.sample(batch_size) # 從記錄中隨機獲取訓練批次資料
# 使用 Double-DQN 更新目標Q值
Q1 = sess.run(mainQN.predict,feed_dict={mainQN.scalarInput:np.vstack(trainBatch[:,3])})
Q2 = sess.run(targetQN.Qout,feed_dict={targetQN.scalarInput:np.vstack(trainBatch[:,3])})
end_multiplier = -(trainBatch[:,4] - 1)
doubleQ = Q2[range(batch_size),Q1]
targetQ = trainBatch[:,2] + (y*doubleQ * end_multiplier)
# 利用目標值更新網路
_ = sess.run(mainQN.updateModel,
feed_dict={mainQN.scalarInput:np.vstack(trainBatch[:,0]),mainQN.targetQ:targetQ, mainQN.actions:trainBatch[:,1]})
updateTarget(targetOps,sess) # 更新目標網路至主網路
rAll += r
s = s1
if d == True:
break
myBuffer.add(episodeBuffer.buffer)
jList.append(j)
rList.append(rAll)
# 週期性儲存訓練結果
if i % 1000 == 0:
saver.save(sess,path+'/model-'+str(i)+'.ckpt')
print("Saved Model")
if len(rList) % 10 == 0:
print(total_steps,np.mean(rList[-10:]), e)
saver.save(sess,path+'/model-'+str(i)+'.ckpt')
print("平均得分: " + str(sum(rList)/num_episodes))
# --------------------------------------------------
rMat = np.resize(np.array(rList),[len(rList)//100,100])
rMean = np.average(rMat,1)
plt.plot(rMean)
# --------------------------------------------------
結果輸出:
···
498000 24.0 0.09999999999985551
498500 22.0 0.09999999999985551
499000 22.8 0.09999999999985551
499500 21.5 0.09999999999985551
500000 22.1 0.09999999999985551
平均得分: 20.7166
···
遊戲環境輸出84x84x3的彩色圖片,使用和OpenAI gym相似的函式回撥,這使得程式碼可以輕鬆地移植至OpenAI的雅達利遊戲上。在計算資源和時間允許的情況下,我建議你在其他的雅達利遊戲上進行嘗試。超引數可能需要一些調整,但是一定是有可行解的,祝你好運!
系列文章(翻譯進度):
- (0) Q-Learning的查詢表實現和神經網路實現
- (1) 雙臂賭博機
- (1.5) — 上下文賭博機
- (2) —— 基於策略的Agents
- (3) —— 構建模擬環境來進行強化學習
- (4)—— 深度Q網路及擴充套件
- Part 5 — Visualizing an Agent’s Thoughts and Actions
- Part 6 — Partial Observability and Deep Recurrent Q-Networks
- Part 7 — Action-Selection Strategies for Exploration
- Part 8 — Asynchronous Actor-Critic Agents (A3C)