"""
Queue implementations for simulation.
Provides FIFO and priority-based queues with statistics collection.
"""
from __future__ import annotations
from collections import deque
from dataclasses import dataclass, field
from typing import (
TYPE_CHECKING,
Any,
Callable,
Deque,
Generic,
Iterator,
List,
Optional,
TypeVar,
)
import heapq
if TYPE_CHECKING:
from simcraft.core.simulation import Simulation
from simcraft.core.entity import Entity
T = TypeVar("T")
@dataclass
class QueueEntry(Generic[T]):
"""Wrapper for queue items with entry metadata."""
item: T
entry_time: float
[docs]
@dataclass
class QueueStats:
"""
Statistics for queue performance.
Attributes
----------
entries : int
Total number of entities that entered the queue
exits : int
Total number of entities that left the queue
max_length : int
Maximum queue length observed
total_wait_time : float
Sum of all waiting times
area : float
Time-weighted queue length integral
"""
entries: int = 0
exits: int = 0
max_length: int = 0
total_wait_time: float = 0.0
area: float = 0.0
_last_change_time: float = 0.0
_current_length: int = 0
[docs]
def record_entry(self, time: float) -> None:
"""Record an entry to the queue."""
self._update_area(time)
self.entries += 1
self._current_length += 1
self.max_length = max(self.max_length, self._current_length)
[docs]
def record_exit(self, time: float, wait_time: float) -> None:
"""Record an exit from the queue."""
self._update_area(time)
self.exits += 1
self._current_length -= 1
self.total_wait_time += wait_time
def _update_area(self, time: float) -> None:
"""Update time-weighted area."""
duration = time - self._last_change_time
self.area += self._current_length * duration
self._last_change_time = time
@property
def average_length(self) -> float:
"""Get time-average queue length."""
if self._last_change_time == 0:
return 0.0
return self.area / self._last_change_time
@property
def average_wait(self) -> float:
"""Get average waiting time."""
if self.exits == 0:
return 0.0
return self.total_wait_time / self.exits
@property
def current_length(self) -> int:
"""Get current queue length."""
return self._current_length
[docs]
def reset(self) -> None:
"""Reset all statistics."""
self.entries = 0
self.exits = 0
self.max_length = 0
self.total_wait_time = 0.0
self.area = 0.0
self._last_change_time = 0.0
self._current_length = 0
[docs]
class Queue(Generic[T]):
"""
FIFO queue with statistics collection.
A basic first-in-first-out queue that tracks entry times
and collects performance statistics.
Parameters
----------
sim : Simulation
Parent simulation for time tracking
capacity : int
Maximum queue capacity (0 = unlimited)
name : str
Optional name for the queue
Examples
--------
>>> queue = Queue(sim, capacity=10, name="WaitingRoom")
>>> queue.enqueue(customer)
>>> if not queue.is_empty:
... next_customer = queue.dequeue()
>>> print(queue.stats.average_wait)
"""
[docs]
def __init__(
self,
sim: "Simulation",
capacity: int = 0,
name: str = "",
) -> None:
"""Initialize queue."""
self._sim = sim
self._capacity = capacity
self._name = name or f"Queue_{id(self)}"
self._entries: Deque[QueueEntry[T]] = deque()
self._stats = QueueStats()
# Callbacks
self._on_enqueue: Optional[Callable[[T], None]] = None
self._on_dequeue: Optional[Callable[[T], None]] = None
@property
def name(self) -> str:
"""Get queue name."""
return self._name
@property
def stats(self) -> QueueStats:
"""Get queue statistics."""
return self._stats
@property
def capacity(self) -> int:
"""Get queue capacity (0 = unlimited)."""
return self._capacity
@property
def is_empty(self) -> bool:
"""Check if queue is empty."""
return len(self._entries) == 0
@property
def is_full(self) -> bool:
"""Check if queue is at capacity."""
if self._capacity == 0:
return False
return len(self._entries) >= self._capacity
def __len__(self) -> int:
"""Get current queue length."""
return len(self._entries)
[docs]
def enqueue(self, item: T) -> bool:
"""
Add item to the queue.
Parameters
----------
item : T
Item to add
Returns
-------
bool
True if item was added, False if queue is full
"""
if self.is_full:
return False
entry = QueueEntry(item=item, entry_time=self._sim.now)
self._entries.append(entry)
self._stats.record_entry(self._sim.now)
if self._on_enqueue:
self._on_enqueue(item)
return True
[docs]
def dequeue(self) -> Optional[T]:
"""
Remove and return the first item.
Returns
-------
Optional[T]
First item or None if queue is empty
"""
if self.is_empty:
return None
entry = self._entries.popleft()
wait_time = self._sim.now - entry.entry_time
self._stats.record_exit(self._sim.now, wait_time)
if self._on_dequeue:
self._on_dequeue(entry.item)
return entry.item
[docs]
def peek(self) -> Optional[T]:
"""
Return the first item without removing it.
Returns
-------
Optional[T]
First item or None if queue is empty
"""
if self.is_empty:
return None
return self._entries[0].item
[docs]
def remove(self, item: T) -> bool:
"""
Remove a specific item from the queue.
Parameters
----------
item : T
Item to remove
Returns
-------
bool
True if item was found and removed
"""
for entry in self._entries:
if entry.item is item or entry.item == item:
self._entries.remove(entry)
wait_time = self._sim.now - entry.entry_time
self._stats.record_exit(self._sim.now, wait_time)
return True
return False
[docs]
def clear(self) -> List[T]:
"""
Remove all items from the queue.
Returns
-------
List[T]
List of removed items
"""
items = list(self._items)
for item in items:
self.dequeue()
return items
[docs]
def contains(self, item: T) -> bool:
"""Check if item is in the queue."""
return any(entry.item is item or entry.item == item for entry in self._entries)
[docs]
def on_enqueue(self, callback: Callable[[T], None]) -> None:
"""Set callback for enqueue events."""
self._on_enqueue = callback
[docs]
def on_dequeue(self, callback: Callable[[T], None]) -> None:
"""Set callback for dequeue events."""
self._on_dequeue = callback
[docs]
def reset_stats(self) -> None:
"""Reset statistics (keep items)."""
self._stats.reset()
self._stats._current_length = len(self._entries)
def __iter__(self) -> Iterator[T]:
"""Iterate over items in queue order."""
return (entry.item for entry in self._entries)
def __repr__(self) -> str:
"""Return detailed representation."""
return (
f"Queue(name={self._name!r}, length={len(self)}, "
f"capacity={self._capacity})"
)
@dataclass(order=True)
class PriorityItem(Generic[T]):
"""Wrapper for priority queue items."""
priority: float
index: int = field(compare=True)
item: T = field(compare=False)
entry_time: float = field(compare=False, default=0.0)
[docs]
class PriorityQueue(Generic[T]):
"""
Priority queue with statistics collection.
Items are dequeued in order of priority (lower value = higher priority).
Parameters
----------
sim : Simulation
Parent simulation for time tracking
priority_fn : Optional[Callable[[T], float]]
Function to extract priority from items (default uses 0)
capacity : int
Maximum queue capacity (0 = unlimited)
name : str
Optional name for the queue
Examples
--------
>>> queue = PriorityQueue(sim, priority_fn=lambda x: x.priority)
>>> queue.enqueue(high_priority_job)
>>> queue.enqueue(low_priority_job)
>>> next_job = queue.dequeue() # Returns high_priority_job
"""
[docs]
def __init__(
self,
sim: "Simulation",
priority_fn: Optional[Callable[[T], float]] = None,
capacity: int = 0,
name: str = "",
) -> None:
"""Initialize priority queue."""
self._sim = sim
self._priority_fn = priority_fn or (lambda x: 0.0)
self._capacity = capacity
self._name = name or f"PriorityQueue_{id(self)}"
self._heap: List[PriorityItem[T]] = []
self._counter = 0
self._stats = QueueStats()
@property
def name(self) -> str:
"""Get queue name."""
return self._name
@property
def stats(self) -> QueueStats:
"""Get queue statistics."""
return self._stats
@property
def is_empty(self) -> bool:
"""Check if queue is empty."""
return len(self._heap) == 0
@property
def is_full(self) -> bool:
"""Check if queue is at capacity."""
if self._capacity == 0:
return False
return len(self._heap) >= self._capacity
def __len__(self) -> int:
"""Get current queue length."""
return len(self._heap)
[docs]
def enqueue(self, item: T, priority: Optional[float] = None) -> bool:
"""
Add item to the queue.
Parameters
----------
item : T
Item to add
priority : Optional[float]
Override priority (uses priority_fn if not specified)
Returns
-------
bool
True if item was added, False if queue is full
"""
if self.is_full:
return False
if priority is None:
priority = self._priority_fn(item)
self._counter += 1
entry = PriorityItem(
priority=priority,
index=self._counter,
item=item,
entry_time=self._sim.now,
)
heapq.heappush(self._heap, entry)
self._stats.record_entry(self._sim.now)
return True
[docs]
def dequeue(self) -> Optional[T]:
"""
Remove and return the highest priority item.
Returns
-------
Optional[T]
Highest priority item or None if empty
"""
if self.is_empty:
return None
entry = heapq.heappop(self._heap)
wait_time = self._sim.now - entry.entry_time
self._stats.record_exit(self._sim.now, wait_time)
return entry.item
[docs]
def peek(self) -> Optional[T]:
"""
Return the highest priority item without removing it.
Returns
-------
Optional[T]
Highest priority item or None if empty
"""
if self.is_empty:
return None
return self._heap[0].item
def _find_entry(self, item: T) -> Optional[PriorityItem[T]]:
"""Find the first entry matching the item."""
for entry in self._heap:
if entry.item is item or entry.item == item:
return entry
return None
[docs]
def remove(self, item: T) -> bool:
"""
Remove a specific item from the queue.
Parameters
----------
item : T
Item to remove
Returns
-------
bool
True if item was found and removed
"""
entry = self._find_entry(item)
if entry is None:
return False
self._heap.remove(entry)
heapq.heapify(self._heap)
wait_time = self._sim.now - entry.entry_time
self._stats.record_exit(self._sim.now, wait_time)
return True
[docs]
def update_priority(self, item: T, new_priority: float) -> bool:
"""
Update the priority of an item.
Parameters
----------
item : T
Item to update
new_priority : float
New priority value
Returns
-------
bool
True if item was found and updated
"""
entry = self._find_entry(item)
if entry is None:
return False
self._heap.remove(entry)
self._counter += 1
new_entry = PriorityItem(
priority=new_priority,
index=self._counter,
item=item,
entry_time=entry.entry_time,
)
heapq.heappush(self._heap, new_entry)
return True
def __repr__(self) -> str:
"""Return detailed representation."""
return (
f"PriorityQueue(name={self._name!r}, length={len(self)}, "
f"capacity={self._capacity})"
)