Skip to content

Commit

Permalink
feat: ?stream and ?signed_stream support for SSE
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Apr 16, 2024
1 parent ec828d4 commit f61395f
Show file tree
Hide file tree
Showing 5 changed files with 152 additions and 8 deletions.
6 changes: 4 additions & 2 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,16 @@

## master

- Add `?stream` and `?signed_stream` support for SSE connections. ([@palkan][])

## 1.5.0 (2024-04-01)

- Add _whispering_ support to pub/sub streams. ([@palkan][])

Whispering is an ability for client to publish events to the subscribed stream without involving any server-side logic. Whenever a client _whispers_ an event to the channel, it's broadcasted to all other connected clients.

The feature is enabled for public streams automatically. For signed streams, you must opt-in via the `--streams_whisper` (`ANYCABLE_STREAMS_WHISPER=true`) option.

## 1.5.0-rc.1

- HTTP broadcaster is enabled by default. ([@palkan][])

HTTP broadcaster is now enabled by default in addition to Redis (backward-compatibility). It will be the primary default broadcaster in v2.
Expand Down
14 changes: 14 additions & 0 deletions docs/sse.md
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,20 @@ const source = new EventSource(

**IMPORTANT**: You MUST specify either `channel` or `identifier` query parameters. If you don't, the connection will be rejected.

### Usage with signed/public streams

When using with [signed streams](./signed_streams.md), you can provide the public or signed stream name via the `stream` or `signed_stream` parameter respectively:

```js
const publicSource = new EventSource(
`http://localhost:8080/events?stream=${encodeURIComponent(myStreamName)}`
);

