Source code for simcraft.resources.server

"""
Server component for simulation.

Servers represent processing stations that serve entities with
configurable capacity, service time distributions, and queue policies.
"""

from __future__ import annotations
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Dict,
    Generic,
    List,
    Optional,
    TypeVar,
    Union,
)
from enum import Enum, auto

from simcraft.resources.queue import Queue, PriorityQueue

if TYPE_CHECKING:
    from simcraft.core.simulation import Simulation
    from simcraft.core.entity import Entity

T = TypeVar("T")


[docs] class ServerState(Enum): """Server operational states.""" IDLE = auto() BUSY = auto() BLOCKED = auto() DOWN = auto()
[docs] @dataclass class ServerStats: """ Statistics for server performance. Attributes ---------- arrivals : int Total entities that arrived at the server departures : int Total entities that completed service balked : int Entities that left without service (queue full) busy_time : float Total time server was busy idle_time : float Total time server was idle blocked_time : float Total time server was blocked down_time : float Total time server was down """ arrivals: int = 0 departures: int = 0 balked: int = 0 busy_time: float = 0.0 idle_time: float = 0.0 blocked_time: float = 0.0 down_time: float = 0.0 total_service_time: float = 0.0 _last_state_change: float = 0.0 _current_state: ServerState = ServerState.IDLE _busy_count: int = 0
[docs] def record_state_change(self, time: float, new_state: ServerState) -> None: """Record a state change.""" duration = time - self._last_state_change if self._current_state == ServerState.IDLE: self.idle_time += duration elif self._current_state == ServerState.BUSY: self.busy_time += duration elif self._current_state == ServerState.BLOCKED: self.blocked_time += duration elif self._current_state == ServerState.DOWN: self.down_time += duration self._current_state = new_state self._last_state_change = time
@property def utilization(self) -> float: """Get server utilization (busy time / total active time).""" active_time = self.busy_time + self.idle_time if active_time == 0: return 0.0 return self.busy_time / active_time @property def average_service_time(self) -> float: """Get average service time.""" if self.departures == 0: return 0.0 return self.total_service_time / self.departures @property def throughput_rate(self) -> float: """Get throughput rate (departures per time unit).""" total_time = self.busy_time + self.idle_time + self.blocked_time + self.down_time if total_time == 0: return 0.0 return self.departures / total_time
[docs] def reset(self) -> None: """Reset all statistics.""" self.arrivals = 0 self.departures = 0 self.balked = 0 self.busy_time = 0.0 self.idle_time = 0.0 self.blocked_time = 0.0 self.down_time = 0.0 self.total_service_time = 0.0 self._last_state_change = 0.0
@dataclass class ServiceRecord(Generic[T]): """Record of an entity being served.""" entity: T start_time: float service_time: float server_index: int = 0 @property def end_time(self) -> float: """Get scheduled end time.""" return self.start_time + self.service_time
[docs] class Server(Generic[T]): """ Multi-server processing station. A Server models a processing station with one or more parallel servers (capacity), a waiting queue, and configurable service time distributions. Parameters ---------- sim : Simulation Parent simulation capacity : int Number of parallel servers service_time : Union[float, Callable[[], float]] Fixed service time or function returning random service time queue_capacity : int Maximum queue length (0 = unlimited) name : str Optional name for the server Examples -------- >>> def service_time(): ... return sim.rng.exponential(5.0) ... >>> server = Server(sim, capacity=2, service_time=service_time) >>> server.enqueue(customer) >>> server.on_departure(lambda c: print(f"{c.id} departed")) """
[docs] def __init__( self, sim: "Simulation", capacity: int = 1, service_time: Union[float, Callable[[], float]] = 1.0, queue_capacity: int = 0, name: str = "", ) -> None: """Initialize server.""" self._sim = sim self._capacity = capacity self._service_time = service_time self._name = name or f"Server_{id(self)}" # Queue for waiting entities self._queue: Queue[T] = Queue(sim, capacity=queue_capacity) # Currently being served self._in_service: List[ServiceRecord[T]] = [] # State tracking self._state = ServerState.IDLE self._stats = ServerStats() self._is_down = False # Callbacks self._on_arrival: Optional[Callable[[T], None]] = None self._on_service_start: Optional[Callable[[T], None]] = None self._on_departure: Optional[Callable[[T], None]] = None self._on_balk: Optional[Callable[[T], None]] = None
@property def name(self) -> str: """Get server name.""" return self._name @property def capacity(self) -> int: """Get server capacity.""" return self._capacity @property def available_capacity(self) -> int: """Get number of available servers.""" return self._capacity - len(self._in_service) @property def is_idle(self) -> bool: """Check if all servers are idle.""" return len(self._in_service) == 0 @property def is_busy(self) -> bool: """Check if all servers are busy.""" return self.available_capacity == 0 @property def queue_length(self) -> int: """Get current queue length.""" return len(self._queue) @property def in_service_count(self) -> int: """Get number of entities in service.""" return len(self._in_service) @property def total_in_system(self) -> int: """Get total entities (queue + in service).""" return self.queue_length + self.in_service_count @property def stats(self) -> ServerStats: """Get server statistics.""" return self._stats @property def queue(self) -> Queue[T]: """Get the waiting queue.""" return self._queue @property def state(self) -> ServerState: """Get current server state.""" return self._state def _get_service_time(self) -> float: """Get service time value.""" if callable(self._service_time): return self._service_time() return self._service_time def _update_state(self) -> None: """Update server state based on occupancy.""" if self._is_down: new_state = ServerState.DOWN elif len(self._in_service) == 0: new_state = ServerState.IDLE else: new_state = ServerState.BUSY if new_state != self._state: self._stats.record_state_change(self._sim.now, new_state) self._state = new_state
[docs] def enqueue(self, entity: T) -> bool: """ Add an entity to the server. The entity will be served immediately if capacity is available, otherwise it will wait in the queue. Parameters ---------- entity : T Entity to process Returns ------- bool True if entity was accepted, False if queue is full """ if self._is_down: return False self._stats.arrivals += 1 if self._on_arrival: self._on_arrival(entity) # Try to start service immediately if self.available_capacity > 0: self._start_service(entity) return True # Queue if space available if self._queue.enqueue(entity): return True # Balk - queue is full self._stats.balked += 1 if self._on_balk: self._on_balk(entity) return False
def _start_service(self, entity: T) -> None: """Start serving an entity.""" service_time = self._get_service_time() server_index = len(self._in_service) record = ServiceRecord( entity=entity, start_time=self._sim.now, service_time=service_time, server_index=server_index, ) self._in_service.append(record) if self._on_service_start: self._on_service_start(entity) # Schedule departure self._sim.schedule( self._complete_service, delay=service_time, args=(record,), tag=f"{self._name}_departure", ) self._update_state() def _complete_service(self, record: ServiceRecord[T]) -> None: """Complete service for an entity.""" if record not in self._in_service: return # Already removed (e.g., due to preemption) self._in_service.remove(record) entity = record.entity # Update statistics self._stats.departures += 1 self._stats.total_service_time += record.service_time if self._on_departure: self._on_departure(entity) # Start next entity from queue if not self._queue.is_empty: next_entity = self._queue.dequeue() if next_entity is not None: self._start_service(next_entity) self._update_state()
[docs] def preempt(self, entity: T) -> Optional[T]: """ Preempt service for highest-priority entity. Parameters ---------- entity : T Entity requesting preemption Returns ------- Optional[T] The preempted entity, or None if no preemption occurred """ if not self._in_service: # No one to preempt, start service self._start_service(entity) return None # Find entity to preempt (last started) record = self._in_service.pop() preempted_entity = record.entity # Re-queue preempted entity at front self._queue._items.appendleft(preempted_entity) self._queue._entry_times[id(preempted_entity)] = self._sim.now # Start new entity self._start_service(entity) return preempted_entity
[docs] def shutdown(self) -> None: """Shut down the server.""" self._is_down = True self._update_state()
[docs] def restart(self) -> None: """Restart the server.""" self._is_down = False self._update_state() # Start any queued entities while self.available_capacity > 0 and not self._queue.is_empty: next_entity = self._queue.dequeue() if next_entity is not None: self._start_service(next_entity)
[docs] def on_arrival(self, callback: Callable[[T], None]) -> None: """Set callback for entity arrivals.""" self._on_arrival = callback
[docs] def on_service_start(self, callback: Callable[[T], None]) -> None: """Set callback for service start.""" self._on_service_start = callback
[docs] def on_departure(self, callback: Callable[[T], None]) -> None: """Set callback for entity departures.""" self._on_departure = callback
[docs] def on_balk(self, callback: Callable[[T], None]) -> None: """Set callback for balking entities.""" self._on_balk = callback
[docs] def reset_stats(self) -> None: """Reset server and queue statistics.""" self._stats.reset() self._queue.reset_stats()
[docs] def get_entities_in_service(self) -> List[T]: """Get list of entities currently in service.""" return [record.entity for record in self._in_service]
def __repr__(self) -> str: """Return detailed representation.""" return ( f"Server(name={self._name!r}, capacity={self._capacity}, " f"in_service={self.in_service_count}, queue={self.queue_length}, " f"state={self._state.name})" )
[docs] class MultiStageServer(Generic[T]): """ Multi-stage processing server. Entities flow through a sequence of processing stages, each with its own service time and optional queue. Parameters ---------- sim : Simulation Parent simulation stage_configs : List[Dict] Configuration for each stage with keys: - capacity: int - service_time: float or Callable - queue_capacity: int (optional) name : str Optional name Examples -------- >>> stages = [ ... {"capacity": 1, "service_time": 5.0}, ... {"capacity": 2, "service_time": 3.0}, ... ] >>> server = MultiStageServer(sim, stages) >>> server.enqueue(job) """
[docs] def __init__( self, sim: "Simulation", stage_configs: List[Dict[str, Any]], name: str = "", ) -> None: """Initialize multi-stage server.""" self._sim = sim self._name = name or f"MultiStageServer_{id(self)}" self._stages: List[Server[T]] = [] for i, config in enumerate(stage_configs): stage = Server( sim=sim, capacity=config.get("capacity", 1), service_time=config.get("service_time", 1.0), queue_capacity=config.get("queue_capacity", 0), name=f"{self._name}_Stage{i + 1}", ) self._stages.append(stage) # Connect stages for i in range(len(self._stages) - 1): self._stages[i].on_departure(self._stages[i + 1].enqueue) # Final stage callback self._on_completion: Optional[Callable[[T], None]] = None
@property def num_stages(self) -> int: """Get number of stages.""" return len(self._stages)
[docs] def enqueue(self, entity: T) -> bool: """ Add entity to first stage. Parameters ---------- entity : T Entity to process Returns ------- bool True if accepted """ return self._stages[0].enqueue(entity)
[docs] def on_completion(self, callback: Callable[[T], None]) -> None: """Set callback for entity completion of all stages.""" self._on_completion = callback self._stages[-1].on_departure(callback)
[docs] def get_stage(self, index: int) -> Server[T]: """ Get a specific stage. Parameters ---------- index : int Stage index (0-based) Returns ------- Server[T] The stage server """ return self._stages[index]
def __repr__(self) -> str: """Return detailed representation.""" return f"MultiStageServer(name={self._name!r}, stages={self.num_stages})"