From c4793e154160b9ca75b7b34fb83db837d6a820fb Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Wed, 20 Mar 2024 12:19:36 -0700 Subject: [PATCH 1/5] feat: transient and typed publications --- broker/memory.go | 5 +++++ broker/nats.go | 5 +++++ common/common.go | 4 ++++ hub/gate.go | 8 +++++++- node/broker_integration_test.go | 2 ++ 5 files changed, 23 insertions(+), 1 deletion(-) diff --git a/broker/memory.go b/broker/memory.go index 51744d46..59f6b105 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -234,6 +234,11 @@ func (b *Memory) Shutdown(ctx context.Context) error { } func (b *Memory) HandleBroadcast(msg *common.StreamMessage) { + if msg.Meta != nil && msg.Meta.Transient { + b.broadcaster.Broadcast(msg) + return + } + offset := b.add(msg.Stream, msg.Data) msg.Epoch = b.GetEpoch() diff --git a/broker/nats.go b/broker/nats.go index db4548a9..ac28c8bf 100644 --- a/broker/nats.go +++ b/broker/nats.go @@ -311,6 +311,11 @@ func (n *NATS) writeEpoch(val string) { } func (n *NATS) HandleBroadcast(msg *common.StreamMessage) { + if msg.Meta != nil && msg.Meta.Transient { + n.broadcaster.Broadcast(msg) + return + } + err := n.Ready(jetstreamReadyTimeout) if err != nil { n.log.Debug("JetStream is not ready yet to publish messages, add to backlog") diff --git a/common/common.go b/common/common.go index caf0d15b..d8cd7865 100644 --- a/common/common.go +++ b/common/common.go @@ -289,6 +289,10 @@ func (m *Message) LogValue() slog.Value { // which can be used to modify delivery behavior type StreamMessageMetadata struct { ExcludeSocket string `json:"exclude_socket,omitempty"` + // BroadcastType defines the message type to be used for messages sent to clients + BroadcastType string `json:"broadcast_type,omitempty"` + // Transient defines whether this message should be stored in the history + Transient bool `json:"transient,omitempty"` } func (smm *StreamMessageMetadata) LogValue() slog.Value { diff --git a/hub/gate.go b/hub/gate.go index 673d0624..fedddd25 100644 --- a/hub/gate.go +++ b/hub/gate.go @@ -162,7 +162,13 @@ func (g *Gate) performBroadcast(streamMsg *common.StreamMessage) { } func buildMessage(msg *common.StreamMessage, identifier string) encoders.EncodedMessage { - return encoders.NewCachedEncodedMessage(msg.ToReplyFor(identifier)) + reply := msg.ToReplyFor(identifier) + + if msg.Meta != nil { + reply.Type = msg.Meta.BroadcastType + } + + return encoders.NewCachedEncodedMessage(reply) } func streamSessionsSnapshot[T comparable](src map[T]map[string]bool) map[T][]string { diff --git a/node/broker_integration_test.go b/node/broker_integration_test.go index a94fa2a9..95f70b1e 100644 --- a/node/broker_integration_test.go +++ b/node/broker_integration_test.go @@ -310,6 +310,8 @@ func sharedIntegrationHistory(t *testing.T, node *Node, controller *mocks.Contro ts := time.Now().Unix() node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Flavia: buona sera"}`)) + // Transient messages must not be stored in the history + node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Who's there?","meta":{"transient":true}}`)) node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Mario: ta-dam!"}`)) node.HandleBroadcast([]byte(`{"stream": "presence_1","data":"1 new notification"}`)) From 0f8fb208f358f8e3a755f17540b708119d3a3dcc Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Thu, 21 Mar 2024 16:54:04 -0700 Subject: [PATCH 2/5] feat: node.Whisper --- common/common.go | 12 ++++++++++ node/node.go | 43 +++++++++++++++++++++++++++++++++ node/node_test.go | 61 +++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+) diff --git a/common/common.go b/common/common.go index d8cd7865..fd6161de 100644 --- a/common/common.go +++ b/common/common.go @@ -64,6 +64,8 @@ const ( HistoryConfirmedType = "confirm_history" HistoryRejectedType = "reject_history" + + WhisperType = "whisper" ) // Disconnect reasons @@ -75,6 +77,11 @@ const ( UNAUTHORIZED_REASON = "unauthorized" ) +// Reserver state fields +const ( + WHISPER_STATE_SUFFIX = "$w" +) + // SessionEnv represents the underlying HTTP connection data: // URL and request headers. // It also carries channel and connection state information used by the RPC app. @@ -514,3 +521,8 @@ func RejectionMessage(identifier string) string { func DisconnectionMessage(reason string, reconnect bool) string { return string(utils.ToJSON(DisconnectMessage{Type: DisconnectType, Reason: reason, Reconnect: reconnect})) } + +// WhisperStateKey returns the field name for the given identifier to store the whispering stream name +func WhisperStateKey(identifier string) string { + return identifier + "/" + WHISPER_STATE_SUFFIX +} diff --git a/node/node.go b/node/node.go index e1753037..422fa62c 100644 --- a/node/node.go +++ b/node/node.go @@ -173,6 +173,8 @@ func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) { _, err = n.Perform(s, msg) case "history": err = n.History(s, msg) + case "whisper": + err = n.Whisper(s, msg) default: err = fmt.Errorf("unknown command: %s", msg.Command) } @@ -628,6 +630,47 @@ func (n *Node) retreiveHistory(history *common.HistoryRequest, streams []string) return backlog, nil } +// Whisper broadcasts the message to the specified whispering stream to +// all clients except the sender +func (n *Node) Whisper(s *Session, msg *common.Message) error { + // The session must have the whisper stream name defined in the state to be able to whisper + // If the stream is not defined, the whisper message is ignored + env := s.GetEnv() + if env == nil { + return errors.New("session environment is missing") + } + + streamVal, ok := s.ReadInternalState(common.WhisperStateKey(msg.Identifier)) + + if !ok { + s.Log.Debug("whisper stream not found", "identifier", msg.Identifier) + return nil + } + + stream, ok := streamVal.(string) + + if !ok { + s.Log.Warn("whisper stream is not a string", "identifier", msg.Identifier, "value", streamVal) + return nil + } + + broadcast := &common.StreamMessage{ + Stream: stream, + Data: string(utils.ToJSON(msg.Data)), + Meta: &common.StreamMessageMetadata{ + ExcludeSocket: s.GetID(), + BroadcastType: common.WhisperType, + Transient: true, + }, + } + + n.broker.HandleBroadcast(broadcast) + + s.Log.Debug("whispered", "stream", stream) + + return nil +} + // Broadcast message to stream (locally) func (n *Node) Broadcast(msg *common.StreamMessage) { n.metrics.CounterIncrement(metricsBroadcastMsg) diff --git a/node/node_test.go b/node/node_test.go index 47d891a5..e89ab16a 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -258,6 +258,54 @@ func TestPerform(t *testing.T) { }) } +func TestWhisper(t *testing.T) { + node := NewMockNode() + session := NewMockSession("14", node) + session2 := NewMockSession("15", node) + + // Subscribe using different identifiers to make sure whisper is working + // per stream name, not per identifier + defer subscribeSessionToStream(session, node, "test_channel", "test_whisper")() + defer subscribeSessionToStream(session2, node, "test_channel_2", "test_whisper")() + + go node.hub.Run() + defer node.hub.Shutdown() + + t.Run("When whispering stream is configured for sending subscription", func(t *testing.T) { + session.WriteInternalState(common.WhisperStateKey("test_channel"), "test_whisper") + + err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"}) + assert.Nil(t, err) + + expected := `{"type":"whisper","identifier":"test_channel_2","message":"tshh... it's a secret"}` + + msg, err := session2.conn.Read() + assert.NoError(t, err) + assert.Equal(t, expected, string(msg)) + + // Sender do not receive the message + msg, err = session.conn.Read() + assert.Nil(t, msg) + assert.Error(t, err, "Session hasn't received any messages") + }) + + t.Run("When whispering stream is not configured", func(t *testing.T) { + session.InternalState = make(map[string]interface{}) + + err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"}) + assert.Nil(t, err) + + msg, err := session2.conn.Read() + assert.Error(t, err) + assert.Nil(t, msg) + + // Sender do not receive the message + msg, err = session.conn.Read() + assert.Nil(t, msg) + assert.Error(t, err, "Session hasn't received any messages") + }) +} + func TestStreamSubscriptionRaceConditions(t *testing.T) { node := NewMockNode() session := NewMockSession("14", node) @@ -781,6 +829,19 @@ func toJSON(msg encoders.EncodedMessage) []byte { return b } +func subscribeSessionToStream(s *Session, n *Node, identifier string, stream string) func() { + n.hub.AddSession(s) + + s.subscriptions.AddChannel(identifier) + s.subscriptions.AddChannelStream(identifier, stream) + n.hub.SubscribeSession(s, stream, identifier) + n.broker.Subscribe(stream) + + return func() { + n.hub.RemoveSession(s) + } +} + func readMessages(conn Connection, count int) ([]string, error) { var messages []string From 1eee20dc8515a151c04756930c65325fe1a0de84 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Fri, 22 Mar 2024 11:32:11 -0700 Subject: [PATCH 3/5] refactor: use channel state to store whisper stream --- common/common.go | 11 +++-- etc/anyt/broadcast_tests/whisper_test.rb | 51 ++++++++++++++++++++++++ node/node.go | 12 ++---- node/node_test.go | 4 +- 4 files changed, 61 insertions(+), 17 deletions(-) create mode 100644 etc/anyt/broadcast_tests/whisper_test.rb diff --git a/common/common.go b/common/common.go index fd6161de..a755195c 100644 --- a/common/common.go +++ b/common/common.go @@ -79,7 +79,7 @@ const ( // Reserver state fields const ( - WHISPER_STATE_SUFFIX = "$w" + WHISPER_STREAM_STATE = "$w" ) // SessionEnv represents the underlying HTTP connection data: @@ -135,6 +135,10 @@ func (st *SessionEnv) MergeChannelState(id string, other *map[string]string) { } } +func (st *SessionEnv) RemoveChannelState(id string) { + delete((*st.ChannelStates), id) +} + // Returns a value for the specified key of the specified channel func (st *SessionEnv) GetChannelStateField(id string, field string) string { cst, ok := (*st.ChannelStates)[id] @@ -521,8 +525,3 @@ func RejectionMessage(identifier string) string { func DisconnectionMessage(reason string, reconnect bool) string { return string(utils.ToJSON(DisconnectMessage{Type: DisconnectType, Reason: reason, Reconnect: reconnect})) } - -// WhisperStateKey returns the field name for the given identifier to store the whispering stream name -func WhisperStateKey(identifier string) string { - return identifier + "/" + WHISPER_STATE_SUFFIX -} diff --git a/etc/anyt/broadcast_tests/whisper_test.rb b/etc/anyt/broadcast_tests/whisper_test.rb new file mode 100644 index 00000000..4b4e5078 --- /dev/null +++ b/etc/anyt/broadcast_tests/whisper_test.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +feature "Whisper" do + channel do + def subscribed + stream_from "a" + # whispers_to "a" ??? + __istate__["$w"] = "a" + end + end + + let(:client2) { build_client(ignore: %w[ping welcome]) } + let(:client3) { build_client(ignore: %w[ping welcome]) } + + before do + subscribe_request = {command: "subscribe", identifier: {channel: channel}.to_json} + + client.send(subscribe_request) + client2.send(subscribe_request) + client3.send(subscribe_request) + + ack = { + "identifier" => {channel: channel}.to_json, "type" => "confirm_subscription" + } + + assert_message ack, client.receive + assert_message ack, client2.receive + assert_message ack, client3.receive + end + + scenario %( + Only other clients receive the whisper message + ) do + perform_request = { + :command => "whisper", + :identifier => {channel: channel}.to_json, + "data" => {"event" => "typing", "user" => "Vova"} + } + + client.send(perform_request) + + msg = {"type" => "whisper", "identifier" => {channel: channel}.to_json, "message" => {"event" => "typing", "user" => "Vova"}} + + assert_message msg, client2.receive + assert_message msg, client3.receive + assert_raises(Anyt::Client::TimeoutError) do + msg = client.receive(timeout: 0.5) + raise "Client 1 should not receive the message: #{msg}" + end + end +end diff --git a/node/node.go b/node/node.go index 422fa62c..0bd017b2 100644 --- a/node/node.go +++ b/node/node.go @@ -494,6 +494,7 @@ func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResu // Make sure to remove all streams subscriptions res.StopAllStreams = true + s.env.RemoveChannelState(msg.Identifier) s.subscriptions.RemoveChannel(msg.Identifier) s.Log.Debug("unsubscribed", "identifier", msg.Identifier) @@ -640,20 +641,13 @@ func (n *Node) Whisper(s *Session, msg *common.Message) error { return errors.New("session environment is missing") } - streamVal, ok := s.ReadInternalState(common.WhisperStateKey(msg.Identifier)) + stream := env.GetChannelStateField(msg.Identifier, common.WHISPER_STREAM_STATE) - if !ok { + if stream == "" { s.Log.Debug("whisper stream not found", "identifier", msg.Identifier) return nil } - stream, ok := streamVal.(string) - - if !ok { - s.Log.Warn("whisper stream is not a string", "identifier", msg.Identifier, "value", streamVal) - return nil - } - broadcast := &common.StreamMessage{ Stream: stream, Data: string(utils.ToJSON(msg.Data)), diff --git a/node/node_test.go b/node/node_test.go index e89ab16a..9f9c77d9 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -272,7 +272,7 @@ func TestWhisper(t *testing.T) { defer node.hub.Shutdown() t.Run("When whispering stream is configured for sending subscription", func(t *testing.T) { - session.WriteInternalState(common.WhisperStateKey("test_channel"), "test_whisper") + session.env.MergeChannelState("test_channel", &map[string]string{common.WHISPER_STREAM_STATE: "test_whisper"}) err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"}) assert.Nil(t, err) @@ -290,7 +290,7 @@ func TestWhisper(t *testing.T) { }) t.Run("When whispering stream is not configured", func(t *testing.T) { - session.InternalState = make(map[string]interface{}) + session.env.RemoveChannelState("test_channel") err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"}) assert.Nil(t, err) From 0f37da1478f84205a0b60d51cd35382b0016c437 Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Fri, 22 Mar 2024 12:32:09 -0700 Subject: [PATCH 4/5] feat: pub/sub whispering + docs --- cli/options.go | 6 ++++++ docs/signed_streams.md | 21 +++++++++++++++++++++ etc/anyt/broadcast_tests/whisper_test.rb | 2 +- node/node.go | 1 - node/node_test.go | 2 +- streams/config.go | 3 +++ streams/controller.go | 14 ++++++++++++++ streams/controller_test.go | 23 +++++++++++++++++++++++ 8 files changed, 69 insertions(+), 3 deletions(-) diff --git a/cli/options.go b/cli/options.go index b02c2301..f397eb0d 100644 --- a/cli/options.go +++ b/cli/options.go @@ -1096,6 +1096,12 @@ func signedStreamsCLIFlags(c *config.Config, turboRailsKey *string, cableReadyKe Destination: &c.Streams.Public, }, + &cli.BoolFlag{ + Name: "streams_whisper", + Usage: "Enable whispering for signed pub/sub streams", + Destination: &c.Streams.Whisper, + }, + &cli.BoolFlag{ Name: "turbo_streams", Usage: "Enable Turbo Streams support", diff --git a/docs/signed_streams.md b/docs/signed_streams.md index 41af76a6..73d79b62 100644 --- a/docs/signed_streams.md +++ b/docs/signed_streams.md @@ -142,6 +142,27 @@ $digest = hash_hmac('sha256', $encoded, $SECRET_KEY); $signed_stream_name = $encoded . '--' . $digest; ``` +## Whispering + +_Whispering_ is an ability to publish _transient_ broadcasts from clients, i.e., without touching your backend. This is useful when you want to share client-only information from one connection to others. Typical examples include typing indicators, cursor position sharing, etc. + +Whispering must be enabled explicitly for signed streams via the `--streams_whisper` (`ANYCABLE_STREAMS_WHISPER=true`) option. Public streams always allow whispering. + +Here is an example client code using AnyCable JS SDK: + +```js +let channel = cable.streamFrom("chat/22"); + +channel.on("message", (msg) => { + if (msg.event === "typing") { + console.log(`user ${msg.name} is typing`); + } +}) + +// publishing whispers +channel.whisper({event: "typing", name: user.name}) +``` + ## Hotwire and CableReady support AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WebSocker server using the same signed streams functionality under the hood (and, thus, without performing any RPC calls to authorize subscriptions). diff --git a/etc/anyt/broadcast_tests/whisper_test.rb b/etc/anyt/broadcast_tests/whisper_test.rb index 4b4e5078..cb7603b9 100644 --- a/etc/anyt/broadcast_tests/whisper_test.rb +++ b/etc/anyt/broadcast_tests/whisper_test.rb @@ -39,7 +39,7 @@ def subscribed client.send(perform_request) - msg = {"type" => "whisper", "identifier" => {channel: channel}.to_json, "message" => {"event" => "typing", "user" => "Vova"}} + msg = {"identifier" => {channel: channel}.to_json, "message" => {"event" => "typing", "user" => "Vova"}} assert_message msg, client2.receive assert_message msg, client3.receive diff --git a/node/node.go b/node/node.go index 0bd017b2..8a5f4cb0 100644 --- a/node/node.go +++ b/node/node.go @@ -653,7 +653,6 @@ func (n *Node) Whisper(s *Session, msg *common.Message) error { Data: string(utils.ToJSON(msg.Data)), Meta: &common.StreamMessageMetadata{ ExcludeSocket: s.GetID(), - BroadcastType: common.WhisperType, Transient: true, }, } diff --git a/node/node_test.go b/node/node_test.go index 9f9c77d9..3e26ac84 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -277,7 +277,7 @@ func TestWhisper(t *testing.T) { err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"}) assert.Nil(t, err) - expected := `{"type":"whisper","identifier":"test_channel_2","message":"tshh... it's a secret"}` + expected := `{"identifier":"test_channel_2","message":"tshh... it's a secret"}` msg, err := session2.conn.Read() assert.NoError(t, err) diff --git a/streams/config.go b/streams/config.go index b0760b04..0dfe0849 100644 --- a/streams/config.go +++ b/streams/config.go @@ -9,6 +9,9 @@ type Config struct { // Public determines if public (unsigned) streams are allowed Public bool + // Whisper determines if whispering is enabled for pub/sub streams + Whisper bool + // PubSubChannel is the channel name used for direct pub/sub PubSubChannel string diff --git a/streams/controller.go b/streams/controller.go index 0c1213eb..72f09139 100644 --- a/streams/controller.go +++ b/streams/controller.go @@ -14,6 +14,8 @@ import ( type SubscribeRequest struct { StreamName string `json:"stream_name"` SignedStreamName string `json:"signed_stream_name"` + + whisper bool } func (r *SubscribeRequest) IsPresent() bool { @@ -106,11 +108,18 @@ func (c *Controller) Subscribe(sid string, env *common.SessionEnv, ids string, i c.log.With("identifier", identifier).Debug("verified", "stream", stream) } + var state map[string]string + + if request.whisper { + state = map[string]string{common.WHISPER_STREAM_STATE: stream} + } + return &common.CommandResult{ Status: common.SUCCESS, Transmissions: []string{common.ConfirmationMessage(identifier)}, Streams: []string{stream}, DisconnectInterest: -1, + IState: state, }, nil } @@ -134,6 +143,7 @@ func (c *Controller) Disconnect(sid string, env *common.SessionEnv, ids string, func NewStreamsController(conf *Config, l *slog.Logger) *Controller { key := conf.Secret allowPublic := conf.Public + whispers := conf.Whisper resolver := func(identifier string) (*SubscribeRequest, error) { var request SubscribeRequest @@ -146,6 +156,10 @@ func NewStreamsController(conf *Config, l *slog.Logger) *Controller { return nil, errors.New("public streams are not allowed") } + if whispers || (request.StreamName != "") { + request.whisper = true + } + return &request, nil } diff --git a/streams/controller_test.go b/streams/controller_test.go index 67f8c4c3..c0a60b55 100644 --- a/streams/controller_test.go +++ b/streams/controller_test.go @@ -53,6 +53,7 @@ func TestStreamsController(t *testing.T) { assert.Equal(t, []string{common.ConfirmationMessage(`{"channel":"$pubsub","stream_name":"chat:2024"}`)}, res.Transmissions) assert.Equal(t, []string{"chat:2024"}, res.Streams) assert.Equal(t, -1, res.DisconnectInterest) + assert.Equal(t, "chat:2024", res.IState[common.WHISPER_STREAM_STATE]) }) t.Run("Subscribe - no public allowed", func(t *testing.T) { @@ -85,6 +86,28 @@ func TestStreamsController(t *testing.T) { assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions) assert.Equal(t, []string{"chat:2021"}, res.Streams) assert.Equal(t, -1, res.DisconnectInterest) + assert.Nil(t, res.IState) + }) + + t.Run("Subscribe - signed - whisper", func(t *testing.T) { + conf := NewConfig() + conf.Secret = key + conf.Whisper = true + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + identifier := `{"channel":"$pubsub","signed_stream_name":"` + stream + `"}` + + res, err := subject.Subscribe("42", nil, "name=jack", identifier) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + assert.Equal(t, "chat:2021", res.IState[common.WHISPER_STREAM_STATE]) }) } From ea7924a285ef3f6fa03e28468650a2a9e7ea41dd Mon Sep 17 00:00:00 2001 From: Vladimir Dementyev Date: Fri, 22 Mar 2024 21:31:12 -0700 Subject: [PATCH 5/5] fix(style): += --- broadcast/http.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/broadcast/http.go b/broadcast/http.go index 5e68bdf3..733935b9 100644 --- a/broadcast/http.go +++ b/broadcast/http.go @@ -137,7 +137,7 @@ func (s *HTTPBroadcaster) Start(done chan (error)) error { } if s.enableCORS { - verifiedVia = verifiedVia + ", CORS enabled" + verifiedVia += ", CORS enabled" } s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s (%s)", s.server.Address(), s.path, verifiedVia))