運用 python 讓電腦即時監聽,錄音,辨識成文字。當聽到 再見 時,程式就會停下來。----語音助理前哨站(opus)
import threading
import time
# pyenv shell 3.13.1
# sudo apt install portaudio19-dev
# pip install pyaudio
import pyaudio
import numpy as np
import wave
# pyenv shell 3.13.1
# pip3 install torch torchvision --index-url https://download.pytorch.org/whl/cpu
import torch
# pyenv shell 3.13.1
# sudo apt install ffmpeg
# pip install -U openai-whisper
import whisper
# pyenv shell 3.13.1
# pip install opencc
from opencc import OpenCC
# pip install soundfile
import soundfile
import subprocess
import io
# Shared state for the producer/consumer pair (threadA writes, threadB reads).
# NOTE(review): mutated from two threads without a lock — relies on CPython's
# GIL keeping these simple int/bool updates safe; confirm if porting.
iIndexWrite=0 # next ring-buffer slot threadA will write a recording to
iIndexRead=0 # next ring-buffer slot threadB will transcribe
iCount=0 # number of recordings currently queued (written but not yet read)
iMax=10 # ring-buffer capacity (max queued recordings)
booStop=False # set True to make both threads exit their loops
def getFDecibel(oAudioData):
oAudioData=oAudioData.astype(np.float32) # 轉換為浮點數,避免整數溢出
fRMS=np.sqrt(np.mean(np.square(oAudioData))) # 計算rms
if fRMS>0:
fDecibel=20*np.log10(fRMS+1e-10) # 避免 log(0) 錯誤
else:
fDecibel=-np.inf
return fDecibel
def getStrAduioFilename(iIndex):
    """Build the recording filename for ring-buffer slot *iIndex* (e.g. rec_0.ogg)."""
    return f"rec_{iIndex}.ogg"
def threadA():
    """Producer thread: listen on the default microphone, record loud
    passages, compress each one to Ogg/Opus via an ffmpeg pipe, and store
    it into a ring of up to iMax files consumed by threadB.

    Recording starts when a chunk's level exceeds THRESHOLD_DB, stops
    after SILENCE_DURATION seconds of quiet or at CUT_DURATION seconds,
    and clips shorter than DROP_DURATION seconds are discarded.

    NOTE(review): iIndexWrite/iCount/booStop are shared with threadB
    without a lock — this relies on the GIL; confirm if porting.
    """
    global iIndexWrite, iIndexRead, iCount, iMax, booStop
    CHUNK=1024 # samples per read from the input stream
    FORMAT=pyaudio.paInt16 # audio format (16-bit)
    CHANNELS=1 # mono
    RATE=16000 # sample rate (Hz)
    # Threshold compared against getFDecibel(): 20*log10(raw int16 RMS),
    # where full scale is ~90.3 dB — a relative level, not SPL.
    THRESHOLD_DB=70 # level (dB) that triggers recording
    SILENCE_DURATION=1 # stop recording after this many seconds of silence
    DROP_DURATION=2 # discard recordings shorter than this (seconds)
    CUT_DURATION=30 # cut the recording once it exceeds this (seconds)
    oAudio=pyaudio.PyAudio()
    oStream=oAudio.open(format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK)
    booRecording=False # True while an utterance is being captured
    oaAudioData=[] # list of int16 chunks for the recording in progress
    iTotalSamples=0 # total samples accumulated in oaAudioData
    fSilenceStart=None # wall-clock time when the current quiet period began
    print("開始聆聽")
    try:
        while booStop==False:
            # Read one chunk of samples; overflow is tolerated rather than raised
            oAudioData=np.frombuffer(oStream.read(CHUNK, exception_on_overflow=False), dtype=np.int16)
            fDecibel=getFDecibel(oAudioData)
            # print(f"目前分貝數:{fDecibel:.2f} dB")
            if fDecibel>THRESHOLD_DB:
                fSilenceStart=None # sound heard: reset the silence timer
            if booRecording==False and fDecibel>THRESHOLD_DB:
                # Loud chunk while idle: start a new recording with this chunk
                print("偵測到聲音,開始錄音...")
                booRecording=True
                oaAudioData=[]
                oaAudioData.append(oAudioData)
                iTotalSamples=oAudioData.shape[0]
            elif booRecording==True:
                # Recording in progress: keep accumulating (quiet chunks too,
                # so brief pauses inside an utterance are preserved)
                oaAudioData.append(oAudioData)
                iTotalSamples=iTotalSamples+oAudioData.shape[0]
                fSeconds=iTotalSamples/RATE
                if fSilenceStart is None:
                    fSilenceStart=time.time() # quiet just began: start timing it
                elif ((time.time()-fSilenceStart)>=SILENCE_DURATION) or (fSeconds>CUT_DURATION):
                    # End of utterance: either sustained silence or max length hit
                    if fSeconds<DROP_DURATION:
                        print("不足"+str(DROP_DURATION)+"秒,不予儲存")
                    else:
                        if fSeconds>CUT_DURATION:
                            print("超過"+str(CUT_DURATION)+"秒,截斷錄音。("+str(fSeconds)+")")
                        else:
                            print("偵測到靜音,停止錄音。("+str(fSeconds)+")")
                        if iCount<iMax:
                            oTotalAudioData=np.concatenate(oaAudioData)
                            # Write a WAV image of the samples into memory
                            oWaveBuffer=io.BytesIO()
                            soundfile.write(oWaveBuffer, oTotalAudioData, RATE, format="wav")
                            oWaveBuffer.seek(0)
                            # Compress to Opus entirely in memory via ffmpeg stdin/stdout pipes
                            oOpusBuffer=io.BytesIO()
                            oProcess=subprocess.Popen(
                                ['ffmpeg', '-i', 'pipe:0', '-c:a', 'libopus', '-b:a', '32k', '-f', 'ogg', 'pipe:1'],
                                stdin=subprocess.PIPE,
                                stdout=subprocess.PIPE,
                                stderr=subprocess.DEVNULL
                            )
                            oOpusData,_=oProcess.communicate(input=oWaveBuffer.read())
                            oOpusBuffer.write(oOpusData)
                            oOpusBuffer.seek(0)
                            strFilename=getStrAduioFilename(iIndexWrite)
                            print(f"正在儲存錄音檔案:{strFilename}")
                            with open(strFilename, "wb") as oF:
                                oF.write(oOpusBuffer.getvalue())
                            print("錄音儲存完成!")
                            oWaveBuffer.close()
                            oOpusBuffer.close()
                            # Publish the slot to threadB (advance write index, bump count)
                            iIndexWrite=(iIndexWrite+1) % iMax
                            iCount=iCount+1
                        else:
                            print("空間不足,不予儲存")
                    # Reset capture state for the next utterance
                    booRecording=False
                    oaAudioData=[]
                    iTotalSamples=0
                    fSilenceStart=None
    except KeyboardInterrupt:
        # Ctrl-C: tell both threads to stop
        booStop=True
        print("手動結束聆聽")
    # Stop and close the audio stream
    oStream.stop_stream()
    oStream.close()
    oAudio.terminate()
