diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f366569..b7969950 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,10 @@ ## master +- Add support for batched broadcasts. ([@palkan][]) + +It's now possible to publish an array of broadcasting messages (e.g., `[{"stream":"a","data":"..."},"stream":"b","data":"..."}]`). The messages will be delivered in the same order as they were published (within a stream). + - Add support for REDIS_URL with trailing slash hostnames. ([@ardecvz][]) `rueidis` doesn't tolerate the trailing slash hostnames. diff --git a/common/common.go b/common/common.go index b4542763..6277f0aa 100644 --- a/common/common.go +++ b/common/common.go @@ -339,6 +339,14 @@ func PubSubMessageFromJSON(raw []byte) (interface{}, error) { } } + batch := []*StreamMessage{} + + if err := json.Unmarshal(raw, &batch); err == nil { + if len(batch) > 0 && batch[0].Stream != "" { + return batch, nil + } + } + rmsg := RemoteCommandMessage{} if err := json.Unmarshal(raw, &rmsg); err != nil { diff --git a/node/node.go b/node/node.go index 54c9a95d..95074c2b 100644 --- a/node/node.go +++ b/node/node.go @@ -157,6 +157,10 @@ func (n *Node) HandleBroadcast(raw []byte) { switch v := msg.(type) { case common.StreamMessage: n.broker.HandleBroadcast(&v) + case []*common.StreamMessage: + for _, el := range v { + n.broker.HandleBroadcast(el) + } case common.RemoteCommandMessage: n.broker.HandleCommand(&v) } @@ -175,6 +179,10 @@ func (n *Node) HandlePubSub(raw []byte) { switch v := msg.(type) { case common.StreamMessage: n.Broadcast(&v) + case []*common.StreamMessage: + for _, el := range v { + n.Broadcast(el) + } case common.RemoteCommandMessage: n.ExecuteRemoteCommand(&v) } diff --git a/node/node_test.go b/node/node_test.go index ee3a4336..47d891a5 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -655,7 +655,7 @@ func TestRestoreSession(t *testing.T) { }) } -func TestHandlePubSub(t *testing.T) { +func TestBroadcasting(t *testing.T) { node := NewMockNode() go node.hub.Run() @@ -666,23 +666,76 @@ func TestHandlePubSub(t *testing.T) { node.hub.AddSession(session) node.hub.SubscribeSession(session, "test", "test_channel") + node.hub.SubscribeSession(session, "staind_2023", "music_channel") node.hub.AddSession(session2) node.hub.SubscribeSession(session2, "test", "test_channel") + node.hub.SubscribeSession(session2, "staind_2023", "music_channel") node.broker.Subscribe("test") + node.broker.Subscribe("staind_2023") - node.HandlePubSub([]byte("{\"stream\":\"test\",\"data\":\"\\\"abc123\\\"\"}")) + t.Run("HandlePubSub", func(t *testing.T) { + node.HandlePubSub([]byte(`{"stream":"test","data":"\"abc123\""}`)) - expected := "{\"identifier\":\"test_channel\",\"message\":\"abc123\"}" + expected := `{"identifier":"test_channel","message":"abc123"}` - msg, err := session.conn.Read() - assert.Nil(t, err) - assert.Equalf(t, expected, string(msg), "Expected to receive %s but got %s", expected, string(msg)) + msg, err := session.conn.Read() + assert.Nil(t, err) + assert.Equalf(t, expected, string(msg), "Expected to receive %s but got %s", expected, string(msg)) - msg2, err := session2.conn.Read() - assert.Nil(t, err) - assert.Equalf(t, expected, string(msg2), "Expected to receive %s but got %s", expected, string(msg2)) + msg2, err := session2.conn.Read() + assert.Nil(t, err) + assert.Equalf(t, expected, string(msg2), "Expected to receive %s but got %s", expected, string(msg2)) + }) + + t.Run("HandlePubSub_Batch", func(t *testing.T) { + node.HandlePubSub([]byte(`[{"stream":"test","data":"\"follow me\""},{"stream":"untest","data":"\"missing\""},{"stream":"staind_2023","data":"{\"num\":7,\"title\":\"The Fray\"}"}]`)) + + first := `{"identifier":"test_channel","message":"follow me"}` + second := `{"identifier":"music_channel","message":{"num":7,"title":"The Fray"}}` + + msgs, err := readMessages(session.conn, 2) + assert.Nil(t, err) + assert.Contains(t, msgs, first) + assert.Contains(t, msgs, second) + + msgs2, err := readMessages(session2.conn, 2) + assert.Nil(t, err) + assert.Contains(t, msgs2, first) + assert.Contains(t, msgs2, second) + }) + + t.Run("HandleBroadcast", func(t *testing.T) { + node.HandleBroadcast([]byte(`{"stream":"staind_2023","data":"{\"num\":5,\"title\":\"Out of time\"}"}`)) + + expected := `{"identifier":"music_channel","message":{"num":5,"title":"Out of time"}}` + + msg, err := session.conn.Read() + assert.Nil(t, err) + assert.Equalf(t, expected, string(msg), "Expected to receive %s but got %s", expected, string(msg)) + + msg2, err := session2.conn.Read() + assert.Nil(t, err) + assert.Equalf(t, expected, string(msg2), "Expected to receive %s but got %s", expected, string(msg2)) + }) + + t.Run("HandleBroadcast_Batch", func(t *testing.T) { + node.HandleBroadcast([]byte(`[{"stream":"staind_2023","data":"{\"num\":9,\"title\":\"Hate me too\"}"},{"stream":"untest","data":"\"missing\""},{"stream":"staind_2023","data":"{\"num\":10,\"title\":\"Confessions of Fallen\"}"}]`)) + + first := `{"identifier":"music_channel","message":{"num":9,"title":"Hate me too"}}` + second := `{"identifier":"music_channel","message":{"num":10,"title":"Confessions of Fallen"}}` + + msgs, err := readMessages(session.conn, 2) + assert.Nil(t, err) + assert.Contains(t, msgs, first) + assert.Contains(t, msgs, second) + + msgs2, err := readMessages(session2.conn, 2) + assert.Nil(t, err) + assert.Contains(t, msgs2, first) + assert.Contains(t, msgs2, second) + }) } func TestHandlePubSubWithCommand(t *testing.T) { @@ -727,3 +780,18 @@ func toJSON(msg encoders.EncodedMessage) []byte { return b } + +func readMessages(conn Connection, count int) ([]string, error) { + var messages []string + + for i := 0; i < count; i++ { + msg, err := conn.Read() + if err != nil { + return nil, err + } + + messages = append(messages, string(msg)) + } + + return messages, nil +}