Skip to content

Commit

Permalink
年翻更新
Browse files Browse the repository at this point in the history
1、qa自动缓存改为手动采纳;
2、socket10001映射到websocket 9001;
3、修复声音沟通接口无法收音问题;
4、修复阿里云不稳定问题;
  • Loading branch information
wangzai23333 committed Nov 6, 2024
1 parent 10d419e commit 19e5273
Show file tree
Hide file tree
Showing 12 changed files with 402 additions and 62 deletions.
3 changes: 3 additions & 0 deletions asr/ali_nls.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(self, username):
self.__URL = 'wss://nls-gateway-cn-shenzhen.aliyuncs.com/ws/v1'
self.__ws = None
self.__frames = []
self.started = False
self.__closing = False
self.__task_id = ''
self.done = False
Expand Down Expand Up @@ -86,6 +87,8 @@ def on_message(self, ws, message):
data = json.loads(message)
header = data['header']
name = header['name']
if name == 'TranscriptionStarted':
self.started = True
if name == 'SentenceEnd':
self.done = True
self.finalResults = data['payload']['result']
Expand Down
1 change: 1 addition & 0 deletions asr/funasr.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ def __init__(self, username):
self.__reconnect_delay = 1
self.__reconnecting = False
self.username = username
self.started = True


# 收到websocket消息的处理
Expand Down
140 changes: 95 additions & 45 deletions core/content_db.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,13 @@
import threading
import functools
from utils import util

def synchronized(func):
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with self.lock:
return func(self, *args, **kwargs)
return wrapper
@functools.wraps(func)
def wrapper(self, *args, **kwargs):
with self.lock:
return func(self, *args, **kwargs)
return wrapper

__content_tb = None
def new_instance():
Expand All @@ -18,73 +19,122 @@ def new_instance():
__content_tb.init_db()
return __content_tb


class Content_Db:

def __init__(self) -> None:
self.lock = threading.Lock()



