Reinforcement Learning Experiments

Taxi Using DQN

Environment

The Taxi-v3 environment is a simple grid-world simulation where an agent (the taxi) must pick up and drop off a passenger at a destination. The environment is fully discrete, with defined states and actions:

  • States: Represented by the taxi's position, the passenger's location, and the destination.
  • Actions: Move north, south, east, west; pick up; drop off.
  • Rewards:
    • +20 for successfully dropping off the passenger
    • -1 per step to encourage efficiency
    • -10 for illegal pickup/drop-off actions

The environment is designed to test reinforcement learning algorithms’ ability to learn optimal policies in a small, fully observable world. It emphasizes:

  • Planning multiple steps ahead
  • Balancing exploration and exploitation
  • Learning from sparse and delayed rewards

Here is a visual representation of the grid:

Taxi-v3 grid layout
Taxi-v3 grid: taxi, passenger (red), destination (green)

Reference: Gymnasium Taxi Environment

Code

Below is a breakdown of the Deep Q-Learning agent implementation for the Taxi-v3 environment. The code is organized into sections for clarity.

1. Imports and Device Setup

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import torch
from torch import nn
import torch.nn.functional as F
import os

# Allow multiple OpenMP instances (sometimes required on Mac)
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# Set device to GPU if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
  

This section sets up the Python environment, imports necessary libraries, and configures PyTorch to use GPU if available.

2. Deep Q-Network Definition

class DQN(nn.Module):
    def __init__(self, num_states, hidden_size, num_actions):
        super().__init__()
        self.embedding = nn.Embedding(num_states, 64)
        self.fc1 = nn.Linear(64, hidden_size)
        self.out = nn.Linear(hidden_size, num_actions)

    def forward(self, x):
        x = self.embedding(x)
        x = F.relu(self.fc1(x))
        x = self.out(x)
        return x
  

This neural network estimates the Q-values for each state-action pair. It uses an embedding layer for discrete states, a hidden fully-connected layer, and an output layer for the action values.
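The embedding layer is, in effect, a learned lookup table: each of the 500 discrete Taxi states indexes one row of a trainable matrix. A minimal NumPy sketch of that lookup (random values for illustration, not trained weights):

```python
import numpy as np

# Hypothetical embedding table: one 64-dim row per discrete state,
# mirroring nn.Embedding(num_states, 64) in the network above.
num_states, embed_dim = 500, 64
rng = np.random.default_rng(0)
embedding_table = rng.standard_normal((num_states, embed_dim))

state = 42                             # a discrete Taxi-v3 state id
state_vector = embedding_table[state]  # row lookup, like nn.Embedding

print(state_vector.shape)  # (64,)
```

During training, gradients flow only into the row that was looked up, so each state gradually acquires its own learned representation.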

3. Replay Memory

class ReplayMemory():
    def __init__(self, maxlen):
        self.memory = deque([], maxlen=maxlen)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, sample_size):
        return random.sample(self.memory, sample_size)

    def __len__(self):
        return len(self.memory)
  

This class stores past experiences for experience replay, which helps stabilize learning by breaking correlations between consecutive steps.
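A quick usage sketch with dummy transitions shows the two behaviors the class relies on: the deque silently evicts the oldest entries once `maxlen` is reached, and `random.sample` draws an uncorrelated mini-batch. (The tuple layout matches the training loop: state, action, new_state, reward, terminated.)

```python
from collections import deque
import random

memory = deque([], maxlen=5)

# Fill with dummy (state, action, new_state, reward, terminated) tuples.
for step in range(8):
    memory.append((step, 0, step + 1, -1, False))

# The deque evicted the 3 oldest entries to stay at maxlen=5...
print(len(memory))         # 5
# ...and sampling gives a random, uncorrelated mini-batch.
batch = random.sample(memory, 3)
print(len(batch))          # 3
```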

4. Taxi Deep Q-Learning Agent

class TaxiDQL():
    learning_rate_a = 0.001
    discount_factor_g = 0.99
    network_sync_rate = 500
    replay_memory_size = 50000
    mini_batch_size = 64

    def __init__(self):
        self.loss_fn = nn.MSELoss()
        self.optimizer = None
  

This class implements the agent that trains and tests the DQN on the Taxi environment, including hyperparameters for learning, discounting, and memory.

5. Training the Agent

def train(self, episodes, render=False):
    env = gym.make('Taxi-v3', render_mode='human' if render else None)
    ...
    for episode in range(episodes):
        state, _ = env.reset()
        ...
        while not terminated and not truncated:
            # Epsilon-greedy action selection
            ...
            # Store experience
            memory.append((state, action, new_state, reward, terminated))
            ...
            # Optimize DQN
            if len(memory) > self.mini_batch_size and step_count % 4 == 0:
                mini_batch = memory.sample(self.mini_batch_size)
                self.optimize(mini_batch, policy_dqn, target_dqn)
        ...

This section contains the main training loop, including the epsilon-greedy policy, storing transitions in memory, and periodically updating the target network.
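The epsilon-greedy selection elided above can be sketched in a few lines of plain Python; here `q_values` stands in for the policy network's output for one state:

```python
import random

def epsilon_greedy(q_values, epsilon, num_actions, rng=random):
    """With probability epsilon take a random action; otherwise the greedy one."""
    if rng.random() < epsilon:
        return rng.randrange(num_actions)                       # explore
    return max(range(num_actions), key=lambda a: q_values[a])   # exploit

# Dummy Q-values for the 6 Taxi actions (illustrative numbers only).
q_values = [0.1, 0.9, -0.2, 0.4, 0.0, 0.3]
print(epsilon_greedy(q_values, epsilon=0.0, num_actions=6))  # 1 (pure greedy)
```

In the training loop epsilon starts at 1.0 (pure exploration) and decays toward a small floor, so the agent gradually shifts from exploring to exploiting what it has learned.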

6. Optimization Step

def optimize(self, mini_batch, policy_dqn, target_dqn):
    # Prepare tensors
    states = torch.tensor([...])
    ...
    # Compute current Q-values and target Q-values
    ...
    # Compute loss and update network
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

This function performs the gradient descent step to minimize the difference between predicted Q-values and target Q-values from the Bellman equation.
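The target follows the standard Q-learning form target = r + γ · max_a′ Q_target(s′, a′), with the bootstrap term zeroed out on terminal transitions, exactly as in the `(1 - dones)` factor of the full code. A small NumPy sketch with made-up numbers:

```python
import numpy as np

gamma = 0.99  # discount_factor_g in the agent class
rewards = np.array([-1.0, 20.0], dtype=np.float32)
dones = np.array([0.0, 1.0], dtype=np.float32)      # second transition is terminal
max_next_q = np.array([5.0, 3.0], dtype=np.float32)  # from the target network

# Terminal transitions contribute only the immediate reward.
target_q = rewards + (1 - dones) * gamma * max_next_q
print(target_q)  # [ 3.95 20.  ]
```

The loss is then the mean squared error between these targets and the policy network's current Q-values for the actions actually taken.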

7. Testing the Agent

def test(self, episodes):
    env = gym.make('Taxi-v3', render_mode='human')
    policy_dqn.load_state_dict(torch.load("taxi_dqn_fast.pt"))
    ...
    while not terminated and not truncated:
        # Select best action
        ...

This section evaluates the trained agent in the Taxi environment, using the policy learned during training.

Full Code

Full implementation of the Deep Q-Learning agent for the Taxi-v3 environment:


import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import torch
from torch import nn
import torch.nn.functional as F
import os

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

class DQN(nn.Module):
    def __init__(self, num_states, hidden_size, num_actions):
        super().__init__()
        self.embedding = nn.Embedding(num_states, 64)
        self.fc1 = nn.Linear(64, hidden_size)
        self.out = nn.Linear(hidden_size, num_actions)

    def forward(self, x):
        x = self.embedding(x)
        x = F.relu(self.fc1(x))
        x = self.out(x)
        return x

class ReplayMemory():
    def __init__(self, maxlen):
        self.memory = deque([], maxlen=maxlen)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, sample_size):
        return random.sample(self.memory, sample_size)

    def __len__(self):
        return len(self.memory)

class TaxiDQL():
    learning_rate_a = 0.001
    discount_factor_g = 0.99
    network_sync_rate = 500
    replay_memory_size = 50000
    mini_batch_size = 64

    def __init__(self):
        self.loss_fn = nn.MSELoss()
        self.optimizer = None

    def train(self, episodes, render=False):
        env = gym.make('Taxi-v3', render_mode='human' if render else None)
        num_states = env.observation_space.n
        num_actions = env.action_space.n
        epsilon = 1.0
        epsilon_decay = 1 / episodes
        epsilon_min = 0.05
        memory = ReplayMemory(self.replay_memory_size)
        policy_dqn = DQN(num_states, 128, num_actions).to(device)
        target_dqn = DQN(num_states, 128, num_actions).to(device)
        target_dqn.load_state_dict(policy_dqn.state_dict())
        self.optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=self.learning_rate_a)
        rewards_per_episode = np.zeros(episodes)
        step_count = 0

        for episode in range(episodes):
            state, _ = env.reset()
            terminated = False
            truncated = False
            episode_reward = 0
            while not terminated and not truncated:
                if random.random() < epsilon:
                    action = env.action_space.sample()
                else:
                    with torch.no_grad():
                        state_tensor = torch.tensor(state, dtype=torch.long, device=device)
                        action = policy_dqn(state_tensor).argmax().item()

                new_state, reward, terminated, truncated, _ = env.step(action)
                memory.append((state, action, new_state, reward, terminated))
                state = new_state
                episode_reward += reward
                step_count += 1

                if len(memory) > self.mini_batch_size and step_count % 4 == 0:
                    mini_batch = memory.sample(self.mini_batch_size)
                    self.optimize(mini_batch, policy_dqn, target_dqn)

                if step_count % self.network_sync_rate == 0:
                    target_dqn.load_state_dict(policy_dqn.state_dict())

            rewards_per_episode[episode] = episode_reward
            epsilon = max(epsilon - epsilon_decay, epsilon_min)
            if episode % 100 == 0:
                print(f"Episode {episode} | Reward: {episode_reward} | Epsilon: {epsilon:.3f}")

        env.close()
        torch.save(policy_dqn.state_dict(), "taxi_dqn_fast.pt")
        plt.figure(figsize=(10,5))
        moving_avg = np.convolve(rewards_per_episode, np.ones(100)/100, mode='valid')
        plt.plot(moving_avg)
        plt.title("Moving Average Reward (100 episodes)")
        plt.savefig('taxi_dqn.png')  # save before show(); saving after the window closes yields an empty image
        plt.show()

    def optimize(self, mini_batch, policy_dqn, target_dqn):
        states = torch.tensor([s for s, _, _, _, _ in mini_batch], dtype=torch.long, device=device)
        actions = torch.tensor([a for _, a, _, _, _ in mini_batch], dtype=torch.long, device=device)
        next_states = torch.tensor([ns for _, _, ns, _, _ in mini_batch], dtype=torch.long, device=device)
        rewards = torch.tensor([r for _, _, _, r, _ in mini_batch], dtype=torch.float32, device=device)
        dones = torch.tensor([t for _, _, _, _, t in mini_batch], dtype=torch.float32, device=device)
        current_q = policy_dqn(states).gather(1, actions.unsqueeze(1)).squeeze()
        with torch.no_grad():
            max_next_q = target_dqn(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.discount_factor_g * max_next_q
        loss = self.loss_fn(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def test(self, episodes):
        env = gym.make('Taxi-v3', render_mode='human')
        num_states = env.observation_space.n
        num_actions = env.action_space.n
        policy_dqn = DQN(num_states, 128, num_actions).to(device)
        policy_dqn.load_state_dict(torch.load("taxi_dqn_fast.pt", map_location=device))
        policy_dqn.eval()
        for episode in range(episodes):
            state, _ = env.reset()
            terminated = False
            truncated = False
            while not terminated and not truncated:
                with torch.no_grad():
                    state_tensor = torch.tensor(state, dtype=torch.long, device=device)
                    action = policy_dqn(state_tensor).argmax().item()
                state, reward, terminated, truncated, _ = env.step(action)
        env.close()

if __name__ == '__main__':
    taxi = TaxiDQL()
    taxi.train(1000)
    taxi.test(5)
  

Results

The agent was trained for 1000 episodes. Over time, it learned to maximize rewards by efficiently picking up and dropping off passengers.

Taxi DQN training performance
Moving average reward per episode during DQN training.
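The smoothed curve above comes from the `np.convolve` call in the training code. Its effect on a short, made-up reward sequence with a window of 3 (the training run uses a 100-episode window):

```python
import numpy as np

rewards = np.array([0.0, 10.0, 20.0, 30.0, 40.0])
window = 3

# mode='valid' keeps only positions where the window fully overlaps,
# so the output is len(rewards) - window + 1 points long.
moving_avg = np.convolve(rewards, np.ones(window) / window, mode='valid')
print(moving_avg)  # [10. 20. 30.]
```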

Frozen Lake with DQN

Environment

The FrozenLake-v1 environment is a grid-world where an agent must navigate from the starting point to a goal without falling into holes. It simulates a frozen lake where tiles can be slippery.

  • Grid: 4x4 or 8x8 tiles (customizable)
  • Goal: Reach the designated goal tile safely
  • Holes: Stepping on a hole tile ends the episode immediately with no reward
  • Reward: +1 for reaching the goal, 0 for each other move, episode ends if agent falls in a hole
  • States: The agent’s current position on the grid
  • Actions: Move Left, Down, Right, Up
  • is_slippery: If set to True, the agent may slip and end up in a different tile than intended, making the environment stochastic. If False, movements are deterministic.

The FrozenLake environment is often used to test reinforcement learning algorithms in stochastic or deterministic settings depending on the value of is_slippery. It helps illustrate how agents learn to deal with uncertainty in state transitions.

Reference: Gymnasium FrozenLake Environment

Frozen Lake grid example
Example layout of a 4x4 Frozen Lake environment.

Code

Below is a breakdown of the Deep Q-Learning agent implementation for the FrozenLake-v1 environment. The code is organized into sections for clarity.

1. Imports and Device Setup

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import pickle
import random
import torch
from torch import nn
import torch.nn.functional as F
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"
  

This section sets up the Python environment, imports necessary libraries, and configures PyTorch to avoid OpenMP conflicts on Mac.

2. Deep Q-Network Definition

class DQN(nn.Module):
    def __init__(self, in_states, h1_nodes, out_actions):
        super().__init__()
        self.fc1 = nn.Linear(in_states, h1_nodes)
        self.out = nn.Linear(h1_nodes, out_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.out(x)
        return x
  

This neural network estimates Q-values for each state-action pair. It uses a fully-connected hidden layer and outputs values for all possible actions.

3. Replay Memory

class ReplayMemory():
    def __init__(self, maxlen):
        self.memory = deque([], maxlen=maxlen)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, sample_size):
        return random.sample(self.memory, sample_size)

    def __len__(self):
        return len(self.memory)
  

This class stores past experiences for experience replay, stabilizing learning by breaking correlations between consecutive steps.

4. FrozenLake Deep Q-Learning Agent

class FrozenLakeDQL():
    learning_rate_a = 0.001
    discount_factor_g = 0.9
    network_sync_rate = 10
    replay_memory_size = 1000
    mini_batch_size = 32

    loss_fn = nn.MSELoss()
    optimizer = None

    ACTIONS = ['L','D','R','U']
  

This class implements the agent for the FrozenLake environment, including hyperparameters for learning rate, discount factor, memory size, batch size, and target network synchronization. It also defines the action mapping.

5. Training the Agent

def train(self, episodes, render=False, is_slippery=False):
    env = gym.make('FrozenLake-v1', map_name="4x4", is_slippery=is_slippery,
                   render_mode='human' if render else None)
    num_states = env.observation_space.n
    num_actions = env.action_space.n

    epsilon = 1
    memory = ReplayMemory(self.replay_memory_size)

    policy_dqn = DQN(in_states=num_states, h1_nodes=num_states, out_actions=num_actions)
    target_dqn = DQN(in_states=num_states, h1_nodes=num_states, out_actions=num_actions)
    target_dqn.load_state_dict(policy_dqn.state_dict())

    self.optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=self.learning_rate_a)

    rewards_per_episode = np.zeros(episodes)
    epsilon_history = []
    step_count = 0

    for i in range(episodes):
        state = env.reset()[0]
        terminated = False
        truncated = False

        while not terminated and not truncated:
            # Epsilon-greedy action selection
            ...
            # Store experience
            memory.append((state, action, new_state, reward, terminated))
            ...
            # Optimize DQN if enough memory
            if len(memory) > self.mini_batch_size:
                mini_batch = memory.sample(self.mini_batch_size)
                self.optimize(mini_batch, policy_dqn, target_dqn)
        ...

This section contains the main training loop, including epsilon-greedy action selection, storing transitions in replay memory, and periodically optimizing the policy network and syncing the target network.

6. Optimization Step

def optimize(self, mini_batch, policy_dqn, target_dqn):
    # Compute current Q-values and target Q-values
    ...
    # Compute loss and update network
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()

This function performs the gradient descent step to minimize the difference between predicted Q-values and target Q-values computed from the Bellman equation.

7. State to DQN Input

def state_to_dqn_input(self, state:int, num_states:int) -> torch.Tensor:
    input_tensor = torch.zeros(num_states)
    input_tensor[state] = 1
    return input_tensor

Converts a discrete state into a one-hot encoded tensor suitable for the DQN input.
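For example, with 16 states, state 5 becomes a length-16 vector that is zero everywhere except at index 5 (shown here with NumPy rather than torch):

```python
import numpy as np

num_states = 16
state = 5

# One-hot encoding: a single 1 at the state's index.
one_hot = np.zeros(num_states, dtype=np.float32)
one_hot[state] = 1.0

print(one_hot.argmax())  # 5
print(one_hot.sum())     # 1.0
```

Feeding this vector into the network's first linear layer effectively selects one column of its weight matrix per state, much like the embedding lookup used in the Taxi agent.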

8. Testing the Agent

def test(self, episodes, is_slippery=False):
    # Load trained policy and run FrozenLake environment
    ...
    while not terminated and not truncated:
        # Select best action
        ...

This section evaluates the trained agent using the learned policy, navigating the FrozenLake map without exploration.

9. Printing the DQN

def print_dqn(self, dqn):
    # Print Q-values and best action for each state
    ...

This function prints the Q-values and best action for each state in a readable format aligned with the 4x4 FrozenLake map.

Full Code

Full implementation of the Deep Q-Learning agent for the FrozenLake-v1 environment:


import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import pickle
import random
import torch
from torch import nn
import torch.nn.functional as F
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"


class DQN(nn.Module):
    def __init__(self, in_states, h1_nodes, out_actions):
        super().__init__()

        self.fc1 = nn.Linear(in_states, h1_nodes)
        self.out = nn.Linear(h1_nodes, out_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.out(x)
        return x

class ReplayMemory():
    def __init__(self, maxlen):
        self.memory = deque([], maxlen=maxlen)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, sample_size):
        return random.sample(self.memory, sample_size)
    
    def __len__(self):
        return len(self.memory)
    
class FrozenLakeDQL():
    learning_rate_a = 0.001
    discount_factor_g = 0.9
    network_sync_rate = 10
    replay_memory_size = 1000
    mini_batch_size = 32

    loss_fn = nn.MSELoss()
    optimizer = None

    ACTIONS = ['L','D','R','U']

    def train(self, episodes, render=False, is_slippery=False):
        env = gym.make('FrozenLake-v1', map_name="4x4", is_slippery=is_slippery, render_mode='human' if render else None)
        num_states = env.observation_space.n
        num_actions = env.action_space.n

        epsilon = 1
        memory = ReplayMemory(self.replay_memory_size)

        policy_dqn = DQN(in_states=num_states, h1_nodes=num_states, out_actions=num_actions)
        target_dqn = DQN(in_states=num_states, h1_nodes=num_states, out_actions=num_actions)

        target_dqn.load_state_dict(policy_dqn.state_dict())

        print('Policy (random, before training):')
        self.print_dqn(policy_dqn)

        self.optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=self.learning_rate_a)

        rewards_per_episode = np.zeros(episodes)

        epsilon_history = []

        step_count = 0

        for i in range(episodes):
            if i % 100 == 0:
                print(f"Episode {i} | Epsilon: {epsilon:.2f}")
            state = env.reset()[0]
            terminated = False
            truncated = False

            while(not terminated and not truncated):
                if random.random() < epsilon:
                    action = env.action_space.sample()
                else:
                    with torch.no_grad():
                        action = policy_dqn(self.state_to_dqn_input(state, num_states)).argmax().item()
                
                new_state,reward,terminated,truncated,info = env.step(action)

                memory.append((state,action,new_state,reward,terminated))

                state = new_state

                step_count+=1

            if reward==1:
                rewards_per_episode[i] = 1
            
            if len(memory)>self.mini_batch_size and np.sum(rewards_per_episode)>0:
                mini_batch = memory.sample(self.mini_batch_size)
                self.optimize(mini_batch, policy_dqn, target_dqn)

                epsilon = max(epsilon - 1/episodes, 0)
                epsilon_history.append(epsilon)

                if step_count > self.network_sync_rate:
                    target_dqn.load_state_dict(policy_dqn.state_dict())
                    step_count=0
        
        env.close()

        torch.save(policy_dqn.state_dict(), 'frozen_lake_dqn.pt')

        plt.figure(1)

        sum_rewards = np.zeros(episodes)
        for x in range(episodes):
            sum_rewards[x] = np.sum(rewards_per_episode[max(0, x-100):(x+1)])
        plt.subplot(121)
        plt.plot(sum_rewards)

        plt.subplot(122)
        plt.plot(epsilon_history)

        plt.savefig('frozen_lake_dql')

    def optimize(self, mini_batch, policy_dqn, target_dqn):

        # Get number of input nodes
        num_states = policy_dqn.fc1.in_features

        current_q_list = []
        target_q_list = []

        for state, action, new_state, reward, terminated in mini_batch:

            if terminated: 
                # Agent either reached goal (reward=1) or fell into hole (reward=0)
                # When in a terminated state, target q value should be set to the reward.
                target = torch.FloatTensor([reward])
            else:
                # Calculate target q value 
                with torch.no_grad():
                    target = torch.FloatTensor(
                        reward + self.discount_factor_g * target_dqn(self.state_to_dqn_input(new_state, num_states)).max()
                    )

            # Get the current set of Q values
            current_q = policy_dqn(self.state_to_dqn_input(state, num_states))
            current_q_list.append(current_q)

            # Get the target set of Q values
            target_q = target_dqn(self.state_to_dqn_input(state, num_states)) 
            # Adjust the specific action to the target that was just calculated
            target_q[action] = target
            target_q_list.append(target_q)
                
        # Compute loss for the whole minibatch
        loss = self.loss_fn(torch.stack(current_q_list), torch.stack(target_q_list))

        # Optimize the model
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def state_to_dqn_input(self, state:int, num_states:int)->torch.Tensor:
        input_tensor = torch.zeros(num_states)
        input_tensor[state] = 1
        return input_tensor

    # Run the FrozenLake environment with the learned policy
    def test(self, episodes, is_slippery=False):
        # Create FrozenLake instance
        env = gym.make('FrozenLake-v1', map_name="4x4", is_slippery=is_slippery, render_mode='human')
        num_states = env.observation_space.n
        num_actions = env.action_space.n

        # Load learned policy
        policy_dqn = DQN(in_states=num_states, h1_nodes=num_states, out_actions=num_actions) 
        policy_dqn.load_state_dict(torch.load("frozen_lake_dqn.pt"))
        policy_dqn.eval()    # switch model to evaluation mode

        print('Policy (trained):')
        self.print_dqn(policy_dqn)

        for i in range(episodes):
            state = env.reset()[0]  # Initialize to state 0
            terminated = False      # True when agent falls in hole or reached goal
            truncated = False       # True when agent takes more than 200 actions            

            # Agent navigates map until it falls into a hole (terminated), reaches goal (terminated), or has taken 200 actions (truncated).
            while(not terminated and not truncated):  
                # Select best action   
                with torch.no_grad():
                    action = policy_dqn(self.state_to_dqn_input(state, num_states)).argmax().item()

                # Execute action
                state,reward,terminated,truncated,_ = env.step(action)

        env.close()

    # Print DQN: state, best action, q values
    def print_dqn(self, dqn):
        # Get number of input nodes
        num_states = dqn.fc1.in_features

        # Loop each state and print policy to console
        for s in range(num_states):
            #  Format q values for printing
            q_values = ''
            for q in dqn(self.state_to_dqn_input(s, num_states)).tolist():
                q_values += "{:+.2f}".format(q)+' '  # Concatenate q values, format to 2 decimals
            q_values=q_values.rstrip()              # Remove space at the end

            # Map the best action to L D R U
            best_action = self.ACTIONS[dqn(self.state_to_dqn_input(s, num_states)).argmax()]

            # Print policy in the format of: state, action, q values
            # The printed layout matches the FrozenLake map.
            print(f'{s:02},{best_action},[{q_values}]', end=' ')         
            if (s+1)%4==0:
                print() # Print a newline every 4 states

if __name__ == '__main__':

    frozen_lake = FrozenLakeDQL()
    is_slippery = True
    frozen_lake.train(5000, is_slippery=is_slippery)
    frozen_lake.test(10, is_slippery=is_slippery)

Results

This section shows the training results for the Frozen Lake DQN agent on the 4x4 grid. Two cases are shown: deterministic movement (is_slippery=False) and stochastic movement (is_slippery=True).

1. Frozen Lake (Deterministic, is_slippery=False)

In this case, the agent's movements are deterministic, meaning each action always succeeds as intended. The following plot shows:

  • Left graph: Sum of rewards per 100 episodes
  • Right graph: Epsilon value decay over training

Frozen Lake deterministic training results
Rewards per 100 episodes (left) and epsilon decay (right) for deterministic Frozen Lake.

2. Frozen Lake (Stochastic, is_slippery=True)

Here, the lake is slippery, so each action may not always go as intended. This introduces more uncertainty and requires the agent to learn a robust policy. The plot again shows:

  • Left graph: Sum of rewards per 100 episodes
  • Right graph: Epsilon value decay over training

Frozen Lake stochastic training results
Rewards per 100 episodes (left) and epsilon decay (right) for stochastic Frozen Lake.

Frozen Lake with CNN

Environment

The FrozenLake-v1 environment is a grid-world where an agent must navigate from the starting point to a goal without falling into holes. It simulates a frozen lake where tiles can be slippery.

  • Grid: 4x4 or 8x8 tiles (customizable)
  • Goal: Reach the designated goal tile safely
  • Holes: Stepping on a hole tile ends the episode immediately with no reward
  • Reward: +1 for reaching the goal, 0 for each other move, episode ends if agent falls in a hole
  • States: The agent’s current position on the grid
  • Actions: Move Left, Down, Right, Up
  • is_slippery: If set to True, the agent may slip and end up in a different tile than intended, making the environment stochastic. If False, movements are deterministic.

The FrozenLake environment is often used to test reinforcement learning algorithms in stochastic or deterministic settings depending on the value of is_slippery. It helps illustrate how agents learn to deal with uncertainty in state transitions.

Reference: Gymnasium FrozenLake Environment

Frozen Lake grid example
Example layout of a 4x4 Frozen Lake environment.

Code

Below is a breakdown of the Convolutional Deep Q-Learning agent for the Frozen Lake 4x4 environment. The code is organized into sections for clarity.

1. Imports and Setup

import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import torch
from torch import nn
import torch.nn.functional as F
import math
import os

# Allow multiple OpenMP instances (sometimes required)
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
  

This section imports necessary libraries and sets up the Python environment for deep reinforcement learning.

2. Convolutional Deep Q-Network Definition

class DQN(nn.Module):
    def __init__(self, input_channels, num_actions):
        super().__init__()
        self.conv_block1 = nn.Sequential(
            nn.Conv2d(input_channels, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.conv_block2 = nn.Sequential(
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )
        self.output_layer = nn.Sequential(
            nn.Flatten(),
            nn.Linear(10*1*1, num_actions)
        )

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        return self.output_layer(x)
  

This network takes a 4x4 map as input (3 channels for RGB encoding), passes it through two convolutional blocks with pooling, then flattens it and outputs Q-values for each action.
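The `10*1*1` input size of the final linear layer follows from the pooling arithmetic: the 3x3 convolutions with `padding=1` preserve the spatial size, while each `MaxPool2d(2)` halves it, so the 4x4 grid shrinks 4 → 2 → 1 across the two blocks, leaving 10 channels of a 1x1 map. A quick check of that arithmetic:

```python
# 3x3 convs with padding=1 preserve spatial size; MaxPool2d(2) halves it.
size = 4                    # the 4x4 FrozenLake grid
for block in range(2):      # two conv blocks, each ending in a pool
    size = size // 2
channels = 10
flat_features = channels * size * size
print(flat_features)  # 10 -> the in_features of the final Linear layer
```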

3. Replay Memory

class ReplayMemory:
    def __init__(self, max_size):
        self.memory = deque(maxlen=max_size)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
  

This class stores past experiences for experience replay to stabilize training by reducing correlations between consecutive steps.

4. CNN-based Frozen Lake Deep Q-Learning Agent

class FrozenLakeDQL:
    alpha = 0.001
    gamma = 0.9
    sync_steps = 10
    memory_size = 1000
    batch_size = 32
    ACTIONS = ['L','D','R','U']

    def __init__(self):
        self.loss_fn = nn.MSELoss()
        self.optimizer = None
  

Defines the agent class with learning parameters, memory size, discount factor, and the four actions for Frozen Lake.

5. State to Tensor Conversion

def state_to_tensor(self, state: int) -> torch.Tensor:
    tensor = torch.zeros(1, 3, 4, 4)
    row, col = divmod(state, 4)
    tensor[0, 0, row, col] = 245 / 255  # Red channel
    tensor[0, 1, row, col] = 66 / 255   # Green channel
    tensor[0, 2, row, col] = 120 / 255  # Blue channel
    return tensor
  

This method encodes a single state as a 3-channel 4x4 tensor for the CNN input.
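The same encoding can be sketched with NumPy: a (3, 4, 4) volume that is zero everywhere except one RGB "pixel" at the agent's cell (the 245/66/120 color values match the method above):

```python
import numpy as np

def state_to_grid(state: int) -> np.ndarray:
    """Encode a discrete state as a 3-channel 4x4 image (NumPy sketch)."""
    grid = np.zeros((3, 4, 4), dtype=np.float32)
    row, col = divmod(state, 4)     # states are numbered row-major on the 4x4 map
    grid[0, row, col] = 245 / 255   # red channel
    grid[1, row, col] = 66 / 255    # green channel
    grid[2, row, col] = 120 / 255   # blue channel
    return grid

grid = state_to_grid(6)             # state 6 -> row 1, col 2
print(np.count_nonzero(grid))       # 3: one value per channel
```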

6. Optimization Step

def optimize(self, batch, policy_net, target_net):
    current_qs, target_qs = [], []
    for state, action, next_state, reward, done in batch:
        target = torch.FloatTensor([reward]) if done else \
                 torch.FloatTensor([reward + self.gamma * target_net(self.state_to_tensor(next_state)).max()])
        current_q = policy_net(self.state_to_tensor(state))
        target_q = target_net(self.state_to_tensor(state))
        target_q[0, action] = target
        current_qs.append(current_q)
        target_qs.append(target_q)
    loss = self.loss_fn(torch.stack(current_qs), torch.stack(target_qs))
    self.optimizer.zero_grad()
    loss.backward()
    self.optimizer.step()
  

Updates the policy network using the Bellman equation for Q-values with a batch of experiences.

7. Training Loop

def train(self, episodes, render=False, is_slippery=False):
    env = gym.make('FrozenLake-v1', map_name="4x4",
                   is_slippery=is_slippery,
                   render_mode='human' if render else None)
    num_actions = env.action_space.n
    epsilon = 1.0
    memory = ReplayMemory(self.memory_size)
    policy_net = DQN(3, num_actions)
    target_net = DQN(3, num_actions)
    target_net.load_state_dict(policy_net.state_dict())
    self.optimizer = torch.optim.Adam(policy_net.parameters(), lr=self.alpha)
    rewards_history = np.zeros(episodes)
    epsilon_history = []
    step_count = 0
    for ep in range(episodes):
        state, terminated, truncated = env.reset()[0], False, False
        while not terminated and not truncated:
            action = env.action_space.sample() if random.random() < epsilon \
                     else policy_net(self.state_to_tensor(state)).argmax().item()
            next_state, reward, terminated, truncated, _ = env.step(action)
            memory.append((state, action, next_state, reward, terminated))
            state = next_state
            step_count += 1
        rewards_history[ep] = reward
        if len(memory) > self.batch_size and rewards_history.sum() > 0:
            batch = memory.sample(self.batch_size)
            self.optimize(batch, policy_net, target_net)
            epsilon = max(epsilon - 1/episodes, 0)
            epsilon_history.append(epsilon)
            if step_count > self.sync_steps:
                target_net.load_state_dict(policy_net.state_dict())
                step_count = 0
    env.close()
    torch.save(policy_net.state_dict(), "frozen_lake_dql_cnn.pt")
  

Main training loop: epsilon-greedy action selection, experience storage, network optimization, and periodic target network updates.
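The action-selection rule inside the loop can be isolated as a small helper (an illustrative sketch, not part of the class above):

```python
import random

def epsilon_greedy(q_values, epsilon, num_actions):
    """Explore uniformly with probability epsilon, otherwise exploit argmax Q."""
    if random.random() < epsilon:
        return random.randrange(num_actions)
    return max(range(num_actions), key=lambda a: q_values[a])

qs = [0.1, 0.7, 0.3, 0.2]
print(epsilon_greedy(qs, 0.0, 4))  # epsilon 0: always greedy -> action 1
print(epsilon_greedy(qs, 1.0, 4))  # epsilon 1: a uniformly random action in 0..3
```

As epsilon decays from 1 toward 0 during training, the agent shifts smoothly from pure exploration to pure exploitation.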

8. Testing the Trained Policy

def test(self, episodes, is_slippery=False):
    env = gym.make('FrozenLake-v1', map_name="4x4",
                   is_slippery=is_slippery, render_mode='human')
    num_actions = env.action_space.n
    policy_net = DQN(3, num_actions)
    policy_net.load_state_dict(torch.load("frozen_lake_dql_cnn.pt"))
    policy_net.eval()
    for _ in range(episodes):
        state, terminated, truncated = env.reset()[0], False, False
        while not terminated and not truncated:
            with torch.no_grad():
                action = policy_net(self.state_to_tensor(state)).argmax().item()
            state, _, terminated, truncated, _ = env.step(action)
    env.close()
  

This evaluates the trained CNN agent in the Frozen Lake environment for a given number of episodes.

9. Displaying Learned Policy

def print_policy(self, net):
    for s in range(16):
        q_vals = net(self.state_to_tensor(s))[0].tolist()
        best_action = self.ACTIONS[torch.tensor(q_vals).argmax()]
        formatted_qs = ' '.join(f"{q:+.2f}" for q in q_vals)
        print(f"{s:02},{best_action},[{formatted_qs}]", end=' ')
        if (s + 1) % 4 == 0:
            print()
  

Prints the Q-values and best action for each state in the 4x4 grid, row by row.

Full Code

Full implementation of the CNN agent for the FrozenLake-v1 environment:



import gymnasium as gym
import numpy as np
import matplotlib.pyplot as plt
from collections import deque
import random
import torch
from torch import nn
import torch.nn.functional as F
import math
import os

os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"


class DQN(nn.Module):
    def __init__(self, input_channels, num_actions):
        super().__init__()

        self.conv_block1 = nn.Sequential(
            nn.Conv2d(input_channels, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        self.conv_block2 = nn.Sequential(
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.Conv2d(10, 10, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(2)
        )

        # After flattening, input size is 10*1*1 for 4x4 map
        self.output_layer = nn.Sequential(
            nn.Flatten(),
            nn.Linear(10 * 1 * 1, num_actions)
        )

    def forward(self, x):
        x = self.conv_block1(x)
        x = self.conv_block2(x)
        return self.output_layer(x)



class ReplayMemory:
    def __init__(self, max_size):
        self.memory = deque(maxlen=max_size)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


class FrozenLakeDQL:
    # Hyperparameters
    alpha = 0.001
    gamma = 0.9
    sync_steps = 10
    memory_size = 1000
    batch_size = 32

    ACTIONS = ['L', 'D', 'R', 'U']

    def __init__(self):
        self.loss_fn = nn.MSELoss()
        self.optimizer = None

    # Convert state to DQN input tensor
    def state_to_tensor(self, state: int) -> torch.Tensor:
        tensor = torch.zeros(1, 3, 4, 4)
        row, col = divmod(state, 4)
        tensor[0, 0, row, col] = 245 / 255
        tensor[0, 1, row, col] = 66 / 255
        tensor[0, 2, row, col] = 120 / 255
        return tensor

    # Optimize policy network
    def optimize(self, batch, policy_net, target_net):
        current_qs, target_qs = [], []

        for state, action, next_state, reward, done in batch:
            target = torch.FloatTensor([reward]) if done else \
                torch.FloatTensor([reward + self.gamma * target_net(self.state_to_tensor(next_state)).max()])

            current_q = policy_net(self.state_to_tensor(state))
            target_q = target_net(self.state_to_tensor(state))
            target_q[0, action] = target

            current_qs.append(current_q)
            target_qs.append(target_q)

        loss = self.loss_fn(torch.stack(current_qs), torch.stack(target_qs))
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    # Train the agent
    def train(self, episodes, render=False, is_slippery=False):
        env = gym.make('FrozenLake-v1', map_name="4x4",
                       is_slippery=is_slippery,
                       render_mode='human' if render else None)

        num_states = env.observation_space.n
        num_actions = env.action_space.n
        epsilon = 1.0
        memory = ReplayMemory(self.memory_size)
        policy_net = DQN(input_channels=3, num_actions=num_actions)
        target_net = DQN(input_channels=3, num_actions=num_actions)
        target_net.load_state_dict(policy_net.state_dict())
        self.optimizer = torch.optim.Adam(policy_net.parameters(), lr=self.alpha)

        rewards_history = np.zeros(episodes)
        epsilon_history = []
        step_count = 0

        print('Policy (before training):')
        self.print_policy(policy_net)

        for ep in range(episodes):
            state, terminated, truncated = env.reset()[0], False, False

            while not terminated and not truncated:
                action = env.action_space.sample() if random.random() < epsilon \
                    else policy_net(self.state_to_tensor(state)).argmax().item()

                next_state, reward, terminated, truncated, _ = env.step(action)
                memory.append((state, action, next_state, reward, terminated))
                state = next_state
                step_count += 1

            rewards_history[ep] = reward

            if len(memory) > self.batch_size and rewards_history.sum() > 0:
                batch = memory.sample(self.batch_size)
                self.optimize(batch, policy_net, target_net)
                epsilon = max(epsilon - 1 / episodes, 0)
                epsilon_history.append(epsilon)

                if step_count > self.sync_steps:
                    target_net.load_state_dict(policy_net.state_dict())
                    step_count = 0

        env.close()
        torch.save(policy_net.state_dict(), "frozen_lake_dql_cnn.pt")

        # Plot rewards and epsilon
        plt.figure(figsize=(10, 4))
        rolling_rewards = [np.sum(rewards_history[max(0, i - 100):(i + 1)]) for i in range(episodes)]
        plt.subplot(1, 2, 1)
        plt.plot(rolling_rewards)
        plt.title('Rolling Rewards')

        plt.subplot(1, 2, 2)
        plt.plot(epsilon_history)
        plt.title('Epsilon Decay')
        plt.savefig('frozen_lake_dql_cnn.png')

    # Test learned policy
    def test(self, episodes, is_slippery=False):
        env = gym.make('FrozenLake-v1', map_name="4x4", is_slippery=is_slippery, render_mode='human')
        num_actions = env.action_space.n
        policy_net = DQN(input_channels=3, num_actions=num_actions)
        policy_net.load_state_dict(torch.load("frozen_lake_dql_cnn.pt"))
        policy_net.eval()

        print('Policy (trained):')
        self.print_policy(policy_net)

        for _ in range(episodes):
            state, terminated, truncated = env.reset()[0], False, False
            while not terminated and not truncated:
                with torch.no_grad():
                    action = policy_net(self.state_to_tensor(state)).argmax().item()
                state, _, terminated, truncated, _ = env.step(action)

        env.close()

    # Print the Q-values and best action for each state
    def print_policy(self, net):
        for s in range(16):
            q_vals = net(self.state_to_tensor(s))[0].tolist()
            best_action = self.ACTIONS[torch.tensor(q_vals).argmax()]
            formatted_qs = ' '.join(f"{q:+.2f}" for q in q_vals)
            print(f"{s:02},{best_action},[{formatted_qs}]", end=' ')
            if (s + 1) % 4 == 0:
                print()


if __name__ == "__main__":
    agent = FrozenLakeDQL()
    slippery = False
    agent.train(1000, is_slippery=slippery)
    agent.test(10, is_slippery=slippery)

Results

This section shows the training results for the Frozen Lake CNN agent on the 4x4 grid. Two cases are shown: deterministic movement (is_slippery=False) and stochastic movement (is_slippery=True).

1. Frozen Lake (Deterministic, is_slippery=False)

In this case, the agent's movements are deterministic, meaning each action always succeeds as intended. The following plot shows:

  • Left graph: Rolling rewards
  • Right graph: Epsilon value decay over training
Frozen Lake deterministic training results
Rolling rewards (left) and epsilon decay (right) for deterministic Frozen Lake.
2. Frozen Lake (Stochastic, is_slippery=True)

Here, the lake is slippery, so each action may not always go as intended. This introduces more uncertainty and requires the agent to learn a robust policy. The plot again shows:

  • Left graph: Rolling rewards
  • Right graph: Epsilon value decay over training
Frozen Lake stochastic training results
Rolling rewards (left) and epsilon decay (right) for stochastic Frozen Lake.

Custom Golf Environment with DQN Agent

Environment

The Golf-v0 environment is a simplified golf simulation where an agent must learn how to reach the hole using different golf clubs. The environment models a one-dimensional fairway where the agent repeatedly selects a club to hit the ball toward the hole. The goal is to minimize the remaining distance while reaching the hole within a limited number of strokes.

  • States: The observation consists of the remaining distance to the hole (in yards). This value changes after each shot depending on the club used and the randomness of the shot.
  • Actions: The agent chooses one of six clubs:
    • Putter (forward)
    • Iron (forward)
    • Driver (forward)
    • Putter (backward)
    • Iron (backward)
    • Driver (backward)
    Each club has a different expected shot distance and variability.
  • Rewards:
    • +10 for successfully landing the ball in the hole
    • A negative reward proportional to the remaining distance after each shot
    • A large penalty if the agent exceeds the maximum allowed number of strokes

The environment is inspired by the golf example from the finite Markov decision processes chapter of Sutton and Barto's reinforcement learning textbook. In this simplified version, the golf course is represented as a single straight line rather than a full 2D course.

  • The ball begins far from the hole (e.g., ~400 yards)
  • The agent must select appropriate clubs to reduce the distance efficiently
  • Different difficulty settings introduce randomness in shot distances
  • The episode ends when the ball reaches the hole or the stroke limit is exceeded

This environment helps demonstrate reinforcement learning concepts such as:

  • Sequential decision making under uncertainty
  • Learning optimal action choices based on distance
  • Balancing aggressive and conservative strategies

Here is a visual representation of the environment:

Golf RL environment visualization
Golf-v0: the agent selects clubs to move the ball toward the hole

Reference: Inspired by the golf reinforcement learning example described in the Sutton & Barto reinforcement learning textbook.

Code

Below is a breakdown of the custom Golf-v0 reinforcement learning environment. The code defines a one-dimensional golf simulation where an agent selects clubs to move a ball toward a hole. The implementation follows the Gymnasium environment interface.

1. Imports and Environment Registration

import gymnasium as gym
from gymnasium import spaces
from gymnasium.envs.registration import register
from enum import Enum
import numpy as np
import pygame

register(
    id='golf-v0',
    entry_point='golf_game:GolfEnv',
)
  

This section imports required libraries. The environment is then registered with Gymnasium so it can be created using gym.make("golf-v0"). The entry_point tells Gymnasium which class implements the environment.

2. Custom Environment Class

class GolfEnv(gym.Env):
    metadata = {'render_modes': ['human']}
  

This class defines the custom reinforcement learning environment. It inherits from gym.Env and follows Gymnasium’s standard interface: reset(), step(), and render().

3. Environment Initialization

def __init__(self, render_mode=None, difficulty='Easy'):
    self.render_mode = render_mode
    self.clock = None
    self.fps = 1

    self.hole_radius = 25.0
    self.max_distance = 500
    self.start_distance = 400
    self.num_strokes = 0
  

The constructor initializes environment parameters such as the maximum distance of the course, the starting distance from the hole, and the number of strokes taken. Rendering parameters such as frame rate and display settings are also defined here.

4. Club Distance Models

if difficulty == 'Easy':
    self.clubs = {
        0: (10, 0),
        1: (50, 0),
        2: (200, 0),
        3: (10, 0),
        4: (50, 0),
        5: (200, 0)
    }
  

Each action corresponds to a golf club defined by a mean distance and standard deviation. The standard deviation introduces randomness in the shot distance.

Three difficulty levels are supported:

  • Easy: Deterministic shots (no randomness)
  • Medium: Moderate randomness
  • Hard: Large variability in shot distance
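The club model can be previewed in isolation. A numpy sketch sampling shots with the Medium-difficulty (mean, std) pairs for the three forward clubs:

```python
import numpy as np

# Forward clubs at Medium difficulty: putter, iron, driver
clubs = {0: (10, 1), 1: (50, 10), 2: (200, 20)}
rng = np.random.default_rng(seed=0)

for club, (mean, std) in clubs.items():
    shot = rng.normal(mean, std)
    print(f"club {club}: {shot:.1f} yards")

# At Easy difficulty every std is 0, so shots are deterministic
easy_shot = rng.normal(200, 0)
print(easy_shot)  # 200.0
```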
5. Action and Observation Spaces

self.action_space = spaces.Discrete(len(self.clubs))

self.observation_space = spaces.Box(
    low=0,
    high=self.max_distance,
    shape=(1,),
    dtype=np.float32
)
  

The action space consists of six discrete actions representing different golf clubs and directions. The observation space contains a single continuous value: the remaining distance to the hole.

6. Reset Function

def reset(self, seed=None, options=None):
    super().reset(seed=seed)

    self.distance = self.start_distance
    self.ball_position = self.max_distance - self.distance

    self.num_strokes = 0
    self.total_reward = 0

    observation = np.array([self.distance], dtype=np.float32)
    info = {}

    return observation, info
  

The reset() method initializes the environment at the start of an episode. The ball begins a fixed distance from the hole, stroke counters are reset, and the initial observation (distance to the hole) is returned.

7. Step Function (Environment Dynamics)

def step(self, action):

    mean, std = self.clubs[action]
    shot = self.np_random.normal(mean, std)

if action < 3:
    self.distance -= shot
else:
    self.distance += shot
  

The step() function updates the environment after the agent selects an action. A shot distance is sampled from a normal distribution based on the selected club. Forward clubs reduce the remaining distance to the hole, while backward clubs increase it.

8. Rewards and Episode Termination

terminated = abs(self.distance) <= self.hole_radius
truncated = self.num_strokes > 10

if terminated:
    reward = 10
elif truncated:
    reward = -10000
else:
    reward = -abs(self.distance)/100
  

The reward structure encourages the agent to reach the hole efficiently:

  • +10 reward for successfully reaching the hole
  • Negative reward proportional to remaining distance
  • Large penalty if the maximum number of strokes is exceeded
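Pulled out as a pure function, the reward logic is a direct restatement of the branches above:

```python
def golf_reward(distance, terminated, truncated):
    """Reward for one stroke, mirroring the environment's step() method."""
    if terminated:                 # ball came to rest within the hole radius
        return 10
    if truncated:                  # stroke limit exceeded
        return -10000
    return -abs(distance) / 100    # penalty scaled by remaining distance

print(golf_reward(0.0, True, False))     # 10
print(golf_reward(400.0, False, False))  # -4.0
print(golf_reward(120.0, False, True))   # -10000
```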
9. Rendering the Environment

pygame.draw.circle(canvas, (0, 0, 0), (hole_x, 100), 6)

progress = (self.max_distance - self.distance) / self.max_distance
ball_x = int(progress * (self.window_size - 100)) + 50

pygame.draw.circle(canvas, (255, 255, 255), (ball_x, 100), 8)
  

The render() method visualizes the environment using Pygame. The hole is drawn as a black circle, while the ball position is calculated based on the remaining distance and displayed as a white circle moving across the screen.

10. Environment Validation

def check_env():
    from gymnasium.utils.env_checker import check_env
    env = gym.make('golf-v0', render_mode='human')
    check_env(env.unwrapped)
  

This function verifies that the environment follows Gymnasium’s API conventions using the built-in environment checker.

11. Random Policy Rollout

def random_rollout():
    env = gym.make("golf-v0")

    obs, _ = env.reset()

    done = False

    while not done:
        action = env.action_space.sample()
        obs, reward, terminated, truncated, info = env.step(action)
        done = terminated or truncated
  

This function runs a simple simulation where actions are chosen randomly. It demonstrates how the environment behaves and allows basic testing before applying reinforcement learning algorithms.

12. Running the Script

if __name__ == "__main__":
    check_env()
    random_rollout()
  

When the script is executed directly, the environment is first validated and then tested using a random policy rollout.

Environment Full Code

Full implementation of the Golf environment:


import gymnasium as gym
from gymnasium import spaces
from gymnasium.envs.registration import register
from enum import Enum
import numpy as np
import pygame

register(
    id='golf-v0',
    entry_point='golf_game:GolfEnv', # module_name:class_name
)



class GolfEnv(gym.Env):
    metadata = {'render_modes': ['human']}


    def __init__(self, render_mode=None, difficulty='Easy'):

        self.render_mode = render_mode

        self.clock = None
        self.fps = 1

        self.hole_radius = 25.0  # ball counts as holed within 25 yards

        self.max_distance = 500
        self.start_distance = 400

        self.num_strokes = 0

        # clubs: mean distance, std deviation
        if difficulty == 'Easy':
            self.clubs = {
                0: (10, 0),    # putter forwards
                1: (50, 0),    # iron forwards
                2: (200, 0),   # driver forwards
                3: (10, 0),    # putter backwards
                4: (50, 0),    # iron backwards
                5: (200, 0)    # driver backwards
            }

        if difficulty == 'Medium':
            self.clubs = {
                0: (10, 1),    # putter forwards
                1: (50, 10),   # iron forwards
                2: (200, 20),  # driver forwards
                3: (10, 1),    # putter backwards
                4: (50, 10),   # iron backwards
                5: (200, 20)   # driver backwards
            }

        if difficulty == 'Hard':
            self.clubs = {
                0: (10, 5),    # putter forwards
                1: (50, 25),   # iron forwards
                2: (200, 50),  # driver forwards
                3: (10, 5),    # putter backwards
                4: (50, 25),   # iron backwards
                5: (200, 50)   # driver backwards
            }

        self.action_space = spaces.Discrete(len(self.clubs))

        self.observation_space = spaces.Box(
            low=0,
            high=self.max_distance,
            shape=(1,),
            dtype=np.float32
        )

        self.distance = None

        # rendering
        self.window = None
        self.window_size = 800

        self.last_action = None
   
    def reset(self, seed=None, options=None):
        super().reset(seed=seed)

        self.distance = self.start_distance
        self.ball_position = self.max_distance - self.distance

        self.num_strokes = 0

        self.total_reward = 0

        observation = np.array([self.distance], dtype=np.float32)
        info = {}

        return observation, info

    def step(self, action):

        mean, std = self.clubs[action]

        shot = self.np_random.normal(mean, std)

        self.last_action = action

        if action < 3:
            self.distance -= shot
        else:
            self.distance += shot
        self.ball_position = self.max_distance - self.distance

        terminated = abs(self.distance) <= self.hole_radius
        truncated = self.num_strokes > 10

        if terminated:
            reward = 10
        elif truncated:
            reward = -10000
        else:
            reward = -abs(self.distance)/100

        self.total_reward += reward

        observation = np.array([max(self.distance, 0)], dtype=np.float32)

        info = {"shot_distance": shot}

        self.num_strokes += 1

        self.render()

        return observation, reward, terminated, truncated, info
   
    def render(self):
        if self.render_mode is None:
            return

        if self.render_mode == 'human':
            if self.window is None:
                pygame.init()
                self.window = pygame.display.set_mode((self.window_size, 200))
                pygame.display.set_caption("Golf RL")
                self.clock = pygame.time.Clock()
                self.font = pygame.font.SysFont("Arial", 24)

            canvas = pygame.Surface((self.window_size, 200))
            canvas.fill((34, 139, 34))  # grass

            # hole position
            hole_x = self.window_size - 50

            pygame.draw.circle(canvas, (0, 0, 0), (hole_x, 100), 6)

            # convert distance -> screen position
            progress = (self.max_distance - self.distance) / self.max_distance
            ball_x = int(progress * (self.window_size - 100)) + 50

            pygame.draw.circle(canvas, (255, 255, 255), (ball_x, 100), 8)

            club_text = self.font.render(f"Club: {self.last_action}", True, (255,255,255))
            distance_text = self.font.render(f"Distance: {self.distance:.1f}", True, (255,255,255))

            canvas.blit(club_text, (20, 20))
            canvas.blit(distance_text, (20, 50))

            self.window.blit(canvas, (0, 0))
            pygame.display.update()
            self.clock.tick(self.fps)

def check_env():
    from gymnasium.utils.env_checker import check_env
    env = gym.make('golf-v0', render_mode='human')
    check_env(env.unwrapped)

def random_rollout():
    env = gym.make("golf-v0")

    obs, _ = env.reset()

    done = False

    while not done:
        action = env.action_space.sample()

        print('Club used: ', action)

        obs, reward, terminated, truncated, info = env.step(action)

        env.render()

        done = terminated or truncated

        print(obs, reward)

    env.close()

if __name__ == "__main__":
    check_env()
    random_rollout()
  

Code

Below is a breakdown of the Deep Q-Learning (DQN) agent designed for the custom Golf-v0 environment. The agent learns to select optimal golf clubs to minimize distance to the hole using experience replay and a neural network approximation of Q-values.

1. Imports and Setup

import gymnasium as gym
import golf_game  # importing registers the golf-v0 environment
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"
  

This section imports required libraries for reinforcement learning, neural networks, and visualization. The code also selects GPU acceleration if available.

2. Replay Memory

class ReplayMemory:
    def __init__(self, maxlen):
        self.memory = deque(maxlen=maxlen)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
  

This class stores past experiences (state, action, next_state, reward, done). Random sampling from memory helps break correlations between consecutive experiences and stabilizes training.
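A quick usage sketch (the class is repeated so the snippet is self-contained; the transitions are placeholder tuples):

```python
import random
from collections import deque

class ReplayMemory:
    def __init__(self, maxlen):
        self.memory = deque(maxlen=maxlen)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

memory = ReplayMemory(maxlen=100)
for t in range(10):
    memory.append((t, 0, t + 1, -1.0, False))  # (state, action, next_state, reward, done)

batch = memory.sample(4)
print(len(memory))  # 10
print(len(batch))   # 4
```

Because the deque has a fixed maxlen, the oldest transitions are silently evicted once the buffer fills, keeping memory usage bounded.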

3. Deep Q-Network (DQN)

class GolfDQN(nn.Module):
    def __init__(self, num_inputs, hidden_size, num_actions):
        super().__init__()
        self.fc1 = nn.Linear(num_inputs, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, num_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.out(x)
  

This neural network approximates the Q-function. It takes the current state (distance to the hole) as input and outputs a Q-value for each possible action (club choice).
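Shape-wise, the network maps the 1-dimensional observation to six Q-values. A numpy re-implementation of the same forward pass with random, untrained weights (hidden_size 128 matches the agent's default):

```python
import numpy as np

rng = np.random.default_rng(seed=0)
num_inputs, hidden_size, num_actions = 1, 128, 6

# Random weights standing in for the trained nn.Linear layers
W1, b1 = rng.standard_normal((hidden_size, num_inputs)), np.zeros(hidden_size)
W2, b2 = rng.standard_normal((hidden_size, hidden_size)), np.zeros(hidden_size)
W3, b3 = rng.standard_normal((num_actions, hidden_size)), np.zeros(num_actions)

relu = lambda x: np.maximum(x, 0)

def forward(obs):
    h = relu(W1 @ obs + b1)
    h = relu(W2 @ h + b2)
    return W3 @ h + b3

q_values = forward(np.array([250.0]))  # observation: 250 yards to the hole
print(q_values.shape)  # (6,) -- one Q-value per club
```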

4. Agent Initialization

class GolfDQL:
    def __init__(self,
                 hidden_size=128,
                 learning_rate=0.001,
                 discount_factor=0.99,
                 target_sync_freq=500,
                 replay_memory_size=50000,
                 batch_size=64):
  

This defines key hyperparameters for training:

  • Learning rate: Controls how quickly the network updates
  • Discount factor (γ): Importance of future rewards
  • Target sync frequency: How often the target network is updated
  • Replay memory size: Maximum stored experiences
  • Batch size: Number of samples per training step
5. Optimization Step

def optimize(self, batch, policy_dqn, target_dqn):
    states = torch.tensor([s for s, _, _, _, _ in batch], dtype=torch.float32, device=device)
    actions = torch.tensor([a for _, a, _, _, _ in batch], dtype=torch.long, device=device)

    current_q = policy_dqn(states).gather(1, actions.unsqueeze(1)).squeeze()
  

This function performs a single gradient update using a batch of experiences. It computes:

  • Current Q-values from the policy network
  • Target Q-values using the Bellman equation

with torch.no_grad():
    max_next_q = target_dqn(next_states).max(1)[0]
    target_q = rewards + (1 - dones) * self.gamma * max_next_q
  

The loss between predicted and target Q-values is minimized using mean squared error, and gradients are applied via backpropagation.
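The masked target computation can be checked with concrete numbers; a numpy stand-in for the torch snippet above, with gamma = 0.99:

```python
import numpy as np

gamma = 0.99
rewards = np.array([-1.0, 10.0, -0.5])
dones = np.array([0.0, 1.0, 0.0])       # 1.0 marks terminal transitions
max_next_q = np.array([2.0, 7.0, 4.0])  # max_a' Q_target(s', a')

# Terminal transitions keep only the reward; the rest add the discounted bootstrap
target_q = rewards + (1 - dones) * gamma * max_next_q
print(target_q)  # targets: 0.98, 10.0 and 3.46
```

Note how the second transition ignores its large next-state Q-value of 7.0 because it is terminal.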

6. Training the Agent

def train(self, episodes, render=False, difficulty='Easy'):
    env = gym.make('golf-v0', render_mode='human' if render else None, difficulty=difficulty)
  

The training loop initializes the environment and neural networks, then runs episodes where the agent interacts with the environment.


if random.random() < epsilon:
    action = env.action_space.sample()
else:
    action = policy_dqn(state_tensor).argmax().item()
  

An epsilon-greedy strategy is used to balance exploration and exploitation.


memory.append((state, action, new_state, reward, terminated))
  

Each transition is stored in replay memory for later training.


if len(memory) > self.batch_size and step_count % 4 == 0:
    batch = memory.sample(self.batch_size)
    self.optimize(batch, policy_dqn, target_dqn)
  

The network is periodically trained using sampled batches from memory.


if step_count % self.target_sync_freq == 0:
    target_dqn.load_state_dict(policy_dqn.state_dict())
  

The target network is updated at fixed intervals to stabilize learning.

7. Exploration Strategy

epsilon = max(epsilon - epsilon_decay, epsilon_min)
  

Epsilon gradually decreases over time, reducing random exploration as the agent learns a better policy.

8. Saving Model and Plotting

torch.save(policy_dqn.state_dict(), "golf_dqn.pt")
  

The trained model is saved to disk for later use.


moving_avg = np.convolve(rewards_per_episode, np.ones(50)/50, mode='valid')
plt.plot(moving_avg)
  

A moving average plot of rewards is generated to visualize training progress over time.
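The smoothing itself is a plain windowed mean; with a synthetic reward curve (values 0..99 standing in for real episode rewards):

```python
import numpy as np

rewards_per_episode = np.arange(100, dtype=float)  # synthetic reward curve

window = 50
moving_avg = np.convolve(rewards_per_episode, np.ones(window) / window, mode='valid')

print(moving_avg.shape)  # (51,): N - window + 1 points
print(moving_avg[0])     # ~24.5, the mean of episodes 0..49
```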

9. Testing the Trained Agent

def test(self, episodes, difficulty):
    env = gym.make('golf-v0', render_mode='human', difficulty=difficulty)
  

This function evaluates the trained agent by running it in the environment without exploration.


action = policy_dqn(state_tensor).argmax().item()
  

The agent always selects the action with the highest predicted Q-value.

10. Running the Agent

if __name__ == "__main__":
    golf_agent = GolfDQL()
    difficulty = 'Medium'
    golf_agent.train(episodes=5000, render=False, difficulty=difficulty)
    golf_agent.test(episodes=5, difficulty=difficulty)
  

The script first trains the agent over many episodes, then evaluates its learned policy in a small number of test episodes.

11. Reinforcement Learning Formulation
  • State: Distance to the hole (continuous value)
  • Actions: Selection of golf clubs (discrete)
  • Reward: Positive for reaching the hole, negative based on distance and penalties
  • Policy: Approximated using a neural network (DQN)

This implementation demonstrates how Deep Q-Learning can be applied to a simple continuous-state environment with discrete actions.

Full Code

Full implementation of the DQN agent for the Golf-v0 environment:


import gymnasium as gym
import golf_game  # importing registers the golf-v0 environment
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import random
from collections import deque
import matplotlib.pyplot as plt
import os

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
os.environ["KMP_DUPLICATE_LIB_OK"] = "TRUE"

# --- Replay Memory ---
class ReplayMemory:
    def __init__(self, maxlen):
        self.memory = deque(maxlen=maxlen)

    def append(self, transition):
        self.memory.append(transition)

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)

# --- DQN Network ---
class GolfDQN(nn.Module):
    def __init__(self, num_inputs, hidden_size, num_actions):
        super().__init__()
        self.fc1 = nn.Linear(num_inputs, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.out = nn.Linear(hidden_size, num_actions)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        return self.out(x)

# --- DQN Agent ---
class GolfDQL:
    def __init__(self,
                 hidden_size=128,
                 learning_rate=0.001,
                 discount_factor=0.99,
                 target_sync_freq=500,
                 replay_memory_size=50000,
                 batch_size=64):
        self.hidden_size = hidden_size
        self.lr = learning_rate
        self.gamma = discount_factor
        self.target_sync_freq = target_sync_freq
        self.memory_size = replay_memory_size
        self.batch_size = batch_size

        self.loss_fn = nn.MSELoss()
        self.optimizer = None

    def optimize(self, batch, policy_dqn, target_dqn):
        states = torch.tensor([s for s, _, _, _, _ in batch], dtype=torch.float32, device=device)
        actions = torch.tensor([a for _, a, _, _, _ in batch], dtype=torch.long, device=device)
        next_states = torch.tensor([ns for _, _, ns, _, _ in batch], dtype=torch.float32, device=device)
        rewards = torch.tensor([r for _, _, _, r, _ in batch], dtype=torch.float32, device=device)
        dones = torch.tensor([d for _, _, _, _, d in batch], dtype=torch.float32, device=device)

        current_q = policy_dqn(states).gather(1, actions.unsqueeze(1)).squeeze()
        with torch.no_grad():
            max_next_q = target_dqn(next_states).max(1)[0]
            target_q = rewards + (1 - dones) * self.gamma * max_next_q

        loss = self.loss_fn(current_q, target_q)
        self.optimizer.zero_grad()
        loss.backward()
        self.optimizer.step()

    def train(self, episodes, render=False, difficulty='Easy'):
        env = gym.make('golf-v0', render_mode='human' if render else None, difficulty=difficulty)

        num_states = env.observation_space.shape[0]
        num_actions = env.action_space.n

        epsilon = 1.0
        epsilon_decay = 1 / episodes
        epsilon_min = 0.05

        memory = ReplayMemory(self.memory_size)
        policy_dqn = GolfDQN(num_states, self.hidden_size, num_actions).to(device)
        target_dqn = GolfDQN(num_states, self.hidden_size, num_actions).to(device)
        target_dqn.load_state_dict(policy_dqn.state_dict())

        self.optimizer = torch.optim.Adam(policy_dqn.parameters(), lr=self.lr)
        rewards_per_episode = np.zeros(episodes)
        step_count = 0


        for episode in range(episodes):
            state, _ = env.reset()
            state = np.array(state, dtype=np.float32)
            terminated = False
            truncated = False
            episode_reward = 0
           
            num_strokes = 0

            while not terminated and not truncated:
                if random.random() < epsilon:
                    action = env.action_space.sample()
                else:
                    with torch.no_grad():
                        state_tensor = torch.tensor(state, dtype=torch.float32, device=device)
                        action = policy_dqn(state_tensor).argmax().item()

                new_state, reward, terminated, truncated, _ = env.step(action)
                new_state = np.array(new_state, dtype=np.float32)

                memory.append((state, action, new_state, reward, terminated))
                state = new_state
                episode_reward += reward
                step_count += 1

                num_strokes += 1

                if len(memory) > self.batch_size and step_count % 4 == 0:  # learn every 4 steps once enough samples exist
                    batch = memory.sample(self.batch_size)
                    self.optimize(batch, policy_dqn, target_dqn)

                if step_count % self.target_sync_freq == 0:
                    target_dqn.load_state_dict(policy_dqn.state_dict())

            rewards_per_episode[episode] = episode_reward
            epsilon = max(epsilon - epsilon_decay, epsilon_min)

            if episode % 50 == 0:
                print(f"Episode {episode} | Reward: {episode_reward} | Strokes: {num_strokes} | Epsilon: {epsilon:.3f}")

        torch.save(policy_dqn.state_dict(), "golf_dqn.pt")
        env.close()

        # --- Plot moving average ---
        plt.figure(figsize=(10,5))
        moving_avg = np.convolve(rewards_per_episode, np.ones(50)/50, mode='valid')
        plt.plot(moving_avg)
        plt.title("Moving Average Reward (50 episodes)")
        plt.xlabel("Episode")
        plt.ylabel("Reward")
        plt.savefig("golf_dqn_training.png")
        plt.show()

    def test(self, episodes, difficulty):
        env = gym.make('golf-v0', render_mode='human', difficulty=difficulty)
        num_states = env.observation_space.shape[0]
        num_actions = env.action_space.n

        policy_dqn = GolfDQN(num_states, self.hidden_size, num_actions).to(device)
        policy_dqn.load_state_dict(torch.load("golf_dqn.pt", map_location=device))
        policy_dqn.eval()

        for episode in range(episodes):
            state, _ = env.reset()
            state = np.array(state, dtype=np.float32)
            terminated = False
            truncated = False
            episode_reward = 0

            while not terminated and not truncated:
                with torch.no_grad():
                    state_tensor = torch.tensor(state, dtype=torch.float32, device=device)
                    action = policy_dqn(state_tensor).argmax().item()

                state, reward, terminated, truncated, _ = env.step(action)
                episode_reward += reward

                print(reward)  # log each per-step reward for inspection

            print(f"Episode {episode} | Reward: {episode_reward}")
        env.close()


if __name__ == "__main__":
    golf_agent = GolfDQL()
    difficulty = 'Medium'
    golf_agent.train(episodes=5000, render=False, difficulty=difficulty)
    golf_agent.test(episodes=5, difficulty=difficulty)
  

Results

This section presents the training results for the Golf-v0 Deep Q-Learning agent across three difficulty settings: Easy, Medium, and Hard. Each plot shows the moving average reward (50 episodes), which helps visualize learning progress over time.

1. Easy Difficulty

In the Easy setting, all golf shots are deterministic (no randomness). This allows the agent to learn a stable and predictable policy.

  • The reward steadily improves over time
  • Convergence is relatively fast compared to other settings
  • The agent learns consistent club selection for minimizing distance
Golf Easy training results
Moving average reward for Easy difficulty (deterministic shots).
2. Medium Difficulty

In the Medium setting, moderate randomness is introduced into shot distances. This increases uncertainty and makes learning more challenging.

  • Rewards improve more gradually compared to Easy
  • Noticeable fluctuations due to stochastic outcomes
  • The agent still learns a generally effective strategy
Golf Medium training results
Moving average reward for Medium difficulty (moderate randomness).
3. Hard Difficulty

The Hard setting introduces significant randomness in shot distances, making precise control difficult.

  • Learning is slower and less stable
  • High variance in rewards due to unpredictable outcomes
  • The agent still improves but does not converge as cleanly
Golf Hard training results
Moving average reward for Hard difficulty (high randomness).
4. Reward Function Design

The reward function went through several iterations to achieve stable learning:

  • Initial approach: A simple reward of -1 per stroke and +10 for reaching the hole.
    This approach failed because the agent did not learn meaningful distance-based behavior and lacked guidance toward the goal.
  • Second attempt: A truncation penalty of -10000 after exceeding the stroke limit.
    This caused unintended behavior where the agent would continue taking random actions until termination, rather than learning efficient play.
  • Final reward design:
    • Reward = -|distance_to_hole| / 100 for each step
    • +10 for reaching the goal region
    • Large penalty for exceeding maximum strokes

This final design provides dense feedback, allowing the agent to continuously evaluate whether its actions are improving its position. As a result:

  • The agent learns to minimize distance rather than just survive
  • Training becomes more stable and consistent
  • The policy generalizes better under stochastic conditions

Additionally, instead of requiring the ball to land exactly in the hole, a tolerance radius was introduced. This accounts for randomness in shot distances and makes the task more achievable, especially in Medium and Hard difficulties.
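The final reward design above can be sketched as a small function. This is an illustrative reconstruction, not the actual environment code: the function name, the stroke limit, the tolerance radius, and the penalty magnitude are all assumptions chosen to match the description.

```python
# Hypothetical sketch of the final reward design; parameter values
# (max_strokes, tolerance_radius, stroke_limit_penalty) are assumptions.
def compute_reward(distance_to_hole, num_strokes,
                   max_strokes=20, tolerance_radius=5.0,
                   stroke_limit_penalty=-100.0):
    """Dense reward: penalize distance each step, bonus for the goal region."""
    if abs(distance_to_hole) <= tolerance_radius:
        return 10.0                       # ball landed within the goal region
    if num_strokes >= max_strokes:
        return stroke_limit_penalty       # exceeded the maximum stroke count
    return -abs(distance_to_hole) / 100   # dense distance-based feedback
```

Because the per-step term shrinks as the ball approaches the hole, every shot gives the agent an immediate signal about whether it improved its position.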

Dynamic Resource Allocation

Environment

This experiment explores dynamic resource allocation in a multi-beam satellite system using deep reinforcement learning (DRL). The agent interacts continuously with the environment to allocate satellite resources such as channels, power, and traffic to user requests while maximizing long-term system performance.

DDRA Architecture

The Deep Dynamic Resource Allocation (DDRA) approach allows the satellite agent to select actions based on current states and rewards. The agent observes the current system state and global feedback, chooses actions via a Q-network, and stores experience tuples (s_t, a_t, r_t, s_{t+1}) in a replay memory. Mini-batches are sampled from memory to update the Q-network using a target network, and the target network is periodically updated to stabilize learning.

MDP Model

The resource allocation task is formulated as a Markov Decision Process (MDP) consisting of:

  • States (s_t): Include the currently allocated power and rate across beams, and the incoming user request u_t = [Q_t, µ_t, b_t] (traffic volume, service duration, and user beam location). Formally: s_t = [P_t, V_t, u_t].
  • Actions (a_t): Decisions for allocating resources to a new user, including channel assignment c_t, traffic allocation v_t, and optional traffic reduction d_t for existing users: a_t = [c_t, v_t, d_t]. This breaks a complex decision into sequential discrete choices.
  • Rewards (r_t): Multi-component reward function:
    • If a request is blocked due to lack of resources: r_t = R_blocked < 0.
    • If a request is accepted: r_t = w · (v_t / Q_t), where v_t is the allocated traffic, Q_t is the requested traffic, and w is a positive scaling factor.
    This encourages minimizing blocked requests and maximizing service quality.
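The two reward cases above can be expressed as a short sketch. Since this project has no code yet, the function and default values for w and R_blocked are purely illustrative assumptions.

```python
# Illustrative sketch of the DDRA reward; w and R_blocked defaults are assumptions.
def ddra_reward(blocked, v_t, Q_t, w=1.0, R_blocked=-1.0):
    """Return the per-request reward for the satellite agent."""
    if blocked:
        return R_blocked       # negative penalty for a blocked request
    return w * (v_t / Q_t)     # fraction of requested traffic actually served
```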

Learning Framework

  • DRL agent uses a Deep Q-Network (DQN) to approximate the action-value function in high-dimensional spaces.
  • Experience replay stores transitions for stable training; the target network improves convergence.
  • Action selection uses an ε-greedy policy with decaying exploration: ε = ε_min + (ε_0 − ε_min) · e^(−t / decay).

This environment models sequential decision-making in a stochastic satellite network, where the agent adapts allocations dynamically to optimize long-term resource utilization and service quality.
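The decaying ε-greedy schedule given above can be sketched directly; the parameter values here (ε_0, ε_min, decay) are placeholder assumptions, not values from the paper.

```python
import math

# Sketch of the exponential epsilon-decay schedule; defaults are illustrative.
def epsilon(t, eps0=1.0, eps_min=0.05, decay=1000.0):
    """eps_min + (eps0 - eps_min) * e^(-t / decay): starts at eps0, decays to eps_min."""
    return eps_min + (eps0 - eps_min) * math.exp(-t / decay)
```

Unlike the linear decay used in the golf agent above, this schedule explores heavily early on and asymptotically approaches ε_min rather than reaching it at a fixed episode.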

Reference: A Deep Reinforcement Learning Based Dynamic Resource Allocation Approach in Satellite Systems, IEEE, 2025. IEEE Xplore

Code

As of right now, this project is still in the research phase, so there is no code for it yet.


Results

As of right now, this project is still in the research phase, so there are no results yet.