ros_sugar.robot.plugin

RobotPlugin - the general-purpose robot plugin contract for Sugarcoat.

Plugin authors subclass RobotPlugin and, in a declarative init (no I/O), populate transports, feedbacks, commands, actions and events. One instance is passed to a Launcher.

Bring-up and tear-down are owned by RobotPluginHost, which the launcher attaches to a plugin instance on the host side. Component subprocesses get a plain plugin instance rebuilt from RobotPlugin.to_spec and use its consumer API.

If an author needs host-side setup that depends on the rclpy node (binding a RosServiceTransport is the canonical example), they override the optional on_attached hook; the host calls it once after wiring is complete.

Module Contents

Classes

PluginMetadata

Descriptive metadata for a robot plugin.

RobotPlugin

Base class for robot plugins.

RobotPluginHost

Owns the host-side lifecycle of a RobotPlugin.

API

exception ros_sugar.robot.plugin.AmbiguousPluginEntryError

Bases: LookupError

Raised when Topic(use_plugin=True) matches more than one plugin entry by message type; the recipe must name the topic after one of the plugin’s registry keys to disambiguate.

add_note()
with_traceback()
class ros_sugar.robot.plugin.PluginMetadata

Bases: ros_sugar.config.BaseAttrs

Descriptive metadata for a robot plugin.

Parameters:
  • name – Short robot name, e.g. "Lite3".

  • vendor – Manufacturer / vendor name.

  • version – Plugin or interface version string.

  • description – A 1-3 sentence plain-language description of the robot; its form factor, how it moves, and what it is for.

asdict(filter: Optional[Callable] = None) Dict
to_dict() Dict
from_dict(dict_obj: Dict) None
from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) bool
to_json() Union[str, bytes, bytearray]
from_json(json_obj: Union[str, bytes, bytearray]) None
has_attribute(attr_name: str) bool
get_attribute_type(attr_name: str) Optional[type]
update_value(attr_name: str, attr_value: Any) bool
classmethod get_fields_info(class_object) Dict[str, Dict[str, Any]]
class ros_sugar.robot.plugin.RobotPlugin

Base class for robot plugins.

Subclass and populate the registries in a declarative init. Keep init free of I/O and accept only JSON-serializable keyword arguments - the launcher serializes those into a spec so each component subprocess can rebuild the plugin. Open sockets, start threads and bind nodes through RobotPluginHost, not from init.

to_spec() Dict[str, Any]

Return a JSON-serializable spec used to rebuild this plugin in a component subprocess.

static from_spec(spec: Dict[str, Any], bus_endpoint: Optional[Any] = None) ros_sugar.robot.plugin.RobotPlugin

Rebuild a plugin from a to_spec dict.

Parameters:
  • spec – The spec dict produced by to_spec.

  • bus_endpoint – socket name of the host’s feedback bus; when given, a connected robot.bus.SocketFeedbackBus is attached so the plugin’s consumer API (subscribe_feedback / send_command) talks to the host across the process boundary.

property bus: Optional[ros_sugar.robot.bus.FeedbackBus]

The feedback bus this plugin publishes to / consumes from.

set_bus(bus: ros_sugar.robot.bus.FeedbackBus) None

Attach the feedback bus. The host calls this during bring-up; the from_spec path uses it to give a CLIENT-side plugin its connected SocketFeedbackBus.

on_attached(node: Any, bus: ros_sugar.robot.bus.FeedbackBus) None

Optional host-side setup hook — override in subclasses that need it.

RobotPluginHost calls this once, after opening transports, wiring feedback dispatch and starting heartbeats. Override it when the plugin needs to do something with the host’s rclpy node - typically binding a robot.transports.ros.RosServiceTransport client.

Parameters:
  • node – rclpy node owned by the launcher (may be None in standalone/test contexts).

  • bus – the feedback bus already attached to the plugin.

resolve_feedback(topic_name: str, msg_type_name: str) Optional[ros_sugar.robot.feedback.Feedback]

Return the Feedback standing in for an input topic.

Two-step resolution:

  1. Exact match on plugin.feedbacks[topic_name] – the recipe author’s io.topic.Topic.name doubles as the registry key for disambiguation.

  2. Unique-type fallback: a single feedback whose msg_type matches msg_type_name. Common case for plugins exposing one entry per message type.

Raises:
  • TypeError – when the topic name matches a feedback by key but the message types disagree – the recipe is mis-wired.

  • AmbiguousPluginEntryError – when the topic name doesn’t match any key and multiple feedbacks share msg_type_name; the error lists the available keys so the recipe author can rename the topic to disambiguate.

resolve_command(topic_name: str, msg_type_name: str) Optional[ros_sugar.robot.command.RobotCommand]

Return the RobotCommand standing in for an output topic. Same resolution rules as resolve_feedback.

subscribe_feedback(feedback: ros_sugar.robot.feedback.Feedback, on_ros_msg: Callable[[Any], None]) ros_sugar.robot.bus.BusHandle

Subscribe to a feedback stream; on_ros_msg is called with each decoded ROS message. Used by components for non-ROS feedback.

open_command(command: ros_sugar.robot.command.RobotCommand) None

Prepare a command transport for sending from a component process.

For route_via_host commands nothing is opened (payloads go over the bus); otherwise the transport’s egress side is opened.

send_command(command: ros_sugar.robot.command.RobotCommand, payload: Any) bool

Send an already-encoded command payload, routing via the host bus or directly through the transport as configured.

list_feedbacks() List[ros_sugar.robot.feedback.FeedbackSpec]

List every feedback stream this plugin exposes.

list_commands() List[ros_sugar.robot.command.CommandSpec]

List every command surface this plugin exposes.

list_actions() List[ros_sugar.robot.registries.ActionSpec]

List every high-level action factory this plugin exposes.

list_events() List[ros_sugar.robot.registries.EventSpec]

List every event factory this plugin exposes.

describe() Dict[str, Any]

Return a JSON-serializable introspection tree for this plugin.

class ros_sugar.robot.plugin.RobotPluginHost(plugin: ros_sugar.robot.plugin.RobotPlugin, node: Any, bus: ros_sugar.robot.bus.FeedbackBus, monitor_feed: Optional[Callable[[str, Any], None]] = None)

Owns the host-side lifecycle of a RobotPlugin.

Plugin classes are declarative — they describe what the robot exposes but do no I/O. The host runs in the launcher process, opens the plugin’s transports, wires feedback decoding into the feedback bus and the Monitor’s event blackboard, runs heartbeats, and forwards route_via_host commands. Component subprocesses do not use a host — they construct a plain plugin from a spec and call its consumer API only.

Parameters:
  • plugin – The plugin instance to manage.

  • node – rclpy node the plugin can use for host-side ROS interactions (e.g. RosServiceTransport clients). May be None in standalone and test contexts where no ROS node is available.

  • bus – The feedback bus to use for fan-out to component consumers.

  • monitor_feed – Optional callable (channel, ros_msg) -> None that receives every decoded feedback message — the launcher wires this to Monitor.feed_external_topic so plugin events fire on the same machinery as ROS-topic events.

open() None

Bring the plugin up host-side.

Attaches the bus to the plugin, opens non-ROS transports, wires feedback decoders to the bus + monitor, registers route_via_host command forwarders, starts heartbeats, and finally calls the plugin’s optional RobotPlugin.on_attached hook.

close() None

Tear the plugin down host-side. Idempotent.