#初始化
# 初始化数据库
def init_db(self):
conn = sqlite3.connect('fay.db')
c = conn.cursor()
c.execute('''CREATE TABLE IF NOT EXISTS T_Msg
(id INTEGER PRIMARY KEY autoincrement,
type char(10),
way char(10),
content TEXT NOT NULL,
createtime Int,
username TEXT DEFAULT 'User',
uid Int);''')
(id INTEGER PRIMARY KEY AUTOINCREMENT,
type CHAR(10),
way CHAR(10),
content TEXT NOT NULL,
createtime INT,
username TEXT DEFAULT 'User',
uid INT);''')
# 对话采纳记录表
c.execute('''CREATE TABLE IF NOT EXISTS T_Adopted
(id INTEGER PRIMARY KEY AUTOINCREMENT,
msg_id INTEGER UNIQUE,
adopted_time INT,
FOREIGN KEY(msg_id) REFERENCES T_Msg(id));''')
conn.commit()
conn.close()



# 添加对话
@synchronized
def add_content(self, type, way, content, username='User', uid=0):
conn = sqlite3.connect("fay.db")
cur = conn.cursor()
try:
cur.execute("INSERT INTO T_Msg (type, way, content, createtime, username, uid) VALUES (?, ?, ?, ?, ?, ?)",
(type, way, content, int(time.time()), username, uid))
conn.commit()
last_id = cur.lastrowid
except Exception as e:
util.log(1, "请检查参数是否有误: {}".format(e))
conn.close()
return 0
conn.close()
return last_id

#添加对话
# 根据ID查询对话记录
@synchronized
def add_content(self,type,way,content,username='User',uid=0):
def get_content_by_id(self, msg_id):
conn = sqlite3.connect("fay.db")
cur = conn.cursor()
cur.execute("SELECT * FROM T_Msg WHERE id = ?", (msg_id,))
record = cur.fetchone()
conn.close()
return record

# 添加对话采纳记录
@synchronized
def adopted_message(self, msg_id):
conn = sqlite3.connect('fay.db')
cur = conn.cursor()
# 检查消息ID是否存在
cur.execute("SELECT 1 FROM T_Msg WHERE id = ?", (msg_id,))
if cur.fetchone() is None:
util.log(1, "消息ID不存在")
conn.close()
return False
try:
cur.execute("insert into T_Msg (type,way,content,createtime,username,uid) values (?,?,?,?,?,?)",(type, way, content, time.time(), username, uid))
cur.execute("INSERT INTO T_Adopted (msg_id, adopted_time) VALUES (?, ?)", (msg_id, int(time.time())))
conn.commit()
except:
util.log(1, "请检查参数是否有误")
conn.close()
return 0
except sqlite3.IntegrityError:
util.log(1, "该消息已被采纳")
conn.close()
return False
conn.close()
return cur.lastrowid


return True

#获取对话内容
# 获取对话内容
@synchronized
def get_list(self,way,order,limit,uid=0):
def get_list(self, way, order, limit, uid=0):
conn = sqlite3.connect("fay.db")
cur = conn.cursor()
where_uid = ""
if int(uid) != 0:
where_uid = f" and uid = {uid} "
if(way == 'all'):
cur.execute("select type,way,content,createtime,datetime(createtime, 'unixepoch', 'localtime') as timetext,username from T_Msg where 1 "+where_uid+" order by id "+order+" limit ?",(limit,))
elif(way == 'notappended'):
cur.execute("select type,way,content,createtime,datetime(createtime, 'unixepoch', 'localtime') as timetext,username from T_Msg where way != 'appended' "+where_uid+" order by id "+order+" limit ?",(limit,))
where_uid = f" AND T_Msg.uid = {uid} "
base_query = f"""
SELECT T_Msg.type, T_Msg.way, T_Msg.content, T_Msg.createtime,
datetime(T_Msg.createtime, 'unixepoch', 'localtime') AS timetext,
T_Msg.username,T_Msg.id,
CASE WHEN T_Adopted.msg_id IS NOT NULL THEN 1 ELSE 0 END AS is_adopted
FROM T_Msg
LEFT JOIN T_Adopted ON T_Msg.id = T_Adopted.msg_id
WHERE 1 {where_uid}
"""
if way == 'all':
query = base_query + f" ORDER BY T_Msg.id {order} LIMIT ?"
cur.execute(query, (limit,))
elif way == 'notappended':
query = base_query + f" AND T_Msg.way != 'appended' ORDER BY T_Msg.id {order} LIMIT ?"
cur.execute(query, (limit,))
else:
cur.execute("select type,way,content,createtime,datetime(createtime, 'unixepoch', 'localtime') as timetext,username from T_Msg where way = ? "+where_uid+" order by id "+order+" limit ?",(way,limit,))

query = base_query + f" AND T_Msg.way = ? ORDER BY T_Msg.id {order} LIMIT ?"
cur.execute(query, (way, limit))
list = cur.fetchall()
conn.close()
return list










@synchronized
def get_previous_user_message(self, msg_id):
conn = sqlite3.connect("fay.db")
cur = conn.cursor()
cur.execute("""
SELECT id, type, way, content, createtime, datetime(createtime, 'unixepoch', 'localtime') AS timetext, username
FROM T_Msg
WHERE id < ? AND type != 'fay'
ORDER BY id DESC
LIMIT 1
""", (msg_id,))
record = cur.fetchone()
conn.close()
return record
11 changes: 7 additions & 4 deletions core/recorder.py
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ def __record(self):
while self.__running:
try:
record = cfg.config['source']['record']
if not record['enabled']:
if not record['enabled'] and not self.is_remote:
time.sleep(0.1)
continue
self.is_reading = True
Expand All @@ -215,7 +215,6 @@ def __record(self):
self.__running = False
if not data:
continue

#是否可以拾音,不可以就掉弃录音
can_listen = True
#没有开唤醒,但面板或数字人正在播音时不能拾音
Expand All @@ -229,7 +228,7 @@ def __record(self):
if can_listen == False:#掉弃录音
data = None
continue

#计算音量是否满足激活拾音
level = audioop.rms(data, 2)
if len(self.__history_data) >= 10:#保存激活前的音频,以免信息掉失
Expand All @@ -245,9 +244,11 @@ def __record(self):
elif history_percentage < self.__dynamic_threshold:
self.__dynamic_threshold += (history_percentage - self.__dynamic_threshold) * 1


#激活拾音
if percentage > self.__dynamic_threshold:
last_speaking_time = time.time()

if not self.__processing and not isSpeaking and time.time() - last_mute_time > _ATTACK:
isSpeaking = True #用户正在说话
util.printInfo(1, self.username,"聆听中...")
Expand All @@ -259,7 +260,9 @@ def __record(self):
concatenated_audio.clear()
self.__aLiNls = self.asrclient()
try:
self.__aLiNls.start()
task_id = self.__aLiNls.start()
while not self.__aLiNls.started:
time.sleep(0.01)
except Exception as e:
print(e)
util.printInfo(1, self.username, "aliyun asr 连接受限")
Expand Down
117 changes: 117 additions & 0 deletions core/socket_bridge_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
import asyncio
import websockets
import socket
import threading
import time

__wss = None

def new_instance():
global __wss
if __wss is None:
__wss = SocketBridgeService()
return __wss

class SocketBridgeService:
def __init__(self):
self.websockets = {}
self.sockets = {}
self.message_queue = asyncio.Queue()
self.running = True
self.server = None
self.event_loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.event_loop)

async def handler(self, websocket, path):
ws_id = id(websocket)
self.websockets[ws_id] = websocket
try:
if ws_id not in self.sockets:
self.sockets[ws_id] = await self.create_socket_client()
asyncio.create_task(self.receive_from_socket(ws_id))
async for message in websocket:
await self.send_to_socket(ws_id, message)
except websockets.ConnectionClosed:
pass
finally:
self.close_socket_client(ws_id)

async def create_socket_client(self):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('127.0.0.1', 10001))
return sock

async def send_to_socket(self, ws_id, message):
sock = self.sockets.get(ws_id)
if sock:
asyncio.create_task(self.socket_send(sock, message))

async def socket_send(self, sock, message):
await asyncio.to_thread(sock.sendall, message)

async def receive_from_socket(self, ws_id):
sock = self.sockets.get(ws_id)
while True:
data = await asyncio.to_thread(sock.recv, 1024)
if data:
await self.message_queue.put((ws_id, data))

async def process_message_queue(self):
while True:
if not self.running:
break
ws_id, data = await self.message_queue.get()
websocket = self.websockets.get(ws_id)
if websocket.open:
await websocket.send(data)
self.message_queue.task_done()

def close_socket_client(self, ws_id):
sock = self.sockets.pop(ws_id, None)
if sock:
sock.close()

async def start(self, host='0.0.0.0', port=9001):
self.server = await websockets.serve(self.handler, host, port, loop=self.event_loop)
asyncio.create_task(self.process_message_queue())
await asyncio.Future()

async def shutdown(self):
self.running = False
if self.server:
for ws in self.websockets.values():
await ws.close()
if hasattr(self.server, 'close'):
self.server.close()
await asyncio.gather(*[w.wait_closed() for w in self.websockets.values()])
for sock in self.sockets.values():
sock.close()
if self.server:
await self.server.wait_closed()

def stop_server(self):
self.event_loop.call_soon_threadsafe(self.shutdown)
self.event_loop.run_until_complete(self.shutdown())
self.event_loop.close()

def start_service(self):
self.event_loop.run_until_complete(self.start(host='0.0.0.0', port=9001))
try:
self.event_loop.run_forever()
except KeyboardInterrupt:
pass
finally:
self.stop_server()

if __name__ == '__main__':
service = new_instance()
service_thread = threading.Thread(target=service.start_service)
service_thread.start()

# 等待一些时间或者直到收到停止信号
try:
while service.running:
time.sleep(1)
except KeyboardInterrupt:
service.stop_server()
service_thread.join()
Loading

0 comments on commit 19e5273

Please sign in to comment.