import warnings
from collections.abc import MutableSet
from inspect import isawaitable
from typing import TYPE_CHECKING
from typing import Any
from typing import Generic
from typing import TypeVar
from statemachine.orderedset import OrderedSet
from .callbacks import SPECS_ALL
from .callbacks import SPECS_SAFE
from .callbacks import CallbackSpecList
from .callbacks import CallbacksRegistry
from .callbacks import SpecListGrouper
from .callbacks import SpecReference
from .configuration import Configuration
from .dispatcher import Listener
from .dispatcher import Listeners
from .engines.async_ import AsyncEngine
from .engines.sync import SyncEngine
from .event import BoundEvent
from .event_data import TriggerData
from .exceptions import InvalidDefinition
from .exceptions import InvalidStateValue
from .exceptions import StateMachineError
from .exceptions import TransitionNotAllowed
from .factory import StateMachineMetaclass
from .graph import iterate_states_and_transitions
from .i18n import _
from .model import Model
from .signature import SignatureAdapter
from .state import InstanceState
from .utils import run_async_from_sync
if TYPE_CHECKING:
from .event import Event
from .state import State
from .states import States
TModel = TypeVar("TModel")
[docs]
class StateChart(Generic[TModel], metaclass=StateMachineMetaclass):
"""
Args:
model: An optional external object to store state. See :ref:`domain models`.
state_field (str): The model's field which stores the current state.
Default: ``state``.
start_value: An optional start state value if there's no current state assigned
on the :ref:`domain models`. Default: ``None``.
listeners: An optional list of objects that provides attributes to be used as callbacks.
See :ref:`listeners` for more details.
**kwargs: Additional keyword arguments available for dependency injection into
callbacks. These are passed to class listener ``setup()`` methods and to the
initial activation callbacks (e.g. ``on_enter_<state>``).
"""
TransitionNotAllowed = TransitionNotAllowed
"""Shortcut alias for easy exception handling.
Example::
try:
sm.send("an-inexistent-event")
except sm.TransitionNotAllowed:
pass
"""
_loop_sleep_in_ms = 0.001
allow_event_without_transition: bool = True
"""If ``False`` when an event does not result in a transition, an exception
``TransitionNotAllowed`` will be raised. If ``True`` the state machine allows triggering
events that may not lead to a state :ref:`transition`, including tolerance to unknown
:ref:`event` triggers. Default: ``True``."""
enable_self_transition_entries: bool = True
"""If `False` (default), when a self-transition is selected,
the state entry/exit actions will not be executed. If `True`, the state entry actions
will be executed, which is conformant with the SCXML spec.
"""
atomic_configuration_update: bool = False
"""If `False` (default), the state machine will follow the SCXML
specification, that means in a microstep, it will first exit and execute exit callbacks
for all the states in the exit set in reversed document order, then execute the
transition content (on callbaks), then enter all the states in the enter set in
document order.
If `True`, the state machine will execute the exit callbacks, the on transition
callbacks, then atomically update the configuration of exited and entered states, then
execute the enter callbacks.
"""
catch_errors_as_events: bool = True
"""If ``True`` (default), runtime exceptions in callbacks (guards, actions, entry/exit)
produce an ``error.execution`` internal event instead of propagating, as mandated by the
SCXML specification. If ``False``, exceptions propagate normally."""
start_configuration_values: list[Any] = []
"""Default state values to be entered when the state machine starts.
If empty (default), the root ``initial`` state will be used.
"""
# -- Attributes set by StateMachineMetaclass during class construction --
name: str
"""The class name of the state machine (e.g. ``"TrafficLightMachine"``)."""
id: str
"""Lowercase version of :attr:`name` (e.g. ``"trafficlightmachine"``)."""
states: "States"
"""Collection of top-level :ref:`State` objects declared on this class."""
states_map: dict[Any, "State"]
"""Mapping from each state's ``value`` to the corresponding :ref:`State` instance.
Includes states at all nesting levels (compound children, parallel regions, etc.)."""
initial_state: "State | None"
"""The single top-level initial :ref:`State`, or ``None`` for abstract classes."""
final_states: "list[State]"
"""List of top-level :ref:`State` objects marked as ``final``."""
_abstract: bool
_events: "dict[Event, None]"
_protected_attrs: set
_specs: CallbackSpecList
_class_listeners: list[Any]
prepare: SpecListGrouper
def __init__(
self,
model: "TModel | None" = None,
state_field: str = "state",
start_value: Any = None,
listeners: "list[object] | None" = None,
**kwargs: Any,
):
self.model: TModel = model if model is not None else Model() # type: ignore[assignment]
"""The external model object that holds domain state, or an internal
:class:`Model` instance when none is provided. See :ref:`domain models`."""
self.history_values: dict[str, list[State]] = {}
"""Mapping from compound state IDs to the list of states that were active
the last time that compound state was exited. Used by history pseudo-states
to restore previous configurations."""
self.state_field = state_field
self.start_configuration_values = (
[start_value] if start_value is not None else list(self.start_configuration_values)
)
self._callbacks = CallbacksRegistry()
self._config = self._build_configuration()
self._listeners: dict[int, Any] = {}
"""Listeners that provides attributes to be used as callbacks."""
if self._abstract:
raise InvalidDefinition(_("There are no states or transitions."))
class_listener_instances = self._resolve_class_listeners(**kwargs)
all_listeners = class_listener_instances + (listeners or [])
self._register_callbacks(all_listeners)
# Activate the initial state, this only works if the outer scope is sync code.
# for async code, the user should manually call `await sm.activate_initial_state()`
# after state machine creation.
self._engine = self._get_engine()
self._engine.start(**kwargs)
def _get_engine(self):
if self._callbacks.has_async_callbacks:
return AsyncEngine(self)
return SyncEngine(self)
def _resolve_class_listeners(self, **kwargs: Any) -> list[object]:
resolved: list[object] = []
for entry in self._class_listeners:
if callable(entry):
instance = entry()
setup = getattr(instance, "setup", None)
if setup is not None:
sig = SignatureAdapter.from_callable(setup)
ba = sig.bind_expected(self, **kwargs)
try:
setup(*ba.args, **ba.kwargs)
except TypeError as err:
raise TypeError(
f"Error calling setup() on listener {type(instance).__name__}: {err}"
) from err
else:
instance = entry
resolved.append(instance)
return resolved
def _build_configuration(self) -> Configuration:
"""Create InstanceState entries and return a new Configuration."""
instance_states: dict[str, Any] = {}
events = self.__class__._events
for state in self.states_map.values():
ist = InstanceState(state, self)
instance_states[state.id] = ist
if state.id not in events:
vars(self)[state.id] = ist
return Configuration(
instance_states=instance_states,
model=self.model,
state_field=self.state_field,
states_map=self.states_map,
)
[docs]
def activate_initial_state(self) -> Any:
result = self._engine.activate_initial_state()
if not isawaitable(result):
return result
return run_async_from_sync(result)
def _processing_loop(self, caller_future: "Any | None" = None) -> Any:
result = self._engine.processing_loop(caller_future)
if not isawaitable(result):
return result
return run_async_from_sync(result)
def __setattr__(self, name, value):
# Fast path: internal/private attributes are never state IDs.
if not name.startswith("_") and name in self.__class__.states_map:
raise StateMachineError(
_("State overriding is not allowed. Trying to add '{}' to {}").format(value, name)
)
super().__setattr__(name, value)
def __repr__(self):
configuration_ids = [s.id for s in self.configuration]
return (
f"{type(self).__name__}(model={self.model!r}, state_field={self.state_field!r}, "
f"configuration={configuration_ids!r})"
)
def __format__(self, fmt: str) -> str:
from .contrib.diagram.formatter import formatter
return formatter.render(self, fmt)
def __getstate__(self):
state = {k: v for k, v in self.__dict__.items() if not isinstance(v, InstanceState)}
del state["_callbacks"]
del state["_config"]
del state["_engine"]
return state
def __setstate__(self, state: dict[str, Any]) -> None:
listeners = state.pop("_listeners")
self.__dict__.update(state) # type: ignore[attr-defined]
self._callbacks = CallbacksRegistry()
self._config = self._build_configuration()
self._listeners = {}
# _listeners already contained both class-level and runtime listeners
# when serialized, so just re-register them all.
self._register_callbacks([])
if listeners:
self.add_listener(*listeners.values())
self._engine = self._get_engine()
self._engine.start()
def _get_initial_configuration(self):
initial_state_values = (
self.start_configuration_values
if self.start_configuration_values
else [self.initial_state.value] # type: ignore[union-attr]
)
try:
return [self.states_map[value] for value in initial_state_values]
except KeyError as err:
raise InvalidStateValue(initial_state_values) from err
[docs]
def bind_events_to(self, *targets):
"""Bind the state machine events to the target objects."""
for event in self.events:
trigger = getattr(self, event)
for target in targets:
if hasattr(target, event):
warnings.warn(
f"Attribute '{event}' already exists on {target!r}. Skipping binding.",
UserWarning,
stacklevel=2,
)
continue
setattr(target, event, trigger)
def _add_listener(self, listeners: "Listeners", allowed_references: SpecReference = SPECS_ALL):
registry = self._callbacks
listeners.resolve(self._specs, registry=registry, allowed_references=allowed_references)
for visited in iterate_states_and_transitions(self.states):
listeners.resolve(
visited._specs,
registry=registry,
allowed_references=allowed_references,
)
return self
def _register_callbacks(self, listeners: list[object]):
self._listeners.update({id(listener): listener for listener in listeners})
self._add_listener(
Listeners.from_listeners(
(
Listener.from_obj(self, skip_attrs=self._protected_attrs),
Listener.from_obj(self.model, skip_attrs={self.state_field}),
*(Listener.from_obj(listener) for listener in listeners),
)
)
)
check_callbacks = self._callbacks.check
for visited in iterate_states_and_transitions(self.states):
try:
check_callbacks(visited._specs)
except Exception as err:
raise InvalidDefinition(
f"Error on {visited!s} when resolving callbacks: {err}"
) from err
self._callbacks.async_or_sync()
@property
def active_listeners(self) -> list[object]:
"""List of all active listeners attached to this instance.
Includes class-level listeners (resolved from the ``listeners`` class attribute),
constructor ``listeners=`` parameter, and any added via :meth:`add_listener`.
"""
return list(self._listeners.values())
[docs]
def add_listener(self, *listeners):
"""Add a listener.
Listener are a way to generically add behavior to a :ref:`StateMachine` without changing
its internal implementation.
.. seealso::
:ref:`listeners`.
"""
self._listeners.update({id(listener): listener for listener in listeners})
return self._add_listener(
Listeners.from_listeners(Listener.from_obj(listener) for listener in listeners),
allowed_references=SPECS_SAFE,
)
def _repr_html_(self):
return f'<div class="statemachine">{self._repr_svg_()}</div>'
def _repr_svg_(self):
return self._graph().create_svg().decode() # type: ignore[attr-defined]
def _graph(self):
from .contrib.diagram import DotGraphMachine
return DotGraphMachine(self).get_graph()
@property
def configuration_values(self) -> OrderedSet[Any]:
"""The state configuration values is the set of currently active states's values
(or ids if no custom value is defined)."""
return self._config.values
@property
def configuration(self) -> OrderedSet["State"]:
"""The set of currently active states."""
return self._config.states
@configuration.setter
def configuration(self, new_configuration: OrderedSet["State"]):
self._config.states = new_configuration
@property
def current_state_value(self):
"""Get/Set the current :ref:`state` value.
This is a low level API, that can be used to assign any valid state value
completely bypassing all the hooks and validations.
"""
return self._config.value
@current_state_value.setter
def current_state_value(self, value):
self._config.value = value
@property
def current_state(self) -> "State | MutableSet[State]":
"""Get/Set the current :ref:`state`.
.. deprecated:: 3.0.0
Use :attr:`configuration` / :attr:`configuration_values` instead.
"""
warnings.warn(
"""Property `current_state` is deprecated in favor of `configuration`.""",
DeprecationWarning,
stacklevel=2,
)
return self._config.current_state
@current_state.setter
def current_state(self, value):
warnings.warn(
"""Property `current_state` is deprecated in favor of `configuration`.""",
DeprecationWarning,
stacklevel=2,
)
self.current_state_value = value.value
@property
def events(self) -> "list[Event]":
"""List of all :ref:`Event` instances declared on this state machine."""
return [getattr(self, event) for event in self.__class__._events]
@property
def allowed_events(self) -> "list[Event]":
"""List of the current allowed events."""
return [
getattr(self, event)
for state in self.configuration
for event in state.transitions.unique_events
]
[docs]
def enabled_events(self, *args, **kwargs) -> Any:
"""List of the current enabled events, considering guard conditions.
An event is **enabled** if at least one of its transitions from the current
state has all ``cond``/``unless`` guards satisfied.
Args:
*args: Positional arguments forwarded to condition callbacks.
**kwargs: Keyword arguments forwarded to condition callbacks.
Returns:
A list of enabled :ref:`Event` instances.
"""
result = self._engine.enabled_events(*args, **kwargs)
if not isawaitable(result):
return result
return run_async_from_sync(result)
def _put_nonblocking(self, trigger_data: TriggerData, internal: bool = False):
"""Put the trigger on the queue without blocking the caller."""
self._engine.put(trigger_data, internal=internal)
[docs]
def send(
self,
event: str,
*args,
delay: float = 0,
send_id: "str | None" = None,
internal: bool = False,
**kwargs,
) -> Any:
"""Send an :ref:`Event` to the state machine.
:param event: The trigger for the state machine, specified as an event id string.
:param args: Additional positional arguments to pass to the event.
:param delay: A time delay in milliseconds to process the event. Default is 0.
:param send_id: An identifier for the event, used with ``cancel_event()`` to cancel
delayed events.
:param kwargs: Additional keyword arguments to pass to the event.
.. seealso::
See: :ref:`triggering events`.
"""
know_event = getattr(self, event, None)
event_name = know_event.name if know_event else event
delay = (
delay if delay else know_event and know_event.delay or 0
) # first the param, then the event, or 0
event_instance = BoundEvent(
id=event, name=event_name, delay=delay, internal=internal, _sm=self
)
result = event_instance(*args, send_id=send_id, **kwargs)
if not isawaitable(result):
return result
return run_async_from_sync(result)
[docs]
def raise_(
self, event: str, *args, delay: float = 0, send_id: "str | None" = None, **kwargs
) -> Any:
"""Send an :ref:`Event` to the state machine in the internal event queue.
Events on the internal queue are processed immediately within the current
macrostep, before any pending external events. This is equivalent to calling
``send(..., internal=True)``.
.. seealso::
See: :ref:`triggering-events`.
"""
return self.send(event, *args, delay=delay, send_id=send_id, internal=True, **kwargs)
[docs]
def cancel_event(self, send_id: str):
"""Cancel all the delayed events with the given ``send_id``."""
self._engine.cancel_event(send_id)
@property
def is_terminated(self):
"""Whether the state machine has reached a final state.
Returns ``True`` when a top-level final state has been entered and the
engine is no longer running. This is the recommended way to check for
completion -- it works for flat, compound, and parallel topologies.
"""
return not self._engine.running
[docs]
class StateMachine(StateChart):
allow_event_without_transition: bool = False
enable_self_transition_entries: bool = False
atomic_configuration_update: bool = True
catch_errors_as_events: bool = False