diff --git a/broadcast/http.go b/broadcast/http.go index 5e68bdf3..733935b9 100644 --- a/broadcast/http.go +++ b/broadcast/http.go @@ -137,7 +137,7 @@ func (s *HTTPBroadcaster) Start(done chan (error)) error { } if s.enableCORS { - verifiedVia = verifiedVia + ", CORS enabled" + verifiedVia += ", CORS enabled" } s.log.Info(fmt.Sprintf("Accept broadcast requests at %s%s (%s)", s.server.Address(), s.path, verifiedVia)) diff --git a/broker/memory.go b/broker/memory.go index 51744d46..59f6b105 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -234,6 +234,11 @@ func (b *Memory) Shutdown(ctx context.Context) error { } func (b *Memory) HandleBroadcast(msg *common.StreamMessage) { + if msg.Meta != nil && msg.Meta.Transient { + b.broadcaster.Broadcast(msg) + return + } + offset := b.add(msg.Stream, msg.Data) msg.Epoch = b.GetEpoch() diff --git a/broker/nats.go b/broker/nats.go index db4548a9..ac28c8bf 100644 --- a/broker/nats.go +++ b/broker/nats.go @@ -311,6 +311,11 @@ func (n *NATS) writeEpoch(val string) { } func (n *NATS) HandleBroadcast(msg *common.StreamMessage) { + if msg.Meta != nil && msg.Meta.Transient { + n.broadcaster.Broadcast(msg) + return + } + err := n.Ready(jetstreamReadyTimeout) if err != nil { n.log.Debug("JetStream is not ready yet to publish messages, add to backlog") diff --git a/cli/options.go b/cli/options.go index b02c2301..f397eb0d 100644 --- a/cli/options.go +++ b/cli/options.go @@ -1096,6 +1096,12 @@ func signedStreamsCLIFlags(c *config.Config, turboRailsKey *string, cableReadyKe Destination: &c.Streams.Public, }, + &cli.BoolFlag{ + Name: "streams_whisper", + Usage: "Enable whispering for signed pub/sub streams", + Destination: &c.Streams.Whisper, + }, + &cli.BoolFlag{ Name: "turbo_streams", Usage: "Enable Turbo Streams support", diff --git a/common/common.go b/common/common.go index caf0d15b..a755195c 100644 --- a/common/common.go +++ b/common/common.go @@ -64,6 +64,8 @@ const ( HistoryConfirmedType = "confirm_history" HistoryRejectedType = "reject_history" + + WhisperType = "whisper" ) // Disconnect reasons @@ -75,6 +77,11 @@ const ( UNAUTHORIZED_REASON = "unauthorized" ) +// Reserver state fields +const ( + WHISPER_STREAM_STATE = "$w" +) + // SessionEnv represents the underlying HTTP connection data: // URL and request headers. // It also carries channel and connection state information used by the RPC app. @@ -128,6 +135,10 @@ func (st *SessionEnv) MergeChannelState(id string, other *map[string]string) { } } +func (st *SessionEnv) RemoveChannelState(id string) { + delete((*st.ChannelStates), id) +} + // Returns a value for the specified key of the specified channel func (st *SessionEnv) GetChannelStateField(id string, field string) string { cst, ok := (*st.ChannelStates)[id] @@ -289,6 +300,10 @@ func (m *Message) LogValue() slog.Value { // which can be used to modify delivery behavior type StreamMessageMetadata struct { ExcludeSocket string `json:"exclude_socket,omitempty"` + // BroadcastType defines the message type to be used for messages sent to clients + BroadcastType string `json:"broadcast_type,omitempty"` + // Transient defines whether this message should be stored in the history + Transient bool `json:"transient,omitempty"` } func (smm *StreamMessageMetadata) LogValue() slog.Value { diff --git a/docs/signed_streams.md b/docs/signed_streams.md index 41af76a6..73d79b62 100644 --- a/docs/signed_streams.md +++ b/docs/signed_streams.md @@ -142,6 +142,27 @@ $digest = hash_hmac('sha256', $encoded, $SECRET_KEY); $signed_stream_name = $encoded . '--' . $digest; ``` +## Whispering + +_Whispering_ is an ability to publish _transient_ broadcasts from clients, i.e., without touching your backend. This is useful when you want to share client-only information from one connection to others. Typical examples include typing indicators, cursor position sharing, etc. + +Whispering must be enabled explicitly for signed streams via the `--streams_whisper` (`ANYCABLE_STREAMS_WHISPER=true`) option. Public streams always allow whispering. + +Here is an example client code using AnyCable JS SDK: + +```js +let channel = cable.streamFrom("chat/22"); + +channel.on("message", (msg) => { + if (msg.event === "typing") { + console.log(`user ${msg.name} is typing`); + } +}) + +// publishing whispers +channel.whisper({event: "typing", name: user.name}) +``` + ## Hotwire and CableReady support AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WebSocker server using the same signed streams functionality under the hood (and, thus, without performing any RPC calls to authorize subscriptions). diff --git a/etc/anyt/broadcast_tests/whisper_test.rb b/etc/anyt/broadcast_tests/whisper_test.rb new file mode 100644 index 00000000..cb7603b9 --- /dev/null +++ b/etc/anyt/broadcast_tests/whisper_test.rb @@ -0,0 +1,51 @@ +# frozen_string_literal: true + +feature "Whisper" do + channel do + def subscribed + stream_from "a" + # whispers_to "a" ??? + __istate__["$w"] = "a" + end + end + + let(:client2) { build_client(ignore: %w[ping welcome]) } + let(:client3) { build_client(ignore: %w[ping welcome]) } + + before do + subscribe_request = {command: "subscribe", identifier: {channel: channel}.to_json} + + client.send(subscribe_request) + client2.send(subscribe_request) + client3.send(subscribe_request) + + ack = { + "identifier" => {channel: channel}.to_json, "type" => "confirm_subscription" + } + + assert_message ack, client.receive + assert_message ack, client2.receive + assert_message ack, client3.receive + end + + scenario %( + Only other clients receive the whisper message + ) do + perform_request = { + :command => "whisper", + :identifier => {channel: channel}.to_json, + "data" => {"event" => "typing", "user" => "Vova"} + } + + client.send(perform_request) + + msg = {"identifier" => {channel: channel}.to_json, "message" => {"event" => "typing", "user" => "Vova"}} + + assert_message msg, client2.receive + assert_message msg, client3.receive + assert_raises(Anyt::Client::TimeoutError) do + msg = client.receive(timeout: 0.5) + raise "Client 1 should not receive the message: #{msg}" + end + end +end diff --git a/hub/gate.go b/hub/gate.go index 673d0624..fedddd25 100644 --- a/hub/gate.go +++ b/hub/gate.go @@ -162,7 +162,13 @@ func (g *Gate) performBroadcast(streamMsg *common.StreamMessage) { } func buildMessage(msg *common.StreamMessage, identifier string) encoders.EncodedMessage { - return encoders.NewCachedEncodedMessage(msg.ToReplyFor(identifier)) + reply := msg.ToReplyFor(identifier) + + if msg.Meta != nil { + reply.Type = msg.Meta.BroadcastType + } + + return encoders.NewCachedEncodedMessage(reply) } func streamSessionsSnapshot[T comparable](src map[T]map[string]bool) map[T][]string { diff --git a/node/broker_integration_test.go b/node/broker_integration_test.go index a94fa2a9..95f70b1e 100644 --- a/node/broker_integration_test.go +++ b/node/broker_integration_test.go @@ -310,6 +310,8 @@ func sharedIntegrationHistory(t *testing.T, node *Node, controller *mocks.Contro ts := time.Now().Unix() node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Flavia: buona sera"}`)) + // Transient messages must not be stored in the history + node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Who's there?","meta":{"transient":true}}`)) node.HandleBroadcast([]byte(`{"stream": "messages_1","data":"Mario: ta-dam!"}`)) node.HandleBroadcast([]byte(`{"stream": "presence_1","data":"1 new notification"}`)) diff --git a/node/node.go b/node/node.go index e1753037..8a5f4cb0 100644 --- a/node/node.go +++ b/node/node.go @@ -173,6 +173,8 @@ func (n *Node) HandleCommand(s *Session, msg *common.Message) (err error) { _, err = n.Perform(s, msg) case "history": err = n.History(s, msg) + case "whisper": + err = n.Whisper(s, msg) default: err = fmt.Errorf("unknown command: %s", msg.Command) } @@ -492,6 +494,7 @@ func (n *Node) Unsubscribe(s *Session, msg *common.Message) (*common.CommandResu // Make sure to remove all streams subscriptions res.StopAllStreams = true + s.env.RemoveChannelState(msg.Identifier) s.subscriptions.RemoveChannel(msg.Identifier) s.Log.Debug("unsubscribed", "identifier", msg.Identifier) @@ -628,6 +631,39 @@ func (n *Node) retreiveHistory(history *common.HistoryRequest, streams []string) return backlog, nil } +// Whisper broadcasts the message to the specified whispering stream to +// all clients except the sender +func (n *Node) Whisper(s *Session, msg *common.Message) error { + // The session must have the whisper stream name defined in the state to be able to whisper + // If the stream is not defined, the whisper message is ignored + env := s.GetEnv() + if env == nil { + return errors.New("session environment is missing") + } + + stream := env.GetChannelStateField(msg.Identifier, common.WHISPER_STREAM_STATE) + + if stream == "" { + s.Log.Debug("whisper stream not found", "identifier", msg.Identifier) + return nil + } + + broadcast := &common.StreamMessage{ + Stream: stream, + Data: string(utils.ToJSON(msg.Data)), + Meta: &common.StreamMessageMetadata{ + ExcludeSocket: s.GetID(), + Transient: true, + }, + } + + n.broker.HandleBroadcast(broadcast) + + s.Log.Debug("whispered", "stream", stream) + + return nil +} + // Broadcast message to stream (locally) func (n *Node) Broadcast(msg *common.StreamMessage) { n.metrics.CounterIncrement(metricsBroadcastMsg) diff --git a/node/node_test.go b/node/node_test.go index 47d891a5..3e26ac84 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -258,6 +258,54 @@ func TestPerform(t *testing.T) { }) } +func TestWhisper(t *testing.T) { + node := NewMockNode() + session := NewMockSession("14", node) + session2 := NewMockSession("15", node) + + // Subscribe using different identifiers to make sure whisper is working + // per stream name, not per identifier + defer subscribeSessionToStream(session, node, "test_channel", "test_whisper")() + defer subscribeSessionToStream(session2, node, "test_channel_2", "test_whisper")() + + go node.hub.Run() + defer node.hub.Shutdown() + + t.Run("When whispering stream is configured for sending subscription", func(t *testing.T) { + session.env.MergeChannelState("test_channel", &map[string]string{common.WHISPER_STREAM_STATE: "test_whisper"}) + + err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"}) + assert.Nil(t, err) + + expected := `{"identifier":"test_channel_2","message":"tshh... it's a secret"}` + + msg, err := session2.conn.Read() + assert.NoError(t, err) + assert.Equal(t, expected, string(msg)) + + // Sender do not receive the message + msg, err = session.conn.Read() + assert.Nil(t, msg) + assert.Error(t, err, "Session hasn't received any messages") + }) + + t.Run("When whispering stream is not configured", func(t *testing.T) { + session.env.RemoveChannelState("test_channel") + + err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"}) + assert.Nil(t, err) + + msg, err := session2.conn.Read() + assert.Error(t, err) + assert.Nil(t, msg) + + // Sender do not receive the message + msg, err = session.conn.Read() + assert.Nil(t, msg) + assert.Error(t, err, "Session hasn't received any messages") + }) +} + func TestStreamSubscriptionRaceConditions(t *testing.T) { node := NewMockNode() session := NewMockSession("14", node) @@ -781,6 +829,19 @@ func toJSON(msg encoders.EncodedMessage) []byte { return b } +func subscribeSessionToStream(s *Session, n *Node, identifier string, stream string) func() { + n.hub.AddSession(s) + + s.subscriptions.AddChannel(identifier) + s.subscriptions.AddChannelStream(identifier, stream) + n.hub.SubscribeSession(s, stream, identifier) + n.broker.Subscribe(stream) + + return func() { + n.hub.RemoveSession(s) + } +} + func readMessages(conn Connection, count int) ([]string, error) { var messages []string diff --git a/streams/config.go b/streams/config.go index b0760b04..0dfe0849 100644 --- a/streams/config.go +++ b/streams/config.go @@ -9,6 +9,9 @@ type Config struct { // Public determines if public (unsigned) streams are allowed Public bool + // Whisper determines if whispering is enabled for pub/sub streams + Whisper bool + // PubSubChannel is the channel name used for direct pub/sub PubSubChannel string diff --git a/streams/controller.go b/streams/controller.go index 0c1213eb..72f09139 100644 --- a/streams/controller.go +++ b/streams/controller.go @@ -14,6 +14,8 @@ import ( type SubscribeRequest struct { StreamName string `json:"stream_name"` SignedStreamName string `json:"signed_stream_name"` + + whisper bool } func (r *SubscribeRequest) IsPresent() bool { @@ -106,11 +108,18 @@ func (c *Controller) Subscribe(sid string, env *common.SessionEnv, ids string, i c.log.With("identifier", identifier).Debug("verified", "stream", stream) } + var state map[string]string + + if request.whisper { + state = map[string]string{common.WHISPER_STREAM_STATE: stream} + } + return &common.CommandResult{ Status: common.SUCCESS, Transmissions: []string{common.ConfirmationMessage(identifier)}, Streams: []string{stream}, DisconnectInterest: -1, + IState: state, }, nil } @@ -134,6 +143,7 @@ func (c *Controller) Disconnect(sid string, env *common.SessionEnv, ids string, func NewStreamsController(conf *Config, l *slog.Logger) *Controller { key := conf.Secret allowPublic := conf.Public + whispers := conf.Whisper resolver := func(identifier string) (*SubscribeRequest, error) { var request SubscribeRequest @@ -146,6 +156,10 @@ func NewStreamsController(conf *Config, l *slog.Logger) *Controller { return nil, errors.New("public streams are not allowed") } + if whispers || (request.StreamName != "") { + request.whisper = true + } + return &request, nil } diff --git a/streams/controller_test.go b/streams/controller_test.go index 67f8c4c3..c0a60b55 100644 --- a/streams/controller_test.go +++ b/streams/controller_test.go @@ -53,6 +53,7 @@ func TestStreamsController(t *testing.T) { assert.Equal(t, []string{common.ConfirmationMessage(`{"channel":"$pubsub","stream_name":"chat:2024"}`)}, res.Transmissions) assert.Equal(t, []string{"chat:2024"}, res.Streams) assert.Equal(t, -1, res.DisconnectInterest) + assert.Equal(t, "chat:2024", res.IState[common.WHISPER_STREAM_STATE]) }) t.Run("Subscribe - no public allowed", func(t *testing.T) { @@ -85,6 +86,28 @@ func TestStreamsController(t *testing.T) { assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions) assert.Equal(t, []string{"chat:2021"}, res.Streams) assert.Equal(t, -1, res.DisconnectInterest) + assert.Nil(t, res.IState) + }) + + t.Run("Subscribe - signed - whisper", func(t *testing.T) { + conf := NewConfig() + conf.Secret = key + conf.Whisper = true + subject := NewStreamsController(&conf, slog.Default()) + + require.NotNil(t, subject) + + identifier := `{"channel":"$pubsub","signed_stream_name":"` + stream + `"}` + + res, err := subject.Subscribe("42", nil, "name=jack", identifier) + + require.NoError(t, err) + require.NotNil(t, res) + require.Equal(t, common.SUCCESS, res.Status) + assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions) + assert.Equal(t, []string{"chat:2021"}, res.Streams) + assert.Equal(t, -1, res.DisconnectInterest) + assert.Equal(t, "chat:2021", res.IState[common.WHISPER_STREAM_STATE]) }) }