第 16 章 语音交互系统¶
前面十五章,我们已经让 Go2 能连上、能动、能看见环境,也能自己找路。但交互方式还很"工程师"—— 键盘、终端、命令行。到了这一章,我们给它加上一条更像真实产品的入口:你开口说话,它听懂、执行,再用语音回你。
本章你将学到¶
- 理解一条完整语音交互链里最常见的四个环节:
VAD、ASR、LLM、TTS - 明白为什么本项目最后选了本地识别 + 规则映射 + 预生成语音反馈这条更稳的工程路线
- 用
faster-whisper跑通中文语音识别,并理解small模型为什么比medium更适合扩展坞 CPU - 把识别结果映射到 Go2 的动作接口,实现"小白,站起来""小白,前进""停止"这类命令
- 处理 6 类真正会把语音系统搞崩的坑:音频设备占用、采样率、误识别、危险动作、DDS 崩溃、扩展坞常驻服务
背景与原理¶
一条语音管线通常有哪四段¶
语音交互常被说成"语音助手",但工程上最好把它拆开看。最常见的是下面四段:
| 模块 | 全称 | 干什么 | 本章是否主线使用 |
|---|---|---|---|
VAD |
Voice Activity Detection | 判断"现在有没有人在说话" | 是 |
ASR |
Automatic Speech Recognition | 把语音转成文字 | 是 |
LLM |
Large Language Model | 做复杂意图理解、多轮对话 | 可选 |
TTS |
Text To Speech | 把文本变成语音播报 | 是 |
把这四段翻译成 Go2 的场景,就是:
- 麦克风先判断你是不是开口了
- 识别模型把"小白,站起来"转成文本
- 意图层决定这句到底对应哪个动作
- 播报层给你回一句"好的"
这一章我们不会一上来就把 LLM 塞进去。原因很简单:
- "前进""坐下""停止" 这类命令,规则映射就够了
- 规则匹配更可控,也更容易做安全兜底
- 先把基础链跑通,后面再加大模型才不会把问题搅成一锅粥
所以本章的主线是:
VAD + ASR + 规则意图映射 + TTS
为什么本书选"本地 ASR + 在线生成语音素材"的混合方案¶
刚开始做语音系统时,最容易出现两种冲动:
- 要么全本地
- 要么全在线
但真落到 Go2 上,这两个极端都不太舒服。
全本地的优点¶
- 隐私最好
- 断网也能跑
- 延迟可控
全在线的问题¶
- 网络一抖就卡
- 语音命令这种高频交互,很容易把体验拖垮
- 敏感场景不适合把内容全发出去
所以我们最后走的是折中路线:
- ASR 本地做:用
faster-whisper在本机直接识别 - TTS 素材预生成:首次用
edge-tts生成几段固定反馈音频,运行时只播放本地mp3
这条路线有两个很实用的好处:
- 运行时不需要每次都联网调用 TTS
- 播放固定短语比临时合成更稳,也更快
隐私提示
edge-tts 背后用的是微软的在线语音服务。
如果你用它来生成反馈语音,文本内容会离开本机。
所以本章把它放在"预生成固定提示音"这个位置,不建议直接拿它播敏感内容。
faster-whisper 是什么¶
faster-whisper 可以理解成:
- 模型本体还是 Whisper 那套识别能力
- 但底层推理由
CTranslate2加速 - CPU 和 GPU 上都更省资源
它特别适合这章的原因有三个:
- Python 接口直接
- 支持
int8量化,CPU 友好 - 中文识别效果够用,而且能加
initial_prompt提示词表
在本项目的真实验证里,一个很值钱的结论是:
medium模型在扩展坞 ARM CPU 上延迟偏大,经常要十几秒small模型虽然精度略低一点,但能把延迟压到约 3.5 秒量级
这就是很典型的工程取舍:
- 不是追求学术上最强
- 而是追求"机器人说完你能忍住不骂娘"
为什么最终实现没有把 TTS 做成"实时在线朗读"¶
如果你只看功能描述,会本能觉得:
既然装了
edge-tts,那是不是每次都该把"好的,我来了"实时合成出来?
理论上可以,但项目里最后没有这么做。原因是:
- 反馈语句其实很固定
- 运行时临时联网生成更容易出故障
- 一旦设备占用或网络不稳,整条交互链会被拖慢
所以现有代码用的是:
- 先生成
wake.mp3、ok.mp3、unknown.mp3、stop.mp3这类固定音频 - 运行时用
gst-play或gst-launch-1.0 playbin直接本地播放
这是一个非常典型的"别犯傻"决策。
扩展坞音频硬件的真相¶
这一段一定要提前说,不然后面你很容易在硬件上白耗半天。
项目里实际摸底后确认:
- Go2 主板上的麦克风和扬声器,并不是扩展坞这边随手就能直接调的那种设备
- 扩展坞虽然能看到一堆 ALSA / PulseAudio 设备,但并不代表真有可用音频通路
- 最稳的工程做法,还是外接标准 USB Audio 设备
所以本章主线默认你用的是:
- USB 麦克风
- 或 USB 耳麦一体设备
如果你说的"无线麦克风"底层其实只是一个标准 USB Audio 接收器,那也可以。
但如果它只是某个私有串口模块,那它大概率就不在这章的主线里。
架构总览¶
先看最终跑通版的数据流。它不是四个 ROS2 节点拼起来的大拼盘,而是一个单进程 Python 程序,内部自己管理录音、识别、动作和播报。
flowchart LR
A[USB 麦克风 / 耳麦] --> B[parecord 录音子进程]
B --> C[能量阈值 VAD]
C --> D[faster-whisper]
D --> E[ASR 归一化]
E --> F[规则意图映射]
F --> G[危险动作二次确认]
G --> H[SDK worker 子进程]
H --> I[SportClient 动作调用]
I --> J[Go2 执行动作]
F --> K[固定反馈语句]
K --> L[本地 mp3 播放]
L --> M[USB 扬声器 / 耳机]
N[可选: LLM 意图理解] -.扩展路线.-> F
这条链有 3 个地方尤其重要:
- 录音不用
sounddevice做主线,而是用parecord子进程 - 动作执行放进独立 worker 子进程,避免 DDS / SDK 崩溃把主程序一起带走
- TTS 播放固定 mp3,不在运行时临时合成
如果你把这 3 个点记住,后面很多设计就不会看着"怪",实际上它们全是踩坑踩出来的。
状态机怎么工作¶
这套语音系统不是一直不停地听完一句接一句瞎执行,它内部有个很朴素但很好用的状态机:
这个"一条指令执行完就回待机"的模式,比持续激活更适合实机。
原因很现实:
- 可以减少误触发
- 不容易把播报回声当成下一条命令
- 出了问题更容易停住
环境准备¶
前置成果¶
这一章默认你已经有两样东西:
- 第 2 章讲过的 Go2 动作接口认知
- 一套能正常
pip install、能运行 Python 脚本的开发环境
如果你准备直接在扩展坞上部署,还需要额外确认:
- Go2 网络已经通
- 你知道当前控制网卡名是什么
项目里实际出现过两种常见网卡名:
- 笔记本开发时常见
enp111s0 - 扩展坞侧最终常见
eth0
这一章我们会把它做成可配置项,而不是硬写死。
硬件建议¶
主线建议只用一种最稳的组合:
- 一个标准 USB 音频输入设备
- 一个标准 USB 音频输出设备
最省事的是直接用 USB 耳麦一体设备,因为:
- 输入输出路径都清楚
- 设备枚举更稳定
- 后面做 systemd 常驻和热插拔也更好配
别一开始就赌 Go2 自带音频链
这条路在项目里实际踩过坑。
如果你只是想先把语音链跑通,别拿自己给自己加难度。
先用标准 USB Audio 设备,后面再优化硬件形态。
需要安装的系统依赖¶
我们这章主线用 parecord 录音,用 gst 播音频,所以先把系统工具装上:
# parecord 来自 PulseAudio 工具集
# gst-launch / gst-play 用来播本地 mp3
sudo apt install -y \
pulseaudio-utils \
gstreamer1.0-tools \
gstreamer1.0-plugins-good \
gstreamer1.0-plugins-bad
如果你想顺手保留早期 sounddevice 调试路线,可以再装一份 PortAudio:
需要安装的 Python 包¶
主线要装的 Python 包有 3 个:
faster-whisperedge-ttsnumpy
可选再装一个:
sounddevice
模型准备¶
这章我们用 faster-whisper 的 small 模型。
第一次运行时,模型会自动下载到用户缓存目录。如果你之前用过 Speech Note 或别的 Whisper 工具,也很可能已经缓存过一份,这时就不用重复下。
如果你准备把系统部署到扩展坞并长期离线运行,建议再做一步:
- 把缓存好的模型复制到
go2_voice/model_small/
这样后面即使扩展坞没网,也能直接起。
先确认音频设备能不能用¶
在继续写代码前,先查查系统有没有认到 USB 音频:
如果你看到带 usb 的输入源和输出 sink,说明路线基本没跑偏。
再做一个最小播放测试:
实现步骤¶
这一章我们把实现放进一个新的 Python 包 go2_voice。
它不是"一堆节点",而是一个入口脚本 + 一个音频生成脚本 + 一个 launch 文件。
建议的目录结构长这样:
go2_voice/
├── go2_voice/
│ ├── __init__.py
│ ├── voice_cmd.py
│ └── build_prompts.py
├── audio/
├── launch/
│ └── voice.launch.py
├── package.xml
├── setup.py
└── setup.cfg
步骤一:创建 go2_voice 包¶
先在工作空间里创建一个 ament_python 包:
然后手动补一个 audio/ 目录和 launch/ 目录:
# 这两个目录 ros2 pkg create 不会自动给你建
cd ~/go2_tutorial_ws/src/go2_voice
mkdir -p audio launch go2_voice
touch go2_voice/__init__.py
步骤二:先把固定反馈音频生成出来¶
这一小步很值钱。我们先把最常见的反馈语句提前做成 mp3:
wake.mp3ok.mp3unknown.mp3stop.mp3timeout.mp3confirm.mp3
这样主程序后面只管播放,不管在线合成。
在 go2_voice/go2_voice/build_prompts.py 里写一个小脚本:
#!/usr/bin/env python3
"""
把固定反馈语句预生成成 mp3。
运行一次就够,后面主程序只播本地文件。
"""
from pathlib import Path # 跨平台路径处理,拼接 audio/ 目录更稳
import edge_tts # 微软在线 TTS 的 Python 封装,这里用它生成素材音频
VOICE = "zh-CN-XiaoxiaoNeural" # 示例中文音色,如果不可用先用 edge-tts --list-voices 查看
OUTPUT_DIR = Path(__file__).resolve().parents[1] / "audio"
PROMPTS = {
"wake": "我在,请说指令",
"ok": "好的",
"unknown": "没听懂,请再说一次",
"stop": "好的,已经停下来了",
"timeout": "没有新的指令,我先休息啦",
"confirm": "这个动作有点危险,请再确认一次",
}
def main() -> None:
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
for name, text in PROMPTS.items():
target = OUTPUT_DIR / f"{name}.mp3"
communicator = edge_tts.Communicate(text=text, voice=VOICE)
communicator.save_sync(str(target))
print(f"已生成: {target.name}")
if __name__ == "__main__":
main()
上面这段的关键点只有两个:
edge_tts.Communicate(...)负责请求在线语音save_sync(...)直接把结果保存成 mp3 文件
写好后先运行一次:
如果你想换音色,先列一下当前可用中文声音:
步骤三:在主程序里先解决录音 + VAD¶
这一章的主程序放在 go2_voice/go2_voice/voice_cmd.py。
先把配置、录音子进程和能量阈值 VAD 搭起来。
这里有个重要选择:
- 桌面机第一版常用
sounddevice - 但项目里最后跑稳的是
parecord
所以本章主线直接用 parecord。
先写配置和录音部分:
#!/usr/bin/env python3
"""
Go2 语音控制主程序。
主线是:待机 -> 唤醒 -> 一条指令 -> 执行动作 -> 回待机。
"""
import multiprocessing # 把 SDK 调用丢到子进程,避免崩了带走主程序
import os # 读取环境变量,比如当前控制网卡名
import queue # 录音线程和主循环之间传音频块
import subprocess # 调 parecord 和 gst 播放命令
import threading # 后台读录音子进程 stdout
import time # 状态机超时、动作等待都要用
from enum import Enum # 用枚举表达待机/激活状态
from pathlib import Path # 拼接 audio/ 路径
import numpy as np # 音频块 RMS 能量计算和拼接
from faster_whisper import WhisperModel # 本地 ASR 核心模型
SAMPLE_RATE = 16000
CHANNELS = 1
BLOCK_DURATION = 0.1
BLOCK_SIZE = int(SAMPLE_RATE * BLOCK_DURATION)
BYTES_PER_BLOCK = BLOCK_SIZE * 2 # int16 每个采样 2 字节
ENERGY_THRESHOLD = 0.02
SILENCE_TIMEOUT = 0.8
MIN_SPEECH_DURATION = 0.2
MAX_SPEECH_DURATION = 10.0
ACTIVE_TIMEOUT = 30.0
NETWORK_INTERFACE = os.getenv("GO2_NET_IFACE", "eth0")
MODEL_PATH = os.getenv("GO2_WHISPER_MODEL", "small")
WAKE_WORDS = ["小白"]
AUDIO_DIR = Path(__file__).resolve().parents[1] / "audio"
audio_queue: "queue.Queue[np.ndarray]" = queue.Queue()
_parecord_proc = None
_recording = False
class State(Enum):
SLEEPING = "sleeping"
ACTIVE = "active"
接着把 parecord 和后台读取线程补上:
def _find_usb_source() -> str | None:
"""从 PulseAudio/PipeWire 兼容层里找 USB 麦克风 source 名称。"""
try:
output = subprocess.check_output(
["pactl", "list", "sources", "short"],
stderr=subprocess.DEVNULL,
).decode()
except Exception:
return None
for line in output.strip().splitlines():
if "usb" in line.lower() and "monitor" not in line.lower():
return line.split()[1]
return None
def _parecord_reader() -> None:
"""后台线程:不断把 parecord 的 PCM 数据读出来,切成小块塞进队列。"""
global _parecord_proc
buffer = b""
while _recording and _parecord_proc and _parecord_proc.poll() is None:
data = _parecord_proc.stdout.read(BYTES_PER_BLOCK)
if not data:
break
buffer += data
while len(buffer) >= BYTES_PER_BLOCK:
block = buffer[:BYTES_PER_BLOCK]
buffer = buffer[BYTES_PER_BLOCK:]
samples = np.frombuffer(block, dtype=np.int16).astype(np.float32) / 32768.0
audio_queue.put(samples)
def start_recording() -> None:
"""启动 parecord 子进程和读取线程。"""
global _parecord_proc, _recording
cmd = [
"parecord",
f"--rate={SAMPLE_RATE}",
f"--channels={CHANNELS}",
"--format=s16le",
"--raw",
]
source_name = _find_usb_source()
if source_name:
cmd.append(f"--device={source_name}")
_parecord_proc = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.DEVNULL,
)
_recording = True
threading.Thread(target=_parecord_reader, daemon=True).start()
def stop_recording() -> None:
"""退出时记得把录音子进程收干净。"""
global _recording, _parecord_proc
_recording = False
if _parecord_proc:
_parecord_proc.terminate()
_parecord_proc.wait(timeout=3)
_parecord_proc = None
最后把最朴素但非常实用的 VAD 写出来:
def record_speech(timeout: float = 0.0) -> np.ndarray | None:
"""
等待一段语音。
timeout=0 表示无限等待;
返回 None 表示超时或没检测到有效语音。
"""
speech_chunks: list[np.ndarray] = []
is_speaking = False
speech_start = 0.0
silence_start = 0.0
wait_start = time.time()
while True:
try:
chunk = audio_queue.get(timeout=0.5)
except queue.Empty:
if timeout > 0 and not is_speaking and time.time() - wait_start > timeout:
return None
continue
energy = float(np.sqrt(np.mean(chunk ** 2)))
if not is_speaking:
if timeout > 0 and time.time() - wait_start > timeout:
return None
if energy > ENERGY_THRESHOLD:
is_speaking = True
speech_start = time.time()
speech_chunks = [chunk]
else:
speech_chunks.append(chunk)
if energy < ENERGY_THRESHOLD:
if silence_start == 0:
silence_start = time.time()
elif time.time() - silence_start > SILENCE_TIMEOUT:
duration = time.time() - speech_start
if duration < MIN_SPEECH_DURATION:
speech_chunks.clear()
is_speaking = False
silence_start = 0.0
continue
return np.concatenate(speech_chunks)
else:
silence_start = 0.0
if time.time() - speech_start > MAX_SPEECH_DURATION:
return np.concatenate(speech_chunks)
这段看着不高级,但在本项目里它非常够用。
先用能量阈值把系统做出来,比一上来把自己绑死在复杂 VAD 模型上靠谱得多。
步骤四:接上 faster-whisper 和意图映射¶
现在我们把"听见声音"变成"听懂内容"。
先写一个最小可用的 ASR 函数:
def transcribe(model: WhisperModel, audio_data: np.ndarray) -> str:
"""把一段音频转成中文文本。"""
segments, _ = model.transcribe(
audio_data,
language="zh",
beam_size=3,
vad_filter=True,
vad_parameters={
"min_silence_duration_ms": 500,
"speech_pad_ms": 200,
},
initial_prompt=(
"小白;坐下;站起;前进;后退;左转;右转;"
"打招呼;伸懒腰;转圈;跳舞;比心;前空翻;后空翻;"
"停止;确认;取消"
),
)
return "".join(segment.text for segment in segments).strip()
这里有三个参数很值得你记一下:
compute_type="int8":CPU 上更友好vad_filter=True:让模型自己再帮你清一遍静音initial_prompt=...:把常见命令词提前喂进去,中文识别会稳一点
接着,把真实项目里很有用的ASR 归一化补上。
为什么要这一步?
因为 Whisper 很爱把:
- "小白" 识别成 "小百"
- "左转" 识别成 "做转"
- "比心" 识别成 "笔心"
在 voice_cmd.py 里继续加:
CHAR_NORMALIZE = {
"確": "确", "認": "认", "後": "后", "轉": "转", "動": "动",
}
WORD_NORMALIZE = {
"小百": "小白",
"拜拜": "小白",
"做转": "左转",
"有转": "右转",
"笔心": "比心",
"后翻": "后空翻",
"前翻": "前空翻",
"组合计": "组合一",
}
def normalize_asr(text: str) -> str:
"""繁简修正 + 已知误识别修正。"""
text = "".join(CHAR_NORMALIZE.get(ch, ch) for ch in text)
for wrong, right in sorted(WORD_NORMALIZE.items(), key=lambda item: -len(item[0])):
text = text.replace(wrong, right)
return text
然后定义唤醒词、停止词和命令映射:
STOP_KEYWORDS = ["停", "停下", "停止", "暂停", "取消", "够了", "结束"]
DANGEROUS_ACTIONS = {"back_flip", "front_flip", "combo_1"}
COMMAND_MAP = [
(["坐下", "蹲下", "趴下"], "stand_down", "趴下"),
(["站起", "起来", "起立", "站立"], "stand_up", "站起来"),
(["前进", "向前", "往前"], "move_forward", "前进"),
(["后退", "往后", "退后"], "move_back", "后退"),
(["左转", "向左", "往左"], "turn_left", "左转"),
(["右转", "向右", "往右"], "turn_right", "右转"),
(["打招呼", "握手", "你好"], "hello", "打招呼"),
(["伸懒腰", "伸展"], "stretch", "伸懒腰"),
(["转圈", "转一圈"], "spin", "原地转圈"),
(["跳舞"], "dance1", "跳舞"),
(["比心"], "heart", "比心"),
(["后空翻"], "back_flip", "后空翻"),
(["前空翻"], "front_flip", "前空翻"),
(["恢复", "复位"], "recovery", "恢复站立"),
]
def check_wake_word(text: str) -> bool:
return any(word in text for word in WAKE_WORDS)
def check_stop(text: str) -> bool:
return any(word in text for word in STOP_KEYWORDS)
def parse_intent(text: str) -> tuple[str, str] | None:
text = text.strip().lower()
for keywords, action_id, desc in COMMAND_MAP:
if any(keyword in text for keyword in keywords):
return action_id, desc
return None
这里有个很重要的设计判断:
- 唤醒词检查和动作匹配分开做
- 停止词单独放高优先级
因为实机上最不能出事的不是"识别不到",而是"明明让它停,它还在往前冲"。
步骤五:把 Go2 动作调用包进独立 worker 子进程¶
这一步是整章最像"工程活"的地方。
如果你只在桌面上跑 Python 脚本,会本能觉得:
直接在主线程里调
SportClient不就完了?
但项目里的真实情况是:
- SDK 调用曾经出现过 segfault
- 某些 DDS 版本和 Unitree SDK 组合不稳定
- 如果主程序和动作调用绑死在一个进程,一崩全崩
所以最终代码用了一个很朴素但很有效的方案:
- 主进程负责录音、识别、状态机
- 子进程专门负责
SportClient - 子进程挂了就重拉
先写 worker:
def _sdk_worker(cmd_queue: multiprocessing.Queue, result_queue: multiprocessing.Queue, network_interface: str) -> None:
"""动作执行子进程:只负责 SDK 初始化和动作调用。"""
from unitree_sdk2py.core.channel import ChannelFactoryInitialize
from unitree_sdk2py.go2.sport.sport_client import SportClient
ChannelFactoryInitialize(0, network_interface)
client = SportClient()
client.SetTimeout(10.0)
client.Init()
result_queue.put("READY")
while True:
action_id = cmd_queue.get()
if action_id == "__EXIT__":
break
try:
if action_id == "stand_down":
ret = client.StandDown()
elif action_id == "stand_up":
ret = client.StandUp()
elif action_id == "recovery":
ret = client.RecoveryStand()
elif action_id == "move_forward":
t0 = time.time()
while time.time() - t0 < 1.7:
client.Move(0.3, 0.0, 0.0)
time.sleep(0.05)
client.StopMove()
ret = 0
elif action_id == "move_back":
t0 = time.time()
while time.time() - t0 < 1.7:
client.Move(-0.3, 0.0, 0.0)
time.sleep(0.05)
client.StopMove()
ret = 0
elif action_id == "turn_left":
t0 = time.time()
while time.time() - t0 < 3.14:
client.Move(0.0, 0.0, 0.5)
time.sleep(0.05)
client.StopMove()
ret = 0
elif action_id == "turn_right":
t0 = time.time()
while time.time() - t0 < 3.14:
client.Move(0.0, 0.0, -0.5)
time.sleep(0.05)
client.StopMove()
ret = 0
elif action_id == "hello":
ret = client.Hello()
elif action_id == "stretch":
ret = client.Stretch()
elif action_id == "spin":
t0 = time.time()
while time.time() - t0 < 4.19:
client.Move(0.0, 0.0, 1.5)
time.sleep(0.05)
client.StopMove()
ret = 0
elif action_id == "dance1":
ret = client.Dance1()
elif action_id == "heart":
ret = client.Heart()
elif action_id == "back_flip":
ret = client.BackFlip()
elif action_id == "front_flip":
ret = client.FrontFlip()
elif action_id == "emergency_stop":
client.StopMove()
client.RecoveryStand()
ret = 0
else:
ret = -1
result_queue.put("OK" if ret in (None, 0) else f"FAIL:{ret}")
except Exception as exc:
result_queue.put(f"ERROR:{exc}")
再写一个主进程可调用的控制器:
class Go2Controller:
def __init__(self) -> None:
self._cmd_q: multiprocessing.Queue = multiprocessing.Queue()
self._res_q: multiprocessing.Queue = multiprocessing.Queue()
self._proc: multiprocessing.Process | None = None
self._spawn_worker()
def _spawn_worker(self) -> None:
if self._proc and self._proc.is_alive():
self._proc.terminate()
self._proc.join(timeout=3)
self._cmd_q = multiprocessing.Queue()
self._res_q = multiprocessing.Queue()
self._proc = multiprocessing.Process(
target=_sdk_worker,
args=(self._cmd_q, self._res_q, NETWORK_INTERFACE),
daemon=True,
)
self._proc.start()
ready = self._res_q.get(timeout=15)
if ready != "READY":
raise RuntimeError(f"SDK worker 初始化失败: {ready}")
def _ensure_worker(self) -> None:
if self._proc is None or not self._proc.is_alive():
self._spawn_worker()
def execute(self, action_id: str) -> str:
self._ensure_worker()
self._cmd_q.put(action_id)
for _ in range(6): # 最多等 3 秒
try:
return self._res_q.get(timeout=0.5)
except Exception:
if self._proc and not self._proc.is_alive():
self._spawn_worker()
return "OK" # 命令很可能已经发出
return "OK"
def emergency_stop(self) -> None:
self.execute("emergency_stop")
这段代码背后的核心思想只有一句:
宁可把动作调用包得土一点,也别让主进程跟着一起死。
步骤六:把危险动作确认和本地播报接进主循环¶
现在补上两个安全层:
- 危险动作二次确认
- 固定反馈 mp3 播放
先写播放函数:
def _find_usb_sink() -> str | None:
"""找 USB 输出设备名,给 gst 播放时显式指定。"""
try:
output = subprocess.check_output(
["pactl", "list", "sinks", "short"],
stderr=subprocess.DEVNULL,
).decode()
except Exception:
return None
for line in output.strip().splitlines():
if "usb" in line.lower():
return line.split()[1]
return None
def _gst_play_cmd(path: Path) -> list[str]:
sink_name = _find_usb_sink()
if sink_name:
return [
"gst-launch-1.0",
"playbin",
f"uri=file://{path}",
f"audio-sink=pulsesink device={sink_name}",
]
return ["gst-play-1.0", "--no-interactive", str(path)]
def play_audio_blocking(name: str) -> None:
path = AUDIO_DIR / f"{name}.mp3"
if path.exists():
subprocess.run(_gst_play_cmd(path), stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
然后把危险动作确认做出来:
CONFIRM_KEYWORDS = ["确认", "确定", "好的", "可以", "执行", "来吧"]
def drain_audio_queue() -> None:
"""清掉积压音频,避免上一轮残留回声干扰当前识别。"""
while True:
try:
audio_queue.get_nowait()
except queue.Empty:
break
def confirm_dangerous(model: WhisperModel, desc: str) -> bool:
"""危险动作必须二次确认。"""
play_audio_blocking("confirm")
time.sleep(0.3)
drain_audio_queue()
audio_data = record_speech(timeout=8.0)
if audio_data is None:
play_audio_blocking("timeout")
return False
confirm_text = normalize_asr(transcribe(model, audio_data))
if any(keyword in confirm_text for keyword in CONFIRM_KEYWORDS):
return True
if any(keyword in confirm_text for keyword in ["不", "不要", "取消", "算了", "别"]):
play_audio_blocking("stop")
return False
play_audio_blocking("unknown")
return False
到这里,语音系统才算开始像一个真正能上狗的东西。
因为空翻、组合动作这些命令,你不能只靠"我感觉识别得挺准"这种自信去赌。
步骤七:把主循环和 launch 收口¶
最后把所有零件接成一条完整流程。
继续在 voice_cmd.py 里补 main():
def main() -> None:
model = WhisperModel(MODEL_PATH, device="cpu", compute_type="int8")
controller = Go2Controller()
start_recording()
state = State.SLEEPING
try:
while True:
if state == State.SLEEPING:
audio_data = record_speech(timeout=0.0)
if audio_data is None:
continue
text = normalize_asr(transcribe(model, audio_data))
if not text:
continue
if not check_wake_word(text):
continue
play_audio_blocking("wake")
time.sleep(0.3)
drain_audio_queue()
# 唤醒词后面直接带命令: "小白,站起来"
result = parse_intent(text)
if result is not None:
action_id, desc = result
else:
state = State.ACTIVE
audio_data = record_speech(timeout=ACTIVE_TIMEOUT)
if audio_data is None:
play_audio_blocking("timeout")
state = State.SLEEPING
continue
text = normalize_asr(transcribe(model, audio_data))
if not text:
play_audio_blocking("timeout")
state = State.SLEEPING
continue
if check_stop(text):
controller.emergency_stop()
play_audio_blocking("stop")
state = State.SLEEPING
continue
result = parse_intent(text)
if result is None:
play_audio_blocking("unknown")
state = State.SLEEPING
continue
action_id, desc = result
if action_id in DANGEROUS_ACTIONS:
if not confirm_dangerous(model, desc):
state = State.SLEEPING
continue
controller.execute(action_id)
play_audio_blocking("ok")
state = State.SLEEPING
finally:
stop_recording()
if __name__ == "__main__":
main()
主线就是这么简单:
- 先听唤醒词
- 再听一条命令
- 执行动作
- 播放反馈
- 回待机
最后,再加一个最小 launch 文件,方便统一启动:
# 文件: launch/voice.launch.py
# 作用:用 launch 启动 go2_voice 主程序
from launch import LaunchDescription
from launch.actions import ExecuteProcess
def generate_launch_description() -> LaunchDescription:
return LaunchDescription([
ExecuteProcess(
cmd=["ros2", "run", "go2_voice", "voice_cmd"],
output="screen",
)
])
再把 setup.py 的入口补上:
from setuptools import setup
package_name = "go2_voice"
setup(
name=package_name,
version="0.0.0",
packages=[package_name],
data_files=[
("share/ament_index/resource_index/packages", [f"resource/{package_name}"]),
(f"share/{package_name}", ["package.xml"]),
(f"share/{package_name}/launch", ["launch/voice.launch.py"]),
(f"share/{package_name}/audio", [
"audio/wake.mp3",
"audio/ok.mp3",
"audio/unknown.mp3",
"audio/stop.mp3",
"audio/timeout.mp3",
"audio/confirm.mp3",
]),
],
install_requires=["setuptools"],
zip_safe=True,
maintainer="you",
maintainer_email="you@example.com",
description="Go2 voice interaction demo",
license="Apache-2.0",
entry_points={
"console_scripts": [
"voice_cmd = go2_voice.voice_cmd:main",
"build_prompts = go2_voice.build_prompts:main",
],
},
)
进阶:扩展坞长期运行为什么还会再加 systemd 和 udev
主线代码跑通之后,项目里还额外做了两层工程化:
systemd service:开机常驻udev + systemd-run:USB 耳机插上自动启动,拔掉自动停止
这些都很有用,但它们属于"把 demo 变成长期服务"的范畴。
本章主线先把语音闭环跑通,扩展部署放到常见问题和进阶说明里讲。
编译与运行¶
先生成固定反馈语音:
再编译包:
# 编译 go2_voice 并加载环境
cd ~/go2_tutorial_ws
source /opt/ros/humble/setup.bash
colcon build --packages-select go2_voice
source install/setup.bash
如果你要在扩展坞上跑,先把控制网卡设好:
最后启动语音系统:
或者用 launch:
# 用 launch 启动,方便后面接 systemd
cd ~/go2_tutorial_ws
source install/setup.bash
ros2 launch go2_voice voice.launch.py
如果你想先做最小检查,先看音频设备和模型是否正常:
结果验证¶
验证 1:最小闭环¶
先不要一上来就测空翻。
第一轮只测最朴素的三件事:
- 说"小白"
- 听到播报"我在,请说指令"
- 再说"站起来"
如果系统正常,你应该看到:
- 语音播报正常
- Go2 执行
StandUp - 执行结束后回到待机
验证 2:停止指令¶
再测一条更关键的:
- 说"小白"
- 说"前进"
- 紧接着说"停止"
理想现象是:
- 当前动作被打断
- Go2 执行
StopMove + RecoveryStand - 语音播报"好的,已经停下来了"
验证 3:危险动作二次确认¶
最后再测危险动作:
- 说"小白"
- 说"后空翻"
- 系统先播确认提示
- 再说"确认"
这一步通过后,说明你的二次确认链已经生效。
如果它在没确认时就直接翻了,那不是"体验问题",那是安全设计没做好。
验证 4:误识别容错¶
你还可以故意说几句项目里出现过的误识别:
- "小百"
- "做转"
- "笔心"
如果归一化逻辑写对了,这些也应该还能映射回:
- "小白"
- "左转"
- "比心"
常见问题¶
1. 扩展坞上根本找不到可用麦克风或扬声器¶
现象:
pactl list sources short里没看到可用输入- 或者看到一堆设备,但录不到声音
原因:
- Go2 主板自带音频链并没有按你想的方式暴露给扩展坞
- 你接的设备可能不是标准 USB Audio
解决:
- 先换成标准 USB 耳麦
- 再用
pactl list sources short/pactl list sinks short确认设备名
2. 录音和播报打架,一播放就报设备忙¶
现象:
- 录音能跑
- 一播 mp3 就报
Device is busy
原因:
- 录放音都在抢同一个底层音频设备
- 早期用
sounddevice时尤其容易出现独占问题
解决:
- 主线改用
parecord录音子进程 - 播放时显式指定
pulsesink - 播放后
sleep 0.3s再清空录音队列,避免回声被当命令
3. medium 模型太慢,一句话等十几秒¶
原因:
- 扩展坞 CPU 不是给你跑大号 Whisper 当桌面机用的
解决:
- 先换
small compute_type用int8beam_size保守一点,别瞎卷
项目里的真实经验就是:
medium更准- 但
small才更像能用
4. 中文识别老是差一点,比如"小白"变"小百"¶
原因:
- 中文短指令很容易谐音
- 机器人动作词又特别集中
解决:
- 补
initial_prompt - 做
normalize_asr() - 把项目里已经踩过的误识别词都收进去
5. 危险动作太容易误触发¶
现象:
- 你说"打招呼",它差点执行空翻
解决:
- 危险动作必须走二次确认
- 停止词优先级必须高于普通动作词
- 空翻、组合动作都不要放进"一句话直接执行"的无门槛路径
6. SDK 偶发崩溃,主程序直接退出¶
原因:
- DDS 版本和 Unitree SDK 不兼容
- 或者 SDK 调用链本身偶发不稳
解决:
- 把
SportClient放进独立 worker 子进程 - 扩展坞侧如果你也遇到
cyclonedds兼容问题,优先核对版本
项目里实际踩出的一个血泪结论是:
- 某些环境下
cyclonedds 0.10.5会和 Unitree SDK 打架 - 降到
0.10.2后稳定性明显改善
7. parecord 录不到 USB 麦克风¶
原因:
- PulseAudio 默认 source 不是你当前的 USB 设备
- 无头环境下默认设备更容易漂
解决:
- 不要赌默认设备
- 启动时先用
pactl list sources short找 USB source - 再把
--device=<source_name>明确传给parecord
8. 说完指令后,系统把自己的播报又识别成下一条命令¶
原因:
- 扬声器回声被麦克风收进去了
解决:
- 播放后延迟几百毫秒
- 清空音频队列
- 麦克风和扬声器物理上尽量别互相正对
9. 噪声环境下 VAD 总误触发¶
原因:
ENERGY_THRESHOLD太低
解决:
- 先打印几轮静音时的平均能量
- 再把阈值从
0.02往上慢慢加
别反着来。
如果你上来就把阈值提得很高,最后会变成"环境噪声没了,人声也没了"。
10. 想做扩展坞常驻服务,但 SSH 断开后程序就没了¶
解决方向:
- 用
systemd service托管 - 再用
udev + systemd-run处理 USB 热插拔
项目里最后跑通的是这套模式:
- 开机如果耳机已插好,服务自动启动
- 运行中插上耳机,5 秒后自动启动
- 拔掉耳机,服务自动停止
这部分不影响本章主线,但如果你要把语音系统长期挂在扩展坞上,它非常值得做。
本章小结¶
这一章我们把 Go2 的交互入口从"命令行"升级成了"说人话也能控制"。
看起来好像只是多了一个麦克风,但真正值钱的是你已经把一条完整的语音闭环拆清楚了:
VAD负责判断有没有说话ASR负责把声音变成文字- 规则映射负责把文字变成动作
- 本地音频播放负责给出反馈
更重要的是,你没有走那条看起来更"高级"、实际上更容易崩的路。
我们没有强行全在线,也没有一上来就把 LLM 塞进主链路,而是用了更像真实工程的组合:
- 本地
faster-whisper - 规则意图映射
- 预生成语音提示
- SDK worker 子进程隔离
如果你现在已经能说出:
- 为什么
small比medium更适合扩展坞 - 为什么
parecord比sounddevice更适合这套最终实现 - 为什么危险动作必须二次确认
那这一章就不是"抄完一段代码",而是真的入门了。
下一步¶
语音已经能让 Go2 "听懂你说什么"了。下一章,我们再给它补上另一只眼睛:接入相机和视觉模型,让它不光能听,还能看懂场景和目标。
继续阅读:第 17 章 视觉与 VLM 应用
拓展阅读¶
faster-whisper官方仓库: https://github.com/SYSTRAN/faster-whisperedge-tts官方仓库: https://github.com/rany2/edge-tts- OpenAI Whisper 论文
- PulseAudio / PipeWire 兼容层文档
- Unitree SDK2 Python 示例和动作接口说明