# Source code for simcraft.optimization.rl_interface

"""
Reinforcement learning interface for simulation.

Provides abstractions for integrating simulation models with
RL agents, supporting both Gym-style and custom interfaces.
"""

from __future__ import annotations
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generic,
    List,
    Optional,
    Tuple,
    TypeVar,
    Union,
)
from enum import Enum, auto
import numpy as np

if TYPE_CHECKING:
    from simcraft.core.simulation import Simulation

# Type aliases
# State: an observation, either a NumPy array or a dict of named values.
State = Union[np.ndarray, Dict[str, Any]]
# Action: an int, a NumPy array, or a dict of named values.
Action = Union[int, np.ndarray, Dict[str, Any]]
# Reward: scalar reward signal.
Reward = float


@dataclass
class Transition:
    """
    One step of agent-environment interaction.

    Attributes
    ----------
    state : State
        Observation before the action was applied
    action : Action
        Action that was applied
    reward : Reward
        Reward obtained for the step
    next_state : State
        Observation after the action was applied
    done : bool
        True when the step ended the episode (defaults to False)
    info : Dict[str, Any]
        Extra per-step data; every instance gets its own empty dict
        unless one is supplied
    """

    state: State
    action: Action
    reward: Reward
    next_state: State
    done: bool = False
    info: Dict[str, Any] = field(default_factory=dict)
@dataclass
class ActionSpace:
    """
    Definition of action space.

    Instances are normally built through the ``discrete``,
    ``continuous`` and ``multi_discrete`` constructors, which fill in
    only the fields relevant to that kind of space.

    Attributes
    ----------
    type : str
        'discrete', 'continuous', or 'multi_discrete'
    n : Optional[int]
        Number of discrete actions
    shape : Optional[Tuple[int, ...]]
        Shape for continuous actions
    low : Optional[np.ndarray]
        Lower bounds for continuous
    high : Optional[np.ndarray]
        Upper bounds for continuous
    nvec : Optional[List[int]]
        Action counts for multi-discrete
    """

    type: str
    n: Optional[int] = None
    shape: Optional[Tuple[int, ...]] = None
    low: Optional[np.ndarray] = None
    high: Optional[np.ndarray] = None
    nvec: Optional[List[int]] = None

    @classmethod
    def discrete(cls, n: int) -> "ActionSpace":
        """
        Create discrete action space.

        Parameters
        ----------
        n : int
            Number of discrete actions

        Returns
        -------
        ActionSpace
            Space with ``type="discrete"`` and ``n`` set
        """
        return cls(type="discrete", n=n)

    @classmethod
    def continuous(
        cls,
        shape: Tuple[int, ...],
        low: Union[float, np.ndarray] = -1.0,
        high: Union[float, np.ndarray] = 1.0,
    ) -> "ActionSpace":
        """
        Create continuous action space.

        Parameters
        ----------
        shape : Tuple[int, ...]
            Shape of a single action
        low : Union[float, np.ndarray]
            Lower bound; a scalar is expanded to an array of ``shape``
        high : Union[float, np.ndarray]
            Upper bound; a scalar is expanded to an array of ``shape``

        Returns
        -------
        ActionSpace
            Space with ``type="continuous"`` and array-valued bounds
        """
        # np.number is included because most NumPy scalars (np.float32,
        # np.int64, ...) are not instances of the Python int/float types
        # and would otherwise be stored unexpanded.
        scalar_types = (int, float, np.number)
        if isinstance(low, scalar_types):
            low = np.full(shape, low)
        if isinstance(high, scalar_types):
            high = np.full(shape, high)
        return cls(type="continuous", shape=shape, low=low, high=high)

    @classmethod
    def multi_discrete(cls, nvec: List[int]) -> "ActionSpace":
        """
        Create multi-discrete action space.

        Parameters
        ----------
        nvec : List[int]
            Number of choices for each discrete sub-action

        Returns
        -------
        ActionSpace
            Space with ``type="multi_discrete"`` and ``nvec`` set
        """
        return cls(type="multi_discrete", nvec=nvec)
