ros_sugar.robot.transports

Transport abstractions for robot plugins.

A Transport abstracts where robot data comes from and goes to. The robot plugin owns its transports; the plugin HOST (running in the launcher process) opens them, while component processes either consume decoded feedback from the FeedbackBus or send commands through a send-only (egress) transport.

Package Contents

Classes

SubscriptionHandle

Returned by Transport.subscribe; call unsubscribe to detach.

Transport

Base class for all robot plugin transports.

API

class ros_sugar.robot.transports.SubscriptionHandle(unsubscribe: Callable[[], None])

Returned by Transport.subscribe; call unsubscribe to detach.

unsubscribe() → None

Detach the callback registered with Transport.subscribe.
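As a rough illustration of the handle pattern, the sketch below uses a stand-in SubscriptionHandle that mirrors only the documented constructor and unsubscribe method. The module-level subscribe and dispatch helpers are hypothetical and exist only to make the example runnable; they are not part of ros_sugar.

```python
from typing import Any, Callable, List


class SubscriptionHandle:
    """Stand-in mirroring the documented signature (not the library's code)."""

    def __init__(self, unsubscribe: Callable[[], None]) -> None:
        self._unsubscribe = unsubscribe

    def unsubscribe(self) -> None:
        self._unsubscribe()


# Hypothetical registry standing in for a transport's internal callback list.
callbacks: List[Callable[[Any], None]] = []
received: List[Any] = []


def subscribe(on_msg: Callable[[Any], None]) -> SubscriptionHandle:
    """Register on_msg and return a handle that removes it again."""
    callbacks.append(on_msg)
    return SubscriptionHandle(lambda: callbacks.remove(on_msg))


def dispatch(payload: Any) -> None:
    """Deliver a payload to every currently registered callback."""
    for callback in list(callbacks):
        callback(payload)


handle = subscribe(received.append)
dispatch(b"first")    # delivered: the callback is still attached
handle.unsubscribe()  # detach
dispatch(b"second")   # not delivered: no callbacks remain
```

After the handle is used, later payloads no longer reach the callback, so only the first payload ends up in received.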

class ros_sugar.robot.transports.Transport(name: str, *, keep_alive_fn: Optional[Callable[[], None]] = None, keep_alive_rate_hz: Optional[float] = None, route_via_host: bool = False)

Bases: abc.ABC

Base class for all robot plugin transports.

Parameters:
  • name (str) – Unique transport name within the plugin.

  • keep_alive_fn (Optional[Callable[[], None]]) – Optional callable invoked periodically by the launcher while the plugin is active (e.g. a heartbeat packet).

  • keep_alive_rate_hz (Optional[float]) – Rate, in hertz, at which keep_alive_fn is invoked.

  • route_via_host (bool) – If True, commands on this transport are forwarded to the plugin HOST over the feedback bus instead of being sent directly from the component process. Use for session/heartbeat-bound protocols where a single client must own the connection.
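The keep-alive parameters can be pictured with a small periodic loop. This is only a sketch of how a caller might invoke keep_alive_fn at keep_alive_rate_hz; the run_keep_alive helper, the thread, and the stop event are assumptions made for illustration, and the launcher's actual scheduling is not described in this page.

```python
import threading
import time
from typing import Callable, List


def run_keep_alive(
    keep_alive_fn: Callable[[], None],
    keep_alive_rate_hz: float,
    stop: threading.Event,
) -> None:
    """Hypothetical helper: call keep_alive_fn every 1/rate seconds until stopped."""
    period_s = 1.0 / keep_alive_rate_hz
    while True:
        keep_alive_fn()
        # Event.wait returns True as soon as stop is set, ending the loop early.
        if stop.wait(period_s):
            break


beats: List[str] = []
stop = threading.Event()
worker = threading.Thread(
    target=run_keep_alive,
    args=(lambda: beats.append("heartbeat"), 50.0, stop),
    daemon=True,
)
worker.start()
time.sleep(0.1)  # let a few heartbeats fire
stop.set()
worker.join(timeout=1.0)
```

Waiting on the event rather than sleeping lets the loop stop promptly when the plugin is deactivated, instead of finishing a full period first.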

abstractmethod open() → None

Open the transport fully (ingress + egress). Called once on the HOST.

open_egress() → None

Open the transport for sending only.

Called in component processes that issue commands on this transport directly (i.e. route_via_host is False). The default implementation calls open, which suits transports that have no separate egress-only mode.
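The split between open and open_egress can be summarised as a small decision function. The sketch below is an assumption-laden illustration: open_for_process and RecordingTransport are hypothetical names, and reading route_via_host back as an instance attribute is assumed rather than documented.

```python
from typing import List


class RecordingTransport:
    """Hypothetical stand-in that records which open method was called."""

    def __init__(self, route_via_host: bool = False) -> None:
        self.route_via_host = route_via_host  # assumed attribute name
        self.calls: List[str] = []

    def open(self) -> None:
        self.calls.append("open")

    def open_egress(self) -> None:
        self.calls.append("open_egress")


def open_for_process(transport: RecordingTransport, *, is_host: bool) -> None:
    """Open a transport according to the role of the current process."""
    if is_host:
        # The plugin HOST opens the transport fully (ingress + egress).
        transport.open()
    elif not transport.route_via_host:
        # A component that sends directly needs only the send side.
        transport.open_egress()
    # Otherwise commands are forwarded to the HOST, so nothing is opened here.


on_host = RecordingTransport()
open_for_process(on_host, is_host=True)

direct_component = RecordingTransport(route_via_host=False)
open_for_process(direct_component, is_host=False)

routed_component = RecordingTransport(route_via_host=True)
open_for_process(routed_component, is_host=False)
```

In this sketch a routed transport is never opened in the component process, which matches the single-client ownership that route_via_host is meant to preserve.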

abstractmethod close() → None

Close the transport and release all resources.

is_open() → bool

Return whether the transport is currently open.

abstractmethod send(payload: Any) → bool

Send a payload to the robot. Returns True on success.

subscribe(on_msg: Callable[[Any], None]) → ros_sugar.robot.transports.SubscriptionHandle

Register a callback invoked with each raw inbound payload.

Only meaningful on the HOST. The raw payload type is transport-specific (bytes for UDP, the HTTP response body for HTTP, the SDK object for SDK callbacks); a Feedback decoder turns it into a ROS message.

property kind: str

Short transport-kind label used for introspection.
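To show how the pieces of the interface fit together, here is a minimal sketch of a concrete transport. The Transport and SubscriptionHandle classes below are stand-ins that copy the documented signatures so the example runs without ros_sugar installed; their bodies, the attribute names, and the LoopbackTransport subclass are assumptions, not the library's implementation.

```python
import abc
from typing import Any, Callable, List, Optional


class SubscriptionHandle:
    """Stand-in with the documented constructor and unsubscribe()."""

    def __init__(self, unsubscribe: Callable[[], None]) -> None:
        self._unsubscribe = unsubscribe

    def unsubscribe(self) -> None:
        self._unsubscribe()


class Transport(abc.ABC):
    """Stand-in base class; method bodies are illustrative assumptions."""

    def __init__(
        self,
        name: str,
        *,
        keep_alive_fn: Optional[Callable[[], None]] = None,
        keep_alive_rate_hz: Optional[float] = None,
        route_via_host: bool = False,
    ) -> None:
        self.name = name
        self.keep_alive_fn = keep_alive_fn
        self.keep_alive_rate_hz = keep_alive_rate_hz
        self.route_via_host = route_via_host
        self._is_open = False
        self._callbacks: List[Callable[[Any], None]] = []

    @abc.abstractmethod
    def open(self) -> None: ...

    def open_egress(self) -> None:
        self.open()  # documented default: no separate egress-only mode

    @abc.abstractmethod
    def close(self) -> None: ...

    def is_open(self) -> bool:
        return self._is_open

    @abc.abstractmethod
    def send(self, payload: Any) -> bool: ...

    def subscribe(self, on_msg: Callable[[Any], None]) -> SubscriptionHandle:
        self._callbacks.append(on_msg)

        def detach() -> None:
            if on_msg in self._callbacks:
                self._callbacks.remove(on_msg)

        return SubscriptionHandle(detach)

    @property
    def kind(self) -> str:
        return "base"


class LoopbackTransport(Transport):
    """Hypothetical transport that echoes every sent payload to subscribers."""

    def open(self) -> None:
        self._is_open = True

    def close(self) -> None:
        self._is_open = False

    def send(self, payload: Any) -> bool:
        if not self._is_open:
            return False  # report failure instead of raising
        for callback in list(self._callbacks):
            callback(payload)
        return True

    @property
    def kind(self) -> str:
        return "loopback"


transport = LoopbackTransport("loop")
seen: List[Any] = []
handle = transport.subscribe(seen.append)
sent_while_closed = transport.send(b"dropped")  # transport not open yet
transport.open()
sent_while_open = transport.send(b"hello")
handle.unsubscribe()
transport.send(b"unheard")  # no subscriber left to receive this
transport.close()
```

A real transport would replace the loopback delivery with socket, HTTP, or SDK calls, and its raw payloads would be passed to a Feedback decoder rather than appended to a list.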