SG200X 使用LicheeRV Nano / Duo-s实现识别-CHAT-语音合成功能

开发板:LicheeRV Nano / Duo-s

1、语音转文字ASR

将麦克风输入的语音实时转换为文字,并将语音识别的文字实时显示在LCD屏幕

阿里云API (阿里云登录 - 欢迎登录阿里云,安全稳定的云计算服务平台)

appKey = ‘s9NZm8ozBKyX63vK’
apt
token = ‘f1ce8db3539a4b08b775debdebc23a89’

appKey = ‘P918jP30TLJNHi3Q’ #s9NZm8ozBKyX63vK’ #‘RxkHgzYYYYLIP4OD’

token = ‘080e17fafb8d4101ac25d5f778bd82fc’ #f1ce8db3539a4b08b775debdebc23a89

2、CHAT

文字传入到大语言模型进行chat,将chat的response实时显示在LCD屏幕

大语言模型使用Kimi chat API

API_KEY = “sk-ycJsTdyonBbTzSSBnqogZUjjEUlhzZmEkKf1pZmsO0vd8FYA”

BASE_URL = “https://api.moonshot.cn/v1/chat/completions”,

智谱chat API

API_KEY = “ebb785194c713e7b419ca8742277d414.hCBC11QCZvC5N0YK”

BASE_URL = “https://open.bigmodel.cn/api/paas/v4/chat/completions

3、语音合成TTS

将chat得到的response文字转换为语音在扬声器输出

host = ‘nls-gateway-cn-shanghai.aliyuncs.com

4、语音唤醒

可执行文件编译步骤参考简介 | Milk-V

# 板子上操作
# 导入依赖库
export LD_LIBRARY_PATH='/mnt/system/lib'
# 运行语音唤醒
./sample_aud_order panns_cv181x.cvimodel 8000 2 0

LicheeRV nano

LicheeRV nano网络设置命令

# 在sd卡第一个分区创建wifi.sta文件启用sta模式:
touch /boot/wifi.sta
rm /boot/wifi.ap /boot/wifi.mon

# 然后将AP的SSID和密码写入文件:
echo ssid > /boot/wifi.ssid
echo pass > /boot/wifi.pass

# 重启Wifi服务
/etc/init.d/S30wifi stop
/etc/init.d/S30wifi start

亦可通过sh脚本一键执行:

vi wifi-nano.sh
# 执行i进入编辑模式,将以下内容写入
#######################################
#!/bin/bash
# 创建wifi.sta文件并启用sta模式
touch /boot/wifi.sta
rm /boot/wifi.ap /boot/wifi.mon
# 提示用户输入WiFi SSID
echo "Enter your WiFi SSID:"
read ssid_input
# 提示用户输入WiFi密码
echo "Enter your WiFi password:"
read -s pass_input
# 写入用户提供的SSID和密码到文件
echo "$ssid_input" > /boot/wifi.ssid
echo "$pass_input" > /boot/wifi.pass
# 重启Wifi服务
/etc/init.d/S30wifi stop
/etc/init.d/S30wifi start
#######################################
# ESC退出编辑模式,:wq保存退出
# 执行sh
sh wifi-nano.sh

LCD屏测试相关命令

# 清屏
cat /dev/zero > /dev/fb0
# 花屏
cat /dev/random > /dev/fb0
# 亦可测试lvgl的demo(显示lvgl官网样例)

麦克风及扬声器相关设置命令:

# 麦克风音量设置
amixer -Dhw:0 cset name='ADC Capture Volume' 24
# 录音命令(Ctrl+C结束录音):
arecord  -f dat -c 1 -r 16000 XXXX.wav

# 播放录音:
aplay XXXX.wav
# 扬声器播放音量设置(假设音量设置为24)
# 两种方式:
amixer -Dhw:3 cset name='Speaker Playback Volume' 24
amixer cset -c 3 numid=6 24

相关设备查看命令

# 查看录音设备
arecord -l
# 查看播放设备
aplay -l
# 查看具体设备号的信息(假设设备号为3)
amixer contents -c 3

##########################################################################

Duo-s

注:USB转TTL串口线序如下图,RX->TX TX->RX

Duo-s网络设置命令

# 编辑`/etc/wpa_supplicant.conf`文件,将内容更换为如下内容,
# 并替换 `ssid` 和 `psk` 为要连接的 WIFI 账号和密码
ctrl_interface=/var/run/wpa_supplicant
ap_scan=1
update_config=1 v

