備忘錄_20160105(定位) 修改 回首頁

程式 2025-10-09 10:13:19 1759976000 100
運用 python 讓電腦即時監聽,錄音,辨識成文字。當聽到 再見 時,程式就會停下來。----語音助理前哨站(opus)

運用 python 讓電腦即時監聽,錄音,辨識成文字。當聽到 再見 時,程式就會停下來。----語音助理前哨站(opus)

import threading
import time

# pyenv shell 3.13.1
# sudo apt install portaudio19-dev
# pip install pyaudio
import pyaudio

import numpy as np
import wave

# pyenv shell 3.13.1
# pip3 install torch torchvision --index-url https://download.pytorch.org/whl/cpu
import torch 

# pyenv shell 3.13.1
# sudo apt install ffmpeg 
# pip install -U openai-whisper
import whisper 

# pyenv shell 3.13.1
# pip install opencc
from opencc import OpenCC

# pip install soundfile
import soundfile

import subprocess

import io

# State shared between threadA (recorder) and threadB (recognizer).
# Recordings are handed over through a ring of iMax numbered files.
iMax = 10  # number of slots in the ring
iIndexWrite = iIndexRead = iCount = 0  # next slot to write / next slot to read / files waiting
booStop = False  # set to True to make both worker loops exit

def getFDecibel(oAudioData):
    """Return the RMS level of an audio chunk in decibels.

    The value is ``20 * log10(rms)`` of the raw sample values, i.e. it is
    relative to a sample amplitude of 1 and is not a calibrated sound
    pressure level. Full-scale 16-bit audio comes out at roughly 90 dB.

    Args:
        oAudioData: Samples as a NumPy array or any array-like of numbers.

    Returns:
        The level as a float, or ``-inf`` when the chunk is empty or
        completely silent (RMS of zero).
    """
    # Work in float64: squaring int16 samples would overflow the integer
    # type, and float64 keeps the sum of squares exact enough.
    oSamples = np.asarray(oAudioData, dtype=np.float64)
    if oSamples.size == 0:
        # np.mean of an empty array yields NaN plus a RuntimeWarning.
        return -np.inf

    fRMS = np.sqrt(np.mean(np.square(oSamples)))
    if fRMS > 0:
        # The guard above already rules out log10(0), so no epsilon is needed.
        return float(20 * np.log10(fRMS))
    return -np.inf

def getStrAduioFilename(iIndex):
    """Return the file name used for the recording in ring slot *iIndex*."""
    return f"rec_{iIndex}.ogg"

def _encodeOggOpus(oSamples, iRate):
    """Encode PCM samples to Ogg/Opus bytes by piping a WAV through ffmpeg.

    Args:
        oSamples: Array of audio samples for one recording.
        iRate: Sample rate in Hz.

    Returns:
        The encoded bytes, or ``None`` when ffmpeg could not be started,
        exited with a non-zero status, or produced no output.
    """
    # Build the WAV container in memory so nothing uncompressed hits the disk.
    with io.BytesIO() as oWaveBuffer:
        soundfile.write(oWaveBuffer, oSamples, iRate, format="wav")
        baWave = oWaveBuffer.getvalue()

    try:
        oDone = subprocess.run(
            ['ffmpeg', '-i', 'pipe:0', '-c:a', 'libopus', '-b:a', '32k', '-f', 'ogg', 'pipe:1'],
            input=baWave,
            stdout=subprocess.PIPE,
            stderr=subprocess.DEVNULL,
            check=False,
        )
    except OSError:
        # ffmpeg is missing or not executable.
        return None

    if oDone.returncode != 0 or not oDone.stdout:
        return None
    return oDone.stdout


