Skip to main content

Chapter 3: Services, Actions, and Parameters

In Chapter 2, you learned about topics—the asynchronous, many-to-many communication pattern ideal for streaming sensor data. But not all robot communication fits the publish-subscribe model. What if you need:

  • A one-time computation (e.g., inverse kinematics for an arm)? → Use services (synchronous request-response)
  • A long-running task with progress updates (e.g., navigate to a waypoint)? → Use actions (asynchronous with feedback)
  • Configurable behavior (e.g., adjust PID gains at runtime)? → Use parameters (dynamic configuration)

This chapter teaches you these advanced communication primitives and shows how to integrate AI decision logic with ROS 2 control systems.

3.1 Services: Request-Response Communication

3.1.1 Service Definitions

ROS 2 services use .srv files defining request and response types:

# example_interfaces/srv/AddTwoInts.srv
int64 a
int64 b
---
int64 sum

The --- separator divides request (top) from response (bottom). Standard service types include:

  • std_srvs/srv/Trigger: No request data, bool response + message
  • std_srvs/srv/SetBool: Bool request, bool response + message
  • example_interfaces/srv/AddTwoInts: Two integers → sum

3.1.2 Creating a Service Server

A service server listens for requests and executes a callback function:

add_two_ints_server.py
Intermediate⏱ ~20 min
1import rclpy
2from rclpy.node import Node
3from example_interfaces.srv import AddTwoInts
4
5class AddTwoIntsServer(Node):
6 def __init__(self):
7 super().__init__('add_two_ints_server')
8
9 # Create service: service type, service name, callback function
10 self.srv = self.create_service(
11 AddTwoInts,
12 'add_two_ints',
13 self.add_two_ints_callback
14 )
15
16 self.get_logger().info('Add Two Ints Server started')
17
18 def add_two_ints_callback(self, request, response):
19 """Service callback: receives request, populates response"""
20 response.sum = request.a + request.b
21 self.get_logger().info(
22 f'Incoming request: a={request.a}, b={request.b}'
23 )
24 self.get_logger().info(f'Sending response: sum={response.sum}')
25 return response
26
27def main(args=None):
28 rclpy.init(args=args)
29 node = AddTwoIntsServer()
30 rclpy.spin(node)
31 node.destroy_node()
32 rclpy.shutdown()
33
34if __name__ == '__main__':
35 main()

Service Server API:

self.srv = self.create_service(
srv_type=AddTwoInts, # Service type (from .srv file)
srv_name='add_two_ints', # Service name (string)
callback=self.callback_function # Function to handle requests
)

Callback Signature:

def callback(self, request, response):
# request: object with fields from request part of .srv
# response: object with fields from response part of .srv
response.field1 = compute_value()
return response # MUST return response object

3.1.3 Creating a Service Client

A service client sends requests and waits for responses:

add_two_ints_client.py
Intermediate⏱ ~20 min
1import sys
2import rclpy
3from rclpy.node import Node
4from example_interfaces.srv import AddTwoInts
5
6class AddTwoIntsClient(Node):
7 def __init__(self):
8 super().__init__('add_two_ints_client')
9
10 # Create client: service type, service name
11 self.client = self.create_client(AddTwoInts, 'add_two_ints')
12
13 # Wait for service to be available (timeout after 5 seconds)
14 while not self.client.wait_for_service(timeout_sec=5.0):
15 self.get_logger().info('Service not available, waiting...')
16
17 self.req = AddTwoInts.Request()
18
19 def send_request(self, a, b):
20 """Send async request and return future"""
21 self.req.a = a
22 self.req.b = b
23 self.future = self.client.call_async(self.req)
24 return self.future
25
26def main(args=None):
27 rclpy.init(args=args)
28
29 if len(sys.argv) != 3:
30 print('Usage: python3 add_two_ints_client.py <a> <b>')
31 return
32
33 client_node = AddTwoIntsClient()
34 future = client_node.send_request(int(sys.argv[1]), int(sys.argv[2]))
35
36 # Spin until response received
37 rclpy.spin_until_future_complete(client_node, future)
38
39 if future.result() is not None:
40 response = future.result()
41 client_node.get_logger().info(
42 f'Result: {sys.argv[1]} + {sys.argv[2]} = {response.sum}'
43 )
44 else:
45 client_node.get_logger().error('Service call failed')
46
47 client_node.destroy_node()
48 rclpy.shutdown()
49
50if __name__ == '__main__':
51 main()

