Source code for statemachine.statemachine

import warnings
from inspect import isawaitable
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict
from typing import Generic
from typing import List
from typing import MutableSet
from typing import TypeVar

from statemachine.orderedset import OrderedSet

from .callbacks import SPECS_ALL
from .callbacks import SPECS_SAFE
from .callbacks import CallbackSpecList
from .callbacks import CallbacksRegistry
from .callbacks import SpecListGrouper
from .callbacks import SpecReference
from .configuration import Configuration
from .dispatcher import Listener
from .dispatcher import Listeners
from .engines.async_ import AsyncEngine
from .engines.sync import SyncEngine
from .event import BoundEvent
from .event_data import TriggerData
from .exceptions import InvalidDefinition
from .exceptions import InvalidStateValue
from .exceptions import StateMachineError
from .exceptions import TransitionNotAllowed
from .factory import StateMachineMetaclass
from .graph import iterate_states_and_transitions
from .i18n import _
from .model import Model
from .signature import SignatureAdapter
from .state import InstanceState
from .utils import run_async_from_sync

if TYPE_CHECKING:
    from .event import Event
    from .state import State
    from .states import States

TModel = TypeVar("TModel")


[docs] class StateChart(Generic[TModel], metaclass=StateMachineMetaclass): """ Args: model: An optional external object to store state. See :ref:`domain models`. state_field (str): The model's field which stores the current state. Default: ``state``. start_value: An optional start state value if there's no current state assigned on the :ref:`domain models`. Default: ``None``. listeners: An optional list of objects that provides attributes to be used as callbacks. See :ref:`listeners` for more details. **kwargs: Additional keyword arguments available for dependency injection into callbacks. These are passed to class listener ``setup()`` methods and to the initial activation callbacks (e.g. ``on_enter_<state>``). """ TransitionNotAllowed = TransitionNotAllowed """Shortcut alias for easy exception handling. Example:: try: sm.send("an-inexistent-event") except sm.TransitionNotAllowed: pass """ _loop_sleep_in_ms = 0.001 allow_event_without_transition: bool = True """If ``False`` when an event does not result in a transition, an exception ``TransitionNotAllowed`` will be raised. If ``True`` the state machine allows triggering events that may not lead to a state :ref:`transition`, including tolerance to unknown :ref:`event` triggers. Default: ``True``.""" enable_self_transition_entries: bool = True """If `False` (default), when a self-transition is selected, the state entry/exit actions will not be executed. If `True`, the state entry actions will be executed, which is conformant with the SCXML spec. """ atomic_configuration_update: bool = False """If `False` (default), the state machine will follow the SCXML specification, that means in a microstep, it will first exit and execute exit callbacks for all the states in the exit set in reversed document order, then execute the transition content (on callbaks), then enter all the states in the enter set in document order. If `True`, the state machine will execute the exit callbacks, the on transition callbacks, then atomically update the configuration of exited and entered states, then execute the enter callbacks. """ catch_errors_as_events: bool = True """If ``True`` (default), runtime exceptions in callbacks (guards, actions, entry/exit) produce an ``error.execution`` internal event instead of propagating, as mandated by the SCXML specification. If ``False``, exceptions propagate normally.""" start_configuration_values: List[Any] = [] """Default state values to be entered when the state machine starts. If empty (default), the root ``initial`` state will be used. """ # -- Attributes set by StateMachineMetaclass during class construction -- name: str """The class name of the state machine (e.g. ``"TrafficLightMachine"``).""" id: str """Lowercase version of :attr:`name` (e.g. ``"trafficlightmachine"``).""" states: "States" """Collection of top-level :ref:`State` objects declared on this class.""" states_map: Dict[Any, "State"] """Mapping from each state's ``value`` to the corresponding :ref:`State` instance. Includes states at all nesting levels (compound children, parallel regions, etc.).""" initial_state: "State | None" """The single top-level initial :ref:`State`, or ``None`` for abstract classes.""" final_states: "List[State]" """List of top-level :ref:`State` objects marked as ``final``.""" _abstract: bool _events: "Dict[Event, None]" _protected_attrs: set _specs: CallbackSpecList _class_listeners: List[Any] prepare: SpecListGrouper def __init__( self, model: "TModel | None" = None, state_field: str = "state", start_value: Any = None, listeners: "List[object] | None" = None, **kwargs: Any, ): self.model: TModel = model if model is not None else Model() # type: ignore[assignment] """The external model object that holds domain state, or an internal :class:`Model` instance when none is provided. See :ref:`domain models`.""" self.history_values: Dict[str, List[State]] = {} """Mapping from compound state IDs to the list of states that were active the last time that compound state was exited. Used by history pseudo-states to restore previous configurations.""" self.state_field = state_field self.start_configuration_values = ( [start_value] if start_value is not None else list(self.start_configuration_values) ) self._callbacks = CallbacksRegistry() self._config = self._build_configuration() self._listeners: Dict[int, Any] = {} """Listeners that provides attributes to be used as callbacks.""" if self._abstract: raise InvalidDefinition(_("There are no states or transitions.")) class_listener_instances = self._resolve_class_listeners(**kwargs) all_listeners = class_listener_instances + (listeners or []) self._register_callbacks(all_listeners) # Activate the initial state, this only works if the outer scope is sync code. # for async code, the user should manually call `await sm.activate_initial_state()` # after state machine creation. self._engine = self._get_engine() self._engine.start(**kwargs) def _get_engine(self): if self._callbacks.has_async_callbacks: return AsyncEngine(self) return SyncEngine(self) def _resolve_class_listeners(self, **kwargs: Any) -> List[object]: resolved: List[object] = [] for entry in self._class_listeners: if callable(entry): instance = entry() setup = getattr(instance, "setup", None) if setup is not None: sig = SignatureAdapter.from_callable(setup) ba = sig.bind_expected(self, **kwargs) try: setup(*ba.args, **ba.kwargs) except TypeError as err: raise TypeError( f"Error calling setup() on listener {type(instance).__name__}: {err}" ) from err else: instance = entry resolved.append(instance) return resolved def _build_configuration(self) -> Configuration: """Create InstanceState entries and return a new Configuration.""" instance_states: Dict[str, Any] = {} events = self.__class__._events for state in self.states_map.values(): ist = InstanceState(state, self) instance_states[state.id] = ist if state.id not in events: vars(self)[state.id] = ist return Configuration( instance_states=instance_states, model=self.model, state_field=self.state_field, states_map=self.states_map, )
[docs] def activate_initial_state(self) -> Any: result = self._engine.activate_initial_state() if not isawaitable(result): return result return run_async_from_sync(result)
def _processing_loop(self, caller_future: "Any | None" = None) -> Any: result = self._engine.processing_loop(caller_future) if not isawaitable(result): return result return run_async_from_sync(result) def __setattr__(self, name, value): # Fast path: internal/private attributes are never state IDs. if not name.startswith("_") and name in self.__class__.states_map: raise StateMachineError( _("State overriding is not allowed. Trying to add '{}' to {}").format(value, name) ) super().__setattr__(name, value) def __repr__(self): configuration_ids = [s.id for s in self.configuration] return ( f"{type(self).__name__}(model={self.model!r}, state_field={self.state_field!r}, " f"configuration={configuration_ids!r})" ) def __format__(self, fmt: str) -> str: from .contrib.diagram.formatter import formatter return formatter.render(self, fmt) def __getstate__(self): state = {k: v for k, v in self.__dict__.items() if not isinstance(v, InstanceState)} del state["_callbacks"] del state["_config"] del state["_engine"] return state def __setstate__(self, state: Dict[str, Any]) -> None: listeners = state.pop("_listeners") self.__dict__.update(state) # type: ignore[attr-defined] self._callbacks = CallbacksRegistry() self._config = self._build_configuration() self._listeners = {} # _listeners already contained both class-level and runtime listeners # when serialized, so just re-register them all. self._register_callbacks([]) if listeners: self.add_listener(*listeners.values()) self._engine = self._get_engine() self._engine.start() def _get_initial_configuration(self): initial_state_values = ( self.start_configuration_values if self.start_configuration_values else [self.initial_state.value] # type: ignore[union-attr] ) try: return [self.states_map[value] for value in initial_state_values] except KeyError as err: raise InvalidStateValue(initial_state_values) from err
[docs] def bind_events_to(self, *targets): """Bind the state machine events to the target objects.""" for event in self.events: trigger = getattr(self, event) for target in targets: if hasattr(target, event): warnings.warn( f"Attribute '{event}' already exists on {target!r}. Skipping binding.", UserWarning, stacklevel=2, ) continue setattr(target, event, trigger)
def _add_listener(self, listeners: "Listeners", allowed_references: SpecReference = SPECS_ALL): registry = self._callbacks listeners.resolve(self._specs, registry=registry, allowed_references=allowed_references) for visited in iterate_states_and_transitions(self.states): listeners.resolve( visited._specs, registry=registry, allowed_references=allowed_references, ) return self def _register_callbacks(self, listeners: List[object]): self._listeners.update({id(listener): listener for listener in listeners}) self._add_listener( Listeners.from_listeners( ( Listener.from_obj(self, skip_attrs=self._protected_attrs), Listener.from_obj(self.model, skip_attrs={self.state_field}), *(Listener.from_obj(listener) for listener in listeners), ) ) ) check_callbacks = self._callbacks.check for visited in iterate_states_and_transitions(self.states): try: check_callbacks(visited._specs) except Exception as err: raise InvalidDefinition( f"Error on {visited!s} when resolving callbacks: {err}" ) from err self._callbacks.async_or_sync() @property def active_listeners(self) -> List[object]: """List of all active listeners attached to this instance. Includes class-level listeners (resolved from the ``listeners`` class attribute), constructor ``listeners=`` parameter, and any added via :meth:`add_listener`. """ return list(self._listeners.values())
[docs] def add_listener(self, *listeners): """Add a listener. Listener are a way to generically add behavior to a :ref:`StateMachine` without changing its internal implementation. .. seealso:: :ref:`listeners`. """ self._listeners.update({id(listener): listener for listener in listeners}) return self._add_listener( Listeners.from_listeners(Listener.from_obj(listener) for listener in listeners), allowed_references=SPECS_SAFE, )
def _repr_html_(self): return f'<div class="statemachine">{self._repr_svg_()}</div>' def _repr_svg_(self): return self._graph().create_svg().decode() # type: ignore[attr-defined] def _graph(self): from .contrib.diagram import DotGraphMachine return DotGraphMachine(self).get_graph() @property def configuration_values(self) -> OrderedSet[Any]: """The state configuration values is the set of currently active states's values (or ids if no custom value is defined).""" return self._config.values @property def configuration(self) -> OrderedSet["State"]: """The set of currently active states.""" return self._config.states @configuration.setter def configuration(self, new_configuration: OrderedSet["State"]): self._config.states = new_configuration @property def current_state_value(self): """Get/Set the current :ref:`state` value. This is a low level API, that can be used to assign any valid state value completely bypassing all the hooks and validations. """ return self._config.value @current_state_value.setter def current_state_value(self, value): self._config.value = value @property def current_state(self) -> "State | MutableSet[State]": """Get/Set the current :ref:`state`. .. deprecated:: 3.0.0 Use :attr:`configuration` / :attr:`configuration_values` instead. """ warnings.warn( """Property `current_state` is deprecated in favor of `configuration`.""", DeprecationWarning, stacklevel=2, ) return self._config.current_state @current_state.setter def current_state(self, value): warnings.warn( """Property `current_state` is deprecated in favor of `configuration`.""", DeprecationWarning, stacklevel=2, ) self.current_state_value = value.value @property def events(self) -> "List[Event]": """List of all :ref:`Event` instances declared on this state machine.""" return [getattr(self, event) for event in self.__class__._events] @property def allowed_events(self) -> "List[Event]": """List of the current allowed events.""" return [ getattr(self, event) for state in self.configuration for event in state.transitions.unique_events ]
[docs] def enabled_events(self, *args, **kwargs) -> Any: """List of the current enabled events, considering guard conditions. An event is **enabled** if at least one of its transitions from the current state has all ``cond``/``unless`` guards satisfied. Args: *args: Positional arguments forwarded to condition callbacks. **kwargs: Keyword arguments forwarded to condition callbacks. Returns: A list of enabled :ref:`Event` instances. """ result = self._engine.enabled_events(*args, **kwargs) if not isawaitable(result): return result return run_async_from_sync(result)
def _put_nonblocking(self, trigger_data: TriggerData, internal: bool = False): """Put the trigger on the queue without blocking the caller.""" self._engine.put(trigger_data, internal=internal)
[docs] def send( self, event: str, *args, delay: float = 0, send_id: "str | None" = None, internal: bool = False, **kwargs, ) -> Any: """Send an :ref:`Event` to the state machine. :param event: The trigger for the state machine, specified as an event id string. :param args: Additional positional arguments to pass to the event. :param delay: A time delay in milliseconds to process the event. Default is 0. :param send_id: An identifier for the event, used with ``cancel_event()`` to cancel delayed events. :param kwargs: Additional keyword arguments to pass to the event. .. seealso:: See: :ref:`triggering events`. """ know_event = getattr(self, event, None) event_name = know_event.name if know_event else event delay = ( delay if delay else know_event and know_event.delay or 0 ) # first the param, then the event, or 0 event_instance = BoundEvent( id=event, name=event_name, delay=delay, internal=internal, _sm=self ) result = event_instance(*args, send_id=send_id, **kwargs) if not isawaitable(result): return result return run_async_from_sync(result)
[docs] def raise_( self, event: str, *args, delay: float = 0, send_id: "str | None" = None, **kwargs ) -> Any: """Send an :ref:`Event` to the state machine in the internal event queue. Events on the internal queue are processed immediately within the current macrostep, before any pending external events. This is equivalent to calling ``send(..., internal=True)``. .. seealso:: See: :ref:`triggering-events`. """ return self.send(event, *args, delay=delay, send_id=send_id, internal=True, **kwargs)
[docs] def cancel_event(self, send_id: str): """Cancel all the delayed events with the given ``send_id``.""" self._engine.cancel_event(send_id)
@property def is_terminated(self): """Whether the state machine has reached a final state. Returns ``True`` when a top-level final state has been entered and the engine is no longer running. This is the recommended way to check for completion -- it works for flat, compound, and parallel topologies. """ return not self._engine.running
[docs] class StateMachine(StateChart): allow_event_without_transition: bool = False enable_self_transition_entries: bool = False atomic_configuration_update: bool = True catch_errors_as_events: bool = False