Python深度強化學習之DQN算法原理詳解

DQN算法是DeepMind團隊提出的一種深度強化學習算法,在許多電動遊戲中達到人類玩傢甚至超越人類玩傢的水準,本文就帶領大傢瞭解一下這個算法,論文的鏈接見下方。

論文:Human-level control through deep reinforcement learning | Nature

代碼:後續會將代碼上傳到Github上…

1 DQN算法簡介

Q-learning算法采用一個Q-tabel來記錄每個狀態下的動作值,當狀態空間或動作空間較大時,需要的存儲空間也會較大。如果狀態空間或動作空間連續,則該算法無法使用。因此,Q-learning算法隻能用於解決離散低維狀態空間和動作空間類問題。DQN算法的核心就是用一個人工神經網絡

來代替Q-tabel,即動作價值函數。網絡的輸入為狀態信息,輸出為每個動作的價值,因此DQN算法可以用來解決連續狀態空間和離散動作空間問題,無法解決連續動作空間類問題。針對連續動作空間類問題,後面blog會慢慢介紹。

2 DQN算法原理

DQN算法是一種off-policy算法,當同時出現異策、自益和函數近似時,無法保證收斂性,容易出現訓練不穩定或訓練困難等問題。針對這些問題,研究人員主要從以下兩個方面進行瞭改進。

(1)經驗回放:將經驗(當前狀態st、動作at、即時獎勵rt+1、下個狀態st+1、回合狀態done)存放在經驗池中,並按照一定的規則采樣。

(2)目標網絡:修改網絡的更新方式,例如不把剛學習到的網絡權重馬上用於後續的自益過程。

2.1 經驗回放

經驗回放就是一種讓經驗概率分佈變得穩定的技術,可以提高訓練的穩定性。經驗回放主要有“存儲”和“回放”兩大關鍵步驟:

存儲:將經驗以(st,at,rt+1,st+1,done)形式存儲在經驗池中。

回放:按照某種規則從經驗池中采樣一條或多條經驗數據。

從存儲的角度來看,經驗回放可以分為集中式回放和分佈式回放:

  • 集中式回放:智能體在一個環境中運行,把經驗統一存儲在經驗池中。
  • 分佈式回放:多個智能體同時在多個環境中運行,並將經驗統一存儲在經驗池中。由於多個智能體同時生成經驗,所以能夠使用更多資源的同時更快地收集經驗。

從采樣的角度來看,經驗回放可以分為均勻回放和優先回放:

  • 均勻回放:等概率從經驗池中采樣經驗。
  • 優先回放:為經驗池中每條經驗指定一個優先級,在采樣經驗時更傾向於選擇優先級更高的經驗。一般的做法是,如果某條經驗(例如經驗)的優先級為,那麼選取該經驗的概率為:

優先回放可以具體參照這篇論文:優先經驗回放

經驗回放的優點:

1.在訓練Q網絡時,可以打破數據之間的相關性,使得數據滿足獨立同分佈,從而減小參數更新的方差,提高收斂速度。

2.能夠重復使用經驗,數據利用率高,對於數據獲取困難的情況尤其有用。

經驗回放的缺點:

無法應用於回合更新和多步學習算法。但是將經驗回放應用於Q學習,就規避瞭這個缺點。

代碼中采用集中式均勻回放,具體如下:

import numpy as np
 
 
class ReplayBuffer:
    def __init__(self, state_dim, action_dim, max_size, batch_size):
        self.mem_size = max_size
        self.batch_size = batch_size
        self.mem_cnt = 0
 
        self.state_memory = np.zeros((self.mem_size, state_dim))
        self.action_memory = np.zeros((self.mem_size, ))
        self.reward_memory = np.zeros((self.mem_size, ))
        self.next_state_memory = np.zeros((self.mem_size, state_dim))
        self.terminal_memory = np.zeros((self.mem_size, ), dtype=np.bool)
 
    def store_transition(self, state, action, reward, state_, done):
        mem_idx = self.mem_cnt % self.mem_size
 
        self.state_memory[mem_idx] = state
        self.action_memory[mem_idx] = action
        self.reward_memory[mem_idx] = reward
        self.next_state_memory[mem_idx] = state_
        self.terminal_memory[mem_idx] = done
 
        self.mem_cnt += 1
 
    def sample_buffer(self):
        mem_len = min(self.mem_size, self.mem_cnt)
 
        batch = np.random.choice(mem_len, self.batch_size, replace=True)
 
        states = self.state_memory[batch]
        actions = self.action_memory[batch]
        rewards = self.reward_memory[batch]
        states_ = self.next_state_memory[batch]
        terminals = self.terminal_memory[batch]
 
        return states, actions, rewards, states_, terminals
 
    def ready(self):
        return self.mem_cnt > self.batch_size

2.2 目標網絡

