Week 13: Conversational Robotics

Enable robots to engage in natural, contextual conversations using large language models, speech recognition, and multi-modal understanding.

Introduction to Conversational AI for Robotics

Conversational robotics combines natural language processing (NLP), speech technologies, and embodied AI to create robots that communicate like humans while understanding physical context.

Why Conversational AI Matters for Robots

Natural Interaction:
Voice is the most natural interface for humans. Conversational robots lower the barrier for non-technical users.

Contextual Understanding:
Unlike chatbots, conversational robots can reference the physical world: "Pick up that red cup" requires vision and language grounding.

Task Assistance:
Guide users through complex tasks, answer questions, and provide real-time feedback during human-robot collaboration.

Accessibility:
Enable elderly, disabled, or hands-busy users to control robots through natural speech.

Challenges in Robotic Conversation

Grounding:
Mapping language to physical entities and actions in the robot's environment.

Real-Time Constraints:
Speech processing must be fast enough for natural dialogue (< 300ms response time).

Noise Robustness:
Robots operate in noisy environments with motor sounds, multiple speakers, and ambient noise.

Safety and Alignment:
LLM outputs must be validated before executing physical actions to prevent harm.
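One common safeguard is to validate every LLM-proposed command against an allowlist before it reaches the actuators. The sketch below is illustrative, not a standard API: the `action(arg, ...)` command format and the action names are assumptions matching the `[ACTION: ...]` convention used later in this chapter.

```python
import re

# Illustrative allowlist: only these action names may be executed.
ALLOWED_ACTIONS = {"navigate_to", "pick_object", "place_object", "stop"}

# Expected command format: action_name(arg1, arg2, ...)
COMMAND_RE = re.compile(r"^(?P<name>[a-z_]+)\((?P<args>[^()]*)\)$")

def validate_command(command: str):
    """Return (ok, reason). Reject anything not matching the allowlist."""
    match = COMMAND_RE.match(command.strip())
    if not match:
        return False, "malformed command"
    if match.group("name") not in ALLOWED_ACTIONS:
        return False, f"action '{match.group('name')}' not in allowlist"
    return True, "ok"
```

A validator like this would sit between the LLM output parser and the action publisher, so a hallucinated or adversarial command is dropped (and can be logged) instead of executed:

```python
validate_command("navigate_to(kitchen)")   # accepted
validate_command("disable_safety()")       # rejected: not in allowlist
```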

Integrating GPT Models for Conversational AI

Large Language Models like GPT-4 provide powerful natural language understanding and generation capabilities.

Setting Up OpenAI GPT Integration

import openai  # note: uses the legacy (pre-1.0) openai package API
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import os

class ConversationalRobot(Node):
    def __init__(self):
        super().__init__('conversational_robot')

        # OpenAI API setup
        openai.api_key = os.getenv('OPENAI_API_KEY')

        # Conversation history
        self.conversation_history = [
            {
                "role": "system",
                "content": """You are a helpful humanoid robot assistant named Atlas.
You can see, move, and manipulate objects. Keep responses concise and actionable.
When asked to perform physical tasks, respond with structured commands."""
            }
        ]

        # ROS 2 interfaces
        self.speech_sub = self.create_subscription(
            String, '/speech/recognized', self.handle_speech, 10)
        self.response_pub = self.create_publisher(
            String, '/speech/response', 10)
        self.action_pub = self.create_publisher(
            String, '/robot/action_command', 10)

        self.get_logger().info('Conversational robot initialized')

    def handle_speech(self, msg):
        """Process recognized speech with GPT"""
        user_input = msg.data
        self.get_logger().info(f'User said: "{user_input}"')

        # Add to conversation history
        self.conversation_history.append({
            "role": "user",
            "content": user_input
        })

        # Get GPT response
        response = self.get_gpt_response()

        # Parse and execute
        text_response, action = self.parse_response(response)

        # Publish response for TTS
        response_msg = String()
        response_msg.data = text_response
        self.response_pub.publish(response_msg)

        # Execute action if present
        if action:
            self.execute_action(action)

    def get_gpt_response(self):
        """Query GPT-4 for a response"""
        try:
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=self.conversation_history,
                max_tokens=150,
                temperature=0.7
            )

            assistant_message = response.choices[0].message['content']

            # Add to history
            self.conversation_history.append({
                "role": "assistant",
                "content": assistant_message
            })

            # Keep conversation history manageable (last 10 exchanges)
            if len(self.conversation_history) > 21:  # system + 20 messages
                self.conversation_history = [self.conversation_history[0]] + \
                    self.conversation_history[-20:]

            return assistant_message

        except Exception as e:
            self.get_logger().error(f'GPT API error: {e}')
            return "I'm having trouble understanding right now."

    def parse_response(self, response):
        """Extract text and action commands from the GPT response"""
        # Check if the response contains an action command
        # Format: "I'll do that. [ACTION: navigate_to(kitchen)]"
        if '[ACTION:' in response:
            parts = response.split('[ACTION:')
            text = parts[0].strip()
            action = parts[1].split(']')[0].strip()
            return text, action
        else:
            return response, None

    def execute_action(self, action_command):
        """Execute robot action"""
        self.get_logger().info(f'Executing action: {action_command}')

        action_msg = String()
        action_msg.data = action_command
        self.action_pub.publish(action_msg)

def main(args=None):
    rclpy.init(args=args)
    robot = ConversationalRobot()
    rclpy.spin(robot)
    robot.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

Function Calling for Robot Actions

GPT models support function calling to structure robot commands.

import json

class StructuredActionRobot(ConversationalRobot):
    def __init__(self):
        super().__init__()

        # Define available robot functions
        self.functions = [
            {
                "name": "navigate_to_location",
                "description": "Move the robot to a specified location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "Target location (e.g., 'kitchen', 'bedroom')"
                        }
                    },
                    "required": ["location"]
                }
            },
            {
                "name": "pick_object",
                "description": "Pick up an object",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "object_name": {
                            "type": "string",
                            "description": "Name of object to pick"
                        },
                        "color": {
                            "type": "string",
                            "description": "Color of the object (optional)"
                        }
                    },
                    "required": ["object_name"]
                }
            },
            {
                "name": "place_object",
                "description": "Place held object at location",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "location": {
                            "type": "string",
                            "description": "Where to place the object"
                        }
                    },
                    "required": ["location"]
                }
            }
        ]

    def get_gpt_response_with_functions(self):
        """Get GPT response with function calling"""
        try:
            response = openai.ChatCompletion.create(
                model="gpt-4",
                messages=self.conversation_history,
                functions=self.functions,
                function_call="auto"
            )

            message = response.choices[0].message

            # Check if GPT wants to call a function
            if message.get("function_call"):
                function_name = message["function_call"]["name"]
                arguments = json.loads(message["function_call"]["arguments"])

                self.get_logger().info(
                    f'GPT calling function: {function_name} with {arguments}')

                # Execute the function
                result = self.execute_robot_function(function_name, arguments)

                # Add function result to conversation
                self.conversation_history.append({
                    "role": "function",
                    "name": function_name,
                    "content": json.dumps(result)
                })

                # Get final response
                final_response = openai.ChatCompletion.create(
                    model="gpt-4",
                    messages=self.conversation_history
                )

                return final_response.choices[0].message['content']
            else:
                return message['content']

        except Exception as e:
            self.get_logger().error(f'Error: {e}')
            return "I encountered an error."

    def execute_robot_function(self, function_name, arguments):
        """Execute robot action and return result"""
        if function_name == "navigate_to_location":
            location = arguments["location"]
            # Call navigation system
            self.get_logger().info(f'Navigating to {location}')
            return {"status": "success", "message": f"Arrived at {location}"}

        elif function_name == "pick_object":
            obj = arguments["object_name"]
            color = arguments.get("color", "")
            self.get_logger().info(f'Picking {color} {obj}')
            return {"status": "success", "message": f"Picked up {obj}"}

        elif function_name == "place_object":
            location = arguments["location"]
            self.get_logger().info(f'Placing object at {location}')
            return {"status": "success", "message": f"Placed at {location}"}

        return {"status": "error", "message": "Unknown function"}

Speech Recognition and Synthesis

Speech-to-Text with Whisper

OpenAI's Whisper provides state-of-the-art speech recognition with multilingual support.

import whisper
import pyaudio
import numpy as np
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class WhisperSpeechRecognition(Node):
    def __init__(self):
        super().__init__('speech_recognition')

        # Load Whisper model
        self.get_logger().info('Loading Whisper model...')
        self.model = whisper.load_model("base")  # Options: tiny, base, small, medium, large
        self.get_logger().info('Whisper model loaded')

        # Audio settings
        self.RATE = 16000
        self.CHUNK = 1024
        self.CHANNELS = 1

        # Publisher
        self.speech_pub = self.create_publisher(
            String, '/speech/recognized', 10)

        # Start listening (note: this loop blocks; in practice run it in a
        # separate thread alongside rclpy.spin)
        self.start_listening()

    def start_listening(self):
        """Continuously listen and transcribe"""
        audio_interface = pyaudio.PyAudio()

        stream = audio_interface.open(
            format=pyaudio.paInt16,
            channels=self.CHANNELS,
            rate=self.RATE,
            input=True,
            frames_per_buffer=self.CHUNK
        )

        self.get_logger().info('Listening...')

        frames = []
        silence_threshold = 500
        silence_duration = 0
        max_silence = 2.0  # seconds

        while rclpy.ok():
            data = stream.read(self.CHUNK)
            audio_data = np.frombuffer(data, dtype=np.int16)

            # Voice activity detection (simple energy threshold)
            if np.abs(audio_data).mean() > silence_threshold:
                frames.append(data)
                silence_duration = 0
            else:
                silence_duration += self.CHUNK / self.RATE

            # If silence follows speech, transcribe
            if len(frames) > 0 and silence_duration > max_silence:
                self.transcribe_audio(frames)
                frames = []
                silence_duration = 0

    def transcribe_audio(self, frames):
        """Transcribe recorded audio with Whisper"""
        # Convert to float32 in [-1, 1], as Whisper expects
        audio_data = np.frombuffer(b''.join(frames), dtype=np.int16)
        audio_float = audio_data.astype(np.float32) / 32768.0

        # Transcribe
        result = self.model.transcribe(
            audio_float,
            language='en',
            task='transcribe'
        )

        text = result['text'].strip()

        if text:
            self.get_logger().info(f'Recognized: "{text}"')

            # Publish
            msg = String()
            msg.data = text
            self.speech_pub.publish(msg)

Text-to-Speech (TTS)

from gtts import gTTS
import pygame
import tempfile
import os
from rclpy.node import Node
from std_msgs.msg import String

class TextToSpeech(Node):
    def __init__(self):
        super().__init__('text_to_speech')

        # Initialize pygame mixer for audio playback
        pygame.mixer.init()

        # Subscribe to response text
        self.subscription = self.create_subscription(
            String, '/speech/response', self.speak, 10)

        self.get_logger().info('TTS initialized')

    def speak(self, msg):
        """Convert text to speech and play it"""
        text = msg.data
        self.get_logger().info(f'Speaking: "{text}"')

        try:
            # Generate speech
            tts = gTTS(text=text, lang='en', slow=False)

            # Save to a temporary file (closed before writing, for portability)
            with tempfile.NamedTemporaryFile(delete=False, suffix='.mp3') as fp:
                temp_filename = fp.name
            tts.save(temp_filename)

            # Play audio
            pygame.mixer.music.load(temp_filename)
            pygame.mixer.music.play()

            # Wait for playback to finish
            while pygame.mixer.music.get_busy():
                pygame.time.Clock().tick(10)

            # Cleanup
            os.unlink(temp_filename)

        except Exception as e:
            self.get_logger().error(f'TTS error: {e}')

Natural Language Understanding

Intent Classification

from transformers import pipeline
import torch
from rclpy.node import Node
from std_msgs.msg import String

class IntentClassifier(Node):
    def __init__(self):
        super().__init__('intent_classifier')

        # Load zero-shot classification model
        self.classifier = pipeline(
            "zero-shot-classification",
            model="facebook/bart-large-mnli",
            device=0 if torch.cuda.is_available() else -1
        )

        # Define robot intents
        self.intents = [
            "navigation",
            "object_manipulation",
            "information_query",
            "greeting",
            "confirmation",
            "cancellation"
        ]

        self.speech_sub = self.create_subscription(
            String, '/speech/recognized', self.classify_intent, 10)

    def classify_intent(self, msg):
        """Classify a user utterance into an intent"""
        text = msg.data

        # Classify against the candidate intent labels
        result = self.classifier(text, self.intents)

        top_intent = result['labels'][0]
        confidence = result['scores'][0]

        self.get_logger().info(
            f'Intent: {top_intent} (confidence: {confidence:.2f})')

        # Route to the appropriate handler
        if top_intent == "navigation" and confidence > 0.7:
            self.handle_navigation(text)
        elif top_intent == "object_manipulation" and confidence > 0.7:
            self.handle_manipulation(text)
        # ... remaining intents handled similarly

    def handle_navigation(self, text):
        """Process navigation intent"""
        self.get_logger().info(f'Processing navigation: {text}')
        # Extract location, plan path, execute

    def handle_manipulation(self, text):
        """Process manipulation intent"""
        self.get_logger().info(f'Processing manipulation: {text}')
        # Extract object, grasp, move

Slot Filling with NER

from transformers import pipeline
from rclpy.node import Node

class EntityExtractor(Node):
    def __init__(self):
        super().__init__('entity_extractor')

        # Named Entity Recognition model
        self.ner_pipeline = pipeline(
            "ner",
            model="dslim/bert-base-NER",
            aggregation_strategy="simple"
        )

    def extract_entities(self, text):
        """Extract entities from text"""
        entities = self.ner_pipeline(text)

        # Parse results
        extracted = {
            'locations': [],
            'objects': [],
            'persons': []
        }

        for entity in entities:
            if entity['entity_group'] == 'LOC':
                extracted['locations'].append(entity['word'])
            elif entity['entity_group'] == 'PER':
                extracted['persons'].append(entity['word'])
            # Add custom logic for objects

        return extracted

    def parse_command(self, text):
        """Parse command with custom slot filling"""
        # Example: "Bring me the red cup from the kitchen"

        entities = self.extract_entities(text)

        # Extract action, color, object, and location slots
        command = {
            'action': None,
            'object': None,
            'color': None,
            'location': None
        }

        # Action detection
        if 'bring' in text.lower() or 'get' in text.lower():
            command['action'] = 'fetch'
        elif 'put' in text.lower() or 'place' in text.lower():
            command['action'] = 'place'

        # Color extraction (simple keyword match)
        colors = ['red', 'blue', 'green', 'yellow', 'black', 'white']
        for color in colors:
            if color in text.lower():
                command['color'] = color
                break

        # Object (simplified - use object detection instead)
        objects = ['cup', 'bottle', 'book', 'pen', 'phone']
        for obj in objects:
            if obj in text.lower():
                command['object'] = obj
                break

        # Location from NER
        if entities['locations']:
            command['location'] = entities['locations'][0]

        return command

Multi-Modal Interaction

Vision-Language Grounding

Grounding language to visual perception: "Pick up the red cup" requires identifying which object is the red cup.

import torch
from transformers import CLIPProcessor, CLIPModel
from PIL import Image
from rclpy.node import Node
from sensor_msgs.msg import Image as ImageMsg  # aliased to avoid clashing with PIL.Image

class VisionLanguageGrounding(Node):
    def __init__(self):
        super().__init__('vision_language_grounding')

        # Load CLIP model
        self.model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32")
        self.processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")

        # Camera subscriber
        self.image_sub = self.create_subscription(
            ImageMsg, '/camera/rgb', self.image_callback, 10)

        self.current_image = None

    def image_callback(self, msg):
        """Store latest camera image"""
        # Convert ROS Image to PIL Image
        # (implementation depends on message format)
        self.current_image = self.ros_to_pil(msg)

    def ground_object(self, object_description, bounding_boxes, image_crops):
        """
        Find which bounding box matches the description

        Args:
            object_description: e.g., "red cup"
            bounding_boxes: List of detected object bounding boxes
            image_crops: List of cropped images for each box

        Returns:
            Index of the best matching box and its confidence
        """
        # Prepare inputs
        inputs = self.processor(
            text=[object_description],
            images=image_crops,
            return_tensors="pt",
            padding=True
        )

        # Get similarity scores
        outputs = self.model(**inputs)
        logits_per_image = outputs.logits_per_image
        probs = logits_per_image.softmax(dim=0)

        # Best match
        best_idx = probs.argmax().item()
        confidence = probs[best_idx].item()

        self.get_logger().info(
            f'Grounded "{object_description}" to box {best_idx} '
            f'(confidence: {confidence:.2f})')

        return best_idx, confidence

    def ros_to_pil(self, ros_image):
        """Convert ROS Image to PIL Image"""
        # Placeholder - implement based on your message format (e.g., with cv_bridge)
        pass

Multi-Modal Fusion: Speech + Gesture

Combine speech commands with pointing gestures for intuitive interaction.

from rclpy.node import Node
from std_msgs.msg import String
from geometry_msgs.msg import PointStamped

class MultiModalFusion(Node):
    def __init__(self):
        super().__init__('multimodal_fusion')

        # Subscribers
        self.speech_sub = self.create_subscription(
            String, '/speech/recognized', self.speech_callback, 10)
        self.gesture_sub = self.create_subscription(
            PointStamped, '/gesture/pointing_direction', self.gesture_callback, 10)

        # State
        self.last_speech = None
        self.last_gesture = None
        self.last_speech_time = None
        self.last_gesture_time = None

        # Fusion window (both modalities must occur within this time)
        self.fusion_window = 2.0  # seconds

    def speech_callback(self, msg):
        """Record speech command"""
        self.last_speech = msg.data
        self.last_speech_time = self.get_clock().now()
        self.try_fusion()

    def gesture_callback(self, msg):
        """Record pointing gesture"""
        self.last_gesture = msg.point
        self.last_gesture_time = self.get_clock().now()
        self.try_fusion()

    def try_fusion(self):
        """Attempt to fuse speech and gesture"""
        if self.last_speech is None or self.last_gesture is None:
            return

        # Check if both are recent
        time_diff = abs(
            (self.last_speech_time - self.last_gesture_time).nanoseconds / 1e9
        )

        if time_diff < self.fusion_window:
            # Fuse!
            self.process_multimodal_command(
                self.last_speech, self.last_gesture
            )

            # Reset
            self.last_speech = None
            self.last_gesture = None

    def process_multimodal_command(self, speech, pointing_direction):
        """
        Process combined command
        Example: "Pick up that" + [pointing at object]
        """
        self.get_logger().info(
            f'Multi-modal command: "{speech}" + pointing at {pointing_direction}')

        # Check for deictic references (that, this, there)
        deictic_words = ['that', 'this', 'there', 'here']
        has_deictic = any(word in speech.lower() for word in deictic_words)

        if has_deictic:
            # Use pointing to resolve the reference
            target_object = self.find_object_in_direction(pointing_direction)

            # Extract action from speech
            if 'pick' in speech.lower():
                self.execute_pick(target_object)
            elif 'go' in speech.lower() or 'move' in speech.lower():
                self.execute_navigate(pointing_direction)

    def find_object_in_direction(self, direction):
        """Find object in pointing direction using vision"""
        # Raycast in the pointing direction and return the closest object
        pass

    def execute_pick(self, target):
        """Execute pick action"""
        self.get_logger().info(f'Executing pick: {target}')

    def execute_navigate(self, direction):
        """Navigate in pointed direction"""
        self.get_logger().info(f'Navigating toward: {direction}')

Attention and Gaze Control

from rclpy.node import Node
from sensor_msgs.msg import JointState
from std_msgs.msg import Bool
# DetectionArray (with bbox.size and bbox.center fields) is assumed to come
# from your face-detection package; substitute your own message type
from face_detection_msgs.msg import DetectionArray

class GazeController(Node):
    def __init__(self):
        super().__init__('gaze_controller')

        # Head/eye control publisher
        self.gaze_pub = self.create_publisher(
            JointState, '/head_controller/command', 10)

        # Face tracking
        self.face_sub = self.create_subscription(
            DetectionArray, '/face_detections', self.face_callback, 10)

        # Speaking indicator
        self.speaking_sub = self.create_subscription(
            Bool, '/robot/is_speaking', self.speaking_callback, 10)

        self.is_speaking = False
        self.current_speaker = None

    def face_callback(self, msg):
        """Track detected faces"""
        if not msg.detections:
            return

        # Find the closest/most prominent face
        largest_face = max(msg.detections, key=lambda d: d.bbox.size)

        # If the robot is listening, look at the speaker
        if not self.is_speaking:
            self.look_at_face(largest_face)
            self.current_speaker = largest_face

    def speaking_callback(self, msg):
        """Update speaking status"""
        self.is_speaking = msg.data

        # When starting to speak, look at the person we're addressing
        if self.is_speaking and self.current_speaker:
            self.look_at_face(self.current_speaker)

    def look_at_face(self, face_detection):
        """Control the head to look at a face"""
        # Image center for an example 640x480 camera
        image_center_x = 320
        image_center_y = 240

        face_center_x = face_detection.bbox.center.x
        face_center_y = face_detection.bbox.center.y

        # Simple proportional control: normalized pixel error -> joint angle
        pan_error = (face_center_x - image_center_x) / image_center_x
        tilt_error = (face_center_y - image_center_y) / image_center_y

        pan_angle = pan_error * 0.5  # scale to radians
        tilt_angle = -tilt_error * 0.3

        joint_cmd = JointState()
        joint_cmd.name = ['head_pan', 'head_tilt']
        joint_cmd.position = [pan_angle, tilt_angle]

        self.gaze_pub.publish(joint_cmd)

Dialogue Management

Conversational Context Tracking

from rclpy.node import Node

class DialogueManager(Node):
    def __init__(self):
        super().__init__('dialogue_manager')

        # Dialogue state
        self.current_task = None
        self.task_stage = None
        self.slots = {}

        # Conversation history
        self.history = []

    def handle_utterance(self, text):
        """Process a user utterance with context"""
        # Add to history
        self.history.append({'speaker': 'user', 'text': text})

        # Check if continuing a previous task
        if self.current_task:
            response = self.continue_task(text)
        else:
            response = self.start_new_task(text)

        # Add response to history
        self.history.append({'speaker': 'robot', 'text': response})

        return response

    def start_new_task(self, text):
        """Initialize a new task"""
        # Classify task type
        if 'bring' in text.lower() or 'fetch' in text.lower():
            self.current_task = 'fetch'
            self.task_stage = 'object_identification'
            self.slots = {'object': None, 'location': None}

            # Extract what we can
            # ... (use NER/slot filling)

            # Ask for missing information
            if not self.slots['object']:
                return "What would you like me to bring?"
            else:
                return f"I'll bring you the {self.slots['object']}."

        # ... other task types
        return "I'm not sure how to help with that yet."

    def continue_task(self, text):
        """Continue the active task"""
        if self.current_task == 'fetch':
            if self.task_stage == 'object_identification':
                self.slots['object'] = text
                self.task_stage = 'execution'
                return f"Okay, I'll get the {text}."

        return "I'm working on it."

    def complete_task(self):
        """Reset after task completion"""
        self.current_task = None
        self.task_stage = None
        self.slots = {}

Practical Exercise

Week 13 Assignment: Complete Conversational Robot

Objective: Build an end-to-end conversational robot system

Tasks:

  1. Integrate GPT-4 for natural language understanding and response generation
  2. Implement speech recognition using Whisper (or alternative)
  3. Add text-to-speech for robot responses
  4. Create vision-language grounding to identify objects from descriptions
  5. Implement multi-modal fusion combining speech and pointing gestures
  6. Design dialogue manager for multi-turn conversations
  7. Add safety filtering to prevent dangerous commands

Deliverable:
Complete ROS 2 package demonstrating:

  • Natural conversation with context tracking
  • Voice-controlled object manipulation: "Bring me that red cup" while pointing
  • Multi-turn dialogue for complex tasks
  • Video demonstration with analysis of response times and accuracy

Evaluation Criteria:

  • Response latency < 2 seconds
  • Intent classification accuracy > 85%
  • Successful object grounding in 90%+ of cases
  • Natural, contextual responses
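Checking the latency criterion requires per-stage instrumentation of the pipeline. A minimal pure-Python sketch follows; the stage names ("asr", "llm", "tts") are illustrative, and the `time.sleep` calls stand in for the real Whisper, GPT, and TTS calls.

```python
import time
from contextlib import contextmanager

class LatencyTracker:
    """Accumulate per-stage timings for one dialogue turn."""
    def __init__(self):
        self.stages = {}

    @contextmanager
    def stage(self, name):
        # Time the wrapped block, even if it raises
        start = time.perf_counter()
        try:
            yield
        finally:
            self.stages[name] = time.perf_counter() - start

    def total(self):
        return sum(self.stages.values())

tracker = LatencyTracker()
with tracker.stage("asr"):
    time.sleep(0.01)  # stand-in for Whisper transcription
with tracker.stage("llm"):
    time.sleep(0.02)  # stand-in for the GPT call
with tracker.stage("tts"):
    time.sleep(0.01)  # stand-in for speech synthesis
```

In the real system, wrapping each pipeline stage this way makes it easy to log which component dominates the turn time and to verify `tracker.total() < 2.0` against the assignment's latency target.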

Next: Week 14 will cover deployment, safety systems, and preparing robots for real-world operation.