Source code for simcraft.activities.activity

"""
Activity component for modeling time-based operations.

Activities represent operations that take time, such as processing,
transportation, or service. They support capacity constraints,
external resource synchronization, and various lifecycle callbacks.
"""

from __future__ import annotations
from dataclasses import dataclass, field
from typing import (
    TYPE_CHECKING,
    Any,
    Callable,
    Deque,
    Dict,
    Generic,
    List,
    Optional,
    Set,
    TypeVar,
)
from collections import deque
from enum import Enum, auto

if TYPE_CHECKING:
    from simcraft.core.simulation import Simulation
    from simcraft.core.entity import Entity

T = TypeVar("T")


[docs] class ActivityState(Enum): """States for activity lifecycle.""" PENDING = auto() # Waiting to start READY_TO_START = auto() # Can start if capacity available PROCESSING = auto() # Currently executing COMPLETED = auto() # Processing done, not yet departed READY_TO_FINISH = auto() # Can finish (external sync) READY_TO_DEPART = auto() # Completed, can depart
[docs] @dataclass class ActivityStats: """ Statistics for activity performance. Attributes ---------- entries : int Total entities that entered the activity completions : int Total entities that completed departures : int Total entities that departed total_processing_time : float Sum of all processing times total_wait_time : float Sum of all wait times """ entries: int = 0 completions: int = 0 departures: int = 0 total_processing_time: float = 0.0 total_wait_time: float = 0.0 _area_processing: float = 0.0 _area_waiting: float = 0.0 _last_change_time: float = 0.0 _current_processing: int = 0 _current_waiting: int = 0
[docs] def record_entry(self, time: float) -> None: """Record entity entering waiting state.""" self._update_area(time) self.entries += 1 self._current_waiting += 1
[docs] def record_start(self, time: float, wait_time: float) -> None: """Record entity starting processing.""" self._update_area(time) self._current_waiting -= 1 self._current_processing += 1 self.total_wait_time += wait_time
[docs] def record_completion(self, time: float, processing_time: float) -> None: """Record entity completing processing.""" self._update_area(time) self.completions += 1 self.total_processing_time += processing_time
[docs] def record_departure(self, time: float) -> None: """Record entity departing.""" self._update_area(time) self._current_processing -= 1 self.departures += 1
def _update_area(self, time: float) -> None: """Update time-weighted areas.""" duration = time - self._last_change_time self._area_processing += self._current_processing * duration self._area_waiting += self._current_waiting * duration self._last_change_time = time @property def average_processing(self) -> float: """Get time-average number in processing.""" if self._last_change_time == 0: return 0.0 return self._area_processing / self._last_change_time @property def average_waiting(self) -> float: """Get time-average number waiting.""" if self._last_change_time == 0: return 0.0 return self._area_waiting / self._last_change_time @property def average_processing_time(self) -> float: """Get average processing time per entity.""" if self.completions == 0: return 0.0 return self.total_processing_time / self.completions @property def average_wait_time(self) -> float: """Get average wait time per entity.""" if self.completions == 0: return 0.0 return self.total_wait_time / self.completions
@dataclass class ActivityRecord(Generic[T]): """Record for an entity in an activity.""" entity: T state: ActivityState entry_time: float start_time: Optional[float] = None completion_time: Optional[float] = None
[docs] class Activity(Generic[T]): """ Time-based activity with capacity and resource synchronization. Activities model operations that take time, with support for: - Capacity constraints - External resource requirements - State-based lifecycle management - Statistics collection The activity lifecycle is: 1. PENDING: Entity is waiting to start 2. READY_TO_START: Resources available, can start 3. PROCESSING: Currently being processed 4. COMPLETED: Processing done 5. READY_TO_FINISH: Can depart (external signal) 6. READY_TO_DEPART: Ready to leave Parameters ---------- sim : Simulation Parent simulation capacity : int Maximum concurrent entities processing_time : Union[float, Callable[[T], float]] Fixed time or function returning processing time need_external_start : bool Whether external signal needed to start need_external_finish : bool Whether external signal needed to finish name : str Activity name Examples -------- >>> def processing_time(job): ... return job.size * 0.1 ... >>> activity = Activity( ... sim, ... capacity=2, ... processing_time=processing_time, ... name="Assembly" ... ) >>> >>> activity.on_departure(lambda job: output_queue.enqueue(job)) >>> activity.request_to_start(job) """
[docs] def __init__( self, sim: "Simulation", capacity: int = 1, processing_time: Any = 1.0, need_external_start: bool = False, need_external_finish: bool = False, name: str = "", ) -> None: """Initialize activity.""" self._sim = sim self._capacity = capacity self._processing_time = processing_time self._need_external_start = need_external_start self._need_external_finish = need_external_finish self._name = name or f"Activity_{id(self)}" # Entity tracking by state self._pending: Deque[ActivityRecord[T]] = deque() self._ready_to_start: Deque[ActivityRecord[T]] = deque() self._processing: List[ActivityRecord[T]] = [] self._completed: List[ActivityRecord[T]] = [] self._ready_to_finish: List[ActivityRecord[T]] = [] self._ready_to_depart: Deque[ActivityRecord[T]] = deque() self._stats = ActivityStats() # Callbacks self._on_start: Optional[Callable[[T], None]] = None self._on_complete: Optional[Callable[[T], None]] = None self._on_depart: Optional[Callable[[T], None]] = None
@property def name(self) -> str: """Get activity name.""" return self._name @property def capacity(self) -> int: """Get activity capacity.""" return self._capacity @property def stats(self) -> ActivityStats: """Get activity statistics.""" return self._stats @property def occupancy(self) -> int: """Get current occupancy (processing + completed + ready_to_finish).""" return ( len(self._processing) + len(self._completed) + len(self._ready_to_finish) + len(self._ready_to_depart) ) @property def available_capacity(self) -> int: """Get available capacity.""" return max(0, self._capacity - self.occupancy) @property def pending_count(self) -> int: """Get number of pending entities.""" return len(self._pending) + len(self._ready_to_start) @property def processing_count(self) -> int: """Get number of entities in processing.""" return len(self._processing) def _get_processing_time(self, entity: T) -> float: """Get processing time for an entity.""" if callable(self._processing_time): return self._processing_time(entity) return float(self._processing_time) def _find_record(self, entity: T) -> Optional[ActivityRecord[T]]: """Find record for an entity.""" for collection in [ self._pending, self._ready_to_start, self._processing, self._completed, self._ready_to_finish, self._ready_to_depart, ]: for record in collection: if record.entity is entity: return record return None
[docs] def request_to_start(self, entity: T) -> None: """ Request an entity to start the activity. Parameters ---------- entity : T Entity requesting to start """ record = ActivityRecord( entity=entity, state=ActivityState.PENDING, entry_time=self._sim.now, ) if self._need_external_start: self._pending.append(record) self._stats.record_entry(self._sim.now) else: self._ready_to_start.append(record) self._stats.record_entry(self._sim.now) self._attempt_to_start()
[docs] def try_start(self, entity: T) -> bool: """ External signal that entity can start. Parameters ---------- entity : T Entity to start Returns ------- bool True if entity was found and can start """ for record in list(self._pending): if record.entity is entity: self._pending.remove(record) record.state = ActivityState.READY_TO_START self._ready_to_start.append(record) self._attempt_to_start() return True return False
def _attempt_to_start(self) -> None: """Attempt to start entities that are ready.""" while self._ready_to_start and self.available_capacity > 0: record = self._ready_to_start.popleft() self._start(record) def _start(self, record: ActivityRecord[T]) -> None: """Start processing an entity.""" record.state = ActivityState.PROCESSING record.start_time = self._sim.now wait_time = self._sim.now - record.entry_time self._stats.record_start(self._sim.now, wait_time) self._processing.append(record) if self._on_start: self._on_start(record.entity) # Schedule completion processing_time = self._get_processing_time(record.entity) self._sim.schedule( self._complete, delay=processing_time, args=(record,), tag=f"{self._name}_complete", ) def _complete(self, record: ActivityRecord[T]) -> None: """Complete processing for an entity.""" if record not in self._processing: return # Already removed self._processing.remove(record) record.state = ActivityState.COMPLETED record.completion_time = self._sim.now processing_time = self._sim.now - (record.start_time or record.entry_time) self._stats.record_completion(self._sim.now, processing_time) if self._on_complete: self._on_complete(record.entity) if self._need_external_finish: self._completed.append(record) else: self._ready_to_finish.append(record) self._attempt_to_finish()
[docs] def try_finish(self, entity: T) -> bool: """ External signal that entity can finish. Parameters ---------- entity : T Entity to finish Returns ------- bool True if entity was found and can finish """ for record in list(self._completed): if record.entity is entity: self._completed.remove(record) record.state = ActivityState.READY_TO_FINISH self._ready_to_finish.append(record) self._attempt_to_finish() return True return False
def _attempt_to_finish(self) -> None: """Attempt to finish entities that are ready.""" while self._ready_to_finish: record = self._ready_to_finish.pop(0) self._finish(record) def _finish(self, record: ActivityRecord[T]) -> None: """Finish an entity and allow departure.""" record.state = ActivityState.READY_TO_DEPART self._ready_to_depart.append(record) self._depart(record) def _depart(self, record: ActivityRecord[T]) -> None: """Entity departs from the activity.""" if record in self._ready_to_depart: self._ready_to_depart.remove(record) self._stats.record_departure(self._sim.now) if self._on_depart: self._on_depart(record.entity) # Try to start more self._attempt_to_start()
[docs] def get_pending(self) -> List[T]: """Get list of pending entities.""" return [r.entity for r in self._pending]
[docs] def get_processing(self) -> List[T]: """Get list of entities in processing.""" return [r.entity for r in self._processing]
[docs] def get_completed(self) -> List[T]: """Get list of completed entities.""" return [r.entity for r in self._completed]
[docs] def on_start(self, callback: Callable[[T], None]) -> None: """Set callback for processing start.""" self._on_start = callback
[docs] def on_complete(self, callback: Callable[[T], None]) -> None: """Set callback for processing complete.""" self._on_complete = callback
[docs] def on_departure(self, callback: Callable[[T], None]) -> None: """Set callback for entity departure.""" self._on_depart = callback
[docs] def reset_stats(self) -> None: """Reset activity statistics.""" self._stats = ActivityStats()
def __repr__(self) -> str: """Return detailed representation.""" return ( f"Activity(name={self._name!r}, capacity={self._capacity}, " f"pending={self.pending_count}, processing={self.processing_count}, " f"occupancy={self.occupancy})" )
[docs] class ParallelActivity(Generic[T]): """ Parallel activities with shared waiting queue. Useful for modeling parallel processing stations that share a common input queue. Parameters ---------- sim : Simulation Parent simulation num_parallel : int Number of parallel activities processing_time : Any Processing time per activity Examples -------- >>> parallel = ParallelActivity(sim, num_parallel=3, processing_time=5.0) >>> parallel.request_to_start(job) # Goes to first available """
[docs] def __init__( self, sim: "Simulation", num_parallel: int, processing_time: Any = 1.0, name: str = "", ) -> None: """Initialize parallel activities.""" self._sim = sim self._name = name or f"ParallelActivity_{id(self)}" self._activities: List[Activity[T]] = [] for i in range(num_parallel): activity = Activity( sim=sim, capacity=1, processing_time=processing_time, name=f"{self._name}_{i + 1}", ) self._activities.append(activity) self._pending: Deque[T] = deque() self._on_depart: Optional[Callable[[T], None]] = None # Connect departures for activity in self._activities: activity.on_departure(self._handle_departure)
[docs] def request_to_start(self, entity: T) -> None: """ Request entity to start on any available activity. Parameters ---------- entity : T Entity to process """ # Find available activity for activity in self._activities: if activity.available_capacity > 0: activity.request_to_start(entity) return # Queue if all busy self._pending.append(entity)
def _handle_departure(self, entity: T) -> None: """Handle departure from any activity.""" # Start next pending if self._pending: next_entity = self._pending.popleft() self.request_to_start(next_entity) if self._on_depart: self._on_depart(entity)
[docs] def on_departure(self, callback: Callable[[T], None]) -> None: """Set callback for entity departure.""" self._on_depart = callback
@property def total_capacity(self) -> int: """Get total capacity across all activities.""" return len(self._activities) @property def available_capacity(self) -> int: """Get total available capacity.""" return sum(a.available_capacity for a in self._activities) @property def pending_count(self) -> int: """Get number of entities waiting.""" return len(self._pending) + sum(a.pending_count for a in self._activities) def __repr__(self) -> str: """Return detailed representation.""" return ( f"ParallelActivity(name={self._name!r}, " f"parallel={len(self._activities)}, " f"pending={self.pending_count})" )