對於基於自益的Q學習,動作價值估計和權重有關。當權重變化時,動作價值的估計也會發生變化。在學習的過程中,動作價值試圖追逐一個變化的回報,容易出現不穩定的情況。

目標網絡是在原有的神經網絡之外重新搭建一個結構完全相同的網絡。原先的網絡稱為評估網絡,新構建的網絡稱為目標網絡。在學習過程中,使用目標網絡進行自益得到回報的評估值,作為學習目標。在更新過程中,隻更新評估網絡的權重,而不更新目標網絡的權重。這樣,更新權重時針對的目標不會在每次迭代都發生變化,是一個固定的目標。在更新一定次數後,再將評估網絡的權重復制給目標網絡,進而進行下一批更新,這樣目標網絡也能得到更新。由於在目標網絡沒有變化的一段時間內回報的估計是相對固定的,因此目標網絡的引入增加瞭學習的穩定性。

目標網絡的更新方式:

上述在一段時間內固定目標網絡,一定次數後將評估網絡權重復制給目標網絡的更新方式為硬更新(hard update),即

其中表示目標網絡權重,表示評估網絡權重。

另外一種常用的更新方式為軟更新(soft update),即引入一個學習率,將舊的目標網絡參數和新的評估網絡參數直接做加權平均後的值賦值給目標網絡

學習率

3 DQN算法偽代碼

DQN算法的實現代碼為:

import torch as T
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import numpy as np
from buffer import ReplayBuffer
 
device = T.device("cuda:0" if T.cuda.is_available() else "cpu")
 
 
class DeepQNetwork(nn.Module):
    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim):
        super(DeepQNetwork, self).__init__()
 
        self.fc1 = nn.Linear(state_dim, fc1_dim)
        self.fc2 = nn.Linear(fc1_dim, fc2_dim)
        self.q = nn.Linear(fc2_dim, action_dim)
 
        self.optimizer = optim.Adam(self.parameters(), lr=alpha)
        self.to(device)
 
    def forward(self, state):
        x = T.relu(self.fc1(state))
        x = T.relu(self.fc2(x))
 
        q = self.q(x)
 
        return q
 
    def save_checkpoint(self, checkpoint_file):
        T.save(self.state_dict(), checkpoint_file, _use_new_zipfile_serialization=False)
 
    def load_checkpoint(self, checkpoint_file):
        self.load_state_dict(T.load(checkpoint_file))
 
 
class DQN:
    def __init__(self, alpha, state_dim, action_dim, fc1_dim, fc2_dim, ckpt_dir,
                 gamma=0.99, tau=0.005, epsilon=1.0, eps_end=0.01, eps_dec=5e-4,
                 max_size=1000000, batch_size=256):
        self.tau = tau
        self.gamma = gamma
        self.epsilon = epsilon
        self.eps_min = eps_end
        self.eps_dec = eps_dec
        self.batch_size = batch_size
        self.action_space = [i for i in range(action_dim)]
        self.checkpoint_dir = ckpt_dir
 
        self.q_eval = DeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                   fc1_dim=fc1_dim, fc2_dim=fc2_dim)
        self.q_target = DeepQNetwork(alpha=alpha, state_dim=state_dim, action_dim=action_dim,
                                     fc1_dim=fc1_dim, fc2_dim=fc2_dim)
 
        self.memory = ReplayBuffer(state_dim=state_dim, action_dim=action_dim,
                                   max_size=max_size, batch_size=batch_size)
 
        self.update_network_parameters(tau=1.0)
 
    def update_network_parameters(self, tau=None):
        if tau is None:
            tau = self.tau
 
        for q_target_params, q_eval_params in zip(self.q_target.parameters(), self.q_eval.parameters()):
            q_target_params.data.copy_(tau * q_eval_params + (1 - tau) * q_target_params)
 
    def remember(self, state, action, reward, state_, done):
        self.memory.store_transition(state, action, reward, state_, done)
 
    def choose_action(self, observation, isTrain=True):
        state = T.tensor([observation], dtype=T.float).to(device)
        actions = self.q_eval.forward(state)
        action = T.argmax(actions).item()
 
        if (np.random.random() < self.epsilon) and isTrain:
            action = np.random.choice(self.action_space)
 
        return action
 
    def learn(self):
        if not self.memory.ready():
            return
 
        states, actions, rewards, next_states, terminals = self.memory.sample_buffer()
        batch_idx = np.arange(self.batch_size)
 
        states_tensor = T.tensor(states, dtype=T.float).to(device)
        rewards_tensor = T.tensor(rewards, dtype=T.float).to(device)
        next_states_tensor = T.tensor(next_states, dtype=T.float).to(device)
        terminals_tensor = T.tensor(terminals).to(device)
 
        with T.no_grad():
            q_ = self.q_target.forward(next_states_tensor)
            q_[terminals_tensor] = 0.0
            target = rewards_tensor + self.gamma * T.max(q_, dim=-1)[0]
        q = self.q_eval.forward(states_tensor)[batch_idx, actions]
 
        loss = F.mse_loss(q, target.detach())
        self.q_eval.optimizer.zero_grad()
        loss.backward()
        self.q_eval.optimizer.step()
 
        self.update_network_parameters()
        self.epsilon = self.epsilon - self.eps_dec if self.epsilon > self.eps_min else self.eps_min
 
    def save_models(self, episode):
        self.q_eval.save_checkpoint(self.checkpoint_dir + 'Q_eval/DQN_q_eval_{}.pth'.format(episode))
        print('Saving Q_eval network successfully!')
        self.q_target.save_checkpoint(self.checkpoint_dir + 'Q_target/DQN_Q_target_{}.pth'.format(episode))
        print('Saving Q_target network successfully!')
 
    def load_models(self, episode):
        self.q_eval.load_checkpoint(self.checkpoint_dir + 'Q_eval/DQN_q_eval_{}.pth'.format(episode))
        print('Loading Q_eval network successfully!')
        self.q_target.load_checkpoint(self.checkpoint_dir + 'Q_target/DQN_Q_target_{}.pth'.format(episode))
        print('Loading Q_target network successfully!')

