ros_sugar.config.base_config

Config Classes for Components and Topics

Module Contents

Classes

QoSConfig

Class for quality of service (QoS) configuration in ROS2

BaseConfig

Node General Parameters

ComponentRunType

Component run type:

ExternalProcessorType

External processor type:

BaseComponentConfig

Component configuration parameters

API

class ros_sugar.config.base_config.QoSConfig

Bases: ros_sugar.config.base_attrs.BaseAttrs

Class for quality of service (QoS) configuration in ROS2

| Name | Type, Default | Description |
| --- | --- | --- |
| history | int, [qos](https://docs.ros2.org/foxy/api/rclpy/api/qos.html).HistoryPolicy.KEEP_LAST | Sample store type: KEEP_ALL or KEEP_LAST (keeps up to N samples, where N is set by queue_size) |
| queue_size | int, 10 | Queue depth; used only if the history policy is set to KEEP_LAST |
| reliability | int, qos.ReliabilityPolicy.RELIABLE | Level of reliability in delivering samples |
| durability | int, qos.DurabilityPolicy.VOLATILE | Determines whether the publisher persists samples for late-joining subscriptions (Transient Local) or not (Volatile) |
to_ros() → rclpy.qos.QoSProfile

Converts the config class to ROS2 QoS Profile

Returns:

ROS2 QoS Profile

Return type:

qos.QoSProfile
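
As a rough illustration of how the history and queue_size fields interact, the sketch below uses plain-Python stand-ins so that it runs without ROS 2 installed. The names `History`, `QoSSketch`, and `effective_depth` are hypothetical and are not part of ros_sugar or rclpy; only the field names and defaults come from the description above.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Optional


class History(Enum):
    """Stand-in for rclpy's HistoryPolicy (hypothetical, for illustration only)."""
    KEEP_LAST = "keep_last"
    KEEP_ALL = "keep_all"


@dataclass
class QoSSketch:
    """Hypothetical mirror of the documented history and queue_size fields."""
    history: History = History.KEEP_LAST
    queue_size: int = 10

    def effective_depth(self) -> Optional[int]:
        # queue_size only applies under KEEP_LAST; under KEEP_ALL it is ignored
        if self.history is History.KEEP_LAST:
            return self.queue_size
        return None


default_qos = QoSSketch()
keep_all_qos = QoSSketch(history=History.KEEP_ALL, queue_size=5)
```

With the real class, the equivalent step would be to build a QoSConfig and call to_ros() to obtain the rclpy profile; whether the fields are accepted as keyword arguments is an assumption here.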

asdict(filter: Optional[Callable] = None) → Dict
to_dict() → Dict
from_dict(dict_obj: Dict) → None
from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) → bool
to_json() → Union[str, bytes, bytearray]
from_json(json_obj: Union[str, bytes, bytearray]) → None
has_attribute(attr_name: str) → bool
get_attribute_type(attr_name: str) → Optional[type]
update_value(attr_name: str, attr_value: Any) → bool
classmethod get_fields_info(class_object) → Dict[str, Dict[str, Any]]
class ros_sugar.config.base_config.BaseConfig

Bases: ros_sugar.config.base_attrs.BaseAttrs

Node General Parameters

Parameters:
  • loop_rate (float) – Rate (Hz) at which the node executes

  • visualization (bool) – Whether to publish additional topics for visualization
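
Because loop_rate is given in Hz, the interval between executions is its reciprocal. The helper below is a minimal sketch of that conversion; `period_from_rate` is a hypothetical name and not a ros_sugar function.

```python
def period_from_rate(loop_rate_hz: float) -> float:
    """Convert a loop rate in Hz to the interval between executions in seconds."""
    if loop_rate_hz <= 0:
        raise ValueError("loop_rate must be positive")
    return 1.0 / loop_rate_hz


# A 100 Hz loop runs once every 10 ms
period = period_from_rate(100.0)
```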

asdict(filter: Optional[Callable] = None) → Dict
to_dict() → Dict
from_dict(dict_obj: Dict) → None
from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) → bool
to_json() → Union[str, bytes, bytearray]
from_json(json_obj: Union[str, bytes, bytearray]) → None
has_attribute(attr_name: str) → bool
get_attribute_type(attr_name: str) → Optional[type]
update_value(attr_name: str, attr_value: Any) → bool
classmethod get_fields_info(class_object) → Dict[str, Dict[str, Any]]
class ros_sugar.config.base_config.ComponentRunType(*args, **kwds)

Bases: enum.Enum

Component run type:

  • Timed: Executes main in a timed loop

  • Event: Executes based on a trigger topic/event

  • Server: Executes based on a ROS service request from a client

  • ActionServer: Executes based on a ROS action server request from a client

classmethod to_str(enum_value) → str

Return string value corresponding to enum value if exists

Parameters:

enum_value (ComponentRunType | str) – Enum member, or its string value, to convert

Raises:

ValueError – If the enum value is not from this class

Returns:

String value

Return type:

str
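
The documented contract of to_str (accept an enum member or a string, return the string value, raise ValueError otherwise) can be sketched with a stand-in enum. `RunTypeSketch` and its string values are assumptions made for this example; the actual values and implementation in ros_sugar may differ.

```python
from enum import Enum


class RunTypeSketch(Enum):
    """Stand-in enum; the string values are assumed, not taken from ros_sugar."""
    TIMED = "Timed"
    EVENT = "Event"
    SERVER = "Server"
    ACTION_SERVER = "ActionServer"

    @classmethod
    def to_str(cls, enum_value):
        # A member of this enum maps directly to its string value
        if isinstance(enum_value, cls):
            return enum_value.value
        # A string is accepted only if it matches one of the member values
        if isinstance(enum_value, str) and enum_value in {m.value for m in cls}:
            return enum_value
        raise ValueError(f"{enum_value!r} is not a valid {cls.__name__}")
```

Accepting both forms lets callers pass values read from a configuration file (strings) and values set in code (enum members) through the same path.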

name()
value()
class ros_sugar.config.base_config.ExternalProcessorType(*args, **kwds)

Bases: enum.Enum

External processor type:

  • MSG_PRE_PROCESSOR: Executes before publishing a ros msg

  • MSG_POST_PROCESSOR: Executes after receiving a ros msg in a callback

classmethod to_str(enum_value) → str

Return string value corresponding to enum value if exists

Parameters:

enum_value (ExternalProcessorType | str) – Enum member, or its string value, to convert

Raises:

ValueError – If the enum value is not from this class

Returns:

String value

Return type:

str

name()
value()
class ros_sugar.config.base_config.BaseComponentConfig

Bases: ros_sugar.config.base_config.BaseConfig

Component configuration parameters

Parameters:
  • fallback_rate (float) – Rate (Hz) at which the component checks for fallbacks and executes actions if a failure is detected.

  • log_level (Union[str, LoggingSeverity]) – Logging level for the component’s internal logs. Can be a string or LoggingSeverity enum.

  • rclpy_log_level (Union[str, LoggingSeverity]) – Logging level for rclpy (ROS client library) logs. Can be a string or LoggingSeverity enum.

  • wait_for_restart_time (float) – Time (in seconds) the component waits for a node to come back online after restart. Used to avoid infinite restart loops. Recommended to use a high value.
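
Since log_level and rclpy_log_level accept either a string or a LoggingSeverity member, some normalization step is implied. The sketch below shows one way such a step could look, using a stand-in enum so that it runs without rclpy. `SeveritySketch` and `normalize_level` are hypothetical names, and the case-insensitive name lookup is an assumption rather than ros_sugar's documented behavior.

```python
from enum import IntEnum
from typing import Union


class SeveritySketch(IntEnum):
    """Stand-in for rclpy's LoggingSeverity so the example runs without ROS 2."""
    DEBUG = 10
    INFO = 20
    WARN = 30
    ERROR = 40
    FATAL = 50


def normalize_level(level: Union[str, SeveritySketch]) -> SeveritySketch:
    """Return an enum member for either accepted input form."""
    if isinstance(level, SeveritySketch):
        return level
    try:
        # Look up by member name, ignoring case ("warn" -> WARN)
        return SeveritySketch[level.upper()]
    except KeyError:
        raise ValueError(f"Unknown log level: {level!r}") from None
```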

asdict(filter: Optional[Callable] = None) → Dict
to_dict() → Dict
from_dict(dict_obj: Dict) → None
from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) → bool
to_json() → Union[str, bytes, bytearray]
from_json(json_obj: Union[str, bytes, bytearray]) → None
has_attribute(attr_name: str) → bool
get_attribute_type(attr_name: str) → Optional[type]
update_value(attr_name: str, attr_value: Any) → bool
classmethod get_fields_info(class_object) → Dict[str, Dict[str, Any]]