Event & Action System Internals¶
This document covers the internal mechanics of the event-action system in Sugarcoat, including event types, condition evaluation, action dispatch, and the fallback system.
Event Overview¶
An Event (defined in ros_sugar.core.event) represents a condition on ROS topic data that, when satisfied, triggers one or more Action instances. Events are evaluated continuously by the Monitor node against a shared topic cache (the “blackboard”).
Event Construction¶
Events are constructed from Condition objects, which are typically built implicitly through Python comparison operators on topic message builders:
from ros_sugar.io import Topic
from ros_sugar.io.supported_types import Float32
sensor = Topic(name="/battery_level", msg_type=Float32)
# This builds a Condition via MsgConditionBuilder.__gt__
low_battery = Event(sensor.msg.data < 10.0)
The expression sensor.msg.data returns a MsgConditionBuilder that captures the attribute path ["data"]. Applying a comparison operator (e.g., <) produces a Condition object with:
topic_name:"/battery_level"attribute_path:["data"]operator_func:operator.ltref_value:10.0
Condition Tree¶
Conditions can be composed using logical operators to form a tree:
# AND: both conditions must be true
critical = Event((sensor.msg.data < 5.0) & (motor.msg.status == 1))
# OR: either condition triggers
alert = Event((sensor.msg.data < 10.0) | (temp.msg.data > 80.0))
Internally, this creates a composite Condition with a ConditionLogicOp (AND, OR, or NOT) and a list of sub_conditions. Evaluation is recursive – the Condition.evaluate() method walks the tree and applies each leaf condition against the topic cache.
Nested Attribute Access¶
MsgConditionBuilder supports chained attribute access to reach deeply nested fields in ROS messages:
odom = Topic(name="/odom", msg_type=Odometry)
# Access odom.pose.pose.position.x
position_event = Event(odom.msg.pose.pose.position.x > 5.0)
Attribute paths are validated at construction time against the ROS message type hierarchy. An AttributeError is raised if the path is invalid.
Event Types (Condition Patterns)¶
Sugarcoat supports several event patterns, all built using Condition expressions:
Pattern |
Description |
Example |
|---|---|---|
OnAny |
Fires when any data arrives on the topic |
|
OnEqual |
Fires when value equals reference |
|
OnGreater |
Fires when value exceeds reference |
|
OnLess |
Fires when value falls below reference |
|
OnDifferent |
Fires when value differs from reference |
|
OnChange |
Fires on transition from False to True |
|
OnCondition |
Fires on arbitrary compound condition |
|
OnExternalEvent |
Fires from an |
Used internally by |
OnAny¶
When a Topic object is passed directly (rather than a Condition), the event fires whenever all involved topics have data present in the blackboard:
camera_ready = Event(camera_topic)
OnChange¶
Setting on_change=True adds edge-detection semantics. The event fires only on the transition from False to True, not while the condition remains true:
# Fires once when the robot enters the danger zone, not continuously
entered_danger = Event(sensor.msg.data < 0.5, on_change=True)
JSON Serialization¶
Events and their conditions support full serialization for multi-process execution. When components run in separate processes, events are serialized via Event.to_json() / Event.from_json(), which in turn serializes the Condition tree. This is used by the Launcher when spawning components via ExecuteProcess.
event_json = my_event.to_json()
restored_event = Event.from_json(event_json)
The serialization preserves the complete condition tree, operator functions (mapped by name), reference values, and topic metadata.
Action Dispatch¶
An Action (defined in ros_sugar.core.action) wraps a callable that is executed when an event triggers. Actions can wrap:
A component method (decorated with
@component_action)A plain Python function
An async coroutine
A ROS launch action (e.g.,
LogInfo)
Registering Actions¶
Actions are associated with events through the Launcher.add_pkg() method:
from ros_sugar.core import Action, Event
stop_action = Action(my_component.emergency_stop)
launcher.add_pkg(
components=[my_component],
events_actions={low_battery: stop_action},
)
Internally, event.register_actions() stores the actions on the event. When check_condition() evaluates to True, the actions are submitted to a shared ThreadPoolExecutor for non-blocking execution.
Topic Data Injection¶
When an action fires, the current topic cache (a dict mapping topic names to their latest messages) is passed as a topics keyword argument. Actions can use this to access the data that triggered the event:
def handle_collision(topics: dict = None):
scan = topics.get("/laser_scan")
# ... react to scan data ...
Automatic Type Conversion¶
If an event involves a single topic, Action._setup_conversions() attempts to create an automatic parser from the event’s message type to the action’s expected input type. This allows actions to receive converted data without manual parsing.
@component_action Decorator¶
The component_action decorator (in ros_sugar.utils) validates action methods at call time:
Instance check: Ensures the method is called on a
LifecycleNodeinstance.Return type: Verifies the return annotation is
boolorNone.Lifecycle state: If
active=True, the component must be in the Active state.
The decorator can be used bare or with parameters:
from ros_sugar.utils import component_action
class Navigator(BaseComponent):
# Basic usage
@component_action
def stop(self) -> bool:
self.cmd_vel_publisher.publish(Twist())
return True
# With an OpenAI-compatible tool description (for LLM orchestration)
@component_action(description={
"type": "function",
"function": {
"name": "navigate_to",
"description": "Navigate the robot to the specified coordinates.",
"parameters": {
"type": "object",
"properties": {
"x": {"type": "number"},
"y": {"type": "number"},
},
},
},
})
def navigate_to(self, *, x: float, y: float) -> bool:
...
When description is provided, it is stored on the wrapper as _action_description and can be used by orchestrating LLM agents to discover available tools. When omitted, the method’s docstring is used instead.
Fallback System¶
The fallback system provides automatic failure recovery. It is managed by ComponentFallbacks (in ros_sugar.core.fallbacks).
ComponentFallbacks¶
ComponentFallbacks holds a set of Fallback objects, one for each failure level:
Attribute |
Triggered When |
|---|---|
|
|
|
|
|
|
|
Any failure without a specific fallback defined |
|
All fallbacks for a failure level have been exhausted |
Fallback¶
Each Fallback (an attrs-defined class) wraps one or more Action instances and a max_retries count:
from ros_sugar.core import Action, ComponentFallbacks, Fallback
fallbacks = ComponentFallbacks(
on_component_fail=Fallback(
action=[Action(component.restart), Action(component.shutdown)],
max_retries=3,
),
on_algorithm_fail=Fallback(
action=Action(component.reset_algorithm),
max_retries=5,
),
)
Failure Hierarchy¶
When a failure is detected, ComponentFallbacks follows this resolution order:
Look for a fallback specific to the failure level (
on_algorithm_fail,on_component_fail, oron_system_fail).If no specific fallback is defined, fall back to
on_any_fail.For each fallback, execute the current action up to
max_retriestimes.If
max_retriesis exhausted and the fallback has a list of actions, move to the next action in the list.If all actions in the list are exhausted, set the
giveupflag and executeon_giveupif defined.
A successful fallback execution (action returns True) resets the health status to STATUS_HEALTHY.
Default Behavior¶
By default, BaseComponent sets on_any_fail to Action(self.broadcast_status) with max_retries=None (infinite retries). This means any failure that does not have a specific fallback will cause the component to broadcast its status continuously, allowing the Monitor to detect the issue.
Monitor’s Role in the Event-Action Loop¶
The Monitor ties events, actions, and fallbacks together:
Event evaluation: The
Monitorsubscribes to all topics involved in registered events. On each message, it updates the blackboard and evaluates all events.Action dispatch: When an event triggers, the
Monitoreither executes the action directly (for simple callables) or emits anInternalEventthat theLauncherhandles (for lifecycle transitions or process-level actions).Health monitoring: The
Monitorsubscribes to each component’sComponentStatustopic. When a failure is detected, it triggers the component’s fallback chain.Staleness detection:
EventBlackboardEntrytracks message UUIDs and timestamps. TheMonitoruses these to avoid re-triggering events on stale data and to perform lazy expiration of old entries.