ros_sugar.core.event¶
Event
Module Contents¶
Classes¶
Class to transform a Kompass event to ROS launch event using event key name |
|
ROS EventHandler for InternalEvent. |
|
A container for timestamped messages stored in the Event Blackboard. |
|
An Event is defined by a change in a ROS2 message value on a specific topic. Events are created to alert a robot software stack to any dynamic change at runtime. |
API¶
- class ros_sugar.core.event.InternalEvent(event_name: str, topics_value: Dict)¶
Bases:
launch.event.EventClass to transform a Kompass event to ROS launch event using event key name
- property event_name¶
Getter of internal event name
- Returns:
Event name
- Return type:
str
- property topics_value¶
Getter of internal event name
- Returns:
Event name
- Return type:
str
- class ros_sugar.core.event.OnInternalEvent(*, internal_event_name: str, entities: ros_sugar.utils.SomeEntitiesType, handle_once: bool = False)¶
Bases:
launch.event_handler.EventHandlerROS EventHandler for InternalEvent.
- handle(event: launch.event.Event, context) Optional[ros_sugar.utils.SomeEntitiesType]¶
Overriding handle to inject event data into the entities.
- class ros_sugar.core.event.EventBlackboardEntry¶
A container for timestamped messages stored in the Event Blackboard.
This class wraps raw ROS messages with metadata to enable:
Time-To-Live (TTL) Checks: Using
timestampto invalidate old data.Idempotency: Using a unique
idto prevent the same message instance from triggering the same event multiple times.
- Parameters:
msg (Any) – The actual data payload (e.g., a ROS message).
timestamp (float) – The standard Unix timestamp (float) when the message was received.
id (str) – A unique UUID4 string identifying this specific message reception instance. Defaults to a new UUID if not provided.
- validate(timeout: Optional[float] = None, stale_id: Optional[str] = None)¶
Validate the data freshness
- Parameters:
timeout (Optional[float], optional) – Maximum lifetime, defaults to None
stale_id (Optional[str], optional) – ID of the latest stale message, defaults to None
- classmethod get(entries_dict: Dict[str, ros_sugar.core.event.EventBlackboardEntry], topic_name: str, timeout: Optional[float], stale_id: Optional[str] = None) Optional[Any]¶
Retrieves data from entries_dict:
If data is missing: returns None
If data is present but expired: Deletes it and returns None (Lazy Expiration)
If data is valid: returns the entry
- class ros_sugar.core.event.Event(event_condition: Union[ros_sugar.io.topic.Topic, ros_sugar.condition.Condition], on_change: bool = False, handle_once: bool = False, keep_event_delay: float = 0.0)¶
An Event is defined by a change in a ROS2 message value on a specific topic. Events are created to alert a robot software stack to any dynamic change at runtime.
Events are used by matching them to ‘Actions’; an Action is meant to be executed at runtime once the Event is triggered.
- property under_processing: bool¶
If event is triggered and associated action is getting executed
- Returns:
Event under processing flag
- Return type:
bool
- property id: str¶
Getter of the event unique id
- Returns:
Unique ID
- Return type:
str
- reset()¶
Reset event processing
- clear() None¶
Clear event trigger
- raise_event_trigger() None¶
Raise event trigger
- to_dict() Dict¶
Property to parse the event into a dictionary
- Returns:
Event description dictionary
- Return type:
Dict
- classmethod from_dict(dict_obj: Dict)¶
Setter of the event using a dictionary
- Parameters:
dict_obj (Dict) – Event description dictionary
- to_json() str¶
Property to get/set the event using a json
- Returns:
Event description dictionary as json
- Return type:
str
- classmethod from_json(json_obj: Union[str, bytes, bytearray])¶
Property to get/set the event using a json
- Parameters:
json_obj (Union[str, bytes, bytearray]) – Event description dictionary as json
- get_involved_topics() List[ros_sugar.io.topic.Topic]¶
Get all the topics required for monitoring this event
- Returns:
Required topics
- Return type:
List[Topic]
- get_last_processed_id(topic_name: str) Optional[str]¶
Get the unique ID of the last processed message for a given topic
- Parameters:
topic_name (str) – Topic name
- Returns:
The last unique ID if the topic was processed earlier, otherwise None
- Return type:
Optional[str]
- verify_required_action_topics(action: ros_sugar.core.action.Action) None¶
Verify the action topic parsers (if present) against an event. Raises a ‘ValueError’ if there is a mismatch.
- Parameters:
event (Event) – Event to verify against
- register_actions(actions: Union[ros_sugar.core.action.Action, Callable, List[Union[ros_sugar.core.action.Action, Callable]]]) None¶
Register an Action or a set of Actions to execute on trigger
- clear_actions() None¶
Clear all registered on trigger Actions
- check_condition(global_topic_cache: Dict[str, ros_sugar.core.event.EventBlackboardEntry]) None¶
Replaces existing trigger logic. Evaluates the root Condition tree against the global cache.