@dataclass
class StateSpace:
    """
    Definition of state space.

    Attributes
    ----------
    shape : Tuple[int, ...]
        State shape
    low : Optional[np.ndarray]
        Lower bounds (for bounded spaces)
    high : Optional[np.ndarray]
        Upper bounds (for bounded spaces)
    dtype : type
        Data type
    """

    shape: Tuple[int, ...]
    low: Optional[np.ndarray] = None
    high: Optional[np.ndarray] = None
    dtype: type = np.float32

    @classmethod
    def box(
        cls,
        shape: Tuple[int, ...],
        low: Union[float, np.ndarray] = -np.inf,
        high: Union[float, np.ndarray] = np.inf,
    ) -> "StateSpace":
        """
        Create box (continuous) state space.

        Parameters
        ----------
        shape : Tuple[int, ...]
            Shape of a single state
        low : Union[float, np.ndarray]
            Lower bound; a scalar is expanded to an array of ``shape``
        high : Union[float, np.ndarray]
            Upper bound; a scalar is expanded to an array of ``shape``

        Returns
        -------
        StateSpace
            Space with array-valued bounds and the default ``dtype``
        """
        # np.number is included because most NumPy scalars (np.float32,
        # np.int64, ...) are not instances of the Python int/float types
        # and would otherwise be stored unexpanded.
        scalar_types = (int, float, np.number)
        if isinstance(low, scalar_types):
            low = np.full(shape, low)
        if isinstance(high, scalar_types):
            high = np.full(shape, high)
        return cls(shape=shape, low=low, high=high)
class RLInterface(ABC):
    """
    Abstract bridge between a simulation model and an RL agent.

    Concrete subclasses must implement the five abstract methods
    (``get_state``, ``get_action_space``, ``get_state_space``,
    ``apply_action``, ``get_reward``). ``is_done``, ``get_info`` and
    ``reset`` have default implementations that may be overridden.

    Examples
    --------
    >>> class PortRLInterface(RLInterface):
    ...     def __init__(self, sim):
    ...         self.sim = sim
    ...
    ...     def get_state(self):
    ...         return np.array([
    ...             self.sim.queue_length,
    ...             self.sim.utilization,
    ...         ])
    ...
    ...     def get_action_space(self):
    ...         return ActionSpace.discrete(4)  # 4 berths
    ...
    ...     def get_state_space(self):
    ...         return StateSpace.box((2,))
    ...
    ...     def apply_action(self, action):
    ...         self.sim.allocate_berth(action)
    ...
    ...     def get_reward(self):
    ...         return -self.sim.waiting_time
    """

    @abstractmethod
    def get_state(self) -> State:
        """
        Return the current state observation.

        Returns
        -------
        State
            Current state
        """

    @abstractmethod
    def get_action_space(self) -> ActionSpace:
        """
        Return the definition of the action space.

        Returns
        -------
        ActionSpace
            Action space
        """

    @abstractmethod
    def get_state_space(self) -> StateSpace:
        """
        Return the definition of the state space.

        Returns
        -------
        StateSpace
            State space
        """

    @abstractmethod
    def apply_action(self, action: Action) -> None:
        """
        Apply an action to the simulation.

        Parameters
        ----------
        action : Action
            Action to apply
        """

    @abstractmethod
    def get_reward(self) -> Reward:
        """
        Return the reward for the current state/action.

        Returns
        -------
        Reward
            Reward value
        """

    def is_done(self) -> bool:
        """
        Report whether the episode has ended.

        The default implementation never ends the episode.

        Returns
        -------
        bool
            True if episode ended
        """
        return False

    def get_info(self) -> Dict[str, Any]:
        """
        Return additional diagnostic information.

        The default implementation returns a new empty dict.

        Returns
        -------
        Dict[str, Any]
            Info dictionary
        """
        return {}

    def reset(self) -> State:
        """
        Reset the environment and return the initial state.

        The default implementation performs no reset work of its own
        and simply returns ``get_state()``.

        Returns
        -------
        State
            Initial state
        """
        return self.get_state()