network={
  ssid="wifi_test"
  psk="12345678"
  key_mgmt=WPA-PSK}

# 重启网络
wpa_supplicant -B -i wlan0 -c /etc/wpa_supplicant.conf

亦可通过sh脚本一键执行:

vi wifi-duo-s.sh
# 执行i进入编辑模式,将以下内容写入
#######################################
#!/bin/bash
# 提示用户输入WiFi的SSID和密码
read -p "请输入WiFi的SSID: " ssid
read -p "请输入WiFi的密码: " password
# 编辑 /etc/wpa_supplicant.conf 文件
cat <<EOF > /etc/wpa_supplicant.conf
ctrl_interface=/var/run/wpa_supplicant
ap_scan=1
update_config=1

network={
  ssid="$ssid"
  psk="$password"
  key_mgmt=WPA-PSK
}
EOF
# 重启网络
wpa_supplicant -B -i wlan0 -c /etc/wpa_supplicant.conf
echo "WiFi configuration completed."
#######################################
# ESC退出编辑模式,:wq保存退出
# 执行sh
sh wifi-duo-s.sh

LCD屏测试相关命令

# 清屏
cat /dev/zero > /dev/fb0
# 花屏
cat /dev/random > /dev/fb0
# 亦可测试lvgl的demo(显示lvgl官网样例)

麦克风及扬声器相关设置命令:

==注:==使用USB转TTL串口、type-c供电方可使用声卡功能,需要使用专用版本的img

# 麦克风音量设置
amixer -Dhw:0 cset name='ADC Capture Volume' 24
# 录音命令(Ctrl+C结束录音):
arecord  -f dat -c 1 -r 16000 XXXX.wav

# 播放录音:
aplay XXXX.wav
#查看设备详情(假设设备号为3)
amixer contents -c 3
# 扬声器播放音量设置(假设音量设置为24)
# 两种方式:(假设设备号为3)
amixer -Dhw:3 cset name='Speaker Playback Volume' 24
amixer cset -c 3 numid=6 24

相关设备查看命令

# 查看录音设备
arecord -l
# 查看播放设备
aplay -l
# 查看具体设备号的信息(假设设备号为3)
amixer contents -c 3

语音识别-CHAT-语音合成asr_chat_tts整体代码:

0422版本–添加stream流式数据传输及更换智谱APIasr_chat-zp_tts_stream.py

# -*- coding: UTF-8 -*-
import http.client
import urllib.parse
import json
import subprocess
import time
import httpx
import requests

subprocess.Popen(['chmod', '+x', 'stable_demo'])
subprocess.Popen(['./stable_demo'])
print('Asr chat tts begin....')
appKey = 'P918jP30TLJNHi3Q'#s9NZm8ozBKyX63vK'  #'RxkHgzYYYYLIP4OD'
token = '080e17fafb8d4101ac25d5f778bd82fc'

# Chat Configuration
API_KEY = "ebb785194c713e7b419ca8742277d414.hCBC11QCZvC5N0YK"
BASE_URL = "https://open.bigmodel.cn/api/paas/v4/chat/completions"
history = [{"role": "system", "content": "您好!"}]
# Aliyun
# url = 'https://nls-gateway-cn-shanghai.aliyuncs.com/stream/v1/asr'
host = 'nls-gateway-cn-shanghai.aliyuncs.com'

def record_on_gpio(pin):
    is_pressed = False
    audioFilepath = './output.wav'
    while True:
        try:
            with open('/sys/class/gpio/gpio{}/value'.format(pin), 'r') as gpio_file:
                value = gpio_file.read().strip()
            #print('get key value {}',value)
            if value == '1' and not is_pressed:
                # 按键按下时开始录音
                recording_process = subprocess.Popen(['arecord', '-f', 'dat', '-c', '1', '-r', '16000', 'output.wav'])
                is_pressed = True
                print("Recording started.")

            if value == '0' and is_pressed:
                subprocess.Popen(['killall', 'arecord'])
                recording_process.wait()  # 等待录音进程结束
                is_pressed = False
                print("Recording stopped.")
                return audioFilepath
        except Exception as e:
            print("Error:", e)

