ros_sugar.core.event

Event

Module Contents

Classes

InternalEvent

Class to transform a Kompass event into a ROS launch event using the event key name

OnInternalEvent

ROS EventHandler for InternalEvent.

EventBlackboardEntry

A container for timestamped messages stored in the Event Blackboard.

Event

An Event is defined by a change in a ROS2 message value on a specific topic. Events are created to alert a robot software stack to any dynamic change at runtime.

API

class ros_sugar.core.event.InternalEvent(event_name: str, topics_value: Dict)

Bases: launch.event.Event

Class to transform a Kompass event into a ROS launch event using the event key name

property event_name

Getter of internal event name

Returns:

Event name

Return type:

str

property topics_value

Getter of the internal event topics value

Returns:

Topics value dictionary

Return type:

Dict

class ros_sugar.core.event.OnInternalEvent(*, internal_event_name: str, entities: ros_sugar.utils.SomeEntitiesType, handle_once: bool = False)

Bases: launch.event_handler.EventHandler

ROS EventHandler for InternalEvent.

handle(event: launch.event.Event, context) → Optional[ros_sugar.utils.SomeEntitiesType]

Overriding handle to inject event data into the entities.

class ros_sugar.core.event.EventBlackboardEntry

A container for timestamped messages stored in the Event Blackboard.

This class wraps raw ROS messages with metadata to enable:

  1. Time-To-Live (TTL) Checks: Using timestamp to invalidate old data.

  2. Idempotency: Using a unique id to prevent the same message instance from triggering the same event multiple times.

Parameters:
  • msg (Any) – The actual data payload (e.g., a ROS message).

  • timestamp (float) – The Unix timestamp at which the message was received.

  • id (str) – A unique UUID4 string identifying this specific message reception instance. Defaults to a new UUID if not provided.

validate(timeout: Optional[float] = None, stale_id: Optional[str] = None)

Validate the data freshness

Parameters:
  • timeout (Optional[float], optional) – Maximum lifetime, defaults to None

  • stale_id (Optional[str], optional) – ID of the latest stale message, defaults to None

classmethod get(entries_dict: Dict[str, ros_sugar.core.event.EventBlackboardEntry], topic_name: str, timeout: Optional[float], stale_id: Optional[str] = None) → Optional[Any]

Retrieves data from entries_dict:

  • If data is missing: returns None

  • If data is present but expired: deletes it and returns None (lazy expiration)

  • If data is valid: returns the entry
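
The TTL check, the idempotency check and the lazy expiration described above can be sketched as follows. This is a minimal stand-alone illustration under stated assumptions, not the ros_sugar implementation: `Entry`, `is_valid` and `get_fresh` are hypothetical names, and the choice to delete only TTL-expired entries (while keeping entries rejected by `stale_id`) is an assumption made for the example.

```python
import time
import uuid
from dataclasses import dataclass, field
from typing import Any, Dict, Optional


@dataclass
class Entry:
    """Hypothetical stand-in for a timestamped blackboard entry."""

    msg: Any
    timestamp: float = field(default_factory=time.time)
    id: str = field(default_factory=lambda: str(uuid.uuid4()))

    def is_expired(self, timeout: Optional[float]) -> bool:
        # TTL check: an entry older than `timeout` seconds has expired.
        return timeout is not None and time.time() - self.timestamp > timeout

    def is_valid(self, timeout: Optional[float] = None,
                 stale_id: Optional[str] = None) -> bool:
        # Idempotency check: an already-processed entry is not fresh.
        if stale_id is not None and self.id == stale_id:
            return False
        return not self.is_expired(timeout)


def get_fresh(entries: Dict[str, Entry], topic_name: str,
              timeout: Optional[float] = None,
              stale_id: Optional[str] = None) -> Optional[Entry]:
    """Return a fresh entry for `topic_name`, or None."""
    entry = entries.get(topic_name)
    if entry is None:
        return None
    if entry.is_expired(timeout):
        # Lazy expiration: the entry is removed only when it is looked up.
        del entries[topic_name]
        return None
    if not entry.is_valid(timeout, stale_id):
        # Assumption: a stale (already processed) entry is kept in the cache.
        return None
    return entry
```

Passing the `id` of the last processed entry as `stale_id` keeps the same message reception from being returned twice, which is the idempotency property listed in the class description.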

class ros_sugar.core.event.Event(event_condition: Union[ros_sugar.io.topic.Topic, ros_sugar.condition.Condition], on_change: bool = False, handle_once: bool = False, keep_event_delay: float = 0.0)

An Event is defined by a change in a ROS2 message value on a specific topic. Events are created to alert a robot software stack to any dynamic change at runtime.

Events are used by matching them to ‘Actions’; an Action is meant to be executed at runtime once the Event is triggered.

property under_processing: bool

Whether the event has been triggered and its associated action is being executed

Returns:

Event under processing flag

Return type:

bool

property id: str

Getter of the event unique id

Returns:

Unique ID

Return type:

str

reset()

Reset event processing

clear() → None

Clear event trigger

raise_event_trigger() → None

Raise event trigger

to_dict() → Dict

Serialize the event into a dictionary

Returns:

Event description dictionary

Return type:

Dict

classmethod from_dict(dict_obj: Dict)

Build the event from a dictionary

Parameters:

dict_obj (Dict) – Event description dictionary

to_json() → str

Serialize the event into a JSON string

Returns:

Event description dictionary as a JSON string

Return type:

str

classmethod from_json(json_obj: Union[str, bytes, bytearray])

Build the event from a JSON string

Parameters:

json_obj (Union[str, bytes, bytearray]) – Event description dictionary as a JSON string
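
The pairing of to_dict/from_dict with to_json/from_json suggests a round-trip serialization pattern, which the sketch below illustrates. The layout of the real event dictionary is not documented in this section, so `EventSpec` and its `topic_name` field are hypothetical; the remaining fields only mirror the constructor arguments shown above.

```python
import json
from dataclasses import asdict, dataclass
from typing import Dict, Union


@dataclass
class EventSpec:
    """Hypothetical stand-in; not the ros_sugar Event class."""

    topic_name: str
    on_change: bool = False
    handle_once: bool = False
    keep_event_delay: float = 0.0

    def to_dict(self) -> Dict:
        # Serialize the fields into a plain dictionary.
        return asdict(self)

    @classmethod
    def from_dict(cls, dict_obj: Dict) -> "EventSpec":
        # Rebuild an instance from a dictionary produced by to_dict().
        return cls(**dict_obj)

    def to_json(self) -> str:
        return json.dumps(self.to_dict())

    @classmethod
    def from_json(cls, json_obj: Union[str, bytes, bytearray]) -> "EventSpec":
        # json.loads accepts str, bytes and bytearray inputs.
        return cls.from_dict(json.loads(json_obj))


spec = EventSpec(topic_name="/battery", on_change=True, keep_event_delay=0.5)
restored = EventSpec.from_json(spec.to_json())
```

In this sketch `restored` compares equal to `spec`, since dataclasses compare field by field.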

get_involved_topics() → List[ros_sugar.io.topic.Topic]

Get all the topics required for monitoring this event

Returns:

Required topics

Return type:

List[Topic]

get_last_processed_id(topic_name: str) → Optional[str]

Get the unique ID of the last processed message for a given topic

Parameters:

topic_name (str) – Topic name

Returns:

The last unique ID if the topic was processed earlier, otherwise None

Return type:

Optional[str]

verify_required_action_topics(action: ros_sugar.core.action.Action) → None

Verify the action topic parsers (if present) against this event. Raises a ‘ValueError’ if there is a mismatch.

Parameters:

action (Action) – Action whose topic parsers are verified against this event

register_actions(actions: Union[ros_sugar.core.action.Action, Callable, List[Union[ros_sugar.core.action.Action, Callable]]]) → None

Register an Action or a set of Actions to execute on trigger

Parameters:

actions (Union[Action, Callable, List[Union[Action, Callable]]]) – Action, callable, or a list of Actions and callables

clear_actions() → None

Clear all registered on trigger Actions
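
Registering actions, clearing them and executing them on trigger can be pictured with the sketch below. It is a simplified stand-in rather than the library code: `MiniEvent` and `trigger` are hypothetical names, actions are plain callables, and `handle_once` is assumed here to mean that the actions run on the first trigger only.

```python
from typing import Callable, List, Union

ActionFn = Callable[[], None]


class MiniEvent:
    """Hypothetical stand-in showing action registration and triggering."""

    def __init__(self, handle_once: bool = False) -> None:
        self._actions: List[ActionFn] = []
        self._handle_once = handle_once
        self._handled = False

    def register_actions(self, actions: Union[ActionFn, List[ActionFn]]) -> None:
        # Accept either a single callable or a list of callables.
        if callable(actions):
            actions = [actions]
        self._actions.extend(actions)

    def clear_actions(self) -> None:
        # Drop every registered action.
        self._actions.clear()

    def trigger(self) -> None:
        # Assumed handle_once meaning: ignore every trigger after the first.
        if self._handle_once and self._handled:
            return
        self._handled = True
        for action in self._actions:
            action()


log: List[str] = []
event = MiniEvent(handle_once=True)
event.register_actions(lambda: log.append("stop"))
event.register_actions([lambda: log.append("notify")])
event.trigger()
event.trigger()  # ignored because handle_once is set
```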

check_condition(global_topic_cache: Dict[str, ros_sugar.core.event.EventBlackboardEntry]) → None

Replaces existing trigger logic. Evaluates the root Condition tree against the global cache.
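
Evaluating a condition tree against a topic cache can be sketched as a recursive walk over the tree. The classes below (`Leaf`, `AllOf`, `AnyOf`) are hypothetical and do not reflect the actual ros_sugar Condition API; the cache is simplified to a mapping from topic name to the latest value, and a topic with no cached value is assumed to fail its condition.

```python
from dataclasses import dataclass
from typing import Any, Callable, Dict, Tuple


@dataclass
class Leaf:
    """Hypothetical leaf: a predicate on the latest value of one topic."""

    topic_name: str
    predicate: Callable[[Any], bool]

    def evaluate(self, cache: Dict[str, Any]) -> bool:
        # Assumption: a topic with no cached value cannot satisfy the leaf.
        if self.topic_name not in cache:
            return False
        return bool(self.predicate(cache[self.topic_name]))


@dataclass
class AllOf:
    """Hypothetical AND node: every child must hold."""

    children: Tuple[Any, ...]

    def evaluate(self, cache: Dict[str, Any]) -> bool:
        return all(child.evaluate(cache) for child in self.children)


@dataclass
class AnyOf:
    """Hypothetical OR node: at least one child must hold."""

    children: Tuple[Any, ...]

    def evaluate(self, cache: Dict[str, Any]) -> bool:
        return any(child.evaluate(cache) for child in self.children)


# Low battery AND (not docked OR in manual mode)
root = AllOf((
    Leaf("/battery", lambda v: v < 20.0),
    AnyOf((
        Leaf("/docked", lambda v: v is False),
        Leaf("/mode", lambda v: v == "manual"),
    )),
))
triggered = root.evaluate({"/battery": 15.0, "/docked": False})
```

Here `triggered` is True, because the battery leaf holds and one branch of the OR node holds.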