강화학습 (reinforcement learning) 정책 학습과 가치 학습 비교 - 정책 경사 (policy gradient)를 이용한 Atari Space Invaders

정책 학습과 가치 학습

강화학습 (reinforcement learning)에서는 Agent를 정책 학습과 가치 학습 두가지 범주의 방식으로 학습시킨다고 한다. 정책 학습은 보상(reward)를 최대화하는 정책(policy)를 직접 학습하고, 가치 학습은 상태(status)와 행동(action)의 모든 가치(value)를 학습한다.

본 포스트에서는 gym을 이용한 Atari 게임 Space Invaders 환경에서 정책 경사를 이용해 정책을 학습하는 Agent를 만들고 학습이 잘 이루어지는지 테스트하였다.


정책 경사 (policy gradient)를 이용한 Atari 게임 Space Invaders 학습

openai gym Atari 게임 Space Invaders


openai gum 설치 후 여러 게임 환경을 불러올 수 있다. 그중 본 포스트에서는 Space Invaders 사용했다.


import gym

env = gym.make("SpaceInvaders-v0")
or 
env = gym.make("SpaceInvaders-ram-v0")

위의 코드를 사용해 Space Invaders 게임 환경을 불러올 수 있다. 'SpaceInvaders-v0'는 (210,160,3) 형태의 이미지 데이터를, 'SpaceInvaders-ram-v0'는 128바이트 형태의 데이터를 observation으로 반환한다. 본 포스트에서는 'SpaceInaders-ram-v0'를 사용하였다.


ipdb> observation
array([[[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0]],

       [[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0]],

       [[ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0],
        ...,
        [ 0,  0,  0],
        [ 0,  0,  0],
        [ 0,  0,  0]],

       ...,

       [[80, 89, 22],
        [80, 89, 22],
        [80, 89, 22],
        ...,
        [80, 89, 22],
        [80, 89, 22],
        [80, 89, 22]],

       [[80, 89, 22],
        [80, 89, 22],
        [80, 89, 22],
        ...,
        [80, 89, 22],
        [80, 89, 22],
        [80, 89, 22]],

       [[80, 89, 22],
        [80, 89, 22],
        [80, 89, 22],
        ...,
        [80, 89, 22],
        [80, 89, 22],
        [80, 89, 22]]], dtype=uint8)

ipdb> observation.shape
(210, 160, 3)


ipdb> observation
array([  0,   7,   0,  68, 241, 162,  34, 183,  68,  13, 124, 255, 255,
        50, 255, 255,   0,  36,  63,  63,  63,  63,  63,  63,  82,   0,
        23,  43,  35, 117, 180,   0,  36,  63,  63,  63,  63,  63,  63,
       110,   0,  23,   1,  60, 126, 126, 126, 126, 255, 255, 255, 195,
        60, 126, 126, 126, 126, 255, 255, 255, 195,  60, 126, 126, 126,
       126, 255, 255, 255, 195,   0,   0,  48,   3, 129,   0,   0,   0,
         0,   0,   0, 246, 246,  63,  63, 246, 246,  63,  63,   0,  21,
        24,   0,  52,  82, 196, 246,  20,   7,   0, 226,   0,   0,   0,
         0,   0,  21,  63,   0, 128, 171,   0, 255,   0, 189,   0,   0,
         0,   0,   0,  99, 255,   0,   0, 235, 254, 192, 242], dtype=uint8)

ipdb> observation.shape
(128,)


episode 상태 데이터

학습은 게임 에피소드가 끝난 후 진행하도록 했다. 그간의 에피소드의 데이터 observation, action, reward들은 별도의 class를 만들어 저장했다.

