Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 0 additions & 32 deletions main/xiaozhi-server/core/handle/helloHandle.py

This file was deleted.

11 changes: 11 additions & 0 deletions main/xiaozhi-server/core/handle/messageType.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
from enum import Enum


class MessageType(Enum):
"""消息类型枚举"""
HELLO = "hello"
ABORT = "abort"
LISTEN = "listen"
IOT = "iot"
MCP = "mcp"
SERVER = "server"
2 changes: 1 addition & 1 deletion main/xiaozhi-server/core/handle/receiveAudioHandle.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from core.handle.sendAudioHandle import send_stt_message
from core.handle.intentHandler import handle_user_intent
from core.utils.output_counter import check_device_output_limit
from core.handle.abortHandle import handleAbortMessage
from core.handle.textHandler.abortHandle import handleAbortMessage
from core.handle.sendAudioHandle import SentenceType
from core.utils.util import audio_to_data_stream

Expand Down
156 changes: 8 additions & 148 deletions main/xiaozhi-server/core/handle/textHandle.py
Original file line number Diff line number Diff line change
@@ -1,154 +1,14 @@
import json
import time
from core.handle.abortHandle import handleAbortMessage
from core.handle.helloHandle import handleHelloMessage
from core.providers.tools.device_mcp import handle_mcp_message
from core.utils.util import remove_punctuation_and_length, filter_sensitive_info
from core.handle.receiveAudioHandle import startToChat, handleAudioMessage
from core.providers.tools.device_iot import handleIotDescriptors, handleIotStatus
from core.handle.reportHandle import enqueue_asr_report
import asyncio
from core.handle.textMessageHandlerRegistry import TextMessageHandlerRegistry
from core.handle.textMessageProcessor import TextMessageProcessor

TAG = __name__

# 全局处理器注册表
message_registry = TextMessageHandlerRegistry()

# 创建全局消息处理器实例
message_processor = TextMessageProcessor(message_registry)

async def handleTextMessage(conn, message):
"""处理文本消息"""
try:
msg_json = json.loads(message)
if isinstance(msg_json, int):
conn.logger.bind(tag=TAG).info(f"收到文本消息:{message}")
await conn.websocket.send(message)
return
if msg_json["type"] == "hello":
conn.logger.bind(tag=TAG).info(f"收到hello消息:{message}")
await handleHelloMessage(conn, msg_json)
elif msg_json["type"] == "abort":
conn.logger.bind(tag=TAG).info(f"收到abort消息:{message}")
await handleAbortMessage(conn)
elif msg_json["type"] == "listen":
conn.logger.bind(tag=TAG).info(f"收到listen消息:{message}")
if "mode" in msg_json:
conn.client_listen_mode = msg_json["mode"]
conn.logger.bind(tag=TAG).debug(
f"客户端拾音模式:{conn.client_listen_mode}"
)
if msg_json["state"] == "start":
conn.client_have_voice = True
conn.client_voice_stop = False
elif msg_json["state"] == "stop":
conn.client_have_voice = True
conn.client_voice_stop = True
if len(conn.asr_audio) > 0:
await handleAudioMessage(conn, b"")
elif msg_json["state"] == "detect":
conn.client_have_voice = False
conn.asr_audio.clear()
if "text" in msg_json:
conn.last_activity_time = time.time() * 1000
original_text = msg_json["text"] # 保留原始文本
filtered_len, filtered_text = remove_punctuation_and_length(
original_text
)
# 识别是否是唤醒词
is_wakeup_words = filtered_text in conn.config.get("wakeup_words")
if not is_wakeup_words:
# 上报纯文字数据(复用ASR上报功能,但不提供音频数据)
enqueue_asr_report(conn, original_text, [])
# 否则需要LLM对文字内容进行答复
await startToChat(conn, original_text)
elif msg_json["type"] == "iot":
conn.logger.bind(tag=TAG).info(f"收到iot消息:{message}")
if "descriptors" in msg_json:
asyncio.create_task(handleIotDescriptors(conn, msg_json["descriptors"]))
if "states" in msg_json:
asyncio.create_task(handleIotStatus(conn, msg_json["states"]))
elif msg_json["type"] == "mcp":
conn.logger.bind(tag=TAG).info(f"收到mcp消息:{message[:100]}")
if "payload" in msg_json:
asyncio.create_task(
handle_mcp_message(conn, conn.mcp_client, msg_json["payload"])
)
elif msg_json["type"] == "server":
# 记录日志时过滤敏感信息
conn.logger.bind(tag=TAG).info(
f"收到服务器消息:{filter_sensitive_info(msg_json)}"
)
# 如果配置是从API读取的,则需要验证secret
if not conn.read_config_from_api:
return
# 获取post请求的secret
post_secret = msg_json.get("content", {}).get("secret", "")
secret = conn.config["manager-api"].get("secret", "")
# 如果secret不匹配,则返回
if post_secret != secret:
await conn.websocket.send(
json.dumps(
{
"type": "server",
"status": "error",
"message": "服务器密钥验证失败",
}
)
)
return
# 动态更新配置
if msg_json["action"] == "update_config":
try:
# 更新WebSocketServer的配置
if not conn.server:
await conn.websocket.send(
json.dumps(
{
"type": "server",
"status": "error",
"message": "无法获取服务器实例",
"content": {"action": "update_config"},
}
)
)
return

