"""
Random stream management for simulation.
Provides independent random streams for different purposes,
enabling better experimental design and variance reduction.
"""
from __future__ import annotations
from typing import Dict, List, Optional
from simcraft.random.distributions import RandomGenerator
[docs]
class RandomStream(RandomGenerator):
"""
Named random stream with checkpointing.
Extends RandomGenerator with stream identification and
state management for reproducibility.
Parameters
----------
name : str
Stream name
seed : Optional[int]
Initial seed
Examples
--------
>>> arrival_stream = RandomStream("arrivals", seed=100)
>>> service_stream = RandomStream("service", seed=200)
>>>
>>> # Save and restore state
>>> state = arrival_stream.checkpoint()
>>> # ... run simulation ...
>>> arrival_stream.restore(state)
"""
[docs]
def __init__(
self,
name: str = "",
seed: Optional[int] = None,
) -> None:
"""Initialize stream."""
super().__init__(seed=seed)
self._name = name or f"Stream_{id(self)}"
@property
def name(self) -> str:
"""Get stream name."""
return self._name
[docs]
def checkpoint(self) -> dict:
"""
Create checkpoint of current state.
Returns
-------
dict
Checkpoint data
"""
return {
"name": self._name,
"seed": self._seed,
"state": self.get_state(),
}
[docs]
def restore(self, checkpoint: dict) -> None:
"""
Restore from checkpoint.
Parameters
----------
checkpoint : dict
Checkpoint data
"""
self.set_state(checkpoint["state"])
[docs]
def fork(self, name: Optional[str] = None) -> "RandomStream":
"""
Create a new stream with derived seed.
Parameters
----------
name : Optional[str]
Name for new stream
Returns
-------
RandomStream
New stream
"""
new_seed = self.randint(0, 2**31 - 1)
return RandomStream(
name=name or f"{self._name}_fork",
seed=new_seed,
)
def __repr__(self) -> str:
"""Return detailed representation."""
return f"RandomStream(name={self._name!r}, seed={self._seed})"
[docs]
class StreamManager:
"""
Manages multiple random streams for a simulation.
Provides centralized stream management, seeding strategies,
and synchronization for variance reduction techniques.
Parameters
----------
base_seed : Optional[int]
Base seed for stream generation
Examples
--------
>>> manager = StreamManager(base_seed=42)
>>>
>>> # Get or create streams
>>> arrivals = manager.get_stream("arrivals")
>>> service = manager.get_stream("service")
>>>
>>> # Antithetic variates
>>> manager.enable_antithetic()
>>> value1 = arrivals.uniform() # U
>>> value2 = arrivals.uniform() # 1-U (antithetic)
"""
[docs]
def __init__(self, base_seed: Optional[int] = None) -> None:
"""Initialize stream manager."""
self._base_seed = base_seed
self._streams: Dict[str, RandomStream] = {}
self._stream_seeds: Dict[str, int] = {}
self._seed_generator = RandomGenerator(seed=base_seed)
self._antithetic_enabled = False
@property
def stream_names(self) -> List[str]:
"""Get names of all streams."""
return list(self._streams.keys())
[docs]
def get_stream(self, name: str) -> RandomStream:
"""
Get or create a named stream.
Parameters
----------
name : str
Stream name
Returns
-------
RandomStream
The stream
"""
if name not in self._streams:
seed = self._generate_seed(name)
self._streams[name] = RandomStream(name=name, seed=seed)
self._stream_seeds[name] = seed
return self._streams[name]
def _generate_seed(self, name: str) -> int:
"""Generate a seed for a stream."""
# Use hash of name combined with base seed for reproducibility
if self._base_seed is not None:
combined = hash((name, self._base_seed))
return abs(combined) % (2**31)
return self._seed_generator.randint(0, 2**31 - 1)
[docs]
def create_stream(
self,
name: str,
seed: Optional[int] = None,
) -> RandomStream:
"""
Create a new stream with explicit seed.
Parameters
----------
name : str
Stream name
seed : Optional[int]
Stream seed
Returns
-------
RandomStream
The new stream
"""
if seed is None:
seed = self._generate_seed(name)
stream = RandomStream(name=name, seed=seed)
self._streams[name] = stream
self._stream_seeds[name] = seed
return stream
[docs]
def reset_all(self) -> None:
"""Reset all streams to initial state."""
for name, seed in self._stream_seeds.items():
self._streams[name].set_seed(seed)
[docs]
def checkpoint(self) -> dict:
"""
Checkpoint all streams.
Returns
-------
dict
Checkpoint data
"""
return {
name: stream.checkpoint()
for name, stream in self._streams.items()
}
[docs]
def restore(self, checkpoint: dict) -> None:
"""
Restore all streams from checkpoint.
Parameters
----------
checkpoint : dict
Checkpoint data
"""
for name, data in checkpoint.items():
if name in self._streams:
self._streams[name].restore(data)
[docs]
def enable_antithetic(self) -> None:
"""Enable antithetic variates (for variance reduction)."""
self._antithetic_enabled = True
[docs]
def disable_antithetic(self) -> None:
"""Disable antithetic variates."""
self._antithetic_enabled = False
[docs]
def set_base_seed(self, seed: int) -> None:
"""
Set new base seed and regenerate all streams.
Parameters
----------
seed : int
New base seed
"""
self._base_seed = seed
self._seed_generator.set_seed(seed)
# Regenerate all streams
for name in self._streams.keys():
new_seed = self._generate_seed(name)
self._streams[name].set_seed(new_seed)
self._stream_seeds[name] = new_seed
def __repr__(self) -> str:
"""Return detailed representation."""
return (
f"StreamManager(base_seed={self._base_seed}, "
f"streams={list(self._streams.keys())})"
)
[docs]
class CommonRandomNumbers:
"""
Helper for common random numbers (CRN) technique.
Synchronizes random streams across multiple simulation runs
for variance reduction in comparing alternatives.
Examples
--------
>>> crn = CommonRandomNumbers(base_seed=42)
>>>
>>> # Run alternative 1
>>> crn.reset()
>>> results1 = run_simulation(crn.get_stream("arrivals"))
>>>
>>> # Run alternative 2 with same random numbers
>>> crn.reset()
>>> results2 = run_simulation(crn.get_stream("arrivals"))
"""
[docs]
def __init__(self, base_seed: int = 42) -> None:
"""Initialize CRN helper."""
self._base_seed = base_seed
self._manager = StreamManager(base_seed=base_seed)
self._checkpoints: Dict[str, dict] = {}
[docs]
def get_stream(self, name: str) -> RandomStream:
"""Get a synchronized stream."""
return self._manager.get_stream(name)
[docs]
def save_checkpoint(self, name: str = "default") -> None:
"""Save current stream states."""
self._checkpoints[name] = self._manager.checkpoint()
[docs]
def load_checkpoint(self, name: str = "default") -> None:
"""Load saved stream states."""
if name in self._checkpoints:
self._manager.restore(self._checkpoints[name])
[docs]
def reset(self) -> None:
"""Reset all streams to initial state."""
self._manager.reset_all()
def __repr__(self) -> str:
"""Return detailed representation."""
return f"CommonRandomNumbers(base_seed={self._base_seed})"