Chapter 4.4: Executing Actions in ROS 2
We have now converted a voice command to text, used an LLM to create a plan, and used a VLM to find the object required by that plan. The final piece of the puzzle is to translate the plan's abstract steps—like `go_to(location='table')` or `pick_up(object_id=123)`—into actual robot movements.
This is handled by an Action Execution node. This node acts as the bridge between the high-level symbolic plan from the LLM and the low-level ROS 2 interfaces that control the robot's hardware.
This chapter describes the structure of an action execution node and how it uses ROS 2 action clients to command different parts of the robot.
The Role of the Action Execution Node
The Action Execution node is the conductor of the orchestra. Its job is to:
- Subscribe to the `/action_plan` topic to receive the JSON plan generated by the LLM planner.
- Iterate through the plan step by step.
- For each step, call the appropriate ROS 2 action or service to make the robot perform the action.
- Wait for the action to complete successfully before moving to the next step.
- Manage state between steps (e.g., storing the `object_id` from a `find_object` call to use in a `pick_up` call).
- Handle failures and report status back to the user.
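Stripped of ROS plumbing, this role boils down to a dispatch loop that threads shared state between steps and aborts on the first failure. Here is a minimal, ROS-free sketch; the handler functions and the hard-coded object id `123` are hypothetical stand-ins for real service calls:

```python
import json

def execute_plan(plan, handlers, state):
    """Run each plan step in order; stop on the first unknown or failed step."""
    for step in plan:
        name = step.get('function')
        params = step.get('parameters', [])
        handler = handlers.get(name)
        if handler is None or not handler(state, *params):
            return False  # unknown step or step failed: abort the plan
    return True

# Hypothetical handlers: each returns True on success and may update state.
def find_object(state, name, color):
    state['found_object_id'] = 123  # pretend a vision service found the object
    return True

def pick_up(state):
    if state.get('found_object_id') is None:
        return False  # nothing has been found yet
    state['held_object_id'] = state['found_object_id']
    return True

handlers = {'find_object': find_object, 'pick_up': pick_up}
state = {'found_object_id': None, 'held_object_id': None}
plan = json.loads('[{"function": "find_object", "parameters": ["apple", "green"]},'
                  ' {"function": "pick_up", "parameters": []}]')
print(execute_plan(plan, handlers, state))  # True
print(state['held_object_id'])              # 123
```

Note how `pick_up` depends on state written by `find_object`—this is exactly the cross-step context the real node must manage.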
Using ROS 2 Action Clients
Most robotics capabilities that take time to complete, like navigation or manipulation, are exposed as ROS 2 actions. Our execution node will act as a client to these actions.
For our example, we will assume the robot has two primary action servers available:
- `/navigate_to_pose`: A navigation action server (like the one provided by Nav2). It takes a `PoseStamped` message as a goal and moves the robot to that position.
- `/pick_and_place`: A manipulation action server (which you would build using a motion planner like MoveIt). It might take a goal specifying the object to pick up and the location to place it.
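The `/pick_and_place` server implies a custom action interface. A hypothetical `PickAndPlace.action` definition (the field names here are illustrative, not from any standard package) might look like:

```
# Goal
geometry_msgs/PoseStamped object_pose
geometry_msgs/PoseStamped place_pose
---
# Result
bool success
---
# Feedback
string current_phase
```

The three sections separated by `---` are the goal, result, and feedback parts that every ROS 2 action interface defines.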
The process for using an action client in rclpy is:
- Create an action client object for the specific action.
- Wait for the action server to become available.
- Create a goal message.
- Asynchronously send the goal to the server.
- Wait for the result and optionally monitor feedback while it executes.
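The five steps above can be illustrated without a ROS 2 installation by mocking the client's API shape. `MockActionClient`, `GoalHandle`, and the string status below are stand-ins, not `rclpy` classes—they only mirror the call sequence a real `ActionClient` follows:

```python
class Future:
    """Stand-in for an rclpy future that already holds its value."""
    def __init__(self, value):
        self._value = value
    def result(self):
        return self._value

class GoalHandle:
    """Mimics the handle returned once a goal is accepted."""
    def get_result_async(self):
        return Future(type('Result', (), {'status': 'SUCCEEDED'})())

class MockActionClient:
    """Stand-in for rclpy.action.ActionClient; same sequence, no ROS."""
    def wait_for_server(self, timeout_sec=None):
        return True                      # step 2: wait for the server
    def send_goal_async(self, goal_msg):
        return Future(GoalHandle())      # step 4: send the goal asynchronously

client = MockActionClient()              # step 1: create the client
assert client.wait_for_server()
goal = {'pose': 'table'}                 # step 3: build a goal message
goal_future = client.send_goal_async(goal)
handle = goal_future.result()            # (rclpy would spin until this completes)
result_future = handle.get_result_async()
result = result_future.result()          # step 5: wait for the result
print(result.status)  # SUCCEEDED
```

In real `rclpy` code the two `result()` calls are reached by spinning the node until each future completes, as the execution node below demonstrates.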
The Action Execution Node Structure
Here is a conceptual outline of our `action_executor` node.

```python
import rclpy
from rclpy.node import Node
from rclpy.action import ActionClient
from std_msgs.msg import String
import json

# Import the custom action types for navigation and manipulation
# e.g., from nav2_msgs.action import NavigateToPose
# e.g., from my_interfaces.action import PickAndPlace


class ActionExecutorNode(Node):

    def __init__(self):
        super().__init__('action_executor_node')

        # Subscribe to the action plan topic
        self.subscription = self.create_subscription(
            String,
            'action_plan',
            self.plan_callback,
            10)

        # Create action clients
        # self._nav_action_client = ActionClient(self, NavigateToPose, 'navigate_to_pose')
        # self._manipulation_action_client = ActionClient(self, PickAndPlace, 'pick_and_place')

        # A simple state dictionary to manage context between steps
        self.robot_state = {
            'found_object_id': None,
            'held_object_id': None
        }

        self.get_logger().info("Action Executor node started.")

    def plan_callback(self, msg):
        self.get_logger().info(f'Received plan: {msg.data}')
        try:
            plan = json.loads(msg.data)
            self.execute_plan(plan)
        except json.JSONDecodeError:
            self.get_logger().error("Failed to decode plan JSON.")

    def execute_plan(self, plan):
        """Iterate through the plan and execute each step sequentially."""
        for step in plan:
            function_name = step.get('function')
            params = step.get('parameters', [])
            self.get_logger().info(f"Executing step: {function_name}({params})")

            success = False
            if function_name == 'go_to':
                # success = self.execute_navigation(params[0])
                success = True  # Placeholder
            elif function_name == 'find_object':
                # In a real system, this would call our vision service
                # success, object_id = self.execute_find_object(params[0], params[1])
                # self.robot_state['found_object_id'] = object_id
                success = True  # Placeholder
            elif function_name == 'pick_up':
                # object_to_pick = self.robot_state['found_object_id']
                # success = self.execute_manipulation('pick', object_to_pick)
                success = True  # Placeholder
            elif function_name == 'respond':
                # self.execute_speech(params[0])
                success = True  # Placeholder

            if not success:
                self.get_logger().error(
                    f"Failed to execute step: {function_name}. Aborting plan.")
                return

        self.get_logger().info("Plan executed successfully.")

    def execute_navigation(self, location_name):
        """
        Send a goal to the navigation action server.
        This is a simplified, blocking implementation.
        """
        self.get_logger().info(f"Navigating to {location_name}...")
        # self._nav_action_client.wait_for_server()
        # goal_msg = NavigateToPose.Goal()
        # ... set goal pose based on location_name ...
        # future = self._nav_action_client.send_goal_async(goal_msg)
        # rclpy.spin_until_future_complete(self, future)  # Block until goal is accepted
        # result_future = future.result().get_result_async()
        # rclpy.spin_until_future_complete(self, result_future)  # Block until result is received
        # Note: the result status is an integer constant, not a string:
        # from action_msgs.msg import GoalStatus
        # return result_future.result().status == GoalStatus.STATUS_SUCCEEDED
        return True

    # ... other execute_* methods for manipulation, speech, etc.

# main() function as before
```
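One detail the outline leaves open is how `go_to`'s symbolic location becomes a navigation goal ("set goal pose based on `location_name`"). A minimal sketch, assuming a hand-maintained table of named poses—`KNOWN_LOCATIONS` and `make_goal_pose` are hypothetical, and in ROS 2 the returned values would populate a `geometry_msgs/PoseStamped`:

```python
import math

# Hypothetical table of named locations, e.g. loaded from an annotated map.
KNOWN_LOCATIONS = {
    'table':   {'x': 2.0, 'y': 1.5, 'yaw': 0.0},
    'kitchen': {'x': 5.0, 'y': -0.5, 'yaw': 1.57},
}

def make_goal_pose(location_name):
    """Resolve a symbolic location into the numbers a PoseStamped would carry."""
    if location_name not in KNOWN_LOCATIONS:
        raise KeyError(f"Unknown location: {location_name}")
    loc = KNOWN_LOCATIONS[location_name]
    return {
        'frame_id': 'map',
        'position': (loc['x'], loc['y'], 0.0),
        # Yaw-only orientation expressed as the (z, w) quaternion components:
        'orientation': (math.sin(loc['yaw'] / 2), math.cos(loc['yaw'] / 2)),
    }

goal = make_goal_pose('table')
print(goal['position'])  # (2.0, 1.5, 0.0)
```

Raising on an unknown location lets the executor report a failed `go_to` step back to the user rather than silently navigating nowhere.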
This node completes the loop. It translates the symbolic plan from the LLM into concrete calls to the robot's functional capabilities (navigation, manipulation).
The Full VLA Loop
Let's trace a full command from start to finish:
1. User: "Robot, bring me the green apple."

2. Voice Commander Node (`whisper.py`):
   - Captures audio.
   - Transcribes it using Whisper.
   - Publishes the text `"robot, bring me the green apple"` to the `/recognized_command` topic.

3. LLM Planner Node (`llm_planning.py`):
   - Subscribes to `/recognized_command`.
   - Builds a prompt containing the available actions and the user's command.
   - Sends the prompt to an LLM.
   - Receives a JSON plan from the LLM.
   - Publishes the JSON plan `[{"function": "go_to", ...}, ...]` to the `/action_plan` topic.

4. Action Executor Node (`action_executor.py`):
   - Subscribes to `/action_plan`.
   - Parses the JSON plan.
   - Step 1: `go_to('table')` calls the `/navigate_to_pose` action server to move the robot to the table and waits for the navigation to succeed.
   - Step 2: `find_object('apple', 'green')` calls the `/find_object_service` (from our Vision Service Node). The service uses a VLM to find the green apple and returns its 3D coordinates, which the executor stores.
   - Step 3: `pick_up(...)` calls the `/pick_and_place` action server, passing it the coordinates of the apple, to perform the grasping motion.
This modular, ROS-native architecture allows each component to be developed, tested, and upgraded independently while working together to achieve a complex, language-driven task.
Summary
The action execution node is the final link in our VLA chain, translating a symbolic plan into physical robot actions. By using ROS 2 action clients, it can command various subsystems like navigation and manipulation in a structured and reliable way. This architecture, which separates voice recognition, planning, perception, and execution into distinct ROS 2 nodes, provides a powerful and scalable framework for building intelligent, language-enabled robots. The final module will integrate all these concepts into a complete capstone project.