class EpisodeData:
    
    def __init__(self):
        self.observations = []
        self.actions = []
        self.rewards = []
        
    def store(self, observation, action, reward):
        self.observations.append(observation)
        self.actions.append(action)
        self.rewards.append(reward)
        
    def add(self, observations, actions, rewards):
        self.observations += observations
        self.observations += actions
        self.observations += self.get_discount_rewards(rewards)
    
    def get_discount_rewards(self, rewards, gamma=0.95):
        discounted_returns = [0 for _ in rewards]
        cumulative = 0
        for t in reversed(range(len(self.rewards))):
            cumulative = cumulative * gamma + rewards[t]
            discounted_returns[t] = cumulative
        
        if np.std(discounted_returns) == 0:
            return discounted_returns
        
        discounted_returns -= np.mean(discounted_returns)
        discounted_returns /= np.std(discounted_returns)
            
        return discounted_returns
    
    def reset(self):
        self.observations = []
        self.actions = []
        self.rewards = []


위 class에서는 보상(reward) 데이터를 시간에 따라 감소하는 discounted reward 함수도 추가되어 있다.


감쇠 된 미래 수익/보상 discounted future reward

어떠한 한 행동(Action)이 보상(reward)을 결정 짓는 것은 아니다. 보상을 얻기 전 일련의 행동들 모두가 이 보상들과 관련되어 있다. 행동의 즉각적인 보상 뿐만 아니라 행동이 미래에 가져올 보상도 더해줄 필요가 있다. 이를 미래 수익이라고 한다. 



하지만, 위 그림의 가운데처럼 단순히 미래 수익을 더해주기만 한다면, 너무 먼 미래의 수익에만 의존하거나, 어느 정도의 시간에 미래 수익/보상을 바라볼 지에 대한 처리 시간에 대한 문제가 발생한다. 이에 위 그림의 오른쪽처럼 먼 미래의 수익은 감쇠하면 너무 먼 미래 수익을 바라보는 문제와 시간처리에 대한 문제를 해결할 수 있다. 이를 감쇠 된 미래 수익(discounted future reward)이라고 한다. 감쇠 파라메터 gamma는 일반적으로 0.99~0.97 사이로 설정한다고 한다. 



정책 경사 (policy gradient) 에이전트

정책 경사(policy gradient)역시 일반적인 딥러닝에서의 경사 하강법을 그대로 사용할 수 있다고 한다. 


위 공식을 이용해 손실 함수를 구성하였다. 


class PolicyGradientAgent:      
    def build_model(self):
        with tf.name_scope('ryan_pgagent'):
            self.observation_input = tf.placeholder(shape=[None, self.observation_size],
                                              dtype=tf.float32, name='observation')
            self.action_input = tf.placeholder(shape=[None],
                                         dtype=tf.int32, 
                                         name='action')
            self.discounted_rewards_input = tf.placeholder(shape=[None],
                                                     dtype=tf.float32,
                                                     name='discounted_rewards')

            self.h0 = tf.layers.dense(inputs=self.observation_input, 
                                      units=self.hidden_layer_unit,
                                      activation=tf.nn.relu,
                                      name='h0')
            self.h1 = tf.layers.dense(inputs=self.h0,
                                      units=self.hidden_layer_unit, 
                                      activation=tf.nn.relu,
                                      name='h1')
            self.h2 = tf.layers.dense(inputs=self.h1,
                                      units=self.hidden_layer_unit, 
                                      activation=tf.nn.relu,
                                      name='h2')
            self.h3 = tf.layers.dense(inputs=self.h2,
                                      units=self.hidden_layer_unit, 
                                      activation=tf.nn.relu,
                                      name='h3')
            self.out = tf.layers.dense(inputs=self.h3,
                                       units=self.num_actions,
                                       activation=None,
                                       name='out')
            # Softmax outputs
            self.out_softmax = tf.nn.softmax(self.out, name='out_softmax')
        
        neg_log_prob = tf.nn.sparse_softmax_cross_entropy_with_logits(logits=self.out, labels=self.action_input)
        self.loss = tf.reduce_mean(neg_log_prob * self.discounted_rewards_input)
        self.train_op = tf.train.AdamOptimizer(learning_rate=self.learning_rate).minimize(self.loss)    
    

