강화학습 (reinforcement learning) 정책 학습과 가치 학습 비교 - Deep Q Network를 이용한 Atari Space Invaders 와 CartPole

정책 학습과 가치 학습

 강화 학습(reinforcement learning)의 두가지 범주에 속하는 정책 학습과 가치 학습을 비교하기 위해 이전 포스트에서 정책 경사(policy gradient)를 이용한 Atari 게임 Space Invaders 학습을 알아본 적이 있다.


 본 포스트에서는 OpenAI gym의 CartPole과 Atari 게임 Space Invaders 환경에서 Deep Q-network을 이용한 Agent를 만들고 학습이 잘 이루어지는지 테스트하였다. 이전 정책 경사(policy gradient) 에이전트에서 Space Invaders학습이 잘 이루어지지 않아 학습 방법 비교를 위해 간단한 게임인 CartPole도 테스트에 사용하였다.


DQN(Deep Q-Network)을 이용한 Atari 게임 Space Invaders 와 CartPole 학습

openai gym Atari 게임 Space Invaders / CartPole 환경


 Space Invaders는 128바이트 형태의 상태 데이터를 반환하는 'SpaceInvaders-ram-v0'를 사용하였다.

import gym

env = gym.make("SpaceInvaders-ram-v0")


 CartPole은 'CartPole-v0'을 사용하였으며, 테스트 목표에 맞게 _max_episode_steps도 설정 했다.


import gym

