Skip to content

Commit

Permalink
paraformer: websocket conn control && add debug log
Browse files Browse the repository at this point in the history
  • Loading branch information
devinyf committed Jun 6, 2024
1 parent 865ffe2 commit c27267d
Show file tree
Hide file tree
Showing 6 changed files with 33 additions and 31 deletions.
2 changes: 2 additions & 0 deletions config/log_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@ import (
"os"
)

//nolint:gochecknoglobals
var Debug = false

//nolint:gochecknoinits
func init() {
if !Debug {
log.SetOutput(io.Discard)
Expand Down
7 changes: 3 additions & 4 deletions example/paraformer/realtime/speech2text.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,17 +57,16 @@ func main() {
reader := bufio.NewReader(voiceReader)

ctx := context.Background()
// defer cancel()
if err := cli.CreateSpeechToTextGeneration(ctx, req, reader); err != nil {
panic(err)
}

// 等待语音识别结果输出
time.Sleep(5 * time.Second)
// 手动关闭语音识别
cli.CloseSpeechToTextGeneration()
time.Sleep(1 * time.Second)

if err := cli.CloseSpeechToTextGeneration(); err != nil {
panic(err)
}
}

// 读取音频文件中的录音 模拟实时语音流. 这里下载的官方文档中的示例音频文件.
Expand Down
2 changes: 1 addition & 1 deletion httpclient/http_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import (
"strings"
"time"

_ "github.com/devinyf/dashscopego/config"
_ "github.com/devinyf/dashscopego/config" //nolint:revive
)

type HTTPOption func(c *HTTPCli)
Expand Down
28 changes: 14 additions & 14 deletions httpclient/websocket.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"net/http"
"time"

_ "github.com/devinyf/dashscopego/config" //nolint:revive
"github.com/gorilla/websocket"

_ "github.com/devinyf/dashscopego/config"
)

const (
Expand Down Expand Up @@ -49,15 +48,20 @@ func (c *WsClient) ConnClient(req interface{}) error {

err, ok := <-c.errChan
if ok && err != nil {
log.Println("error: ", err)
log.Println("errChain error: ", err)
}
return nil
}

func (c *WsClient) CloseClient() error {
c.CancelFn()

close(c.inputChan)
close(c.outputChan)
close(c.errChan)

// wait for write a closeMessage when inputchan is closed.
time.Sleep(500 * time.Millisecond)
c.Conn.Close()
return nil
}
Expand Down Expand Up @@ -91,16 +95,13 @@ type WsClient struct {
outputChan chan WsMessage
errChan chan error
Over bool
Ctx context.Context
CancelFn context.CancelFunc
}

func NewWsClient(url string, headers http.Header, ctx context.Context, cancel context.CancelFunc) *WsClient {
func NewWsClient(url string, headers http.Header) *WsClient {
return &WsClient{
URL: url,
Headers: headers,
Ctx: ctx,
CancelFn: cancel,
URL: url,
Headers: headers,
}
}

Expand All @@ -120,14 +121,14 @@ func (c *WsClient) readPump() {

c.Conn.SetReadLimit(maxMessageSize)
if err := c.Conn.SetReadDeadline(pongDelay); err != nil {
log.Printf("error: %v", err)
log.Printf("SetReadDeadline error: %v", err)
}
c.Conn.SetPongHandler(pongFn)
for !c.Over {
_, message, err := c.Conn.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
log.Printf("ReadMessage error: %v", err)
c.errChan <- err
}
break
Expand All @@ -142,7 +143,6 @@ func (c *WsClient) readPump() {
// Process the message (this part needs to be implemented based on your application logic).
}
log.Print("ws read over")

}

// writePump pumps messages from the write channel to the websocket connection.
Expand All @@ -162,13 +162,13 @@ func (c *WsClient) writePump() {
// The write channel is closed.
err := c.Conn.WriteMessage(websocket.CloseMessage, []byte{})
if err != nil {
log.Printf("error: %v", err)
log.Printf("WriteMessage error: %v", err)
}
return
}
err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait))
if err != nil {
log.Printf("error: %v", err)
log.Printf("SetWriteDeadline error: %v", err)
}

// TODO: 临时输出
Expand Down
14 changes: 5 additions & 9 deletions paraformer/paraformer_ws.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,12 @@ import (

// real-time voice recognition

func ConnRecognitionClient(ctx context.Context, request *Request, token string) (*httpclient.WsClient, error) {
func ConnRecognitionClient(request *Request, token string) (*httpclient.WsClient, error) {
// Initialize the client with the necessary parameters.
header := http.Header{}
header.Add("Authorization", token)

ctx_ws, cancelFn := context.WithCancel(ctx)

client := httpclient.NewWsClient(ParaformerWSURL, header, ctx_ws, cancelFn)
client := httpclient.NewWsClient(ParaformerWSURL, header)

if err := client.ConnClient(request); err != nil {
return nil, err
Expand All @@ -29,8 +27,6 @@ func ConnRecognitionClient(ctx context.Context, request *Request, token string)
}

func CloseRecognitionClient(cli *httpclient.WsClient) error {
cli.CancelFn()

if err := cli.CloseClient(); err != nil {
log.Printf("close client error: %v", err)
return err
Expand All @@ -47,7 +43,7 @@ type ResultWriter interface {
WriteResult(str string) error
}

func HandleRecognitionResult(cli *httpclient.WsClient, fn StreamingFunc) {
func HandleRecognitionResult(ctx context.Context, cli *httpclient.WsClient, fn StreamingFunc) {
outputChan, errChan := cli.ResultChans()

// TODO: handle errors.
Expand All @@ -61,7 +57,7 @@ BREAK_FOR:
}

// streaming callback func
if err := fn(cli.Ctx, output.Data); err != nil {
if err := fn(ctx, output.Data); err != nil {
log.Println("error: ", err)
break BREAK_FOR
}
Expand All @@ -71,7 +67,7 @@ BREAK_FOR:
log.Println("error: ", err)
break BREAK_FOR
}
case <-cli.Ctx.Done():
case <-ctx.Done():
cli.Over = true
log.Println("Done")
break BREAK_FOR
Expand Down
11 changes: 8 additions & 3 deletions tongyiclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package dashscopego
import (
"bufio"
"context"
"errors"
"fmt"
"strings"

Expand Down Expand Up @@ -213,14 +214,18 @@ func (q *TongyiClient) CreateSpeechToTextGeneration(ctx context.Context, request
request.Payload.Model = q.Model
}

wsCli, err := paraformer.ConnRecognitionClient(ctx, request, q.token)
wsCli, err := paraformer.ConnRecognitionClient(request, q.token)
if err != nil {
return err
}

innerCtx, cancel := context.WithCancel(ctx)
wsCli.CancelFn = cancel

q.wsCli = wsCli

// handle response by stream callback
go paraformer.HandleRecognitionResult(wsCli, request.StreamingFn)
go paraformer.HandleRecognitionResult(innerCtx, wsCli, request.StreamingFn)

for {
// this buf can not be reused,
Expand All @@ -244,7 +249,7 @@ func (q *TongyiClient) CreateSpeechToTextGeneration(ctx context.Context, request

func (q *TongyiClient) CloseSpeechToTextGeneration() error {
if q.wsCli == nil {
panic("wsCli is nil")
return errors.New("wsCli is nil")
}

return paraformer.CloseRecognitionClient(q.wsCli)
Expand Down

0 comments on commit c27267d

Please sign in to comment.