====================
ROS2-based Framework
====================
Challenges
-----------------------------------
Single Node: One Specific Task with Tightly Coupled, Customized Interface
+++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++

- Works well in isolation but is rigid and hard to reuse.
- Time-consuming because the same low-value work is repeated for every node:

  + Customizing data interfaces
  + Declaring connections
  + Implementing encode/decode functions

- Developers should be able to focus solely on the core *response function*.


.. figure:: images/fig_typical_ROS2_node.png
   :alt: Typical ROS2 node
   :width: 600px
   :align: center

Customize interfaces
********************

**Step 1.** Create the interface package in *ros2_ws/src*

.. code-block:: bash

   ros2 pkg create --build-type ament_cmake --license Apache-2.0 tutorial_interfaces

**Step 2.** Create the *msg* and *srv* folders in *ros2_ws/src/tutorial_interfaces*

.. code-block:: bash

   mkdir msg srv

**Step 3.** Create the interface definition files

*Num.msg* in *ros2_ws/src/tutorial_interfaces/msg*

.. code-block:: text

   int64 num

*AddThreeInts.srv* in *ros2_ws/src/tutorial_interfaces/srv*

.. code-block:: text

   int64 a
   int64 b
   int64 c
   ---
   int64 sum


**Step 4.** Add the following to *CMakeLists.txt*

.. code-block:: cmake

   find_package(geometry_msgs REQUIRED)
   find_package(rosidl_default_generators REQUIRED)

   rosidl_generate_interfaces(${PROJECT_NAME}
     "msg/Num.msg"
     "srv/AddThreeInts.srv"
     # List the packages that the interfaces above depend on.
     # Num.msg and AddThreeInts.srv use only built-in types; geometry_msgs
     # is needed only if you add a message that uses it (e.g. Sphere.msg).
     DEPENDENCIES geometry_msgs
   )

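
**Step 5.** Add the following to *package.xml*

.. code-block:: xml

   <depend>geometry_msgs</depend>
   <buildtool_depend>rosidl_default_generators</buildtool_depend>
   <exec_depend>rosidl_default_runtime</exec_depend>
   <member_of_group>rosidl_interface_packages</member_of_group>
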

**Step 6.** Build the interface package from the workspace root (*ros2_ws*)

.. code-block:: bash

   colcon build --packages-select tutorial_interfaces
   source install/setup.bash

Implement Node
****************

**Step 1.** Define and run the server node

.. code-block:: python

   from tutorial_interfaces.srv import AddThreeInts  # custom service interface

   import rclpy
   from rclpy.node import Node


   class MinimalService(Node):

       def __init__(self):
           super().__init__('minimal_service')
           # Declare connection and data interface
           self.srv = self.create_service(
               AddThreeInts, 'add_three_ints', self.add_three_ints_callback)

       def add_three_ints_callback(self, request, response):
           response.sum = request.a + request.b + request.c
           self.get_logger().info(
               'Incoming request\na: %d b: %d c: %d' % (request.a, request.b, request.c))
           return response

       def add_three_ints_callback_rewrite(self, request, response):
           # Same callback, split into decode / response / encode stages
           # Decode
           data = {'a': request.a, 'b': request.b, 'c': request.c}
           # Response
           res_data = data['a'] + data['b'] + data['c']
           # Encode
           response.sum = res_data
           return response


   def main():
       rclpy.init()
       node = MinimalService()
       rclpy.spin(node)  # blocks until shutdown
       node.destroy_node()
       rclpy.shutdown()


   if __name__ == '__main__':
       main()


**Step 2.** Define the client node

.. code-block:: python

   import time

   from tutorial_interfaces.srv import AddThreeInts  # custom service interface

   import rclpy
   from rclpy.node import Node


   class MinimalClientAsync(Node):

       def __init__(self):
           super().__init__('minimal_client_async')
           # Declare connection and data interface
           self.cli = self.create_client(AddThreeInts, 'add_three_ints')
           while not self.cli.wait_for_service(timeout_sec=1.0):
               self.get_logger().info('service not available, waiting again...')
           self.data_interface = AddThreeInts
           self.request = self.data_interface.Request()

       def send(self, data, wait_until_done=True):
           # Encode
           self.request.a = data['a']
           self.request.b = data['b']
           self.request.c = data['c']
           future = self.cli.call_async(self.request)
           # Poll the future. This assumes the node is being spun elsewhere
           # (e.g. by an executor in another thread); otherwise it never completes.
           while rclpy.ok() and not future.done() and wait_until_done:
               time.sleep(0.1)
           if not future.done():
               return None  # non-blocking call: no result available yet
           res = future.result()
           # Decode
           rev_data = {'sum': res.sum}
           return rev_data


Multitask: Hard, Even Impossible
++++++++++++++++++++++++++++++++

- Multiple tasks within one node

  + quickly result in bloated, hard-to-maintain code
  + require manually managing numerous subscribers, publishers, actions, services, and timers, each with different APIs and callback signatures

- Coordination across multiple tasks (e.g., perception, planning, control, diagnostics)

  + forces logic to be split across multiple nodes, introducing complex inter-node communication
  + brings synchronization issues, lifecycle management overhead, and debugging difficulties

Key Advantages of Custom ROS2 Node
-----------------------------------
This lightweight framework wraps vanilla ROS2 (``rclpy``) with a clean **agent-based** design.
Instead of writing repetitive boilerplate for every subscriber, publisher, action, or service, you:

- Add agents with **one line** of code
- Use the **same simple API** (``agent.send()``, ``agent.get()``) for all communication types
- Keep your business logic clean (no ROS messages inside your functions)
- Get **automatic logging**, connection waiting, and multi-threaded execution


.. figure:: images/fig_custom_ROS2_node.png
   :alt: Custom ROS2 node
   :width: 600px
   :align: center


.. list-table:: Vanilla ROS2 vs. This Custom Framework
   :header-rows: 1
   :widths: 30 35 35

   * - Feature
     - Vanilla ROS2
     - This Custom Framework
   * - Code length
     - Lots of boilerplate
     - Very concise
   * - Interface consistency
     - Different syntax per type
     - Uniform: ``send()`` / ``get()`` everywhere
   * - Data handling
     - Manual encode/decode everywhere
     - One ``encode_func`` + ``decode_func`` per agent
   * - Business logic
     - Mixed with ROS calls
     - Pure Python in ``response_func``
   * - Managing 20+ interfaces
     - Messy node class
     - Clean ``node.agents`` dictionary
   * - Logging & debugging
     - Manual or external tools
     - Built-in: JSON + images + periodic sub logging
   * - Waiting for connections
     - You write it
     - One call: ``node.wait_until_connected()``
   * - Concurrency
     - Manual executor + callback groups
     - Smart defaults for images & high-rate topics

**Perfect for**:
- Robots with many sensors/actuators
- Teams that want fast prototyping
- Projects needing easy debugging & data recording

Overview of the Custom ROS2 Node Implementation
-----------------------------------------------
This Python code provides a highly modular and extensible framework for building custom ROS2 nodes, centered on the base class NodeAgent and the managing class CustomNode. It abstracts ROS2 communication primitives — topics, actions, services, and timers — into reusable “agents” that are easily configured and combined to handle complex robotic or distributed systems.
The key advantage of this design lies in its strict separation of concerns: **callbacks and send operations no longer contain fragmented, repetitive ROS2 message handling code**. Instead, all low-level details are cleanly split into three dedicated components:
- **encode_func**: Converts pure Python data (e.g., dicts, NumPy arrays) into ROS2 message fields before sending or publishing.
- **decode_func**: Transforms incoming ROS2 messages back into clean, easy-to-use Python data structures.
- **response_func** (or run_func): Contains only the core business logic or processing, operating exclusively on pure Python data with no direct contact with ROS2 message objects.
This clear separation keeps callbacks and send functions short, readable, and focused solely on orchestration, while the actual data conversion and processing logic lives in reusable, testable pure-Python functions.
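
The three-way split can be tried out without ROS at all. The following is a minimal sketch, not the framework's own code: ``SimpleNamespace`` objects stand in for the generated ``AddThreeInts`` request and response classes, and ``service_callback`` is a hypothetical name for the thin orchestration layer. Only the ``encode_func(data, msg)`` and ``decode_func(msg)`` calling conventions are taken from the examples in this document.

.. code-block:: python

   from types import SimpleNamespace


   def decode_func(request):
       """ROS-style request -> plain Python dict."""
       return {'a': request.a, 'b': request.b, 'c': request.c}


   def response_func(data):
       """Core logic: works on plain Python data only."""
       return {'sum': data['a'] + data['b'] + data['c']}


   def encode_func(data, response):
       """Plain Python dict -> fields of a ROS-style response."""
       response.sum = data['sum']
       return response


   def service_callback(request, response):
       """Hypothetical orchestration layer: decode, process, encode."""
       return encode_func(response_func(decode_func(request)), response)


   # SimpleNamespace objects stand in for AddThreeInts.Request / .Response
   request = SimpleNamespace(a=1, b=2, c=3)
   result = service_callback(request, SimpleNamespace(sum=0))
   print(result.sum)  # 6

Because ``response_func`` never sees a message object, it can be unit-tested with ordinary dicts.
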
Combined with ROS2’s rclpy library, the framework adds powerful layers for data encoding/decoding, comprehensive logging, robust error handling, and intelligent concurrency management. Agents inherit from NodeAgent and are aggregated in a CustomNode, which manages spinning, callback execution, and lifecycle.
Key features include:
- **Uniform Interface**: All agents expose the same simple get(), send(), and log_msg() methods, regardless of whether they are subscribers, publishers, actions, services, or timers.
- **Clean Data Handling**: Customizable encode_func and decode_func enable seamless use of native Python formats (JSON, NumPy, OpenCV images).
- **Built-in Logging & Debugging**: Automatic saving of messages, images, and arrays to timestamped directories, plus optional periodic logging of all subscribers.
- **Smart Concurrency**: Multi-threaded executors and callback groups automatically applied, especially for high-rate image topics.
- **Easy Extensibility**: Factory function make_agent() and AGENT_CLASS_DICT simplify adding new agent types or integrating non-ROS protocols (e.g., TCP/IP).
This implementation is especially well-suited for robotics, AI, and IoT applications using ROS2, where nodes must reliably coordinate numerous sensors, actuators, and sophisticated processing pipelines while remaining maintainable and debuggable.
Detailed Breakdown of Components
---------------------------------------
Base Class: NodeAgent
++++++++++++++++++++++
Purpose: Serves as the foundation for all agent types, providing shared functionality for initialization, logging, data formatting, and basic send/receive operations.
Key Methods:
- **__init__()**: Sets up agent properties like name, connection type (e.g., 'sub' for subscriber, 'action_server'), data interface (ROS2 message type), and custom functions for encoding/decoding/response processing. It also tracks connection status and logging flags.
- **make_log_dirs()**: Creates timestamped directories for logs (messages in JSON, images/arrays in PNG/NPY). If is_init is True, it starts a new session; otherwise, it reuses the latest one.
- **log_msg()**: Saves data to files, handling JSON-serializable dicts, NumPy arrays (as images or .npy files), and threading for non-blocking I/O. Useful for auditing complex interactions.
- **reform_response()**: Ensures responses include an 'isdone' flag for consistent completion signaling.
- **get()**: Retrieves data from subscribers or triggers sends for clients. Includes optional info printing for debugging.
- **send()**: Abstract method overridden in subclasses for type-specific sending.
Advantages: By abstracting common operations, it reduces boilerplate code when dealing with diverse ROS2 interfaces.
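
As an illustration of two of these helpers, the sketch below shows one plausible way ``reform_response()`` and ``make_log_dirs()`` could behave. It is written from the descriptions above only: the default value of ``'isdone'``, the wrapping of non-dict responses under a ``'data'`` key, the directory name format, and the ``msgs`` / ``images`` subfolders are all assumptions, not the framework's actual implementation.

.. code-block:: python

   import os
   import tempfile
   from datetime import datetime


   def reform_response(response):
       """Return a dict that always carries an 'isdone' flag (assumed default: True)."""
       if response is None:
           return {'isdone': True}
       if not isinstance(response, dict):
           response = {'data': response}  # assumption: wrap bare values
       response = dict(response)          # do not mutate the caller's dict
       response.setdefault('isdone', True)
       return response


   def make_log_dirs(root, is_init=True):
       """Start a new timestamped session (is_init=True) or reuse the latest one."""
       os.makedirs(root, exist_ok=True)
       sessions = sorted(
           d for d in os.listdir(root) if os.path.isdir(os.path.join(root, d)))
       if is_init or not sessions:
           session = os.path.join(root, datetime.now().strftime('%Y%m%d_%H%M%S_%f'))
       else:
           session = os.path.join(root, sessions[-1])
       for sub in ('msgs', 'images'):     # hypothetical subfolder names
           os.makedirs(os.path.join(session, sub), exist_ok=True)
       return session


   root = tempfile.mkdtemp()
   first = make_log_dirs(root, is_init=True)   # new session
   again = make_log_dirs(root, is_init=False)  # reuses the session above
   print(reform_response({'x': 1}))
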
Derived Agent Classes
++++++++++++++++++++++
These inherit from NodeAgent and implement specific ROS2 behaviors:

ActionServerAgent:
******************

- Handles action goals via **callback()**, decoding requests, processing with response_func, and sending feedback/results.
- Uses threading for continuous feedback during long-running tasks.
- Error handling ensures graceful aborts with error messages.

ActionClientAgent:
******************

- **send()** encodes and sends goals, waits for results (with optional timeout), and decodes responses.
- **Callbacks** for feedback, goal acceptance, and results provide fine-grained control.

TopicAgent:
***********

- **callback()** decodes incoming messages and applies response_func.
- **send()** and **publish()** encode and publish data, with support for periodic publishing via timers.
- Tracks first-message receipt to confirm connections.

ServiceServerAgent:
*******************

- **callback()** decodes requests, processes them, and encodes responses.

ServiceClientAgent:
*******************

- **send()** encodes requests, calls the service asynchronously, and decodes results. Supports non-blocking mode and timing info for performance analysis.

TimerAgent:
***********

- **callback()** simply invokes response_func at intervals, ideal for periodic tasks without external triggers.

**AGENT_CLASS_DICT**: A mapping from connection types (strings like 'action_server') to classes, enabling dynamic agent creation.

Factory Function: make_agent()
++++++++++++++++++++++++++++++

- Takes a CustomNode and a config dict (e.g., conn_type, data_interface, encode/decode funcs).
- Instantiates the appropriate agent class and attaches it to the node (e.g., by creating subscriptions, publishers, or timers).
- Handles special cases such as image sensors, which get reentrant callback groups for better concurrency.
- Supports integration with non-ROS protocols (e.g., TCP/IP) via conditional branching.
- Prints confirmation messages for easy debugging.

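
The dictionary-dispatch idea behind ``AGENT_CLASS_DICT`` and ``make_agent()`` can be sketched in a few lines of plain Python. This is a simplified stand-in, not the framework's code: the classes are empty placeholders, a plain dict replaces the CustomNode's agent storage, and no ROS subscriptions or timers are created.

.. code-block:: python

   class NodeAgent:
       """Placeholder base class; the real one also handles logging and encoding."""

       def __init__(self, conn_name, conn_type, **kwargs):
           self.conn_name = conn_name
           self.conn_type = conn_type
           self.options = kwargs


   class TopicAgent(NodeAgent):
       pass


   class TimerAgent(NodeAgent):
       pass


   # Connection-type string -> agent class
   AGENT_CLASS_DICT = {'sub': TopicAgent, 'pub': TopicAgent, 'timer': TimerAgent}


   def make_agent(agents, config):
       """Look up the class for config['conn_type'], build it, and register it."""
       conn_type = config['conn_type']
       try:
           agent_cls = AGENT_CLASS_DICT[conn_type]
       except KeyError:
           raise ValueError(f'unknown conn_type: {conn_type!r}') from None
       agent = agent_cls(**config)
       agents[agent.conn_name] = agent
       print(f'added {conn_type} agent {agent.conn_name!r}')
       return agent


   agents = {}  # stands in for node.agents
   heartbeat = make_agent(
       agents, {'conn_name': 'heartbeat', 'conn_type': 'timer', 'time_period': 1.0})

Supporting a new connection type then only requires a new subclass and one extra dictionary entry.
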

Managing Class: CustomNode (Inherits from rclpy.node.Node)
++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
Purpose: Acts as a container for multiple agents, managing their lifecycle and execution.
Key Methods:
- **__init__()**: Initializes node name, logging setup, and shared callback groups. Optional stop_func for cleanup on shutdown.
- **add_agent()**: Configures and adds a single agent, handling name resolution and optional TCP/IP fallback.
- **add_agents()**: Batch-adds agents from a list of configs, with shared kwargs for consistency.
- **listen() / spin()**: Starts callback processing, optionally in a thread. Uses MultiThreadedExecutor if multiple callback groups are needed (e.g., for high-concurrency setups).
- **wait_until_connected()**: Blocks until all subs/clients are connected, ensuring readiness in distributed systems.
- **start_log_subs()**: Periodically logs all subscriber data (e.g., at 10 FPS) to a timestamped directory, handling arrays/images separately for efficiency.
- **stop_log_subs()**: Halts logging thread safely.
- **is_logging_subs**: Property to check logging status.
Agent Storage: Agents are stored in a dict by name for easy access (e.g., node.agents['my_sub'].get()).
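
The blocking behaviour described for ``wait_until_connected()`` amounts to polling each agent's connection status. A minimal sketch under stated assumptions: the ``is_connected`` attribute, the ``timeout_sec`` and ``poll_sec`` parameters, and the ``FakeAgent`` class are hypothetical and used only to make the example self-contained.

.. code-block:: python

   import time


   class FakeAgent:
       """Stand-in for an agent that tracks its connection status."""

       def __init__(self, connected):
           self.is_connected = connected


   def wait_until_connected(agents, timeout_sec=None, poll_sec=0.05):
       """Block until every agent reports a connection; return False on timeout."""
       deadline = None if timeout_sec is None else time.monotonic() + timeout_sec
       while True:
           pending = [name for name, agent in agents.items() if not agent.is_connected]
           if not pending:
               return True
           if deadline is not None and time.monotonic() >= deadline:
               print(f'still waiting for: {pending}')
               return False
           time.sleep(poll_sec)


   ready = wait_until_connected({'camera_image': FakeAgent(True)})
   timed_out = wait_until_connected({'get_map': FakeAgent(False)}, timeout_sec=0.2)
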

Advantages in Flexibility and Handling Complex Workflows
--------------------------------------------------------
This implementation shines in its flexibility, allowing developers to mix and match ROS2 primitives without rewriting core logic:
- **Modular Configuration**: Agents are defined via simple dicts (e.g., in add_agents()), specifying conn_type, data_interface, and custom funcs. This decouples business logic (response_func) from communication details, enabling rapid prototyping. For instance, swapping a topic for an action means changing conn_type and data_interface in the config, with no restructuring of the business logic.
- **Custom Data Pipelines**: Encode/decode funcs support arbitrary data (e.g., converting dicts to ROS messages, handling images with CvBridge). This integrates seamlessly with libraries like NumPy, OpenCV, or even external APIs, making it adaptable to non-standard ROS workflows.
- **Extensibility**: Inheritance from NodeAgent allows custom subclasses for niche needs (e.g., adding encryption). The factory pattern and AGENT_CLASS_DICT facilitate adding new types without modifying core code.
- **Concurrency and Performance**: Multi-threaded executors and reentrant groups prevent bottlenecks in image-heavy or multi-agent nodes. Timers enable scheduled tasks, while async sends in clients support non-blocking operations.
For complex workflows involving many agents (e.g., a robot with dozens of sensors, controllers, and AI modules):
- **Scalability**: A single CustomNode can manage 100+ agents, each handling a specific role (e.g., 10 subs for sensors, 5 action clients for actuators, 3 timers for health checks). add_agents() batch-adds them from configs, ideal for large systems.
- **Orchestration**: Agents interact via shared node methods (e.g., one agent's response_func can trigger another's send()). This enables intricate pipelines, like a subscriber triggering an action server, which logs results and publishes updates.
- **Debugging in Complexity**: Logging (messages, images, periodic subs) captures state in timestamped dirs, crucial for diagnosing issues in multi-agent setups. Error decorators (@exception_handler) and feedback loops ensure robustness.
- **Distributed Systems**: Waiting for connections and handling timeouts make it reliable for fleets of nodes. For example, in a swarm robotics scenario, one node could coordinate 50+ agents for perception, planning, and execution, with logging for post-analysis.
- **Real-World Example**: Imagine an autonomous vehicle node with subs for camera/LiDAR, action servers for navigation commands, services for diagnostics, and timers for heartbeat signals. The framework lets you implement such a complex node by composing agents, which reduces development time and errors compared with vanilla ROS2.
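
The error decorator mentioned above could look roughly like the following. Only the name ``exception_handler`` comes from this document; the body, and the choice to return a dict with ``'isdone': False`` and an ``'error'`` message, are assumptions made for illustration.

.. code-block:: python

   import functools
   import traceback


   def exception_handler(func):
       """Catch exceptions from func and return an error dict instead of raising."""
       @functools.wraps(func)
       def wrapper(*args, **kwargs):
           try:
               return func(*args, **kwargs)
           except Exception as exc:
               traceback.print_exc()  # keep the full traceback for debugging
               return {'isdone': False, 'error': str(exc)}
       return wrapper


   @exception_handler
   def risky_response(data):
       """Example response_func that fails when data['b'] is zero."""
       return {'isdone': True, 'ratio': data['a'] / data['b']}


   ok = risky_response({'a': 1, 'b': 2})
   failed = risky_response({'a': 1, 'b': 0})

With this pattern a failing ``response_func`` does not take down the executor thread, and the caller can still check ``'isdone'`` on the returned dict.
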

In summary, this code turns ROS2 node development into a composable, agent-based paradigm that works for simple scripts as well as large, multi-node systems. It minimizes glue code and keeps nodes maintainable as the number of interfaces grows.
Quick Start
-------------
This custom framework reduces the code needed to build a ROS2 node compared with vanilla ROS2. You create one CustomNode, then add "agents" for topics, actions, services, or timers, all with a clean, uniform interface.
Basic Setup
+++++++++++

.. code-block:: python

   from pyconnect.ros.custom_node import CustomNode

   node = CustomNode(name='my_robot_node')

Add Agents
+++++++++++

Subscriber (receive data from a topic)
**************************************

.. code-block:: python

   from sensor_msgs.msg import Image

   node.add_agent(
       conn_name='camera_image',          # topic name
       conn_type='sub',                   # subscriber
       data_interface=Image,              # your ROS message type
       decode_func=my_image_decoder,      # function: ROS msg → Python dict
       response_func=my_image_processor,  # optional: called every time a message arrives
       do_log_msg=True,                   # optional: auto-save images + data to disk
   )


Publisher (send data to a topic)
********************************

.. code-block:: python

   from std_msgs.msg import String

   node.add_agent(
       conn_name='command',
       conn_type='pub',
       data_interface=String,
       encode_func=my_string_encoder,  # function: Python dict → ROS msg
       time_period=0.5,                # optional: auto-publish every 0.5 sec
   )


Action Client (call a long-running action)
******************************************

.. code-block:: python

   node.add_agent(
       conn_name='navigate',
       conn_type='action_client',
       data_interface=NavigateAction,  # your custom action type
       encode_func=my_goal_encoder,
       decode_func=my_result_decoder,
   )


Action Server (provide a service that can give feedback)
********************************************************

.. code-block:: python

   node.add_agent(
       conn_name='process_data',
       conn_type='action_server',
       data_interface=ProcessAction,
       encode_func=my_result_encoder,
       decode_func=my_goal_decoder,
       response_func=my_long_task,  # your main processing function
   )

Service Client
*******************************

.. code-block:: python

   node.add_agent(
       conn_name='get_map',
       conn_type='service_client',
       data_interface=GetMap,
   )


Timer (run something periodically)
**********************************

.. code-block:: python

   node.add_agent(
       conn_name='heartbeat',
       conn_type='timer',
       time_period=1.0,  # every 1 second
       response_func=my_health_check,
   )

Use the Agents (Super Simple API)
---------------------------------

.. code-block:: python

   # Get latest data from a subscriber
   latest_image_data = node.agents['camera_image'].get()  # returns decoded Python dict

   # Or with info print
   latest_image_data = node.agents['camera_image'].get(show_info=True)

   # Send/publish data
   node.agents['command'].send({'cmd': 'forward', 'speed': 0.5})

   # Call an action
   result = node.agents['navigate'].send({'goal_x': 10.0, 'goal_y': 5.0})

   # Call a service
   map_data = node.agents['get_map'].send({'request_full': True})

Run the Node
----------------------

.. code-block:: python

   # Option A: Run in background thread (good for scripts)
   node.spin(run_thread=True)

   # Option B: Block main thread (good for simple programs)
   node.spin()  # or node.listen()

Useful Extras
----------------------

.. code-block:: python

   # Wait until all sensors/services are connected
   node.wait_until_connected()

   # Log all incoming subscriber data (images, arrays, etc.) at 10 FPS
   node.start_log_subs(fps=10)

   # ... later
   node.stop_log_subs()


Full Minimal Example (Publisher + Subscriber)
---------------------------------------------

.. code-block:: python

   from std_msgs.msg import String

   from pyconnect.ros.custom_node import CustomNode
   from pyconnect.utils import dict2str, str2dict


   def encoder(data, msg):
       # Python dict -> std_msgs/String
       msg.data = dict2str(data)
       return msg


   def decoder(msg):
       # std_msgs/String -> Python dict
       return str2dict(msg.data)


   # Publisher node
   pub_node = CustomNode('talker')
   pub_node.add_agent(conn_name='chatter', conn_type='pub', data_interface=String,
                      encode_func=encoder, time_period=1.0)  # sends every second
   pub_node.spin(run_thread=True)

   # Subscriber node
   sub_node = CustomNode('listener')
   sub_node.add_agent(conn_name='chatter', conn_type='sub', data_interface=String,
                      decode_func=decoder, do_log_msg=True)
   sub_node.spin()

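
Because the encoder and decoder are plain functions, they can be checked without starting any node. The sketch below assumes that ``dict2str`` and ``str2dict`` behave like ``json.dumps`` and ``json.loads`` (an assumption, since their implementation is not shown here) and uses a ``SimpleNamespace`` in place of a ``std_msgs/String`` message.

.. code-block:: python

   import json
   from types import SimpleNamespace


   def dict2str(data):
       """Stand-in for pyconnect.utils.dict2str (assumed JSON-based)."""
       return json.dumps(data)


   def str2dict(text):
       """Stand-in for pyconnect.utils.str2dict (assumed JSON-based)."""
       return json.loads(text)


   def encoder(data, msg):
       msg.data = dict2str(data)
       return msg


   def decoder(msg):
       return str2dict(msg.data)


   # SimpleNamespace(data='') plays the role of an empty String message
   msg = encoder({'cmd': 'forward', 'speed': 0.5}, SimpleNamespace(data=''))
   roundtrip = decoder(msg)
   print(roundtrip)  # {'cmd': 'forward', 'speed': 0.5}
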