Service Client API:

# 1. Create client
self.client = self.create_client(AddTwoInts, 'add_two_ints')

# 2. Wait for service availability
self.client.wait_for_service(timeout_sec=5.0)

# 3. Create request object
request = AddTwoInts.Request()
request.a = 10
request.b = 20

# 4. Call service asynchronously
future = self.client.call_async(request)

# 5. Wait for response
rclpy.spin_until_future_complete(node, future)
response = future.result()

3.1.4 Service Communication Flow

sequenceDiagram
participant Client as Service Client<br/>(AddTwoIntsClient)
participant Server as Service Server<br/>(AddTwoIntsServer)

Client->>Client: create_client()
Client->>Server: wait_for_service()
Note over Server: Server already running

Client->>Server: call_async(Request{a=5, b=3})
Note over Client: Non-blocking call,<br/>returns Future

activate Server
Server->>Server: add_two_ints_callback()
Server->>Server: Compute: 5 + 3 = 8
Server->>Client: Response{sum=8}
deactivate Server

Client->>Client: future.result() = 8
Note over Client: Blocking wait for Future

3.2 Actions: Long-Running Tasks with Feedback

Services are great for quick computations, but what about tasks that take seconds or minutes? Actions extend the service pattern with:

  • Feedback: Server sends progress updates during execution
  • Cancellation: Client can cancel the goal mid-execution
  • Result: Final outcome when task completes

3.2.1 Action Definitions

Actions use .action files with three parts:

# example_interfaces/action/Fibonacci.action
# Goal
int32 order
---
# Result
int32[] sequence
---
# Feedback
int32[] partial_sequence

Three sections:

  1. Goal: What the client requests (e.g., "navigate to (x, y)")
  2. Result: Final outcome (e.g., "reached goal" or "failed - obstacle")
  3. Feedback: Periodic updates (e.g., "50% complete, 2.3m remaining")

3.2.2 Creating an Action Server

fibonacci_action_server.py
Advanced⏱ ~30 min
1import time
2import rclpy
3from rclpy.node import Node
4from rclpy.action import ActionServer
5from example_interfaces.action import Fibonacci
6
7class FibonacciActionServer(Node):
8 def __init__(self):
9 super().__init__('fibonacci_action_server')
10
11 # Create action server: action type, action name, execute callback
12 self._action_server = ActionServer(
13 self,
14 Fibonacci,
15 'fibonacci',
16 self.execute_callback
17 )
18
19 self.get_logger().info('Fibonacci Action Server started')
20
21 def execute_callback(self, goal_handle):
22 """Execute the action: generate Fibonacci sequence"""
23 self.get_logger().info('Executing goal...')
24
25 feedback_msg = Fibonacci.Feedback()
26 feedback_msg.partial_sequence = [0, 1]
27
28 # Generate Fibonacci sequence up to order N
29 for i in range(1, goal_handle.request.order):
30 # Check if cancellation requested
31 if goal_handle.is_cancel_requested:
32 goal_handle.canceled()
33 self.get_logger().info('Goal canceled')
34 return Fibonacci.Result()
35
36 # Compute next Fibonacci number
37 feedback_msg.partial_sequence.append(
38 feedback_msg.partial_sequence[i] +
39 feedback_msg.partial_sequence[i - 1]
40 )
41
42 # Publish feedback
43 self.get_logger().info(
44 f'Feedback: {feedback_msg.partial_sequence}'
45 )
46 goal_handle.publish_feedback(feedback_msg)
47
48 time.sleep(1) # Simulate long-running computation
49
50 # Mark goal as succeeded and return result
51 goal_handle.succeed()
52
53 result = Fibonacci.Result()
54 result.sequence = feedback_msg.partial_sequence
55 self.get_logger().info(f'Result: {result.sequence}')
56 return result
57
58def main(args=None):
59 rclpy.init(args=args)
60 node = FibonacciActionServer()
61 rclpy.spin(node)
62 node.destroy_node()
63 rclpy.shutdown()
64
65if __name__ == '__main__':
66 main()

