from collections import deque
from typing import TYPE_CHECKING
from typing import Any
from typing import Dict # deprecated since 3.9: https://peps.python.org/pep-0585/
from typing import List # deprecated since 3.9: https://peps.python.org/pep-0585/
from .dispatcher import ObjectConfig
from .dispatcher import resolver_factory
from .event import Event
from .event_data import EventData
from .event_data import TriggerData
from .exceptions import InvalidDefinition
from .exceptions import InvalidStateValue
from .exceptions import TransitionNotAllowed
from .factory import StateMachineMetaclass
from .i18n import _
from .model import Model
from .transition import Transition
if TYPE_CHECKING:
from .state import State
[docs]class StateMachine(metaclass=StateMachineMetaclass):
"""
Args:
model: An optional external object to store state. See :ref:`domain models`.
state_field (str): The model's field which stores the current state.
Default: ``state``.
start_value: An optional start state value if there's no current state assigned
on the :ref:`domain models`. Default: ``None``.
rtc (bool): Controls the :ref:`processing model`. Defaults to ``True``
that corresponds to a **run-to-completion** (RTC) model.
allow_event_without_transition: If ``False`` when an event does not result in a transition,
an exception ``TransitionNotAllowed`` will be raised.
If ``True`` the state machine allows triggering events that may not lead to a state
:ref:`transition`, including tolerance to unknown :ref:`event` triggers.
Default: ``False``.
"""
TransitionNotAllowed = TransitionNotAllowed
"""Shortcut for easy exception handling.
Example::
try:
sm.send("an-inexistent-event")
except sm.TransitionNotAllowed:
pass
"""
_events: Dict[Any, Any] = {}
states: List["State"] = []
"""List of all state machine :ref:`states`."""
states_map: Dict[Any, "State"] = {}
"""Map of ``state.value`` to the corresponding :ref:`state`."""
def __init__(
self,
model: Any = None,
state_field: str = "state",
start_value: Any = None,
rtc: bool = True,
allow_event_without_transition: bool = False,
):
self.model = model if model else Model()
self.state_field = state_field
self.start_value = start_value
self.allow_event_without_transition = allow_event_without_transition
self.__rtc = rtc
self.__processing: bool = False
self._external_queue: deque = deque()
assert hasattr(self, "_abstract")
if self._abstract:
raise InvalidDefinition(_("There are no states or transitions."))
initial_transition = Transition(
None, self._get_initial_state(), event="__initial__"
)
self._setup(initial_transition)
self._activate_initial_state(initial_transition)
def __repr__(self):
current_state_id = self.current_state.id if self.current_state_value else None
return (
f"{type(self).__name__}(model={self.model!r}, state_field={self.state_field!r}, "
f"current_state={current_state_id!r})"
)
def _get_initial_state(self):
current_state_value = (
self.start_value if self.start_value else self.initial_state.value
)
try:
return self.states_map[current_state_value]
except KeyError as err:
raise InvalidStateValue(current_state_value) from err
def _activate_initial_state(self, initial_transition):
if self.current_state_value is None:
# send an one-time event `__initial__` to enter the current state.
# current_state = self.current_state
initial_transition.before.clear()
initial_transition.on.clear()
initial_transition.after.clear()
event_data = EventData(
trigger_data=TriggerData(
machine=self,
event=initial_transition.event,
),
transition=initial_transition,
)
self._activate(event_data)
def _get_protected_attrs(self):
return {
"_abstract",
"model",
"state_field",
"start_value",
"initial_state",
"final_states",
"states",
"_events",
"states_map",
"send",
} | {s.id for s in self.states}
def _visit_states_and_transitions(self, visitor):
for state in self.states:
visitor(state)
for transition in state.transitions:
visitor(transition)
def _setup(self, initial_transition):
machine = ObjectConfig(self, skip_attrs=self._get_protected_attrs())
model = ObjectConfig(self.model, skip_attrs={self.state_field})
default_resolver = resolver_factory(machine, model)
# clone states and transitions to avoid sharing callbacks references between instances
self.states_map = {
state.value: state.clone()._setup(self, default_resolver)
for state in self.states
}
self.states = list(self.states_map.values())
for state in self.states:
for transition in state.transitions:
transition._setup(self, default_resolver)
initial_transition._setup(self, default_resolver)
self.add_observer(machine, model)
[docs] def add_observer(self, *observers):
"""Add an observer.
Observers are a way to generically add behavior to a :ref:`StateMachine` without changing
its internal implementation.
.. seealso::
:ref:`observers`.
"""
resolvers = [resolver_factory(ObjectConfig.from_obj(o)) for o in observers]
self._visit_states_and_transitions(lambda x: x._add_observer(*resolvers))
return self
def _repr_html_(self):
return f'<div class="statemachine">{self._repr_svg_()}</div>'
def _repr_svg_(self):
return self._graph().create_svg().decode()
def _graph(self):
from .contrib.diagram import DotGraphMachine
return DotGraphMachine(self).get_graph()
@property
def current_state_value(self):
"""Get/Set the current :ref:`state` value.
This is a low level API, that can be used to assign any valid state value
completely bypassing all the hooks and validations.
"""
value = getattr(self.model, self.state_field, None)
return value
@current_state_value.setter
def current_state_value(self, value):
if value not in self.states_map:
raise InvalidStateValue(value)
setattr(self.model, self.state_field, value)
@property
def current_state(self) -> "State":
"""Get/Set the current :ref:`state`.
This is a low level API, that can be to assign any valid state
completely bypassing all the hooks and validations.
"""
return self.states_map[self.current_state_value]
@current_state.setter
def current_state(self, value):
self.current_state_value = value.value
@property
def events(self):
return self.__class__.events
@property
def allowed_events(self):
"""List of the current allowed events."""
return [
getattr(self, event)
for event in self.current_state.transitions.unique_events
]
def _process(self, trigger):
"""Process event triggers.
The simplest implementation is the non-RTC (synchronous),
where the trigger will be run immediately and the result collected as the return.
.. note::
While processing the trigger, if others events are generated, they
will also be processed immediately, so a "nested" behavior happens.
If the machine is on ``rtc`` model (queued), the event is put on a queue, and only the
first event will have the result collected.
.. note::
While processing the queue items, if others events are generated, they
will be processed sequentially (and not nested).
"""
if not self.__rtc:
# The machine is in "synchronous" mode
return trigger()
# The machine is in "queued" mode
# Add the trigger to queue and start processing in a loop.
self._external_queue.append(trigger)
# We make sure that only the first event enters the processing critical section,
# next events will only be put on the queue and processed by the same loop.
if self.__processing:
return
return self._processing_loop()
def _processing_loop(self):
"""Execute the triggers in the queue in order until the queue is empty"""
self.__processing = True
# We will collect the first result as the processing result to keep backwards compatibility
# so we need to use a sentinel object instead of `None` because the first result may
# be also `None`, and on this case the `first_result` may be overridden by another result.
sentinel = object()
first_result = sentinel
try:
while self._external_queue:
trigger = self._external_queue.popleft()
try:
result = trigger()
if first_result is sentinel:
first_result = result
except Exception:
# Whe clear the queue as we don't have an expected behavior
# and cannot keep processing
self._external_queue.clear()
raise
finally:
self.__processing = False
return first_result if first_result is not sentinel else None
def _activate(self, event_data: EventData):
transition = event_data.transition
source = event_data.state
target = transition.target
result = transition.before.call(*event_data.args, **event_data.extended_kwargs)
if source is not None and not transition.internal:
source.exit.call(*event_data.args, **event_data.extended_kwargs)
result += transition.on.call(*event_data.args, **event_data.extended_kwargs)
self.current_state = target
event_data.state = target
if not transition.internal:
target.enter.call(*event_data.args, **event_data.extended_kwargs)
transition.after.call(*event_data.args, **event_data.extended_kwargs)
if len(result) == 0:
result = None
elif len(result) == 1:
result = result[0]
return result
[docs] def send(self, event, *args, **kwargs):
"""Send an :ref:`Event` to the state machine.
.. seealso::
See: :ref:`triggering events`.
"""
event = Event(event)
return event.trigger(self, *args, **kwargs)