Adding Processing Pipelines to I/O

This guide covers how to inject data transformations into the input and output paths of a component without modifying the component itself. Read Architecture Overview and Extending the Type System first.

Overview

Data flows through a component in two directions:

ROS message → Callback → post-processors → component._execution_step()
component._execution_step() → pre-processors → Publisher → ROS message

You can insert processing functions at two points:

  • Post-processors on callbacks: transform data after the callback deserializes it but before the component reads it.

  • Pre-processors on publishers: transform data after the component produces it but before it is converted to a ROS message and published.

This lets you add filtering, coordinate transforms, format conversion, or bridging logic from outside the component.

Callback Post-Processors

Adding at the Component Level

Use add_callback_postprocessor() to attach a function to a specific input topic. The function receives the callback output and must return the same type:

from ros_sugar.io import Topic
from ros_sugar.io.supported_types import Float64

raw = Topic(name="sensor", msg_type=Float64)

component = MyComponent(inputs=[raw], outputs=[...])

def clamp_value(value):
    """Clamp sensor readings to [0, 100]."""
    if value is None:
        return None
    return max(0.0, min(100.0, value))

component.add_callback_postprocessor(raw, clamp_value)

The post-processor is called every time get_output() is invoked on that callback. Multiple post-processors execute in the order they were added.

Adding Directly to a Callback

If you have access to the callback object (e.g. inside a component subclass), you can attach post-processors directly:

def custom_on_activate(self):
    cb = self.callbacks["sensor"]
    cb.add_post_processors([clamp_value, unit_convert])

Post-Processor Signature

def post_processor(output: T) -> T:
    """
    Receives the deserialized callback output.
    Must return the same type, or None to signal no data.
    """
    return transformed_output

Post-processors can also be socket objects for inter-process communication — the output is sent over the socket and the response is used as the transformed value.

Publisher Pre-Processors

Adding at the Component Level

Use add_publisher_preprocessor() to attach a function to a specific output topic. The function receives the data before it is converted to a ROS message:

from ros_sugar.io import Topic
from ros_sugar.io.supported_types import Float64

output = Topic(name="command", msg_type=Float64)

component = MyComponent(inputs=[...], outputs=[output])

def scale_output(output):
    """Scale command output to motor range."""
    if output is None:
        return None
    return output * 0.01

component.add_publisher_preprocessor(output, scale_output)

Pre-Processor Signature

Pre-processors are called with output as a keyword argument:

def pre_processor(output: T) -> T:
    """
    Receives the component's output data before ROS conversion.
    Must return the same type, or None to skip publishing.
    The parameter must be named ``output``.
    """
    return transformed_output

Like post-processors, pre-processors can also be socket objects. Multiple pre-processors execute in order.

Attaching Custom Callbacks

For cases where you need to run logic every time a message arrives on a topic (not just when the component reads the data), use attach_custom_callback():

def log_every_message(msg, topic, output):
    """Called on every message arrival, before execution_step."""
    print(f"Got message on {topic.name}: {output}")

component.attach_custom_callback(sensor_topic, log_every_message)

Custom Callback Signature

def custom_callback(
    msg,        # Raw ROS message
    topic,      # Topic object
    output,     # Processed callback output (after post-processors)
) -> None:
    pass

This is useful for logging, metrics collection, or triggering side effects independent of the component’s main loop.

Event-Triggered Callbacks

Callbacks support an on_callback_execute hook that fires every time a message is received and processed:

def on_new_detection(output):
    """Called every time a new detection arrives."""
    if output is not None:
        log_detection(output)

# Inside a component subclass
def custom_on_activate(self):
    self.callbacks["detections"].on_callback_execute(
        callback=on_new_detection,
        get_processed=True,  # Pass post-processed output (default)
    )

Set get_processed=False to receive the raw callback output before post-processors.

Practical Example: Coordinate Transform Pipeline

This example adds a TF-based coordinate transform to a component’s input without modifying the component:

import numpy as np
from ros_sugar.io import Topic
from ros_sugar.io.supported_types import PoseStamped
from ros_sugar.launch import Launcher

goal = Topic(name="goal_in_camera_frame", msg_type=PoseStamped)
command = Topic(name="command", msg_type=PoseStamped)

planner = MyPlanner(inputs=[goal], outputs=[command])

# Transform goals from camera frame to base frame
def camera_to_base(pose):
    if pose is None:
        return None
    # Apply a static transform (in practice, use TF)
    transformed = pose.copy()
    transformed["position"]["x"] += 0.15  # camera offset
    return transformed

planner.add_callback_postprocessor(goal, camera_to_base)

# Scale commands for a specific robot's actuator range
# Note: pre-processors receive `output` as a keyword argument
def scale_for_robot(output):
    if output is None:
        return None
    output["position"]["x"] *= 0.001  # mm to m
    return output

planner.add_publisher_preprocessor(command, scale_for_robot)

launcher = Launcher()
launcher.add_pkg(components=[planner])
launcher.bringup()

When to Use Processors vs. a New Component

Use case

Approach

Simple data transform (unit conversion, clamping, scaling)

Post/pre-processor

Logging, metrics, side effects on message arrival

attach_custom_callback() or on_callback_execute()

Complex multi-input logic, stateful processing, model inference

New component

Robot-specific topic adaptation

Robot plugin (see Creating a Robot Plugin)

Processors are best for stateless, single-topic transforms. If your transform needs to combine multiple inputs or maintain state across calls, create a dedicated component instead.