From c27267d1abbe345644f324484474fb6313766b3a Mon Sep 17 00:00:00 2001 From: "devin.yf" Date: Thu, 6 Jun 2024 23:58:14 +0800 Subject: [PATCH] paraformer: websocket conn control && add debug log --- config/log_config.go | 2 ++ example/paraformer/realtime/speech2text.go | 7 +++--- httpclient/http_client.go | 2 +- httpclient/websocket.go | 28 +++++++++++----------- paraformer/paraformer_ws.go | 14 ++++------- tongyiclient.go | 11 ++++++--- 6 files changed, 33 insertions(+), 31 deletions(-) diff --git a/config/log_config.go b/config/log_config.go index e47395d..8fc7797 100644 --- a/config/log_config.go +++ b/config/log_config.go @@ -6,8 +6,10 @@ import ( "os" ) +//nolint:gochecknoglobals var Debug = false +//nolint:gochecknoinits func init() { if !Debug { log.SetOutput(io.Discard) diff --git a/example/paraformer/realtime/speech2text.go b/example/paraformer/realtime/speech2text.go index 22f40cc..8c74bd4 100644 --- a/example/paraformer/realtime/speech2text.go +++ b/example/paraformer/realtime/speech2text.go @@ -57,7 +57,6 @@ func main() { reader := bufio.NewReader(voiceReader) ctx := context.Background() - // defer cancel() if err := cli.CreateSpeechToTextGeneration(ctx, req, reader); err != nil { panic(err) } @@ -65,9 +64,9 @@ func main() { // 等待语音识别结果输出 time.Sleep(5 * time.Second) // 手动关闭语音识别 - cli.CloseSpeechToTextGeneration() - time.Sleep(1 * time.Second) - + if err := cli.CloseSpeechToTextGeneration(); err != nil { + panic(err) + } } // 读取音频文件中的录音 模拟实时语音流. 这里下载的官方文档中的示例音频文件. diff --git a/httpclient/http_client.go b/httpclient/http_client.go index 042aa08..fe52673 100644 --- a/httpclient/http_client.go +++ b/httpclient/http_client.go @@ -16,7 +16,7 @@ import ( "strings" "time" - _ "github.com/devinyf/dashscopego/config" + _ "github.com/devinyf/dashscopego/config" //nolint:revive ) type HTTPOption func(c *HTTPCli) diff --git a/httpclient/websocket.go b/httpclient/websocket.go index c76ddc2..cc2f3aa 100644 --- a/httpclient/websocket.go +++ b/httpclient/websocket.go @@ -7,9 +7,8 @@ import ( "net/http" "time" + _ "github.com/devinyf/dashscopego/config" //nolint:revive "github.com/gorilla/websocket" - - _ "github.com/devinyf/dashscopego/config" ) const ( @@ -49,15 +48,20 @@ func (c *WsClient) ConnClient(req interface{}) error { err, ok := <-c.errChan if ok && err != nil { - log.Println("error: ", err) + log.Println("errChain error: ", err) } return nil } func (c *WsClient) CloseClient() error { + c.CancelFn() + close(c.inputChan) close(c.outputChan) close(c.errChan) + + // wait for write a closeMessage when inputchan is closed. + time.Sleep(500 * time.Millisecond) c.Conn.Close() return nil } @@ -91,16 +95,13 @@ type WsClient struct { outputChan chan WsMessage errChan chan error Over bool - Ctx context.Context CancelFn context.CancelFunc } -func NewWsClient(url string, headers http.Header, ctx context.Context, cancel context.CancelFunc) *WsClient { +func NewWsClient(url string, headers http.Header) *WsClient { return &WsClient{ - URL: url, - Headers: headers, - Ctx: ctx, - CancelFn: cancel, + URL: url, + Headers: headers, } } @@ -120,14 +121,14 @@ func (c *WsClient) readPump() { c.Conn.SetReadLimit(maxMessageSize) if err := c.Conn.SetReadDeadline(pongDelay); err != nil { - log.Printf("error: %v", err) + log.Printf("SetReadDeadline error: %v", err) } c.Conn.SetPongHandler(pongFn) for !c.Over { _, message, err := c.Conn.ReadMessage() if err != nil { if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) { - log.Printf("error: %v", err) + log.Printf("ReadMessage error: %v", err) c.errChan <- err } break @@ -142,7 +143,6 @@ func (c *WsClient) readPump() { // Process the message (this part needs to be implemented based on your application logic). } log.Print("ws read over") - } // writePump pumps messages from the write channel to the websocket connection. @@ -162,13 +162,13 @@ func (c *WsClient) writePump() { // The write channel is closed. err := c.Conn.WriteMessage(websocket.CloseMessage, []byte{}) if err != nil { - log.Printf("error: %v", err) + log.Printf("WriteMessage error: %v", err) } return } err := c.Conn.SetWriteDeadline(time.Now().Add(writeWait)) if err != nil { - log.Printf("error: %v", err) + log.Printf("SetWriteDeadline error: %v", err) } // TODO: 临时输出 diff --git a/paraformer/paraformer_ws.go b/paraformer/paraformer_ws.go index 8d006c1..ba64db6 100644 --- a/paraformer/paraformer_ws.go +++ b/paraformer/paraformer_ws.go @@ -12,14 +12,12 @@ import ( // real-time voice recognition -func ConnRecognitionClient(ctx context.Context, request *Request, token string) (*httpclient.WsClient, error) { +func ConnRecognitionClient(request *Request, token string) (*httpclient.WsClient, error) { // Initialize the client with the necessary parameters. header := http.Header{} header.Add("Authorization", token) - ctx_ws, cancelFn := context.WithCancel(ctx) - - client := httpclient.NewWsClient(ParaformerWSURL, header, ctx_ws, cancelFn) + client := httpclient.NewWsClient(ParaformerWSURL, header) if err := client.ConnClient(request); err != nil { return nil, err @@ -29,8 +27,6 @@ func ConnRecognitionClient(ctx context.Context, request *Request, token string) } func CloseRecognitionClient(cli *httpclient.WsClient) error { - cli.CancelFn() - if err := cli.CloseClient(); err != nil { log.Printf("close client error: %v", err) return err @@ -47,7 +43,7 @@ type ResultWriter interface { WriteResult(str string) error } -func HandleRecognitionResult(cli *httpclient.WsClient, fn StreamingFunc) { +func HandleRecognitionResult(ctx context.Context, cli *httpclient.WsClient, fn StreamingFunc) { outputChan, errChan := cli.ResultChans() // TODO: handle errors. @@ -61,7 +57,7 @@ BREAK_FOR: } // streaming callback func - if err := fn(cli.Ctx, output.Data); err != nil { + if err := fn(ctx, output.Data); err != nil { log.Println("error: ", err) break BREAK_FOR } @@ -71,7 +67,7 @@ BREAK_FOR: log.Println("error: ", err) break BREAK_FOR } - case <-cli.Ctx.Done(): + case <-ctx.Done(): cli.Over = true log.Println("Done") break BREAK_FOR diff --git a/tongyiclient.go b/tongyiclient.go index 6578c4e..9aab82d 100644 --- a/tongyiclient.go +++ b/tongyiclient.go @@ -3,6 +3,7 @@ package dashscopego import ( "bufio" "context" + "errors" "fmt" "strings" @@ -213,14 +214,18 @@ func (q *TongyiClient) CreateSpeechToTextGeneration(ctx context.Context, request request.Payload.Model = q.Model } - wsCli, err := paraformer.ConnRecognitionClient(ctx, request, q.token) + wsCli, err := paraformer.ConnRecognitionClient(request, q.token) if err != nil { return err } + + innerCtx, cancel := context.WithCancel(ctx) + wsCli.CancelFn = cancel + q.wsCli = wsCli // handle response by stream callback - go paraformer.HandleRecognitionResult(wsCli, request.StreamingFn) + go paraformer.HandleRecognitionResult(innerCtx, wsCli, request.StreamingFn) for { // this buf can not be reused, @@ -244,7 +249,7 @@ func (q *TongyiClient) CreateSpeechToTextGeneration(ctx context.Context, request func (q *TongyiClient) CloseSpeechToTextGeneration() error { if q.wsCli == nil { - panic("wsCli is nil") + return errors.New("wsCli is nil") } return paraformer.CloseRecognitionClient(q.wsCli)