ros_sugar.io.topic

ROS Topic Configuration

Module Contents

Classes

Topic

Class for ROS topic configuration (name, type and QoS)

AllowedTopics

Configure allowed types to restrict a component Topic

Functions

get_all_msg_types

Gets all message types from supported data_types

get_msg_type

Gets a message type from supported data_types given a string name

API

ros_sugar.io.topic.get_all_msg_types(msg_types_module: types.ModuleType = supported_types) -> List[Type[ros_sugar.io.supported_types.SupportedType]]

Gets all message types from supported data_types

Returns:

Supported data types

Return type:

list[type]
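
As a rough illustration of how types might be collected from a module, the sketch below scans a module for subclasses of a base class. It does not import ros_sugar: `SupportedType`, `demo_types`, `String`, `Image` and `collect_msg_types` are stand-ins invented for this example, and skipping the base class itself is an assumption rather than documented behaviour.

```python
import inspect
import types


class SupportedType:
    """Stand-in for ros_sugar.io.supported_types.SupportedType."""


class String(SupportedType):
    pass


class Image(SupportedType):
    pass


# Hypothetical module exposing a few supported types (names are illustrative).
demo_types = types.ModuleType("demo_types")
demo_types.SupportedType = SupportedType
demo_types.String = String
demo_types.Image = Image
demo_types.VERSION = "0.1"  # non-class attribute, ignored by the scan


def collect_msg_types(module: types.ModuleType) -> list:
    """Return every SupportedType subclass exposed by `module`, excluding the base."""
    return [
        obj
        for _, obj in inspect.getmembers(module, inspect.isclass)
        if issubclass(obj, SupportedType) and obj is not SupportedType
    ]


found = collect_msg_types(demo_types)
print([cls.__name__ for cls in found])
```

`inspect.getmembers` returns members sorted by name, so the result order here is alphabetical.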

ros_sugar.io.topic.get_msg_type(type_name: Union[Type[ros_sugar.io.supported_types.SupportedType], str], msg_types_module: Optional[types.ModuleType] = supported_types) -> Union[Type[ros_sugar.io.supported_types.SupportedType], str]

Gets a message type from supported data_types given a string name

Parameters:

type_name (type | str) – Message type, or its name as a string

Returns:

Supported data type or None if not found

Return type:

type
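
The lookup described above can be pictured with a small stand-in that resolves a name against a registry. This sketch does not call ros_sugar: `REGISTRY`, `resolve_msg_type` and the two example classes are hypothetical, and passing an already-resolved class straight through is an assumption based on the signature accepting either a type or a string.

```python
from typing import Optional, Type, Union


class SupportedType:
    """Stand-in for ros_sugar.io.supported_types.SupportedType."""


class String(SupportedType):
    pass


class Float32(SupportedType):
    pass


# Hypothetical registry keyed by class name.
REGISTRY = {cls.__name__: cls for cls in (String, Float32)}


def resolve_msg_type(
    type_name: Union[Type[SupportedType], str],
) -> Optional[Type[SupportedType]]:
    """Resolve a name to a class; classes pass through, unknown names give None."""
    if isinstance(type_name, type):
        return type_name  # assumed behaviour: already a type, nothing to look up
    return REGISTRY.get(type_name)


print(resolve_msg_type("String"))
print(resolve_msg_type("NotAType"))
```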

class ros_sugar.io.topic.Topic

Bases: ros_sugar.config.BaseAttrs, typing.Generic[ros_sugar.utils.MsgT]

Class for ROS topic configuration (name, type and QoS)

| Name | Type, Default | Description |
|---|---|---|
| name | str | Topic name |
| msg_type | type or str | Topic message type; can be provided as a type or as the type name string |
| qos_profile | QoSConfig, QoSConfig() | QoS (Quality of Service) configuration |
| data_timeout | float, 1.0 | Used in event management. Time (in seconds) to hold the topic data for processing before it is considered "stale" |
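
To make the attributes concrete, here is a minimal sketch that mirrors them in a plain dataclass and shows one plausible reading of `data_timeout`. It is not the library's `Topic` class: `TopicSketch`, `QoSConfigSketch`, its `queue_size` field, `is_stale` and the `"LaserScan"` type name are all assumptions made for illustration.

```python
import time
from dataclasses import dataclass, field
from typing import Optional, Union


@dataclass
class QoSConfigSketch:
    """Hypothetical placeholder for QoSConfig; its real fields are not listed here."""

    queue_size: int = 10  # illustrative field only


@dataclass
class TopicSketch:
    """Stand-in mirroring the four documented Topic attributes."""

    name: str
    msg_type: Union[type, str]
    qos_profile: QoSConfigSketch = field(default_factory=QoSConfigSketch)
    data_timeout: float = 1.0  # seconds


def is_stale(topic: TopicSketch, received_at: float, now: Optional[float] = None) -> bool:
    """True when data received at `received_at` is older than topic.data_timeout."""
    now = time.monotonic() if now is None else now
    return (now - received_at) > topic.data_timeout


scan = TopicSketch(name="/scan", msg_type="LaserScan", data_timeout=0.5)
print(is_stale(scan, received_at=10.0, now=10.2))  # inside the 0.5 s window
print(is_stale(scan, received_at=10.0, now=11.0))  # older than 0.5 s
```

Timestamps are passed in explicitly so the staleness check stays deterministic; how the library itself tracks message age is not covered by this page.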

property msg: ros_sugar.condition.MsgConditionBuilder

Get the ROS message object path builder associated with this Topic. Used for parsing topic message attributes for Actions and Event parsers.

Returns:

ROS message type

Return type:

MsgT

asdict(filter: Optional[Callable] = None) -> Dict
to_dict() -> Dict
from_dict(dict_obj: Dict) -> None
from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) -> bool
to_json() -> Union[str, bytes, bytearray]
from_json(json_obj: Union[str, bytes, bytearray]) -> None
has_attribute(attr_name: str) -> bool
get_attribute_type(attr_name: str) -> Optional[type]
update_value(attr_name: str, attr_value: Any) -> bool
classmethod get_fields_info(class_object) -> Dict[str, Dict[str, Any]]

class ros_sugar.io.topic.AllowedTopics

Bases: ros_sugar.config.BaseAttrs

Configure allowed types to restrict a component Topic

asdict(filter: Optional[Callable] = None) -> Dict
to_dict() -> Dict
from_dict(dict_obj: Dict) -> None
from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) -> bool
to_json() -> Union[str, bytes, bytearray]
from_json(json_obj: Union[str, bytes, bytearray]) -> None
has_attribute(attr_name: str) -> bool
get_attribute_type(attr_name: str) -> Optional[type]
update_value(attr_name: str, attr_value: Any) -> bool
classmethod get_fields_info(class_object) -> Dict[str, Dict[str, Any]]
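
The serialization helpers listed for both classes suggest a dictionary and JSON round trip. The sketch below imitates a few of them on a plain dataclass to show that pattern. `ConfigSketch` is hypothetical and does not derive from `BaseAttrs`; the in-place update semantics and the meaning of the boolean returned by `update_value` are assumptions inferred from the signatures only.

```python
import json
from dataclasses import asdict, dataclass, fields
from typing import Any, Dict


@dataclass
class ConfigSketch:
    """Hypothetical config class imitating a few of the listed helper methods."""

    name: str = "/cmd_vel"
    data_timeout: float = 1.0

    def to_dict(self) -> Dict[str, Any]:
        return asdict(self)

    def from_dict(self, dict_obj: Dict[str, Any]) -> None:
        # Update known attributes in place; unknown keys are ignored in this sketch.
        for key, value in dict_obj.items():
            self.update_value(key, value)

    def to_json(self) -> str:
        return json.dumps(self.to_dict())

    def from_json(self, json_obj: str) -> None:
        self.from_dict(json.loads(json_obj))

    def has_attribute(self, attr_name: str) -> bool:
        return attr_name in {f.name for f in fields(self)}

    def update_value(self, attr_name: str, attr_value: Any) -> bool:
        # Assumed meaning of the return value: whether the update was applied.
        if not self.has_attribute(attr_name):
            return False
        setattr(self, attr_name, attr_value)
        return True


original = ConfigSketch(name="/odom", data_timeout=2.0)
restored = ConfigSketch()
restored.from_json(original.to_json())
print(restored == original)
print(restored.update_value("missing", 1))
```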