ros_sugar.core.action

Actions

Module Contents

Classes

OpaqueFunction

Action that executes a Python function.

OpaqueCoroutine

Action that adds a Python coroutine function to the launch run loop.

Action

Actions are used by Components and by the Launcher to execute specific methods.

LogInfo

Overrides the LogInfo Action for ros2 launch to change the hard-coded logger name

API

class ros_sugar.core.action.OpaqueFunction(*, function: Callable, args: Optional[Iterable[Any]] = None, kwargs: Optional[Dict[Text, Any]] = None, **left_over_kwargs)

Bases: launch.actions.OpaqueFunction

Action that executes a Python function.

The signature of the function should be:

.. code-block:: python

def function(
    context: LaunchContext,
    *args,
    **kwargs
) -> Optional[List[LaunchDescriptionEntity]]:
    ...
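
As an illustration of that signature, the sketch below defines a function that reads one positional and one keyword argument and returns None, meaning it produces no further launch entities. `StubContext` and `announce` are hypothetical names introduced here so the snippet runs without a ROS 2 installation; in a real launch file the context is a `launch.LaunchContext` supplied by the launch system.

```python
from typing import Any, List, Optional


class StubContext:
    """Hypothetical stand-in for launch.LaunchContext (assumption for this sketch)."""


def announce(context: Any, *args: Any, **kwargs: Any) -> Optional[List[Any]]:
    # args and kwargs are the values handed to OpaqueFunction on construction
    name = args[0] if args else "unknown"
    level = kwargs.get("level", "info")
    print(f"[{level}] starting {name}")
    # Returning None means no additional launch entities are produced
    return None


# In a launch file the function would be wrapped as:
#   OpaqueFunction(function=announce, args=["camera"], kwargs={"level": "debug"})
# Here it is called directly with the stub to show the call shape.
result = announce(StubContext(), "camera", level="debug")
```

The context always comes first, followed by the positional and keyword arguments supplied when the action was constructed.
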

class ros_sugar.core.action.OpaqueCoroutine(*, coroutine: Callable[..., Awaitable[None]], args: Optional[Iterable[Any]] = None, kwargs: Optional[Dict[Text, Any]] = None, ignore_context: bool = False, **left_over_kwargs)

Bases: launch.actions.OpaqueCoroutine

Action that adds a Python coroutine function to the launch run loop.

The signature of the coroutine function should be:

.. code-block:: python

async def coroutine_func(
    context: LaunchContext,
    *args,
    **kwargs
):
    ...

if ignore_context is False on construction (currently the default), or

.. code-block:: python

async def coroutine_func(
    *args,
    **kwargs
):
    ...

if ignore_context is True on construction.
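
The effect of ignore_context on the expected coroutine signature can be sketched with the standard library alone. `call_coroutine` below is a hypothetical helper written for this illustration, not the library's implementation, and the string `"ctx"` stands in for a real launch context.

```python
import asyncio
from typing import Any


async def with_context(context: Any, *args: Any, **kwargs: Any) -> None:
    # Shape expected when ignore_context is False
    kwargs["log"].append(("with_context", context, args))


async def without_context(*args: Any, **kwargs: Any) -> None:
    # Shape expected when ignore_context is True
    kwargs["log"].append(("without_context", args))


async def call_coroutine(coroutine, context, args=None, kwargs=None,
                         ignore_context=False) -> None:
    """Hypothetical helper: pass the context first, or drop it entirely."""
    args = list(args or [])
    kwargs = dict(kwargs or {})
    if ignore_context:
        await coroutine(*args, **kwargs)
    else:
        await coroutine(context, *args, **kwargs)


log = []
asyncio.run(call_coroutine(with_context, "ctx", args=[1], kwargs={"log": log}))
asyncio.run(call_coroutine(without_context, "ctx", args=[2], kwargs={"log": log},
                           ignore_context=True))
```

Passing a coroutine of the wrong shape for the chosen ignore_context value would fail at call time, because the context would land in (or be missing from) the first positional slot.
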

class ros_sugar.core.action.Action(method: Callable, args: Optional[Union[Tuple, List, Any]] = None, kwargs: Optional[Dict] = None)

Actions are used by Components and by the Launcher to execute specific methods.

Actions can either be:

  • Actions paired with Events: the Action is executed when an event is detected. It is executed by a Component if the action is a component method, or by the Launcher if it is a system-level action or an arbitrary method in the recipe.

  • Actions paired with Fallbacks: the Action is executed by a Component when a failure is detected.

Actions are defined with:

  • method: The callable to be executed by the action

  • args: Arguments to be passed to the method when executing the action

  • kwargs: Keyword arguments to be passed to the method when executing the action

Usage Example:

    from ros_sugar.component import BaseComponent
    from ros_sugar.config import BaseComponentConfig
    from ros_sugar.core.action import Action

    def function():
        print("I am executing action!")

    my_component = BaseComponent(node_name='test_component')
    new_config = BaseComponentConfig(loop_rate=50.0)
    action1 = Action(method=my_component.start)
    action2 = Action(method=my_component.reconfigure, args=(new_config, True))
    action3 = Action(method=function)
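
The idea behind the example above, storing a method with its arguments now and calling it later, can be sketched without ROS. `DeferredCall` and `set_rate` are hypothetical names for this illustration and do not reproduce the ros_sugar class. Wrapping a single non-sequence argument into a tuple is an assumption based on the `Union[Tuple, List, Any]` type of `args`.

```python
from typing import Any, Callable, Dict, Optional


class DeferredCall:
    """Hypothetical stand-in for the idea behind Action (not the ros_sugar class)."""

    def __init__(self, method: Callable, args: Optional[Any] = None,
                 kwargs: Optional[Dict[str, Any]] = None) -> None:
        self.method = method
        # Assumption: a single non-sequence argument is wrapped into a tuple
        if args is None:
            self.args = ()
        elif isinstance(args, (tuple, list)):
            self.args = tuple(args)
        else:
            self.args = (args,)
        self.kwargs = kwargs or {}

    def __call__(self) -> Any:
        # The stored method runs only when the owner decides to trigger it
        return self.method(*self.args, **self.kwargs)


def set_rate(rate: float, keep_alive: bool = False) -> str:
    return f"rate={rate}, keep_alive={keep_alive}"


on_event = DeferredCall(method=set_rate, args=(50.0,), kwargs={"keep_alive": True})
outcome = on_event()
```

Nothing runs at construction time; the call happens only when the object is invoked, which is what lets an event or a fallback decide when the method executes.
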

set_required_runtime_arguments(kwargs: Dict[str, Union[str, ros_sugar.io.supported_types.SupportedType, Type]])

Used to set the missing (dynamic) argument type in derived pre-built actions

get_required_topics() → List[ros_sugar.io.Topic]

Get all the required input topics to execute this action

Returns:

Required topics

Return type:

List[Topic]

replace_input_topic(old_topic: ros_sugar.io.Topic, new_topic: ros_sugar.io.Topic)

Replaces an input topic with a new topic if the old topic exists in the required input topics

Parameters:
  • old_topic (Topic) – Old topic

  • new_topic (Topic) – New topic
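
The replace-if-present behavior described above can be sketched as follows. Topics are modeled as plain strings purely for illustration (the real method works on `ros_sugar.io.Topic` objects), and `replace_if_present` is a hypothetical helper, not part of the library.

```python
from typing import List


def replace_if_present(topics: List[str], old_topic: str, new_topic: str) -> List[str]:
    """Return a copy of topics with old_topic swapped for new_topic, if present."""
    return [new_topic if topic == old_topic else topic for topic in topics]


required = ["/scan", "/odom"]
# The old topic is in the list, so it is swapped
updated = replace_if_present(required, "/odom", "/odom_filtered")
# The old topic is absent, so the list is returned unchanged
unchanged = replace_if_present(required, "/missing", "/other")
```
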

property executable

Get the action callable

Returns:

The callable executed by this action

Return type:

Callable

property parent_component

Getter of parent component class name if it is a component action, else None

Returns:

Name of the parent component class, or None if this is not a component action

Return type:

str | None

property action_name: str

Getter of the action name. Equals the exact executable name if it is not a component action, or the method name in the component if it is a component action.

Returns:

Action name

Return type:

str

property component_action: bool

Whether the action is a component action.

Return type:

bool

property dictionary: Dict

Property to get/set the action using a dictionary

Returns:

Action description dictionary

Return type:

Dict

classmethod deserialize_action(serialized_action_dict: Dict, deserialized_method: Callable) → ros_sugar.core.action.Action

Reconstruct Action from serialized action data

Parameters:
  • serialized_action_dict (Dict) – Serialized action data

  • deserialized_method (Callable) – Deserialized action method

Returns:

Reconstructed Action Object

Return type:

Action

property json: str

Property to get/set the action using a json

Returns:

Action description dictionary as json

Return type:

str
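
A callable cannot be stored in JSON directly, which is one reason a deserialization step would take the method as a separate argument. The round trip below is a minimal sketch of that idea using only the standard library. The key names (`action_name`, `args`, `kwargs`), the `REGISTRY` lookup and the `to_json`/`from_json` helpers are all assumptions for illustration and do not reflect the actual ros_sugar serialization format.

```python
import json
from typing import Any, Callable, Dict, Tuple


def greet(name: str, punctuation: str = "!") -> str:
    return f"hello {name}{punctuation}"


# Hypothetical registry used to look the callable back up by name
REGISTRY: Dict[str, Callable] = {"greet": greet}


def to_json(method: Callable, args: Tuple, kwargs: Dict[str, Any]) -> str:
    # Only the method's name is stored; the key names are illustrative assumptions
    return json.dumps(
        {"action_name": method.__name__, "args": list(args), "kwargs": kwargs}
    )


def from_json(serialized: str) -> Tuple[Callable, Tuple, Dict[str, Any]]:
    data = json.loads(serialized)
    # The callable is resolved separately from the serialized data
    method = REGISTRY[data["action_name"]]
    return method, tuple(data["args"]), data["kwargs"]


serialized = to_json(greet, ("robot",), {"punctuation": "?"})
method, args, kwargs = from_json(serialized)
restored_result = method(*args, **kwargs)
```
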

launch_action(monitor_node=None) → Union[ros_sugar.core.action.OpaqueCoroutine, ros_sugar.core.action.OpaqueFunction]

Get the ROS launch action

Returns:

The ROS launch action

Return type:

OpaqueCoroutine | OpaqueFunction

class ros_sugar.core.action.LogInfo(*, msg: str, logger_name: Optional[str] = None, **kwargs)

Bases: launch.actions.LogInfo

Overrides the LogInfo Action for ros2 launch to change the hard-coded logger name

Parameters:

LogInfoROSAction (LogInfoROSAction) – Action that logs a message when executed

execute(context: launch.LaunchContext) → None

Execute the action.