if not await conn.server.update_config():
await conn.websocket.send(
json.dumps(
{
"type": "server",
"status": "error",
"message": "更新服务器配置失败",
"content": {"action": "update_config"},
}
)
)
return

# 发送成功响应
await conn.websocket.send(
json.dumps(
{
"type": "server",
"status": "success",
"message": "配置更新成功",
"content": {"action": "update_config"},
}
)
)
except Exception as e:
conn.logger.bind(tag=TAG).error(f"更新配置失败: {str(e)}")
await conn.websocket.send(
json.dumps(
{
"type": "server",
"status": "error",
"message": f"更新配置失败: {str(e)}",
"content": {"action": "update_config"},
}
)
)
# 重启服务器
elif msg_json["action"] == "restart":
await conn.handle_restart(msg_json)
else:
conn.logger.bind(tag=TAG).error(f"收到未知类型消息:{message}")
except json.JSONDecodeError:
await conn.websocket.send(message)
await message_processor.process_message(conn, message)
16 changes: 16 additions & 0 deletions main/xiaozhi-server/core/handle/textHandler/abortMessageHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Dict, Any

from core.handle.messageType import MessageType
from core.handle.textHandler.abortHandle import handleAbortMessage
from core.handle.textMessageHandler import TextMessageHandler


class AbortTextMessageHandler(TextMessageHandler):
"""Abort消息处理器"""

@property
def message_type(self) -> MessageType:
return MessageType.ABORT

async def handle(self, conn, msg_json: Dict[str, Any]) -> None:
await handleAbortMessage(conn)
38 changes: 38 additions & 0 deletions main/xiaozhi-server/core/handle/textHandler/helloMessageHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import asyncio
import json
from typing import Dict, Any

from core.handle.messageType import MessageType
from core.handle.textMessageHandler import TextMessageHandler
from core.providers.tools.device_mcp import send_mcp_initialize_message, send_mcp_tools_list_request, MCPClient

TAG = __name__


class HelloTextMessageHandler(TextMessageHandler):
"""Hello消息处理器"""

@property
def message_type(self) -> MessageType:
return MessageType.HELLO

