Events¶
Dynamic behavior switching based on real-time environmental context.
Sugarcoat’s Event-Driven architecture enables dynamic behavior switching based on real-time environmental context. This allows robots to react instantly to changes in their internal state or external environment without complex, brittle if/else chains.
An Event in Sugarcoat monitors a specific ROS2 Topic, and defines a triggering condition based on the incoming topic data. You can write natural Python expressions (e.g., topic.msg.data > 5) to define exactly when an event should trigger the associated Action(s).
Compose Logic - Combine triggers using simple Pythonic syntax (
(lidar_clear) & (goal_seen)).Fuse Data - Monitor multiple topics simultaneously via a synchronized Blackboard that ensures data freshness.
Stay Fast - All evaluation happens asynchronously in a dedicated worker pool. Your main component loop never blocks.
Think in Behaviors
Events are designed to be read like a sentence: “If the battery is low AND we are far from home, THEN navigate to the charging dock.”
Tip
Events can be paired with Sugarcoat Action(s) or with any standard ROS2 Launch Action
Defining Events¶
The Event API uses a fluent, expressive syntax that allows you to access ROS2 message attributes directly via topic.msg.
Basic Single-Topic Event¶
from ros_sugar.core import Event
from ros_sugar.io import Topic
# 1. Define the Source
# `data_timeout` parameter is optional. It ensures data is considered "stale" after 0.5s
battery = Topic(name="/battery_level", msg_type="Float32", data_timeout=0.5)
# 2. Define the Event
# Triggers when percentage drops below 20%
low_batt_event = Event(battery.msg.data < 20.0)
Composed Conditions (Logic & Multi-Topic)¶
You can combine multiple conditions using standard Python bitwise operators (&, |, ~) to create complex behavioral triggers. Events can also span multiple different topics. Sugarcoat automatically manages a “Blackboard” of the latest messages from all involved topics, ensuring synchronization and data “freshness”.
Example: Trigger a “Stop” event only if an obstacle is detected AND the robot is currently in “Auto” mode.
from ros_sugar.core import Event
from ros_sugar.io import Topic
lidar_topic = Topic(name="/person_detected", msg_type="Bool", data_timeout=0.5)
status_topic = Topic(name="/robot_mode", msg_type="String", data_timeout=60.0)
# Complex Multi-Topic Condition
emergency_stop_event = Event((lidar_topic.msg.data.is_true()) & (status_topic.msg.data == "AUTO"))
Handling Stale Data
When combining multiple topics, data synchronization is critical. Use the data_timeout parameter on your Topic definition to ensure you never act on old sensor data.
Event Configuration¶
Refine when and how the event triggers using these parameters:
On Change (
on_change=True) - Triggers only when the condition transitions fromFalsetoTrue(Edge Trigger). Useful for state transitions (e.g., “Goal Reached”) rather than continuous firing.On Any (
Topic) - If you pass theTopicobject itself as the condition, the event triggers on every received message, regardless of content.Handle Once (
handle_once=True) - The event will fire exactly one time during the lifecycle of the system. Useful for initialization sequences.Event Delay (
keep_event_delay=2.0) - Prevents rapid firing (debouncing). Ignores subsequent triggers for the specified duration (in seconds).
Supported Conditional Operators¶
You can use standard Python operators or specific helper methods on any topic attribute to define the event triggering condition
Operator / Method |
Description |
Example |
|---|---|---|
|
Equality checks. |
|
|
Numeric comparisons. |
|
|
Boolean True check. |
|
|
Boolean False check. |
|
|
Value exists in a list. |
|
|
Value is not in a list. |
|
|
String/List contains a value. |
|
|
List contains at least one of the values. |
|
|
List contains all of the values. |
|
|
List contains none of the values. |
|
Usage Examples¶
1. Automatic Adaptation (Terrain Switching)¶
Scenario: A perception or ML node publishes a string to /terrain_type. We want to change the robot’s gait when the terrain changes.
1from typing import Literal
2from ros_sugar.component import BaseComponent
3
4class QuadrupedController(BaseComponent):
5 def __init__(self, *args, **kwargs):
6 super().__init__(*args, **kwargs)
7 # Some logic
8
9 def switch_gait_controller(self, controller_type: Literal['stairs', 'sand', 'snow', 'gravel']):
10 self.get_logger().info("New terrain detected! Switching gait.")
11 # Logic to change controller parameters...
1from my_pkg.components import QuadrupedController
2from ros_sugar.core import Event, Action
3from ros_sugar.io import Topic
4from ros_sugar import Launcher
5
6quad_controller = QuadrupedController(component_name="quadruped_controller")
7
8# Define the Event Topic
9terrain_topic = Topic(name="/terrain_type", msg_type="String")
10
11# Define the Event
12# Logic: Trigger when the detected terrain changes
13# on_change=True ensures we only trigger the switch the FIRST time stairs are seen.
14# Add an optional delay to prevent rapid event triggering
15event_terrain_changed = Event(terrain_topic,on_change=True, keep_event_delay=60.0)
16
17# Define the Action
18# Call self.switch_gait_controller() when triggered and pass the detected terrain to the method
19change_gait_action = Action(method=self.activate_stairs_controller, args=(terrain_topic.msg.data))
20
21# Register
22my_launcher = Launcher()
23my_launcher.add_pkg(
24 components=[quad_controller],
25 events_actions={stairs_event: change_gait_action},
26 )
2. An Autonomous Drone¶
Scenario: An autonomous drone stops if an obstacle is close OR the bumper is hit. It also sends a warning if the battery is low AND we are far from the land.
from ros_sugar.core import Event, Action
from ros_sugar.io import Topic
# --- Topics ---
proximity_sensor = Topic(name="/radar_front", msg_type="Float32", data_timeout=0.2)
bumper = Topic(name="/bumper", msg_type="Bool", data_timeout=0.1)
battery = Topic(name="/battery", msg_type="Float32")
location = Topic(name="/pose", msg_type="Pose")
# --- Conditions ---
# 1. Safety Condition (Composite OR)
# Stop if proximity_sensor < 0.2m OR Bumper is Hit
is_danger = (proximity_sensor.msg.data < 0.2) | (bumper.msg.data.is_true())
# 2. Return Home Condition (Composite AND)
# Return if Battery < 20% AND Distance > 100m
needs_return = (battery.msg.data < 20.0) & (location.position.z > 100.0)
# --- Events ---
safety_event = Event(is_danger)
return_event = Event(needs_return, on_change=True)
Next Steps¶
Now that you understand Events, learn how to attach them to Actions, to execute context-aware methods when an Event triggers.