Action Server API:

self._action_server = ActionServer(
node=self, # Parent node
action_type=Fibonacci, # Action type
action_name='fibonacci', # Action name
execute_callback=self.execute # Callback function
)

def execute_callback(self, goal_handle):
# 1. Access goal: goal_handle.request.field
order = goal_handle.request.order

# 2. Publish feedback periodically
feedback = Fibonacci.Feedback()
goal_handle.publish_feedback(feedback)

# 3. Check for cancellation
if goal_handle.is_cancel_requested:
goal_handle.canceled()
return Fibonacci.Result()

# 4. Mark as succeeded and return result
goal_handle.succeed()
result = Fibonacci.Result()
return result

3.2.3 Creating an Action Client

fibonacci_action_client.py
Advanced⏱ ~25 min
1import rclpy
2from rclpy.node import Node
3from rclpy.action import ActionClient
4from example_interfaces.action import Fibonacci
5
6class FibonacciActionClient(Node):
7 def __init__(self):
8 super().__init__('fibonacci_action_client')
9
10 # Create action client
11 self._action_client = ActionClient(
12 self,
13 Fibonacci,
14 'fibonacci'
15 )
16
17 def send_goal(self, order):
18 """Send goal and register callbacks"""
19 goal_msg = Fibonacci.Goal()
20 goal_msg.order = order
21
22 self.get_logger().info('Waiting for action server...')
23 self._action_client.wait_for_server()
24
25 self.get_logger().info(f'Sending goal: order={order}')
26
27 # Send goal with feedback callback
28 self._send_goal_future = self._action_client.send_goal_async(
29 goal_msg,
30 feedback_callback=self.feedback_callback
31 )
32
33 # Register callback for goal acceptance
34 self._send_goal_future.add_done_callback(self.goal_response_callback)
35
36 def goal_response_callback(self, future):
37 """Called when server accepts/rejects goal"""
38 goal_handle = future.result()
39
40 if not goal_handle.accepted:
41 self.get_logger().info('Goal rejected')
42 return
43
44 self.get_logger().info('Goal accepted')
45
46 # Get result asynchronously
47 self._get_result_future = goal_handle.get_result_async()
48 self._get_result_future.add_done_callback(self.get_result_callback)
49
50 def feedback_callback(self, feedback_msg):
51 """Called periodically during execution"""
52 feedback = feedback_msg.feedback
53 self.get_logger().info(
54 f'Received feedback: {feedback.partial_sequence}'
55 )
56
57 def get_result_callback(self, future):
58 """Called when action completes"""
59 result = future.result().result
60 self.get_logger().info(f'Result: {result.sequence}')
61 rclpy.shutdown()
62
63def main(args=None):
64 rclpy.init(args=args)
65 action_client = FibonacciActionClient()
66 action_client.send_goal(10)
67 rclpy.spin(action_client)
68
69if __name__ == '__main__':
70 main()

3.2.4 Action Communication Flow

sequenceDiagram
participant Client as Action Client
participant Server as Action Server

Client->>Server: send_goal_async(Goal{order=10})
Server->>Client: Goal Accepted

loop Every computation step
Server->>Server: Compute next Fibonacci
Server->>Client: Feedback{partial_sequence}
Note over Client: feedback_callback()
end

alt Goal Succeeded
Server->>Client: Result{sequence=[0,1,1,2,3,5,...]}
Note over Client: get_result_callback()
else Goal Canceled
Client->>Server: cancel_goal()
Server->>Client: Result{} (empty)
end

3.3 Parameters: Runtime Configuration

Parameters allow you to configure node behavior at runtime without restarting the node.

3.3.1 Declaring and Using Parameters