def threadB():
    """Consumer thread: transcribe queued recordings with Whisper,
    convert the result from Simplified to Traditional Chinese, and set
    booStop once the keyword 「再見」 ("goodbye") appears in the text.
    """
    global iIndexWrite, iIndexRead, iCount, iMax, booStop
    strDevice="cuda" if torch.cuda.is_available() else "cpu"
    booCuda=True if strDevice=="cuda" else False
    oModel=whisper.load_model("tiny").to(strDevice) # tiny, base, small, medium, large
    oCC=OpenCC('s2t') # 's2t' = Simplified-to-Traditional Chinese
    print("使用裝置:"+strDevice)
    while booStop==False:
        if iCount>0:
            # A recording is queued: transcribe the oldest one
            strFilename=getStrAduioFilename(iIndexRead)
            oResult=oModel.transcribe(strFilename, language="zh", fp16=booCuda) # fp16=True is faster when CUDA is available
            strTraditionalText=oCC.convert(oResult["text"])
            print("------")
            print("辨識 "+strFilename+" 結果:"+strTraditionalText)
            print("------")
            # Consume the slot (advance read index, decrement queue count)
            iIndexRead=(iIndexRead+1) % iMax
            iCount=iCount-1
            if strTraditionalText.find("再見")!=-1:
                # Stop keyword heard: shut down both threads
                print('Goodbye!')
                booStop=True
        time.sleep(0.1) # poll interval so the loop doesn't spin at 100% CPU
