Skip to content

Commit

Permalink
feat: pub/sub whispering + docs
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Mar 25, 2024
1 parent 54889b1 commit f1c23f8
Show file tree
Hide file tree
Showing 8 changed files with 69 additions and 3 deletions.
6 changes: 6 additions & 0 deletions cli/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -1096,6 +1096,12 @@ func signedStreamsCLIFlags(c *config.Config, turboRailsKey *string, cableReadyKe
Destination: &c.Streams.Public,
},

&cli.BoolFlag{
Name: "streams_whisper",
Usage: "Enable whispering for signed pub/sub streams",
Destination: &c.Streams.Whisper,
},

&cli.BoolFlag{
Name: "turbo_streams",
Usage: "Enable Turbo Streams support",
Expand Down
21 changes: 21 additions & 0 deletions docs/signed_streams.md
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,27 @@ $digest = hash_hmac('sha256', $encoded, $SECRET_KEY);
$signed_stream_name = $encoded . '--' . $digest;
```

## Whispering

_Whispering_ is an ability to publish _transient_ broadcasts from clients, i.e., without touching your backend. This is useful when you want to share client-only information from one connection to others. Typical examples include typing indicators, cursor position sharing, etc.

Whispering must be enabled explicitly for signed streams via the `--streams_whisper` (`ANYCABLE_STREAMS_WHISPER=true`) option. Public streams always allow whispering.

Here is an example client code using AnyCable JS SDK:

```js
let channel = cable.streamFrom("chat/22");

channel.on("message", (msg) => {
if (msg.event === "typing") {
console.log(`user ${msg.name} is typing`);
}
})

// publishing whispers
channel.whisper({event: "typing", name: user.name})
```

## Hotwire and CableReady support

AnyCable provides an ability to terminate Hotwire ([Turbo Streams](https://turbo.hotwired.dev/handbook/streams)) and [CableReady](https://cableready.stimulusreflex.com) (v5+) subscriptions at the WebSocker server using the same signed streams functionality under the hood (and, thus, without performing any RPC calls to authorize subscriptions).
Expand Down
2 changes: 1 addition & 1 deletion etc/anyt/broadcast_tests/whisper_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ def subscribed

client.send(perform_request)

msg = {"type" => "whisper", "identifier" => {channel: channel}.to_json, "message" => {"event" => "typing", "user" => "Vova"}}
msg = {"identifier" => {channel: channel}.to_json, "message" => {"event" => "typing", "user" => "Vova"}}

assert_message msg, client2.receive
assert_message msg, client3.receive
Expand Down
1 change: 0 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,6 @@ func (n *Node) Whisper(s *Session, msg *common.Message) error {
Data: string(utils.ToJSON(msg.Data)),
Meta: &common.StreamMessageMetadata{
ExcludeSocket: s.GetID(),
BroadcastType: common.WhisperType,
Transient: true,
},
}
Expand Down
2 changes: 1 addition & 1 deletion node/node_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func TestWhisper(t *testing.T) {
err := node.Whisper(session, &common.Message{Identifier: "test_channel", Data: "tshh... it's a secret"})
assert.Nil(t, err)

expected := `{"type":"whisper","identifier":"test_channel_2","message":"tshh... it's a secret"}`
expected := `{"identifier":"test_channel_2","message":"tshh... it's a secret"}`

msg, err := session2.conn.Read()
assert.NoError(t, err)
Expand Down
3 changes: 3 additions & 0 deletions streams/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ type Config struct {
// Public determines if public (unsigned) streams are allowed
Public bool

// Whisper determines if whispering is enabled for pub/sub streams
Whisper bool

// PubSubChannel is the channel name used for direct pub/sub
PubSubChannel string

Expand Down
14 changes: 14 additions & 0 deletions streams/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ import (
type SubscribeRequest struct {
StreamName string `json:"stream_name"`
SignedStreamName string `json:"signed_stream_name"`

whisper bool
}

func (r *SubscribeRequest) IsPresent() bool {
Expand Down Expand Up @@ -106,11 +108,18 @@ func (c *Controller) Subscribe(sid string, env *common.SessionEnv, ids string, i
c.log.With("identifier", identifier).Debug("verified", "stream", stream)
}

var state map[string]string

if request.whisper {
state = map[string]string{common.WHISPER_STREAM_STATE: stream}
}

return &common.CommandResult{
Status: common.SUCCESS,
Transmissions: []string{common.ConfirmationMessage(identifier)},
Streams: []string{stream},
DisconnectInterest: -1,
IState: state,
}, nil
}

Expand All @@ -134,6 +143,7 @@ func (c *Controller) Disconnect(sid string, env *common.SessionEnv, ids string,
func NewStreamsController(conf *Config, l *slog.Logger) *Controller {
key := conf.Secret
allowPublic := conf.Public
whispers := conf.Whisper

resolver := func(identifier string) (*SubscribeRequest, error) {
var request SubscribeRequest
Expand All @@ -146,6 +156,10 @@ func NewStreamsController(conf *Config, l *slog.Logger) *Controller {
return nil, errors.New("public streams are not allowed")
}

if whispers || (request.StreamName != "") {
request.whisper = true
}

return &request, nil
}

Expand Down
23 changes: 23 additions & 0 deletions streams/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ func TestStreamsController(t *testing.T) {
assert.Equal(t, []string{common.ConfirmationMessage(`{"channel":"$pubsub","stream_name":"chat:2024"}`)}, res.Transmissions)
assert.Equal(t, []string{"chat:2024"}, res.Streams)
assert.Equal(t, -1, res.DisconnectInterest)
assert.Equal(t, "chat:2024", res.IState[common.WHISPER_STREAM_STATE])
})

t.Run("Subscribe - no public allowed", func(t *testing.T) {
Expand Down Expand Up @@ -85,6 +86,28 @@ func TestStreamsController(t *testing.T) {
assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions)
assert.Equal(t, []string{"chat:2021"}, res.Streams)
assert.Equal(t, -1, res.DisconnectInterest)
assert.Nil(t, res.IState)
})

t.Run("Subscribe - signed - whisper", func(t *testing.T) {
conf := NewConfig()
conf.Secret = key
conf.Whisper = true
subject := NewStreamsController(&conf, slog.Default())

require.NotNil(t, subject)

identifier := `{"channel":"$pubsub","signed_stream_name":"` + stream + `"}`

res, err := subject.Subscribe("42", nil, "name=jack", identifier)

require.NoError(t, err)
require.NotNil(t, res)
require.Equal(t, common.SUCCESS, res.Status)
assert.Equal(t, []string{common.ConfirmationMessage(identifier)}, res.Transmissions)
assert.Equal(t, []string{"chat:2021"}, res.Streams)
assert.Equal(t, -1, res.DisconnectInterest)
assert.Equal(t, "chat:2021", res.IState[common.WHISPER_STREAM_STATE])
})
}

Expand Down

0 comments on commit f1c23f8

Please sign in to comment.