Chapter 3: Services, Actions, and Parameters
In Chapter 2, you learned about topics—the asynchronous, many-to-many communication pattern ideal for streaming sensor data. But not all robot communication fits the publish-subscribe model. What if you need:
- A one-time computation (e.g., inverse kinematics for an arm)? → Use services (synchronous request-response)
- A long-running task with progress updates (e.g., navigate to a waypoint)? → Use actions (asynchronous with feedback)
- Configurable behavior (e.g., adjust PID gains at runtime)? → Use parameters (dynamic configuration)
This chapter teaches you these advanced communication primitives and shows how to integrate AI decision logic with ROS 2 control systems.
3.1 Services: Request-Response Communication
3.1.1 Service Definitions
ROS 2 services use .srv files defining request and response types:
# example_interfaces/srv/AddTwoInts.srv
int64 a
int64 b
---
int64 sum
The --- separator divides request (top) from response (bottom). Standard service types include:
std_srvs/srv/Trigger: No request data, bool response + messagestd_srvs/srv/SetBool: Bool request, bool response + messageexample_interfaces/srv/AddTwoInts: Two integers → sum
3.1.2 Creating a Service Server
A service server listens for requests and executes a callback function:
1import rclpy2from rclpy.node import Node3from example_interfaces.srv import AddTwoInts45class AddTwoIntsServer(Node):6 def __init__(self):7 super().__init__('add_two_ints_server')89 # Create service: service type, service name, callback function10 self.srv = self.create_service(11 AddTwoInts,12 'add_two_ints',13 self.add_two_ints_callback14 )1516 self.get_logger().info('Add Two Ints Server started')1718 def add_two_ints_callback(self, request, response):19 """Service callback: receives request, populates response"""20 response.sum = request.a + request.b21 self.get_logger().info(22 f'Incoming request: a={request.a}, b={request.b}'23 )24 self.get_logger().info(f'Sending response: sum={response.sum}')25 return response2627def main(args=None):28 rclpy.init(args=args)29 node = AddTwoIntsServer()30 rclpy.spin(node)31 node.destroy_node()32 rclpy.shutdown()3334if __name__ == '__main__':35 main()
Service Server API:
self.srv = self.create_service(
srv_type=AddTwoInts, # Service type (from .srv file)
srv_name='add_two_ints', # Service name (string)
callback=self.callback_function # Function to handle requests
)
Callback Signature:
def callback(self, request, response):
# request: object with fields from request part of .srv
# response: object with fields from response part of .srv
response.field1 = compute_value()
return response # MUST return response object
3.1.3 Creating a Service Client
A service client sends requests and waits for responses:
1import sys2import rclpy3from rclpy.node import Node4from example_interfaces.srv import AddTwoInts56class AddTwoIntsClient(Node):7 def __init__(self):8 super().__init__('add_two_ints_client')910 # Create client: service type, service name11 self.client = self.create_client(AddTwoInts, 'add_two_ints')1213 # Wait for service to be available (timeout after 5 seconds)14 while not self.client.wait_for_service(timeout_sec=5.0):15 self.get_logger().info('Service not available, waiting...')1617 self.req = AddTwoInts.Request()1819 def send_request(self, a, b):20 """Send async request and return future"""21 self.req.a = a22 self.req.b = b23 self.future = self.client.call_async(self.req)24 return self.future2526def main(args=None):27 rclpy.init(args=args)2829 if len(sys.argv) != 3:30 print('Usage: python3 add_two_ints_client.py <a> <b>')31 return3233 client_node = AddTwoIntsClient()34 future = client_node.send_request(int(sys.argv[1]), int(sys.argv[2]))3536 # Spin until response received37 rclpy.spin_until_future_complete(client_node, future)3839 if future.result() is not None:40 response = future.result()41 client_node.get_logger().info(42 f'Result: {sys.argv[1]} + {sys.argv[2]} = {response.sum}'43 )44 else:45 client_node.get_logger().error('Service call failed')4647 client_node.destroy_node()48 rclpy.shutdown()4950if __name__ == '__main__':51 main()
Service Client API:
# 1. Create client
self.client = self.create_client(AddTwoInts, 'add_two_ints')
# 2. Wait for service availability
self.client.wait_for_service(timeout_sec=5.0)
# 3. Create request object
request = AddTwoInts.Request()
request.a = 10
request.b = 20
# 4. Call service asynchronously
future = self.client.call_async(request)
# 5. Wait for response
rclpy.spin_until_future_complete(node, future)
response = future.result()
3.1.4 Service Communication Flow
sequenceDiagram
participant Client as Service Client<br/>(AddTwoIntsClient)
participant Server as Service Server<br/>(AddTwoIntsServer)
Client->>Client: create_client()
Client->>Server: wait_for_service()
Note over Server: Server already running
Client->>Server: call_async(Request{a=5, b=3})
Note over Client: Non-blocking call,<br/>returns Future
activate Server
Server->>Server: add_two_ints_callback()
Server->>Server: Compute: 5 + 3 = 8
Server->>Client: Response{sum=8}
deactivate Server
Client->>Client: future.result() = 8
Note over Client: Blocking wait for Future
3.2 Actions: Long-Running Tasks with Feedback
Services are great for quick computations, but what about tasks that take seconds or minutes? Actions extend the service pattern with:
- Feedback: Server sends progress updates during execution
- Cancellation: Client can cancel the goal mid-execution
- Result: Final outcome when task completes
3.2.1 Action Definitions
Actions use .action files with three parts:
# example_interfaces/action/Fibonacci.action
# Goal
int32 order
---
# Result
int32[] sequence
---
# Feedback
int32[] partial_sequence
Three sections:
- Goal: What the client requests (e.g., "navigate to (x, y)")
- Result: Final outcome (e.g., "reached goal" or "failed - obstacle")
- Feedback: Periodic updates (e.g., "50% complete, 2.3m remaining")
3.2.2 Creating an Action Server
1import time2import rclpy3from rclpy.node import Node4from rclpy.action import ActionServer5from example_interfaces.action import Fibonacci67class FibonacciActionServer(Node):8 def __init__(self):9 super().__init__('fibonacci_action_server')1011 # Create action server: action type, action name, execute callback12 self._action_server = ActionServer(13 self,14 Fibonacci,15 'fibonacci',16 self.execute_callback17 )1819 self.get_logger().info('Fibonacci Action Server started')2021 def execute_callback(self, goal_handle):22 """Execute the action: generate Fibonacci sequence"""23 self.get_logger().info('Executing goal...')2425 feedback_msg = Fibonacci.Feedback()26 feedback_msg.partial_sequence = [0, 1]2728 # Generate Fibonacci sequence up to order N29 for i in range(1, goal_handle.request.order):30 # Check if cancellation requested31 if goal_handle.is_cancel_requested:32 goal_handle.canceled()33 self.get_logger().info('Goal canceled')34 return Fibonacci.Result()3536 # Compute next Fibonacci number37 feedback_msg.partial_sequence.append(38 feedback_msg.partial_sequence[i] +39 feedback_msg.partial_sequence[i - 1]40 )4142 # Publish feedback43 self.get_logger().info(44 f'Feedback: {feedback_msg.partial_sequence}'45 )46 goal_handle.publish_feedback(feedback_msg)4748 time.sleep(1) # Simulate long-running computation4950 # Mark goal as succeeded and return result51 goal_handle.succeed()5253 result = Fibonacci.Result()54 result.sequence = feedback_msg.partial_sequence55 self.get_logger().info(f'Result: {result.sequence}')56 return result5758def main(args=None):59 rclpy.init(args=args)60 node = FibonacciActionServer()61 rclpy.spin(node)62 node.destroy_node()63 rclpy.shutdown()6465if __name__ == '__main__':66 main()
Action Server API:
self._action_server = ActionServer(
node=self, # Parent node
action_type=Fibonacci, # Action type
action_name='fibonacci', # Action name
execute_callback=self.execute # Callback function
)
def execute_callback(self, goal_handle):
# 1. Access goal: goal_handle.request.field
order = goal_handle.request.order
# 2. Publish feedback periodically
feedback = Fibonacci.Feedback()
goal_handle.publish_feedback(feedback)
# 3. Check for cancellation
if goal_handle.is_cancel_requested:
goal_handle.canceled()
return Fibonacci.Result()
# 4. Mark as succeeded and return result
goal_handle.succeed()
result = Fibonacci.Result()
return result
3.2.3 Creating an Action Client
1import rclpy2from rclpy.node import Node3from rclpy.action import ActionClient4from example_interfaces.action import Fibonacci56class FibonacciActionClient(Node):7 def __init__(self):8 super().__init__('fibonacci_action_client')910 # Create action client11 self._action_client = ActionClient(12 self,13 Fibonacci,14 'fibonacci'15 )1617 def send_goal(self, order):18 """Send goal and register callbacks"""19 goal_msg = Fibonacci.Goal()20 goal_msg.order = order2122 self.get_logger().info('Waiting for action server...')23 self._action_client.wait_for_server()2425 self.get_logger().info(f'Sending goal: order={order}')2627 # Send goal with feedback callback28 self._send_goal_future = self._action_client.send_goal_async(29 goal_msg,30 feedback_callback=self.feedback_callback31 )3233 # Register callback for goal acceptance34 self._send_goal_future.add_done_callback(self.goal_response_callback)3536 def goal_response_callback(self, future):37 """Called when server accepts/rejects goal"""38 goal_handle = future.result()3940 if not goal_handle.accepted:41 self.get_logger().info('Goal rejected')42 return4344 self.get_logger().info('Goal accepted')4546 # Get result asynchronously47 self._get_result_future = goal_handle.get_result_async()48 self._get_result_future.add_done_callback(self.get_result_callback)4950 def feedback_callback(self, feedback_msg):51 """Called periodically during execution"""52 feedback = feedback_msg.feedback53 self.get_logger().info(54 f'Received feedback: {feedback.partial_sequence}'55 )5657 def get_result_callback(self, future):58 """Called when action completes"""59 result = future.result().result60 self.get_logger().info(f'Result: {result.sequence}')61 rclpy.shutdown()6263def main(args=None):64 rclpy.init(args=args)65 action_client = FibonacciActionClient()66 action_client.send_goal(10)67 rclpy.spin(action_client)6869if __name__ == '__main__':70 main()
3.2.4 Action Communication Flow
sequenceDiagram
participant Client as Action Client
participant Server as Action Server
Client->>Server: send_goal_async(Goal{order=10})
Server->>Client: Goal Accepted
loop Every computation step
Server->>Server: Compute next Fibonacci
Server->>Client: Feedback{partial_sequence}
Note over Client: feedback_callback()
end
alt Goal Succeeded
Server->>Client: Result{sequence=[0,1,1,2,3,5,...]}
Note over Client: get_result_callback()
else Goal Canceled
Client->>Server: cancel_goal()
Server->>Client: Result{} (empty)
end
3.3 Parameters: Runtime Configuration
Parameters allow you to configure node behavior at runtime without restarting the node.
3.3.1 Declaring and Using Parameters
1import rclpy2from rclpy.node import Node34class ParameterNode(Node):5 def __init__(self):6 super().__init__('parameter_node')78 # Declare parameters with default values9 self.declare_parameter('robot_name', 'atlas')10 self.declare_parameter('max_speed', 1.5) # m/s11 self.declare_parameter('debug_mode', False)1213 # Get parameter values14 robot_name = self.get_parameter('robot_name').value15 max_speed = self.get_parameter('max_speed').value16 debug_mode = self.get_parameter('debug_mode').value1718 self.get_logger().info(f'Robot: {robot_name}')19 self.get_logger().info(f'Max Speed: {max_speed} m/s')20 self.get_logger().info(f'Debug Mode: {debug_mode}')2122 # Use parameters in node logic23 self.timer = self.create_timer(24 1.0,25 lambda: self.get_logger().info(26 f'Max speed: {self.get_parameter("max_speed").value}'27 )28 )2930def main(args=None):31 rclpy.init(args=args)32 node = ParameterNode()33 rclpy.spin(node)34 node.destroy_node()35 rclpy.shutdown()3637if __name__ == '__main__':38 main()
Setting Parameters at Launch:
# Command line
ros2 run my_package parameter_node --ros-args \
-p robot_name:=optimus \
-p max_speed:=2.0 \
-p debug_mode:=true
# Inspect parameters
ros2 param list
ros2 param get /parameter_node robot_name
ros2 param set /parameter_node max_speed 3.0
3.3.2 Dynamic Parameter Updates with Callbacks
1import rclpy2from rclpy.node import Node3from rcl_interfaces.msg import SetParametersResult45class DynamicParameterNode(Node):6 def __init__(self):7 super().__init__('dynamic_parameter_node')89 self.declare_parameter('threshold', 50.0)1011 # Register parameter callback12 self.add_on_set_parameters_callback(self.parameter_callback)1314 self.get_logger().info('Dynamic Parameter Node started')1516 def parameter_callback(self, params):17 """Called whenever parameters are updated"""18 for param in params:19 if param.name == 'threshold':20 if param.value < 0.0 or param.value > 100.0:21 self.get_logger().warn(22 f'Invalid threshold: {param.value} (must be 0-100)'23 )24 return SetParametersResult(successful=False)2526 self.get_logger().info(27 f'Threshold updated: {param.value}'28 )2930 return SetParametersResult(successful=True)3132def main(args=None):33 rclpy.init(args=args)34 node = DynamicParameterNode()35 rclpy.spin(node)36 node.destroy_node()37 rclpy.shutdown()3839if __name__ == '__main__':40 main()
3.4 AI-ROS Integration Patterns
Now let's bridge everything together: integrating AI decision logic with ROS 2 communication.
3.4.1 AI Agent Pattern
1import rclpy2from rclpy.node import Node3from geometry_msgs.msg import Twist4from sensor_msgs.msg import LaserScan5from enum import Enum6import math78class RobotState(Enum):9 """Finite state machine states"""10 FORWARD = 111 TURNING_LEFT = 212 TURNING_RIGHT = 313 STOPPED = 41415class ObstacleAvoidanceAI:16 """Pure Python AI agent - no ROS dependencies"""1718 def __init__(self, obstacle_distance_threshold=0.5):19 self.state = RobotState.FORWARD20 self.obstacle_threshold = obstacle_distance_threshold2122 def decide_action(self, laser_data):23 """24 AI decision logic: analyze sensor data, return velocity command2526 Args:27 laser_data: dict with 'front', 'left', 'right' distances2829 Returns:30 dict: {'linear': float, 'angular': float}31 """32 front_dist = laser_data.get('front', float('inf'))33 left_dist = laser_data.get('left', float('inf'))34 right_dist = laser_data.get('right', float('inf'))3536 # State machine logic37 if front_dist < self.obstacle_threshold:38 # Obstacle ahead - turn towards open side39 if left_dist > right_dist:40 self.state = RobotState.TURNING_LEFT41 return {'linear': 0.0, 'angular': 0.5}42 else:43 self.state = RobotState.TURNING_RIGHT44 return {'linear': 0.0, 'angular': -0.5}45 else:46 # Clear path - move forward47 self.state = RobotState.FORWARD48 return {'linear': 0.3, 'angular': 0.0}4950class AIRobotController(Node):51 """ROS 2 wrapper for AI agent"""5253 def __init__(self):54 super().__init__('ai_robot_controller')5556 # Initialize AI agent (pure Python)57 self.ai_agent = ObstacleAvoidanceAI(obstacle_distance_threshold=0.5)5859 # ROS 2 communication60 self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)61 self.laser_sub = self.create_subscription(62 LaserScan,63 'scan',64 self.laser_callback,65 1066 )6768 # Control loop timer (10 Hz decision rate)69 self.timer = self.create_timer(0.1, self.control_loop)7071 self.latest_laser_data = None72 self.get_logger().info('AI Robot Controller started')7374 def laser_callback(self, msg):75 """Store latest laser scan data"""76 # Convert LaserScan to simplified format for AI77 ranges = msg.ranges78 self.latest_laser_data = {79 'front': min(ranges[0:30] + ranges[-30:]), # Front 60 degrees80 'left': min(ranges[30:90]), # Left 60 degrees81 'right': min(ranges[-90:-30]) # Right 60 degrees82 }8384 def control_loop(self):85 """10 Hz decision loop"""86 if self.latest_laser_data is None:87 return # No sensor data yet8889 # AI decision90 action = self.ai_agent.decide_action(self.latest_laser_data)9192 # Publish command93 cmd_msg = Twist()94 cmd_msg.linear.x = action['linear']95 cmd_msg.angular.z = action['angular']96 self.cmd_vel_pub.publish(cmd_msg)9798 self.get_logger().info(99 f'State: {self.ai_agent.state.name}, '100 f'Cmd: linear={action["linear"]:.2f}, '101 f'angular={action["angular"]:.2f}'102 )103104def main(args=None):105 rclpy.init(args=args)106 node = AIRobotController()107 rclpy.spin(node)108 node.destroy_node()109 rclpy.shutdown()110111if __name__ == '__main__':112 main()
3.5 Hands-On Exercise: Implement a Calculator Service
Multi-Operation Calculator Service
Learning Objectives
Problem Statement
Create a calculator service that supports four operations: addition, subtraction, multiplication, and division.
Requirements
Service Definition (srv/Calculate.srv):
float64 a
float64 b
string operation # "add", "subtract", "multiply", "divide"
---
float64 result
bool success
string message
Service Server (calculator_server.py):
- Service name:
/calculator - Support all four operations
- Return
success=Falseand error message for:- Division by zero
- Invalid operation string
- Log each request and response
Service Client (calculator_client.py):
- Accept command-line arguments:
a b operation - Call service and display result
- Handle service call failures gracefully
Testing
# Terminal 1: Run server
python3 calculator_server.py
# Terminal 2: Test with client
python3 calculator_client.py 15.5 3.2 add
python3 calculator_client.py 10 0 divide # Should fail gracefully
# Terminal 3: Test with CLI
ros2 service call /calculator my_package/srv/Calculate \
"{a: 100, b: 25, operation: 'subtract'}"
Deliverable
- Three files:
Calculate.srv,calculator_server.py,calculator_client.py - Screenshot showing successful addition and failed division by zero
- Answer: How would you extend this to support exponentiation?
Key Takeaways
✓ Core Concepts Mastered
- Services provide synchronous request-response communication for infrequent operations like computations, triggers, and queries
- Actions extend services with feedback and cancellation for long-running tasks (> 1 second) like navigation and manipulation
- Action servers publish periodic feedback during execution, allowing clients to monitor progress and cancel goals
- Parameters enable runtime configuration with dynamic updates through callbacks and validation
- AI-ROS integration follows separation of concerns with pure Python AI logic wrapped in ROS nodes
- Async service calls (
call_async) are preferred over blocking calls to prevent node deadlocks - Real-time control requires < 100ms decision cycles with profiling and optimization for AI inference latency
What's Next?
In Chapter 4: URDF Robot Modeling, you'll learn to define robot structure:
- URDF syntax for links (rigid bodies) and joints (connections)
- Visual and collision geometry for simulation
- Kinematic chains for humanoid robots
- RViz2 visualization with joint_state_publisher
Prerequisite Check: Before proceeding, ensure you can:
- ✅ Create a service server and client for a custom service type
- ✅ Explain when to use services vs. actions
- ✅ Implement parameter validation with callbacks
- ✅ Separate AI decision logic from ROS communication code
Next: Chapter 4: URDF Robot Modeling (coming soon)