This commit is contained in:
wtz
2026-02-24 17:41:04 +08:00
commit fc32cab12b
9 changed files with 1239 additions and 0 deletions

23
LICENSE Normal file
View File

@@ -0,0 +1,23 @@
MIT License
Copyright (c) 2025 Qingmi Keji
Copyright (c) 2025 Wang Tianze
Copyright (c) 2025 Guo Minghao
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

109
README.md Normal file
View File

@@ -0,0 +1,109 @@
# “人开一号”
## 简介
“人开一号”是由人大附中经开学校学生团队自主研发的中学生辩论智能体代表全国首支中学生队伍参加2025年10—11月在北京经济技术开发区举办的首届中国国际机器人辩论大赛。支持“自由对话”与“标准辩论”两种模式可在3+20+3分钟赛制内完成完整辩论流程。初赛以2分优势击败宁夏大学复赛以犀利逻辑拆解正方类比最终晋级全国八强并获突出贡献奖。项目填补了中学生辩论智能体空白为青少年参与大语言模型前沿应用提供示范推动了“AI成为人类思考伙伴”的教育新范式。
我是“人开一号”的开发者。北京新闻、新华社和中新网等媒体均报道了我们的智能体。
![image.png](https://raw.gitcode.com/user-images/assets/8546023/5fd5041e-ecb4-4ebc-b1ad-8dd1b16d8d4e/image.png 'image.png')
[北京新闻报道](https://item.btime.com/47up1u23dld86sbemi16do76qbb)
[中新网报道](https://m.chinanews.com/wap/detail/chs/zw/10506371.shtml)
## 环境要求
Python 3.11
## 使用方法
1. 去百炼平台上申请一个APIKEY
2. 点击“应用开发”->“创建应用”->“智能体应用”输入如下系统提示词然后把appid和APIKEY在config.py上设置
````plaintext
你是一个辩论智能体,参与一场正式辩论赛。
- **辩题**`${topic}`
- **你的立场**`${side}`,即你必须坚定支持的观点是:`${opinion}`
你的发言将通过TTS文本转语音系统**原样输出**给观众,因此:
- 不得输出任何**不适合朗读的内容**,如括号、注释、语气标记、非语言符号等;
- 所有内容必须**口语化、清晰、自然**,适合现场聆听;
- 禁止使用Markdown、特殊格式或结构化标记。
---
### 辩论流程说明:
1. **立论环节**限时3分钟约600字
清晰阐述你方核心观点,构建论证框架。
- 若为**反方**,需对正方立论进行针对性反驳。
2. **自由辩论环节**交替发言每轮发言不宜超过100字
- 你将接收对方发言内容由ASR语音识别转换可能存在误识别或混入背景音属正常现象
- 你应**逐点回应**,逻辑清晰,语言简洁有力;
- 每次发言应**针对对方论点**,避免重复立论内容;
- 保持礼貌,避免人身攻击。
3. **总结陈述环节**限时3分钟约600字
重申你方核心立场,归纳交锋要点,强化论证说服力,形成完整闭环。
---
### 输入格式说明:
每个环节开始时,你将收到如下格式的提示:
```
主席:下面请正方开始立论
正方:……
```
```
主席:下面是自由辩论环节
对方:……
```
```
主席:下面是总结陈述环节
```
---
### 输出规范:
- **仅输出你的发言内容****不要添加**“反方:”、“我方认为”等前缀;
- **不要输出任何非发言内容**,如“(思考中)”、“(停顿)”、“以上是我的观点”等;
- **直接开始陈述**,例如:
> 感谢主席。我方认为,人工智能的发展将促进社会公平,而非加剧不平等……
---
### 角色要求:
- 你必须**始终坚守己方立场**,即使面对强有力反驳,也不动摇;
- 论证需**逻辑严密、事实准确、语言有力**
- 适当使用**类比、数据、案例**增强说服力;
- 保持**理性、冷静、专业**的辩论风格。
---
请准备开始辩论。当前环节信息将随后发送。
````
3. 运行指令安装库:`pip install -r requirements.txt`
4. 运行`client_hardware_api.py`即可。也可以在`if __name__ == '__main__'`中改变辩题
程序在辩论模式下,“机外按钮”的流程如下:
1. 进入立论模式(若是正方,则直接说话;若是反方,则直接开始录音)
2. 当本方为正方时,开始录(反方)音;为反方时,停止录(正方的)音
3. 当本方为正方时,停止录(反方)音;为反方时,开始说话
4. 当本方为正方时,进入进入自由辩论模式;为反方时,也是进入自由辩论模式(请确保灯光为绿)
5. 关闭自由辩论模式
6. 进入总结陈词模式。为正方时将直接开始录音;为反方时将直接输出语音,完成后辩论模式结束
7. 正方停止录音
8. 正方输出语音。完成后辩论模式结束
写成流程图就是:
![mermaid-diagram-1764335439212.png](https://raw.gitcode.com/user-images/assets/8546023/907c1acc-1083-41f9-8a83-3d3fd41b0c87/mermaid-diagram-1764335439212.png 'mermaid-diagram-1764335439212.png')

125
audio_player.py Normal file
View File

@@ -0,0 +1,125 @@
# audio_player.py
import pyaudio
import threading
import queue
from typing import Optional
_DEFAULTS = dict(
format=pyaudio.paInt16,
channels=2,
rate=44100,
chunk=1024,
)
class _AudioPlayer:
def __init__(self, **kw):
# 用外界传入或缺省值
self.format = kw.get('format', _DEFAULTS['format'])
self.channels = kw.get('channels', _DEFAULTS['channels'])
self.rate = kw.get('rate', _DEFAULTS['rate'])
self.chunk = kw.get('chunk', _DEFAULTS['chunk'])
self._pa = pyaudio.PyAudio()
self._stream = None
self._q = queue.Queue()
self._played = threading.Event()
self._total_feed = 0
self._total_played = 0
self._lock = threading.Lock()
self._thread = threading.Thread(target=self._worker, daemon=True)
self._thread.start()
# ---- 内部工作线程 ----
def _worker(self):
while True:
data = self._q.get()
if data is None:
break
if self._stream is None:
self._stream = self._pa.open(
format=self.format,
channels=self.channels,
rate=self.rate,
output=True,
frames_per_buffer=self.chunk
)
self._stream.write(data)
with self._lock:
self._total_played += len(data)
if self._total_played >= self._total_feed:
self._played.set()
# ---- 对外接口 ----
def feed(self, data: bytes):
if not isinstance(data, bytes):
raise TypeError("feed() 需要 bytes")
with self._lock:
self._total_feed += len(data)
self._played.clear()
self._q.put(data)
def wait(self):
self._played.wait()
def clear(self):
"""清空待播放队列并重置相关状态"""
with self._lock:
# 清空队列
while not self._q.empty():
try:
self._q.get_nowait()
except queue.Empty:
break
# 重置计数状态
unplayed = self._total_feed - self._total_played
self._total_feed = self._total_played # 剩余未播放的都被清空了
if self._total_played >= self._total_feed:
self._played.set()
# ---- 清理 ----
def close(self):
self._q.put(None)
self._thread.join(timeout=1)
if self._stream:
self._stream.close()
self._pa.terminate()
# 模块级变量
_player: Optional[_AudioPlayer] = None
def init(**kw):
"""一次性初始化播放器(必须在首次使用前调用)"""
global _player
if _player is not None:
raise RuntimeError("player 已经初始化过了")
_player = _AudioPlayer(**kw)
def feed(data: bytes):
if _player is None:
raise RuntimeError("请先调用 audio_player.init(...)")
_player.feed(data)
def wait():
if _player is None:
raise RuntimeError("请先调用 audio_player.init(...)")
_player.wait()
def clear():
"""清空待播放的所有数据"""
if _player is None:
raise RuntimeError("请先调用 audio_player.init(...)")
_player.clear()
def close():
global _player
if _player is not None:
_player.close()
_player = None

52
audio_recorder.py Normal file
View File

@@ -0,0 +1,52 @@
# audio_recorder.py
import pyaudio
import threading
from typing import Optional
_DEFAULTS = dict(
format=pyaudio.paInt16,
channels=1, # 必须为 1
rate=16000, # 必须为 16 k
chunk=1600, # 100 ms
)
class _AudioRecorder:
def __init__(self, **kw):
self.format = kw.get('format', _DEFAULTS['format'])
self.channels = kw.get('channels', _DEFAULTS['channels'])
self.rate = kw.get('rate', _DEFAULTS['rate'])
self.chunk = kw.get('chunk', _DEFAULTS['chunk'])
self._pa = pyaudio.PyAudio()
self._stream = self._pa.open(format=self.format,
channels=self.channels,
rate=self.rate,
input=True,
frames_per_buffer=self.chunk)
def get(self, size: int) -> bytes:
return self._stream.read(size, exception_on_overflow=False)
def close(self):
self._stream.close()
self._pa.terminate()
# ---------------- 模块级单例接口 ----------------
_recorder: Optional[_AudioRecorder] = None
def init(**kw):
global _recorder
if _recorder is not None:
raise RuntimeError("recorder 已经初始化过了")
_recorder = _AudioRecorder(**kw)
def get(size: int) -> bytes:
if _recorder is None:
raise RuntimeError("请先调用 audio_recorder.init(...)")
return _recorder.get(size)
def close():
global _recorder
if _recorder is not None:
_recorder.close()
_recorder = None

171
base_mode.py Normal file
View File

@@ -0,0 +1,171 @@
import base64
import threading
import time
import dashscope
from dashscope.audio.asr import RecognitionCallback, RecognitionResult, Recognition
from dashscope.audio.qwen_tts_realtime import QwenTtsRealtime, AudioFormat, QwenTtsRealtimeCallback
import audio_player
import audio_recorder
from config import *
audio_player.init(channels=1, rate=24000)
audio_recorder.init(channels=1, rate=16000)
dashscope.api_key = TTS_API_KEY
class Mode:
def __init__(self, voice="Cherry",
asr_callback=lambda x: print(x, end="", flush=True),
tts_callback=lambda x: print(x, end="", flush=True)):
self.voice = voice
self._tts_sess = None
self._tts_done = threading.Event()
self._asr_sess = None
self.asr_res = ""
self.last_asr_time = None
self.is_asr_recording = False
self.is_tts_running = False
self.asr_callback = asr_callback
self.tts_callback = tts_callback
def _ensure_tts_session(self):
if self._tts_sess is not None:
try:
self._tts_sess.finish()
self._tts_sess.close()
except:
pass
class CB(QwenTtsRealtimeCallback):
def on_open(_):
self._tts_done.clear()
def on_event(_, rsp):
if rsp.get('type') == 'response.audio.delta':
audio_player.feed(base64.b64decode(rsp['delta']))
if rsp.get('type') == 'session.finished':
self._tts_done.set()
self._tts_sess = QwenTtsRealtime(model='qwen3-tts-flash-realtime', callback=CB())
self._tts_sess.connect()
self._tts_sess.update_session(
voice=self.voice,
response_format=AudioFormat.PCM_24000HZ_MONO_16BIT,
mode='server_commit'
)
def ready_asr_session(self):
self.asr_res = ""
self.last_asr_time = None
if self._asr_sess is not None:
if self._asr_sess._running:
self._asr_sess.stop()
self._asr_sess = None
class CB(RecognitionCallback):
def on_open(_):
pass
def on_event(_, result: RecognitionResult) -> None:
self.last_asr_time = time.time()
res = result.get_sentence()
if res["sentence_end"]:
self.asr_res += res["text"]
self.asr_callback(res["text"])
def on_close(_) -> None:
pass
def on_error(_, result: RecognitionResult) -> None:
print(result)
self._asr_sess = Recognition(model='paraformer-realtime-v2',
format='pcm',
sample_rate=16000,
callback=CB())
self._asr_sess.start()
def start_asr_record(self):
if self._asr_sess is None:
raise RuntimeError("未准备asr会话请调用ready_asr_session方法")
self.is_asr_recording = True
def th():
while self.is_asr_recording:
data = audio_recorder.get(3200)
self._asr_sess.send_audio_frame(data)
threading.Thread(target=th, daemon=True).start()
def stop_asr_record(self):
self.is_asr_recording = False
time.sleep(0.1)
if self._asr_sess is not None:
self._asr_sess.stop()
def tts(self, text):
self._tts_sess.append_text(text)
def stream_pipeline(self, gen):
self._ensure_tts_session()
self.is_tts_running = True
end_marks = {'.', '!', '?', '', '', '', ';', '', '\n'}
res_cache = ""
for chunk in gen:
if not chunk:
continue
pos = next((i for i, c in enumerate(chunk) if c in end_marks), None)
if pos is not None:
res_cache += chunk[:pos + 1]
try:
self.tts(res_cache)
time.sleep(0.3)
except Exception as e:
pass
if self.tts_callback is not None:
self.tts_callback(res_cache)
res_cache = chunk[pos + 1:]
else:
res_cache += chunk
if res_cache:
self.tts(res_cache)
if self.tts_callback is not None:
self.tts_callback(res_cache)
if self.is_tts_running:
# 结束 TTS 会话
self._tts_sess.finish()
# self._tts_done.wait(timeout=2) # 等待最后音频
self._tts_sess = None
# 等待全部播放完毕
self._tts_done.wait()
audio_player.wait()
time.sleep(0.1)
self.is_tts_running = False
def tts_finish(self):
if self._tts_sess is not None:
self._tts_sess.finish()
self.is_tts_running = False
audio_player.clear()
def run(self):
pass
def close(self):
if hasattr(self, '_tts_sess'):
try:
self._tts_sess.close()
except:
pass
def __del__(self):
self.close()
def stop(self):
pass

387
client_hardware_api.py Normal file
View File

@@ -0,0 +1,387 @@
import modes as mc
import tkinter as tk
from tkinter import ttk
import threading
import base_mode as bm
IS_BOARD = True
try:
from pinpong.board import Board, Pin, NeoPixel
from unihiker import GUI
gui = GUI()
except ModuleNotFoundError:
print("警告:非行空板环境")
IS_BOARD = False
import time
class XingkongBoardBase:
def __init__(self):
pass
def on_btn_click(self, callback):
"""
按下机外的那个按钮。
该按钮按键流程(必须辩论模式):
1. 进入立论模式(若是正方,则直接说话;若是反方,则直接开始录音)
2. 当本方为正方时,开始录(反方)音;为反方时,停止录(正方的)音
3. 当本方为正方时,停止录(反方)音;为反方时,开始说话
4. 当本方为正方时,进入进入自由辩论模式;为反方时,也是进入自由辩论模式(请确保灯光为绿)
5. 关闭自由辩论模式
6. 进入总结陈词模式。为正方时将直接开始录音;为反方时将直接输出语音,完成后辩论模式结束
7. 正方停止录音
8. 正方输出语音。完成后辩论模式结束
:param callback: 当按下时的回调函数
:return:
"""
pass
def on_btn_dialog_click(self, callback):
"""
按下“对话模式”按钮
:param callback: 当按下时的回调函数
:return:
"""
pass
def on_btn_debate_click(self, callback):
"""
按下“辩论模式”按钮
:param callback: 当按下时的回调函数
:return:
"""
pass
def on_btn_stop_click(self, callback):
"""
按下“停止”按钮
:param callback: 当按下时的回调函数
:return:
"""
pass
def set_light_color(self, color):
"""
设置灯光颜色。
在对话模式和自由辩论模式不设置灯光颜色。
red: 正在处理一些事件,此时不能点击机外按钮
green: 准备就绪,可以点击机外按钮
blue: 正在录音
orange: 成功进入辩论模式和自由辩论
:param color: "red"|"green"|"blue"|"orange"
:return:
"""
pass
SYSTEM = \
"""
你是人大附中经开学校金鹏科技团开发的辩论智能体机器人,名叫人开一号。
这个机器人运用了流式语音识别、大语言模型、流式语音合成等技术。
当你被要求介绍自己时,请简要介绍你的功能和技术实现。
请注意你的回答将被直接通过语音合成输出因此你不能输出不适合TTS的内容。
"""
class Unihiker_Board(XingkongBoardBase):
def __init__(self):
super().__init__()
Board().begin()
# self.gui=GUI()
self.btn21 = Pin(Pin.P21, Pin.IN)
self.pin23 = Pin(Pin.D23)
self.np1 = NeoPixel(self.pin23, 22)
self.np1.brightness(150)
gui.add_button(x=0, y=290, w=80, h=30, text="对话", origin='nw', onclick=self._trigger_dialog_click)
gui.add_button(x=80, y=290, w=80, h=30, text="辩论", origin='nw', onclick=self._trigger_debate_click)
gui.add_button(x=160, y=290, w=80, h=30, text="停止", origin='nw', onclick=self._trigger_stop_click)
self.btn_count = 0
self.btn_callback = None
self.dialog_callback = None
self.debate_callback = None
self.stop_callback = None
def on_btn_click(self, callback):
self.btn_callback = callback
def on_btn_dialog_click(self, callback):
self.dialog_callback = callback
def on_btn_debate_click(self, callback):
self.debate_callback = callback
def on_btn_stop_click(self, callback):
self.stop_callback = callback
def set_light_color(self, color):
if color == "red":
self.np1.range_color(0, 21, 0xFF0000)
if color == "green":
self.np1.range_color(0, 21, 0x00FF00)
if color == "blue":
self.np1.range_color(0, 21, 0x0000FF)
if color == "orange":
self.np1.range_color(0, 21, 0xFFFF00)
def _trigger_btn_click(self):
if self.btn_callback:
threading.Thread(target=self.btn_callback, daemon=True).start()
def _trigger_dialog_click(self):
if self.dialog_callback:
threading.Thread(target=self.dialog_callback, daemon=True).start()
def _trigger_debate_click(self):
if self.debate_callback:
threading.Thread(target=self.debate_callback, daemon=True).start()
def _trigger_stop_click(self):
if self.stop_callback:
threading.Thread(target=self.stop_callback, daemon=True).start()
def btn_click_check(self):
while True:
if self.btn21.read_digital() == 0:
while self.btn21.read_digital() == 0:
pass
self._trigger_btn_click()
def run(self):
threading.Thread(target=self.btn_click_check, daemon=True).start()
while True:
time.sleep(1)
class Robot:
def __init__(self, board: XingkongBoardBase):
self.modes = dict()
self.btn_count = 0
self.board = board
self.side = ""
self.topic = ""
self.opinion = ""
self.last_prompt = ""
self.session_id = None
def register_mode(self, modename: str, mode: bm.Mode):
self.modes[modename] = mode
def run_mode(self, modename):
self.modes[modename].run()
def stop_mode(self, modename):
self.modes[modename].stop()
del self.modes[modename]
def get_mode(self, modename):
return self.modes[modename]
def stop_all(self):
for mode in self.modes.values():
mode.stop()
self.modes.clear()
def dialog(self):
self.stop_all()
self.board.set_light_color("orange")
dialog_mode = mc.DialogMode(system_prompt=SYSTEM)
self.register_mode("dialog", dialog_mode)
self.run_mode("dialog")
def set_debate_config(self, side, topic, opinion):
self.side = side
self.topic = topic
self.opinion = opinion
def debate(self):
self.stop_all()
self.board.set_light_color("orange")
self.btn_count = 0
if self.side == "正方":
self.board.on_btn_click(self._btn_right_callback)
else:
self.board.on_btn_click(self._btn_against_callback)
def on_cancel(self):
self.stop_all()
self.board.set_light_color("green")
def _btn_right_callback(self):
self.btn_count += 1
if self.btn_count == 1:
self.board.set_light_color("red")
self.register_mode("make_point", mc.MakePointMode(side=self.side, topic=self.topic, opinion=self.opinion))
self.run_mode("make_point")
self.board.set_light_color("green")
elif self.btn_count == 2:
md: mc.MakePointMode = self.get_mode("make_point")
self.board.set_light_color("blue")
md.start_record()
elif self.btn_count == 3:
md: mc.MakePointMode = self.get_mode("make_point")
self.board.set_light_color("red")
md.stop_record()
self.board.set_light_color("green")
self.last_prompt = md.ready_next()
elif self.btn_count == 4:
self.board.set_light_color("red")
md: mc.MakePointMode = self.get_mode("make_point")
self.session_id = md.session_id
self.stop_mode("make_point")
self.register_mode("free_debate", mc.FreeDebateMode(side=self.side, topic=self.topic, opinion=self.opinion,
last_prompt=self.last_prompt,
session_id=self.session_id))
self.run_mode("free_debate")
self.board.set_light_color("orange")
elif self.btn_count == 5:
self.board.set_light_color("red")
md: mc.FreeDebateMode = self.get_mode("free_debate")
self.session_id = md.session_id
self.stop_mode("free_debate")
self.board.set_light_color("green")
elif self.btn_count == 6:
self.board.set_light_color("red")
self.register_mode("end_debate", mc.EndDebateMode(side=self.side, topic=self.topic, opinion=self.opinion,
session_id=self.session_id))
self.run_mode("end_debate")
self.board.set_light_color("blue")
elif self.btn_count == 7:
md: mc.EndDebateMode = self.get_mode("end_debate")
md.stop_record()
self.board.set_light_color("green")
elif self.btn_count == 8:
self.board.set_light_color("red")
md: mc.EndDebateMode = self.get_mode("end_debate")
md.start_talk()
self.board.set_light_color("green")
self.stop_mode("end_debate")
def _btn_against_callback(self):
self.btn_count += 1
if self.btn_count == 1:
self.board.set_light_color("blue")
self.register_mode("make_point", mc.MakePointMode(side=self.side, topic=self.topic, opinion=self.opinion))
self.run_mode("make_point")
elif self.btn_count == 2:
md: mc.MakePointMode = self.get_mode("make_point")
self.board.set_light_color("red")
md.stop_record()
self.board.set_light_color("green")
elif self.btn_count == 3:
self.board.set_light_color("red")
md: mc.MakePointMode = self.get_mode("make_point")
md.ready_next()
self.board.set_light_color("green")
elif self.btn_count == 4:
self.board.set_light_color("red")
md: mc.MakePointMode = self.get_mode("make_point")
self.session_id = md.session_id
self.stop_mode("make_point")
self.register_mode("free_debate", mc.FreeDebateMode(side=self.side, topic=self.topic, opinion=self.opinion,
session_id=self.session_id))
self.run_mode("free_debate")
self.board.set_light_color("green")
elif self.btn_count == 5:
self.board.set_light_color("red")
md: mc.FreeDebateMode = self.get_mode("free_debate")
self.session_id = md.session_id
self.stop_mode("free_debate")
self.board.set_light_color("green")
elif self.btn_count == 6:
self.board.set_light_color("red")
self.register_mode("end_debate", mc.EndDebateMode(side=self.side, topic=self.topic, opinion=self.opinion,
session_id=self.session_id))
self.run_mode("end_debate")
md: mc.EndDebateMode = self.get_mode("end_debate")
md.start_talk()
def run(self):
self.board.on_btn_dialog_click(self.dialog)
self.board.on_btn_debate_click(self.debate)
self.board.on_btn_stop_click(self.on_cancel)
class MockXingkongBoard(XingkongBoardBase):
def __init__(self):
super().__init__()
self.root = tk.Tk()
self.root.title("模拟控制台")
self.root.geometry("300x200")
# 灯光显示
self.light = tk.Label(self.root, width=10, height=5, bg="gray")
self.light.pack(pady=10)
# 按钮布局
btn_frame = ttk.Frame(self.root)
btn_frame.pack(pady=10)
# 机外按钮
ttk.Button(btn_frame, text="机外按钮", command=self._trigger_btn_click).grid(row=0, column=0, padx=5)
# 功能按钮
ttk.Button(btn_frame, text="对话模式", command=self._trigger_dialog_click).grid(row=1, column=0, padx=5)
ttk.Button(btn_frame, text="辩论模式", command=self._trigger_debate_click).grid(row=1, column=1, padx=5)
ttk.Button(btn_frame, text="停止", command=self._trigger_stop_click).grid(row=1, column=2, padx=5)
# 回调存储
self.btn_callback = None
self.dialog_callback = None
self.debate_callback = None
self.stop_callback = None
def on_btn_click(self, callback):
self.btn_callback = callback
def on_btn_dialog_click(self, callback):
self.dialog_callback = callback
def on_btn_debate_click(self, callback):
self.debate_callback = callback
def on_btn_stop_click(self, callback):
self.stop_callback = callback
def set_light_color(self, color):
# 仅设置颜色,不添加任何模式判断
self.light.config(bg=color)
def _trigger_btn_click(self):
# 直接触发回调,高耗时操作放线程
if self.btn_callback:
threading.Thread(target=self.btn_callback, daemon=True).start()
def _trigger_dialog_click(self):
if self.dialog_callback:
threading.Thread(target=self.dialog_callback, daemon=True).start()
def _trigger_debate_click(self):
if self.debate_callback:
threading.Thread(target=self.debate_callback, daemon=True).start()
def _trigger_stop_click(self):
if self.stop_callback:
threading.Thread(target=self.stop_callback, daemon=True).start()
def run(self):
self.root.mainloop()
if __name__ == '__main__':
board = Unihiker_Board() if IS_BOARD else MockXingkongBoard()
robot = Robot(board)
robot.set_debate_config(
"反方",
"利用基因技术复活已灭绝物种(反灭绝工程)是生态责任还是资源浪费",
"利用基因技术复活已灭绝物种(反灭绝工程)是资源浪费"
) if IS_BOARD else robot.set_debate_config(
"反方",
"量子计算机的实用化是否会首先对现有加密体系构成致命威胁",
"量子计算机的实用化是否不会首先对现有加密体系构成致命威胁"
)
robot.run()
board.run()

5
config.py Normal file
View File

@@ -0,0 +1,5 @@
TTS_API_KEY = "sk-xxx"
COM_API_KEY = "sk-xxx"
ASR_API_KEY = "sk-xxx"
APP_ID = "xxx"

364
modes.py Normal file
View File

@@ -0,0 +1,364 @@
import threading
import time
from http import HTTPStatus
import dashscope
import openai
import base_mode as bm
from config import *
class DialogMode(bm.Mode):
def __init__(self, model="qwen-plus", system_prompt="You are a helpful assistant", threshold_no_speak=2,
asr_callback=lambda x: print(x, end="", flush=True), tts_callback=lambda x: print(x, end="", flush=True)):
"""
自由对话模式类的构造函数
:param model: 模型名称
:param system_prompt: 系统提示词
:param threshold_no_speak: 多长时间未说话认定为结束,单位:秒
"""
super().__init__(
asr_callback=asr_callback,
tts_callback=tts_callback
)
self.model = model
self.client = openai.OpenAI(
api_key=COM_API_KEY,
base_url="https://dashscope.aliyuncs.com/compatible-mode/v1"
)
self.context = [
{"role": "system", "content": system_prompt}
]
self.system_prompt = system_prompt
self.main_loop_thread = threading.Thread(target=self.main_loop)
self.running = False
self.threshold_no_speak = threshold_no_speak
def ask_ai(self, prompt):
self.context.append(
{"role": "user", "content": prompt}
)
response = self.client.chat.completions.create(
model=self.model,
messages=self.context,
max_tokens=2048,
stream=True
)
all_text = ""
for chunk in response:
content = chunk.choices[0].delta.content or ""
all_text += content
yield content
self.context.append(
{"role": "assistant", "content": all_text}
)
yield ""
def main_loop(self):
while self.running:
# 1. 启动ASR服务
self.ready_asr_session()
# 2. 开始录音
self.start_asr_record()
# 3. 等待开始说话
while self.last_asr_time is None: pass
# 4. 等待说话时长过长
while (time.time() - self.last_asr_time) <= self.threshold_no_speak: pass
# 5. 关闭ASR
self.stop_asr_record()
# 6. 扔给AI
self.stream_pipeline(self.ask_ai(self.asr_res))
def run(self):
super().run()
self.running = True
print("请开始说话...")
self.main_loop_thread.start()
def stop(self):
self.running = False
self.tts_finish()
class MakePointMode(bm.Mode):
def __init__(self, side="正方", topic="", opinion="", asr_callback=lambda x: print(x, end="", flush=True), tts_callback=lambda x: print(x, end="", flush=True)):
super().__init__(
asr_callback=asr_callback,
tts_callback=tts_callback
)
self.side = side
self.topic = topic
self.opinion = opinion
self.session_id = None
self.last_prompt = "" # 当side="正方"时,为反方立论内容,否则为空字符串
def ask_ai(self, prompt):
biz_params = {
"user_prompt_params" : {
"side": self.side,
"topic": self.topic,
"opinion": self.opinion
}
}
if self.session_id is None:
res = dashscope.Application.call(
api_key=COM_API_KEY,
app_id=APP_ID,
prompt=prompt,
stream=True,
incremental_output=True,
biz_params=biz_params
)
else:
res = dashscope.Application.call(
api_key=COM_API_KEY,
app_id=APP_ID,
prompt=prompt,
stream=True,
incremental_output=True,
session_id=self.session_id,
biz_params=biz_params
)
for chunk in res:
if chunk.status_code == HTTPStatus.OK:
self.session_id = chunk.output.session_id
yield chunk.output.text
def ask(self, other_op=None):
if self.side == "正方":
return self.ask_ai("主席:请正方开始立论")
else:
return self.ask_ai("主席:请正方开始立论\n正方:" + other_op)
def start_record(self):
self.ready_asr_session()
self.start_asr_record()
def stop_record(self):
"""
该函数执行完毕后才能调用start_talk和start_identify_op函数
:return:
"""
self.stop_asr_record()
def start_talk(self):
"""
仅对反方:开始立论
:return:
"""
if self.side == "反方":
self.stream_pipeline(self.ask(self.asr_res))
def start_identify_op(self):
"""
仅对正方设定last_prompt以便返回给下面的自由辩论模式
:return:
"""
if self.side == "正方":
self.last_prompt = self.asr_res
def ready_next(self):
if self.side == "正方":
self.start_identify_op()
return self.last_prompt
else:
self.start_talk()
def run(self):
"""
流程:
1. 正方:先说话,再录音
调用本函数时直接开始说话说话完成后当对方开始立论时需显式调用start_record函数结束立论时需显式调用stop_record函数
当进入自由辩论环节时需显式调用ready_next。如果为正方需接收返回值作为自由辩论环节提示词前面的反方立论内容。
2. 反方:先录音,再说话
调用本函数时开始录音因此需要在对方将要说话时切入此模式当对方说完话后应显式调用stop_record函数本方开始立论时应显式调用start_talk函数
:return:
"""
super().run()
if self.side == "正方":
# 先说话再录音
self.stream_pipeline(self.ask())
else:
# 先录音再说话。外界控制
self.start_record()
class FreeDebateMode(bm.Mode):
def __init__(self, session_id, side="正方", topic="", opinion="", last_prompt="", threshold_no_speak=2,
asr_callback=lambda x: print(x, end="", flush=True), tts_callback=lambda x: print(x, end="", flush=True)):
super().__init__(
asr_callback=asr_callback,
tts_callback=tts_callback
)
self.threshold_no_speak = threshold_no_speak
self.side = side
self.topic = topic
self.opinion = opinion
self.last_prompt = last_prompt
self.session_id = session_id
self.is_first = True
self.last_prompt_next_mode = None
self.main_loop_thread = threading.Thread(target=self.main_loop)
self.running = False
def ask_ai(self, prompt):
biz_params = {
"user_prompt_params" : {
"side": self.side,
"topic": self.topic,
"opinion": self.opinion
}
}
res = dashscope.Application.call(
api_key=COM_API_KEY,
app_id=APP_ID,
prompt=prompt,
stream=True,
incremental_output=True,
session_id=self.session_id,
biz_params=biz_params
)
for chunk in res:
if chunk.status_code == HTTPStatus.OK:
self.session_id = chunk.output.session_id
yield chunk.output.text
def init_ask(self):
"""
仅正方:用于将反方的立论传入,并让正方说话
:return:
"""
if self.side == "正方":
return self.ask_ai("反方:"+self.last_prompt+"\n主席:下面进入自由辩论环节,请正方开始发言")
def ask(self, context=None):
if self.side == "正方":
return self.ask_ai("反方:"+context)
else:
if self.is_first:
self.is_first = False
return self.ask_ai("主席:下面进入自由辩论环节,请正方开始发言\n正方:" + context)
return self.ask_ai("正方:"+context)
def main_loop(self):
# 初始ask
if self.side == "正方":
self.stream_pipeline(self.init_ask())
# 主对话循环
while self.running:
# 1. 启动ASR服务
self.ready_asr_session()
# 2. 开始录音
self.start_asr_record()
# 3. 等待开始说话
while self.last_asr_time is None: pass
# 4. 等待说话时长过长
while (time.time() - self.last_asr_time) <= self.threshold_no_speak: pass
# 5. 关闭ASR
self.stop_asr_record()
# 6. 扔给AI
self.stream_pipeline(self.ask(self.asr_res))
def run(self):
"""
流程:
类似对话模式
当进入结论模式时外界直接通过析构对象然后启动结论模式即可。但建议取一下last_prompt_next_mode字段
:return:
"""
super().run()
self.running = True
self.main_loop_thread.start()
def stop(self):
self.running = False
self.tts_finish()
class EndDebateMode(bm.Mode):
def __init__(self, session_id, side="正方", topic="", opinion="", last_prompt="",
asr_callback=lambda x: print(x, end="", flush=True), tts_callback=lambda x: print(x, end="", flush=True)):
super().__init__(
asr_callback=asr_callback,
tts_callback=tts_callback
)
self.side = side
self.topic = topic
self.opinion = opinion
self.last_prompt = last_prompt
self.session_id = session_id
def ask_ai(self, prompt):
biz_params = {
"user_prompt_params": {
"side": self.side,
"topic": self.topic,
"opinion": self.opinion
}
}
res = dashscope.Application.call(
api_key=COM_API_KEY,
app_id=APP_ID,
prompt=prompt,
stream=True,
incremental_output=True,
session_id=self.session_id,
biz_params=biz_params
)
for chunk in res:
if chunk.status_code == HTTPStatus.OK:
self.session_id = chunk.output.session_id
yield chunk.output.text
def ask(self, context=None):
if self.side == "反方":
if self.last_prompt != "":
return self.ask_ai("反方:"+self.last_prompt+"\n主席:下面进入结辩环节,请反方开始发言")
else:
return self.ask_ai("主席:下面进入结辩环节,请反方开始发言")
else:
return self.ask_ai("主席:下面进入结辩环节,请反方开始发言\n反方:"+context)
def start_record(self):
self.ready_asr_session()
self.start_asr_record()
def stop_record(self):
"""
该函数执行完毕后才能调用start_talk和start_identify_op函数
:return:
"""
self.stop_asr_record()
def start_talk(self):
"""
开始结论
:return:
"""
if self.side == "反方":
self.stream_pipeline(self.ask())
else:
self.stream_pipeline(self.ask(self.asr_res))
def run(self):
"""
流程:
1. 正方当对方说话时需外界调用start_record方法说话结束后调用stop_record方法再调用start_talk方法
2. 反方需外界调用start_talk方法然后直接开始结辩
走到这里,圆满结束!!!!
:return:
"""
super().run()
pass
if __name__ == '__main__':
mode = DialogMode(
asr_callback=lambda x: print(x, end="", flush=True),
tts_callback=lambda x: print(x, end="", flush=True)
)
mode.run()
time.sleep(200)

3
requirements.txt Normal file
View File

@@ -0,0 +1,3 @@
dashscope==1.24.6
openai==2.8.1
PyAudio==0.2.14