Policy Gradient
We implement the REINFORCE algorithm. This algorithm requires a full trajectory to be completed before training can begin. The following equation for the discounted return shows why:

$$G_t = r_t + \gamma \, G_{t+1}$$

Notice that we always need the discounted return from the next time step. We therefore have to use a Monte Carlo approach and simulate an entire battle (since we are not doing temporal difference learning). While battling, we collect the reward at each time step; afterwards, we calculate the discounted returns by starting at the last time step and working backwards.
import numpy as np

def get_discounted_return(rewards, gamma):
    discounted_return = rewards.copy()
    # The return at the final time step is just the final reward
    discounted_return[-1] = rewards[-1]
    running_discounted_reward = rewards[-1]
    # Move backwards through time, applying G_t = r_t + gamma * G_{t+1}
    for t in reversed(range(len(discounted_return) - 1)):
        discounted_return[t] = rewards[t] + gamma * running_discounted_reward
        running_discounted_reward = discounted_return[t]
    return np.array(discounted_return)
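For example, with a made-up reward sequence and a discount factor of 0.9 (both purely illustrative), the function reproduces the returns you would compute by hand:

rewards = [0.0, 0.0, 1.0]
returns = get_discounted_return(rewards, gamma = 0.9)
print(returns)  # [0.81 0.9  1.  ]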
As with all RL models that operate on a continuous state space, we use a function approximator to map states to the policy. For AI Arena’s initial tests, we constrain researchers to feedforward, fully-connected neural networks.
Supported Frameworks
We have implemented the neural network portion of the starter model in PyTorch, TensorFlow 1 (graph mode via tf.compat.v1), and TensorFlow 2 (Keras with eager execution). The corresponding code snippets are shown below:
PyTorch:

import torch
import torch.nn as nn
import torch.nn.functional as F

class PolicyGradient(torch.nn.Module):
    def __init__(
        self,
        n_features,
        n_actions,
        neurons,
        activation_function,
        learning_rate
    ):
        super(PolicyGradient, self).__init__()
        self.n_features = n_features
        self.n_actions = n_actions
        self.neurons = neurons
        self.activation_function = activation_function
        self.learning_rate = learning_rate
        self.output_activation = F.softmax
        self.n_layers = len(self.neurons) + 1
        # Build the fully-connected layers: input -> hidden layers -> action outputs
        self.layers = torch.nn.ModuleList()
        for l in range(self.n_layers):
            if l == 0:
                in_dim = n_features
                out_dim = neurons[l]
            elif l == self.n_layers - 1:
                in_dim = neurons[l - 1]
                out_dim = n_actions
            else:
                in_dim = neurons[l - 1]
                out_dim = neurons[l]
            self.layers.append(nn.Linear(in_dim, out_dim))
        self.optimizer = torch.optim.Adam(
            self.parameters(),
            lr = self.learning_rate
        )

    def policy(self, state):
        # Forward pass: hidden layers use the chosen activation,
        # the output layer uses a softmax over actions
        current_layer = state
        for l in range(self.n_layers):
            if l < self.n_layers - 1:
                current_layer = self.activation_function(
                    self.layers[l](current_layer)
                )
            else:
                current_layer = self.output_activation(
                    self.layers[l](current_layer),
                    dim = 1
                )
        return current_layer

    def get_loss(self, states, actions, rewards):
        states = torch.tensor(states).float()
        actions = torch.tensor(actions).long()
        rewards = torch.tensor(rewards).float()
        policy = self.policy(states)
        # Probability of the action actually taken at each time step
        actions_one_hot = F.one_hot(actions, num_classes = self.n_actions)
        action_probabilities = torch.sum(policy * actions_one_hot, dim = 1)
        # REINFORCE loss: negative log-probability weighted by the discounted return
        return -torch.mean(rewards * torch.log(action_probabilities + 0.001))

    def train(self, states, actions, rewards):
        self.optimizer.zero_grad()
        loss = self.get_loss(states, actions, rewards)
        loss.backward()
        self.optimizer.step()
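To show how the pieces fit together, here is a minimal REINFORCE training-loop sketch for the PyTorch version. The environment object env, its reset()/step() interface, and all dimensions and hyperparameter values are assumptions for illustration only; they are not part of the starter model.

import numpy as np
import torch
import torch.nn.functional as F

# Hypothetical dimensions and hyperparameters, for illustration only
model = PolicyGradient(
    n_features = 10,
    n_actions = 4,
    neurons = [64, 64],
    activation_function = F.relu,
    learning_rate = 1e-3
)
gamma = 0.99

for episode in range(1000):
    states, actions, rewards = [], [], []
    state, done = env.reset(), False  # env is a hypothetical battle environment
    while not done:
        # Sample an action from the current policy
        probabilities = model.policy(
            torch.tensor([state]).float()
        ).detach().numpy()[0]
        probabilities = probabilities / probabilities.sum()  # guard against float32 rounding
        action = np.random.choice(model.n_actions, p = probabilities)
        next_state, reward, done = env.step(action)
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        state = next_state
    # Weight each log-probability by its discounted return and update the policy
    model.train(states, actions, get_discounted_return(rewards, gamma))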
TensorFlow 1 (graph mode):

import tensorflow as tf

tf.compat.v1.disable_eager_execution()

class PolicyGradient():
    def __init__(
        self,
        n_features,
        n_actions,
        neurons,
        activation_function,
        learning_rate
    ):
        self.n_features = n_features
        self.n_actions = n_actions
        self.neurons = neurons
        self.activation_function = activation_function
        self.learning_rate = learning_rate
        self.output_activation = tf.nn.softmax
        self.n_layers = len(self.neurons) + 1
        self.layers = {}
        self.parameters = {}
        with tf.compat.v1.variable_scope("Policy", reuse = tf.compat.v1.AUTO_REUSE):
            # Placeholders for the trajectory data fed in at training time
            self.states = tf.compat.v1.placeholder(
                shape = [None, self.n_features],
                dtype = tf.float32,
                name = "States"
            )
            self.actions = tf.compat.v1.placeholder(
                shape = None,
                dtype = tf.int32,
                name = "Actions"
            )
            self.rewards = tf.compat.v1.placeholder(
                shape = None,
                dtype = tf.float32,
                name = "Rewards"
            )
            # Create the layers
            for l in range(self.n_layers):
                if l == 0:
                    scaling_factor = tf.sqrt(2 / (self.n_features + self.neurons[l]))
                    self.make_layer(
                        self.states,
                        self.n_features,
                        self.neurons[l],
                        scaling_factor,
                        l
                    )
                elif l == self.n_layers - 1:
                    input_dim = self.neurons[l - 1]
                    scaling_factor = tf.sqrt(2 / (input_dim + self.n_actions))
                    self.make_layer(
                        self.layers["Layer" + str(l - 1)],
                        input_dim,
                        self.n_actions,
                        scaling_factor,
                        l,
                        "Policy"
                    )
                else:
                    input_dim = self.neurons[l - 1]
                    scaling_factor = tf.sqrt(2 / (input_dim + self.neurons[l]))
                    self.make_layer(
                        self.layers["Layer" + str(l - 1)],
                        input_dim,
                        self.neurons[l],
                        scaling_factor,
                        l
                    )
            self.policy = self.layers["Policy"]
            # Define the loss function
            action_probabilities = tf.reduce_sum(
                self.policy * tf.one_hot(
                    indices = self.actions,
                    depth = self.n_actions),
                axis = 1
            )
            self.loss = -tf.reduce_mean(self.rewards * tf.math.log(action_probabilities + 0.001))
            # Define the optimizer
            optimizer = tf.compat.v1.train.AdamOptimizer(self.learning_rate)
            self.train_op = optimizer.minimize(self.loss)
        self.sess = tf.compat.v1.Session()
        self.sess.run(tf.compat.v1.global_variables_initializer())

    def make_layer(
        self,
        previous_layer,
        input_dim,
        output_dim,
        scaling_factor,
        layer_number,
        custom_name = ""
    ):
        # Xavier-style weight initialisation with a small positive bias
        name_append = str(layer_number) + custom_name
        self.parameters["Weights" + name_append] = tf.Variable(
            tf.random.normal(
                [input_dim, output_dim],
                stddev = scaling_factor
            ),
            name = "Weights" + name_append
        )
        self.parameters["Bias" + name_append] = tf.Variable(
            tf.zeros([1, output_dim]) + 0.1,
            name = "Bias" + name_append
        )
        if custom_name == "":
            layer_name = "Layer" + str(layer_number)
        else:
            layer_name = custom_name
        unactivated_layer = tf.matmul(
            previous_layer,
            self.parameters["Weights" + name_append]
        ) + self.parameters["Bias" + name_append]
        # The output layer uses a softmax; hidden layers use the chosen activation
        if layer_number == self.n_layers - 1:
            activation_function = self.output_activation
        else:
            activation_function = self.activation_function
        if custom_name == "Policy":
            self.layers["Raw Policy"] = unactivated_layer
        self.layers[layer_name] = activation_function(
            unactivated_layer,
            name = layer_name
        )

    def train(self, states, actions, rewards):
        feed_dict = {
            self.states: states,
            self.actions: actions,
            self.rewards: rewards
        }
        self.sess.run(self.train_op, feed_dict)
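In the graph-mode version, action selection is done by running the policy tensor in the session. A minimal sketch, assuming model is an instance of the class above and state is a single feature vector (both names are placeholders for illustration):

import numpy as np

# Evaluate the softmax policy for one state and sample an action from it
probabilities = model.sess.run(
    model.policy,
    feed_dict = {model.states: [state]}
)[0]
probabilities = probabilities / probabilities.sum()  # guard against float32 rounding
action = np.random.choice(model.n_actions, p = probabilities)

Training then works the same way as in the PyTorch version: pass the collected states, actions, and discounted returns to train(), which feeds them into the placeholders and runs one Adam update.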
TensorFlow 2 (Keras, eager execution):

import tensorflow as tf

class PolicyGradient():
    def __init__(
        self,
        n_features,
        n_actions,
        neurons,
        activation_function,
        learning_rate
    ):
        self.n_features = n_features
        self.n_actions = n_actions
        self.neurons = neurons
        self.activation_function = activation_function
        self.learning_rate = learning_rate
        self.output_activation = tf.nn.softmax
        self.n_layers = len(self.neurons) + 1
        # Build the layers: hidden layers use the chosen activation,
        # the output layer uses a softmax over actions
        self.layers = [
            tf.keras.layers.Dense(
                neurons[0],
                activation = self.activation_function,
                input_shape = (n_features,)
            )
        ]
        for l in range(1, self.n_layers):
            if l < self.n_layers - 1:
                out_dim = neurons[l]
                activation = self.activation_function
            else:
                out_dim = n_actions
                activation = self.output_activation
            self.layers.append(
                tf.keras.layers.Dense(out_dim, activation = activation)
            )
        self.policy = tf.keras.models.Sequential(self.layers)
        self.train_op = tf.keras.optimizers.Adam(
            learning_rate = self.learning_rate
        )

    def get_loss(self, states, actions, rewards):
        policy = self.policy(states)
        # Probability of the action actually taken at each time step
        actions_one_hot = tf.one_hot(indices = actions, depth = self.n_actions)
        action_probabilities = tf.reduce_sum(policy * actions_one_hot, axis = 1)
        # REINFORCE loss: negative log-probability weighted by the discounted return
        return -tf.reduce_mean(rewards * tf.math.log(action_probabilities + 0.001))

    def get_gradients(self, states, actions, rewards):
        states = tf.convert_to_tensor(states, dtype = tf.float32)
        actions = tf.convert_to_tensor(actions, dtype = tf.int32)
        rewards = tf.convert_to_tensor(rewards, dtype = tf.float32)
        with tf.GradientTape() as tape:
            for layer in self.layers:
                tape.watch(layer.variables)
            loss = self.get_loss(states, actions, rewards)
        parameters = [param for layer in self.layers for param in layer.variables]
        gradients = tape.gradient(loss, parameters)
        return gradients

    def train(self, states, actions, rewards):
        gradients = self.get_gradients(states, actions, rewards)
        parameters = [param for layer in self.layers for param in layer.variables]
        self.train_op.apply_gradients(zip(gradients, parameters))
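The eager version is used the same way; only the action-selection call changes, since the policy is a Keras model that can be called directly. A minimal sketch with the same placeholder names as above:

import numpy as np
import tensorflow as tf

# Evaluate the softmax policy for one state and sample an action from it
probabilities = model.policy(
    tf.convert_to_tensor([state], dtype = tf.float32)
).numpy()[0]
probabilities = probabilities / probabilities.sum()  # guard against float32 rounding
action = np.random.choice(model.n_actions, p = probabilities)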