Week 13: Conversational Robotics
Enable robots to engage in natural, contextual conversations using large language models, speech recognition, and multi-modal understanding.
Introduction to Conversational AI for Robotics
Conversational robotics combines natural language processing (NLP), speech technologies, and embodied AI to create robots that communicate like humans while understanding physical context.
Why Conversational AI Matters for Robots
Natural Interaction:
Voice is the most natural interface for humans. Conversational robots lower the barrier for non-technical users.
Contextual Understanding:
Unlike chatbots, conversational robots can reference the physical world: "Pick up that red cup" requires vision and language grounding.
Task Assistance:
Guide users through complex tasks, answer questions, and provide real-time feedback during human-robot collaboration.
Accessibility:
Enable elderly, disabled, or hands-busy users to control robots through natural speech.
Challenges in Robotic Conversation
Grounding:
Mapping language to physical entities and actions in the robot's environment.
Real-Time Constraints:
Speech processing must be fast enough for natural dialogue (< 300ms response time).
Noise Robustness:
Robots operate in noisy environments with motor sounds, multiple speakers, and ambient noise.
Safety and Alignment:
LLM outputs must be validated before executing physical actions to prevent harm.
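One simple safeguard is to refuse any LLM-proposed command that is not on an explicit allowlist. A minimal sketch of such a validator (the action names and the `validate_action` helper are illustrative, not part of any particular framework):

```python
import re

# Hypothetical allowlist: action name -> permitted argument values.
# None means any argument is accepted; in a real system this table
# would be derived from the robot's actual capability model.
ALLOWED_ACTIONS = {
    "navigate_to": {"kitchen", "bedroom", "living_room", "charging_dock"},
    "pick_object": None,   # any object name allowed
    "stop": {""},          # takes no argument
}

def validate_action(command: str):
    """Validate a command like 'navigate_to(kitchen)' before execution.

    Returns (True, action, arg) if the command is safe,
    or (False, reason, None) if it must be rejected.
    """
    match = re.fullmatch(r"(\w+)\((.*?)\)", command.strip())
    if not match:
        return False, "malformed command", None
    action, arg = match.group(1), match.group(2).strip()
    if action not in ALLOWED_ACTIONS:
        return False, f"unknown action '{action}'", None
    allowed_args = ALLOWED_ACTIONS[action]
    if allowed_args is not None and arg not in allowed_args:
        return False, f"argument '{arg}' not permitted for '{action}'", None
    return True, action, arg
```

Anything that fails validation is answered verbally ("I can't do that") instead of being forwarded to the action executor.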
Integrating GPT Models for Conversational AI
Large Language Models like GPT-4 provide powerful natural language understanding and generation capabilities.
Setting Up OpenAI GPT Integration
import openai
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import os

class ConversationalRobot(Node):
    def __init__(self):
        super().__init__('conversational_robot')

        # OpenAI API setup
        openai.api_key = os.getenv('OPENAI_API_KEY')

        # Conversation history
        self.conversation_history = [
            {
                "role": "system",
                "content": """You are a helpful humanoid robot assistant named Atlas.
You can see, move, and manipulate objects. Keep responses concise and actionable.
When asked to perform physical tasks, respond with structured commands."""
            }
        ]

        # ROS 2 interfaces
        self.speech_sub = self.create_subscription(
            String, '/speech/recognized', self.handle_speech, 10)
        self.response_pub = self.create_publisher(
            String, '/speech/response', 10)
        self.action_pub = self.create_publisher(
            String, '/robot/action_command', 10)

        self.get_logger().info('Conversational robot initialized')

    def handle_speech(self, msg):
        """Process recognized speech with GPT"""
        user_input = msg.data
        self.get_logger().info(f'User said: "{user_input}"')

        # Add to conversation history
        self.conversation_history.append({
            "role": "user",
            "content": user_input
        })

        # Get GPT response
        response = self.get_gpt_response()

        # Parse and execute
        text_response, action = self.parse_response(response)

        # Publish response for TTS
        response_msg = String()
        response_msg.data = text_response
        self.response_pub.publish(response_msg)

        # Execute action if present
        if action:
            self.execute_action(action)

    def get_gpt_response(self):
        """Query GPT-4 for a response (uses the openai<1.0 ChatCompletion API)"""
        try:
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=self.conversation_history,
                max_tokens=150,
                temperature=0.7
            )
            assistant_message = response.choices[0].message['content']

            # Add to history
            self.conversation_history.append({
                "role": "assistant",
                "content": assistant_message
            })

            # Keep history manageable: system prompt + last 20 messages
            if len(self.conversation_history) > 21:
                self.conversation_history = [self.conversation_history[0]] + \
                    self.conversation_history[-20:]

            return assistant_message
        except Exception as e:
            self.get_logger().error(f'GPT API error: {e}')
            return "I'm having trouble understanding right now."

    def parse_response(self, response):
        """Extract text and action commands from GPT response.

        Expected format: "I'll do that. [ACTION: navigate_to(kitchen)]"
        """
        if '[ACTION:' in response:
            parts = response.split('[ACTION:')
            text = parts[0].strip()
            action = parts[1].split(']')[0].strip()
            return text, action
        else:
            return response, None

    def execute_action(self, action_command):
        """Publish a robot action command"""
        self.get_logger().info(f'Executing action: {action_command}')
        action_msg = String()
        action_msg.data = action_command
        self.action_pub.publish(action_msg)

def main(args=None):
    rclpy.init(args=args)
    robot = ConversationalRobot()
    rclpy.spin(robot)
    robot.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()
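The `[ACTION: ...]` convention used above is easy to unit-test in isolation, without ROS or an API key. A stand-alone copy of the parsing logic:

```python
def parse_response(response):
    """Split a GPT reply into spoken text and an optional bracketed action."""
    if '[ACTION:' in response:
        parts = response.split('[ACTION:')
        text = parts[0].strip()
        action = parts[1].split(']')[0].strip()
        return text, action
    return response, None

# A reply with an embedded action splits into (text, action);
# a plain reply comes back unchanged with no action.
assert parse_response("I'll do that. [ACTION: navigate_to(kitchen)]") == \
    ("I'll do that.", "navigate_to(kitchen)")
assert parse_response("Hello there!") == ("Hello there!", None)
```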
Function Calling for Robot Actions
GPT models support function calling to structure robot commands.
import json

class StructuredActionRobot(ConversationalRobot):
    def __init__(self):
        super().__init__()

        # Define available robot functions (legacy `functions` API;
        # newer OpenAI releases express the same thing via `tools`)
        self.functions = [
            {
                "name": "navigate_to_location",
                "description": "Move the robot to a specified location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "Target location (e.g., 'kitchen', 'bedroom')"
                        }
                    },
                    "required": ["location"]
                }
            },
            {
                "name": "pick_object",
                "description": "Pick up an object",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "object_name": {
                            "type": "string",
                            "description": "Name of object to pick"
                        },
                        "color": {
                            "type": "string",
                            "description": "Color of the object (optional)"
                        }
                    },
                    "required": ["object_name"]
                }
            },
            {
                "name": "place_object",
                "description": "Place held object at location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "Where to place the object"
                        }
                    },
                    "required": ["location"]
                }
            }
        ]

    def get_gpt_response_with_functions(self):
        """Get GPT response with function calling"""
        try:
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=self.conversation_history,
                functions=self.functions,
                function_call="auto"
            )
            message = response.choices[0].message

            # Check if GPT wants to call a function
            if message.get("function_call"):
                function_name = message["function_call"]["name"]
                arguments = json.loads(message["function_call"]["arguments"])
                self.get_logger().info(
                    f'GPT calling function: {function_name} with {arguments}')

                # Execute the function
                result = self.execute_robot_function(function_name, arguments)

                # Record the assistant's function call, then its result,
                # so the model sees both on the follow-up request
                self.conversation_history.append(message)
                self.conversation_history.append({
                    "role": "function",
                    "name": function_name,
                    "content": json.dumps(result)
                })

                # Get final response
                final_response = openai.ChatCompletion.create(
                    model="gpt-4",
                    messages=self.conversation_history
                )
                return final_response.choices[0].message['content']
            else:
                return message['content']
        except Exception as e:
            self.get_logger().error(f'Error: {e}')
            return "I encountered an error."

    def execute_robot_function(self, function_name, arguments):
        """Execute robot action and return result"""
        if function_name == "navigate_to_location":
            location = arguments["location"]
            # Call navigation system
            self.get_logger().info(f'Navigating to {location}')
            return {"status": "success", "message": f"Arrived at {location}"}
        elif function_name == "pick_object":
            obj = arguments["object_name"]
            color = arguments.get("color", "")
            self.get_logger().info(f'Picking {color} {obj}')
            return {"status": "success", "message": f"Picked up {obj}"}
        elif function_name == "place_object":
            location = arguments["location"]
            self.get_logger().info(f'Placing object at {location}')
            return {"status": "success", "message": f"Placed at {location}"}
        return {"status": "error", "message": "Unknown function"}
Speech Recognition and Synthesis
Speech-to-Text with Whisper
OpenAI's Whisper provides state-of-the-art speech recognition with multilingual support.
import rclpy
import whisper
import pyaudio
import numpy as np
from rclpy.node import Node
from std_msgs.msg import String

class WhisperSpeechRecognition(Node):
    def __init__(self):
        super().__init__('speech_recognition')

        # Load Whisper model
        self.get_logger().info('Loading Whisper model...')
        self.model = whisper.load_model("base")  # Options: tiny, base, small, medium, large
        self.get_logger().info('Whisper model loaded')

        # Audio settings
        self.RATE = 16000
        self.CHUNK = 1024
        self.CHANNELS = 1

        # Publisher
        self.speech_pub = self.create_publisher(
            String, '/speech/recognized', 10)

        # Start listening (note: this loop blocks the constructor;
        # run it in a separate thread in a production node)
        self.start_listening()

    def start_listening(self):
        """Continuously listen and transcribe"""
        audio_interface = pyaudio.PyAudio()
        stream = audio_interface.open(
            format=pyaudio.paInt16,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            frames_per_buffer=self.CHUNK
        )
        self.get_logger().info('Listening...')

        frames = []
        silence_threshold = 500
        silence_duration = 0
        max_silence = 2.0  # seconds

        while rclpy.ok():
            data = stream.read(self.CHUNK)
            audio_data = np.frombuffer(data, dtype=np.int16)

            # Voice activity detection (simple energy threshold)
            if np.abs(audio_data).mean() > silence_threshold:
                frames.append(data)
                silence_duration = 0
            else:
                silence_duration += self.CHUNK / self.RATE

            # If silence follows speech, transcribe the buffered audio
            if len(frames) > 0 and silence_duration > max_silence:
                self.transcribe_audio(frames)
                frames = []
                silence_duration = 0

    def transcribe_audio(self, frames):
        """Transcribe recorded audio with Whisper"""
        # Convert to a float32 waveform in [-1, 1], as Whisper expects
        audio_data = np.frombuffer(b''.join(frames), dtype=np.int16)
        audio_float = audio_data.astype(np.float32) / 32768.0

        # Transcribe
        result = self.model.transcribe(
            audio_float,
            language='en',
            task='transcribe'
        )
        text = result['text'].strip()

        if text:
            self.get_logger().info(f'Recognized: "{text}"')
            msg = String()
            msg.data = text
            self.speech_pub.publish(msg)
Text-to-Speech (TTS)
from gtts import gTTS
import pygame
import tempfile
import os
from rclpy.node import Node
from std_msgs.msg import String

class TextToSpeech(Node):
    def __init__(self):
        super().__init__('text_to_speech')

        # Initialize pygame mixer for audio playback
        pygame.mixer.init()

        # Subscribe to response text
        self.subscription = self.create_subscription(
            String, '/speech/response', self.speak, 10)
        self.get_logger().info('TTS initialized')

    def speak(self, msg):
        """Convert text to speech and play"""
        text = msg.data
        self.get_logger().info(f'Speaking: "{text}"')
        try:
            # Generate speech
            tts = gTTS(text=text, lang='en', slow=False)

            # Save to temporary file
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as fp:
                temp_filename = fp.name
            tts.save(temp_filename)

            # Play audio
            pygame.mixer.music.load(temp_filename)
            pygame.mixer.music.play()

            # Wait for playback to finish
            while pygame.mixer.music.get_busy():
                pygame.time.Clock().tick(10)

            # Cleanup
            os.unlink(temp_filename)
        except Exception as e:
            self.get_logger().error(f'TTS error: {e}')
Natural Language Understanding
Intent Classification
from transformers import pipeline
import torch
from rclpy.node import Node
from std_msgs.msg import String

class IntentClassifier(Node):
    def __init__(self):
        super().__init__('intent_classifier')

        # Load zero-shot classification model
        self.classifier = pipeline(
            "zero-shot-classification",
            model="facebook/bart-large-mnli",
            device=0 if torch.cuda.is_available() else -1
        )

        # Define robot intents
        self.intents = [
            "navigation",
            "object_manipulation",
            "information_query",
            "greeting",
            "confirmation",
            "cancellation"
        ]

        self.speech_sub = self.create_subscription(
            String, '/speech/recognized', self.classify_intent, 10)

    def classify_intent(self, msg):
        """Classify user utterance into intent"""
        text = msg.data

        result = self.classifier(text, self.intents)
        top_intent = result['labels'][0]
        confidence = result['scores'][0]

        self.get_logger().info(
            f'Intent: {top_intent} (confidence: {confidence:.2f})')

        # Route to appropriate handler
        if top_intent == "navigation" and confidence > 0.7:
            self.handle_navigation(text)
        elif top_intent == "object_manipulation" and confidence > 0.7:
            self.handle_manipulation(text)
        # ... remaining intents handled similarly

    def handle_navigation(self, text):
        """Process navigation intent"""
        self.get_logger().info(f'Processing navigation: {text}')
        # Extract location, plan path, execute

    def handle_manipulation(self, text):
        """Process manipulation intent"""
        self.get_logger().info(f'Processing manipulation: {text}')
        # Extract object, grasp, move
Slot Filling with NER
from transformers import pipeline
from rclpy.node import Node

class EntityExtractor(Node):
    def __init__(self):
        super().__init__('entity_extractor')

        # Named Entity Recognition model
        self.ner_pipeline = pipeline(
            "ner",
            model="dslim/bert-base-NER",
            aggregation_strategy="simple"
        )

    def extract_entities(self, text):
        """Extract entities from text"""
        entities = self.ner_pipeline(text)

        extracted = {
            'locations': [],
            'objects': [],
            'persons': []
        }
        for entity in entities:
            if entity['entity_group'] == 'LOC':
                extracted['locations'].append(entity['word'])
            elif entity['entity_group'] == 'PER':
                extracted['persons'].append(entity['word'])
            # Add custom logic for objects
        return extracted

    def parse_command(self, text):
        """Parse command with custom slot filling.

        Example: "Bring me the red cup from the kitchen"
        """
        entities = self.extract_entities(text)
        lowered = text.lower()

        command = {
            'action': None,
            'object': None,
            'color': None,
            'location': None
        }

        # Action detection
        if 'bring' in lowered or 'get' in lowered:
            command['action'] = 'fetch'
        elif 'put' in lowered or 'place' in lowered:
            command['action'] = 'place'

        # Color extraction (simple keyword match)
        colors = ['red', 'blue', 'green', 'yellow', 'black', 'white']
        for color in colors:
            if color in lowered:
                command['color'] = color
                break

        # Object (simplified - use object detection instead)
        objects = ['cup', 'bottle', 'book', 'pen', 'phone']
        for obj in objects:
            if obj in lowered:
                command['object'] = obj
                break

        # Location from NER
        if entities['locations']:
            command['location'] = entities['locations'][0]

        return command
Multi-Modal Interaction
Vision-Language Grounding
Grounding language in visual perception: "Pick up the red cup" requires identifying which detected object is the red cup.
import torch
from transformers import CLIPProcessor, CLIPModel
from PIL import Image as PILImage
import numpy as np
from rclpy.node import Node
from sensor_msgs.msg import Image as RosImage  # aliased to avoid clashing with PIL

class VisionLanguageGrounding(Node):
    def __init__(self):
        super().__init__('vision_language_grounding')

        # Load CLIP model
        self.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

        # Camera subscriber (ROS Image message, not PIL)
        self.image_sub = self.create_subscription(
            RosImage, '/camera/rgb', self.image_callback, 10)
        self.current_image = None

    def image_callback(self, msg):
        """Store latest camera image"""
        # Convert ROS Image to PIL Image
        # (implementation depends on message format)
        self.current_image = self.ros_to_pil(msg)

    def ground_object(self, object_description, bounding_boxes, image_crops):
        """
        Find which bounding box matches the description.

        Args:
            object_description: e.g., "red cup"
            bounding_boxes: List of detected object bounding boxes
            image_crops: List of cropped PIL images, one per box

        Returns:
            (index of best matching box, confidence)
        """
        # Prepare inputs
        inputs = self.processor(
            text=[object_description],
            images=image_crops,
            return_tensors="pt",
            padding=True
        )

        # Similarity scores: softmax over crops for the single text query
        outputs = self.model(**inputs)
        logits_per_image = outputs.logits_per_image
        probs = logits_per_image.softmax(dim=0)

        # Best match
        best_idx = probs.argmax().item()
        confidence = probs[best_idx].item()

        self.get_logger().info(
            f'Grounded "{object_description}" to box {best_idx} '
            f'(confidence: {confidence:.2f})')
        return best_idx, confidence

    def ros_to_pil(self, ros_image):
        """Convert ROS Image to PIL Image"""
        # Placeholder - implement with cv_bridge or a manual conversion
        pass
Multi-Modal Fusion: Speech + Gesture
Combine speech commands with pointing gestures for intuitive interaction.
from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import PointStamped

class MultiModalFusion(Node):
    def __init__(self):
        super().__init__('multimodal_fusion')

        # Subscribers
        self.speech_sub = self.create_subscription(
            String, '/speech/recognized', self.speech_callback, 10)
        self.gesture_sub = self.create_subscription(
            PointStamped, '/gesture/pointing_direction', self.gesture_callback, 10)

        # State
        self.last_speech = None
        self.last_gesture = None
        self.last_speech_time = None
        self.last_gesture_time = None

        # Fusion window (both modalities must occur within this time)
        self.fusion_window = 2.0  # seconds

    def speech_callback(self, msg):
        """Record speech command"""
        self.last_speech = msg.data
        self.last_speech_time = self.get_clock().now()
        self.try_fusion()

    def gesture_callback(self, msg):
        """Record pointing gesture"""
        self.last_gesture = msg.point
        self.last_gesture_time = self.get_clock().now()
        self.try_fusion()

    def try_fusion(self):
        """Attempt to fuse speech and gesture"""
        if self.last_speech is None or self.last_gesture is None:
            return

        # Check if both are recent
        time_diff = abs(
            (self.last_speech_time - self.last_gesture_time).nanoseconds / 1e9
        )
        if time_diff < self.fusion_window:
            self.process_multimodal_command(
                self.last_speech, self.last_gesture
            )
            # Reset
            self.last_speech = None
            self.last_gesture = None

    def process_multimodal_command(self, speech, pointing_direction):
        """
        Process combined command.

        Example: "Pick up that" + [pointing at object]
        """
        self.get_logger().info(
            f'Multi-modal command: "{speech}" + pointing at {pointing_direction}')

        # Check for deictic references (that, this, there)
        deictic_words = ['that', 'this', 'there', 'here']
        has_deictic = any(word in speech.lower() for word in deictic_words)

        if has_deictic:
            # Use pointing to resolve the reference
            target_object = self.find_object_in_direction(pointing_direction)

            # Extract action from speech
            if 'pick' in speech.lower():
                self.execute_pick(target_object)
            elif 'go' in speech.lower() or 'move' in speech.lower():
                self.execute_navigate(pointing_direction)

    def find_object_in_direction(self, direction):
        """Find object in pointing direction using vision"""
        # Raycast in pointing direction; return closest object
        pass

    def execute_pick(self, target):
        """Execute pick action"""
        self.get_logger().info(f'Executing pick: {target}')

    def execute_navigate(self, direction):
        """Navigate in pointed direction"""
        self.get_logger().info(f'Navigating toward: {direction}')
Attention and Gaze Control
from rclpy.node import Node
from sensor_msgs.msg import JointState
from std_msgs.msg import Bool
# DetectionArray is assumed to come from your perception package
# (adapt to e.g. vision_msgs Detection2DArray in practice)

class GazeController(Node):
    def __init__(self):
        super().__init__('gaze_controller')

        # Head/eye control publisher
        self.gaze_pub = self.create_publisher(
            JointState, '/head_controller/command', 10)

        # Face tracking
        self.face_sub = self.create_subscription(
            DetectionArray, '/face_detections', self.face_callback, 10)

        # Speaking indicator
        self.speaking_sub = self.create_subscription(
            Bool, '/robot/is_speaking', self.speaking_callback, 10)

        self.is_speaking = False
        self.current_speaker = None

    def face_callback(self, msg):
        """Track detected faces"""
        if not msg.detections:
            return

        # Find closest/most prominent face
        largest_face = max(msg.detections, key=lambda d: d.bbox.size)

        # If robot is listening, look at speaker
        if not self.is_speaking:
            self.look_at_face(largest_face)
            self.current_speaker = largest_face

    def speaking_callback(self, msg):
        """Update speaking status"""
        self.is_speaking = msg.data

        # When starting to speak, look at the person we're addressing
        if self.is_speaking and self.current_speaker:
            self.look_at_face(self.current_speaker)

    def look_at_face(self, face_detection):
        """Control head to look at face"""
        # Image center for an assumed 640x480 camera
        image_center_x = 320
        image_center_y = 240

        face_center_x = face_detection.bbox.center.x
        face_center_y = face_detection.bbox.center.y

        # Simple proportional control, errors normalized to [-1, 1]
        pan_error = (face_center_x - image_center_x) / image_center_x
        tilt_error = (face_center_y - image_center_y) / image_center_y

        # Scale to radians
        pan_angle = pan_error * 0.5
        tilt_angle = -tilt_error * 0.3

        joint_cmd = JointState()
        joint_cmd.name = ['head_pan', 'head_tilt']
        joint_cmd.position = [pan_angle, tilt_angle]
        self.gaze_pub.publish(joint_cmd)
Dialogue Management
Conversational Context Tracking
from rclpy.node import Node

class DialogueManager(Node):
    def __init__(self):
        super().__init__('dialogue_manager')

        # Dialogue state
        self.current_task = None
        self.task_stage = None
        self.slots = {}

        # Conversation history
        self.history = []

    def handle_utterance(self, text):
        """Process user utterance with context"""
        self.history.append({'speaker': 'user', 'text': text})

        # Check if continuing previous task
        if self.current_task:
            response = self.continue_task(text)
        else:
            response = self.start_new_task(text)

        self.history.append({'speaker': 'robot', 'text': response})
        return response

    def start_new_task(self, text):
        """Initialize new task"""
        # Classify task type
        if 'bring' in text.lower() or 'fetch' in text.lower():
            self.current_task = 'fetch'
            self.task_stage = 'object_identification'
            self.slots = {'object': None, 'location': None}

            # Extract what we can (use NER/slot filling here)

            # Ask for missing information
            if not self.slots['object']:
                return "What would you like me to bring?"
            else:
                return f"I'll bring you the {self.slots['object']}."
        # ... other task types
        return "I'm not sure how to help with that yet."

    def continue_task(self, text):
        """Continue active task"""
        if self.current_task == 'fetch':
            if self.task_stage == 'object_identification':
                self.slots['object'] = text
                self.task_stage = 'execution'
                return f"Okay, I'll get the {text}."
        return "I'm working on it."

    def complete_task(self):
        """Reset after task completion"""
        self.current_task = None
        self.task_stage = None
        self.slots = {}
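Dialogue state machines are easy to get subtly wrong, so it helps to exercise the slot-filling flow without ROS in the loop. A minimal stand-alone sketch of the same pattern (the class name and prompts are illustrative):

```python
# ROS-free sketch of the slot-filling dialogue pattern above
# (class name and wording are illustrative).
class MiniDialogue:
    def __init__(self):
        self.task = None
        self.slots = {}

    def handle(self, text):
        # No active task: try to start one
        if self.task is None:
            if 'bring' in text.lower() or 'fetch' in text.lower():
                self.task = 'fetch'
                self.slots = {'object': None}
                return "What would you like me to bring?"
            return "How can I help?"
        # Active fetch task: fill the missing object slot
        if self.task == 'fetch' and self.slots['object'] is None:
            self.slots['object'] = text
            return f"Okay, I'll get the {text}."
        return "I'm working on it."

dm = MiniDialogue()
print(dm.handle("Can you bring me something?"))  # asks which object
print(dm.handle("red cup"))                      # fills the slot, confirms
```

The same two-turn exchange against the real DialogueManager should produce the clarification question followed by the confirmation.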
Practical Exercise
Week 13 Assignment: Complete Conversational Robot
Objective: Build an end-to-end conversational robot system
Tasks:
- Integrate GPT-4 for natural language understanding and response generation
- Implement speech recognition using Whisper (or alternative)
- Add text-to-speech for robot responses
- Create vision-language grounding to identify objects from descriptions
- Implement multi-modal fusion combining speech and pointing gestures
- Design dialogue manager for multi-turn conversations
- Add safety filtering to prevent dangerous commands
Deliverable:
Complete ROS 2 package demonstrating:
- Natural conversation with context tracking
- Voice-controlled object manipulation: "Bring me that red cup" while pointing
- Multi-turn dialogue for complex tasks
- Video demonstration with analysis of response times and accuracy
Evaluation Criteria:
- Response latency < 2 seconds
- Intent classification accuracy > 85%
- Successful object grounding in 90%+ of cases
- Natural, contextual responses
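The latency criterion can be checked by timing each pipeline stage separately; a simple sketch using a context manager (the stage names and `time.sleep` calls are placeholders for the real ASR, LLM, and TTS calls):

```python
import time
from contextlib import contextmanager

@contextmanager
def timed(label, log):
    """Record the wall-clock time of a pipeline stage into `log`."""
    start = time.perf_counter()
    try:
        yield
    finally:
        log[label] = time.perf_counter() - start

# Wrap each stage of the speech -> LLM -> TTS pipeline.
timings = {}
with timed("asr", timings):
    time.sleep(0.01)   # placeholder for Whisper transcription
with timed("llm", timings):
    time.sleep(0.01)   # placeholder for the GPT call
with timed("tts", timings):
    time.sleep(0.01)   # placeholder for speech synthesis

total = sum(timings.values())
print(f"total response latency: {total:.2f}s")
assert total < 2.0, "latency budget exceeded"
```

Per-stage numbers make it obvious which component to optimize (smaller Whisper model, shorter prompts, streaming TTS) when the 2-second budget is missed.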
Next: Week 14 will cover deployment, safety systems, and preparing robots for real-world operation.