parameter_example.py
Beginner⏱ ~15 min
1import rclpy
2from rclpy.node import Node
3
4class ParameterNode(Node):
5 def __init__(self):
6 super().__init__('parameter_node')
7
8 # Declare parameters with default values
9 self.declare_parameter('robot_name', 'atlas')
10 self.declare_parameter('max_speed', 1.5) # m/s
11 self.declare_parameter('debug_mode', False)
12
13 # Get parameter values
14 robot_name = self.get_parameter('robot_name').value
15 max_speed = self.get_parameter('max_speed').value
16 debug_mode = self.get_parameter('debug_mode').value
17
18 self.get_logger().info(f'Robot: {robot_name}')
19 self.get_logger().info(f'Max Speed: {max_speed} m/s')
20 self.get_logger().info(f'Debug Mode: {debug_mode}')
21
22 # Use parameters in node logic
23 self.timer = self.create_timer(
24 1.0,
25 lambda: self.get_logger().info(
26 f'Max speed: {self.get_parameter("max_speed").value}'
27 )
28 )
29
30def main(args=None):
31 rclpy.init(args=args)
32 node = ParameterNode()
33 rclpy.spin(node)
34 node.destroy_node()
35 rclpy.shutdown()
36
37if __name__ == '__main__':
38 main()

Setting Parameters at Launch:

# Command line
ros2 run my_package parameter_node --ros-args \
-p robot_name:=optimus \
-p max_speed:=2.0 \
-p debug_mode:=true

# Inspect parameters
ros2 param list
ros2 param get /parameter_node robot_name
ros2 param set /parameter_node max_speed 3.0

3.3.2 Dynamic Parameter Updates with Callbacks

parameter_callback_example.py
Intermediate
1import rclpy
2from rclpy.node import Node
3from rcl_interfaces.msg import SetParametersResult
4
5class DynamicParameterNode(Node):
6 def __init__(self):
7 super().__init__('dynamic_parameter_node')
8
9 self.declare_parameter('threshold', 50.0)
10
11 # Register parameter callback
12 self.add_on_set_parameters_callback(self.parameter_callback)
13
14 self.get_logger().info('Dynamic Parameter Node started')
15
16 def parameter_callback(self, params):
17 """Called whenever parameters are updated"""
18 for param in params:
19 if param.name == 'threshold':
20 if param.value < 0.0 or param.value > 100.0:
21 self.get_logger().warn(
22 f'Invalid threshold: {param.value} (must be 0-100)'
23 )
24 return SetParametersResult(successful=False)
25
26 self.get_logger().info(
27 f'Threshold updated: {param.value}'
28 )
29
30 return SetParametersResult(successful=True)
31
32def main(args=None):
33 rclpy.init(args=args)
34 node = DynamicParameterNode()
35 rclpy.spin(node)
36 node.destroy_node()
37 rclpy.shutdown()
38
39if __name__ == '__main__':
40 main()

3.4 AI-ROS Integration Patterns

Now let's bridge everything together: integrating AI decision logic with ROS 2 communication.

3.4.1 AI Agent Pattern

