ros_sugar.core.monitor

Monitor

Module Contents

Classes

Monitor

Monitor is a ROS2 Node (not Lifecycle) responsible for monitoring the status of the stack (the rest of the running nodes) and managing requests/responses from the Orchestrator.

API

class ros_sugar.core.monitor.Monitor(components_names: List[str], events_actions: Optional[Dict[ros_sugar.core.event.Event, List[ros_sugar.core.action.Action]]] = None, events_to_emit: Optional[List[ros_sugar.core.event.Event]] = None, config: Optional[ros_sugar.config.BaseConfig] = None, services_components: Optional[List[ros_sugar.core.component.BaseComponent]] = None, action_servers_components: Optional[List[ros_sugar.core.component.BaseComponent]] = None, activate_on_start: Optional[List[str]] = None, activation_timeout: Optional[float] = None, activation_attempt_time: float = 1.0, component_name: Optional[str] = None, **_)

Bases: rclpy.node.Node

Monitor is a ROS2 Node (not Lifecycle) responsible for monitoring the status of the stack (the rest of the running nodes) and managing requests/responses from the Orchestrator.

Note

When launching the stack using the Launcher, the user is not required to configure the Monitor. The Launcher will configure and launch its own Monitor internally.

Main Functionalities:

  • Creates Subscribers to registered Events. The Monitor is configured to declare an InternalEvent back to the Launcher so the corresponding Action can be executed (see source implementation in launch_actions.py)

  • Creates Subscribers to the health status topics of all registered Components

  • Creates clients for the main services and main action servers of all components

  • Creates service clients for the components' reconfiguration services to handle actions sent from the Launcher

add_internal_event_action_pair(event_id: str, action: ros_sugar.core.action.Action) -> None

Adds an internal event-action pair to the monitor configuration.

Parameters:
  • event_id (str) – ID of the event to be monitored

  • action (Action) – Action to be executed on event trigger
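
The pairing can be pictured as a lookup from event ID to the action to run. The following is a minimal pure-Python sketch of that idea, not the Monitor's implementation: `EventActionRegistry`, `add_pair` and `trigger` are hypothetical names, a plain callable stands in for `ros_sugar.core.action.Action`, and allowing several actions per event ID is an assumption.

```python
from typing import Callable, Dict, List


class EventActionRegistry:
    """Hypothetical stand-in: maps event IDs to the actions to run on trigger."""

    def __init__(self) -> None:
        self._pairs: Dict[str, List[Callable[[], object]]] = {}

    def add_pair(self, event_id: str, action: Callable[[], object]) -> None:
        # Assumption: several actions may be registered under one event ID.
        self._pairs.setdefault(event_id, []).append(action)

    def trigger(self, event_id: str) -> List[object]:
        # Run every action registered for the event, in registration order.
        return [action() for action in self._pairs.get(event_id, [])]


registry = EventActionRegistry()
registry.add_pair("battery_low", lambda: "dock")
registry.add_pair("battery_low", lambda: "notify")
results = registry.trigger("battery_low")
```

Triggering an event ID that has no registered pair returns an empty list in this sketch rather than raising.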

rclpy_init_node(*args, **kwargs)

Initializes the node with rclpy and activates the default services

activate()

Activates all subscribers, publishers, etc.

watch_and_activate_component(component_name: str) -> None

Poll for a respawned component and drive it back to the active state via direct lifecycle service calls once it is reachable.

Called by the Launcher after a process-level respawn. Bypasses the LifecycleTransition launch action to avoid duplicate ChangeState dispatch from stale LifecycleEventManager instances left behind by the crashed process.
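
The poll-then-activate behaviour described above can be sketched in plain Python. This is an illustration under stated assumptions, not the Monitor's code: `watch_and_activate`, `is_reachable` and `activate` are hypothetical names, and reusing the constructor's `activation_attempt_time` and `activation_timeout` semantics for the loop is an assumption.

```python
import time
from typing import Callable, Optional


def watch_and_activate(
    is_reachable: Callable[[], bool],
    activate: Callable[[], bool],
    attempt_time: float = 1.0,
    timeout: Optional[float] = None,
) -> bool:
    """Poll until the component is reachable, then try to activate it.

    Returns True once activation succeeds, False if the timeout elapses first.
    A timeout of None means the loop keeps polling indefinitely.
    """
    start = time.monotonic()
    while timeout is None or time.monotonic() - start < timeout:
        # activate() is only attempted once the component answers.
        if is_reachable() and activate():
            return True
        time.sleep(attempt_time)  # wait before the next attempt
    return False


# Hypothetical component that becomes reachable on the third poll.
polls = iter([False, False, True])
activated = watch_and_activate(
    lambda: next(polls), lambda: True, attempt_time=0.01, timeout=1.0
)
```

In the real node, the two callables would correspond to checking that the component's lifecycle services are available and to issuing the lifecycle transition requests.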

configure_component(component: ros_sugar.core.component.BaseComponent, new_config: Union[object, str], keep_alive: bool) -> Any

Configures a given component from a config instance or a config file. Creates and sends the request to the component's service.

Parameters:
  • component (BaseComponent) – Component to configure

  • new_config (object | str) – Config instance or path to config file

  • keep_alive (bool) – To keep the component running while configuring

  • executor (ROS Executor, optional) – Used to spin the monitor node until the service response is received, defaults to None

update_parameter(component: Union[ros_sugar.core.component.BaseComponent, str], param_name: str, new_value: Any, keep_alive: bool = True) -> Any

Sends a ChangeParameter service request to the given component

Parameters:
  • component (Union[BaseComponent, str]) – Component, or component name, to send the request to

  • param_name (str) – Name of the parameter to change

  • new_value (Any) – New value of the parameter

  • keep_alive (bool, optional) – To keep the component running while the parameter is changed, defaults to True

update_parameters(component: Union[ros_sugar.core.component.BaseComponent, str], params_names: List[str], new_values: List, keep_alive: bool = True, **_) -> Any

Sends a ChangeParameters service request to the given component

Parameters:
  • component (Union[BaseComponent, str]) – Component, or component name, to send the request to

  • params_names (List[str]) – Names of the parameters to change

  • new_values (List) – New values of the parameters

  • keep_alive (bool, optional) – To keep the component running while the parameters are changed, defaults to True
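
Because the names and values are passed as two separate lists, it can help to check that they line up before sending a request. The helper below is a hypothetical pre-check, not part of ros_sugar; it assumes that values are matched to names by position.

```python
from typing import Any, Dict, List


def pair_parameters(params_names: List[str], new_values: List[Any]) -> Dict[str, Any]:
    """Hypothetical helper: verify both lists line up and pair them by position."""
    if len(params_names) != len(new_values):
        raise ValueError(
            f"Got {len(params_names)} parameter names but {len(new_values)} values"
        )
    # Assumption: the i-th value belongs to the i-th parameter name.
    return dict(zip(params_names, new_values))


# Hypothetical parameter names, used for illustration only.
updates = pair_parameters(["loop_rate", "log_level"], [50.0, "debug"])
```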

send_srv_request(srv_request_msg: Any = None, srv_name: Optional[str] = None, srv_type: Optional[type] = None, **_) -> None

Action to send a ROS2 service request during runtime

Parameters:
  • srv_name (str) – Service name

  • srv_type (type) – Service type (ROS2 service)

  • srv_request_msg (Any) – Service request message

send_action_goal(action_request_msg: Any = None, action_name: Optional[str] = None, action_type: Optional[type] = None, **_) -> None

Action to send a ROS2 action goal during runtime

Parameters:
  • action_name (str) – ROS2 action name

  • action_type (type) – ROS2 action type

  • action_request_msg (Any) – ROS2 action goal message

send_component_action_goal(component_name: str, action_request_msg: Any = None, **_) -> Tuple[bool, str]

Action to send a ROS2 action goal to a component during runtime

Parameters:
  • component_name (str) – Name of the component to send the goal to

  • action_request_msg (Any) – ROS2 action goal message

publish_message(topic: ros_sugar.io.topic.Topic, msg: Any, publish_rate: Optional[float] = None, publish_period: Optional[float] = None, **_) -> None

Action to publish a message to a given topic

Parameters:
  • topic (Topic) – Published topic

  • msg (Any) – Published message

  • publish_rate (Optional[float], optional) – Publishing rate, if None the message is published once, defaults to None

  • publish_period (Optional[float], optional) – Publishing period, if None and a rate is given the message is published forever, defaults to None
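
The interplay of the two optional arguments can be summarised with a small sketch. `planned_publish_count` is a hypothetical helper, not a ros_sugar function; it assumes the rate is in Hz, the period is a duration in seconds, and the number of messages is roughly rate times period.

```python
from typing import Optional


def planned_publish_count(
    publish_rate: Optional[float] = None,
    publish_period: Optional[float] = None,
) -> Optional[int]:
    """Return how many messages would be sent, or None for 'publish forever'."""
    if publish_rate is None:
        return 1  # no rate given: the message is published once
    if publish_period is None:
        return None  # rate without period: publishing never stops
    # Assumption: rate in Hz over a period in seconds, at least one message.
    return max(1, int(publish_rate * publish_period))


once = planned_publish_count()
forever = planned_publish_count(publish_rate=10.0)
bounded = planned_publish_count(publish_rate=10.0, publish_period=2.0)
```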