
Reinforcement Learning: From Q-Learning to Deep RL

August 10, 2025 · 13 min read
#Reinforcement Learning · #Deep Learning · #AI · #Q-Learning · #Policy Gradient

Reinforcement Learning (RL) is a type of machine learning where an agent learns to make decisions by interacting with an environment. From game-playing AI to autonomous robots, RL has achieved remarkable breakthroughs in artificial intelligence.

What is Reinforcement Learning?

Reinforcement Learning is a learning paradigm where an agent learns optimal behavior through trial and error by receiving rewards or penalties for actions taken in an environment. The goal is to maximize cumulative rewards over time.

Key Components

  • Agent: The learner or decision maker
  • Environment: The world in which the agent operates
  • State: Current situation of the environment
  • Action: What the agent can do
  • Reward: Feedback from the environment
  • Policy: Strategy for selecting actions
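
These components interact in a loop: the agent observes a state, selects an action, and the environment replies with a reward and the next state. Here is a minimal sketch of that loop, assuming a hypothetical env with reset()/step() methods and an agent with choose_action()/learn() methods like the ones defined later in this post:

# Generic agent-environment interaction loop (a sketch; `env` and `agent`
# are placeholders following the interfaces used throughout this post)
def run_episode(env, agent):
    state = env.reset()
    total_reward = 0
    done = False

    while not done:
        action = agent.choose_action(state)             # policy selects an action
        next_state, reward, done = env.step(action)     # environment responds
        agent.learn(state, action, reward, next_state)  # agent updates from the feedback
        total_reward += reward
        state = next_state

    return total_reward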

Basic Concepts

Markov Decision Process (MDP)

An MDP is a mathematical framework for modeling decision-making in environments where outcomes are partially random and partially under the control of a decision maker.

import numpy as np

class MDP:
    def __init__(self, states, actions, transitions, rewards, gamma=0.9):
        self.states = states            # assumed to be integer indices 0..n-1
        self.actions = actions
        self.transitions = transitions  # P(s'|s,a)
        self.rewards = rewards          # R(s,a,s')
        self.gamma = gamma              # Discount factor

    def get_next_state(self, state, action):
        """Sample the index of the next state from the transition probabilities"""
        probs = self.transitions[state][action]
        return np.random.choice(len(self.states), p=probs)

    def get_reward(self, state, action, next_state):
        """Get reward for transition"""
        return self.rewards[state][action][next_state]

Value Functions

The value of a state is the expected discounted return from acting optimally in it. Value iteration computes this by repeatedly applying the Bellman optimality backup, V(s) ← max_a Σ_s' P(s'|s,a) [R(s,a,s') + γ V(s')], until the values stop changing:

def value_iteration(mdp, epsilon=0.01, max_iterations=1000):
    """Value iteration algorithm"""
    V = {state: 0 for state in mdp.states}

    for i in range(max_iterations):
        V_new = {}
        delta = 0

        for state in mdp.states:
            v = V[state]
            # Calculate value for all actions
            action_values = []
            for action in mdp.actions:
                value = 0
                for next_state in mdp.states:
                    prob = mdp.transitions[state][action][next_state]
                    reward = mdp.rewards[state][action][next_state]
                    value += prob * (reward + mdp.gamma * V[next_state])
                action_values.append(value)

            V_new[state] = max(action_values)
            delta = max(delta, abs(v - V_new[state]))

        V = V_new

        if delta < epsilon:
            break

    return V

def extract_policy(mdp, V):
    """Extract optimal policy from value function"""
    policy = {}

    for state in mdp.states:
        action_values = []
        for action in mdp.actions:
            value = 0
            for next_state in mdp.states:
                prob = mdp.transitions[state][action][next_state]
                reward = mdp.rewards[state][action][next_state]
                value += prob * (reward + mdp.gamma * V[next_state])
            action_values.append(value)

        policy[state] = np.argmax(action_values)  # index of the best action

    return policy
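
To see these pieces in action, here is a toy two-state, two-action MDP (states and actions are integer indices, as the MDP class above assumes) run through value iteration and policy extraction:

# Toy MDP: action 1 moves toward state 1, which pays a reward of 10 when reached
states = [0, 1]
actions = [0, 1]
transitions = {
    0: {0: [0.9, 0.1], 1: [0.2, 0.8]},  # P(s'|s=0, a)
    1: {0: [0.9, 0.1], 1: [0.2, 0.8]},  # P(s'|s=1, a)
}
rewards = {
    s: {a: [0, 10] for a in actions}    # landing in state 1 pays 10
    for s in states
}

mdp = MDP(states, actions, transitions, rewards, gamma=0.9)
V = value_iteration(mdp)
policy = extract_policy(mdp, V)
print(V)       # optimal state values
print(policy)  # index of the best action in each state (action 1 here)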

Q-Learning

Q-Learning is a model-free reinforcement learning algorithm that learns the quality of actions, telling an agent what action to take under what circumstances. It maintains a table of Q-values and updates them after every transition with the rule Q(s, a) ← (1 − α) Q(s, a) + α (r + γ max_a' Q(s', a')).

class QLearningAgent:
    def __init__(self, states, actions, learning_rate=0.1, gamma=0.9, epsilon=0.1):
        self.states = states
        self.actions = actions
        self.learning_rate = learning_rate
        self.gamma = gamma
        self.epsilon = epsilon
        self.Q = {}

        # Initialize Q-table
        for state in states:
            self.Q[state] = {}
            for action in actions:
                self.Q[state][action] = 0.0

    def choose_action(self, state):
        """Choose action using epsilon-greedy policy"""
        if np.random.random() < self.epsilon:
            return np.random.choice(self.actions)
        else:
            return max(self.Q[state], key=self.Q[state].get)

    def learn(self, state, action, reward, next_state):
        """Update Q-value using Q-learning update rule"""
        old_value = self.Q[state][action]
        next_max = max(self.Q[next_state].values())
        new_value = (1 - self.learning_rate) * old_value + \
                   self.learning_rate * (reward + self.gamma * next_max)
        self.Q[state][action] = new_value

    def get_policy(self):
        """Extract policy from Q-table"""
        policy = {}
        for state in self.states:
            policy[state] = max(self.Q[state], key=self.Q[state].get)
        return policy

Q-Learning Example: Grid World

class GridWorld:
    def __init__(self, width, height, start, goal, obstacles=None):
        self.width = width
        self.height = height
        self.start = start
        self.goal = goal
        self.obstacles = obstacles or []
        self.current_state = start

    def reset(self):
        """Reset environment to start state"""
        self.current_state = self.start
        return self.current_state

    def step(self, action):
        """Take action and return (next_state, reward, done)"""
        x, y = self.current_state

        # Define action effects
        if action == 0:  # Up
            next_state = (x, y - 1)
        elif action == 1:  # Down
            next_state = (x, y + 1)
        elif action == 2:  # Left
            next_state = (x - 1, y)
        elif action == 3:  # Right
            next_state = (x + 1, y)

        # Check boundaries
        if (next_state[0] < 0 or next_state[0] >= self.width or
            next_state[1] < 0 or next_state[1] >= self.height or
            next_state in self.obstacles):
            next_state = self.current_state

        self.current_state = next_state

        # Calculate reward
        if next_state == self.goal:
            reward = 100
            done = True
        else:
            reward = -1
            done = False

        return next_state, reward, done

def train_q_learning_agent(env, agent, episodes=1000):
    """Train Q-learning agent"""
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0

        while True:
            action = agent.choose_action(state)
            next_state, reward, done = env.step(action)

            agent.learn(state, action, reward, next_state)
            total_reward += reward
            state = next_state

            if done:
                break

        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}")

    return agent
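
Putting the pieces together (the grid size, obstacle positions, and hyperparameters below are illustrative choices, not values from a benchmark):

# Enumerate every grid cell as a state; actions are 0-3 (up, down, left, right)
states = [(x, y) for x in range(5) for y in range(5)]
actions = [0, 1, 2, 3]

env = GridWorld(width=5, height=5, start=(0, 0), goal=(4, 4),
                obstacles=[(2, 2), (3, 1)])
agent = QLearningAgent(states, actions, learning_rate=0.1, gamma=0.9, epsilon=0.1)

trained_agent = train_q_learning_agent(env, agent, episodes=500)
print(trained_agent.get_policy()[(0, 0)])  # best first move from the start cell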

Policy Gradient Methods

Policy gradient methods directly optimize the policy by ascending the gradient of the expected return with respect to the policy parameters, rather than learning a value function and acting greedily on it.

import torch
import torch.nn as nn
import torch.optim as optim

class PolicyNetwork(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(PolicyNetwork, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()
        self.softmax = nn.Softmax(dim=-1)

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.softmax(self.fc2(x))
        return x

class PolicyGradientAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.01):
        self.policy_net = PolicyNetwork(state_size, hidden_size, action_size)
        self.optimizer = optim.Adam(self.policy_net.parameters(), lr=lr)
        self.action_size = action_size

    def choose_action(self, state):
        """Choose action using current policy"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.policy_net(state_tensor)
        action_dist = torch.distributions.Categorical(action_probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)

    def update_policy(self, rewards, log_probs):
        """Update policy using REINFORCE algorithm"""
        # Calculate discounted rewards
        discounted_rewards = []
        R = 0
        for r in reversed(rewards):
            R = r + 0.99 * R
            discounted_rewards.insert(0, R)

        # Normalize rewards
        discounted_rewards = torch.FloatTensor(discounted_rewards)
        discounted_rewards = (discounted_rewards - discounted_rewards.mean()) / \
                           (discounted_rewards.std() + 1e-9)

        # Calculate loss
        log_probs = torch.stack(log_probs)
        loss = -(log_probs * discounted_rewards).sum()

        # Update policy
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        return loss.item()
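
The agent above only defines the update; a full REINFORCE loop collects an entire episode of rewards and log-probabilities before each update. Here is a minimal sketch, assuming an env that follows the classic Gym API (reset() returns an observation, step() returns four values):

def train_reinforce(env, agent, episodes=1000):
    """Collect one full episode at a time, then apply the REINFORCE update"""
    for episode in range(episodes):
        state = env.reset()
        rewards, log_probs = [], []
        done = False

        while not done:
            action, log_prob = agent.choose_action(state)
            state, reward, done, _ = env.step(action)
            rewards.append(reward)
            log_probs.append(log_prob)

        loss = agent.update_policy(rewards, log_probs)
        if episode % 100 == 0:
            print(f"Episode {episode}, Return: {sum(rewards)}, Loss: {loss:.3f}")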

Deep Q-Network (DQN)

DQN combines Q-learning with deep neural networks to handle high-dimensional state spaces. Two tricks make this stable: experience replay, which breaks the correlation between consecutive transitions, and a separate target network that provides slowly changing targets for the Q-updates.

class DQN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, output_size)
        self.relu = nn.ReLU()

    def forward(self, x):
        x = self.relu(self.fc1(x))
        x = self.relu(self.fc2(x))
        x = self.fc3(x)
        return x

class DQNAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.001):
        self.state_size = state_size
        self.action_size = action_size
        self.memory = []
        self.gamma = 0.95
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.learning_rate = lr
        self.model = DQN(state_size, hidden_size, action_size)
        self.target_model = DQN(state_size, hidden_size, action_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.update_target_model()

    def update_target_model(self):
        """Update target network"""
        self.target_model.load_state_dict(self.model.state_dict())

    def remember(self, state, action, reward, next_state, done):
        """Store experience in the replay buffer (a simple list capped at 10,000 entries)"""
        self.memory.append((state, action, reward, next_state, done))
        if len(self.memory) > 10000:
            self.memory.pop(0)  # drop the oldest transition once the buffer is full

    def act(self, state):
        """Choose action using epsilon-greedy policy"""
        if np.random.random() <= self.epsilon:
            return np.random.randint(self.action_size)

        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        act_values = self.model(state_tensor)
        return torch.argmax(act_values).item()

    def replay(self, batch_size):
        """Train on batch of experiences"""
        if len(self.memory) < batch_size:
            return

        minibatch = np.random.choice(len(self.memory), batch_size, replace=False)
        states = torch.FloatTensor([self.memory[i][0] for i in minibatch])
        actions = torch.LongTensor([self.memory[i][1] for i in minibatch])
        rewards = torch.FloatTensor([self.memory[i][2] for i in minibatch])
        next_states = torch.FloatTensor([self.memory[i][3] for i in minibatch])
        dones = torch.BoolTensor([self.memory[i][4] for i in minibatch])

        current_q_values = self.model(states).gather(1, actions.unsqueeze(1))
        next_q_values = self.target_model(next_states).max(1)[0].detach()
        target_q_values = rewards + (self.gamma * next_q_values * ~dones)

        loss = nn.MSELoss()(current_q_values.squeeze(), target_q_values)

        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

        if self.epsilon > self.epsilon_min:
            self.epsilon *= self.epsilon_decay

        return loss.item()

Actor-Critic Methods

Actor-Critic methods combine the benefits of policy gradient and value function methods: the actor outputs action probabilities while the critic estimates state values, and the critic's estimate serves as a baseline that reduces the variance of the policy updates.

class ActorCritic(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=64):
        super(ActorCritic, self).__init__()
        self.actor = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, action_size),
            nn.Softmax(dim=-1)
        )
        self.critic = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

    def forward(self, x):
        return self.actor(x), self.critic(x)

class ActorCriticAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.001):
        self.model = ActorCritic(state_size, action_size, hidden_size)
        self.optimizer = optim.Adam(self.model.parameters(), lr=lr)
        self.gamma = 0.99

    def choose_action(self, state):
        """Choose action using actor network"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs, _ = self.model(state_tensor)
        action_dist = torch.distributions.Categorical(action_probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)

    def update(self, state, action, reward, next_state, done):
        """Update actor and critic networks"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        next_state_tensor = torch.FloatTensor(next_state).unsqueeze(0)

        # Get current values
        action_probs, value = self.model(state_tensor)
        action_dist = torch.distributions.Categorical(action_probs)
        log_prob = action_dist.log_prob(torch.tensor([action]))

        # Get next value
        _, next_value = self.model(next_state_tensor)

        # TD target and advantage (bootstrap from the next state only if the episode continues)
        if done:
            td_target = reward
        else:
            td_target = reward + self.gamma * next_value.item()
        advantage = td_target - value.item()

        # Calculate losses
        actor_loss = -log_prob * advantage
        critic_loss = nn.MSELoss()(value.squeeze(), torch.tensor(td_target, dtype=torch.float32))

        total_loss = actor_loss + critic_loss

        # Update networks
        self.optimizer.zero_grad()
        total_loss.backward()
        self.optimizer.step()

        return total_loss.item()
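
Unlike REINFORCE, which waits for the end of an episode, this agent can update after every single transition. A sketch of that online loop, under the same classic-Gym-environment assumption as before:

def train_actor_critic(env, agent, episodes=1000):
    """Online actor-critic training: one update per environment step"""
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            action, _ = agent.choose_action(state)
            next_state, reward, done, _ = env.step(action)
            agent.update(state, action, reward, next_state, done)  # per-step update
            total_reward += reward
            state = next_state

        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}")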

Advanced RL Algorithms

Proximal Policy Optimization (PPO)

PPO improves on vanilla policy gradients by clipping the ratio between the new and old policy probabilities, so each update stays close to the policy that collected the data:

class PPOAgent:
    def __init__(self, state_size, action_size, hidden_size=64, lr=0.0003, clip_ratio=0.2):
        self.policy_net = PolicyNetwork(state_size, hidden_size, action_size)
        self.value_net = nn.Sequential(
            nn.Linear(state_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
        self.optimizer = optim.Adam([
            {'params': self.policy_net.parameters(), 'lr': lr},
            {'params': self.value_net.parameters(), 'lr': lr}
        ])
        self.clip_ratio = clip_ratio
        self.gamma = 0.99

    def compute_advantages(self, rewards, values, dones):
        """Compute advantages using GAE"""
        advantages = []
        gae = 0

        for i in reversed(range(len(rewards))):
            if i == len(rewards) - 1:
                next_value = 0
            else:
                next_value = values[i + 1]

            delta = rewards[i] + self.gamma * next_value * (1 - dones[i]) - values[i]
            gae = delta + self.gamma * 0.95 * gae * (1 - dones[i])  # 0.95 is the GAE lambda
            advantages.insert(0, gae)

        return torch.FloatTensor(advantages)

    def update(self, states, actions, old_log_probs, rewards, dones):
        """Update policy and value function using the clipped PPO objective"""
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        old_log_probs = torch.stack(old_log_probs).detach()

        # Compute advantages and value targets from the current value estimates
        with torch.no_grad():
            values = self.value_net(states).squeeze()
        advantages = self.compute_advantages(rewards, values.numpy(), dones)
        returns = advantages + values  # targets for the value function
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        # PPO update: several epochs over the same batch of experience
        for _ in range(10):
            action_probs = self.policy_net(states)
            action_dist = torch.distributions.Categorical(action_probs)
            new_log_probs = action_dist.log_prob(actions)

            # Clipped surrogate objective
            ratio = torch.exp(new_log_probs - old_log_probs)
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.clip_ratio, 1 + self.clip_ratio) * advantages

            policy_loss = -torch.min(surr1, surr2).mean()
            value_loss = nn.MSELoss()(self.value_net(states).squeeze(), returns)

            total_loss = policy_loss + 0.5 * value_loss

            self.optimizer.zero_grad()
            total_loss.backward()
            self.optimizer.step()
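
The PPOAgent only defines the update; the batch it consumes has to be collected with the current policy first. Here is a sketch of that collection step (the env and rollout_length are assumptions, and the commented lines at the end show how the pieces fit together):

def collect_rollout(env, agent, rollout_length=2048):
    """Run the current policy and record everything the PPO update needs"""
    states, actions, old_log_probs, rewards, dones = [], [], [], [], []
    state = env.reset()

    for _ in range(rollout_length):
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            action_probs = agent.policy_net(state_tensor)
        dist = torch.distributions.Categorical(action_probs)
        action = dist.sample()

        next_state, reward, done, _ = env.step(action.item())

        states.append(state)
        actions.append(action.item())
        old_log_probs.append(dist.log_prob(action).squeeze(0))
        rewards.append(reward)
        dones.append(float(done))

        state = env.reset() if done else next_state

    return states, actions, old_log_probs, rewards, dones

# states, actions, old_log_probs, rewards, dones = collect_rollout(env, agent)
# agent.update(states, actions, old_log_probs, rewards, dones)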

Soft Actor-Critic (SAC)

SAC is an off-policy actor-critic method that maximizes a trade-off between expected return and policy entropy, which encourages exploration. The version below is adapted to discrete actions by one-hot encoding them for the critics:

class SACAgent:
    def __init__(self, state_size, action_size, hidden_size=256, lr=0.0003):
        self.state_size = state_size
        self.action_size = action_size

        # Networks
        self.actor = PolicyNetwork(state_size, hidden_size, action_size)
        self.critic1 = nn.Sequential(
            nn.Linear(state_size + action_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )
        self.critic2 = nn.Sequential(
            nn.Linear(state_size + action_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, hidden_size),
            nn.ReLU(),
            nn.Linear(hidden_size, 1)
        )

        # Optimizers
        self.actor_optimizer = optim.Adam(self.actor.parameters(), lr=lr)
        self.critic1_optimizer = optim.Adam(self.critic1.parameters(), lr=lr)
        self.critic2_optimizer = optim.Adam(self.critic2.parameters(), lr=lr)

        self.alpha = 0.2  # Temperature parameter
        self.gamma = 0.99

    def choose_action(self, state):
        """Choose action using current policy"""
        state_tensor = torch.FloatTensor(state).unsqueeze(0)
        action_probs = self.actor(state_tensor)
        action_dist = torch.distributions.Categorical(action_probs)
        action = action_dist.sample()
        return action.item(), action_dist.log_prob(action)

    def update(self, states, actions, rewards, next_states, dones):
        """Update SAC networks"""
        states = torch.FloatTensor(states)
        actions = torch.LongTensor(actions)
        rewards = torch.FloatTensor(rewards)
        next_states = torch.FloatTensor(next_states)
        dones = torch.BoolTensor(dones)

        # One-hot encode actions
        actions_one_hot = torch.zeros(actions.size(0), self.action_size)
        actions_one_hot.scatter_(1, actions.unsqueeze(1), 1)

        # Update critics
        current_q1 = self.critic1(torch.cat([states, actions_one_hot], dim=1))
        current_q2 = self.critic2(torch.cat([states, actions_one_hot], dim=1))

        with torch.no_grad():
            next_action_probs = self.actor(next_states)
            next_action_dist = torch.distributions.Categorical(next_action_probs)
            next_actions = next_action_dist.sample()
            next_log_probs = next_action_dist.log_prob(next_actions)

            next_actions_one_hot = torch.zeros(next_actions.size(0), self.action_size)
            next_actions_one_hot.scatter_(1, next_actions.unsqueeze(1), 1)

            next_q1 = self.critic1(torch.cat([next_states, next_actions_one_hot], dim=1))
            next_q2 = self.critic2(torch.cat([next_states, next_actions_one_hot], dim=1))
            next_q = torch.min(next_q1, next_q2)

            # Soft Bellman target (full SAC also uses slowly updated target critics;
            # the online critics are reused here to keep the example short)
            target_q = rewards.unsqueeze(1) + self.gamma * (1 - dones.float()).unsqueeze(1) * \
                       (next_q - self.alpha * next_log_probs.unsqueeze(1))

        critic1_loss = nn.MSELoss()(current_q1, target_q)
        critic2_loss = nn.MSELoss()(current_q2, target_q)

        self.critic1_optimizer.zero_grad()
        critic1_loss.backward()
        self.critic1_optimizer.step()

        self.critic2_optimizer.zero_grad()
        critic2_loss.backward()
        self.critic2_optimizer.step()

        # Update actor: minimize E_{a~pi}[ alpha * log pi(a|s) - Q(s,a) ].
        # For a discrete action space the expectation over actions can be computed
        # exactly, which lets the gradient flow through the policy probabilities.
        action_probs = self.actor(states)
        log_probs = torch.log(action_probs + 1e-8)

        with torch.no_grad():
            q_per_action = []
            for a in range(self.action_size):
                a_one_hot = torch.zeros(states.size(0), self.action_size)
                a_one_hot[:, a] = 1.0
                q1 = self.critic1(torch.cat([states, a_one_hot], dim=1))
                q2 = self.critic2(torch.cat([states, a_one_hot], dim=1))
                q_per_action.append(torch.min(q1, q2))
            q_per_action = torch.cat(q_per_action, dim=1)  # shape: (batch, action_size)

        actor_loss = (action_probs * (self.alpha * log_probs - q_per_action)).sum(dim=1).mean()

        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()

Real-World Applications

Game Playing

The loop below drives the DQNAgent defined earlier against a Gym-style environment (this assumes the classic Gym API, where step() returns four values):

def train_game_agent(env, agent, episodes=1000):
    """Train agent on game environment"""
    for episode in range(episodes):
        state = env.reset()
        total_reward = 0

        while True:
            action = agent.act(state)                       # epsilon-greedy action from the DQN agent
            next_state, reward, done, _ = env.step(action)  # classic Gym API: (obs, reward, done, info)

            agent.remember(state, action, reward, next_state, done)
            total_reward += reward
            state = next_state

            if len(agent.memory) > 32:
                agent.replay(32)

            if done:
                break

        # Periodically sync the target network and report progress
        if episode % 10 == 0:
            agent.update_target_model()
        if episode % 100 == 0:
            print(f"Episode {episode}, Total Reward: {total_reward}")

    return agent

Robotics

A toy continuous-state environment: a point robot must move toward a target position, receiving a penalty proportional to its remaining distance:

class RobotEnvironment:
    def __init__(self, target_position):
        self.target_position = np.array(target_position, dtype=float)
        self.current_position = np.array([0.0, 0.0])
        self.max_steps = 100
        self.steps = 0

    def reset(self):
        """Reset robot to initial position"""
        self.current_position = np.array([0.0, 0.0])
        self.steps = 0
        return self.current_position.copy()

    def step(self, action):
        """Take action (movement in x, y direction)"""
        # Apply action (normalized movement)
        movement = np.array(action) * 0.1
        self.current_position = self.current_position + movement
        self.steps += 1

        # Calculate distance to target
        distance = np.linalg.norm(self.current_position - self.target_position)

        # Reward: bonus for reaching the target, otherwise a penalty
        # proportional to the remaining distance
        if distance < 0.1:
            reward = 100
            done = True
        else:
            reward = -distance
            done = self.steps >= self.max_steps  # truncate overly long episodes

        return self.current_position.copy(), reward, done

Best Practices for RL Projects

1. Environment Design

  • Design clear reward functions
  • Ensure proper state and action spaces
  • Include appropriate termination conditions
  • Consider exploration vs exploitation

2. Algorithm Selection

  • Q-Learning: Good for discrete action spaces
  • Policy Gradient: Good for continuous action spaces
  • Actor-Critic: Good balance of stability and performance
  • PPO: Good for continuous control tasks
  • SAC: Good for continuous action spaces with exploration

3. Hyperparameter Tuning

  • Learning rate
  • Discount factor (gamma)
  • Exploration rate (epsilon)
  • Network architecture
  • Batch size

4. Training Tips

  • Use experience replay for sample efficiency
  • Implement target networks for stability
  • Use gradient clipping to prevent exploding gradients (see the snippet after this list)
  • Monitor training progress with metrics
  • Save and load models appropriately
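
As a concrete example of the gradient clipping tip, PyTorch's torch.nn.utils.clip_grad_norm_ can be dropped in between backward() and the optimizer step in any of the update methods in this post (the model, loss, and max_norm of 1.0 below are illustrative placeholders):

import torch
import torch.nn as nn

model = nn.Linear(4, 2)                                    # stand-in for any policy or value network
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

loss = model(torch.randn(8, 4)).pow(2).mean()              # dummy loss, just to produce gradients
optimizer.zero_grad()
loss.backward()
# Rescale gradients so their global L2 norm is at most 1.0 before stepping
torch.nn.utils.clip_grad_norm_(model.parameters(), max_norm=1.0)
optimizer.step()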

Common Challenges and Solutions

1. Exploration vs Exploitation

def adaptive_exploration(episode, total_episodes, initial_epsilon=1.0, final_epsilon=0.01):
    """Adaptive exploration strategy"""
    epsilon = initial_epsilon - (initial_epsilon - final_epsilon) * episode / total_episodes
    return max(epsilon, final_epsilon)

2. Reward Shaping

def shaped_reward(original_reward, state, next_state, target):
    """Add shaped reward to guide learning"""
    distance_reward = -np.linalg.norm(next_state - target)
    progress_reward = np.linalg.norm(state - target) - np.linalg.norm(next_state - target)
    return original_reward + 0.1 * distance_reward + 0.5 * progress_reward

3. Experience Replay

class PrioritizedReplayBuffer:
    def __init__(self, capacity=10000):
        self.capacity = capacity
        self.buffer = []
        self.priorities = []

    def add(self, experience, priority=1.0):
        """Add experience with priority"""
        if len(self.buffer) >= self.capacity:
            self.buffer.pop(0)
            self.priorities.pop(0)

        self.buffer.append(experience)
        self.priorities.append(priority)

    def sample(self, batch_size):
        """Sample experiences proportionally to their priorities.

        This is a simplified version: full prioritized replay also applies a
        priority exponent and corrects the bias with importance-sampling weights.
        """
        priorities = np.array(self.priorities)
        probabilities = priorities / priorities.sum()
        indices = np.random.choice(len(self.buffer), batch_size, p=probabilities)
        return [self.buffer[i] for i in indices], indices

Conclusion

Reinforcement Learning is a powerful paradigm for training agents to make optimal decisions in complex environments. From simple Q-learning to advanced algorithms like PPO and SAC, RL has achieved remarkable success in various domains.

The key to success in RL is:

  1. Understanding the fundamentals of MDPs and value functions
  2. Choosing appropriate algorithms for your specific problem
  3. Designing good environments with clear reward functions
  4. Proper hyperparameter tuning and training strategies
  5. Staying updated with the latest developments

Whether you're building a game-playing AI or training a robot to navigate complex environments, the principles of reinforcement learning remain the same. Start with simple algorithms, experiment with different approaches, and gradually work your way up to more advanced techniques.

The future of reinforcement learning is incredibly promising, with applications ranging from autonomous vehicles to personalized medicine. By mastering the fundamentals and staying current with the latest developments, you'll be well-positioned to build powerful RL systems that can learn and adapt to complex real-world challenges.


Sunnat Axmadov

AI & Big Data Enthusiast.