# Create the two worker threads (recorder and transcriber)
oThreadA=threading.Thread(target=threadA)
oThreadB=threading.Thread(target=threadB)
# Start both threads
oThreadA.start()
oThreadB.start()
# Keep the main thread alive until both workers have finished
oThreadA.join()
oThreadB.join()
print('兩個執行緒都停下來了')
可以給 raspberry pi 5 用的行動電源
https://24h.pchome.com.tw/prod/DYAO8Z-A900IEQJT
Xiaomi 小米行動電源 25000 212W
$2,499
raspberry pi 5 + ssd
Raspberry Pi M.2 HAT+ (官方出品)(支援2230/2242)
PCIe2.0 to dual M.2 hat for Raspberry Pi 5, Support NVMe SSD, Support Hailo8/8L (seeed studio)(支援2230/2242/2260/2280)
經過實測,兩款介面都可以支援 SSD,但,運作正常的 SSD 如下,
(1).(2242)RICELEE 256GB RL256G2242G2 M.2 2242 PCIeGen3 x4 NVMe SSD
(2).(2230)Raspberry Pi SSD 512GB NVMe RATED DC+3.3v 2.0A Manufactured by Brwin Storage Technology Co., Ltd
test
<!doctype html>
<html lang="zh-Hant-TW">
<head>
<meta http-equiv="content-type" content="text/html; charset=utf-8">
<title>my title</title>
<style>
html, body
{
width: 100vw;
height: 100vh;
overflow-x: clip;
overflow-y: clip;
background-color: white;
}
#divControlPanel
{
display: inline-block;
background-color: crimson;
color: white;
position: fixed;
top: 100px;
left: 100px;
border-radius: 0.5em;
}
#divControlPanelTitle
{
cursor: grab;
}
</style>
</head>
<body ondragover="bodyDragOver(event);">
hello, this is a test!
<div id="divControlPanel">
<div id="divControlPanelTitle" draggable="true" ondragstart="cpt_dragstart(event);" ondrop="cpt_drop(event);">
這是控制面板
<button type="button" onclick="showHideElement('divControlPanelContent');">開合</button>
</div>
<div id="divControlPanelContent">
</div>
</div>
<script>
function showHideElement(strId)
{
	// Toggle an element between hidden (display:none) and its default display.
	var oTarget=document.getElementById(strId);
	oTarget.style.display=(oTarget.style.display=="none") ? "" : "none";
}
</script>
<script>
// The draggable panel and the pointer offset inside it,
// captured on dragstart and applied while dragging.
var oControlPanel=document.getElementById("divControlPanel");
var dOffsetX=0, dOffsetY=0;
function cpt_dragstart(oEvent)
{
	// Remember where inside the panel the pointer grabbed it, so the
	// panel doesn't snap its top-left corner to the cursor while dragging.
	var oBounds=oControlPanel.getBoundingClientRect();
	dOffsetX=oEvent.clientX-oBounds.left;
	dOffsetY=oEvent.clientY-oBounds.top;
}
function bodyDragOver(oEvent)
{
	// Allow dropping anywhere on the page and reposition the panel live
	// so it follows the cursor during the drag.
	oEvent.preventDefault();
	var iLeft=oEvent.clientX-dOffsetX;
	var iTop=oEvent.clientY-dOffsetY;
	oControlPanel.style.left=iLeft+"px";
	oControlPanel.style.top=iTop+"px";
}
function cpt_drop(oEvent)
{
	// Final placement when the drag ends — same math as bodyDragOver.
	// NOTE(review): this handler is attached via ondrop on the drag
	// *source* element (divControlPanelTitle); verify it actually fires,
	// since drop events target the drop zone, not the dragged element.
	oEvent.preventDefault();
	var iLeft=oEvent.clientX-dOffsetX;
	var iTop=oEvent.clientY-dOffsetY;
	oControlPanel.style.left=iLeft+"px";
	oControlPanel.style.top=iTop+"px";
}
</script>
</body>
</html>
numpy 初探
# numpy primer: matrix product and array metadata.
import numpy as np
x=np.array([ [1,2,3],[4,5,6] ]) # shape (2, 3)
y=np.array([ [2,9],[4,5],[8,1] ]) # shape (3, 2)
x.dot(y) # dot product -> (2, 2); NOTE(review): result only echoes in a REPL — discarded when run as a script
print(
"type:",type(x),"\n",
"shape:",x.shape,"\n",
"size:",x.size,"\n",
"dimension:",x.ndim,"\n",
"datatype:",x.dtype,"\n",
"bytes:",x.nbytes)
# Same data with explicit dtypes; each assignment replaces x entirely
x=np.array([ [1,2,3],[4,5,6] ], dtype=np.float64)
x=np.array([ [1,2,3],[4,5,6] ], dtype=np.complex64)
x=np.array([ [1,2,3],[4,5,6] ], dtype=np.uint32)
import numpy as np
# print(np.__version__)
def show_something(element):
    """Print an ndarray's ndim and shape on one line, then the array
    itself, followed by a '------' divider line."""
    header = ('ndim=', element.ndim, '', 'shape=', element.shape)
    print(*header)
    print(element)
    print('------')
# Arrays of increasing dimensionality, from a 0-d scalar up to 5-d via ndmin.
v01=np.array(42) # 0-d (scalar array)
v02=np.array([42]) # 1-d with a single element
v03=np.array([1,2,3,4,5]) # 1-d
v04=np.array([[1,2,3,4,5],[6,7,8,9,10]]) # 2-d, shape (2, 5)
v05=np.array([ [[1,2,3],[4,5,6]],[[7,8,9],[10,11,12]] ]) # 3-d, shape (2, 2, 3)
v06=np.array([1,2,3,4], ndmin=5) # forced up to 5 dimensions
show_something(v01)
show_something(v02)
show_something(v03)
show_something(v04)
show_something(v05)
show_something(v06)
print(v01)
print(v03[3]) # 4th element -> 4
print('v05[1,1,2]->',v05[1,1,2],'') # positive indexing -> 12
print(v05[-1,-1,-3]) # negative indexing -> 10