# audio_recorder.py
"""Module-level singleton wrapper around a blocking PyAudio input stream.

Typical use::

    audio_recorder.init()            # open the input stream once
    data = audio_recorder.get(1600)  # blocking read
    audio_recorder.close()           # release the stream and PortAudio
"""
import threading
from typing import Optional

import pyaudio

# Default stream parameters; each can be overridden via keyword arguments
# passed to ``init`` / ``_AudioRecorder``.
_DEFAULTS = dict(
    format=pyaudio.paInt16,
    channels=1,   # must be 1
    rate=16000,   # must be 16 k
    chunk=1600,   # 100 ms of audio at the default rate
)


class _AudioRecorder:
    """Owns one PyAudio instance and one input stream opened on it."""

    def __init__(self, **kw):
        """Open an input stream.

        Recognized keyword arguments are ``format``, ``channels``, ``rate``
        and ``chunk``; any that are missing fall back to ``_DEFAULTS``.
        Other keyword arguments are ignored.

        Raises:
            Whatever ``pyaudio.PyAudio.open`` raises. In that case the
            PyAudio instance is terminated before the exception propagates.
        """
        self.format = kw.get('format', _DEFAULTS['format'])
        self.channels = kw.get('channels', _DEFAULTS['channels'])
        self.rate = kw.get('rate', _DEFAULTS['rate'])
        self.chunk = kw.get('chunk', _DEFAULTS['chunk'])

        self._pa = pyaudio.PyAudio()
        try:
            self._stream = self._pa.open(
                format=self.format,
                channels=self.channels,
                rate=self.rate,
                input=True,
                frames_per_buffer=self.chunk,
            )
        except BaseException:
            # Opening failed: release the PyAudio instance instead of
            # leaking it, then let the original error propagate.
            self._pa.terminate()
            raise

    def get(self, size: int) -> bytes:
        """Read ``size`` from the stream and return the raw bytes.

        ``size`` is passed straight to ``Stream.read`` (PyAudio documents
        that argument as a number of frames). Input overflows do not raise
        because ``exception_on_overflow`` is disabled.
        """
        return self._stream.read(size, exception_on_overflow=False)

    def close(self):
        """Close the stream and terminate the PyAudio instance.

        ``terminate`` runs even when closing the stream raises.
        """
        try:
            self._stream.close()
        finally:
            self._pa.terminate()


# ---------------- module-level singleton interface ----------------
_recorder: Optional[_AudioRecorder] = None


def init(**kw):
    """Create the module-wide recorder.

    Keyword arguments are forwarded to ``_AudioRecorder``.

    Raises:
        RuntimeError: if a recorder has already been initialized.
    """
    global _recorder
    if _recorder is not None:
        raise RuntimeError("recorder 已经初始化过了")
    _recorder = _AudioRecorder(**kw)


def get(size: int) -> bytes:
    """Read ``size`` from the module-wide recorder and return the bytes.

    Raises:
        RuntimeError: if ``init`` has not been called.
    """
    if _recorder is None:
        raise RuntimeError("请先调用 audio_recorder.init(...)")
    return _recorder.get(size)


def close():
    """Close the module-wide recorder, if any, and clear the singleton.

    Does nothing when no recorder is initialized. The singleton is cleared
    before the recorder is closed, so a failure while closing still leaves
    the module ready for a fresh ``init``.
    """
    global _recorder
    if _recorder is None:
        return
    recorder, _recorder = _recorder, None
    recorder.close()