위 코드는 딥러닝 모델을 만드는 함수다. 입력은 observation, action, discounted reward 3가지며, 손실 함수는 self.loss와 같이 구성할 수 있다. self.out은 학습 과정 중 신경망의 출력으로 나오는 action이고, self.action은 환경에서 행해진 action이다. 손실 최소화는 AdamOptimizer를 사용하였다.


class PolicyGradientAgent:

    def epsilon_greedy_action(self, action, epsilon):
        if random.random() < epsilon:
            return np.argmax(np.random.random(action.shape))
        return np.random.choice(range(len(action.ravel())), p=action.ravel())
        
    def get_action(self,observation,epsilon):
        observation = observation[np.newaxis, :]
        predicted_action = self.sess.run(self.out_softmax, feed_dict = {self.observation_input: observation})
        return self.epsilon_greedy_action(predicted_action, epsilon) 

에이전트가 지역 최소화에 빠지는 것을 방지하고자 get_action함수에서 엡실론 그리디를 활용했다.

policy gradient agent의 나머지 소스 코드는 아래와 같다. 


import tensorflow as tf
import numpy as np
import random

class PolicyGradientAgent:
    
    def __init__(self, observation_size, num_actions,learning_rate=0.01, hidden_layer_unit=10):
        tf.reset_default_graph()
        self.observation_size = observation_size
        self.num_actions = num_actions
        self.learning_rate = learning_rate
        self.hidden_layer_unit = hidden_layer_unit
        self.data = EpisodeData()    
        self.build_model()
        self.sess = tf.Session()   
        self.sess.run(tf.global_variables_initializer())
            
    def store(self, observation, action, reward):
        self.data.store(observation=observation,action=action,reward=reward)
    
    def learn(self,gamma=0.95):
        # discount and normalize reward
        discounted_rewards = self.data.get_discount_rewards(self.data.rewards,gamma=gamma)
        
        # train
        _,loss = self.sess.run([self.train_op,self.loss], feed_dict={
             self.observation_input: np.array(self.data.observations),
             self.action_input: np.array(self.data.actions),
             self.discounted_rewards_input: discounted_rewards})
    
        # reset the data
        self.data.reset()
        return loss



정책 경사(policy gradient) 에이전트 학습 함수

학습 함수는 아래와 같다.

2000회의 에피소드를 실행하며 학습한다. 에피소드 데이터는 별도로 모아두었다. 각 에피소드가 끝날 때마다 agent에 넘겨준다. 매 5회의 에피소드가 끝날때 그때까지 모인 observation, action, reward데이터를 사용하여 agent를 학습시킨다.


import gym
import time
import numpy as np
from RyanPolicyGradientAgent import PolicyGradientAgent,EpisodeData
import matplotlib.pyplot as plt

MAX_EPISODES = 2000
MAX_EPISODES_STEP = 5000

def training(env,agent):
    
    if env == None or agent == None:
        return
    
    training_done_count = 0
    rewards_log = []
    steps_log = []
    loss_log = []

    for episode in range(MAX_EPISODES):
        
        print('episode ',episode)
        step = 0
        reward_sum = 0
        ale_lives = 3
        epsilon = 1.0/((episode/1.0)+1.0)
        observation = env.reset()

        while (ale_lives > 0 and step <= MAX_EPISODES_STEP):
            step += 1
            #env.render()
            action = agent.get_action(observation,epsilon)
            observation_, reward, done, info = env.step(action)
      
            reward_sum += reward
            
            if info['ale.lives'] != ale_lives:
                ale_lives = info['ale.lives']
            
            agent.store(observation=observation, action=action, reward=reward)        
            observation = observation_
                            
            if done or step >= MAX_EPISODES_STEP:
                loss = agent.learn()   
                loss_log.append(loss)
                rewards_log.append(reward_sum)
                steps_log.append(step)
                print('steps ', step,' reward sum: ',reward_sum, ' loss:',loss)
                break;
            
           
        if ale_lives > 0:
            training_done_count += 1
        else:
            training_done_count = 0
        
        if training_done_count > 10:
            print('training done')
            break
    
    #
    print('loss')
    plt.bar(range(len(loss_log)), loss_log, color="red")
    plt.show()
    print('steps')
    plt.bar(range(len(steps_log)), steps_log, color="green")
    plt.show()
    print('reward')
    plt.bar(range(len(rewards_log)), rewards_log, color="blue")
    plt.show()