ai_robot_controller.py
Advanced⏱ ~40 min
1import rclpy
2from rclpy.node import Node
3from geometry_msgs.msg import Twist
4from sensor_msgs.msg import LaserScan
5from enum import Enum
6import math
7
8class RobotState(Enum):
9 """Finite state machine states"""
10 FORWARD = 1
11 TURNING_LEFT = 2
12 TURNING_RIGHT = 3
13 STOPPED = 4
14
15class ObstacleAvoidanceAI:
16 """Pure Python AI agent - no ROS dependencies"""
17
18 def __init__(self, obstacle_distance_threshold=0.5):
19 self.state = RobotState.FORWARD
20 self.obstacle_threshold = obstacle_distance_threshold
21
22 def decide_action(self, laser_data):
23 """
24 AI decision logic: analyze sensor data, return velocity command
25
26 Args:
27 laser_data: dict with 'front', 'left', 'right' distances
28
29 Returns:
30 dict: {'linear': float, 'angular': float}
31 """
32 front_dist = laser_data.get('front', float('inf'))
33 left_dist = laser_data.get('left', float('inf'))
34 right_dist = laser_data.get('right', float('inf'))
35
36 # State machine logic
37 if front_dist < self.obstacle_threshold:
38 # Obstacle ahead - turn towards open side
39 if left_dist > right_dist:
40 self.state = RobotState.TURNING_LEFT
41 return {'linear': 0.0, 'angular': 0.5}
42 else:
43 self.state = RobotState.TURNING_RIGHT
44 return {'linear': 0.0, 'angular': -0.5}
45 else:
46 # Clear path - move forward
47 self.state = RobotState.FORWARD
48 return {'linear': 0.3, 'angular': 0.0}
49
50class AIRobotController(Node):
51 """ROS 2 wrapper for AI agent"""
52
53 def __init__(self):
54 super().__init__('ai_robot_controller')
55
56 # Initialize AI agent (pure Python)
57 self.ai_agent = ObstacleAvoidanceAI(obstacle_distance_threshold=0.5)
58
59 # ROS 2 communication
60 self.cmd_vel_pub = self.create_publisher(Twist, 'cmd_vel', 10)
61 self.laser_sub = self.create_subscription(
62 LaserScan,
63 'scan',
64 self.laser_callback,
65 10
66 )
67
68 # Control loop timer (10 Hz decision rate)
69 self.timer = self.create_timer(0.1, self.control_loop)
70
71 self.latest_laser_data = None
72 self.get_logger().info('AI Robot Controller started')
73
74 def laser_callback(self, msg):
75 """Store latest laser scan data"""
76 # Convert LaserScan to simplified format for AI
77 ranges = msg.ranges
78 self.latest_laser_data = {
79 'front': min(ranges[0:30] + ranges[-30:]), # Front 60 degrees
80 'left': min(ranges[30:90]), # Left 60 degrees
81 'right': min(ranges[-90:-30]) # Right 60 degrees
82 }
83
84 def control_loop(self):
85 """10 Hz decision loop"""
86 if self.latest_laser_data is None:
87 return # No sensor data yet
88
89 # AI decision
90 action = self.ai_agent.decide_action(self.latest_laser_data)
91
92 # Publish command
93 cmd_msg = Twist()
94 cmd_msg.linear.x = action['linear']
95 cmd_msg.angular.z = action['angular']
96 self.cmd_vel_pub.publish(cmd_msg)
97
98 self.get_logger().info(
99 f'State: {self.ai_agent.state.name}, '
100 f'Cmd: linear={action["linear"]:.2f}, '
101 f'angular={action["angular"]:.2f}'
102 )
103
104def main(args=None):
105 rclpy.init(args=args)
106 node = AIRobotController()
107 rclpy.spin(node)
108 node.destroy_node()
109 rclpy.shutdown()
110
111if __name__ == '__main__':
112 main()

3.5 Hands-On Exercise: Implement a Calculator Service

Key Takeaways

✓ Core Concepts Mastered

  • Services provide synchronous request-response communication for infrequent operations like computations, triggers, and queries
  • Actions extend services with feedback and cancellation for long-running tasks (> 1 second) like navigation and manipulation
  • Action servers publish periodic feedback during execution, allowing clients to monitor progress and cancel goals
  • Parameters enable runtime configuration with dynamic updates through callbacks and validation
  • AI-ROS integration follows separation of concerns with pure Python AI logic wrapped in ROS nodes
  • Async service calls (call_async) are preferred over blocking calls to prevent node deadlocks
  • Real-time control requires < 100ms decision cycles with profiling and optimization for AI inference latency

What's Next?

In Chapter 4: URDF Robot Modeling, you'll learn to define robot structure:

  • URDF syntax for links (rigid bodies) and joints (connections)
  • Visual and collision geometry for simulation
  • Kinematic chains for humanoid robots
  • RViz2 visualization with joint_state_publisher

Prerequisite Check: Before proceeding, ensure you can:

  • ✅ Create a service server and client for a custom service type
  • ✅ Explain when to use services vs. actions
  • ✅ Implement parameter validation with callbacks
  • ✅ Separate AI decision logic from ROS communication code

Next: Chapter 4: URDF Robot Modeling (coming soon)