env = gym.make("CartPole-v0')
env._max_episode_steps=MAX_EPISODE_STEP+1



Q-learning

 Q-learning은 정책을 직접 학습하는 대신 상태(state)와 행동(action)의 가치 (Q-value)를 학습한다. 가치 Q-value는 어떤 상태(state)에서 어떤 행동(action)을 행한 후 후속되는 기대되는 미래의 보상을 의미한다. 


 위 그림의 예를 들면, 로봇이 행동 a를 한 후 상태 s에서, 가장 많은 보상을 받기 위해선, 기대되는 Q-value가 가장 높은 행동을 해야 한다. 이 상태(state)별 각 행동(action)의 Q-value를 가진 테이블을 Q table이라고 한다. 

 제한된 환경이 아닌 일반적인 환경에서는 상태(state)도 무수히 많고, 행할 수 있는 행동(action)도 많기 때문에 Q-table의 크기도 커지고, Q-value를 만드는 것에 어려움이 있다. 이를 해결하고자 인공 신경망을 사용한다. 이것이 Deep Q Network이다. 

 Deep Q Network Agent의 학습은 상태(state)에서 각 행동(action)별 Q-value를 생성하는 과정이다. 

학습의 목적 함수는 아래와 같다.


Deep Q-Network 분리

학습의 목적함수를 보면 현재의 Q-value에서 미래의 Q-value를 차이를 기반으로 손실 함수를 정의하고 있다. 이렇게 되면 학습과정에 파라메터들이 업데이트 될 때마다 기준도 변하게 되어 파라메터들이 진동하고 손실(loss)값이 발산하게 되는 문제가 발생한다. 

이 문제를 해결하기 위해 Q-Network를 Predict Q-Network와 Target Q-Network로 분리하여, 학습 시 Predict Q-Network의 파라메터만 업데이트 한 후 일정 회수의 학습이 끝난 후 Target Q-Network를 업데이트 하는 방식을 사용해야 한다. 


Experience Replay 

DQN Agent 학습 시 최근의 경험만을 학습한다면, 이 Agent는 전체 환경의 대표(아래 그림의 A)하는 것이 아니라 일부 지역만을 대표(아래 그림의 B)하게 되는 문제가 발생할 수 있다. 

이 문제를 피하기 위해 학습과정에서 과거의 경험까지 함께 학습해야 한다. 


episode 경험 저장 class 소스 코드

experience replay, discounted reward, 데이터 정규화(normalize) 등이 구형되어 있다. 학습은 정해진 에피소드가 끝난 후 주기를 가지고 시행하며, 그 동안의 경험 데이터를 저장한다. 각 에피소드가 끝난 후 보상(reward)데이터는 discount와 normalize를 거치게 된다. 


class experience:
    def __init__(self, max_size=50000):
        self.statet0 = []
        self.action = []
        self.reward = []
        self.statet1 = []
        self.done= []
        self.max_size = max_size
    
    def reset(self):
        self.statet0 = []
        self.action = []
        self.reward = []
        self.statet1 = []
        self.done= []
        
    def append(self, statet0, action, reward, statet1, done):
        self.statet0.append(statet0)
        self.action.append(action)
        self.reward.append(reward)
        self.statet1.append(statet1)
        self.done.append(done)
        self.prune()
    
    def add(self, statet0, action, reward, statet1, done):
        self.statet0 += statet0
        self.action += action
        self.reward += reward
        self.statet1 += statet1
        self.done += done
        self.prune()
        
    def prune(self):
        while len(self.statet0) > self.max_size:
            self.statet0.pop(0)
            self.action.pop(0)
            self.reward.pop(0)
            self.statet1.pop(0)
            self.done.pop(0)
            
    def discount(self, data, gamma = 0.95):
        discount_data = [0 for _ in data]
        cumulative = 0
        for t in reversed(range(len(data))):
            cumulative = cumulative * gamma + data[t]
            discount_data[t] = cumulative
            
        discount_data -= np.mean(discount_data)
        if np.std(discount_data) == 0:
            return discount_data
        discount_data /= np.std(discount_data)
        
        return discount_data
    
    def normalize(self, data):
        normal_data = [x for x in data]
        normal_data -= np.mean(normal_data) 
        if np.std(normal_data) == 0:
            return normal_data
        normal_data /= np.std(normal_data)

        return normal_data

    
    def sample_batch(self, batch_size):
        
        statet0,action,reward,statet1,done=[],[],[],[],[]
        
        rands = np.arange(len(self.statet0))
        np.random.shuffle(rands)
        
        if batch_size > len(self.statet0):
            batch_size = len(self.statet0)
            
        rands = rands[:batch_size]
        for i in rands:
            statet0.append(self.statet0[i])
            action.append(self.action[i])
            reward.append(self.reward[i])
            statet1.append(self.statet1[i])
            done.append(self.done[i])
            
        return statet0,action,reward,statet1,done



Deep Q-Network Agent 소스 코드

인공 신경망은 keras를 사용해 구현했다. DQN 모델 간의 파라메터 업데이트가 편해 keras를 사용했다.  코드가 tensorflow api 만을 사용할 때 보다 훨씬 간결해진 느낌이다.


import numpy as np
import random
from collections import deque
from keras.layers import Input, Dense, Dropout, regularizers
from keras.models import Model
from keras.optimizers import Adam

class DQNAgent:
    
    def __init__(self,state_size, num_actions, learning_rate = 0.01, gamma=0.9, hidden_layer_units=10, replay_buffer_size = 50000):
        
        self.gamma = gamma
        self.num_actions = num_actions
        self.state_size = state_size
        self.replay_buffer_size = replay_buffer_size

        self.mainDQN = self.build_model(in_size=state_size,out_sizes=num_actions,layer_units=hidden_layer_units,lr=learning_rate)
        self.targetDQN = self.build_model(in_size=state_size,out_sizes=num_actions,layer_units=hidden_layer_units,lr=learning_rate)
        #self.mainDQN.summary()
        #self.targetDQN.summary()
        self.update_targetDQN()
        self.replay_buffer = experience()
        
        
    def build_model(self,in_size, out_sizes, depth=5, layer_units = 10, lr=0.01):
        in_layer = Input(shape=(in_size,),name='input_layer')
        prev = in_layer
        
        for fc in range(depth):
            prev = Dense(units=layer_units, activation='tanh', 
            kernel_regularizer=regularizers.l2(l=0.01), name='dense%d'%fc)(prev)
        
        out_layer = Dense(out_sizes,activation='linear', 
                          kernel_regularizer=regularizers.l2(l=0.01), name='out_layers')(prev)
        model = Model(inputs=[in_layer],outputs=[out_layer])
        optimizer = Adam(lr=lr)
        model.compile(loss='mean_squared_error', optimizer=optimizer,metrics=['accuracy'])
        return model
            
    def store_experience(self, state_t0, action, reward, state_t1, done):
        normal_reward = [0 for _ in reward]
        normal_reward[:] = self.replay_buffer.discount(reward)[:]
        self.replay_buffer.add(state_t0, action, normal_reward, state_t1, done)
        
    def update_targetDQN(self):
        self.targetDQN.set_weights(self.mainDQN.get_weights())
        
    def learn(self, batch_size = 10):
        # batch samples      
        statet0_batch,action_batch,reward_batch,statet1_batch,done_batch = self.replay_buffer.sample_batch(batch_size=batch_size)
        state = np.empty(0).reshape(0,self.state_size)
        targetQ = np.empty(0).reshape(0,self.num_actions)
        
        for state_t0, action, reward, state_t1, done in zip(statet0_batch,action_batch,reward_batch,statet1_batch,done_batch):
            Q = self.mainDQN.predict(self.state_reshape(state_t0,self.state_size))
            
            if done:
                Q[0,action]= reward
            else:
                Q[0,action]= reward + (self.gamma*np.max(self.targetDQN.predict(self.state_reshape(state_t1,self.state_size))))
                
            targetQ = np.vstack([targetQ,Q])
            state = np.vstack([state,state_t0])
        
        loss =self.mainDQN.train_on_batch(state,targetQ)
        
        return loss[0]
     
    def state_reshape(self, state, size):
        return np.reshape(state, [1,size]);

    def predict_action(self,env_action_sample, state, epsilon):
        if np.random.rand(1) < epsilon:
            return env_action_sample
        action = np.argmax(self.mainDQN.predict(self.state_reshape(state,self.state_size)))
        return action



DQN(Deep Q-Network) Agent를 사용한 CartPole-v0

CartPole 환경의 학습 코드는 아래와 같다. 


import gym
import time
import numpy as np
from RyanDQNAgent import DQNAgent,experience
import matplotlib.pyplot as plt

MAX_EPISODES = 5000
MAX_EPISODES_STEP = 10000
TRAINING_PERIOD = 10
TRAINING_START_EP = 50

def training(env,agent):
    
    if env == None or agent == None:
        return
    
    training_done_count = 0
    loss_log = []
    steps_log = []

    for episode in range(MAX_EPISODES):
        
        exp = experience()
        state_t0 = env.reset()
        epsilon = 1.0/((episode/10)+1)
        done = False
        step = 0
         
        #
        while done == False:
            #env.render()
            action = agent.predict_action(env_action_sample=env.action_space.sample(),state=state_t0,epsilon= epsilon)
                
            state_t1, reward, done, info = env.step(action)            
            if done:
                reward -= 1
                
            exp.append(statet0=state_t0,action=action,reward=reward,statet1=state_t1,done=done)
            state_t0 = state_t1
            step += 1
        
        #
        print('episode ',episode,' - steps:', step)
        agent.store_experience(state_t0=exp.statet0,action=exp.action,reward=exp.reward,state_t1=exp.statet1,done=exp.done)
        steps_log.append(step)
        
        if episode >= TRAINING_START_EP and episode % TRAINING_PERIOD == 0 and step < MAX_EPISODES_STEP:
            loss = 0
            for _ in range(50):
                loss += agent.learn(batch_size=10)               
            loss /= 50
            loss_log.append(loss)
            print('loss ..... ',loss)
            
            agent.update_targetDQN()
        
        if step >= MAX_EPISODES_STEP:
            training_done_count += 1
        else:
            training_done_count = 0
            
        if training_done_count >= 10: 
            print('training_done')
            break
        
    #
    print('loss')
    plt.bar(range(len(loss_log)), loss_log, color="red")
    plt.show()
    print('steps')
    plt.bar(range(len(steps_log)), steps_log, color="green")
    plt.show()


if __name__ == "__main__":
env = gym.make('CartPole-v0')
    env._max_episode_steps = MAX_EPISODES_STEP+2

    agent= DQNAgent(state_size = env.observation_space.shape[0],num_actions=env.action_space.n,hidden_layer_units=10)
    
    training(env,agent)
    
    env.close()


위 코드를 사용하여 학습한 결과는 아래와 같다.



이전 정책 경사(policy gradient)를 이용한 학습보다 빠르게 학습되는 것을 볼 수 있다. 


Discounted Reward와 Normalize 

Agent 학습 시 discounted reward와 normalize 여부에 따라 학습 속도와 능력에 많은 차이를 가져온다. 아래처럼 보상 감쇠와 정규화를 사용하지 않으면 사용한 경우에 비해 학습 속도가 느리거나 아예 학습을 실패하는 사례가 많아진다.




DQN(Deep Q-Network) Agent를 사용한 Atari Space Invaders

Space Invaders 학습 코드는 아래와 같다. 

import gym
import time
import numpy as np
from RyanDQNAgent import experience,DQNAgent
import matplotlib.pyplot as plt

MAX_EPISODES = 5000
MAX_EPISODES_STEP = 10000
TRAINING_START_EP = 10
TRAINING_PERIOD = 5

def training(env,agent):
    
    if env == None or agent == None:
        return
    
    training_done_count = 0
    rewards_log = []
    steps_log = []
    loss_log = []

    for episode in range(1,MAX_EPISODES+1,1):
        
        exp = experience()
        episode_reward = 0
        ale_lives = 3
        state_t0 = env.reset()
        step = 0
        epsilon = 1.0/((episode/10.0)+1.0)

        while step<=MAX_EPISODES_STEP:
            #env.render()
            step += 1
            action = agent.predict_action(env_action_sample=env.action_space.sample(),state=state_t0,epsilon= epsilon)
            state_t1, reward, done, info = env.step(action)
            episode_reward += reward
            
            if info['ale.lives'] != ale_lives or done:
                ale_lives = info['ale.lives']
                reward -= 1
                exp.append(statet0=state_t0,action=action,reward=reward,statet1=state_t1,done=done)
                agent.store_experience(state_t0=exp.statet0,action=exp.action,reward=exp.reward,state_t1=exp.statet1,done=exp.done)
                exp.reset()
            else:
                exp.append(statet0=state_t0,action=action,reward=reward,statet1=state_t1,done=done)
                        
            state_t0 = state_t1 # update new state
            
            if done:
                break
        
        #
        print('episode ',episode,' - steps: ', step, ' episode_reward: ',episode_reward)
        steps_log.append(step)
        rewards_log.append(episode_reward)
        
        if episode >= TRAINING_START_EP and episode % TRAINING_PERIOD == 0 and step < MAX_EPISODES_STEP:
            loss = 0
            for _ in range(50):
                loss += agent.learn(batch_size=10)               
            loss /= 50
            loss_log.append(loss)
            print('loss ..... ',loss)
            
            agent.update_targetDQN()
        
        if step >= MAX_EPISODES_STEP:
            training_done_count += 1
        else:
            training_done_count = 0
            
        if training_done_count >= TRAINING_PERIOD: 
            print('training_done')
            break    
    #
    print('loss')
    plt.bar(range(len(loss_log)), loss_log, color="red")
    plt.show()
    print('steps')
    plt.bar(range(len(steps_log)), steps_log, color="green")
    plt.show()
    print('reward')
    plt.bar(range(len(rewards_log)), rewards_log, color="blue")
    plt.show()

if __name__ == "__main__":
    
    env = gym.make("SpaceInvaders-ram-v0")

    agent= DQNAgent(state_size = env.observation_space.shape[0],num_actions=env.action_space.n,hidden_layer_units=256)
   
    training(env,agent)
    
    env.close()


위 코드를 이용한 학습 결과는 아래와 같다. 


이전 정책 경사(policy gradient)와 마찬가지로 DQN에서도 학습이 이루어지지 않았다.

Cartpole에서 성공한 것과는 대조적이다. 학습 성공에 다른 테크닉이 필요해 보인다. 실패의 원인이라고 생각되는 것이 있지만, 확실하진 않다. 그래도 policy gradient 보다 학습 성능이 좋아 보인다. 추후 별도의 포스트에서 학습에 성공해 성공 사례를 올리도록 하겠다. 


[관련 포스트]

댓글

이 블로그의 인기 게시물

간단한 cfar 알고리즘에 대해

windows에서 간단하게 크롬캐스트(Chromecast)를 통해 윈도우 화면 미러링 방법

base64 인코딩 디코딩 예제 c 소스

아두이노(arduino) 심박센서 (heart rate sensor) 심박수 측정 example code

python asyncio를 이용한 async socket server client example code