From a225994879adcbc94b5b239700f09a3b946a46a0 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Mon, 27 Nov 2023 21:06:40 +0300 Subject: [PATCH 01/11] try fixing memory leak --- relay/wsconn.go | 3 --- relay/wshandlers.go | 6 +++--- 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/relay/wsconn.go b/relay/wsconn.go index 4935acb..f2d42a6 100644 --- a/relay/wsconn.go +++ b/relay/wsconn.go @@ -3,7 +3,6 @@ package relay import ( "bytes" "encoding/json" - "fmt" "strings" "github.com/RabbyHub/derelay/log" @@ -31,7 +30,6 @@ func (c *client) MarshalLogObject(encoder zapcore.ObjectEncoder) error { if c != nil { encoder.AddString("id", c.id) encoder.AddString("role", string(c.role)) - encoder.AddString("session", string(c.session)) encoder.AddArray("pubTopics", c.pubTopics) encoder.AddArray("subTopics", c.subTopics) } @@ -87,7 +85,6 @@ func (c *client) send(message SocketMessage) { case c.sendbuf <- message: default: metrics.IncSendBlocking() - log.Error("client sendbuf full", fmt.Errorf(""), zap.Any("client", c), zap.Any("len(sendbuf)", len(c.sendbuf)), zap.Any("message", message)) } } diff --git a/relay/wshandlers.go b/relay/wshandlers.go index 5727c12..f494a2d 100644 --- a/relay/wshandlers.go +++ b/relay/wshandlers.go @@ -54,11 +54,11 @@ func (ws *WsServer) subMessage(message SocketMessage) { if err := ws.redisSubConn.Subscribe(context.TODO(), messageChanKey(topic)); err != nil { log.Warn("[redisSub] subscribe to topic fail", zap.String("topic", topic), zap.Any("client", subscriber)) } - log.Info("subscribe to topic", zap.String("topic", topic), zap.Any("client", subscriber)) + //log.Info("subscribe to topic", zap.String("topic", topic), zap.Any("client", subscriber)) // forward cached notificatoins if there's any notifications := ws.getCachedMessages(topic, true) - log.Info("pending notifications", zap.String("topic", topic), zap.Any("num", len(notifications)), zap.Any("client", subscriber)) + //log.Info("pending notifications", zap.String("topic", topic), zap.Any("num", len(notifications)), zap.Any("client", subscriber)) for _, notification := range notifications { subscriber.send(notification) } @@ -84,7 +84,7 @@ func (ws *WsServer) subMessage(message SocketMessage) { if noti.Phase == string(SessionRequest) { // handle the 1st case stated above subscriber.session = noti.Topic metrics.IncReceivedSessions() - log.Info("session been scanned", zap.Any("topic", topic), zap.Any("client", subscriber)) + //log.Info("session been scanned", zap.Any("topic", topic), zap.Any("client", subscriber)) // notify the topic publisher, aka the dapp, that the session request has been received by wallet key := dappNotifyChanKey(noti.Topic) From 202bbf0a86c54ecd74dc6606bb988e7dc124375e Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Mon, 27 Nov 2023 21:09:35 +0300 Subject: [PATCH 02/11] update --- relay/wsconn.go | 4 ++-- relay/wsserver.go | 8 ++++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/relay/wsconn.go b/relay/wsconn.go index f2d42a6..4b9a427 100644 --- a/relay/wsconn.go +++ b/relay/wsconn.go @@ -30,8 +30,8 @@ func (c *client) MarshalLogObject(encoder zapcore.ObjectEncoder) error { if c != nil { encoder.AddString("id", c.id) encoder.AddString("role", string(c.role)) - encoder.AddArray("pubTopics", c.pubTopics) - encoder.AddArray("subTopics", c.subTopics) + //encoder.AddArray("pubTopics", c.pubTopics) + //encoder.AddArray("subTopics", c.subTopics) } return nil } diff --git a/relay/wsserver.go b/relay/wsserver.go index 2e84c6f..3f1d2cb 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -70,8 +70,8 @@ func (ws *WsServer) NewClientConn(w http.ResponseWriter, r *http.Request) { conn: conn, id: generateRandomBytes16(), ws: ws, - pubTopics: NewTopicSet(), - subTopics: NewTopicSet(), + //pubTopics: NewTopicSet(), + //subTopics: NewTopicSet(), sendbuf: make(chan SocketMessage, 256), quit: make(chan struct{}), } @@ -100,12 +100,12 @@ func (ws *WsServer) Run() { switch message.Type { case Pub: // do not modify wsserver's local variable in seperate goroutine - message.client.pubTopics.Set(message.Topic) + //message.client.pubTopics.Set(message.Topic) ws.publishers.Set(message.Topic, message.client) go ws.pubMessage(message) log.Info("local message", zap.Any("client", message.client), zap.Any("message", message)) case Sub: - message.client.subTopics.Set(message.Topic) + //message.client.subTopics.Set(message.Topic) ws.subscribers.Set(message.Topic, message.client) go ws.subMessage(message) log.Info("local message", zap.Any("client", message.client), zap.Any("message", message)) From 25b9fcd806045ee05e8878f4ba3e902116ac0bd3 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Mon, 27 Nov 2023 21:16:54 +0300 Subject: [PATCH 03/11] update --- relay/wsserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay/wsserver.go b/relay/wsserver.go index 3f1d2cb..a2a0144 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -147,7 +147,7 @@ func (ws *WsServer) Run() { } case client := <-ws.register: - log.Info("new client connection", zap.Any("client", client)) + //log.Info("new client connection", zap.Any("client", client)) metrics.IncNewConnection() ws.clients[client] = struct{}{} metrics.SetCurrentConnections(len(ws.clients)) From dac26da7841e8a33f87435e4afcb12567b696f95 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Tue, 28 Nov 2023 06:26:24 +0300 Subject: [PATCH 04/11] increase profile timeout --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 5d947f9..4fc9551 100644 --- a/main.go +++ b/main.go @@ -37,8 +37,8 @@ func startMetricServer(config *config.MetricConfig) { s := &http.Server{ Addr: config.Listen, Handler: r, - ReadTimeout: 30 * time.Second, - WriteTimeout: 30 * time.Second, + ReadTimeout: 60 * time.Second, + WriteTimeout: 60 * time.Second, MaxHeaderBytes: 1 << 20, } From f6070e6e78dba04278becadbded95793c34e8846 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Tue, 28 Nov 2023 07:25:58 +0300 Subject: [PATCH 05/11] fix cpu performance issue --- relay/schema.go | 10 ++++++++++ relay/schema_test.go | 24 ++++++++++++++++++++++++ relay/wshandlers.go | 22 ++++++++++++---------- relay/wsserver.go | 8 ++++---- 4 files changed, 50 insertions(+), 14 deletions(-) diff --git a/relay/schema.go b/relay/schema.go index 664d18b..608e364 100644 --- a/relay/schema.go +++ b/relay/schema.go @@ -159,6 +159,16 @@ func NewTopicSet() *TopicSet { } } +func (tm TopicSet) Get() map[string]struct{} { + tm.Lock() + defer tm.Unlock() + newmap := make(map[string]struct{}) + for key, value := range tm.Data { + newmap[key] = value + } + return newmap +} + func (tm TopicSet) Set(topic string) { tm.Lock() defer tm.Unlock() diff --git a/relay/schema_test.go b/relay/schema_test.go index 540a522..508b7da 100644 --- a/relay/schema_test.go +++ b/relay/schema_test.go @@ -85,3 +85,27 @@ func TestGetTopicsByClientAndClear(t *testing.T) { t.Errorf("length error, expected: %v, actual: %v", expectedLen, actualLen) } } + +func TestTopicGet(t *testing.T) { + ts := NewTopicSet() + + ts.Set("hello") + ts.Set("hello1") + ts.Set("hello2") + + topics := ts.Get() + actualLen := len(topics) + expectedLen := 3 + if actualLen != expectedLen { + t.Errorf("length error, expected: %v, actual: %v", expectedLen, actualLen) + } + if _, ok := topics["hello"]; !ok { + t.Errorf("key does not exists") + } + if _, ok := topics["hello1"]; !ok { + t.Errorf("key does not exists") + } + if _, ok := topics["hello2"]; !ok { + t.Errorf("key does not exists") + } +} diff --git a/relay/wshandlers.go b/relay/wshandlers.go index f494a2d..4ef4eff 100644 --- a/relay/wshandlers.go +++ b/relay/wshandlers.go @@ -24,12 +24,12 @@ func (ws *WsServer) pubMessage(message SocketMessage) { } } - log.Info("publish message", zap.Any("client", publisher), zap.Any("topic", message.Topic)) + //log.Info("publish message", zap.Any("client", publisher), zap.Any("topic", message.Topic)) metrics.IncTotalMessages() key := messageChanKey(topic) if count, _ := ws.redisConn.Publish(context.TODO(), key, message).Result(); count >= 1 { - log.Info("message published", zap.Any("client", publisher), zap.Any("topic", topic)) + //log.Info("message published", zap.Any("client", publisher), zap.Any("topic", topic)) if publisher.role == Dapp { publisher.send(SocketMessage{ Topic: message.Topic, @@ -38,7 +38,7 @@ func (ws *WsServer) pubMessage(message SocketMessage) { }) } } else { - log.Info("cache message", zap.Any("client", publisher), zap.Any("topic", topic)) + //log.Info("cache message", zap.Any("client", publisher), zap.Any("topic", topic)) metrics.IncCachedMessages() if message.Phase == string(SessionRequest) { metrics.IncNewRequestedSessions() @@ -136,11 +136,12 @@ func (ws *WsServer) handleClientDisconnect(client *client) { // clear the client from the subscribed and published topics channelsToClear := []string{} - subscribedTopics := ws.subscribers.GetTopicsByClient(client, true) + //subscribedTopics := ws.subscribers.GetTopicsByClient(client, true) + subscribedTopics := client.subTopics.Get() - for _, topic := range subscribedTopics { - count := len(ws.subscribers.Get(topic)) - if count == 0 { + for topic := range subscribedTopics { + ws.subscribers.Unset(topic, client) + if ws.subscribers.Len(topic) == 0 { channelsToClear = append(channelsToClear, messageChanKey(topic)) } } @@ -148,8 +149,9 @@ func (ws *WsServer) handleClientDisconnect(client *client) { if client.role == Dapp { // clear dapp notify channels publishedChannels := []string{} - for _, topic := range ws.publishers.GetTopicsByClient(client, true) { - if len(ws.publishers.Get(topic)) == 0 { + for topic := range client.pubTopics.Get() { + ws.publishers.Unset(topic, client) + if ws.publishers.Len(topic) == 0 { publishedChannels = append(publishedChannels, dappNotifyChanKey(topic)) } } @@ -168,7 +170,7 @@ func (ws *WsServer) handleClientDisconnect(client *client) { if client.role == Dapp { return } - for _, topic := range subscribedTopics { + for topic := range subscribedTopics { go func(topic string) { key := dappNotifyChanKey(topic) ws.redisConn.Publish(context.TODO(), key, SocketMessage{ diff --git a/relay/wsserver.go b/relay/wsserver.go index a2a0144..a4ef9f5 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -70,8 +70,8 @@ func (ws *WsServer) NewClientConn(w http.ResponseWriter, r *http.Request) { conn: conn, id: generateRandomBytes16(), ws: ws, - //pubTopics: NewTopicSet(), - //subTopics: NewTopicSet(), + pubTopics: NewTopicSet(), + subTopics: NewTopicSet(), sendbuf: make(chan SocketMessage, 256), quit: make(chan struct{}), } @@ -100,12 +100,12 @@ func (ws *WsServer) Run() { switch message.Type { case Pub: // do not modify wsserver's local variable in seperate goroutine - //message.client.pubTopics.Set(message.Topic) + message.client.pubTopics.Set(message.Topic) ws.publishers.Set(message.Topic, message.client) go ws.pubMessage(message) log.Info("local message", zap.Any("client", message.client), zap.Any("message", message)) case Sub: - //message.client.subTopics.Set(message.Topic) + message.client.subTopics.Set(message.Topic) ws.subscribers.Set(message.Topic, message.client) go ws.subMessage(message) log.Info("local message", zap.Any("client", message.client), zap.Any("message", message)) From 0366a5e75ab02d338522827a924066d5119da3f7 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Tue, 28 Nov 2023 07:27:24 +0300 Subject: [PATCH 06/11] revert unneeded changes --- main.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 4fc9551..5d947f9 100644 --- a/main.go +++ b/main.go @@ -37,8 +37,8 @@ func startMetricServer(config *config.MetricConfig) { s := &http.Server{ Addr: config.Listen, Handler: r, - ReadTimeout: 60 * time.Second, - WriteTimeout: 60 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, MaxHeaderBytes: 1 << 20, } From 8eff606d9d57558f207a2f92abdde9faa3886f8b Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Wed, 29 Nov 2023 19:22:15 +0300 Subject: [PATCH 07/11] code cleanup --- relay/wsconn.go | 2 +- relay/wshandlers.go | 1 - relay/wsserver.go | 4 ---- 3 files changed, 1 insertion(+), 6 deletions(-) diff --git a/relay/wsconn.go b/relay/wsconn.go index 4b9a427..7cd6155 100644 --- a/relay/wsconn.go +++ b/relay/wsconn.go @@ -18,7 +18,7 @@ type client struct { id string // randomly generate, just for logging role RoleType // dapp or wallet - session string // session id + pubTopics *TopicSet subTopics *TopicSet diff --git a/relay/wshandlers.go b/relay/wshandlers.go index 4ef4eff..6f43aa5 100644 --- a/relay/wshandlers.go +++ b/relay/wshandlers.go @@ -82,7 +82,6 @@ func (ws *WsServer) subMessage(message SocketMessage) { // messages from wallet if noti.Phase == string(SessionRequest) { // handle the 1st case stated above - subscriber.session = noti.Topic metrics.IncReceivedSessions() //log.Info("session been scanned", zap.Any("topic", topic), zap.Any("client", subscriber)) diff --git a/relay/wsserver.go b/relay/wsserver.go index a4ef9f5..e1ffcae 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -139,10 +139,6 @@ func (ws *WsServer) Run() { // * relay generated fake "ack" for the wallet for _, publisher := range ws.GetDappPublisher(message.Topic) { log.Info("wallet updates, notify dapp", zap.Any("client", publisher), zap.Any("message", message)) - // if SessionReceived, the message topic is the QRCode topic, use it as the session id - if message.Phase == string(SessionReceived) { - publisher.session = message.Topic - } publisher.send(message) } From dcdc943dd4703926f1e987e6f2ac0193d81afb34 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Wed, 29 Nov 2023 20:46:47 +0300 Subject: [PATCH 08/11] fix oom --- relay/wshandlers.go | 16 +++++++--------- relay/wsserver.go | 2 +- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/relay/wshandlers.go b/relay/wshandlers.go index 6f43aa5..662f69a 100644 --- a/relay/wshandlers.go +++ b/relay/wshandlers.go @@ -144,17 +144,15 @@ func (ws *WsServer) handleClientDisconnect(client *client) { channelsToClear = append(channelsToClear, messageChanKey(topic)) } } - - if client.role == Dapp { - // clear dapp notify channels - publishedChannels := []string{} - for topic := range client.pubTopics.Get() { - ws.publishers.Unset(topic, client) - if ws.publishers.Len(topic) == 0 { - publishedChannels = append(publishedChannels, dappNotifyChanKey(topic)) + for topic := range client.pubTopics.Get() { + ws.publishers.Unset(topic, client) + if ws.publishers.Len(topic) == 0 { + channelsToClear = append(channelsToClear, messageChanKey(topic)) + // for dapp, need to further clear notify channels + if client.role == Dapp { + channelsToClear = append(channelsToClear, dappNotifyChanKey(topic)) } } - channelsToClear = append(channelsToClear, publishedChannels...) } if len(channelsToClear) > 0 { diff --git a/relay/wsserver.go b/relay/wsserver.go index e1ffcae..f499f20 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -72,7 +72,7 @@ func (ws *WsServer) NewClientConn(w http.ResponseWriter, r *http.Request) { ws: ws, pubTopics: NewTopicSet(), subTopics: NewTopicSet(), - sendbuf: make(chan SocketMessage, 256), + sendbuf: make(chan SocketMessage, 8), quit: make(chan struct{}), } From 9031d510921d08887480f1e1cecda248767c9d79 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Wed, 29 Nov 2023 21:28:14 +0300 Subject: [PATCH 09/11] code cleanup --- relay/pendingSession.go | 112 ----------------------------------- relay/pendingSession_test.go | 76 ------------------------ relay/wsconn.go | 5 +- relay/wshandlers.go | 12 ++-- relay/wsserver.go | 2 - 5 files changed, 8 insertions(+), 199 deletions(-) delete mode 100644 relay/pendingSession.go delete mode 100644 relay/pendingSession_test.go diff --git a/relay/pendingSession.go b/relay/pendingSession.go deleted file mode 100644 index bd59ce3..0000000 --- a/relay/pendingSession.go +++ /dev/null @@ -1,112 +0,0 @@ -package relay - -import ( - "sort" - "sync" - "time" -) - -type PendingSessions []*pendingSession - -// SortedPendingSessions stores pending sessions in sorted manner by expireTime, meanwhile provides an O(1) time if lookup by topic -type SortedPendingSessions struct { - sessions PendingSessions - mapping map[string]*pendingSession - mutex sync.Mutex -} - -type pendingSession struct { - expireTime time.Time - topic string -} - -func (pq PendingSessions) Len() int { return len(pq) } - -func (pq PendingSessions) Less(i, j int) bool { - return pq[i].expireTime.Before(pq[j].expireTime) -} - -func (pq PendingSessions) Swap(i, j int) { - pq[i], pq[j] = pq[j], pq[i] -} - -func (pq *PendingSessions) insert(p *pendingSession) { - // i points to the smallest index that expires before p - i := sort.Search(len(*pq), func(i int) bool { - return (*pq)[i].expireTime.Before(p.expireTime) - }) - *pq = append(*pq, &pendingSession{}) // allocate a empty value at the end - copy((*pq)[i+1:], (*pq)[i:]) - (*pq)[i] = p -} - -func (pq *PendingSessions) delete(p *pendingSession) { - // i points to the smallest index that expires earlier or same as p - i := sort.Search(len(*pq), func(i int) bool { - return !(*pq)[i].expireTime.After(p.expireTime) - }) - - target := i - for j, session := range (*pq)[i:] { - if session.expireTime.Equal((*pq)[i].expireTime) && session.topic == (*pq)[i].topic { - target = i + j - break - } - } - if target < len(*pq) { - copy((*pq)[target:], (*pq)[target+1:]) - *pq = (*pq)[:len(*pq)-1] - } -} - -func (sps *SortedPendingSessions) insert(p *pendingSession) { - sps.mutex.Lock() - defer sps.mutex.Unlock() - - if _, ok := sps.mapping[p.topic]; ok { - return - } - - sps.sessions.insert(p) - sps.mapping[p.topic] = p -} - -// deleteByTopic deletes the pending session by topic -// the return indicates whether exists the pending session identified by the topic -func (sps *SortedPendingSessions) deleteByTopic(topic string) bool { - sps.mutex.Lock() - defer sps.mutex.Unlock() - - p, ok := sps.mapping[topic] - if ok { - sps.sessions.delete(p) - } - delete(sps.mapping, topic) - - return true -} - -// peak peaks the session that will be expired earliest but ain't remove it -func (sps *SortedPendingSessions) peak() *pendingSession { - if sps.sessions.Len() < 1 { - return nil - } - return sps.sessions[sps.sessions.Len()-1] -} - -// pop pops the session that will be expired earliest -func (sps *SortedPendingSessions) pop() { - sps.mutex.Lock() - defer sps.mutex.Unlock() - - session := sps.sessions[sps.sessions.Len()-1] - sps.sessions = sps.sessions[:sps.sessions.Len()-1] - delete(sps.mapping, session.topic) -} - -func NewSortedPendingSessions() *SortedPendingSessions { - return &SortedPendingSessions{ - sessions: PendingSessions{}, - mapping: make(map[string]*pendingSession), - } -} diff --git a/relay/pendingSession_test.go b/relay/pendingSession_test.go deleted file mode 100644 index a17f6bf..0000000 --- a/relay/pendingSession_test.go +++ /dev/null @@ -1,76 +0,0 @@ -package relay - -import ( - "testing" - "time" -) - -func TestInsertPendingSession(t *testing.T) { - sps := NewSortedPendingSessions() - - session1 := &pendingSession{ - expireTime: time.Now().Add(5 * time.Second), // expire - topic: "topic1", - } - - session2 := &pendingSession{ - expireTime: time.Now().Add(4 * time.Second), // expire - topic: "topic2", - } - - session3 := &pendingSession{ - expireTime: time.Now().Add(3 * time.Second), // expire - topic: "topic3", - } - - sps.insert(session1) - sps.insert(session3) - sps.insert(session2) - - actual := sps.peak() - if actual.topic != session3.topic { - t.Errorf("unexpected top session, expected: %v, actual: %v\n", session3.topic, actual.topic) - } - - sps.pop() - actual = sps.peak() - if actual.topic != session2.topic { - t.Errorf("unexpected top session, expected: %v, actual: %v\n", session2.topic, actual.topic) - } - - sps.pop() - actual = sps.peak() - if actual.topic != session1.topic { - t.Errorf("unexpected top session, expected: %v, actual: %v\n", session1.topic, actual.topic) - } -} - -func TestDeletePendingSession(t *testing.T) { - sps := NewSortedPendingSessions() - - session1 := &pendingSession{ - expireTime: time.Now().Add(5 * time.Second), // expire - topic: "topic1", - } - - session2 := &pendingSession{ - expireTime: time.Now().Add(4 * time.Second), // expire - topic: "topic2", - } - - session3 := &pendingSession{ - expireTime: time.Now().Add(3 * time.Second), // expire - topic: "topic3", - } - - sps.insert(session1) - sps.insert(session3) - sps.insert(session2) - - sps.deleteByTopic("topic3") - - actual := sps.peak() - if actual.topic != session2.topic { - t.Errorf("unexpected top session, expected: %v, actual: %v\n", session2.topic, actual.topic) - } -} diff --git a/relay/wsconn.go b/relay/wsconn.go index 7cd6155..6ccdd2b 100644 --- a/relay/wsconn.go +++ b/relay/wsconn.go @@ -18,7 +18,6 @@ type client struct { id string // randomly generate, just for logging role RoleType // dapp or wallet - pubTopics *TopicSet subTopics *TopicSet @@ -30,8 +29,8 @@ func (c *client) MarshalLogObject(encoder zapcore.ObjectEncoder) error { if c != nil { encoder.AddString("id", c.id) encoder.AddString("role", string(c.role)) - //encoder.AddArray("pubTopics", c.pubTopics) - //encoder.AddArray("subTopics", c.subTopics) + encoder.AddArray("pubTopics", c.pubTopics) + encoder.AddArray("subTopics", c.subTopics) } return nil } diff --git a/relay/wshandlers.go b/relay/wshandlers.go index 662f69a..b6ebc7f 100644 --- a/relay/wshandlers.go +++ b/relay/wshandlers.go @@ -24,12 +24,12 @@ func (ws *WsServer) pubMessage(message SocketMessage) { } } - //log.Info("publish message", zap.Any("client", publisher), zap.Any("topic", message.Topic)) + log.Debug("publish message", zap.Any("client", publisher), zap.Any("topic", message.Topic)) metrics.IncTotalMessages() key := messageChanKey(topic) if count, _ := ws.redisConn.Publish(context.TODO(), key, message).Result(); count >= 1 { - //log.Info("message published", zap.Any("client", publisher), zap.Any("topic", topic)) + log.Debug("message published", zap.Any("client", publisher), zap.Any("topic", topic)) if publisher.role == Dapp { publisher.send(SocketMessage{ Topic: message.Topic, @@ -38,7 +38,7 @@ func (ws *WsServer) pubMessage(message SocketMessage) { }) } } else { - //log.Info("cache message", zap.Any("client", publisher), zap.Any("topic", topic)) + log.Debug("cache message", zap.Any("client", publisher), zap.Any("topic", topic)) metrics.IncCachedMessages() if message.Phase == string(SessionRequest) { metrics.IncNewRequestedSessions() @@ -54,11 +54,11 @@ func (ws *WsServer) subMessage(message SocketMessage) { if err := ws.redisSubConn.Subscribe(context.TODO(), messageChanKey(topic)); err != nil { log.Warn("[redisSub] subscribe to topic fail", zap.String("topic", topic), zap.Any("client", subscriber)) } - //log.Info("subscribe to topic", zap.String("topic", topic), zap.Any("client", subscriber)) + log.Debug("subscribe to topic", zap.String("topic", topic), zap.Any("client", subscriber)) // forward cached notificatoins if there's any notifications := ws.getCachedMessages(topic, true) - //log.Info("pending notifications", zap.String("topic", topic), zap.Any("num", len(notifications)), zap.Any("client", subscriber)) + log.Debug("pending notifications", zap.String("topic", topic), zap.Any("num", len(notifications)), zap.Any("client", subscriber)) for _, notification := range notifications { subscriber.send(notification) } @@ -83,7 +83,7 @@ func (ws *WsServer) subMessage(message SocketMessage) { if noti.Phase == string(SessionRequest) { // handle the 1st case stated above metrics.IncReceivedSessions() - //log.Info("session been scanned", zap.Any("topic", topic), zap.Any("client", subscriber)) + log.Debug("session been scanned", zap.Any("topic", topic), zap.Any("client", subscriber)) // notify the topic publisher, aka the dapp, that the session request has been received by wallet key := dappNotifyChanKey(noti.Topic) diff --git a/relay/wsserver.go b/relay/wsserver.go index f499f20..41b06f3 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -80,7 +80,6 @@ func (ws *WsServer) NewClientConn(w http.ResponseWriter, r *http.Request) { go client.read() go client.write() - //go client.heartbeat() } func (ws *WsServer) Run() { @@ -143,7 +142,6 @@ func (ws *WsServer) Run() { } case client := <-ws.register: - //log.Info("new client connection", zap.Any("client", client)) metrics.IncNewConnection() ws.clients[client] = struct{}{} metrics.SetCurrentConnections(len(ws.clients)) From 56906d2906a67c5905f3e7eef5b66603996c27c6 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Wed, 29 Nov 2023 21:30:05 +0300 Subject: [PATCH 10/11] code cleanup --- metrics/connection.go | 11 ----------- relay/schema.go | 10 ++++++++++ relay/util.go | 14 -------------- relay/wsserver.go | 3 --- 4 files changed, 10 insertions(+), 28 deletions(-) delete mode 100644 relay/util.go diff --git a/metrics/connection.go b/metrics/connection.go index c27ffab..6d60fb3 100644 --- a/metrics/connection.go +++ b/metrics/connection.go @@ -30,13 +30,6 @@ var ( Name: "send_blockings", Help: "Number of send blocking connections", }) - - countMessageFromClosed = prometheus.NewCounter(prometheus.CounterOpts{ - Namespace: promNamespace, - Subsystem: promSubsystem, - Name: "sending_on_closed", - Help: "Number of sending on closed connections", - }) ) func IncNewConnection() { @@ -51,10 +44,6 @@ func IncSendBlocking() { countSendBlocking.Inc() } -func IncMessageFromClosed() { - countMessageFromClosed.Inc() -} - func SetCurrentConnections(num int) { gaugeCurrentConnections.Set(float64(num)) } diff --git a/relay/schema.go b/relay/schema.go index 608e364..638d612 100644 --- a/relay/schema.go +++ b/relay/schema.go @@ -1,6 +1,8 @@ package relay import ( + "crypto/rand" + "encoding/base64" "encoding/json" "strings" "sync" @@ -188,3 +190,11 @@ type ClientUnregisterEvent struct { client *client reason error } + +func generateRandomBytes16() string { + buf := make([]byte, 16) + if _, err := rand.Read(buf); err != nil { + return "" + } + return base64.StdEncoding.EncodeToString(buf) +} diff --git a/relay/util.go b/relay/util.go deleted file mode 100644 index 1642081..0000000 --- a/relay/util.go +++ /dev/null @@ -1,14 +0,0 @@ -package relay - -import ( - "crypto/rand" - "encoding/base64" -) - -func generateRandomBytes16() string { - buf := make([]byte, 16) - if _, err := rand.Read(buf); err != nil { - return "" - } - return base64.StdEncoding.EncodeToString(buf) -} diff --git a/relay/wsserver.go b/relay/wsserver.go index 41b06f3..50a84c5 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -90,9 +90,6 @@ func (ws *WsServer) Run() { for { select { case message := <-ws.localCh: - if _, ok := ws.clients[message.client]; !ok { - metrics.IncMessageFromClosed() - } // local message could be "pub", "sub" or "ack" or "ping" // pub/sub message handler may contain time-consuming operations(e.g. read/write redis) // so put them in separate goroutine to avoid blocking wsserver main loop From f290ef24d80b00397dec0481dd8c229323841b80 Mon Sep 17 00:00:00 2001 From: cryptocifer Date: Wed, 29 Nov 2023 21:31:02 +0300 Subject: [PATCH 11/11] cleanup --- relay/wsserver.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/relay/wsserver.go b/relay/wsserver.go index 50a84c5..d4cfb91 100644 --- a/relay/wsserver.go +++ b/relay/wsserver.go @@ -134,7 +134,7 @@ func (ws *WsServer) Run() { // * SessionResumed // * relay generated fake "ack" for the wallet for _, publisher := range ws.GetDappPublisher(message.Topic) { - log.Info("wallet updates, notify dapp", zap.Any("client", publisher), zap.Any("message", message)) + log.Debug("wallet updates, notify dapp", zap.Any("client", publisher), zap.Any("message", message)) publisher.send(message) }