async def handle(self, conn, msg_json: Dict[str, Any]) -> None:
audio_params = msg_json.get("audio_params")
if audio_params:
audio_format = audio_params.get("format")
conn.logger.bind(tag=TAG).info(f"客户端音频格式: {audio_format}")
conn.audio_format = audio_format
conn.welcome_msg["audio_params"] = audio_params
features = msg_json.get("features")
if features:
conn.logger.bind(tag=TAG).info(f"客户端特性: {features}")
conn.features = features
if features.get("mcp"):
conn.logger.bind(tag=TAG).info("客户端支持MCP")
conn.mcp_client = MCPClient()
# 发送初始化
asyncio.create_task(send_mcp_initialize_message(conn))
# 发送mcp消息,获取tools列表
asyncio.create_task(send_mcp_tools_list_request(conn))

await conn.websocket.send(json.dumps(conn.welcome_msg))
20 changes: 20 additions & 0 deletions main/xiaozhi-server/core/handle/textHandler/iotMessageHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import asyncio
from typing import Dict, Any

from core.handle.messageType import MessageType
from core.handle.textMessageHandler import TextMessageHandler
from core.providers.tools.device_iot import handleIotDescriptors, handleIotStatus


class IotTextMessageHandler(TextMessageHandler):
"""IoT消息处理器"""

@property
def message_type(self) -> MessageType:
return MessageType.IOT

async def handle(self, conn, msg_json: Dict[str, Any]) -> None:
if "descriptors" in msg_json:
asyncio.create_task(handleIotDescriptors(conn, msg_json["descriptors"]))
if "states" in msg_json:
asyncio.create_task(handleIotStatus(conn, msg_json["states"]))
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
import time
from typing import Dict, Any

from core.handle.messageType import MessageType
from core.handle.receiveAudioHandle import startToChat, handleAudioMessage
from core.handle.reportHandle import enqueue_asr_report
from core.handle.textMessageHandler import TextMessageHandler
from core.utils.util import remove_punctuation_and_length

TAG = __name__


class ListenTextMessageHandler(TextMessageHandler):
"""Listen消息处理器"""

@property
def message_type(self) -> MessageType:
return MessageType.LISTEN

async def handle(self, conn, msg_json: Dict[str, Any]) -> None:
if "mode" in msg_json:
conn.client_listen_mode = msg_json["mode"]
conn.logger.bind(tag=TAG).debug(
f"客户端拾音模式:{conn.client_listen_mode}"
)
if msg_json["state"] == "start":
conn.client_have_voice = True
conn.client_voice_stop = False
elif msg_json["state"] == "stop":
conn.client_have_voice = True
conn.client_voice_stop = True
if len(conn.asr_audio) > 0:
await handleAudioMessage(conn, b"")
elif msg_json["state"] == "detect":
conn.client_have_voice = False
conn.asr_audio.clear()
if "text" in msg_json:
conn.last_activity_time = time.time() * 1000
original_text = msg_json["text"] # 保留原始文本
filtered_len, filtered_text = remove_punctuation_and_length(
original_text
)
# 识别是否是唤醒词
is_wakeup_words = filtered_text in conn.config.get("wakeup_words")
if not is_wakeup_words:
# 上报纯文字数据(复用ASR上报功能,但不提供音频数据)
enqueue_asr_report(conn, original_text, [])
# 否则需要LLM对文字内容进行答复
await startToChat(conn, original_text)
20 changes: 20 additions & 0 deletions main/xiaozhi-server/core/handle/textHandler/mcpMessageHandler.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
import asyncio
from typing import Dict, Any

from core.handle.messageType import MessageType
from core.handle.textMessageHandler import TextMessageHandler
from core.providers.tools.device_mcp import handle_mcp_message


class McpTextMessageHandler(TextMessageHandler):
"""MCP消息处理器"""

@property
def message_type(self) -> MessageType:
return MessageType.MCP

async def handle(self, conn, msg_json: Dict[str, Any]) -> None:
if "payload" in msg_json:
asyncio.create_task(
handle_mcp_message(conn, conn.mcp_client, msg_json["payload"])
)
Loading