Reinforcement Learning: From Q-Learning to Deep RL
Reinforcement Learning (RL) is a branch of machine learning in which an agent learns to make decisions by interacting with an environment. From game-playing AI to autonomous robots, it has produced some of the most striking breakthroughs in artificial intelligence.
What is Reinforcement Learning?
Reinforcement Learning is a learning paradigm where an agent learns optimal behavior through trial and error by receiving rewards or penalties for actions taken in an environment. The goal is to maximize cumulative rewards over time.
Key Components
- Agent: The learner or decision maker
- Environment: The world in which the agent operates
- State: Current situation of the environment
- Action: What the agent can do
- Reward: Feedback from the environment
- Policy: Strategy for selecting actions
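These pieces interact in a loop: the agent observes a state, its policy picks an action, and the environment responds with a reward and the next state. A minimal sketch of that loop, with a made-up environment and a random policy purely for illustration:

```python
import random

def random_policy(state, actions):
    return random.choice(actions)            # policy: state -> action

def toy_environment(state, action):
    next_state = state + action              # illustrative dynamics
    reward = 1.0 if next_state == 3 else 0.0
    done = next_state >= 3
    return next_state, reward, done

state, total_reward = 0, 0.0
for step in range(100):                       # cap steps to keep the demo finite
    action = random_policy(state, actions=[0, 1])   # agent chooses an action
    state, reward, done = toy_environment(state, action)
    total_reward += reward                    # cumulative reward the agent maximizes
    if done:
        break
```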
Basic Concepts
Markov Decision Process (MDP)
An MDP is a mathematical framework for modeling decision-making in environments where outcomes are partially random and partially under the control of a decision maker.
```python
import numpy as np

class MDP:
    def __init__(self, states, actions, transitions, rewards, gamma=0.9):
        self.states = states
        self.actions = actions
        self.transitions = transitions  # P(s'|s,a)
        self.rewards = rewards          # R(s,a,s')
        self.gamma = gamma              # Discount factor

    def get_next_state(self, state, action):
        """Sample a next state from the transition probabilities."""
        probs = self.transitions[state][action]
        return np.random.choice(len(self.states), p=probs)

    def get_reward(self, state, action, next_state):
        """Return the reward for a transition."""
        return self.rewards[state][action][next_state]
```
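As a quick sanity check, here is one way the class might be used; the two-state, two-action problem below is invented purely for illustration.

```python
# A made-up two-state, two-action MDP (all numbers are illustrative).
states = [0, 1]
actions = [0, 1]
transitions = {
    0: {0: [0.9, 0.1], 1: [0.2, 0.8]},
    1: {0: [0.8, 0.2], 1: [0.1, 0.9]},
}
rewards = {
    s: {a: {s2: (1.0 if s2 == 1 else 0.0) for s2 in states} for a in actions}
    for s in states
}

mdp = MDP(states, actions, transitions, rewards, gamma=0.9)
s_next = mdp.get_next_state(0, 1)
print(s_next, mdp.get_reward(0, 1, s_next))
```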
Value Functions
The state-value function V(s) is the expected discounted return obtained from state s when acting optimally; value iteration computes it by repeatedly applying the Bellman optimality update, and a greedy policy can then be read off from the result.
```python
def value_iteration(mdp, epsilon=0.01, max_iterations=1000):
    """Value iteration algorithm."""
    V = {state: 0 for state in mdp.states}
    for i in range(max_iterations):
        V_new = {}
        delta = 0
        for state in mdp.states:
            v = V[state]
            # Evaluate every action from this state
            action_values = []
            for action in mdp.actions:
                value = 0
                for next_state in mdp.states:
                    prob = mdp.transitions[state][action][next_state]
                    reward = mdp.rewards[state][action][next_state]
                    value += prob * (reward + mdp.gamma * V[next_state])
                action_values.append(value)
            V_new[state] = max(action_values)
            delta = max(delta, abs(v - V_new[state]))
        V = V_new
        if delta < epsilon:
            break
    return V

def extract_policy(mdp, V):
    """Extract a greedy (optimal) policy from the value function."""
    policy = {}
    for state in mdp.states:
        action_values = []
        for action in mdp.actions:
            value = 0
            for next_state in mdp.states:
                prob = mdp.transitions[state][action][next_state]
                reward = mdp.rewards[state][action][next_state]
                value += prob * (reward + mdp.gamma * V[next_state])
            action_values.append(value)
        # Map back to the action itself rather than its index
        policy[state] = mdp.actions[int(np.argmax(action_values))]
    return policy
```
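Run on the toy MDP defined earlier, the two functions fit together like this (again, purely illustrative):

```python
# Assumes the toy `mdp` constructed in the MDP example above.
V = value_iteration(mdp, epsilon=0.001)
policy = extract_policy(mdp, V)
print(V)       # optimal state values, e.g. {0: ..., 1: ...}
print(policy)  # greedy action for each state
```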
Q-Learning
Q-Learning is a model-free reinforcement learning algorithm: it learns the quality (expected return) of taking each action in each state directly from experience, without needing a model of the environment's dynamics.
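The core update nudges the current estimate toward a bootstrapped target: Q(s, a) ← Q(s, a) + α [ r + γ max_a' Q(s', a') − Q(s, a) ], where α is the learning rate and γ the discount factor. The learn() method below implements exactly this rule, written in its equivalent interpolation form.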
```python
class QLearningAgent:
    def __init__(self, states, actions, learning_rate=0.1, gamma=0.9, epsilon=0.1):
        self.states = states
        self.actions = actions
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.Q = {}
        # Initialize Q-table with zeros
        for state in states:
            self.Q[state] = {}
            for action in actions:
                self.Q[state][action] = 0.0

    def choose_action(self, state):
        """Choose an action using an epsilon-greedy policy."""
        if np.random.random() < self.epsilon:
            return np.random.choice(self.actions)              # explore
        else:
            return max(self.Q[state], key=self.Q[state].get)   # exploit

    def learn(self, state, action, reward, next_state):
        """Update a Q-value using the Q-learning update rule."""
        old_value = self.Q[state][action]
        next_max = max(self.Q[next_state].values())
        new_value = (1 - self.learning_rate) * old_value + \
                    self.learning_rate * (reward + self.gamma * next_max)
        self.Q[state][action] = new_value

    def get_policy(self):
        """Extract a greedy policy from the Q-table."""
        policy = {}
        for state in self.states:
            policy[state] = max(self.Q[state], key=self.Q[state].get)
        return policy
```
Q-Learning Example: Grid World
```python
class GridWorld:
    def __init__(self, width, height, start, goal, obstacles=None):
        self.width = width
        self.height = height
        self.start = start
        self.goal = goal
        self.obstacles = obstacles or []
        self.current_state = start

    def reset(self):
        """Reset the environment to the start state."""
        self.current_state = self.start
        return self.current_state

    def step(self, action):
        """Take an action and return (next_state, reward, done)."""
        x, y = self.current_state
        # Define action effects
        if action == 0:      # Up
            next_state = (x, y - 1)
        elif action == 1:    # Down
            next_state = (x, y + 1)
        elif action == 2:    # Left
            next_state = (x - 1, y)
        elif action == 3:    # Right
            next_state = (x + 1, y)
        # Blocked by boundaries or obstacles: stay in place
        if (next_state[0] < 0 or next_state[0] >= self.width or
                next_state[1] < 0 or next_state[1] >= self.height or
                next_state in self.obstacles):
            next_state = self.current_state
        self.current_state = next_state
        # Reward: large bonus at the goal, small step penalty otherwise
        if next_state == self.goal:
            reward = 100
            done = True
        else:
            reward = -1
            done = False
        return next_state, reward, done

def train_q_learning_agent(env, agent, episodes=1000):
    """Train a Q-learning agent by interacting with the environment."""
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        while True:
            action = agent.choose_action(state)
            next_state, reward, done = env.step(action)
            agent.learn(state, action, reward, next_state)
            total_reward += reward
            state = next_state
            if done:
                break
        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}")
    return agent
```
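Putting the pieces together, a minimal run might look like this; the grid size, obstacle, and episode count are arbitrary choices for illustration.

```python
# A 5x5 grid with one obstacle (all values are illustrative).
env = GridWorld(width=5, height=5, start=(0, 0), goal=(4, 4), obstacles=[(2, 2)])
states = [(x, y) for x in range(5) for y in range(5)]
actions = [0, 1, 2, 3]  # up, down, left, right
agent = QLearningAgent(states, actions, learning_rate=0.1, gamma=0.9, epsilon=0.1)
agent = train_q_learning_agent(env, agent, episodes=500)
print(agent.get_policy()[(0, 0)])  # greedy action at the start state
```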
Policy Gradient Methods
Policy gradient methods directly optimize the policy by following the gradient of expected reward.
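Concretely, the REINFORCE estimator used below approximates the gradient of the expected return J(θ) as ∇θ J(θ) ≈ Σ_t ∇θ log πθ(a_t | s_t) · G_t, where G_t is the discounted return from step t onward (usually normalized to reduce variance).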
```python
import torch
import torch.nn as nn
import torch.optim as optim

class PolicyNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.softmax(self.fc2(x))
        return x
```
```python
class PolicyGradientAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.01, gamma=0.99):
        self.policy_net = PolicyNetwork(state_size, hidden_size, action_size)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.action_size = action_size
        self.gamma = gamma  # discount factor

    def choose_action(self, state):
        """Sample an action from the current policy."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.policy_net(state_tensor)
        action_dist = torch.distributions.Categorical(action_probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)

    def update_policy(self, rewards, log_probs):
        """Update the policy using the REINFORCE estimator."""
        # Compute discounted returns, working backwards through the episode
        discounted_rewards = []
        R = 0
        for r in reversed(rewards):
            R = r + self.gamma * R
            discounted_rewards.insert(0, R)
        # Normalize returns to reduce variance
        discounted_rewards = torch.FloatTensor(discounted_rewards)
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / \
                             (discounted_rewards.std() + 1e-9)
        # Policy gradient loss; flatten log-probs so shapes line up with the returns
        log_probs = torch.stack(log_probs).squeeze(-1)
        loss = -(log_probs * discounted_rewards).sum()
        # Gradient step
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        return loss.item()
```
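A minimal training loop for this agent, assuming the classic Gym API in which reset() returns only the observation and step() returns a 4-tuple (newer Gymnasium versions differ); CartPole is used purely as a convenient test bed.

```python
import gym

# Assumes the older Gym API: reset() -> obs, step(a) -> (obs, reward, done, info).
env = gym.make("CartPole-v1")
agent = PolicyGradientAgent(state_size=4, action_size=2)

for episode in range(500):
    state = env.reset()
    rewards, log_probs = [], []
    done = False
    while not done:
        action, log_prob = agent.choose_action(state)
        state, reward, done, _ = env.step(action)
        rewards.append(reward)
        log_probs.append(log_prob)
    agent.update_policy(rewards, log_probs)
```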
Deep Q-Network (DQN)
DQN combines Q-learning with deep neural networks to handle high-dimensional state spaces, stabilizing training with experience replay and a periodically updated target network.
```python
class DQN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x
```
```python
from collections import deque

class DQNAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.001):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = deque(maxlen=10000)  # replay buffer (bounded)
        self.gamma = 0.95                  # discount factor
        self.epsilon = 1.0                 # initial exploration rate
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = lr
        self.model = DQN(state_size, hidden_size, action_size)
        self.target_model = DQN(state_size, hidden_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.update_target_model()

    def update_target_model(self):
        """Copy the online network's weights into the target network."""
        self.target_model.load_state_dict(self.model.state_dict())

    def remember(self, state, action, reward, next_state, done):
        """Store an experience tuple in the replay buffer."""
        self.memory.append((state, action, reward, next_state, done))

    def act(self, state):
        """Choose an action using an epsilon-greedy policy."""
        if np.random.random() <= self.epsilon:
            return np.random.randint(self.action_size)
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        act_values = self.model(state_tensor)
        return torch.argmax(act_values).item()

    def replay(self, batch_size):
        """Train on a random batch of stored experiences."""
        if len(self.memory) < batch_size:
            return
        minibatch = np.random.choice(len(self.memory), batch_size, replace=False)
        states = torch.FloatTensor(np.array([self.memory[i][0] for i in minibatch]))
        actions = torch.LongTensor([self.memory[i][1] for i in minibatch])
        rewards = torch.FloatTensor([self.memory[i][2] for i in minibatch])
        next_states = torch.FloatTensor(np.array([self.memory[i][3] for i in minibatch]))
        dones = torch.BoolTensor([self.memory[i][4] for i in minibatch])
        # Q(s,a) from the online network; bootstrapped target from the target network
        current_q_values = self.model(states).gather(1, actions.unsqueeze(1))
        next_q_values = self.target_model(next_states).max(1)[0].detach()
        target_q_values = rewards + self.gamma * next_q_values * (~dones).float()
        loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()
        # Decay exploration over time
        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay
        return loss.item()
```
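A sketch of how the agent might be trained, again assuming the Gym-style CartPole environment from the REINFORCE example above; how often the target network is refreshed is itself a hyperparameter, and every 10 episodes here is just a placeholder.

```python
# Assumes `env` is the Gym-style CartPole environment created above.
agent = DQNAgent(state_size=4, action_size=2)
for episode in range(300):
    state = env.reset()
    done = False
    while not done:
        action = agent.act(state)
        next_state, reward, done, _ = env.step(action)
        agent.remember(state, action, reward, next_state, done)
        agent.replay(batch_size=32)
        state = next_state
    if episode % 10 == 0:
        agent.update_target_model()  # periodically refresh the target network
```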
Actor-Critic Methods
Actor-Critic methods combine a policy (the actor) with a learned value function (the critic); the critic's value estimates act as a baseline that reduces the variance of the policy gradient.
```python
class ActorCritic(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=64):
        super(ActorCritic, self).__init__()
        self.actor = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
            nn.Softmax(dim=-1)
        )
        self.critic = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

    def forward(self, x):
        return self.actor(x), self.critic(x)
```
```python
class ActorCriticAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.001):
        self.model = ActorCritic(state_size, action_size, hidden_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.gamma = 0.99

    def choose_action(self, state):
        """Sample an action from the actor network."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs, _ = self.model(state_tensor)
        action_dist = torch.distributions.Categorical(action_probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)

    def update(self, state, action, reward, next_state, done):
        """Update actor and critic from a single transition."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)
        # Current policy and value estimate
        action_probs, value = self.model(state_tensor)
        action_dist = torch.distributions.Categorical(action_probs)
        log_prob = action_dist.log_prob(torch.tensor([action]))
        # Bootstrapped value of the next state
        _, next_value = self.model(next_state_tensor)
        # TD target and advantage (no bootstrapping on terminal transitions)
        if done:
            td_target = reward
        else:
            td_target = reward + self.gamma * next_value.item()
        advantage = td_target - value.item()
        # Actor maximizes advantage-weighted log-probability; critic regresses the TD target
        actor_loss = -log_prob * advantage
        critic_loss = nn.MSELoss()(value, torch.tensor([[td_target]], dtype=torch.float32))
        total_loss = actor_loss + critic_loss
        # Joint gradient step
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()
        return total_loss.item()
```
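A minimal per-step usage sketch, assuming `env` is a Gym-style environment with 4-dimensional observations and 2 discrete actions (for example the CartPole setup used earlier):

```python
# Assumes `env` is a Gym-style environment with 4-dim states and 2 discrete actions.
agent = ActorCriticAgent(state_size=4, action_size=2)
for episode in range(300):
    state = env.reset()
    done = False
    while not done:
        action, _ = agent.choose_action(state)
        next_state, reward, done, _ = env.step(action)
        agent.update(state, action, reward, next_state, done)
        state = next_state
```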
Advanced RL Algorithms
Proximal Policy Optimization (PPO)
PPO stabilizes policy-gradient training by clipping the ratio between the new and old action probabilities, so that each update cannot move the policy too far from the one that collected the data.
```python
class PPOAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.0003, clip_ratio=0.2):
        self.policy_net = PolicyNetwork(state_size, hidden_size, action_size)
        self.value_net = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
        self.optimizer = optim.Adam([
            {'params': self.policy_net.parameters(), 'lr': lr},
            {'params': self.value_net.parameters(), 'lr': lr}
        ])
        self.clip_ratio = clip_ratio
        self.gamma = 0.99

    def compute_advantages(self, rewards, values, dones):
        """Compute advantages using Generalized Advantage Estimation (GAE)."""
        advantages = []
        gae = 0
        for i in reversed(range(len(rewards))):
            if i == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[i + 1]
            delta = rewards[i] + self.gamma * next_value * (1 - dones[i]) - values[i]
            gae = delta + self.gamma * 0.95 * gae * (1 - dones[i])  # 0.95 is the GAE lambda
            advantages.insert(0, gae)
        return torch.FloatTensor(advantages)
```
```python
    def update(self, states, actions, old_log_probs, rewards, dones):
        """Update policy and value function using the clipped PPO objective."""
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        old_log_probs = torch.stack(old_log_probs).detach().reshape(-1)
        # Value estimates for the rollout (no gradient: they only define targets)
        with torch.no_grad():
            values = self.value_net(states).squeeze()
        advantages = self.compute_advantages(rewards, values.numpy(), dones)
        returns = advantages + values           # value-function targets
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        # PPO update: several epochs over the same batch
        for _ in range(10):
            action_probs = self.policy_net(states)
            action_dist = torch.distributions.Categorical(action_probs)
            new_log_probs = action_dist.log_prob(actions)
            ratio = torch.exp(new_log_probs - old_log_probs)
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages
            policy_loss = -torch.min(surr1, surr2).mean()
            value_loss = nn.MSELoss()(self.value_net(states).squeeze(), returns)
            total_loss = policy_loss + 0.5 * value_loss
            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()
```
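Collecting a rollout and applying one update might look like the sketch below, again assuming a Gym-style `env` with 4-dimensional observations and 2 discrete actions; since PPOAgent above defines no choose_action() helper, actions are sampled from policy_net directly, and the rollout length is arbitrary.

```python
# Assumes `env` is a Gym-style environment with 4-dim states and 2 discrete actions.
agent = PPOAgent(state_size=4, action_size=2)
states, actions, log_probs, rewards, dones = [], [], [], [], []
state = env.reset()
for _ in range(512):                                   # rollout length is arbitrary
    probs = agent.policy_net(torch.FloatTensor(state).unsqueeze(0))
    dist = torch.distributions.Categorical(probs)
    action = dist.sample()
    next_state, reward, done, _ = env.step(action.item())
    states.append(state)
    actions.append(action.item())
    log_probs.append(dist.log_prob(action))
    rewards.append(reward)
    dones.append(done)
    state = env.reset() if done else next_state
agent.update(states, actions, log_probs, rewards, dones)
```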
Soft Actor-Critic (SAC)
SAC is an off-policy actor-critic algorithm that maximizes a trade-off between expected return and policy entropy, which encourages exploration; the sketch below adapts the idea to a discrete action space with twin Q-critics.
```python
class SACAgent:
    def __init__(self, state_size, action_size, hidden_size=256, lr=0.0003):
        self.state_size = state_size
        self.action_size = action_size
        # Networks: a stochastic actor and two Q-critics (to reduce overestimation)
        self.actor = PolicyNetwork(state_size, hidden_size, action_size)
        self.critic1 = nn.Sequential(
            nn.Linear(state_size + action_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
        self.critic2 = nn.Sequential(
            nn.Linear(state_size + action_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic1_optimizer = optim.Adam(self.critic1.parameters(), lr=lr)
        self.critic2_optimizer = optim.Adam(self.critic2.parameters(), lr=lr)
        self.alpha = 0.2   # Temperature parameter (entropy weight)
        self.gamma = 0.99

    def choose_action(self, state):
        """Sample an action from the current policy."""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.actor(state_tensor)
        action_dist = torch.distributions.Categorical(action_probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)

    def update(self, states, actions, rewards, next_states, dones):
        """Update the SAC actor and critics on a batch of transitions."""
        states = torch.FloatTensor(np.array(states))
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards).unsqueeze(1)
        next_states = torch.FloatTensor(np.array(next_states))
        dones = torch.FloatTensor(dones).unsqueeze(1)
        # One-hot encode actions for the state-action critics
        actions_one_hot = torch.zeros(actions.size(0), self.action_size)
        actions_one_hot.scatter_(1, actions.unsqueeze(1), 1)
        # Current Q estimates
        current_q1 = self.critic1(torch.cat([states, actions_one_hot], dim=1))
        current_q2 = self.critic2(torch.cat([states, actions_one_hot], dim=1))
        # Entropy-regularized TD target (no gradient flows through the target)
        with torch.no_grad():
            next_action_probs = self.actor(next_states)
            next_action_dist = torch.distributions.Categorical(next_action_probs)
            next_actions = next_action_dist.sample()
            next_log_probs = next_action_dist.log_prob(next_actions)
            next_actions_one_hot = torch.zeros(next_actions.size(0), self.action_size)
            next_actions_one_hot.scatter_(1, next_actions.unsqueeze(1), 1)
            next_q1 = self.critic1(torch.cat([next_states, next_actions_one_hot], dim=1))
            next_q2 = self.critic2(torch.cat([next_states, next_actions_one_hot], dim=1))
            next_q = torch.min(next_q1, next_q2)
            target_q = rewards + self.gamma * (1 - dones) * \
                       (next_q - self.alpha * next_log_probs.unsqueeze(1))
        # Critic updates
        critic1_loss = nn.MSELoss()(current_q1, target_q)
        critic2_loss = nn.MSELoss()(current_q2, target_q)
        self.critic1_optimizer.zero_grad()
        critic1_loss.backward()
        self.critic1_optimizer.step()
        self.critic2_optimizer.zero_grad()
        critic2_loss.backward()
        self.critic2_optimizer.step()
        # Actor update: maximize Q while keeping entropy high
        action_probs = self.actor(states)
        action_dist = torch.distributions.Categorical(action_probs)
        new_actions = action_dist.sample()
        new_log_probs = action_dist.log_prob(new_actions)
        new_actions_one_hot = torch.zeros(new_actions.size(0), self.action_size)
        new_actions_one_hot.scatter_(1, new_actions.unsqueeze(1), 1)
        new_q1 = self.critic1(torch.cat([states, new_actions_one_hot], dim=1))
        new_q2 = self.critic2(torch.cat([states, new_actions_one_hot], dim=1))
        new_q = torch.min(new_q1, new_q2).squeeze(1)
        actor_loss = (self.alpha * new_log_probs - new_q).mean()
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
```
Real-World Applications
Game Playing
```python
def train_game_agent(env, agent, episodes=1000):
    """Train a DQN-style agent on a Gym-style game environment."""
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        while True:
            action = agent.act(state)   # DQNAgent's epsilon-greedy action selection
            next_state, reward, done, _ = env.step(action)
            agent.remember(state, action, reward, next_state, done)
            total_reward += reward
            state = next_state
            if len(agent.memory) > 32:
                agent.replay(32)
            if done:
                break
        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}")
    return agent
```
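For instance, with the classic Gym API (reset() returning just the observation and step() returning a 4-tuple), the loop above can be driven end to end like this; CartPole is only a stand-in for a real game environment.

```python
import gym

env = gym.make("CartPole-v1")          # assumes the older 4-tuple step() API
agent = DQNAgent(state_size=4, action_size=2)
agent = train_game_agent(env, agent, episodes=200)
```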
Robotics
```python
class RobotEnvironment:
    def __init__(self, target_position):
        self.target_position = target_position
        self.current_position = np.array([0.0, 0.0])
        self.max_steps = 100
        self.steps = 0

    def reset(self):
        """Reset the robot to its initial position."""
        self.current_position = np.array([0.0, 0.0])
        self.steps = 0
        return self.current_position

    def step(self, action):
        """Take an action (movement in the x and y directions)."""
        # Apply the action as a small, scaled movement
        movement = np.array(action) * 0.1
        self.current_position += movement
        self.steps += 1
        # Distance to the target determines the reward
        distance = np.linalg.norm(self.current_position - self.target_position)
        if distance < 0.1:
            reward = 100
            done = True
        else:
            reward = -distance
            done = self.steps >= self.max_steps   # time out after max_steps
        return self.current_position, reward, done
```
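A quick interface check with random continuous actions (not a trained policy); the agents in this article are discrete, so a continuous-control algorithm such as SAC with a Gaussian policy would be the natural fit for this environment.

```python
env = RobotEnvironment(target_position=np.array([1.0, 1.0]))
state = env.reset()
done = False
while not done:
    action = np.random.uniform(-1.0, 1.0, size=2)   # random (dx, dy) movement
    state, reward, done = env.step(action)
print("final position:", state, "last reward:", reward)
```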
Best Practices for RL Projects
1. Environment Design
- Design clear reward functions
- Ensure proper state and action spaces
- Include appropriate termination conditions
- Consider exploration vs exploitation
2. Algorithm Selection
- Q-Learning: a solid baseline for small, discrete state and action spaces
- Policy Gradient (REINFORCE): simple and flexible, but high-variance
- Actor-Critic: a good balance of stability and sample efficiency
- PPO: a robust default for both discrete and continuous control tasks
- SAC: strong for continuous action spaces where exploration matters (entropy-regularized)
3. Hyperparameter Tuning
- Learning rate
- Discount factor (gamma)
- Exploration rate (epsilon)
- Network architecture
- Batch size
4. Training Tips
- Use experience replay for sample efficiency
- Implement target networks for stability
- Use gradient clipping to prevent exploding gradients (see the sketch after this list)
- Monitor training progress with metrics
- Save and load models appropriately
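As a concrete example of the gradient-clipping tip, in PyTorch the clipping call sits between the backward pass and the optimizer step; the tiny model, dummy loss, and max_norm value below are placeholders, not recommendations.

```python
import torch
import torch.nn as nn

model = nn.Linear(4, 2)                        # stand-in for any policy or value net
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

loss = model(torch.randn(8, 4)).pow(2).mean()  # dummy loss purely for illustration
optimizer.zero_grad()
loss.backward()
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)  # clip before stepping
optimizer.step()
```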
Common Challenges and Solutions
1. Exploration vs Exploitation
```python
def adaptive_exploration(episode, total_episodes, initial_epsilon=1.0, final_epsilon=0.01):
    """Linearly anneal epsilon from its initial to its final value."""
    epsilon = initial_epsilon - (initial_epsilon - final_epsilon) * episode / total_episodes
    return max(epsilon, final_epsilon)
```
2. Reward Shaping
```python
def shaped_reward(original_reward, state, next_state, target):
    """Add shaping terms that reward being close to, and moving toward, the target."""
    distance_reward = -np.linalg.norm(next_state - target)
    progress_reward = np.linalg.norm(state - target) - np.linalg.norm(next_state - target)
    return original_reward + 0.1 * distance_reward + 0.5 * progress_reward
```
3. Experience Replay
```python
class PrioritizedReplayBuffer:
    def __init__(self, capacity=10000):
        self.capacity = capacity
        self.buffer = []
        self.priorities = []

    def add(self, experience, priority=1.0):
        """Add an experience with an associated priority."""
        if len(self.buffer) >= self.capacity:
            self.buffer.pop(0)
            self.priorities.pop(0)
        self.buffer.append(experience)
        self.priorities.append(priority)

    def sample(self, batch_size):
        """Sample experiences with probability proportional to their priorities."""
        priorities = np.array(self.priorities)
        probabilities = priorities / priorities.sum()
        indices = np.random.choice(len(self.buffer), batch_size, p=probabilities)
        return [self.buffer[i] for i in indices], indices

    def update_priorities(self, indices, new_priorities):
        """Update the priorities of sampled experiences (e.g. with fresh TD errors)."""
        for i, p in zip(indices, new_priorities):
            self.priorities[i] = p
```
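A minimal usage sketch; the transitions and the new priorities (which in practice would come from TD errors) are made up for illustration.

```python
buffer = PrioritizedReplayBuffer(capacity=1000)
for i in range(100):
    buffer.add((i, 0, -1.0, i + 1, False), priority=1.0)    # dummy transitions
batch, indices = buffer.sample(batch_size=8)
buffer.update_priorities(indices, [2.0] * len(indices))     # e.g. new TD errors
```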
Conclusion
Reinforcement Learning is a powerful paradigm for training agents to make optimal decisions in complex environments. From simple Q-learning to advanced algorithms like PPO and SAC, RL has achieved remarkable success in various domains.
The keys to success in RL are:
- Understanding the fundamentals of MDPs and value functions
- Choosing appropriate algorithms for your specific problem
- Designing good environments with clear reward functions
- Proper hyperparameter tuning and training strategies
- Staying updated with the latest developments
Whether you're building a game-playing AI or training a robot to navigate complex environments, the principles of reinforcement learning remain the same. Start with simple algorithms, experiment with different approaches, and gradually work your way up to more advanced techniques.
The future of reinforcement learning is incredibly promising, with applications ranging from autonomous vehicles to personalized medicine. By mastering the fundamentals and staying current with the latest developments, you'll be well-positioned to build powerful RL systems that can learn and adapt to complex real-world challenges.