"""
Resource pool for managing collections of individual resources.
Useful when resources are distinguishable (e.g., specific machines,
operators with different skills, AGVs with different locations).
"""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
List,
Optional,
TypeVar,
Iterator,
)
from enum import Enum, auto
if TYPE_CHECKING:
from simcraft.core.simulation import Simulation
T = TypeVar("T")
R = TypeVar("R")
[docs]
class PoolSelectionPolicy(Enum):
"""Policies for selecting resources from pool."""
FIRST_AVAILABLE = auto()
ROUND_ROBIN = auto()
LEAST_UTILIZED = auto()
CUSTOM = auto()
[docs]
@dataclass
class PooledResource(Generic[R]):
"""
Wrapper for a resource in a pool.
Attributes
----------
resource : R
The actual resource object
id : str
Unique identifier
is_available : bool
Whether resource is currently available
allocations : int
Number of times this resource has been allocated
total_busy_time : float
Total time this resource has been allocated
"""
resource: R
id: str
is_available: bool = True
allocations: int = 0
total_busy_time: float = 0.0
_allocated_at: Optional[float] = None
_allocated_to: Optional[Any] = None
[docs]
@dataclass
class PoolStats:
"""
Statistics for resource pool.
Attributes
----------
total_acquisitions : int
Total successful acquisitions
total_releases : int
Total releases
failed_acquisitions : int
Failed acquisition attempts
"""
total_acquisitions: int = 0
total_releases: int = 0
failed_acquisitions: int = 0
total_wait_time: float = 0.0
_waiting_count: int = 0
@property
def success_rate(self) -> float:
"""Get acquisition success rate."""
total = self.total_acquisitions + self.failed_acquisitions
if total == 0:
return 0.0
return self.total_acquisitions / total
[docs]
class ResourcePool(Generic[R]):
"""
Pool of individual, distinguishable resources.
Unlike a simple capacity-based Resource, ResourcePool manages
individual resource objects that can be selected based on
various policies (first available, round-robin, least utilized,
or custom selection function).
Parameters
----------
sim : Simulation
Parent simulation
name : str
Pool name
selection_policy : PoolSelectionPolicy
Policy for selecting resources
Examples
--------
>>> class AGV:
... def __init__(self, id: str, location: tuple):
... self.id = id
... self.location = location
...
>>> pool = ResourcePool(sim, name="AGVPool")
>>> pool.add_resource(AGV("AGV1", (0, 0)), id="AGV1")
>>> pool.add_resource(AGV("AGV2", (10, 0)), id="AGV2")
>>>
>>> # Custom selection: nearest AGV
>>> def nearest_to(target):
... def selector(available):
... return min(available, key=lambda r: distance(r.location, target))
... return selector
>>>
>>> agv = pool.acquire(job, selector=nearest_to((5, 5)))
"""
[docs]
def __init__(
self,
sim: "Simulation",
name: str = "",
selection_policy: PoolSelectionPolicy = PoolSelectionPolicy.FIRST_AVAILABLE,
) -> None:
"""Initialize resource pool."""
self._sim = sim
self._name = name or f"Pool_{id(self)}"
self._policy = selection_policy
self._resources: Dict[str, PooledResource[R]] = {}
self._round_robin_index = 0
self._stats = PoolStats()
# Waiting queue: list of (requester, callback, selector)
self._waiting: List[tuple] = []
# Custom selector function
self._custom_selector: Optional[Callable[[List[R]], R]] = None
@property
def name(self) -> str:
"""Get pool name."""
return self._name
@property
def size(self) -> int:
"""Get total number of resources in pool."""
return len(self._resources)
@property
def available_count(self) -> int:
"""Get number of available resources."""
return sum(1 for r in self._resources.values() if r.is_available)
@property
def allocated_count(self) -> int:
"""Get number of allocated resources."""
return self.size - self.available_count
@property
def stats(self) -> PoolStats:
"""Get pool statistics."""
return self._stats
@property
def is_empty(self) -> bool:
"""Check if pool has no resources."""
return len(self._resources) == 0
@property
def has_available(self) -> bool:
"""Check if any resources are available."""
return self.available_count > 0
[docs]
def add_resource(self, resource: R, id: Optional[str] = None) -> str:
"""
Add a resource to the pool.
Parameters
----------
resource : R
Resource to add
id : Optional[str]
Unique identifier (generated if not provided)
Returns
-------
str
Resource identifier
"""
if id is None:
id = f"Resource_{len(self._resources) + 1}"
if id in self._resources:
raise ValueError(f"Resource with id '{id}' already exists")
self._resources[id] = PooledResource(resource=resource, id=id)
return id
[docs]
def remove_resource(self, id: str) -> Optional[R]:
"""
Remove a resource from the pool.
Parameters
----------
id : str
Resource identifier
Returns
-------
Optional[R]
The removed resource, or None if not found
"""
pooled = self._resources.pop(id, None)
if pooled:
return pooled.resource
return None
[docs]
def get_resource(self, id: str) -> Optional[R]:
"""
Get a resource by ID.
Parameters
----------
id : str
Resource identifier
Returns
-------
Optional[R]
The resource, or None if not found
"""
pooled = self._resources.get(id)
return pooled.resource if pooled else None
def _select_resource(
self,
selector: Optional[Callable[[List[R]], R]] = None,
) -> Optional[PooledResource[R]]:
"""Select a resource based on policy or custom selector."""
available = [r for r in self._resources.values() if r.is_available]
if not available:
return None
if selector is not None:
# Custom selector
resources = [r.resource for r in available]
selected = selector(resources)
for pooled in available:
if pooled.resource is selected:
return pooled
return None
if self._policy == PoolSelectionPolicy.FIRST_AVAILABLE:
return available[0]
elif self._policy == PoolSelectionPolicy.ROUND_ROBIN:
self._round_robin_index = self._round_robin_index % len(available)
result = available[self._round_robin_index]
self._round_robin_index += 1
return result
elif self._policy == PoolSelectionPolicy.LEAST_UTILIZED:
return min(available, key=lambda r: r.allocations)
elif self._policy == PoolSelectionPolicy.CUSTOM:
if self._custom_selector:
resources = [r.resource for r in available]
selected = self._custom_selector(resources)
for pooled in available:
if pooled.resource is selected:
return pooled
return available[0]
return available[0]
[docs]
def acquire(
self,
requester: Any,
selector: Optional[Callable[[List[R]], R]] = None,
) -> Optional[R]:
"""
Acquire a resource from the pool.
Parameters
----------
requester : Any
Entity requesting the resource
selector : Optional[Callable]
Custom function to select from available resources
Returns
-------
Optional[R]
Selected resource, or None if none available
"""
pooled = self._select_resource(selector)
if pooled is None:
self._stats.failed_acquisitions += 1
return None
pooled.is_available = False
pooled.allocations += 1
pooled._allocated_at = self._sim.now
pooled._allocated_to = requester
self._stats.total_acquisitions += 1
return pooled.resource
[docs]
def request(
self,
requester: Any,
callback: Callable[[R], None],
selector: Optional[Callable[[List[R]], R]] = None,
) -> bool:
"""
Request a resource (wait if not available).
Parameters
----------
requester : Any
Entity requesting the resource
callback : Callable
Function to call when resource is acquired
selector : Optional[Callable]
Custom selection function
Returns
-------
bool
True if immediately acquired, False if waiting
"""
resource = self.acquire(requester, selector)
if resource is not None:
callback(resource)
return True
# Add to waiting queue
self._waiting.append((requester, callback, selector, self._sim.now))
return False
[docs]
def release(self, resource: R) -> bool:
"""
Release a resource back to the pool.
Parameters
----------
resource : R
Resource to release
Returns
-------
bool
True if successfully released
"""
for pooled in self._resources.values():
if pooled.resource is resource:
if pooled.is_available:
return False # Already available
pooled.is_available = True
if pooled._allocated_at is not None:
hold_time = self._sim.now - pooled._allocated_at
pooled.total_busy_time += hold_time
pooled._allocated_at = None
pooled._allocated_to = None
self._stats.total_releases += 1
# Process waiting requests
self._process_waiting()
return True
return False
def _process_waiting(self) -> None:
"""Process waiting requests."""
if not self._waiting or not self.has_available:
return
# Process in order
i = 0
while i < len(self._waiting) and self.has_available:
requester, callback, selector, request_time = self._waiting[i]
resource = self.acquire(requester, selector)
if resource is not None:
self._waiting.pop(i)
wait_time = self._sim.now - request_time
self._stats.total_wait_time += wait_time
callback(resource)
else:
i += 1
[docs]
def set_selection_policy(
self,
policy: PoolSelectionPolicy,
custom_selector: Optional[Callable[[List[R]], R]] = None,
) -> None:
"""
Set resource selection policy.
Parameters
----------
policy : PoolSelectionPolicy
Selection policy
custom_selector : Optional[Callable]
Custom selector for CUSTOM policy
"""
self._policy = policy
self._custom_selector = custom_selector
[docs]
def get_available(self) -> List[R]:
"""
Get list of available resources.
Returns
-------
List[R]
Available resources
"""
return [r.resource for r in self._resources.values() if r.is_available]
[docs]
def get_allocated(self) -> List[R]:
"""
Get list of allocated resources.
Returns
-------
List[R]
Allocated resources
"""
return [r.resource for r in self._resources.values() if not r.is_available]
[docs]
def get_utilization(self, resource_id: str) -> float:
"""
Get utilization for a specific resource.
Parameters
----------
resource_id : str
Resource identifier
Returns
-------
float
Utilization ratio (0-1)
"""
pooled = self._resources.get(resource_id)
if pooled is None or self._sim.now == 0:
return 0.0
busy_time = pooled.total_busy_time
if not pooled.is_available and pooled._allocated_at:
busy_time += self._sim.now - pooled._allocated_at
return busy_time / self._sim.now
[docs]
def get_average_utilization(self) -> float:
"""
Get average utilization across all resources.
Returns
-------
float
Average utilization ratio
"""
if not self._resources:
return 0.0
total = sum(self.get_utilization(id) for id in self._resources)
return total / len(self._resources)
[docs]
def reset_stats(self) -> None:
"""Reset all statistics."""
self._stats = PoolStats()
for pooled in self._resources.values():
pooled.allocations = 0
pooled.total_busy_time = 0.0
def __iter__(self) -> Iterator[R]:
"""Iterate over all resources."""
return (r.resource for r in self._resources.values())
def __len__(self) -> int:
"""Get total number of resources."""
return len(self._resources)
def __repr__(self) -> str:
"""Return detailed representation."""
return (
f"ResourcePool(name={self._name!r}, "
f"size={self.size}, available={self.available_count}, "
f"waiting={len(self._waiting)})"
)