def process_chunk(chunk,response_accumulator):
    if chunk.strip() == "[DONE]":
        return True, None
    try:
        data = json.loads(chunk)
        # print('process_chunk data:', data)
        if 'choices' in data and data['choices']:
            for choice in data['choices']:
                if 'delta' in choice and 'content' in choice['delta'] and choice['delta']['content']:
                    result = choice['delta']['content']
                    # print('process_chunk result:', result)
                    response_accumulator.append(result)
                    return False, result
    except Exception as e:
        print(f"处理数据块时出错: {e}")
    return False, None



def chat(query, history):
    history += [{"role": "user", "content": query}]
    data = {
        "model": "glm-4",
        "messages": history,
        "temperature": 0.3,
        "stream": True,
    }

    headers = {
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json"
    }

    response_accumulator = []
    response = requests.post(BASE_URL, data=json.dumps(data), headers=headers, stream=True)
    send_to_lvgl(f"[CLEAR]{query}: ")
    try:
        for chunk in response.iter_lines():
            if chunk:
                chunk_str = chunk.decode("utf-8")
                if chunk_str.startswith("data: "):
                    chunk_str = chunk_str[len("data: "):]

                done, result = process_chunk(chunk_str,response_accumulator)
                # print('result is', result)

                chunk_str = "data: " + chunk_str
                # print("Get response:", chunk_str)
                if result:
                    send_to_lvgl(result)

        if done:
            tts_text = ''.join(response_accumulator)
            tts_to_play(tts_text)


    except Exception as e:
        print(f"Error: {str(e)}")


def send_to_lvgl(text):
    pipe_name = '/tmp/query_pipe'
    try:
        with open(pipe_name, 'w') as pipe:
            pipe.write(text)
            pipe.flush()
    except Exception as e:
        print(f"LVGL send error: {e}")


def process(request, token, audioFile) :
    # 读取音频
    print('process {} {}'.format(request, audioFile))
    with open(audioFile, mode = 'rb') as f:
        audioContent = f.read()

    host = 'nls-gateway-cn-shanghai.aliyuncs.com'

    # 设置HTTPS请求头部
    httpHeaders = {
        'X-NLS-Token': token,
        'Content-type': 'application/octet-stream',
        'Content-Length': len(audioContent)
        }

    conn = http.client.HTTPSConnection(host)

    conn.request(method='POST', url=request, body=audioContent, headers=httpHeaders)

    response = conn.getresponse()
    print('Response status and response reason:')
    print(response.status ,response.reason)

    try:
        body = json.loads(response.read())
        text = body['result']
        print('Recognized Text:', text)
        chat_response = chat(text, history)
        print('Chat Response:', chat_response)
    except ValueError:
        print('The response is not json format string')

    conn.close()

def oneloop():
    print('Wait for key press')
    audioFilepath = record_on_gpio(499)

    #print('Wait for first audio')
    format = 'pcm'
    sampleRate = 16000
    enablePunctuationPrediction  = True
    enableInverseTextNormalization = True
    enableVoiceDetection  = False

# 设置RESTful请求参数
    asrurl = f'https://{host}/stream/v1/asr'
    request = asrurl + '?appkey=' + appKey
    request = request + '&format=' + format
    request = request + '&sample_rate=' + str(sampleRate)

    if enablePunctuationPrediction :
        request = request + '&enable_punctuation_prediction=' + 'true'

    if enableInverseTextNormalization :
        request = request + '&enable_inverse_text_normalization=' + 'true'

    if enableVoiceDetection :
        request = request + '&enable_voice_detection=' + 'true'

    print('Request: ' + request)

    process(request, token, audioFilepath)

def tts_to_play(text, file_path='response.wav'):
    ttsurl = f'https://{host}/stream/v1/tts'
    text_encoded = urllib.parse.quote_plus(text)
    tts_request = f"{ttsurl}?appkey={appKey}&token={token}&text={text_encoded}&format=wav&sample_rate=16000"

    conn = http.client.HTTPSConnection(host)
    conn.request('GET', tts_request)
    response = conn.getresponse()
    body = response.read()
    if response.status == 200 and response.getheader('Content-Type') == 'audio/mpeg':
        with open(file_path, 'wb') as f:
            f.write(body)
        print('TTS audio saved successfully')
        subprocess.Popen(['aplay', file_path])
    else:
        print('TTS request failed:', body)
    conn.close()


while True:
    try:
        oneloop()
    except Exception as e:
        print(e)