Source code for simcraft.resources.resource

"""
Resource component for simulation.

Resources represent limited items that can be acquired and released,
such as operators, machines, or transportation vehicles.
"""

from __future__ import annotations
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Deque,
    Dict,
    Generic,
    List,
    Optional,
    TypeVar,
    Tuple,
)
from collections import deque
from enum import Enum, auto

if TYPE_CHECKING:
    from simcraft.core.simulation import Simulation
    from simcraft.core.entity import Entity

T = TypeVar("T")


[docs] class ResourceState(Enum): """Resource states.""" AVAILABLE = auto() ALLOCATED = auto() DOWN = auto()
[docs] @dataclass class ResourceStats: """ Statistics for resource usage. Attributes ---------- acquisitions : int Total number of successful acquisitions releases : int Total number of releases timeouts : int Number of acquisition timeouts total_busy_time : float Total time resources were in use total_idle_time : float Total time resources were available """ acquisitions: int = 0 releases: int = 0 timeouts: int = 0 total_busy_time: float = 0.0 total_idle_time: float = 0.0 total_wait_time: float = 0.0 _area_allocated: float = 0.0 _last_change_time: float = 0.0 _current_allocated: int = 0 _capacity: int = 1
[docs] def record_acquisition(self, time: float, wait_time: float = 0.0) -> None: """Record a resource acquisition.""" self._update_area(time) self.acquisitions += 1 self._current_allocated += 1 self.total_wait_time += wait_time
[docs] def record_release(self, time: float, hold_time: float) -> None: """Record a resource release.""" self._update_area(time) self.releases += 1 self._current_allocated -= 1 self.total_busy_time += hold_time
def _update_area(self, time: float) -> None: """Update time-weighted allocation area.""" duration = time - self._last_change_time self._area_allocated += self._current_allocated * duration self._last_change_time = time @property def utilization(self) -> float: """Get average resource utilization.""" if self._last_change_time == 0 or self._capacity == 0: return 0.0 return self._area_allocated / (self._last_change_time * self._capacity) @property def average_hold_time(self) -> float: """Get average hold time.""" if self.releases == 0: return 0.0 return self.total_busy_time / self.releases @property def average_wait_time(self) -> float: """Get average wait time for acquisition.""" if self.acquisitions == 0: return 0.0 return self.total_wait_time / self.acquisitions
[docs] def reset(self) -> None: """Reset all statistics.""" self.acquisitions = 0 self.releases = 0 self.timeouts = 0 self.total_busy_time = 0.0 self.total_idle_time = 0.0 self.total_wait_time = 0.0 self._area_allocated = 0.0 self._last_change_time = 0.0
@dataclass class AcquisitionRequest(Generic[T]): """Request to acquire a resource.""" requester: T quantity: int request_time: float priority: int = 0 timeout: Optional[float] = None callback: Optional[Callable[["Resource", T], None]] = None def __lt__(self, other: "AcquisitionRequest") -> bool: """Compare by priority then request time.""" if self.priority != other.priority: return self.priority > other.priority # Higher priority first return self.request_time < other.request_time
[docs] class Resource: """ Limited resource with acquire/release semantics. A Resource represents a limited item (or pool of items) that entities can acquire and later release. Supports priority-based waiting and timeouts. Parameters ---------- sim : Simulation Parent simulation capacity : int Number of resource units name : str Optional resource name Examples -------- >>> operator = Resource(sim, capacity=1, name="Operator") >>> >>> # Immediate acquisition >>> if operator.acquire(job): ... # Use resource ... operator.release(job) >>> >>> # Request with callback >>> def on_acquired(resource, job): ... sim.schedule(release_op, delay=5.0, args=(job,)) >>> operator.request(job, callback=on_acquired) """
[docs] def __init__( self, sim: "Simulation", capacity: int = 1, name: str = "", ) -> None: """Initialize resource.""" self._sim = sim self._capacity = capacity self._name = name or f"Resource_{id(self)}" self._available = capacity self._allocated: Dict[int, Tuple[T, int, float]] = {} # id -> (entity, qty, time) self._waiting: List[AcquisitionRequest] = [] self._stats = ResourceStats() self._stats._capacity = capacity self._is_down = False
@property def name(self) -> str: """Get resource name.""" return self._name @property def capacity(self) -> int: """Get total capacity.""" return self._capacity @property def available(self) -> int: """Get available units.""" if self._is_down: return 0 return self._available @property def allocated(self) -> int: """Get allocated units.""" return self._capacity - self._available @property def waiting_count(self) -> int: """Get number of waiting requests.""" return len(self._waiting) @property def stats(self) -> ResourceStats: """Get resource statistics.""" return self._stats @property def is_available(self) -> bool: """Check if any units are available.""" return self.available > 0
[docs] def acquire( self, requester: T, quantity: int = 1, ) -> bool: """ Immediately acquire resource units. Parameters ---------- requester : T Entity requesting the resource quantity : int Number of units to acquire Returns ------- bool True if acquisition succeeded, False otherwise """ if self._is_down or quantity > self._available: return False self._available -= quantity self._allocated[id(requester)] = (requester, quantity, self._sim.now) self._stats.record_acquisition(self._sim.now, 0.0) return True
[docs] def release(self, holder: T) -> bool: """ Release resource units. Parameters ---------- holder : T Entity currently holding the resource Returns ------- bool True if release succeeded, False if entity wasn't holding """ record = self._allocated.pop(id(holder), None) if record is None: return False entity, quantity, acquire_time = record hold_time = self._sim.now - acquire_time self._available += quantity self._stats.record_release(self._sim.now, hold_time) # Process waiting requests self._process_waiting() return True
[docs] def request( self, requester: T, quantity: int = 1, priority: int = 0, timeout: Optional[float] = None, callback: Optional[Callable[["Resource", T], None]] = None, ) -> bool: """ Request resource units (may wait if not available). Parameters ---------- requester : T Entity requesting the resource quantity : int Number of units to request priority : int Request priority (higher = processed first) timeout : Optional[float] Maximum wait time (None = wait indefinitely) callback : Optional[Callable] Function to call when acquired Returns ------- bool True if immediately acquired, False if waiting """ if self._is_down: return False # Try immediate acquisition if quantity <= self._available: self.acquire(requester, quantity) if callback: callback(self, requester) return True # Queue the request request = AcquisitionRequest( requester=requester, quantity=quantity, request_time=self._sim.now, priority=priority, timeout=timeout, callback=callback, ) # Insert in priority order import bisect bisect.insort(self._waiting, request) # Schedule timeout if specified if timeout is not None: self._sim.schedule( self._handle_timeout, delay=timeout, args=(request,), tag=f"{self._name}_timeout", ) return False
def _process_waiting(self) -> None: """Process waiting requests.""" while self._waiting: request = self._waiting[0] if request.quantity > self._available: break # Cannot satisfy this request self._waiting.pop(0) wait_time = self._sim.now - request.request_time self._available -= request.quantity self._allocated[id(request.requester)] = ( request.requester, request.quantity, self._sim.now, ) self._stats.record_acquisition(self._sim.now, wait_time) if request.callback: request.callback(self, request.requester) def _handle_timeout(self, request: AcquisitionRequest) -> None: """Handle request timeout.""" if request in self._waiting: self._waiting.remove(request) self._stats.timeouts += 1
[docs] def cancel_request(self, requester: T) -> bool: """ Cancel a pending request. Parameters ---------- requester : T Entity that made the request Returns ------- bool True if request was found and cancelled """ for i, request in enumerate(self._waiting): if id(request.requester) == id(requester): self._waiting.pop(i) return True return False
[docs] def shutdown(self) -> None: """Shut down the resource.""" self._is_down = True
[docs] def restart(self) -> None: """Restart the resource.""" self._is_down = False self._process_waiting()
[docs] def reset_stats(self) -> None: """Reset statistics.""" self._stats.reset() self._stats._capacity = self._capacity
def __repr__(self) -> str: """Return detailed representation.""" return ( f"Resource(name={self._name!r}, " f"available={self.available}/{self._capacity}, " f"waiting={self.waiting_count})" )
[docs] class PreemptiveResource(Resource): """ Resource with preemption support. Lower priority holders can be preempted by higher priority requests. Parameters ---------- sim : Simulation Parent simulation capacity : int Number of resource units name : str Optional resource name Examples -------- >>> machine = PreemptiveResource(sim, capacity=1) >>> machine.acquire(low_priority_job, priority=1) >>> machine.acquire(high_priority_job, priority=10) # Preempts """
[docs] def __init__( self, sim: "Simulation", capacity: int = 1, name: str = "", ) -> None: """Initialize preemptive resource.""" super().__init__(sim, capacity, name) self._priorities: Dict[int, int] = {} # id -> priority self._on_preempt: Optional[Callable[[T], None]] = None
[docs] def acquire( self, requester: T, quantity: int = 1, priority: int = 0, ) -> bool: """ Acquire resource units, possibly preempting lower priority holders. Parameters ---------- requester : T Entity requesting the resource quantity : int Number of units to acquire priority : int Priority level Returns ------- bool True if acquisition succeeded """ # Try normal acquisition if super().acquire(requester, quantity): self._priorities[id(requester)] = priority return True # Try preemption if quantity <= self._capacity: preempted = self._try_preempt(quantity, priority) if preempted: self._priorities[id(requester)] = priority return super().acquire(requester, quantity) return False
def _try_preempt(self, needed: int, priority: int) -> List[T]: """ Try to preempt lower priority holders. Returns list of preempted entities. """ preempted = [] freed = 0 # Sort allocations by priority (lowest first) allocations = [ (self._priorities.get(id_, 0), id_, record) for id_, record in self._allocated.items() ] allocations.sort() for alloc_priority, id_, (entity, qty, _) in allocations: if alloc_priority >= priority: break # No more lower priority holders if freed >= needed: break # Have enough # Preempt this holder super().release(entity) self._priorities.pop(id_, None) preempted.append(entity) freed += qty if self._on_preempt: self._on_preempt(entity) return preempted if freed >= needed else []
[docs] def release(self, holder: T) -> bool: """Release resource and clean up priority tracking.""" self._priorities.pop(id(holder), None) return super().release(holder)
[docs] def on_preempt(self, callback: Callable[[T], None]) -> None: """Set callback for preemption events.""" self._on_preempt = callback