ros_sugar.robot.plugin¶
RobotPlugin - the general-purpose robot plugin contract for Sugarcoat.
Plugin authors subclass RobotPlugin and, in a declarative __init__ (no I/O),
populate transports, feedbacks, commands, actions and events. One instance is passed to a Launcher.
Bring-up and tear-down are owned by RobotPluginHost, which the launcher attaches
to a plugin instance on the host side. Component subprocesses get a plain plugin
instance rebuilt from RobotPlugin.to_spec and use its consumer API.
If an author needs host-side setup that depends on the rclpy node (binding a
RosServiceTransport is the canonical example), they override the optional
on_attached hook; the host calls it once after wiring is complete.
Module Contents¶
Classes¶
PluginMetadata | Descriptive metadata for a robot plugin.
RobotPlugin | Base class for robot plugins.
RobotPluginHost | Owns the host-side lifecycle of a RobotPlugin.
API¶
- exception ros_sugar.robot.plugin.AmbiguousPluginEntryError¶
Bases: LookupError
Raised when Topic(use_plugin=True) matches more than one plugin entry by message type; the recipe must name the topic after one of the plugin’s registry keys to disambiguate.
- add_note()¶
- with_traceback()¶
- class ros_sugar.robot.plugin.PluginMetadata¶
Bases: ros_sugar.config.BaseAttrs
Descriptive metadata for a robot plugin.
- Parameters:
name – Short robot name, e.g. "Lite3".
vendor – Manufacturer / vendor name.
version – Plugin or interface version string.
description – A 1-3 sentence plain-language description of the robot: its form factor, how it moves, and what it is for.
- asdict(filter: Optional[Callable] = None) → Dict¶
- to_dict() → Dict¶
- from_dict(dict_obj: Dict) → None¶
- from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) → bool¶
- to_json() → Union[str, bytes, bytearray]¶
- from_json(json_obj: Union[str, bytes, bytearray]) → None¶
- has_attribute(attr_name: str) → bool¶
- get_attribute_type(attr_name: str) → Optional[type]¶
- update_value(attr_name: str, attr_value: Any) → bool¶
- classmethod get_fields_info(class_object) → Dict[str, Dict[str, Any]]¶
- class ros_sugar.robot.plugin.RobotPlugin¶
Base class for robot plugins.
Subclass and populate the registries in a declarative __init__. Keep __init__ free of I/O and accept only JSON-serializable keyword arguments; the launcher serializes those into a spec so each component subprocess can rebuild the plugin. Open sockets, start threads and bind nodes through RobotPluginHost, not from __init__.
- to_spec() → Dict[str, Any]¶
Return a JSON-serializable spec used to rebuild this plugin in a component subprocess.
- static from_spec(spec: Dict[str, Any], bus_endpoint: Optional[Any] = None) → ros_sugar.robot.plugin.RobotPlugin¶
Rebuild a plugin from a to_spec dict.
- Parameters:
spec – The spec dict produced by to_spec.
bus_endpoint – Socket name of the host’s feedback bus; when given, a connected robot.bus.SocketFeedbackBus is attached so the plugin’s consumer API (subscribe_feedback / send_command) talks to the host across the process boundary.
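The to_spec / from_spec round trip can be pictured with a small stand-in. This is a minimal sketch that does not import ros_sugar: SketchPlugin, its constructor arguments and the spec layout (class_name plus kwargs) are assumptions made for illustration, not the library’s actual format, and the bus_endpoint argument is left out.

```python
import json
from typing import Any, Dict


class SketchPlugin:
    """Hypothetical stand-in for a RobotPlugin subclass (not the ros_sugar class)."""

    def __init__(self, robot_ip: str = "192.168.1.120", rate_hz: float = 50.0) -> None:
        # Declarative only: remember the JSON-serializable kwargs and fill a
        # registry. No sockets, threads or nodes are created here.
        self._init_kwargs: Dict[str, Any] = {"robot_ip": robot_ip, "rate_hz": rate_hz}
        self.feedbacks: Dict[str, str] = {"imu": "sensor_msgs/Imu"}

    def to_spec(self) -> Dict[str, Any]:
        # Assumed spec layout: class name plus the constructor kwargs.
        return {"class_name": type(self).__name__, "kwargs": dict(self._init_kwargs)}

    @staticmethod
    def from_spec(spec: Dict[str, Any]) -> "SketchPlugin":
        # Rebuild by calling the constructor again with the recorded kwargs.
        return SketchPlugin(**spec["kwargs"])


host_side = SketchPlugin(robot_ip="10.0.0.5")
# The spec must survive JSON encoding to cross the process boundary.
wire = json.dumps(host_side.to_spec())
component_side = SketchPlugin.from_spec(json.loads(wire))
```

Because the constructor does no I/O, rebuilding the plugin in a second process is just a second constructor call, which is why only JSON-serializable keyword arguments are accepted.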
- property bus: Optional[ros_sugar.robot.bus.FeedbackBus]¶
The feedback bus this plugin publishes to / consumes from.
- set_bus(bus: ros_sugar.robot.bus.FeedbackBus) → None¶
Attach the feedback bus. The host calls this during bring-up; the from_spec path uses it to give a client-side plugin its connected SocketFeedbackBus.
- on_attached(node: Any, bus: ros_sugar.robot.bus.FeedbackBus) → None¶
Optional host-side setup hook; override it in subclasses that need it.
RobotPluginHost calls this once, after opening transports, wiring feedback dispatch and starting heartbeats. Override it when the plugin needs to do something with the host’s rclpy node, typically binding a robot.transports.ros.RosServiceTransport client.
- Parameters:
node – rclpy node owned by the launcher (may be None in standalone/test contexts).
bus – The feedback bus already attached to the plugin.
- resolve_feedback(topic_name: str, msg_type_name: str) → Optional[ros_sugar.robot.feedback.Feedback]¶
Return the Feedback standing in for an input topic.
Two-step resolution:
1. Exact match on plugin.feedbacks[topic_name] – the recipe author’s io.topic.Topic.name doubles as the registry key for disambiguation.
2. Unique-type fallback: a single feedback whose msg_type matches msg_type_name. This is the common case for plugins exposing one entry per message type.
- Raises:
TypeError – when the topic name matches a feedback by key but the message types disagree, i.e. the recipe is mis-wired.
AmbiguousPluginEntryError – when the topic name doesn’t match any key and multiple feedbacks share msg_type_name; the error lists the available keys so the recipe author can rename the topic to disambiguate.
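The two-step resolution and its error cases can be sketched in plain Python. This is an illustration under stated assumptions, not the library’s code: SketchFeedback, AmbiguousEntryError and resolve are hypothetical stand-ins, message types are modelled as plain strings, and returning None when nothing matches is assumed from the Optional return type.

```python
from dataclasses import dataclass
from typing import Dict, Optional


class AmbiguousEntryError(LookupError):
    """Hypothetical stand-in for AmbiguousPluginEntryError."""


@dataclass(frozen=True)
class SketchFeedback:
    """Hypothetical stand-in for a Feedback entry; only msg_type is modelled."""

    msg_type: str


def resolve(
    feedbacks: Dict[str, SketchFeedback], topic_name: str, msg_type_name: str
) -> Optional[SketchFeedback]:
    # Step 1: exact match on the registry key.
    entry = feedbacks.get(topic_name)
    if entry is not None:
        if entry.msg_type != msg_type_name:
            # Key matches but types disagree: the recipe is mis-wired.
            raise TypeError(
                f"'{topic_name}' is {entry.msg_type}, recipe asked for {msg_type_name}"
            )
        return entry

    # Step 2: unique-type fallback.
    matches = {key: fb for key, fb in feedbacks.items() if fb.msg_type == msg_type_name}
    if len(matches) == 1:
        return next(iter(matches.values()))
    if len(matches) > 1:
        # List the candidate keys so the topic can be renamed to one of them.
        raise AmbiguousEntryError(
            f"{msg_type_name} matches several entries: {sorted(matches)}"
        )
    return None  # assumed: no plugin entry stands in for this topic


registry = {
    "imu": SketchFeedback("Imu"),
    "front_cam": SketchFeedback("Image"),
    "rear_cam": SketchFeedback("Image"),
}
by_key = resolve(registry, "front_cam", "Image")   # step 1 hit
by_type = resolve(registry, "body_imu", "Imu")     # step 2 hit, only one Imu entry
```

In this sketch a topic named "camera" with type "Image" would raise the ambiguity error, since two entries share that type and neither key matches.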
- resolve_command(topic_name: str, msg_type_name: str) → Optional[ros_sugar.robot.command.RobotCommand]¶
Return the RobotCommand standing in for an output topic. Same resolution rules as resolve_feedback.
- subscribe_feedback(feedback: ros_sugar.robot.feedback.Feedback, on_ros_msg: Callable[[Any], None]) → ros_sugar.robot.bus.BusHandle¶
Subscribe to a feedback stream; on_ros_msg is called with each decoded ROS message. Used by components for non-ROS feedback.
- open_command(command: ros_sugar.robot.command.RobotCommand) → None¶
Prepare a command transport for sending from a component process.
For route_via_host commands nothing is opened (payloads go over the bus); otherwise the transport’s egress side is opened.
- send_command(command: ros_sugar.robot.command.RobotCommand, payload: Any) → bool¶
Send an already-encoded command payload, routing via the host bus or directly through the transport as configured.
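The routing decision shared by open_command and send_command can be sketched as follows. Everything here is a hypothetical stand-in (SketchTransport, SketchBus, SketchCommand and their methods are invented for illustration); only the route_via_host switch and the "nothing is opened for host-routed commands" rule come from the descriptions above.

```python
from dataclasses import dataclass, field
from typing import Any, List, Tuple


class SketchTransport:
    """Hypothetical direct transport; records what was sent."""

    def __init__(self) -> None:
        self.is_open = False
        self.sent: List[Any] = []

    def open_egress(self) -> None:
        self.is_open = True

    def send(self, payload: Any) -> bool:
        if not self.is_open:
            return False  # sending before open_command fails in this sketch
        self.sent.append(payload)
        return True


class SketchBus:
    """Hypothetical bus that carries payloads to the host for forwarding."""

    def __init__(self) -> None:
        self.forwarded: List[Tuple[str, Any]] = []

    def forward(self, command_name: str, payload: Any) -> bool:
        self.forwarded.append((command_name, payload))
        return True


@dataclass
class SketchCommand:
    name: str
    route_via_host: bool
    transport: SketchTransport = field(default_factory=SketchTransport)


def open_command(command: SketchCommand) -> None:
    # Host-routed commands need no local transport; payloads go over the bus.
    if not command.route_via_host:
        command.transport.open_egress()


def send_command(command: SketchCommand, payload: Any, bus: SketchBus) -> bool:
    # The payload is assumed to be encoded already; this only picks the route.
    if command.route_via_host:
        return bus.forward(command.name, payload)
    return command.transport.send(payload)


bus = SketchBus()
via_host = SketchCommand("gait", route_via_host=True)
direct = SketchCommand("cmd_vel", route_via_host=False)
for cmd in (via_host, direct):
    open_command(cmd)
ok_host = send_command(via_host, b"\x01", bus)
ok_direct = send_command(direct, b"\x02", bus)
```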
- list_feedbacks() → List[ros_sugar.robot.feedback.FeedbackSpec]¶
List every feedback stream this plugin exposes.
- list_commands() → List[ros_sugar.robot.command.CommandSpec]¶
List every command surface this plugin exposes.
- list_actions() → List[ros_sugar.robot.registries.ActionSpec]¶
List every high-level action factory this plugin exposes.
- list_events() → List[ros_sugar.robot.registries.EventSpec]¶
List every event factory this plugin exposes.
- describe() → Dict[str, Any]¶
Return a JSON-serializable introspection tree for this plugin.
- class ros_sugar.robot.plugin.RobotPluginHost(plugin: ros_sugar.robot.plugin.RobotPlugin, node: Any, bus: ros_sugar.robot.bus.FeedbackBus, monitor_feed: Optional[Callable[[str, Any], None]] = None)¶
Owns the host-side lifecycle of a RobotPlugin.
Plugin classes are declarative: they describe what the robot exposes but do no I/O. The host runs in the launcher process, opens the plugin’s transports, wires feedback decoding into the feedback bus and the Monitor’s event blackboard, runs heartbeats, and forwards route_via_host commands. Component subprocesses do not use a host; they construct a plain plugin from a spec and call its consumer API only.
- Parameters:
plugin – The plugin instance to manage.
node – rclpy node the plugin can use for host-side ROS interactions (e.g. RosServiceTransport clients). May be None in standalone and test contexts where no ROS node is available.
bus – The feedback bus to use for fan-out to component consumers.
monitor_feed – Optional callable (channel, ros_msg) -> None that receives every decoded feedback message; the launcher wires this to Monitor.feed_external_topic so plugin events fire on the same machinery as ROS-topic events.
- open() → None¶
Bring the plugin up host-side.
Attaches the bus to the plugin, opens non-ROS transports, wires feedback decoders to the bus and the monitor, registers route_via_host command forwarders, starts heartbeats, and finally calls the plugin’s optional RobotPlugin.on_attached hook.
- close() → None¶
Tear the plugin down host-side. Idempotent.
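The bring-up order and the idempotent tear-down can be illustrated with a stand-in host. This sketch does not use ros_sugar: SketchHost and SketchPlugin are hypothetical, the individual steps are reduced to log entries, and the guard flag is just one assumed way to make close() idempotent.

```python
from typing import Any, List, Optional, Tuple


class SketchPlugin:
    """Hypothetical plugin exposing only the two host-facing hooks."""

    def __init__(self) -> None:
        self.bus: Optional[Any] = None
        self.attached_with: Optional[Tuple[Any, Any]] = None

    def set_bus(self, bus: Any) -> None:
        self.bus = bus

    def on_attached(self, node: Any, bus: Any) -> None:
        # A real override might bind a service client to the node here.
        self.attached_with = (node, bus)


class SketchHost:
    """Hypothetical host that logs the documented bring-up order."""

    def __init__(self, plugin: SketchPlugin, node: Any, bus: Any) -> None:
        self.plugin = plugin
        self.node = node
        self.bus = bus
        self.steps: List[str] = []
        self._is_open = False

    def open(self) -> None:
        self.plugin.set_bus(self.bus)
        self.steps.append("attach_bus")
        # The next four steps are placeholders for real I/O.
        self.steps.append("open_transports")
        self.steps.append("wire_feedback")
        self.steps.append("register_forwarders")
        self.steps.append("start_heartbeats")
        # The hook runs last, once wiring is complete.
        self.plugin.on_attached(self.node, self.bus)
        self.steps.append("on_attached")
        self._is_open = True

    def close(self) -> None:
        if not self._is_open:
            return  # already closed, or never opened: nothing to do
        self.steps.append("teardown")
        self._is_open = False


plugin = SketchPlugin()
shared_bus = object()
host = SketchHost(plugin, node=None, bus=shared_bus)  # node=None: standalone context
host.open()
host.close()
host.close()  # second call is a no-op
```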