class RLEnvironment:
    """
    Gym-compatible wrapper for simulation-based RL.

    Wraps an RLInterface to provide a standard RL environment API.
    Every call to ``step`` is also recorded as a ``Transition`` in an
    internal history list. That history is not cleared by ``reset``;
    call ``clear_history`` to empty it.

    Parameters
    ----------
    interface : RLInterface
        RL interface implementation
    simulation : Simulation
        Simulation instance
    max_steps : int
        Maximum steps per episode

    Examples
    --------
    >>> env = RLEnvironment(interface, sim, max_steps=1000)
    >>> state = env.reset()
    >>> for _ in range(100):
    ...     action = agent.select_action(state)
    ...     state, reward, done, info = env.step(action)
    ...     if done:
    ...         break
    """

    def __init__(
        self,
        interface: RLInterface,
        simulation: "Simulation",
        max_steps: int = 10000,
    ) -> None:
        """Initialize environment."""
        self._interface = interface
        self._simulation = simulation
        self._max_steps = max_steps

        self._current_step = 0
        self._episode = 0
        self._total_reward = 0.0

        # History for experience replay
        self._history: List[Transition] = []

    @property
    def action_space(self) -> ActionSpace:
        """Get action space."""
        return self._interface.get_action_space()

    @property
    def observation_space(self) -> StateSpace:
        """Get observation (state) space."""
        return self._interface.get_state_space()

    @property
    def current_step(self) -> int:
        """Get current step in episode."""
        return self._current_step

    @property
    def episode(self) -> int:
        """Get current episode number."""
        return self._episode

    def reset(self) -> State:
        """
        Reset environment for new episode.

        Resets the simulation, zeroes the step counter and accumulated
        reward, increments the episode number, then delegates to the
        interface's ``reset``.

        Returns
        -------
        State
            Initial state
        """
        self._simulation.reset()
        self._current_step = 0
        self._total_reward = 0.0
        self._episode += 1

        return self._interface.reset()

    def step(self, action: Action) -> Tuple[State, Reward, bool, Dict[str, Any]]:
        """
        Take a step in the environment.

        Parameters
        ----------
        action : Action
            Action to take

        Returns
        -------
        Tuple[State, Reward, bool, Dict]
            (next_state, reward, done, info). When the step limit is
            reached, ``done`` is forced to True and ``info`` carries
            ``"truncated": True``.
        """
        state = self._interface.get_state()

        # Apply action
        self._interface.apply_action(action)

        # Get results
        next_state = self._interface.get_state()
        reward = self._interface.get_reward()
        done = self._interface.is_done()
        # Work on a copy: the interface may hand back a dict it keeps and
        # reuses, and the truncation flag set below must not leak into it
        # (it would otherwise show up on later steps and episodes).
        info = dict(self._interface.get_info() or {})

        # Update counters
        self._current_step += 1
        self._total_reward += reward

        # Check max steps
        if self._current_step >= self._max_steps:
            done = True
            info["truncated"] = True

        # Record transition
        transition = Transition(
            state=state,
            action=action,
            reward=reward,
            next_state=next_state,
            done=done,
            info=info,
        )
        self._history.append(transition)

        return next_state, reward, done, info

    def get_history(self) -> List[Transition]:
        """Get transition history (a shallow copy of the internal list)."""
        return self._history.copy()

    def clear_history(self) -> None:
        """Clear transition history."""
        self._history.clear()
class DecisionPoint:
    """
    A named point in the simulation at which an agent acts.

    Intended for event-driven RL, where actions are taken at specific
    simulation events. The object stores the supplied callables and
    forwards each method call to the matching one.

    Parameters
    ----------
    name : str
        Decision point name
    state_fn : Callable
        Zero-argument function returning the current state
    action_space : ActionSpace
        Available actions
    apply_fn : Callable
        Function that takes an action and applies it
    reward_fn : Callable
        Zero-argument function returning the reward
    """

    def __init__(
        self,
        name: str,
        state_fn: Callable[[], State],
        action_space: ActionSpace,
        apply_fn: Callable[[Action], None],
        reward_fn: Callable[[], Reward],
    ) -> None:
        """Store the name, action space and callbacks."""
        self.name = name
        self._action_space = action_space
        # Callbacks invoked by get_state / apply_action / get_reward.
        self._state_fn = state_fn
        self._apply_fn = apply_fn
        self._reward_fn = reward_fn

    @property
    def action_space(self) -> ActionSpace:
        """Action space given at construction."""
        return self._action_space

    def get_state(self) -> State:
        """Return whatever ``state_fn`` returns."""
        observation = self._state_fn()
        return observation

    def apply_action(self, action: Action) -> None:
        """Pass ``action`` to ``apply_fn``."""
        self._apply_fn(action)

    def get_reward(self) -> Reward:
        """Return whatever ``reward_fn`` returns."""
        value = self._reward_fn()
        return value
class MultiAgentInterface:
    """
    Interface for multi-agent reinforcement learning.

    Supports multiple agents with potentially different action spaces
    and rewards. Each agent is stored as a ``DecisionPoint`` keyed by
    its name.

    Parameters
    ----------
    n_agents : int
        Declared number of agents. This value is only stored and
        reported by ``n_agents``; it is not checked against the number
        of agents actually added.

    Examples
    --------
    >>> interface = MultiAgentInterface(n_agents=2)
    >>> interface.add_agent("berth_allocator", berth_space, berth_reward)
    >>> interface.add_agent("agv_dispatcher", agv_space, agv_reward)
    """

    def __init__(self, n_agents: int = 1) -> None:
        """Initialize multi-agent interface."""
        self._n_agents = n_agents
        self._agents: Dict[str, DecisionPoint] = {}
        self._shared_state_fn: Optional[Callable[[], State]] = None

    @property
    def n_agents(self) -> int:
        """Get number of agents (the value given at construction)."""
        return self._n_agents

    @property
    def agent_names(self) -> List[str]:
        """Get agent names, in the order they were added."""
        return list(self._agents.keys())

    def add_agent(
        self,
        name: str,
        action_space: ActionSpace,
        reward_fn: Callable[[], Reward],
        state_fn: Optional[Callable[[], State]] = None,
        apply_fn: Optional[Callable[[Action], None]] = None,
    ) -> None:
        """
        Add an agent.

        Adding an agent under an existing name replaces the earlier one.

        Parameters
        ----------
        name : str
            Agent name
        action_space : ActionSpace
            Agent's action space
        reward_fn : Callable
            Agent's reward function
        state_fn : Optional[Callable]
            Agent's state function. If None, the agent uses the shared
            state function, or an empty array when none has been set.
        apply_fn : Optional[Callable]
            Action application function. If None, actions are ignored.
        """
        if state_fn is None:
            # Look the shared function up on every call rather than once
            # here, so set_shared_state() also takes effect for agents
            # that were registered before it was called.
            def state_fn() -> State:
                shared = self._shared_state_fn
                return shared() if shared is not None else np.array([])

        if apply_fn is None:
            def apply_fn(action: Action) -> None:
                return None

        self._agents[name] = DecisionPoint(
            name=name,
            state_fn=state_fn,
            action_space=action_space,
            apply_fn=apply_fn,
            reward_fn=reward_fn,
        )

    def set_shared_state(self, state_fn: Callable[[], State]) -> None:
        """Set shared state function for all agents without their own."""
        self._shared_state_fn = state_fn

    def get_agent(self, name: str) -> Optional[DecisionPoint]:
        """Get agent by name, or None if no such agent exists."""
        return self._agents.get(name)

    def get_states(self) -> Dict[str, State]:
        """Get states for all agents, keyed by agent name."""
        return {name: agent.get_state() for name, agent in self._agents.items()}

    def apply_actions(self, actions: Dict[str, Action]) -> None:
        """
        Apply actions for all agents.

        Parameters
        ----------
        actions : Dict[str, Action]
            Mapping of agent name to action. Entries whose name does not
            match a registered agent are skipped.
        """
        for name, action in actions.items():
            agent = self._agents.get(name)
            if agent is not None:
                agent.apply_action(action)

    def get_rewards(self) -> Dict[str, Reward]:
        """Get rewards for all agents, keyed by agent name."""
        return {name: agent.get_reward() for name, agent in self._agents.items()}
class ReplayBuffer:
    """
    Experience replay buffer for RL training.

    Stores transitions and supports random sampling for off-policy
    algorithms. Once ``capacity`` transitions are stored, each new one
    overwrites the oldest.

    Parameters
    ----------
    capacity : int
        Maximum buffer size; must be positive

    Raises
    ------
    ValueError
        If ``capacity`` is not positive
    """

    def __init__(self, capacity: int = 10000) -> None:
        """Initialize buffer."""
        # Reject this up front: a non-positive capacity would otherwise
        # only fail later, inside push(), with an unrelated-looking
        # IndexError.
        if capacity <= 0:
            raise ValueError(f"capacity must be positive, got {capacity!r}")
        self._capacity = capacity
        self._buffer: List[Transition] = []
        # Index of the slot the next push() writes once the buffer is full.
        self._position = 0

    @property
    def size(self) -> int:
        """Get current buffer size."""
        return len(self._buffer)

    def push(self, transition: Transition) -> None:
        """
        Add transition to buffer.

        Parameters
        ----------
        transition : Transition
            Transition to add
        """
        if len(self._buffer) < self._capacity:
            self._buffer.append(transition)
        else:
            # Buffer is full: overwrite the oldest entry.
            self._buffer[self._position] = transition
        self._position = (self._position + 1) % self._capacity

    def sample(self, batch_size: int) -> List[Transition]:
        """
        Sample random batch of transitions.

        Sampling is without replacement, so at most ``size`` transitions
        are returned even if ``batch_size`` is larger.

        Parameters
        ----------
        batch_size : int
            Number of transitions to sample

        Returns
        -------
        List[Transition]
            Sampled transitions
        """
        import random

        count = min(batch_size, len(self._buffer))
        return random.sample(self._buffer, count)

    def sample_batch(
        self, batch_size: int
    ) -> Tuple[np.ndarray, np.ndarray, np.ndarray, np.ndarray, np.ndarray]:
        """
        Sample batch as numpy arrays.

        Parameters
        ----------
        batch_size : int
            Batch size

        Returns
        -------
        Tuple
            (states, actions, rewards, next_states, dones), each built
            with ``np.array`` from the matching field of the sampled
            transitions
        """
        transitions = self.sample(batch_size)

        states = np.array([t.state for t in transitions])
        actions = np.array([t.action for t in transitions])
        rewards = np.array([t.reward for t in transitions])
        next_states = np.array([t.next_state for t in transitions])
        dones = np.array([t.done for t in transitions])

        return states, actions, rewards, next_states, dones

    def clear(self) -> None:
        """Clear buffer and restart writing from the first slot."""
        self._buffer.clear()
        self._position = 0