ros_sugar.base_clients

ROS Service/Action Client Wrapper

Module Contents

Classes

ServiceClientConfig

Basic configuration for any ROS service client

ActionClientConfig

Basic configuration for any ROS action client

ServiceClientHandler

General purpose service client class

ActionClientHandler

General purpose action client class

API

class ros_sugar.base_clients.ServiceClientConfig

Bases: ros_sugar.config.BaseAttrs

Basic configuration for any ROS service client

asdict(filter: Optional[Callable] = None) Dict
to_dict() Dict
from_dict(dict_obj: Dict) None
from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) bool
to_json() Union[str, bytes, bytearray]
from_json(json_obj: Union[str, bytes, bytearray]) None
has_attribute(attr_name: str) bool
get_attribute_type(attr_name: str) Optional[type]
update_value(attr_name: str, attr_value: Any) bool
classmethod get_fields_info(class_object) Dict[str, Dict[str, Any]]
class ros_sugar.base_clients.ActionClientConfig

Bases: ros_sugar.config.BaseAttrs

Basic configuration for any ROS action client

asdict(filter: Optional[Callable] = None) Dict
to_dict() Dict
from_dict(dict_obj: Dict) None
from_file(file_path: str, nested_root_name: Union[str, None] = None, get_common: bool = False) bool
to_json() Union[str, bytes, bytearray]
from_json(json_obj: Union[str, bytes, bytearray]) None
has_attribute(attr_name: str) bool
get_attribute_type(attr_name: str) Optional[type]
update_value(attr_name: str, attr_value: Any) bool
classmethod get_fields_info(class_object) Dict[str, Dict[str, Any]]
class ros_sugar.base_clients.ServiceClientHandler(client_node: rclpy.node.Node, config: Optional[ros_sugar.base_clients.ServiceClientConfig] = None, srv_name: Optional[str] = None, srv_type: Optional[type] = None)

General purpose service client class

send_request_from_dict(request_fields: Dict[str, Union[str, Dict]], executor: Optional[rclpy.executors.Executor] = None)

Send a service request using a serialized Dict request data

Parameters:
  • request_fields (Dict[str, str]) – Request data [key, value]

  • executor (Optional[Executor], optional) – Optional ros executor, defaults to None

Returns:

Service result

Return type:

Any

send_request(req_msg, executor: Optional[rclpy.executors.Executor] = None)

Sends a request to the service returns the response In case of failure, the method attempts sending the request again multiple time according to the given config

Parameters:

req_msg (Any) – Service request msg

Returns:

Service result

Return type:

Any

class ros_sugar.base_clients.ActionClientHandler(client_node: rclpy.node.Node, config: Optional[ros_sugar.base_clients.ActionClientConfig] = None, action_name: Optional[str] = None, action_type: Optional[type] = None)

General purpose action client class

reset()

Reset the client handler

send_request_from_dict(request_fields: Dict[str, Union[str, Dict]], wait_until_first_feedback: bool = True)

Send an action request using a serialized Dict request data

Parameters:

request_fields (Dict[str, Union[str, Dict]]) – Request data [key, value]

send_request(request_msg: Any, wait_until_first_feedback: bool = True) bool

Sends a request to an action server

Parameters:
  • request_msg (Action_Type.Goal) – Action request message

  • wait_until_first_feedback (bool, optional) – Wait until the server returns its first feedback, defaults to True

Returns:

If action server is available

Return type:

bool

action_response_callback(future)

Callback when getting the action server responses

Parameters:

future (Any) – Action result future

action_result_callback(future)

Treats the path tracker action result

Parameters:

future (Any) – Action result future

action_feedback_callback(feedback_msg: Any)

Handles feedback messages received during action execution.

Parameters:

feedback_msg (Any) – Action feedback message

got_new_feedback() bool

Checks if the client got a new feedback from the server within a specified time limit

Returns:

Feedback updated on time

Return type:

bool

cancel_request() Tuple[bool, str]

Cancel an active action goal and return result

Returns:

If cancellation is successful

Return type:

Tuple[bool, str]

get_ui_elements() Dict

Get updated client elements for the UI

Returns:

description

Return type:

Dict