diff --git a/CHANGELOG.md b/CHANGELOG.md index 5c93f8e19..6486ea871 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,7 @@ ### Bug Fixes +* **bug:** memory leak in websocket handler ([#892](https://github.com/dymensionxyz/dymint/issues/892)) ([02fcbde](https://github.com/dymensionxyz/dymint/commit/48c263fbde71594ec34e0f731d9febc0702fcbde)) * **bug:** sync from da and p2p when starting a node ([#763](https://github.com/dymensionxyz/dymint/issues/763)) ([68ffd05](https://github.com/dymensionxyz/dymint/commit/68ffd05794949ddc42df1c132d1fde5f21b505f4)) * **celestia test:** fix race in test ([#755](https://github.com/dymensionxyz/dymint/issues/755)) ([0b36781](https://github.com/dymensionxyz/dymint/commit/0b367818bf6aa8da4a4fd8e4e5c78223b60b44e0)) * **celestia:** impl retry on submit ([#748](https://github.com/dymensionxyz/dymint/issues/748)) ([61630eb](https://github.com/dymensionxyz/dymint/commit/61630eb458197abe2440a81426210000dff25d40)) diff --git a/rpc/json/ws.go b/rpc/json/ws.go index 36f45baa7..d9b194ccb 100644 --- a/rpc/json/ws.go +++ b/rpc/json/ws.go @@ -2,14 +2,16 @@ package json import ( "bytes" - "context" + "errors" "io" "net/http" + "time" "github.com/gorilla/websocket" - "github.com/dymensionxyz/dymint/types" tmpubsub "github.com/tendermint/tendermint/libs/pubsub" + + "github.com/dymensionxyz/dymint/types" ) type wsConn struct { @@ -18,6 +20,8 @@ type wsConn struct { logger types.Logger } +const writeWait = 10 * time.Second + func (wsc *wsConn) sendLoop() { for msg := range wsc.queue { writer, err := wsc.conn.NextWriter(websocket.TextMessage) @@ -25,6 +29,7 @@ func (wsc *wsConn) sendLoop() { wsc.logger.Error("create writer", "error", err) continue } + _ = wsc.conn.SetWriteDeadline(time.Now().Add(writeWait)) _, err = writer.Write(msg) if err != nil { wsc.logger.Error("write message", "error", err) @@ -50,28 +55,32 @@ func (h *handler) wsHandler(w http.ResponseWriter, r *http.Request) { h.logger.Error("update to WebSocket connection", "error", err) return } - remoteAddr := wsc.RemoteAddr().String() - defer func() { - err := wsc.Close() - if err != nil { - h.logger.Error("close WebSocket connection", "err") - } - }() ws := &wsConn{ conn: wsc, queue: make(chan []byte), logger: h.logger, } + + defer func() { + close(ws.queue) + + if err := ws.conn.Close(); err != nil { + h.logger.Error("close WebSocket connection", "err") + } + }() + go ws.sendLoop() + remoteAddr := ws.conn.RemoteAddr().String() + for { - mt, r, err := wsc.NextReader() + mt, rdr, err := wsc.NextReader() if err != nil { if _, ok := err.(*websocket.CloseError); ok { h.logger.Debug("WebSocket connection closed", "reason", err) - err := h.srv.client.EventBus.UnsubscribeAll(context.Background(), remoteAddr) - if err != nil && err != tmpubsub.ErrSubscriptionNotFound { + err := h.srv.client.EventBus.UnsubscribeAll(r.Context(), remoteAddr) + if err != nil && !errors.Is(err, tmpubsub.ErrSubscriptionNotFound) { h.logger.Error("unsubscribe addr from events", "addr", remoteAddr, "err", err) } } else { @@ -85,13 +94,14 @@ func (h *handler) wsHandler(w http.ResponseWriter, r *http.Request) { h.logger.Debug("expected text message") continue } - req, err := http.NewRequest(http.MethodGet, "", r) - req.RemoteAddr = remoteAddr + req, err := http.NewRequest(http.MethodGet, "", rdr) if err != nil { h.logger.Error("create request", "error", err) continue } + req.RemoteAddr = remoteAddr + writer := new(bytes.Buffer) h.serveJSONRPCforWS(newResponseWriter(writer), req, ws) ws.queue <- writer.Bytes()