Implementing deep Q-learning on the gym library's Classic control environments
This article is reposted from: https://blog.csdn.net/winycg/article/details/79468320
target="_blank">https://gym.openai.com/envs/ OpenAI gym官網
https://github.com/openai/gym#installation gym安裝教程
http://blog.csdn.net/cs123951/article/details/77854453 MountainCar原理參考
OpenAI gym provides ready-made environments for reinforcement learning, so when implementing an RL algorithm we do not have to build the simulation environment ourselves.
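To recall the basic gym API that the DQN code below relies on, here is a minimal random-agent sketch using the classic gym interface of that time (reset() returns an observation; step() returns observation, reward, done, info):
import gym
env = gym.make('CartPole-v0')
observation = env.reset()  # initial observation for the episode
done = False
while not done:
    action = env.action_space.sample()  # pick a random action
    observation, reward, done, info = env.step(action)  # advance the simulation by one step
env.close()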
CartPole example
A pole is attached to a cart that moves along a frictionless track. The system is controlled by applying a force of +1 or -1 to the cart. The pole starts upright, and the goal is to prevent it from falling over. A reward of +1 is given for every step the pole remains upright. The episode ends when the pole is more than 15° from vertical, or when the cart moves more than 2.4 units away from the center.
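A quick way to check the environment's interface is shown below (a minimal sketch; x_threshold and theta_threshold_radians are attributes of the unwrapped CartPole environment and are reused for reward shaping in the training loop later):
import gym
env = gym.make('CartPole-v0').unwrapped
print(env.observation_space)        # Box(4,): x, x_dot, theta, theta_dot
print(env.action_space)             # Discrete(2): push the cart to the left or to the right
print(env.x_threshold)              # cart position limit before the episode terminates
print(env.theta_threshold_radians)  # pole angle limit (in radians) before the episode terminates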
DQN code for testing on CartPole:
import numpy as np
import random
import tensorflow as tf
import gym
max_episode = 100
env = gym.make('CartPole-v0')
env = env.unwrapped
class DeepQNetwork(object):
def __init__(self,
n_actions,
n_features,
learning_rate=0.01,
                 reward_decay=0.9,         # discount factor gamma
                 epsilon_greedy=0.9,       # upper bound for epsilon in epsilon-greedy exploration
                 epsilon_increment=0.001,
                 replace_target_iter=300,  # number of learning steps between target-network updates
                 buffer_size=500,          # capacity of the experience replay buffer
batch_size=32,
):
self.n_actions = n_actions
self.n_features = n_features
self.lr = learning_rate
self.gamma = reward_decay
        self.epsilon_max = epsilon_greedy
        self.replace_target_iter = replace_target_iter
        self.buffer_size = buffer_size
        self.buffer_counter = 0  # counts how many transitions have been stored so far
        self.batch_size = batch_size
        # start exploring with epsilon = 0 and anneal it up to epsilon_max;
        # if no increment is given, use the fixed value from the start
        self.epsilon = 0 if epsilon_increment is not None else epsilon_greedy
        self.epsilon_increment = epsilon_increment
        self.learn_step_counter = 0  # counts how many learning steps have been performed
        self.buffer = np.zeros((self.buffer_size, n_features * 2 + 2))  # replay buffer; each row is [s, a, r, s_]
self.build_net()
        # op that copies all parameters from the eval network into the target network
target_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='target_net')
eval_params = tf.get_collection(tf.GraphKeys.GLOBAL_VARIABLES, scope='eval_net')
with tf.variable_scope('soft_replacement'):
self.target_replace_op = [tf.assign(t, e) for t, e in zip(target_params, eval_params)]
self.sess = tf.Session()
tf.summary.FileWriter('logs/', self.sess.graph)
self.sess.run(tf.global_variables_initializer())
def build_net(self):
self.s = tf.placeholder(tf.float32, [None, self.n_features])
self.s_ = tf.placeholder(tf.float32, [None, self.n_features])
self.r = tf.placeholder(tf.float32, [None, ])
self.a = tf.placeholder(tf.int32, [None, ])
w_initializer = tf.random_normal_initializer(0., 0.3)
b_initializer = tf.constant_initializer(0.1)
        # q_eval network: takes the state features as input and outputs one Q-value per action
with tf.variable_scope('eval_net'):
eval_layer = tf.layers.dense(self.s, 20, tf.nn.relu, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='eval_layer')
self.q_eval = tf.layers.dense(eval_layer, self.n_actions, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='output_layer1')
with tf.variable_scope('target_net'):
target_layer = tf.layers.dense(self.s_, 20, tf.nn.relu, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='target_layer')
self.q_next = tf.layers.dense(target_layer, self.n_actions, kernel_initializer=w_initializer,
bias_initializer=b_initializer, name='output_layer2')
with tf.variable_scope('q_target'):
            # compute the target value; stop_gradient keeps gradients from flowing through it, i.e. it is treated as a constant
self.q_target = tf.stop_gradient(self.r + self.gamma * tf.reduce_max(self.q_next, axis=1))
with tf.variable_scope('q_eval'):
            # gather the Q-value of the action that was actually taken in each transition
a_indices = tf.stack([tf.range(tf.shape(self.a)[0]), self.a], axis=1)
self.q_eval_a = tf.gather_nd(params=self.q_eval, indices=a_indices)
with tf.variable_scope('loss'):
self.loss = tf.reduce_mean(tf.squared_difference(self.q_target, self.q_eval_a))
with tf.variable_scope('train'):
self.train_op = tf.train.RMSPropOptimizer(self.lr).minimize(self.loss)
    # store a transition in the replay buffer
def store_transition(self, s, a, r, s_):
transition = np.hstack((s, a, r, s_))
index = self.buffer_counter % self.buffer_size
self.buffer[index, :] = transition
self.buffer_counter += 1
def choose_action_by_epsilon_greedy(self, status):
status = status[np.newaxis, :]
if random.random() < self.epsilon:
actions_value = self.sess.run(self.q_eval, feed_dict={self.s: status})
action = np.argmax(actions_value)
else:
action = np.random.randint(0, self.n_actions)
return action
def learn(self):
        # every self.replace_target_iter learning steps, copy the current parameters into the target network
if self.learn_step_counter % self.replace_target_iter == 0:
self.sess.run(self.target_replace_op)
        # sample a mini-batch from the experience buffer
sample_index = np.random.choice(min(self.buffer_counter, self.buffer_size), size=self.batch_size)
batch_buffer = self.buffer[sample_index, :]
_, cost = self.sess.run([self.train_op, self.loss], feed_dict={
self.s: batch_buffer[:, :self.n_features],
self.a: batch_buffer[:, self.n_features],
self.r: batch_buffer[:, self.n_features + 1],
self.s_: batch_buffer[:, -self.n_features:]
})
        if self.epsilon_increment is not None:
            self.epsilon = min(self.epsilon_max, self.epsilon + self.epsilon_increment)
self.learn_step_counter += 1
return cost
RL = DeepQNetwork(n_actions=env.action_space.n,
n_features=env.observation_space.shape[0])
total_step = 0
for episode in range(max_episode):
observation = env.reset()
episode_reward = 0
while True:
        env.render()  # render the environment
action = RL.choose_action_by_epsilon_greedy(observation)
observation_, reward, done, info = env.step(action)
        # x is the cart's horizontal displacement, theta is the pole's angle from vertical
        x, x_dot, theta, theta_dot = observation_
        # reward1 is larger the closer the cart stays to the center
        reward1 = (env.x_threshold - abs(x)) / env.x_threshold - 0.8
        # reward2 is larger the closer the pole stays to vertical
        reward2 = (env.theta_threshold_radians - abs(theta)) / env.theta_threshold_radians - 0.5
reward = reward1 + reward2
RL.store_transition(observation, action, reward, observation_)
        if total_step > 100:  # start learning only after some transitions have been collected
cost = RL.learn()
print('cost: %.3f' % cost)
episode_reward += reward
observation = observation_
if done:
print('episode:', episode,
'episode_reward %.2f' % episode_reward,
'epsilon %.2f' % RL.epsilon)
break
total_step += 1
MountainCar example
The car moves along a one-dimensional track between two hills, and the goal is to drive up the hill on the right. However, the car's engine is not strong enough to climb the hill in a single pass; the only way is to drive back and forth to build up momentum.
There are 3 actions: push forward, do nothing, and push backward. The state has 2 components: position and velocity. The position is about -0.5 at the bottom of the valley, -1.2 at the top of the left hill, 0 at the point of equal height on the right, and 0.5 at the small yellow flag. The built-in reward is always -1, so the fewer steps it takes to reach the goal, the higher the total reward.
Custom reward: the higher the car gets, the larger the reward, because gaining height on the left side lets the car build up more momentum. So the reward can be set to reward = abs(position + 0.5).
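The repost does not show the setup for this example; presumably something like the following precedes the training loop (a minimal sketch, assuming the MountainCar-v0 environment and matplotlib for the cost plot at the end):
import gym
import matplotlib.pyplot as plt
max_episode = 100
env = gym.make('MountainCar-v0')
env = env.unwrapped
cost = []  # collects the loss returned by RL.learn() so it can be plotted after training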
RL = DeepQNetwork(n_actions=env.action_space.n,
n_features=env.observation_space.shape[0])
total_step = 0
for episode in range(max_episode):
observation = env.reset()
episode_reward = 0
while True:
        env.render()  # render the environment
action = RL.choose_action_by_epsilon_greedy(observation)
observation_, reward, done, info = env.step(action)
        # the observation is [position, velocity]
        position, velocity = observation_
        # custom reward: higher positions (on either side of the valley) give a larger reward
        reward = abs(position + 0.5)
RL.store_transition(observation, action, reward, observation_)
if total_step > 100:
cost_ = RL.learn()
cost.append(cost_)
episode_reward += reward
observation = observation_
if done:
print('episode:', episode,
'episode_reward %.2f' % episode_reward,
'epsilon %.2f' % RL.epsilon)
break
total_step += 1
plt.plot(np.arange(len(cost)), cost)
plt.show()