
PPO

Proximal Policy Optimization algorithm.


Overview

PPO (Proximal Policy Optimization) is a widely used policy-gradient reinforcement learning algorithm; this library applies it to architecture optimization.
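
A minimal end-to-end sketch of the training loop this page documents, using only the methods described below. The environment object (env), its reset()/step() interface, and the rollout length of 128 are assumptions for illustration; PPO, select_action, compute_gae, and update are the documented API.

import numpy as np
from upir.learning.ppo import PPO

ppo = PPO(state_dim=10, action_dim=4)

# Collect one rollout. `env` is a stand-in for a user-supplied environment
# with reset() -> state and step(action) -> (next_state, reward, done).
states, actions, log_probs, rewards, values, dones = [], [], [], [], [], []
state = env.reset()
for _ in range(128):
    action, log_prob, value = ppo.select_action(state)
    next_state, reward, done = env.step(action)
    states.append(state)
    actions.append(action)
    log_probs.append(log_prob)
    rewards.append(reward)
    values.append(value)
    dones.append(float(done))
    state = env.reset() if done else next_state

# Turn the rollout into advantages/returns, then update the policy.
advantages, returns = ppo.compute_gae(
    np.array(rewards, dtype=np.float32),
    np.array(values, dtype=np.float32),
    np.array(dones, dtype=np.float32),
)
metrics = ppo.update(
    np.array(states, dtype=np.float32),
    np.array(actions),
    np.array(log_probs, dtype=np.float32),
    returns,
    advantages,
)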


Class Documentation

upir.learning.ppo.PPO

Proximal Policy Optimization (PPO) algorithm.

PPO is a policy gradient method that uses a clipped objective to ensure stable, conservative policy updates. It's one of the most popular RL algorithms due to its simplicity and effectiveness.

The key innovation is the clipped surrogate objective: L^CLIP(θ) = E[min(r(θ)A, clip(r(θ), 1-ε, 1+ε)A)]

where r(θ) = π_θ(a|s) / π_θ_old(a|s) is the probability ratio.
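
To make the clipping concrete, here is a toy NumPy calculation of the clipped term; the ratios, advantages, and ε = 0.2 are arbitrary illustrative numbers, not values produced by the library.

import numpy as np

epsilon = 0.2
ratio = np.array([0.5, 1.0, 1.5])        # r(θ) for three sampled actions
advantage = np.array([1.0, 1.0, -1.0])   # toy advantage estimates

surr1 = ratio * advantage
surr2 = np.clip(ratio, 1 - epsilon, 1 + epsilon) * advantage
l_clip = np.minimum(surr1, surr2).mean()
# Clipping caps how far a single update can push the policy away from the
# one that collected the data, which is what keeps PPO updates conservative.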

Attributes:

- policy: PolicyNetwork for action selection and value estimation
- config: PPO hyperparameters
- optimizer_state: State for optimization (momentum, etc.)

References:

- PPO paper: https://arxiv.org/abs/1707.06347
- OpenAI Spinning Up: https://spinningup.openai.com/en/latest/algorithms/ppo.html
- TD Commons: Architecture optimization using PPO

Source code in upir/learning/ppo.py
class PPO:
    """
    Proximal Policy Optimization (PPO) algorithm.

    PPO is a policy gradient method that uses a clipped objective to ensure
    stable, conservative policy updates. It's one of the most popular
    RL algorithms due to its simplicity and effectiveness.

    The key innovation is the clipped surrogate objective:
    L^CLIP(θ) = E[min(r(θ)A, clip(r(θ), 1-ε, 1+ε)A)]

    where r(θ) = π_θ(a|s) / π_θ_old(a|s) is the probability ratio.

    Attributes:
        policy: PolicyNetwork for action selection and value estimation
        config: PPO hyperparameters
        optimizer_state: State for optimization (momentum, etc.)

    References:
    - PPO paper: https://arxiv.org/abs/1707.06347
    - OpenAI Spinning Up: https://spinningup.openai.com/en/latest/algorithms/ppo.html
    - TD Commons: Architecture optimization using PPO
    """

    def __init__(self, state_dim: int, action_dim: int, config: PPOConfig = None):
        """
        Initialize PPO agent.

        Args:
            state_dim: Dimension of state space
            action_dim: Dimension of action space
            config: PPO configuration (uses defaults if None)
        """
        self.config = config or PPOConfig()
        self.policy = PolicyNetwork(state_dim, action_dim)

        # Optimizer state (simple momentum-based)
        self.optimizer_state = {
            name: {"velocity": np.zeros_like(param)}
            for name, param in self.policy.weights.items()
        }

        logger.info(
            f"Initialized PPO: state_dim={state_dim}, action_dim={action_dim}, "
            f"lr={self.config.learning_rate}, epsilon={self.config.epsilon}"
        )

    def select_action(self, state: np.ndarray) -> Tuple[int, float, float]:
        """
        Select action using current policy.

        Args:
            state: Current state vector

        Returns:
            Tuple of (action, log_prob, value)
        """
        return self.policy.get_action(state)

    def compute_gae(
        self,
        rewards: np.ndarray,
        values: np.ndarray,
        dones: np.ndarray
    ) -> Tuple[np.ndarray, np.ndarray]:
        """
        Compute Generalized Advantage Estimation (GAE).

        GAE uses an exponentially-weighted average of n-step advantages to
        reduce variance while maintaining low bias. It interpolates between
        Monte Carlo (high variance, low bias) and TD (low variance, high bias).

        Formula:
        δ_t = r_t + γV(s_{t+1})(1 - done_t) - V(s_t)
        A_t = δ_t + (γλ)δ_{t+1} + (γλ)²δ_{t+2} + ...

        Args:
            rewards: Rewards received (T,)
            values: Value estimates V(s_t) (T,)
            dones: Episode termination flags (T,)

        Returns:
            Tuple of (advantages, returns):
            - advantages: Advantage estimates A_t (T,)
            - returns: Discounted returns (T,)

        References:
        - GAE paper: https://arxiv.org/abs/1506.02438
        - OpenAI Spinning Up: GAE explanation
        """
        T = len(rewards)
        advantages = np.zeros(T, dtype=np.float32)
        returns = np.zeros(T, dtype=np.float32)

        # Compute TD errors (deltas)
        deltas = np.zeros(T, dtype=np.float32)
        for t in range(T):
            # δ_t = r_t + γV(s_{t+1})(1 - done_t) - V(s_t)
            next_value = values[t + 1] if t + 1 < T else 0.0
            deltas[t] = rewards[t] + self.config.gamma * next_value * (1 - dones[t]) - values[t]

        # Compute GAE advantages (backward pass)
        gae = 0
        for t in reversed(range(T)):
            # A_t = δ_t + (γλ)A_{t+1}(1 - done_t)
            gae = deltas[t] + self.config.gamma * self.config.lambda_gae * gae * (1 - dones[t])
            advantages[t] = gae

        # Compute returns: R_t = A_t + V(s_t)
        returns = advantages + values[:T]

        return advantages, returns

    def update(
        self,
        states: np.ndarray,
        actions: np.ndarray,
        old_log_probs: np.ndarray,
        returns: np.ndarray,
        advantages: np.ndarray
    ) -> Dict[str, float]:
        """
        Update policy using PPO clipped objective.

        Performs multiple epochs of minibatch updates using the PPO loss:
        L = L^CLIP - c_1 * L^VF + c_2 * H

        where:
        - L^CLIP: Clipped surrogate objective
        - L^VF: Value function loss (MSE)
        - H: Entropy bonus

        Args:
            states: Batch of states (batch_size, state_dim)
            actions: Batch of actions (batch_size,)
            old_log_probs: Old log probabilities (batch_size,)
            returns: Discounted returns (batch_size,)
            advantages: Advantage estimates (batch_size,)

        Returns:
            Dictionary with training metrics:
            - policy_loss: Policy loss
            - value_loss: Value function loss
            - entropy: Policy entropy
            - total_loss: Combined loss

        Example:
            >>> ppo = PPO(state_dim=10, action_dim=4)
            >>> # Collect trajectories...
            >>> metrics = ppo.update(states, actions, old_log_probs, returns, advantages)

        References:
        - PPO paper: Section 3 (PPO-Clip algorithm)
        - Clipped objective prevents large policy updates
        """
        # Normalize advantages (reduces variance)
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

        batch_size = states.shape[0]
        total_metrics = {
            "policy_loss": 0.0,
            "value_loss": 0.0,
            "entropy": 0.0,
            "total_loss": 0.0,
        }

        # Multiple epochs of updates
        for epoch in range(self.config.num_epochs):
            # Shuffle data
            indices = np.random.permutation(batch_size)

            # Minibatch updates
            for start in range(0, batch_size, self.config.batch_size):
                end = min(start + self.config.batch_size, batch_size)
                batch_indices = indices[start:end]

                # Get minibatch
                batch_states = states[batch_indices]
                batch_actions = actions[batch_indices]
                batch_old_log_probs = old_log_probs[batch_indices]
                batch_returns = returns[batch_indices]
                batch_advantages = advantages[batch_indices]

                # Evaluate actions under current policy
                new_log_probs, values, entropy = self.policy.evaluate_actions(
                    batch_states, batch_actions
                )

                # Compute probability ratio: r(θ) = π_θ(a|s) / π_θ_old(a|s)
                ratio = np.exp(new_log_probs - batch_old_log_probs)

                # Compute clipped objective
                # L^CLIP = E[min(r*A, clip(r, 1-ε, 1+ε)*A)]
                surr1 = ratio * batch_advantages
                surr2 = np.clip(ratio, 1 - self.config.epsilon, 1 + self.config.epsilon) * batch_advantages
                policy_loss = -np.minimum(surr1, surr2).mean()

                # Value function loss: MSE
                value_loss = ((values - batch_returns) ** 2).mean()

                # Total loss
                total_loss = (
                    policy_loss
                    + self.config.value_coef * value_loss
                    - self.config.entropy_coef * entropy
                )

                # Gradient descent (simplified - in practice, use automatic differentiation)
                # This is a placeholder - real implementation would compute gradients
                # and update weights using backpropagation
                self._simple_update(total_loss, batch_states, batch_actions)

                # Track metrics
                total_metrics["policy_loss"] += policy_loss
                total_metrics["value_loss"] += value_loss
                total_metrics["entropy"] += entropy
                total_metrics["total_loss"] += total_loss

        # Average metrics
        num_updates = self.config.num_epochs * max(1, (batch_size // self.config.batch_size))
        for key in total_metrics:
            total_metrics[key] /= num_updates

        logger.debug(
            f"PPO update: policy_loss={total_metrics['policy_loss']:.4f}, "
            f"value_loss={total_metrics['value_loss']:.4f}, "
            f"entropy={total_metrics['entropy']:.4f}"
        )

        return total_metrics

    def _simple_update(self, loss: float, states: np.ndarray, actions: np.ndarray):
        """
        Simplified weight update (placeholder for gradient descent).

        In a full implementation, this would:
        1. Compute gradients via backpropagation
        2. Update weights using optimizer (Adam, SGD, etc.)

        For now, this is a minimal placeholder. Upgrade to PyTorch for
        automatic differentiation and proper optimization.

        TODO: Implement full backpropagation or migrate to PyTorch

        Args:
            loss: Scalar loss value
            states: Batch of states
            actions: Batch of actions
        """
        # Placeholder: Random small updates (not real gradient descent)
        # This maintains the interface but should be replaced with proper backprop
        for name, param in self.policy.weights.items():
            # Very small random perturbation (NOT a real gradient update)
            gradient = np.random.randn(*param.shape) * 1e-6
            param -= self.config.learning_rate * gradient

    def __str__(self) -> str:
        """String representation."""
        return (
            f"PPO(lr={self.config.learning_rate}, "
            f"gamma={self.config.gamma}, "
            f"epsilon={self.config.epsilon})"
        )

    def __repr__(self) -> str:
        """Developer-friendly representation."""
        return (
            f"PPO(state_dim={self.policy.state_dim}, "
            f"action_dim={self.policy.action_dim}, "
            f"config={self.config})"
        )

Functions

__init__(state_dim, action_dim, config=None)

Initialize PPO agent.

Parameters:

- state_dim (int): Dimension of state space. Required.
- action_dim (int): Dimension of action space. Required.
- config (PPOConfig): PPO configuration (uses defaults if None). Default: None.
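
A construction sketch. That PPOConfig is importable directly from upir.learning.ppo is an assumption based on the source shown below; only its no-argument constructor is used here.

from upir.learning.ppo import PPO, PPOConfig

ppo_default = PPO(state_dim=16, action_dim=8)                     # config=None falls back to PPOConfig()
ppo_explicit = PPO(state_dim=16, action_dim=8, config=PPOConfig())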
Source code in upir/learning/ppo.py
def __init__(self, state_dim: int, action_dim: int, config: PPOConfig = None):
    """
    Initialize PPO agent.

    Args:
        state_dim: Dimension of state space
        action_dim: Dimension of action space
        config: PPO configuration (uses defaults if None)
    """
    self.config = config or PPOConfig()
    self.policy = PolicyNetwork(state_dim, action_dim)

    # Optimizer state (simple momentum-based)
    self.optimizer_state = {
        name: {"velocity": np.zeros_like(param)}
        for name, param in self.policy.weights.items()
    }

    logger.info(
        f"Initialized PPO: state_dim={state_dim}, action_dim={action_dim}, "
        f"lr={self.config.learning_rate}, epsilon={self.config.epsilon}"
    )

select_action(state)

Select action using current policy.

Parameters:

- state (ndarray): Current state vector. Required.

Returns:

- Tuple[int, float, float]: Tuple of (action, log_prob, value)

Source code in upir/learning/ppo.py
def select_action(self, state: np.ndarray) -> Tuple[int, float, float]:
    """
    Select action using current policy.

    Args:
        state: Current state vector

    Returns:
        Tuple of (action, log_prob, value)
    """
    return self.policy.get_action(state)
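
A quick call sketch, reusing the ppo agent from the overview example; the state vector itself is arbitrary.

import numpy as np

state = np.random.randn(10).astype(np.float32)   # matches state_dim=10 above
action, log_prob, value = ppo.select_action(state)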

compute_gae(rewards, values, dones)

Compute Generalized Advantage Estimation (GAE).

GAE uses an exponentially-weighted average of n-step advantages to reduce variance while maintaining low bias. It interpolates between Monte Carlo (high variance, low bias) and TD (low variance, high bias).

Formula:

δ_t = r_t + γV(s_{t+1})(1 - done_t) - V(s_t)
A_t = δ_t + (γλ)δ_{t+1} + (γλ)²δ_{t+2} + ...
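
A toy call, again reusing the ppo agent from the overview sketch; the reward, value, and done numbers are arbitrary.

import numpy as np

rewards = np.array([1.0, 0.0, 1.0], dtype=np.float32)
values = np.array([0.5, 0.4, 0.6], dtype=np.float32)
dones = np.array([0.0, 0.0, 1.0], dtype=np.float32)  # episode ends at the last step

advantages, returns = ppo.compute_gae(rewards, values, dones)
# advantages follows the backward recursion above; returns == advantages + values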

Parameters:

- rewards (ndarray): Rewards received (T,). Required.
- values (ndarray): Value estimates V(s_t) (T,). Required.
- dones (ndarray): Episode termination flags (T,). Required.

Returns:

- Tuple[ndarray, ndarray]: Tuple of (advantages, returns):
  - advantages: Advantage estimates A_t (T,)
  - returns: Discounted returns (T,)

References:

- GAE paper: https://arxiv.org/abs/1506.02438
- OpenAI Spinning Up: GAE explanation

Source code in upir/learning/ppo.py
def compute_gae(
    self,
    rewards: np.ndarray,
    values: np.ndarray,
    dones: np.ndarray
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Compute Generalized Advantage Estimation (GAE).

    GAE uses an exponentially-weighted average of n-step advantages to
    reduce variance while maintaining low bias. It interpolates between
    Monte Carlo (high variance, low bias) and TD (low variance, high bias).

    Formula:
    δ_t = r_t + γV(s_{t+1})(1 - done_t) - V(s_t)
    A_t = δ_t + (γλ)δ_{t+1} + (γλ)²δ_{t+2} + ...

    Args:
        rewards: Rewards received (T,)
        values: Value estimates V(s_t) (T,)
        dones: Episode termination flags (T,)

    Returns:
        Tuple of (advantages, returns):
        - advantages: Advantage estimates A_t (T,)
        - returns: Discounted returns (T,)

    References:
    - GAE paper: https://arxiv.org/abs/1506.02438
    - OpenAI Spinning Up: GAE explanation
    """
    T = len(rewards)
    advantages = np.zeros(T, dtype=np.float32)
    returns = np.zeros(T, dtype=np.float32)

    # Compute TD errors (deltas)
    deltas = np.zeros(T, dtype=np.float32)
    for t in range(T):
        # δ_t = r_t + γV(s_{t+1})(1 - done_t) - V(s_t)
        next_value = values[t + 1] if t + 1 < T else 0.0
        deltas[t] = rewards[t] + self.config.gamma * next_value * (1 - dones[t]) - values[t]

    # Compute GAE advantages (backward pass)
    gae = 0
    for t in reversed(range(T)):
        # A_t = δ_t + (γλ)A_{t+1}(1 - done_t)
        gae = deltas[t] + self.config.gamma * self.config.lambda_gae * gae * (1 - dones[t])
        advantages[t] = gae

    # Compute returns: R_t = A_t + V(s_t)
    returns = advantages + values[:T]

    return advantages, returns

update(states, actions, old_log_probs, returns, advantages)

Update policy using PPO clipped objective.

Performs multiple epochs of minibatch updates using the PPO loss:

L = L^CLIP - c_1 * L^VF + c_2 * H

where:

- L^CLIP: Clipped surrogate objective
- L^VF: Value function loss (MSE)
- H: Entropy bonus
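
A scalar sketch of how the three terms combine into the quantity the update minimizes; the numbers and coefficients are illustrative, not the library defaults.

l_clip = 0.02       # L^CLIP, the clipped surrogate (to be maximized)
l_vf = 0.10         # L^VF, value-function MSE
entropy = 1.30      # H, policy entropy

c1, c2 = 0.5, 0.01  # stand-ins for value_coef and entropy_coef
objective = l_clip - c1 * l_vf + c2 * entropy
loss = -objective   # i.e. -L^CLIP + c1*L^VF - c2*H, matching the source below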

Parameters:

- states (ndarray): Batch of states (batch_size, state_dim). Required.
- actions (ndarray): Batch of actions (batch_size,). Required.
- old_log_probs (ndarray): Old log probabilities (batch_size,). Required.
- returns (ndarray): Discounted returns (batch_size,). Required.
- advantages (ndarray): Advantage estimates (batch_size,). Required.

Returns:

- Dict[str, float]: Dictionary with training metrics:
  - policy_loss: Policy loss
  - value_loss: Value function loss
  - entropy: Policy entropy
  - total_loss: Combined loss
Example:

>>> ppo = PPO(state_dim=10, action_dim=4)
>>> # Collect trajectories...
>>> metrics = ppo.update(states, actions, old_log_probs, returns, advantages)

References:

- PPO paper: Section 3 (PPO-Clip algorithm)
- Clipped objective prevents large policy updates

Source code in upir/learning/ppo.py
def update(
    self,
    states: np.ndarray,
    actions: np.ndarray,
    old_log_probs: np.ndarray,
    returns: np.ndarray,
    advantages: np.ndarray
) -> Dict[str, float]:
    """
    Update policy using PPO clipped objective.

    Performs multiple epochs of minibatch updates using the PPO loss:
    L = L^CLIP - c_1 * L^VF + c_2 * H

    where:
    - L^CLIP: Clipped surrogate objective
    - L^VF: Value function loss (MSE)
    - H: Entropy bonus

    Args:
        states: Batch of states (batch_size, state_dim)
        actions: Batch of actions (batch_size,)
        old_log_probs: Old log probabilities (batch_size,)
        returns: Discounted returns (batch_size,)
        advantages: Advantage estimates (batch_size,)

    Returns:
        Dictionary with training metrics:
        - policy_loss: Policy loss
        - value_loss: Value function loss
        - entropy: Policy entropy
        - total_loss: Combined loss

    Example:
        >>> ppo = PPO(state_dim=10, action_dim=4)
        >>> # Collect trajectories...
        >>> metrics = ppo.update(states, actions, old_log_probs, returns, advantages)

    References:
    - PPO paper: Section 3 (PPO-Clip algorithm)
    - Clipped objective prevents large policy updates
    """
    # Normalize advantages (reduces variance)
    advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)

    batch_size = states.shape[0]
    total_metrics = {
        "policy_loss": 0.0,
        "value_loss": 0.0,
        "entropy": 0.0,
        "total_loss": 0.0,
    }

    # Multiple epochs of updates
    for epoch in range(self.config.num_epochs):
        # Shuffle data
        indices = np.random.permutation(batch_size)

        # Minibatch updates
        for start in range(0, batch_size, self.config.batch_size):
            end = min(start + self.config.batch_size, batch_size)
            batch_indices = indices[start:end]

            # Get minibatch
            batch_states = states[batch_indices]
            batch_actions = actions[batch_indices]
            batch_old_log_probs = old_log_probs[batch_indices]
            batch_returns = returns[batch_indices]
            batch_advantages = advantages[batch_indices]

            # Evaluate actions under current policy
            new_log_probs, values, entropy = self.policy.evaluate_actions(
                batch_states, batch_actions
            )

            # Compute probability ratio: r(θ) = π_θ(a|s) / π_θ_old(a|s)
            ratio = np.exp(new_log_probs - batch_old_log_probs)

            # Compute clipped objective
            # L^CLIP = E[min(r*A, clip(r, 1-ε, 1+ε)*A)]
            surr1 = ratio * batch_advantages
            surr2 = np.clip(ratio, 1 - self.config.epsilon, 1 + self.config.epsilon) * batch_advantages
            policy_loss = -np.minimum(surr1, surr2).mean()

            # Value function loss: MSE
            value_loss = ((values - batch_returns) ** 2).mean()

            # Total loss
            total_loss = (
                policy_loss
                + self.config.value_coef * value_loss
                - self.config.entropy_coef * entropy
            )

            # Gradient descent (simplified - in practice, use automatic differentiation)
            # This is a placeholder - real implementation would compute gradients
            # and update weights using backpropagation
            self._simple_update(total_loss, batch_states, batch_actions)

            # Track metrics
            total_metrics["policy_loss"] += policy_loss
            total_metrics["value_loss"] += value_loss
            total_metrics["entropy"] += entropy
            total_metrics["total_loss"] += total_loss

    # Average metrics
    num_updates = self.config.num_epochs * max(1, (batch_size // self.config.batch_size))
    for key in total_metrics:
        total_metrics[key] /= num_updates

    logger.debug(
        f"PPO update: policy_loss={total_metrics['policy_loss']:.4f}, "
        f"value_loss={total_metrics['value_loss']:.4f}, "
        f"entropy={total_metrics['entropy']:.4f}"
    )

    return total_metrics

__str__()

String representation.

Source code in upir/learning/ppo.py
def __str__(self) -> str:
    """String representation."""
    return (
        f"PPO(lr={self.config.learning_rate}, "
        f"gamma={self.config.gamma}, "
        f"epsilon={self.config.epsilon})"
    )

__repr__()

Developer-friendly representation.

Source code in upir/learning/ppo.py
def __repr__(self) -> str:
    """Developer-friendly representation."""
    return (
        f"PPO(state_dim={self.policy.state_dim}, "
        f"action_dim={self.policy.action_dim}, "
        f"config={self.config})"
    )

See Also