"""
Main simulation engine.
The Simulation class is the core of the SimCraft framework,
providing event scheduling, execution, and hierarchical composition.
"""
from __future__ import annotations
import time as wall_time
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional, Set, Tuple, Union
from enum import Enum, auto
import logging
from simcraft.core.clock import Clock, TimeUnit
from simcraft.core.event import Event, EventList
from simcraft.core.entity import Entity
[docs]
class SimulationState(Enum):
"""Simulation lifecycle states."""
CREATED = auto()
INITIALIZED = auto()
RUNNING = auto()
PAUSED = auto()
COMPLETED = auto()
TERMINATED = auto()
[docs]
@dataclass
class SimulationConfig:
"""
Configuration for simulation execution.
Attributes
----------
time_unit : TimeUnit
Base time unit for the simulation
seed : Optional[int]
Random seed for reproducibility
warmup_duration : float
Duration of warmup period
max_events : int
Maximum number of events to process (0 = unlimited)
real_time_factor : float
Factor for real-time execution (0 = as fast as possible)
log_level : int
Logging verbosity level
collect_trace : bool
Whether to collect event execution trace
"""
time_unit: TimeUnit = TimeUnit.HOURS
seed: Optional[int] = None
warmup_duration: float = 0.0
max_events: int = 0
real_time_factor: float = 0.0
log_level: int = logging.WARNING
collect_trace: bool = False
[docs]
class Simulation:
"""
Core discrete event simulation engine.
The Simulation class provides the fundamental infrastructure for
discrete event simulation including event scheduling, time advancement,
hierarchical model composition, and execution control.
Features
--------
- Efficient event scheduling with priority support
- Hierarchical model composition (parent-child sandboxes)
- Multiple execution modes (run until, run for, step)
- Warmup period handling
- Real-time execution support
- Event tracing and debugging
Examples
--------
Basic simulation:
>>> class BankSimulation(Simulation):
... def __init__(self):
... super().__init__()
... self.customers_served = 0
...
... def on_init(self):
... self.schedule(self.customer_arrival, delay=0)
...
... def customer_arrival(self):
... self.customers_served += 1
... self.schedule(self.customer_arrival,
... delay=self.rng.exponential(5.0))
...
>>> sim = BankSimulation()
>>> sim.run(until=100)
>>> print(f"Served {sim.customers_served} customers")
Hierarchical composition:
>>> class Server(Simulation):
... def __init__(self, parent: Simulation):
... super().__init__(parent=parent)
...
>>> class System(Simulation):
... def __init__(self):
... super().__init__()
... self.server1 = Server(parent=self)
... self.server2 = Server(parent=self)
"""
[docs]
def __init__(
self,
parent: Optional[Simulation] = None,
config: Optional[SimulationConfig] = None,
name: str = "",
) -> None:
"""
Initialize simulation.
Parameters
----------
parent : Optional[Simulation]
Parent simulation for hierarchical composition
config : Optional[SimulationConfig]
Simulation configuration
name : str
Optional name for the simulation
"""
self._parent = parent
self._children: List[Simulation] = []
self._config = config or SimulationConfig()
self._name = name or type(self).__name__
# State management
self._state = SimulationState.CREATED
self._is_root = parent is None
# Event management
self._events = EventList()
self._event_count = 0
self._processed_count = 0
# Clock - root owns, children reference parent's
if self._is_root:
self._clock = Clock(time_unit=self._config.time_unit)
else:
parent._children.append(self)
# Random number generation
self._init_rng()
# Statistics and monitoring
self._monitors: Dict[str, Any] = {}
self._trace: List[Tuple[float, str, str]] = []
# Logging
self._logger = logging.getLogger(f"simcraft.{self._name}")
self._logger.setLevel(self._config.log_level)
def _init_rng(self) -> None:
"""Initialize random number generator."""
from simcraft.random.distributions import RandomGenerator
self._rng = RandomGenerator(seed=self._config.seed)
@property
def clock(self) -> Clock:
"""Get the simulation clock."""
if self._is_root:
return self._clock
return self._parent.clock
@property
def now(self) -> float:
"""Get current simulation time."""
return self.clock.now
@property
def rng(self) -> "RandomGenerator":
"""Get random number generator."""
return self._rng
@property
def name(self) -> str:
"""Get simulation name."""
return self._name
@property
def state(self) -> SimulationState:
"""Get current simulation state."""
return self._state
@property
def is_running(self) -> bool:
"""Check if simulation is currently running."""
return self._state == SimulationState.RUNNING
@property
def is_warmed_up(self) -> bool:
"""Check if warmup period has completed."""
return self.clock.is_warmed_up
@property
def parent(self) -> Optional["Simulation"]:
"""Get parent simulation."""
return self._parent
@property
def root(self) -> "Simulation":
"""Get root simulation."""
if self._is_root:
return self
return self._parent.root
@property
def events_pending(self) -> int:
"""Get number of pending events."""
return len(self._events)
@property
def events_processed(self) -> int:
"""Get number of processed events."""
return self._processed_count
[docs]
def schedule(
self,
action: Callable[..., Any],
delay: float = 0.0,
at: Optional[float] = None,
args: Tuple[Any, ...] = (),
kwargs: Optional[Dict[str, Any]] = None,
tag: str = "",
priority: int = 0,
) -> Event:
"""
Schedule an event for future execution.
Parameters
----------
action : Callable
Function to execute
delay : float
Time delay from current time (ignored if 'at' is specified)
at : Optional[float]
Absolute simulation time for execution
args : tuple
Positional arguments for the action
kwargs : dict
Keyword arguments for the action
tag : str
Optional tag for event identification
priority : int
Priority level (higher = executed first at same time)
Returns
-------
Event
The scheduled event (can be used for cancellation)
Examples
--------
>>> sim.schedule(process_arrival, delay=5.0)
>>> sim.schedule(process_arrival, at=sim.now + 5.0)
>>> sim.schedule(process_arrival, delay=0, priority=10) # High priority
"""
if at is not None:
scheduled_time = at
else:
scheduled_time = self.now + delay
if scheduled_time < self.now:
self._logger.warning(
f"Scheduling event in the past: {scheduled_time} < {self.now}"
)
self._event_count += 1
event = Event(
scheduled_time=scheduled_time,
action=action,
args=args,
kwargs=kwargs or {},
owner=self,
index=self._event_count,
tag=tag,
priority=priority,
)
# Add to root's event list
self.root._events.add(event)
self._logger.debug(f"Scheduled: {event}")
return event
[docs]
def cancel_event(self, event: Event) -> bool:
"""
Cancel a scheduled event.
Parameters
----------
event : Event
The event to cancel
Returns
-------
bool
True if event was successfully cancelled
"""
event.cancel()
self._logger.debug(f"Cancelled: {event}")
return True
[docs]
def cancel_events_by_tag(self, tag: str) -> int:
"""
Cancel all events with a specific tag.
Parameters
----------
tag : str
Tag to match
Returns
-------
int
Number of events cancelled
"""
count = 0
for event in self.root._events:
if event.tag == tag and not event.cancelled:
event.cancel()
count += 1
return count
[docs]
def run(
self,
until: Optional[float] = None,
for_duration: Optional[float] = None,
events: Optional[int] = None,
) -> "Simulation":
"""
Run the simulation.
Parameters
----------
until : Optional[float]
Run until this simulation time
for_duration : Optional[float]
Run for this duration from current time
events : Optional[int]
Run for this many events
Returns
-------
Simulation
Self for method chaining
Examples
--------
>>> sim.run(until=100) # Run until time 100
>>> sim.run(for_duration=50) # Run for 50 time units
>>> sim.run(events=1000) # Run 1000 events
>>> sim.run() # Run until no more events
"""
if not self._is_root:
raise RuntimeError("Only root simulation can be run directly")
self._initialize()
stop_time = None
if until is not None:
stop_time = until
elif for_duration is not None:
stop_time = self.now + for_duration
max_events = events or self._config.max_events or 0
self._state = SimulationState.RUNNING
self._logger.info(f"Simulation started at time {self.now}")
try:
self._execute_loop(stop_time=stop_time, max_events=max_events)
except KeyboardInterrupt:
self._logger.info("Simulation interrupted by user")
self._state = SimulationState.PAUSED
except Exception as e:
self._logger.error(f"Simulation error: {e}")
self._state = SimulationState.TERMINATED
raise
if self._state == SimulationState.RUNNING:
self._state = SimulationState.COMPLETED
self._finalize()
self._logger.info(
f"Simulation ended at time {self.now} "
f"({self._processed_count} events processed)"
)
return self
def _initialize(self) -> None:
"""Initialize simulation before running."""
if self._state == SimulationState.CREATED:
# Set warmup
if self._config.warmup_duration > 0:
self.clock.set_warmup(self._config.warmup_duration)
# Call initialization hooks
self.on_init()
for child in self._children:
child._initialize()
self._state = SimulationState.INITIALIZED
def _finalize(self) -> None:
"""Finalize simulation after running."""
self.on_end()
for child in self._children:
child._finalize()
def _execute_loop(
self,
stop_time: Optional[float],
max_events: int,
) -> None:
"""
Main event processing loop.
Parameters
----------
stop_time : Optional[float]
Time at which to stop
max_events : int
Maximum events to process (0 = unlimited)
"""
event_count = 0
real_time_start = wall_time.time()
while self._events:
event = self._events.peek_next()
if event is None:
break
# Check stop conditions
if stop_time is not None and event.scheduled_time > stop_time:
self.clock.advance_to(stop_time)
break
if max_events > 0 and event_count >= max_events:
break
# Pop and process event
event = self._events.pop_next()
# Handle real-time execution
if self._config.real_time_factor > 0:
self._sync_real_time(
event.scheduled_time, real_time_start, self._config.real_time_factor
)
# Advance clock
self.clock.advance_to(event.scheduled_time)
# Execute event
if not event.cancelled:
self._execute_event(event)
event_count += 1
self._processed_count += 1
def _execute_event(self, event: Event) -> None:
"""
Execute a single event.
Parameters
----------
event : Event
The event to execute
"""
if self._config.collect_trace:
action_name = getattr(event.action, "__name__", str(event.action))
self._trace.append((self.now, action_name, event.tag))
self._logger.debug(f"Executing at {self.now}: {event}")
try:
event.invoke()
except Exception as e:
self._logger.error(f"Error executing event {event}: {e}")
raise
def _sync_real_time(
self, sim_time: float, real_start: float, factor: float
) -> None:
"""Synchronize with wall clock for real-time execution."""
expected_real_time = real_start + (sim_time - self.now) / factor
current_real_time = wall_time.time()
if current_real_time < expected_real_time:
wall_time.sleep(expected_real_time - current_real_time)
[docs]
def step(self) -> bool:
"""
Execute a single event.
Returns
-------
bool
True if an event was processed, False if no events remain
"""
if not self._is_root:
return self.root.step()
if self._state == SimulationState.CREATED:
self._initialize()
event = self._events.pop_next()
if event is None:
return False
self.clock.advance_to(event.scheduled_time)
if not event.cancelled:
self._execute_event(event)
self._processed_count += 1
return True
[docs]
def warmup(self, duration: Optional[float] = None) -> "Simulation":
"""
Run warmup period.
Parameters
----------
duration : Optional[float]
Warmup duration (uses config value if not specified)
Returns
-------
Simulation
Self for method chaining
"""
warmup_time = duration or self._config.warmup_duration
if warmup_time > 0:
self.clock.set_warmup(warmup_time)
self.run(for_duration=warmup_time)
return self
[docs]
def reset(self) -> "Simulation":
"""
Reset simulation to initial state.
Returns
-------
Simulation
Self for method chaining
"""
self._events.clear()
self.clock.reset()
self._processed_count = 0
self._event_count = 0
self._state = SimulationState.CREATED
self._trace.clear()
# Reset children
for child in self._children:
child.reset()
# Reinitialize RNG
self._init_rng()
self._logger.info("Simulation reset")
return self
[docs]
def on_init(self) -> None:
"""
Hook called during initialization.
Override this method to set up initial events and state.
"""
pass
[docs]
def on_end(self) -> None:
"""
Hook called when simulation ends.
Override this method for cleanup and result collection.
"""
pass
[docs]
def on_warmup_end(self) -> None:
"""
Hook called when warmup period ends.
Override this method to reset statistics after warmup.
"""
pass
[docs]
def add_monitor(self, name: str, monitor: Any) -> None:
"""
Register a monitor/statistics collector.
Parameters
----------
name : str
Monitor name for retrieval
monitor : Any
Monitor instance
"""
self._monitors[name] = monitor
[docs]
def get_monitor(self, name: str) -> Any:
"""
Get a registered monitor.
Parameters
----------
name : str
Monitor name
Returns
-------
Any
Monitor instance or None
"""
return self._monitors.get(name)
[docs]
def get_trace(self) -> List[Tuple[float, str, str]]:
"""
Get event execution trace.
Returns
-------
List[Tuple[float, str, str]]
List of (time, action_name, tag) tuples
"""
return self._trace.copy()
def __repr__(self) -> str:
"""Return detailed representation."""
return (
f"{type(self).__name__}("
f"name={self._name!r}, "
f"time={self.now:.4f}, "
f"state={self._state.name}, "
f"events_pending={self.events_pending}, "
f"events_processed={self._processed_count}"
f")"
)
def __str__(self) -> str:
"""Return human-readable representation."""
return f"{self._name} @ t={self.now:.4f}"