
Reinforcement Learning Notes

Preliminary

  • Robbins-Monro Algorithm

    Robbins-Monro Algorithm is designed to solve the following equation:

    \[\int c(s, \theta)\,\tau_{\theta}(s)\, ds=0 \]

    where \(\tau_\theta\) is a distribution of \(s\) parameterized by \(\theta\).

    We can use the following iterative rule to obtain \(\theta^*\):

    \[\theta_{k+1} = \theta_k-\eta_k c(s_k, \theta_k) \]

    The Q-learning algorithm uses Robbins-Monro to update the Q function, i.e.,

    \[Q(s,a) \leftarrow Q(s,a) - \eta_k\Big(Q(s,a) - \big(r+\lambda \max_{a'}Q(s_{t+1}, a')\big)\Big) \]

    The underlying fixed-point equation for \(Q(s, a)\) is

    \[Q(s,a)=r + \lambda \sum_{s_{t+1}} P(s_{t+1}\vert s, a)\max_{a'}Q(s_{t+1}, a') \]

    The sum over \(s_{t+1}\) can be viewed as an expectation. Moving \(Q(s,a)\) and \(r\) inside that expectation puts the update above exactly in the Robbins-Monro form, with the sampled TD error playing the role of \(c(s_k,\theta_k)\) (see the sketch after this list).
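As a side note (not part of the original derivation), here is a minimal numerical sketch of the Robbins-Monro iteration and of the tabular Q-learning update written in the same form; the toy distribution, table sizes, and step sizes below are arbitrary choices for illustration.

import numpy as np

rng = np.random.default_rng(0)

# Robbins-Monro: find theta* with E[c(s, theta)] = 0. Here c(s, theta) = theta - s,
# so theta* is the mean of the sampling distribution (3.0 in this toy example).
theta = 0.0
for k in range(1, 10001):
    s = rng.normal(loc=3.0, scale=1.0)   # draw one sample s_k ~ tau
    eta = 1.0 / k                         # decreasing step size eta_k
    theta = theta - eta * (theta - s)     # theta_{k+1} = theta_k - eta_k * c(s_k, theta_k)
print(theta)                              # close to 3.0

# The tabular Q-learning update has exactly the same shape:
# Q(s,a) <- Q(s,a) - eta * (Q(s,a) - (r + lambda * max_a' Q(s',a')))
Q = np.zeros((4, 2))                      # toy table: 4 states, 2 actions
def q_update(Q, s, a, r, s_next, eta=0.1, lam=0.99):
    td_error = Q[s, a] - (r + lam * Q[s_next].max())
    Q[s, a] -= eta * td_error
    return Q

Q = q_update(Q, s=0, a=1, r=1.0, s_next=2)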

Policy Gradient

Now we parameterize the policy distribution with a parameter \(\theta\), denoted by \(\tau_\theta\), and call the objective function \(J(\theta)\). We want to minimize \(J(\theta)\) (or maximize it, depending on the definition) and improve our policy by optimizing \(\theta\).

Let the value of the start state, \(\mu^{\tau_{\theta}}(s_0)=\sum\limits_a \tau(a\vert s_0)Q^{\tau_\theta}(s_0,a)\), be the objective function (note that we suppress the time index \(t\)) and take the gradient with respect to \(\theta\):

\[\begin{align*} \nabla_{\theta} \mu^{\tau_\theta}(s_0)&=\sum_a(\nabla\tau(a\vert s_0)Q(a,s_0)+\tau(a\vert s_0)\nabla Q(a, s_0))\\ &=\sum_a(\nabla\tau(a\vert s_0)Q(a,s_0)+\tau(a\vert s_0)\nabla \sum_{s',r'}P(s',r'\vert a,s_0)(r'+\mu ^{\tau_\theta}(s')))\\ \end{align*} \]

Lemma:

If the spectral radius of \(P\) is strictly less than \(1\) (so that \(I-P\) is invertible and the Neumann series converges), then for the equation \((I-P)x=y\) we have

\[\begin{align*} x&=(I-P)^{-1}y\\ &=\sum_{k=0}^{\infty} P^{k}y \end{align*} \]
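A quick numerical check of the lemma (a minimal sketch; the substochastic matrix P below is an arbitrary example, not taken from the text):

import numpy as np

P = np.array([[0.5, 0.3],
              [0.2, 0.6]])   # spectral radius 0.8 < 1, so the series converges
y = np.array([1.0, 2.0])

x_direct = np.linalg.solve(np.eye(2) - P, y)                            # x = (I - P)^{-1} y
x_series = sum(np.linalg.matrix_power(P, k) @ y for k in range(200))    # sum_k P^k y
print(np.allclose(x_direct, x_series))                                  # True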

Using this lemma, we have

\[\nabla\mu^{\tau_\theta}(s_0) = \sum_{x\in \mathcal S}\sum_{k=0}^\infty P(s=x, k,\tau_\theta)\sum_a \nabla\tau_{\theta}(a\vert x)Q^{\tau}(x,a) \]

Let \(\eta(x)\) denote \(\sum\limits_{k=0}^{\infty}P(s=x,k,\tau_\theta)\), where \(P(s=x,k,\tau_\theta)\) is the probability of reaching state \(x\) from \(s_0\) in \(k\) steps under \(\tau_\theta\). We can then rewrite the equation as

\[\begin{align*} \nabla\mu^{\tau_\theta}(s_0) &= \sum_{x\in \mathcal S}\eta(x)\sum_a \nabla\tau_{\theta}(a\vert x)Q^{\tau}(x,a)\\ &\propto\sum_{x\in \mathcal S}\frac{\eta(x)}{\sum_{x'}\eta(x')}\sum_a \nabla\tau_{\theta}(a\vert x)Q^{\tau}(x,a)\\ &=E^{\tau_\theta}\Big[\sum_a \nabla\tau_{\theta}(a\vert S_t)Q^{\tau}(S_t,a)\Big]\\ &=E^{\tau_\theta}[Q^{\tau}(S_t,A_t)\nabla\log\tau_{\theta}(A_t\vert S_t)] \end{align*} \]
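The last line is the usual score-function (REINFORCE) form of the gradient: sample \(A_t\sim\tau_\theta(\cdot\vert S_t)\) and weight \(\nabla\log\tau_\theta(A_t\vert S_t)\) by \(Q^{\tau}(S_t,A_t)\). A minimal TensorFlow sketch of this estimator for a single transition (the tiny softmax policy, the state, and the Q value below are made-up placeholders):

import tensorflow as tf

policy = tf.keras.Sequential([
    tf.keras.layers.Input((4,)),
    tf.keras.layers.Dense(2, activation='softmax')   # tau_theta(a | s)
])

state = tf.constant([[0.1, -0.2, 0.3, 0.0]])   # one state s
action = 1                                      # an action assumed sampled from tau_theta(. | s)
q_value = tf.constant(1.5)                      # Q^tau(s, a), e.g. from a critic or a return

with tf.GradientTape() as tape:
    probs = policy(state)
    log_prob = tf.math.log(probs[0, action])
    # Minimizing -Q(s,a) * log tau(a|s) ascends Q(s,a) * grad log tau(a|s).
    loss = -tf.stop_gradient(q_value) * log_prob
grads = tape.gradient(loss, policy.trainable_variables)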

Actor-Critic (AC)

We can use a critic network to estimate the state-value function (which provides the baseline for the advantage) and an actor network to learn \(\theta\). The code below implements this for CartPole-v1, using the classic Gym API in which env.step returns four values.
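Concretely, with discount \(\gamma\) (args.gamma in the code), the quantities computed below are

\[\begin{align*} y_t &= r_t + \gamma V(s_{t+1}) \quad(\text{TD target; } y_t=r_t \text{ at terminal states})\\ A_t &= y_t - V(s_t) \quad(\text{advantage})\\ \mathcal L_{\text{actor}} &= -\log\tau_\theta(a_t\vert s_t)\,A_t, \qquad \mathcal L_{\text{critic}} = \big(y_t - V(s_t)\big)^2 \end{align*} \]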

from collections import deque
import random

import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense
from tensorflow.keras.optimizers import Adam
physical_devices = tf.config.experimental.list_physical_devices('GPU')

assert len(physical_devices) > 0, "Not enough GPU hardware devices available"

tf.config.experimental.set_memory_growth(physical_devices[0], True)

import gym
import argparse

parser = argparse.ArgumentParser()
parser.add_argument('--gamma', type=float, default=0.99)
parser.add_argument('--update_interval', type=int, default=5)
parser.add_argument('--actor_lr', type=float, default=0.0005)
parser.add_argument('--critic_lr', type=float, default=0.001)

args = parser.parse_args()
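
# Actor: policy network tau_theta(a|s), trained with the policy-gradient loss derived above.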
class Actor:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.opt = Adam(args.actor_lr)
        self.model = self.create_model()

    def create_model(self):
        return tf.keras.Sequential([
            Input((self.state_dim, )),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(self.action_dim, activation='softmax')
        ])

    def compute_loss(self, actions, probs, advantages):
        # The model ends in a softmax, so it outputs probabilities, not logits.
        ce_loss = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)
        actions = tf.cast(actions, tf.int32)
        # -log tau(a|s) weighted by the (stop-gradient) advantage: the policy-gradient loss.
        policy_loss = ce_loss(actions, probs, sample_weight=tf.stop_gradient(advantages))
        return policy_loss

    def train(self, states, actions, advantages):
        with tf.GradientTape() as tape:
            probs = self.model(states, training=True)
            loss = self.compute_loss(actions, probs, advantages)
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss

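# Critic: state-value network V(s); its predictions are used for TD targets and as the advantage baseline.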
class Critic:
    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim
        self.opt = Adam(args.critic_lr)
        self.model = self.create_model()

    def create_model(self):
        return tf.keras.Sequential([
            Input((self.state_dim,)),
            Dense(32, activation='relu'),
            Dense(16, activation='relu'),
            Dense(16, activation='relu'),
            Dense(1, activation='linear')
        ])

    def compute_loss(self, v_pred, td_targets):
        mse = tf.keras.losses.MeanSquaredError()
        return mse(td_targets, v_pred)

    def train(self, states, td_targets):
        with tf.GradientTape() as tape:
            v_pred = self.model(states, training=True)
            assert v_pred.shape == td_targets.shape
            loss = self.compute_loss(v_pred, tf.stop_gradient(td_targets))
        grads = tape.gradient(loss, self.model.trainable_variables)
        self.opt.apply_gradients(zip(grads, self.model.trainable_variables))
        return loss

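# Agent: wires the actor and critic together and runs the on-policy training loop.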
class Agent:
    def __init__(self, env):
        self.env = env
        self.state_dim = env.observation_space.shape[0]
        self.action_dim = env.action_space.n
        self.actor = Actor(self.state_dim, self.action_dim)
        self.critic = Critic(self.state_dim, self.action_dim)

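    # TD(0) target: r + gamma * V(s'), or just r when the episode terminates.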
    def td_target(self, reward, next_state, done):
        if done:
            return reward
        v_value = self.critic.model.predict(
            np.reshape(next_state, [1, self.state_dim]))
        return np.reshape(reward + args.gamma * v_value[0], [1, 1])

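    # Advantage estimate: td_target - V(s), used to weight the policy gradient.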
    def advantage(self, td_targets, baselines):
        return td_targets - baselines

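    # Stack a list of [1, dim] arrays into a single [N, dim] batch.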
    def list_to_batch(self, batch_list):
        batch = batch_list[0]
        for elem in batch_list[1:]:
            batch = np.append(batch, elem, axis=0)
        return batch

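    # On-policy loop: roll out the current policy and, every update_interval steps
    # (or at the end of an episode), update the critic on TD targets and the actor on advantages.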
    def train(self, max_episodes=1000):
        for ep in range(max_episodes):
            state_batch = []
            action_batch = []
            td_target_batch = []
            advantage_batch = []
            episode_reward, done = 0, False

            state = self.env.reset()

            while not done:
                # self.env.render()
                probs = self.actor.model.predict(
                    np.reshape(state, [1, self.state_dim]))
                action = np.random.choice(self.action_dim, p=probs[0])  # sample an action from the current policy

                next_state, reward, done, _ = self.env.step(action)

                state = np.reshape(state, [1, self.state_dim])
                action = np.reshape(action, [1, 1])
                next_state = np.reshape(next_state, [1, self.state_dim])
                reward = np.reshape(reward, [1, 1])

                td_target = self.td_target(reward * 0.01, next_state, done)  # rewards scaled by 0.01 before the TD target
                advantage = self.advantage(
                    td_target, self.critic.model.predict(state))

                state_batch.append(state)
                action_batch.append(action)
                td_target_batch.append(td_target)
                advantage_batch.append(advantage)

                if len(state_batch) >= args.update_interval or done:
                    states = self.list_to_batch(state_batch)
                    actions = self.list_to_batch(action_batch)
                    td_targets = self.list_to_batch(td_target_batch)
                    advantages = self.list_to_batch(advantage_batch)

                    actor_loss = self.actor.train(states, actions, advantages)
                    critic_loss = self.critic.train(states, td_targets)

                    state_batch = []
                    action_batch = []
                    td_target_batch = []
                    advantage_batch = []

                episode_reward += reward[0][0]
                state = next_state[0]

            print('EP{} EpisodeReward={}'.format(ep, episode_reward))

def main():
    env_name = 'CartPole-v1'
    env = gym.make(env_name)
    agent = Agent(env)
    agent.train()


if __name__ == "__main__":
    main()