def threadA():
    """Recorder thread: listen to the microphone and queue voice recordings.

    Reads 16 kHz mono 16-bit audio in chunks. A chunk louder than
    THRESHOLD_DB starts a recording; the recording ends after
    SILENCE_DURATION seconds without a loud chunk, or as soon as it exceeds
    CUT_DURATION seconds. Recordings shorter than DROP_DURATION seconds are
    discarded. Each kept recording is compressed to Ogg/Opus and written to
    the next ring slot (see getStrAduioFilename), then iIndexWrite and
    iCount are advanced. When all iMax slots are occupied the recording is
    dropped.

    The loop runs until the global booStop becomes True. The audio stream
    is always released on exit, and booStop is set if the thread fails so
    that the other worker does not keep waiting.

    NOTE(review): iCount is updated here and in threadB without a lock, so
    a simultaneous increment/decrement can be lost. Consider a
    threading.Lock or queue.Queue shared by both threads.
    """
    global iIndexWrite, iIndexRead, iCount, iMax, booStop

    CHUNK = 1024  # samples per read
    FORMAT = pyaudio.paInt16  # 16-bit samples
    CHANNELS = 1  # mono
    RATE = 16000  # sample rate (Hz)
    THRESHOLD_DB = 70  # level that counts as "sound" (see getFDecibel)
    SILENCE_DURATION = 1  # seconds of quiet that end a recording
    DROP_DURATION = 2  # recordings shorter than this are discarded
    CUT_DURATION = 30  # recordings longer than this are cut off

    oAudio = pyaudio.PyAudio()
    oStream = None
    try:
        oStream = oAudio.open(format=FORMAT, channels=CHANNELS, rate=RATE,
                              input=True, frames_per_buffer=CHUNK)

        booRecording = False
        oaAudioData = []
        iTotalSamples = 0
        fSilenceStart = None  # time the current quiet stretch began, None while loud

        print("開始聆聽")

        while not booStop:
            oAudioData = np.frombuffer(
                oStream.read(CHUNK, exception_on_overflow=False), dtype=np.int16)
            booLoud = getFDecibel(oAudioData) > THRESHOLD_DB

            if not booRecording:
                if booLoud:
                    print("偵測到聲音,開始錄音...")
                    booRecording = True
                    oaAudioData = [oAudioData]
                    iTotalSamples = oAudioData.shape[0]
                    fSilenceStart = None
                continue

            oaAudioData.append(oAudioData)
            iTotalSamples += oAudioData.shape[0]
            fSeconds = iTotalSamples / RATE

            if booLoud:
                fSilenceStart = None
            elif fSilenceStart is None:
                fSilenceStart = time.time()

            # Evaluate the length limit on every chunk, loud or quiet, so
            # that uninterrupted sound is still cut at CUT_DURATION.
            booTooLong = fSeconds > CUT_DURATION
            booSilent = (fSilenceStart is not None
                         and (time.time() - fSilenceStart) >= SILENCE_DURATION)
            if not (booSilent or booTooLong):
                continue

            if fSeconds < DROP_DURATION:
                print(f"不足{DROP_DURATION}秒,不予儲存")
            else:
                if booTooLong:
                    print(f"超過{CUT_DURATION}秒,截斷錄音。({fSeconds})")
                else:
                    print(f"偵測到靜音,停止錄音。({fSeconds})")

                if iCount < iMax:
                    baOpus = _encodeOggOpus(np.concatenate(oaAudioData), RATE)
                    if baOpus is None:
                        # Do not queue an empty/corrupt file for recognition.
                        print("壓縮失敗,不予儲存")
                    else:
                        strFilename = getStrAduioFilename(iIndexWrite)
                        print(f"正在儲存錄音檔案:{strFilename}")
                        with open(strFilename, "wb") as oF:
                            oF.write(baOpus)
                        print("錄音儲存完成!")

                        # Publish the slot only after the file is complete.
                        iIndexWrite = (iIndexWrite + 1) % iMax
                        iCount = iCount + 1
                else:
                    print("空間不足,不予儲存")

            booRecording = False
            oaAudioData = []
            iTotalSamples = 0
            fSilenceStart = None

    except KeyboardInterrupt:
        # Only reachable when this function runs in the main thread.
        booStop = True
        print("手動結束聆聽")
    except Exception:
        # Let the other worker stop instead of waiting forever.
        booStop = True
        raise
    finally:
        # Stop and release the audio stream.
        try:
            if oStream is not None:
                oStream.stop_stream()
                oStream.close()
        finally:
            oAudio.terminate()

def threadB():
    """Recognizer thread: transcribe queued recordings until told to stop.

    Loads the Whisper "tiny" model (on CUDA when available, otherwise CPU)
    and then polls iCount every 0.1 s. For each waiting file it transcribes
    the recording in ring slot iIndexRead as Chinese, converts the text
    from Simplified to Traditional Chinese with OpenCC, prints it, and
    advances iIndexRead / decrements iCount. When the text contains "再見"
    it sets the global booStop so that both worker loops end.

    If loading the model or transcribing fails, booStop is set before the
    exception propagates so that the recorder thread does not keep running
    alone.

    NOTE(review): iCount is updated here and in threadA without a lock, so
    a simultaneous increment/decrement can be lost. Consider a
    threading.Lock or queue.Queue shared by both threads.
    """
    global iIndexWrite, iIndexRead, iCount, iMax, booStop

    try:
        strDevice = "cuda" if torch.cuda.is_available() else "cpu"
        booCuda = strDevice == "cuda"
        oModel = whisper.load_model("tiny").to(strDevice)  # tiny, base, small, medium, large
        oCC = OpenCC('s2t')  # 's2t' = Simplified to Traditional Chinese

        print("使用裝置:"+strDevice)

        while not booStop:

            if iCount > 0:

                strFilename = getStrAduioFilename(iIndexRead)
                # fp16 is only enabled on CUDA, where it is faster.
                oResult = oModel.transcribe(strFilename, language="zh", fp16=booCuda)
                strTraditionalText = oCC.convert(oResult["text"])
                print("------")
                print("辨識 "+strFilename+" 結果:"+strTraditionalText)
                print("------")

                # Release the slot only after the file has been consumed.
                iIndexRead = (iIndexRead + 1) % iMax
                iCount = iCount - 1

                if "再見" in strTraditionalText:
                    print('Goodbye!')
                    booStop = True

            time.sleep(0.1)

    except Exception:
        # Let the recorder stop instead of filling the ring forever.
        booStop = True
        raise
        

# Create the two worker threads.
oThreadA = threading.Thread(target=threadA)
oThreadB = threading.Thread(target=threadB)

# Start them.
oThreadA.start()
oThreadB.start()

# Keep the main thread alive until both workers finish.
try:
    oThreadA.join()
    oThreadB.join()
except KeyboardInterrupt:
    # Ctrl-C is delivered to the main thread only, so the stop request has
    # to be forwarded to the workers from here; then wait for them to exit.
    booStop = True
    print("手動結束聆聽")
    oThreadA.join()
    oThreadB.join()

print('兩個執行緒都停下來了')