Creating a Robot Plugin¶
This guide walks through building a robot plugin that adapts any Sugarcoat-based stack to a specific robot’s hardware interfaces. A plugin maps generic topic types (e.g. Twist, Odometry) to robot-specific ROS 2 topics or service clients — without modifying any component code.
When to Use a Plugin¶
Use a robot plugin when:
Your robot uses custom ROS message types instead of standard ones (e.g. a custom odometry message instead of
nav_msgs/Odometry).Your robot exposes motion commands as ROS 2 services instead of topic subscriptions.
You want the same Sugarcoat recipe to run on different robots by swapping a single config parameter.
Plugin Structure¶
A robot plugin is a standard Python package that exports two dictionaries:
myrobot_plugin/
├── __init__.py # Exports robot_feedback and robot_action dicts
├── types.py # SupportedType wrappers for custom messages
└── clients.py # Service clients for action interfaces (optional)
The Two Dictionaries¶
# __init__.py
from typing import Dict, Union, Type
from ros_sugar.io import Topic
from ros_sugar.base_clients import ServiceClientHandler
robot_feedback: Dict[str, Union[Topic, Type[ServiceClientHandler]]]
robot_action: Dict[str, Union[Topic, Type[ServiceClientHandler]]]
robot_feedback: Maps standard type names to robot-specific input topics. When a component subscribes to anOdometrytopic, the plugin replaces it with your robot’s custom odometry topic and type.robot_action: Maps standard type names to robot-specific output topics or service clients. When a component publishes aTwistcommand, the plugin routes it to your robot’s custom interface.
Dictionary keys are standard ROS 2 type names (e.g. "Twist", "Odometry", "String"). These are matched against the message types of component input/output topics.
Step 1: Define Custom Type Wrappers¶
Use create_supported_type() to wrap your robot’s custom messages with conversion functions:
# types.py
import numpy as np
from ros_sugar.robot_plugin import create_supported_type
from myrobot_msgs.msg import CustomOdom, CustomTwist
# --- Feedback: custom ROS message → Python type ---
def _odom_callback(msg: CustomOdom, **_) -> np.ndarray:
"""Convert robot's odometry to a numpy array."""
return np.array([msg.x, msg.y, msg.yaw])
RobotOdometry = create_supported_type(CustomOdom, callback=_odom_callback)
# --- Action: Python types → custom ROS message ---
def _twist_converter(output, **_) -> CustomTwist:
"""Convert velocity command to robot's custom twist.
The publisher passes a single output value (e.g. a list or numpy array).
"""
msg = CustomTwist()
msg.vx = output[0]
msg.vy = output[1]
msg.vyaw = output[2]
return msg
RobotTwist = create_supported_type(CustomTwist, converter=_twist_converter)
create_supported_type() Parameters¶
Parameter |
For |
Signature requirement |
|---|---|---|
|
Both |
A valid ROS 2 message class |
|
Feedback (input) |
|
|
Action (output) |
|
Step 2: Create Service Clients (Optional)¶
If your robot exposes commands via ROS 2 services instead of topics, subclass RobotPluginServiceClient:
# clients.py
from rclpy.node import Node
from ros_sugar.robot_plugin import RobotPluginServiceClient
from myrobot_msgs.srv import RobotCommand
class TwistServiceClient(RobotPluginServiceClient):
"""Sends velocity commands via a ROS 2 service."""
def __init__(self, client_node: Node, srv_name: str = "robot_cmd"):
super().__init__(
srv_type=RobotCommand,
srv_name=srv_name,
client_node=client_node,
)
def _publish(self, output, **_) -> bool:
"""Create and send the service request.
Receives a single output value (e.g. a list or numpy array).
"""
req = RobotCommand.Request()
req.vx = output[0]
req.vy = output[1]
req.omega = output[2]
response = self.send_request(req_msg=req)
return response is not None
def start(self) -> bool:
"""Called when the component activates. Enable autonomous mode."""
req = RobotCommand.Request()
req.enable = True
self.send_request(req_msg=req)
return True
def end(self) -> bool:
"""Called when the component deactivates. Disable autonomous mode."""
req = RobotCommand.Request()
req.enable = False
self.send_request(req_msg=req)
return True
Methods to Implement¶
Method |
Required |
Description |
|---|---|---|
|
Yes |
Send the actual command. Receives the same single |
|
Yes |
Lifecycle hook — runs when the component activates. |
|
Yes |
Lifecycle hook — runs when the component deactivates. |
The base class handles pre-processor transfer: when a service client replaces a publisher, any pre-processors attached to the original publisher are preserved and applied before _publish() is called.
Step 3: Register the Plugin¶
Wire everything together in __init__.py:
# __init__.py
from typing import Dict, Union, Type
from ros_sugar.io import Topic
from ros_sugar.base_clients import ServiceClientHandler
from . import types, clients
# Input topics: standard type name → robot-specific topic
robot_feedback: Dict[str, Union[Topic, Type[ServiceClientHandler]]] = {
"Odometry": Topic(name="myrobot/odom", msg_type=types.RobotOdometry),
}
# Output topics: standard type name → robot-specific topic or service client
robot_action: Dict[str, Union[Topic, Type[ServiceClientHandler]]] = {
"Twist": clients.TwistServiceClient,
"TwistStamped": clients.TwistServiceClient,
}
__all__ = ["robot_feedback", "robot_action"]
Actions can map to either a Topic (for topic-based robots) or a RobotPluginServiceClient subclass (for service-based robots). You can mix both in the same plugin.
Step 4: Enable the Plugin¶
Set the plugin module name in the component configuration:
from ros_sugar.config import BaseComponentConfig
config = BaseComponentConfig(
_robot_plugin="myrobot_plugin",
_enable_plugin_feedbacks_handling=True, # Replace input topics (default True)
_enable_plugin_actions_handling=True, # Replace output topics/clients
)
Or pass it through the Launcher to apply to all components:
from ros_sugar.launch import Launcher
launcher = Launcher(robot_plugin="myrobot_plugin")
launcher.add_pkg(components=[...])
launcher.bringup()
How Replacement Works¶
During component activation, if _robot_plugin is set:
The plugin module is imported dynamically.
For each input topic, its message type name is looked up in
robot_feedback. If found, the subscriber is destroyed and recreated with the plugin’s topic name and type.For each output topic, its message type name is looked up in
robot_action:If the value is a
Topic: the publisher is replaced with the new topic name and type.If the value is a
RobotPluginServiceClientsubclass: the publisher is replaced with a service client instance. Pre-processors are transferred automatically.
Components are unaware of the replacement — they continue calling self.callbacks["topic"].get_output() and self.publishers_dict["topic"].publish(...) as normal.
Complete Example¶
A full plugin for a robot with custom odometry and service-based velocity control:
# myrobot_plugin/__init__.py
from typing import Dict, Union, Type
from ros_sugar.io import Topic
from ros_sugar.base_clients import ServiceClientHandler
from . import types, clients
robot_feedback: Dict[str, Union[Topic, Type[ServiceClientHandler]]] = {
"Odometry": Topic(name="myrobot/odom", msg_type=types.RobotOdometry),
"LaserScan": Topic(name="myrobot/lidar", msg_type=types.RobotLidar),
}
robot_action: Dict[str, Union[Topic, Type[ServiceClientHandler]]] = {
"Twist": clients.TwistServiceClient,
"TwistStamped": clients.TwistServiceClient,
}
__all__ = ["robot_feedback", "robot_action"]
Usage in a recipe:
from ros_sugar.config import BaseComponentConfig
from ros_sugar.launch import Launcher
# Same recipe works with any robot — just change the plugin
config = BaseComponentConfig(
_robot_plugin="myrobot_plugin",
_enable_plugin_feedbacks_handling=True,
_enable_plugin_actions_handling=True,
)
my_component = MyComponent(config=config, inputs=[...], outputs=[...])
launcher = Launcher()
launcher.add_pkg(components=[my_component])
launcher.bringup()