Event-Driven Architecture¶
Sugarcoat provides a declarative event-driven system that lets you define conditions on ROS2 topics or Python callables and associate them with actions that execute when the conditions are met. This page covers both the user-facing API and the low-level implementation details for developers working on or extending the framework.
Tip
Open the Interactive Architecture Diagram for a visual version of the routing and flow diagrams below, with clickable flow selectors and architecture highlighting.
Part 1 — Event & Action API¶
Condition Types¶
Condition Expression¶
A declarative predicate on a topic message attribute. To build the condition, all the nested attributes of a ROS2 topic are accessible via the msg attribute. The condition is evaluated each time new data arrives on the involved topic.
from ros_sugar.core import Event
from ros_sugar.io import Topic
event_topic = Topic(name="/float_input", msg_type="Float32")
event = Event(event_condition=event_topic.msg.data > 3.0, on_change=True)
The expression event_topic.msg.data returns a MsgConditionBuilder that captures the attribute path ["data"]. Applying a comparison operator (e.g., >) produces a Condition object with the topic name, attribute path, operator function, and reference value.
Nested Attribute Access¶
MsgConditionBuilder supports chained attribute access to reach deeply nested fields in ROS messages:
odom = Topic(name="/odom", msg_type="Odometry")
# Access odom.pose.pose.position.x
position_event = Event(odom.msg.pose.pose.position.x > 5.0)
Attribute paths are validated at construction time against the ROS message type hierarchy. An AttributeError is raised if the path is invalid.
Condition Tree (Compound Conditions)¶
Conditions can be composed using logical operators to form a tree:
sensor = Topic(name="/battery_level", msg_type="Float32")
motor = Topic(name="/motor", msg_type="Int32")
temp = Topic(name="/temperature", msg_type="Float32")
# AND: both conditions must be true
critical = Event((sensor.msg.data < 5.0) & (motor.msg.data == 1))
# OR: either condition triggers
alert = Event((sensor.msg.data < 10.0) | (temp.msg.data > 80.0))
Internally, this creates a composite Condition with a ConditionLogicOp (AND, OR, or NOT) and a list of sub_conditions. Evaluation is recursive — the Condition.evaluate() method walks the tree and applies each leaf condition against the topic cache.
Event Patterns Summary¶
Pattern |
Description |
Example |
|---|---|---|
OnAny |
Fires when any data arrives on the topic |
|
OnEqual |
Fires when value equals reference |
|
OnGreater |
Fires when value exceeds reference |
|
OnLess |
Fires when value falls below reference |
|
OnDifferent |
Fires when value differs from reference |
|
OnChange |
Fires on transition from False to True |
|
OnCondition |
Fires on arbitrary compound condition |
|
Topic (on-any)¶
When a Topic object is passed directly (rather than a Condition), the event fires whenever all involved topics have data present in the blackboard:
event = Event(event_condition=event_topic)
Callable¶
A user-supplied function polled at check_rate Hz. It must return bool and must not be a @component_action method (those are bound to Actions and Fallbacks and cannot be used as conditions).
def timeout_reached() -> bool:
return time.time() - last_update > 5.0
event = Event(event_condition=timeout_reached, check_rate=10.0)
OnChange (Edge Detection)¶
Setting on_change=True adds edge-detection semantics. The event fires only on the transition from False to True, not while the condition remains true:
# Fires once when the robot enters the danger zone, not continuously
entered_danger = Event(sensor.msg.data < 0.5, on_change=True)
JSON Serialization¶
Events and their conditions support full serialization for multi-process execution. When components run in separate processes, events are serialized via Event.to_json() / Event.from_json(), which in turn serializes the Condition tree. This is used by the Launcher when spawning components via ExecuteProcess.
event_json = my_event.to_json()
restored_event = Event.from_json(event_json)
The serialization preserves the complete condition tree, operator functions (mapped by name), reference values, and topic metadata.
Action Types and Ownership¶
Every action will have an owner: the process/node responsible for executing it. Ownership determines how the event/action pair is routed at launch time.
# |
Action type |
Example |
Owner |
|---|---|---|---|
1 |
Inline recipe method |
A plain Python callable defined in the launch script |
Main process (Launcher) |
2 |
Component action |
A method implemented in a component class, decorated with |
The component node |
3 |
System-level action |
Actions available in the |
Main process (Monitor) |
4 |
ROS launch action |
Standard |
Main process (Launcher) |
Registering Actions¶
Actions are associated with events through the Launcher.add_pkg() method:
from ros_sugar.core import Action, Event
stop_action = Action(my_component.emergency_stop)
launcher.add_pkg(
components=[my_component],
events_actions={low_battery: stop_action},
)
The @component_action Decorator¶
Marks a component method as callable from the event system. It enforces:
The method must be a bound method on a
LifecycleNodesubclass.The return type must be
boolorNone.If
active=Trueis passed, the method only executes when the component is in theACTIVElifecycle state.
from ros_sugar.utils import component_action
class Navigator(BaseComponent):
# Basic usage
@component_action
def stop(self) -> bool:
self.cmd_vel_publisher.publish(Twist())
return True
# With an OpenAI-compatible tool description (for LLM orchestration)
@component_action(description={
"type": "function",
"function": {
"name": "navigate_to",
"description": "Navigate the robot to the specified coordinates.",
"parameters": {
"type": "object",
"properties": {
"x": {"type": "number"},
"y": {"type": "number"},
},
},
},
})
def navigate_to(self, *, x: float, y: float) -> bool:
...
When description is provided, it is stored on the wrapper as _action_description and can be used by orchestrating LLM agents to discover available tools. When omitted, the method’s docstring is used instead.
System-Level Actions¶
Provided by the ros_sugar.actions module and executed by the Monitor node:
Action |
Description |
|---|---|
|
Publishes a message on a topic (optionally at a rate for a duration) |
|
Sends a ROS2 service request |
|
Sends a ROS2 action goal |
Dynamic Arguments from Topics¶
Actions can receive live data from ROS topics as arguments. Instead of passing a static value, pass a topic.msg.attribute expression — the framework will automatically extract the value from the event’s topic data at runtime and inject it into the method call.
sensor = Topic(name="/sensor", msg_type="Float32")
def handle_reading(value: float):
print(f"Sensor reading: {value}")
event = Event(event_condition=sensor)
action = Action(method=handle_reading, args=(sensor.msg.data,))
odom = Topic(name="/odom", msg_type=Odometry)
def navigate(x: float, y: float):
print(f"Going to ({x}, {y})")
event = Event(event_condition=odom)
action = Action(
method=navigate,
kwargs={
"x": odom.msg.pose.pose.position.x,
"y": odom.msg.pose.pose.position.y,
},
)
def log_alert(level: str, value: float):
print(f"[{level}] value = {value}")
# "level" is static, "value" comes from the topic at runtime
action = Action(method=log_alert, args=("WARNING", sensor.msg.data))
These expressions (topic.msg.data, odom.msg.pose.pose.position.x, etc.) are MsgConditionBuilder objects — the same ones used to build conditions. When used as action arguments, they tell the framework which topic and which nested attribute to extract at execution time.
Fallback System¶
The fallback system provides automatic failure recovery. It is managed by ComponentFallbacks (defined in ros_sugar.core.fallbacks).
ComponentFallbacks¶
ComponentFallbacks holds a set of Fallback objects, one for each failure level:
Attribute |
Triggered When |
|---|---|
|
|
|
|
|
|
|
Any failure without a specific fallback defined |
|
All fallbacks for a failure level have been exhausted |
Defining Fallbacks¶
Each Fallback wraps one or more Action instances and a max_retries count:
from ros_sugar.core import Action, ComponentFallbacks, Fallback
fallbacks = ComponentFallbacks(
on_component_fail=Fallback(
action=[Action(component.restart), Action(component.shutdown)],
max_retries=3,
),
on_algorithm_fail=Fallback(
action=Action(component.reset_algorithm),
max_retries=5,
),
)
Failure Hierarchy¶
When a failure is detected, ComponentFallbacks follows this resolution order:
Look for a fallback specific to the failure level (
on_algorithm_fail,on_component_fail, oron_system_fail).If no specific fallback is defined, fall back to
on_any_fail.For each fallback, execute the current action up to
max_retriestimes.If
max_retriesis exhausted and the fallback has a list of actions, move to the next action in the list.If all actions in the list are exhausted, set the
giveupflag and executeon_giveupif defined.
A successful fallback execution (action returns True) resets the health status to STATUS_HEALTHY.
Part 2 — Architecture & Routing Internals¶
Event/Action Routing — Who keeps track of What¶
When you associate events with actions in the launch script via launcher.add_pkg(events_actions={...}), the Launcher inspects each action to determine its owner, then routes the event to the appropriate process. This routing logic lives in the Launcher’s __rewrite_actions_for_components method.
Topic-Based Conditions¶
The event can be associated with a list of actions, and whoever owns the action, owns the event monitoring. If one event maps to actions with different owners, the event monitoring is duplicated: each owner subscribes to the event topic independently and triggers only its own action.
Routing rules for each action in the list:
Action is a @component_action?
├─ Yes, lifecycle action (start/stop/restart) → ROS launch event handler (Launcher)
├─ Yes, non-lifecycle → Serialized to component (_components_events_actions)
├─ No, system-level (publish_message, etc.) → Monitor (_monitor_events_actions)
└─ No, inline recipe method / ROS launch → ROS launch event handler (Launcher, via _internal_events)
Callable-Based Conditions¶
Callable-based conditions are always defined in the recipe, so they are always owned by the main process. The Monitor polls them via a timer. The routing then depends on who owns the consequence action:
Case 1 — Action owned by the main process (recipe method, monitor action, or ROS launch action):
The event and action stay together in the main process. The Monitor polls the callable, and on trigger either executes the action directly (monitor actions) or emits back to the Launcher context (recipe methods and ROS launch actions).
Case 2 — Action owned by a component:
The callable condition runs in the main process, but the action must execute inside the component’s node. Since these live in different processes, a bridge event is created:
The Launcher creates a bridge topic:
/event_bridge/e_{event_id}_{component_name}of typestd_msgs/Bool.The Monitor polls the callable condition at
check_rate. When it returnsTrue, the Monitor publishesBool(True)to the bridge topic.The target component subscribes to the bridge topic. On receiving the message, it evaluates the (trivially true) on-any condition and executes the associated
@component_action.
Monitor (main process) Component (separate process)
┌──────────────────────┐ ┌──────────────────────────┐
│ Timer (check_rate) │ │ │
│ ↓ │ │ │
│ callable() == True? │ Bool(True) │ Subscription callback │
│ ↓ yes │ ──────────────────→ │ ↓ │
│ publish to bridge │ /event_bridge/... │ on-any condition → True │
│ │ │ ↓ │
└──────────────────────┘ │ execute @component_action│
└──────────────────────────┘
Low-Level Implementation¶
Key Data Structures¶
EventBlackboardEntry¶
Defined in
ros_sugar/core/event.py
A timestamped wrapper around a ROS message. Every time a topic message is received, it is stored as a blackboard entry with:
msg: The raw ROS message.timestamp: Unix time of reception.id: A UUID4 for idempotency — prevents the same message instance from triggering the same event twice.
The blackboard uses lazy expiration: expired or already-processed entries are cleaned up at evaluation time, not by a background sweep. This avoids lock contention and unnecessary timers.
Event¶
Defined in
ros_sugar/core/event.py
The runtime trigger unit. Holds the condition (a Condition expression, a Topic, or a Callable), maintains trigger state, and executes registered actions. Key behavioral knobs:
on_change: Only fires on a rising edge (false → true transition).handle_once: Fires at most once across the event’s lifetime.keep_event_delay: Throttles re-triggers by holding the “under processing” flag for a fixed duration after actions complete.
Actions are executed via a shared ThreadPoolExecutor (default 10 workers) to avoid blocking the ROS callback thread.
InternalEvent / OnInternalEvent¶
Defined in
ros_sugar/core/event.py
The bridge between the Monitor node and the ROS2 launch system. InternalEvent is a ROS launch event type that carries an event_name and topics_value dict. OnInternalEvent is a ROS launch event handler that matches by event name and injects topic data into the launch entities before execution.
The Monitor Node
Defined in
ros_sugar/core/monitor.py
The Monitor is a ROS2 node that runs in the main process. It is responsible for:
Subscribing to event topics and evaluating topic-based conditions.
Polling callable-based conditions via timers.
Executing system-level actions (publish_message, send_srv_request, etc.).
Emitting internal events back to the Launcher context for actions the Launcher owns.
Health monitoring: subscribing to each component’s
ComponentStatustopic. When a failure is detected, it triggers the component’s fallback chain.
Activation Flow (_activate_event_monitoring)
When the Monitor activates, it:
Reconstructs monitor actions (
__reconstruct_monitor_actions): For events in_monitor_events_actions, it resolves each action by name to the corresponding Monitor method (e.g.,publish_message) and registers it on the Event object.Merges internal events: Events from
_internal_events(those that need to emit back to the Launcher) are appended to the Monitor’s event list.Creates the topic blackboard: A shared
Dict[str, EventBlackboardEntry]that caches the latest message for each topic across all events.Builds a topic → events index (
__events_per_topic): Maps each unique topic name to the list of events that depend on it, enabling efficient lookup on message arrival.Creates one ROS subscription per unique topic: All events sharing a topic share a single subscriber. The callback
__event_topic_callbackupdates the blackboard and evaluates all dependent events.Creates callable-based polling timers (
__start_callable_based_event_timers): One timer per callable-based event, polling atcheck_rateHz (orconfig.loop_rateif not specified).
Topic-Based Condition Evaluation (__event_topic_callback)
On every incoming message:
The blackboard entry for that topic is updated with the new message, timestamp, and a fresh UUID.
All events that depend on this topic are retrieved from
__events_per_topic.For each event, a clean cache subset is built by checking freshness and idempotency for every topic the event needs (via
EventBlackboardEntry.get).event.check_condition(clean_cache_subset)evaluates the condition tree. If triggered, actions are submitted to the thread pool.
Callable-Based Condition Evaluation
Each callable-based event gets its own timer. On each tick:
event.check_action_condition(blackboard)calls the user-supplied callable directly.If it returns
True(accounting foron_changerising-edge logic), the registered actions are submitted to the thread pool.
The Launcher
Defined in
ros_sugar/launch/launcher.py
The Launcher is the entry point of a Sugarcoat application. It is not a ROS2 node — it orchestrates the ROS2 launch system. Its responsibilities regarding events:
Action Routing (__rewrite_actions_for_components)
For each event/action pair provided by the user, the Launcher classifies the action and routes it to the appropriate owner:
Component actions (non-lifecycle): Serialized into
_components_events_actions. The serialized event JSON is later deserialized by the component at startup.Monitor actions: Stored in
_monitor_events_actions, passed directly to the Monitor node at initialization.Launcher-owned actions (inline methods, ROS launch actions, lifecycle actions): Stored in
_ros_events_actionsand the event is added to_internal_events.
For callable-based events the routing is handled by __route_action_based_event, which either keeps the event in the Monitor (Case 1) or creates a bridge topic (Case 2), as described above.
Internal Events Handler Setup (_setup_internal_events_handlers)
For events routed to _ros_events_actions, the Launcher:
Converts each action into a launch entity:
ROS launch actions are used directly.
Lifecycle actions are converted via
_get_action_launch_entity.Inline recipe methods are wrapped as
OpaqueFunctionviaaction.launch_action(monitor_node=...).
Registers an
OnInternalEventhandler for each event name, wrapping the entities list.Adds the handler to the launch description.
At runtime, when the Monitor detects a trigger for one of these events, it emits an InternalEvent to the launch context. The OnInternalEvent handler matches by event name, injects the topic data into the entities, and executes them.
Monitor ↔ Launcher Emission Bridge (ComponentLaunchAction)
Defined in
ros_sugar/launch/launch_actions.py
When the Monitor’s ComponentLaunchAction executes, it registers the _on_internal_event callback on every internal event:
Topic-based internal events:
event.register_actions(partial(self._on_internal_event, event.id))— the emit callback is registered as an action on the Event object. When the event triggers, it calls the callback which emits anInternalEventto the launch context.Callable-based internal events (
_pure_internal_events):_register_pure_internal_event_emit_method(event_id, ...)stores the emit callback in the Monitor’semit_internal_event_methodsdict.
The _on_internal_event method:
Creates an
InternalEventwith the event name.Snapshots the Monitor’s topic blackboard into
topics_value.Emits the event to the launch context via
context.emit_event_sync, usingcall_soon_threadsafefor thread safety.
The Component
Defined in
ros_sugar/core/component.py
Components handle events that are routed to them via _components_events_actions. The mechanism mirrors the Monitor’s topic-based flow.
Event Setup (_turn_on_events_management)
Called during on_activate(). The component:
Creates a topic blackboard (
_events_topics_blackboard).Builds a topic → events index (
__events_per_topic).Registers actions on each event via
event.register_actions(actions).Creates one ROS subscription per unique topic — including bridge topics for callable-based events.
Event Evaluation (__event_topic_callback)
Identical to the Monitor’s flow: update blackboard → lazy cleanup → event.check_condition(clean_cache_subset) → async action execution.
Components never poll callable conditions directly. If a callable condition needs to trigger a component action, the bridge mechanism converts it into a topic-based event from the component’s perspective.
The Action Class
Defined in
ros_sugar/core/action.py
The Action class wraps a callable and manages argument preparation, dynamic topic data extraction, and conversion to ROS launch entities.
Construction and Argument Classification (__verify_args_kwargs)
When an Action is constructed, its args and kwargs are scanned for MsgConditionBuilder objects (expressions like topic.msg.data). These are separated from static values:
Static values are stored directly in
_args(tuple) and_kwargs(dict) and passed to the method on every call.Dynamic values (
MsgConditionBuilderinstances) are stored in a separate__input_topicsdict, keyed asarg_{index}for positional arguments orkwarg_{name}for keyword arguments. Each entry records the topic name and the attribute path to extract at runtime.
Execution (__call__)
When an event triggers, the Event object calls action(topics=global_topic_cache) where global_topic_cache is a dict mapping topic names to their latest ROS messages. The Action.__call__ method then:
Creates mutable copies of the static
_argsand_kwargs.Iterates over
__input_topics. For each entry:Looks up the topic’s message in the
topicsdict.Calls
topic_condition.get_value(object_value=message)which walks the stored attribute path (e.g.,["pose", "pose", "position", "x"]) to extract the nested value from the message.Inserts the value into
call_args(by index) orcall_kwargs(by name).
Runs any registered automatic type conversions (
__prepared_events_conversions).Calls the underlying
executablewith the fully prepared arguments.
Automatic Type Conversion (_setup_conversions)
When an event involves a single topic, the Event calls action._setup_conversions(topic_name, msg_type) at registration time. This uses _create_auto_topic_parser to attempt an automatic conversion from the event’s message type to the action’s expected input types, using three strategies in order:
Exact match: Input and target are the same type — pass through directly.
Duck typing: All target fields exist in the input with matching types — copy matching fields.
Type-based heuristic: Field names differ but types match uniquely — map by type (with a warning).
If a conversion is found, it is stored and applied automatically during __call__.
Wrapping for ROS Launch (launch_action)
Inline recipe methods and ROS launch actions need to execute within the Launcher’s launch context. The launch_action method converts an Action into a launch-compatible entity:
If the action is a monitor action (
_is_monitor_action), it resolves the executable from the Monitor node by name.Wraps the executable in a new function that prepends a
LaunchContextparameter (required by the ROS launch framework).Updates the function’s
__signature__so that ROS launch’s introspection (inspect.signature) sees theLaunchContextparameter.Returns an
OpaqueFunction(for synchronous methods) orOpaqueCoroutine(for async methods).
At runtime, when the Launcher’s OnInternalEvent handler fires, it injects the topics data into the OpaqueFunction’s kwargs before executing it, so the action receives the event’s topic cache just as it would when called directly by the Monitor.
End-to-End Flows¶
Flow 1: Topic → Monitor Action
ROS Topic → Monitor subscription → blackboard update →
condition evaluation → trigger → ThreadPoolExecutor →
Monitor method (e.g. publish_message)
Flow 2: Topic → Component Action
ROS Topic → Component subscription → blackboard update →
condition evaluation → trigger → ThreadPoolExecutor →
@component_action method
Flow 3: Topic → Launcher-Owned Action
ROS Topic → Monitor subscription → blackboard update →
condition evaluation → trigger → _on_internal_event →
emit InternalEvent to launch context →
OnInternalEvent handler matches → execute OpaqueFunction (inline method)
Flow 4: Callable → Monitor Action
Timer (check_rate) → callable() → True →
trigger → ThreadPoolExecutor → Monitor method
Flow 5: Callable → Component Action (Bridge)
Timer (check_rate) → callable() → True →
Monitor publishes Bool(True) to /event_bridge/... →
Component subscription → blackboard update →
on-any condition → True → ThreadPoolExecutor →
@component_action method
Flow 6: Callable → Launcher-Owned Action
Timer (check_rate) → callable() → True →
_on_internal_event → emit InternalEvent to launch context →
OnInternalEvent handler matches → execute OpaqueFunction