算法仿真環境是在gym庫中的LunarLander-v2環境,因此需要先配置好gym庫。進入Aanconda中對應的Python環境中,執行下面的指令

pip install gym

但是,這樣安裝的gym庫隻包括少量的內置環境,如算法環境、簡單文字遊戲環境和經典控制環境,無法使用LunarLander-v2。

訓練腳本如下:

import gym
import numpy as np
import argparse
from DQN import DQN
from utils import plot_learning_curve, create_directory
 
parser = argparse.ArgumentParser()
parser.add_argument('--max_episodes', type=int, default=500)
parser.add_argument('--ckpt_dir', type=str, default='./checkpoints/DQN/')
parser.add_argument('--reward_path', type=str, default='./output_images/avg_reward.png')
parser.add_argument('--epsilon_path', type=str, default='./output_images/epsilon.png')
 
args = parser.parse_args()
 
 
def main():
    env = gym.make('LunarLander-v2')
    agent = DQN(alpha=0.0003, state_dim=env.observation_space.shape[0], action_dim=env.action_space.n,
                fc1_dim=256, fc2_dim=256, ckpt_dir=args.ckpt_dir, gamma=0.99, tau=0.005, epsilon=1.0,
                eps_end=0.05, eps_dec=5e-4, max_size=1000000, batch_size=256)
    create_directory(args.ckpt_dir, sub_dirs=['Q_eval', 'Q_target'])
    total_rewards, avg_rewards, eps_history = [], [], []
 
    for episode in range(args.max_episodes):
        total_reward = 0
        done = False
        observation = env.reset()
        while not done:
            action = agent.choose_action(observation, isTrain=True)
            observation_, reward, done, info = env.step(action)
            agent.remember(observation, action, reward, observation_, done)
            agent.learn()
            total_reward += reward
            observation = observation_
 
        total_rewards.append(total_reward)
        avg_reward = np.mean(total_rewards[-100:])
        avg_rewards.append(avg_reward)
        eps_history.append(agent.epsilon)
        print('EP:{} reward:{} avg_reward:{} epsilon:{}'.
              format(episode + 1, total_reward, avg_reward, agent.epsilon))
 
        if (episode + 1) % 50 == 0:
            agent.save_models(episode + 1)
 
    episodes = [i for i in range(args.max_episodes)]
    plot_learning_curve(episodes, avg_rewards, 'Reward', 'reward', args.reward_path)
    plot_learning_curve(episodes, eps_history, 'Epsilon', 'epsilon', args.epsilon_path)
 
 
if __name__ == '__main__':
    main()

訓練時還會用到畫圖函數和創建文件夾函數,我將他們另外放在一個utils.py腳本中,具體代碼如下:

import os
import matplotlib.pyplot as plt
 
 
def plot_learning_curve(episodes, records, title, ylabel, figure_file):
    plt.figure()
    plt.plot(episodes, records, linestyle='-', color='r')
    plt.title(title)
    plt.xlabel('episode')
    plt.ylabel(ylabel)
 
    plt.show()
    plt.savefig(figure_file)
 
 
def create_directory(path: str, sub_dirs: list):
    for sub_dir in sub_dirs:
        if os.path.exists(path + sub_dir):
            print(path + sub_dir + ' is already exist!')
        else:
            os.makedirs(path + sub_dir, exist_ok=True)
            print(path + sub_dir + ' create successfully!')

仿真結果如下圖所示:

通過平均獎勵曲線可以看出,大概迭代到400步左右時算法趨於收斂。 

到此這篇關於Python深度強化學習之DQN算法原理詳解的文章就介紹到這瞭,更多相關Python DQN算法內容請搜索WalkonNet以前的文章或繼續瀏覽下面的相關文章希望大傢以後多多支持WalkonNet!

推薦閱讀: