Reinforcement Learning Integration
This tutorial shows how to integrate SimCraft simulations with reinforcement learning frameworks.
What You’ll Learn
Implementing the
RLInterfacefor your simulationDefining state and action spaces
Creating a Gym-compatible environment
Training an RL agent
Overview
SimCraft provides native RL integration through:
RLInterface: Abstract base class for RL-compatible simulations
RLEnvironment: Gym-compatible wrapper
ActionSpace/StateSpace: Space definitions
ReplayBuffer: Experience storage for off-policy learning
Step 1: Define the Problem
We’ll create an RL environment for a simple inventory control problem:
State: Current inventory level
Action: How much to order (0, 10, 20, or 30 units)
Reward: Revenue minus holding and ordering costs
Step 2: Implement RLInterface
import simcraft
from simcraft.optimization import RLInterface, ActionSpace, StateSpace
import numpy as np
class InventoryRL(simcraft.Simulation, RLInterface):
"""Inventory control simulation with RL interface."""
def __init__(self):
simcraft.Simulation.__init__(self)
RLInterface.__init__(self)
# Parameters
self.max_inventory = 100
self.holding_cost = 0.5
self.ordering_cost = 2.0
self.stockout_cost = 10.0
self.unit_revenue = 5.0
# State
self.inventory = 50
self.total_reward = 0.0
# Statistics
self.daily_demand = simcraft.Tally()
def on_init(self):
"""Start the simulation."""
self.schedule(self.daily_cycle, delay=0.0)
def daily_cycle(self):
"""Process one day: demand arrives, costs accrue."""
# Generate demand
demand = int(self.rng.poisson(lam=15))
self.daily_demand.observe(demand)
# Fulfill demand
sold = min(demand, self.inventory)
self.inventory -= sold
stockout = demand - sold
# Calculate daily reward
revenue = sold * self.unit_revenue
holding = self.inventory * self.holding_cost
stockout_penalty = stockout * self.stockout_cost
self.total_reward += revenue - holding - stockout_penalty
# Schedule next day (this triggers decision point)
if self.now < self.end_time:
self.schedule(self.daily_cycle, delay=1.0)
# ========== RLInterface Implementation ==========
def get_state_space(self) -> StateSpace:
"""Define the state space."""
return StateSpace(
low=np.array([0]),
high=np.array([self.max_inventory]),
dtype=np.float32
)
def get_action_space(self) -> ActionSpace:
"""Define the action space (discrete: order 0, 10, 20, or 30)."""
return ActionSpace(
space_type="discrete",
n=4 # Actions: 0, 1, 2, 3 -> order 0, 10, 20, 30
)
def get_state(self) -> np.ndarray:
"""Return current state observation."""
return np.array([self.inventory], dtype=np.float32)
def apply_action(self, action: int) -> None:
"""Apply the ordering decision."""
order_quantity = action * 10 # Convert action to order amount
order_cost = order_quantity * self.ordering_cost
self.total_reward -= order_cost
# Receive order (instant delivery for simplicity)
self.inventory = min(self.inventory + order_quantity, self.max_inventory)
def get_reward(self) -> float:
"""Return reward since last action."""
reward = self.total_reward
self.total_reward = 0.0 # Reset for next period
return reward
def is_done(self) -> bool:
"""Check if episode is complete."""
return self.now >= self.end_time
def reset_for_episode(self) -> np.ndarray:
"""Reset simulation for new episode."""
self.reset()
self.inventory = 50
self.total_reward = 0.0
self.end_time = 30 # 30-day episodes
return self.get_state()
Step 3: Create the Gym Environment
Wrap the simulation in a Gym-compatible environment:
from simcraft.optimization import RLEnvironment
def create_inventory_env():
"""Create a Gym-compatible environment."""
sim = InventoryRL()
sim.end_time = 30
env = RLEnvironment(
simulation=sim,
max_steps=30
)
return env
Step 4: Training Loop
Here’s a simple training loop (using a basic Q-learning approach):
import numpy as np
def train_agent(env, num_episodes=1000, learning_rate=0.1, discount=0.99):
"""Train a simple Q-learning agent."""
# Discretize state space into bins
num_state_bins = 11 # 0, 10, 20, ..., 100
num_actions = 4
# Initialize Q-table
q_table = np.zeros((num_state_bins, num_actions))
# Training parameters
epsilon = 1.0
epsilon_decay = 0.995
epsilon_min = 0.01
rewards_history = []
for episode in range(num_episodes):
state = env.reset()
state_bin = int(state[0] / 10) # Discretize
total_reward = 0
done = False
while not done:
# Epsilon-greedy action selection
if np.random.random() < epsilon:
action = np.random.randint(num_actions)
else:
action = np.argmax(q_table[state_bin])
# Take action
next_state, reward, done, info = env.step(action)
next_state_bin = int(min(next_state[0], 100) / 10)
# Q-learning update
best_next = np.max(q_table[next_state_bin])
q_table[state_bin, action] += learning_rate * (
reward + discount * best_next - q_table[state_bin, action]
)
state_bin = next_state_bin
total_reward += reward
rewards_history.append(total_reward)
epsilon = max(epsilon_min, epsilon * epsilon_decay)
if (episode + 1) % 100 == 0:
avg_reward = np.mean(rewards_history[-100:])
print(f"Episode {episode + 1}, Avg Reward: {avg_reward:.2f}")
return q_table, rewards_history
if __name__ == "__main__":
env = create_inventory_env()
q_table, rewards = train_agent(env)
# Print learned policy
print("\nLearned Policy (state -> order quantity):")
for state in range(11):
action = np.argmax(q_table[state])
print(f" Inventory {state * 10}: Order {action * 10}")
Using with Stable-Baselines3
For more sophisticated RL algorithms, use Stable-Baselines3:
from stable_baselines3 import PPO
from stable_baselines3.common.env_checker import check_env
def train_with_sb3():
"""Train using Stable-Baselines3 PPO."""
env = create_inventory_env()
# Verify environment compatibility
check_env(env)
# Create and train model
model = PPO(
"MlpPolicy",
env,
learning_rate=3e-4,
n_steps=2048,
batch_size=64,
n_epochs=10,
verbose=1
)
model.learn(total_timesteps=100_000)
# Evaluate
obs = env.reset()
total_reward = 0
done = False
while not done:
action, _ = model.predict(obs, deterministic=True)
obs, reward, done, info = env.step(action)
total_reward += reward
print(f"Evaluation reward: {total_reward:.2f}")
return model
Decision Points
For more complex simulations, use DecisionPoint to define where actions are needed:
from simcraft.optimization import DecisionPoint
class ComplexSimulation(simcraft.Simulation, RLInterface):
def __init__(self):
super().__init__()
self.decision_points = []
def on_machine_idle(self, machine):
"""Called when a machine becomes idle."""
# Create decision point: which job to process next?
dp = DecisionPoint(
state=self.get_state(),
available_actions=self.get_available_jobs(machine),
context={"machine": machine}
)
self.decision_points.append(dp)
# Wait for action from RL agent
Multi-Agent RL
For multi-agent scenarios, use MultiAgentInterface:
from simcraft.optimization import MultiAgentInterface
class MultiAgentSim(simcraft.Simulation, MultiAgentInterface):
def get_agent_ids(self):
return ["agent_1", "agent_2", "agent_3"]
def get_state(self, agent_id):
# Return agent-specific observation
pass
def apply_action(self, agent_id, action):
# Apply agent-specific action
pass
Next Steps
See Port Terminal Example for a complete RL-ready simulation
Explore the Optimization API for more details
Check out Stable-Baselines3 documentation