const signedSource = new EventSource(
`http://localhost:8080/events?signed_stream=${encodeURIComponent(mySecretStreamName)}`
);
```

### Reliability

EventSource is a reliable transport, which means that it will automatically reconnect if the connection is lost.
Expand Down
96 changes: 96 additions & 0 deletions sse/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,102 @@ func TestSSEHandler(t *testing.T) {
require.Equal(t, http.StatusOK, w.Code)
})

t.Run("GET request with stream", func(t *testing.T) {
defer assertNoSessions(t, appNode)

controller.
On("Authenticate", "sid-public-stream", mock.Anything).
Return(&common.ConnectResult{
Identifier: "se2024",
Status: common.SUCCESS,
Transmissions: []string{`{"type":"welcome"}`},
}, nil)

identifier := `{"channel":"$pubsub","stream_name":"chat_1"}`

controller.
On("Subscribe", "sid-public-stream", mock.Anything, "se2024", identifier).
Return(&common.CommandResult{
Status: common.SUCCESS,
Transmissions: []string{`{"type":"confirm","identifier":"chat_1"}`},
Streams: []string{"chat_1"},
}, nil)

req, _ := http.NewRequest("GET", "/?stream=chat_1", nil)
req.Header.Set("X-Request-ID", "sid-public-stream")

ctx_, release := context.WithTimeout(context.Background(), 2*time.Second)
defer release()

ctx, cancel := context.WithCancel(ctx_)
defer cancel()

req = req.WithContext(ctx)

w := httptest.NewRecorder()
sw := newStreamingWriter(w)

go handler.ServeHTTP(sw, req)

msg, err := sw.ReadEvent(ctx)
require.NoError(t, err)
assert.Equal(t, "event: welcome\n"+`data: {"type":"welcome"}`, msg)

msg, err = sw.ReadEvent(ctx)
require.NoError(t, err)
assert.Equal(t, "event: confirm\n"+`data: {"type":"confirm","identifier":"chat_1"}`, msg)

require.Equal(t, http.StatusOK, w.Code)
})

t.Run("GET request with signed_stream", func(t *testing.T) {
defer assertNoSessions(t, appNode)

controller.
On("Authenticate", "sid-signed-stream", mock.Anything).
Return(&common.ConnectResult{
Identifier: "se2024",
Status: common.SUCCESS,
Transmissions: []string{`{"type":"welcome"}`},
}, nil)

identifier := `{"channel":"$pubsub","signed_stream_name":"secretto"}`

controller.
On("Subscribe", "sid-signed-stream", mock.Anything, "se2024", identifier).
Return(&common.CommandResult{
Status: common.SUCCESS,
Transmissions: []string{`{"type":"confirm","identifier":"secret_chat_1"}`},
Streams: []string{"chat_1"},
}, nil)

req, _ := http.NewRequest("GET", "/?signed_stream=secretto", nil)
req.Header.Set("X-Request-ID", "sid-signed-stream")

ctx_, release := context.WithTimeout(context.Background(), 2*time.Second)
defer release()

ctx, cancel := context.WithCancel(ctx_)
defer cancel()

req = req.WithContext(ctx)

w := httptest.NewRecorder()
sw := newStreamingWriter(w)

go handler.ServeHTTP(sw, req)

msg, err := sw.ReadEvent(ctx)
require.NoError(t, err)
assert.Equal(t, "event: welcome\n"+`data: {"type":"welcome"}`, msg)

msg, err = sw.ReadEvent(ctx)
require.NoError(t, err)
assert.Equal(t, "event: confirm\n"+`data: {"type":"confirm","identifier":"secret_chat_1"}`, msg)

require.Equal(t, http.StatusOK, w.Code)
})

t.Run("GET request with channel + rejected", func(t *testing.T) {
defer assertNoSessions(t, appNode)

Expand Down
28 changes: 28 additions & 0 deletions sse/sse.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,12 @@ import (
)

const (
signedStreamParam = "signed_stream"
publicStreamParam = "stream"
signedStreamChannel = "$pubsub"
turboStreamsParam = "turbo_signed_stream_name"
turboStreamsChannel = "Turbo::StreamsChannel"
historySinceParam = "history_since"
)

func NewSSESession(n *node.Node, w http.ResponseWriter, r *http.Request, info *server.RequestInfo) (*node.Session, error) {
Expand Down Expand Up @@ -76,6 +80,30 @@ func subscribeCommandFromGetRequest(r *http.Request) (*common.Message, error) {
}
}

// Check for public stream name
if identifier == "" {
stream := r.URL.Query().Get(publicStreamParam)

if stream != "" {
identifier = string(utils.ToJSON(map[string]string{
"channel": signedStreamChannel,
"stream_name": stream,
}))
}
}

// Check for signed stream name
if identifier == "" {
stream := r.URL.Query().Get(signedStreamParam)

if stream != "" {
identifier = string(utils.ToJSON(map[string]string{
"channel": signedStreamChannel,
"signed_stream_name": stream,
}))
}
}

// Then, check for Turbo Streams name
if identifier == "" {
stream := r.URL.Query().Get(turboStreamsParam)
Expand Down
16 changes: 10 additions & 6 deletions utils/message_verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ func (m *MessageVerifier) Generate(payload interface{}) (string, error) {
}

func (m *MessageVerifier) Verified(msg string) (interface{}, error) {
if !m.isValid(msg) {
return "", errors.New("Invalid message")
if err := m.Validate(msg); err != nil {
return "", errorx.Decorate(err, "failed to verify message")
}

parts := strings.Split(msg, "--")
Expand All @@ -64,21 +64,25 @@ func (m *MessageVerifier) Verified(msg string) (interface{}, error) {
}

// https://github.com/rails/rails/blob/061bf3156fb90ac6b8ec255dfa39492cf22d7b13/activesupport/lib/active_support/message_verifier.rb#L122
func (m *MessageVerifier) isValid(msg string) bool {
func (m *MessageVerifier) Validate(msg string) error {
if msg == "" {
return false
return errors.New("message is empty")
}

parts := strings.Split(msg, "--")

if len(parts) != 2 {
return false
return fmt.Errorf("message must contain 2 parts, got %d", len(parts))
}

data := []byte(parts[0])
digest := []byte(parts[1])

return m.VerifySignature(data, digest)
if m.VerifySignature(data, digest) {
return nil
} else {
return errors.New("invalid signature")
}
}

func (m *MessageVerifier) Sign(payload []byte) ([]byte, error) {
Expand Down

0 comments on commit f61395f

Please sign in to comment.