위 학습함수는 아래 코드처럼 env와 agent를 만들어 구동한다. 


env = gym.make("SpaceInvaders-ram-v0")
observation = env.reset()
agent= PolicyGradientAgent(observation_size=observation.shape[0],
                    num_actions=env.action_space.n,learning_rate=0.01,hidden_layer_unit=10)
training(env,agent)
env.close()


정책 경사(policy gradient) 에이전트 학습 결과



구현된 코드를 사용하여 학습을 진행하면 위 그림과 같은 결과를 얻을 수 있다. 학습이 이루어지지 않는 것처럼 보인다. 학습에 사용되는 episode 수를 늘린다고 해도 학습될 것 같지 않다. 

단순히 결과를 보면 정책 경사(policy gradient) 학습법으로는 Space Invaders 게임의 Agent를 만들 수 없어 보인다. 

물론 구현상의 문제일 수도 있다. Q-learning을 먼저 구현 테스트 해보고 policy gradient agent 구현 내용을 다시 살펴보자.



정책 경사(policy gradient) 에이전트를 이용한 CartPole-v0

Space Invaders 게임 학습 결과가 기대에 미치지 않아 CartPole 환경에서 학습 테스트를 진행했다. 약간의 버그를 발견해 수정했다. (포스트의 코드도 수정했다.)

CartPole 환경의 학습 코드는 아래와 같다. 


import gym
import time
import numpy as np
from RyanPolicyGradientAgent import PolicyGradientAgent
import matplotlib.pyplot as plt

MAX_EPISODES = 5000
MAX_EPISODES_STEP = 5000

def training(env,agent):
    
    if env == None or agent == None:
        return
    
    training_done_count = 0
    rewards_log = []
    steps_log = []
    loss_log = []
    
    for episode in range(MAX_EPISODES):
        print('episode ',episode)
        observation = env.reset()
        episode_reward = 0
        epsilon = 1.0/((episode/2.0)+1.0)
        
        for step in range(1,MAX_EPISODES_STEP+1,1):
            action = agent.get_action(observation,epsilon)
            observation_, reward, done, info = env.step(action)
    
            agent.store(observation=observation, action=action, reward=reward)
            
            episode_reward += reward
            observation = observation_
                
            if done or step>=MAX_EPISODES_STEP:
                
                if step>=MAX_EPISODES_STEP:
                    training_done_count += 1    
                else:
                    training_done_count = 0
                    
                loss = agent.learn()
                rewards_log.append(episode_reward)
                steps_log.append(step)
                loss_log.append(loss)
                
                print(' - steps:', step,' reward sum: ',episode_reward,' loss: ',loss)
                break
            
        if training_done_count > 10:
            print('training done')
            break
    
    #
    print('loss')
    plt.bar(range(len(loss_log)), loss_log, color="red")
    plt.show()
    print('steps')
    plt.bar(range(len(steps_log)), steps_log, color="green")
    plt.show()
    print('reward')
    plt.bar(range(len(rewards_log)), rewards_log, color="blue")
    plt.show()



env = gym.make('CartPole-v0')
env._max_episode_steps = MAX_EPISODES_STEP+2

agent= PolicyGradientAgent(observation_size=env.observation_space.shape[0],num_actions=env.action_space.n,hidden_layer_unit=10)

training(env,agent)

env.close()


학습 결과는 아래와 같다. CartPole환경에서는 학습이 잘되는 것을 볼 수 있다. 




[관련 포스트]


댓글

이 블로그의 인기 게시물

간단한 cfar 알고리즘에 대해

windows에서 간단하게 크롬캐스트(Chromecast)를 통해 윈도우 화면 미러링 방법

base64 인코딩 디코딩 예제 c 소스

아두이노(arduino) 심박센서 (heart rate sensor) 심박수 측정 example code

python asyncio를 이용한 async socket server client example code