diff --git a/CHANGELOG.md b/CHANGELOG.md index 9075d457..41b03907 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,14 +2,16 @@ ## master +- Add `?stream` and `?signed_stream` support for SSE connections. ([@palkan][]) + +## 1.5.0 (2024-04-01) + - Add _whispering_ support to pub/sub streams. ([@palkan][]) Whispering is an ability for client to publish events to the subscribed stream without involving any server-side logic. Whenever a client _whispers_ an event to the channel, it's broadcasted to all other connected clients. The feature is enabled for public streams automatically. For signed streams, you must opt-in via the `--streams_whisper` (`ANYCABLE_STREAMS_WHISPER=true`) option. -## 1.5.0-rc.1 - - HTTP broadcaster is enabled by default. ([@palkan][]) HTTP broadcaster is now enabled by default in addition to Redis (backward-compatibility). It will be the primary default broadcaster in v2. diff --git a/docs/sse.md b/docs/sse.md index 7533e9ac..8a80e4db 100644 --- a/docs/sse.md +++ b/docs/sse.md @@ -53,6 +53,20 @@ const source = new EventSource( **IMPORTANT**: You MUST specify either `channel` or `identifier` query parameters. If you don't, the connection will be rejected. +### Usage with signed/public streams + +When using with [signed streams](./signed_streams.md), you can provide the public or signed stream name via the `stream` or `signed_stream` parameter respectively: + +```js +const publicSource = new EventSource( + `http://localhost:8080/events?stream=${encodeURIComponent(myStreamName)}` +); + +const signedSource = new EventSource( + `http://localhost:8080/events?signed_stream=${encodeURIComponent(mySecretStreamName)}` +); +``` + ### Reliability EventSource is a reliable transport, which means that it will automatically reconnect if the connection is lost. diff --git a/sse/handler_test.go b/sse/handler_test.go index 6de4fc70..51a676d4 100644 --- a/sse/handler_test.go +++ b/sse/handler_test.go @@ -255,6 +255,102 @@ func TestSSEHandler(t *testing.T) { require.Equal(t, http.StatusOK, w.Code) }) + t.Run("GET request with stream", func(t *testing.T) { + defer assertNoSessions(t, appNode) + + controller. + On("Authenticate", "sid-public-stream", mock.Anything). + Return(&common.ConnectResult{ + Identifier: "se2024", + Status: common.SUCCESS, + Transmissions: []string{`{"type":"welcome"}`}, + }, nil) + + identifier := `{"channel":"$pubsub","stream_name":"chat_1"}` + + controller. + On("Subscribe", "sid-public-stream", mock.Anything, "se2024", identifier). + Return(&common.CommandResult{ + Status: common.SUCCESS, + Transmissions: []string{`{"type":"confirm","identifier":"chat_1"}`}, + Streams: []string{"chat_1"}, + }, nil) + + req, _ := http.NewRequest("GET", "/?stream=chat_1", nil) + req.Header.Set("X-Request-ID", "sid-public-stream") + + ctx_, release := context.WithTimeout(context.Background(), 2*time.Second) + defer release() + + ctx, cancel := context.WithCancel(ctx_) + defer cancel() + + req = req.WithContext(ctx) + + w := httptest.NewRecorder() + sw := newStreamingWriter(w) + + go handler.ServeHTTP(sw, req) + + msg, err := sw.ReadEvent(ctx) + require.NoError(t, err) + assert.Equal(t, "event: welcome\n"+`data: {"type":"welcome"}`, msg) + + msg, err = sw.ReadEvent(ctx) + require.NoError(t, err) + assert.Equal(t, "event: confirm\n"+`data: {"type":"confirm","identifier":"chat_1"}`, msg) + + require.Equal(t, http.StatusOK, w.Code) + }) + + t.Run("GET request with signed_stream", func(t *testing.T) { + defer assertNoSessions(t, appNode) + + controller. + On("Authenticate", "sid-signed-stream", mock.Anything). + Return(&common.ConnectResult{ + Identifier: "se2024", + Status: common.SUCCESS, + Transmissions: []string{`{"type":"welcome"}`}, + }, nil) + + identifier := `{"channel":"$pubsub","signed_stream_name":"secretto"}` + + controller. + On("Subscribe", "sid-signed-stream", mock.Anything, "se2024", identifier). + Return(&common.CommandResult{ + Status: common.SUCCESS, + Transmissions: []string{`{"type":"confirm","identifier":"secret_chat_1"}`}, + Streams: []string{"chat_1"}, + }, nil) + + req, _ := http.NewRequest("GET", "/?signed_stream=secretto", nil) + req.Header.Set("X-Request-ID", "sid-signed-stream") + + ctx_, release := context.WithTimeout(context.Background(), 2*time.Second) + defer release() + + ctx, cancel := context.WithCancel(ctx_) + defer cancel() + + req = req.WithContext(ctx) + + w := httptest.NewRecorder() + sw := newStreamingWriter(w) + + go handler.ServeHTTP(sw, req) + + msg, err := sw.ReadEvent(ctx) + require.NoError(t, err) + assert.Equal(t, "event: welcome\n"+`data: {"type":"welcome"}`, msg) + + msg, err = sw.ReadEvent(ctx) + require.NoError(t, err) + assert.Equal(t, "event: confirm\n"+`data: {"type":"confirm","identifier":"secret_chat_1"}`, msg) + + require.Equal(t, http.StatusOK, w.Code) + }) + t.Run("GET request with channel + rejected", func(t *testing.T) { defer assertNoSessions(t, appNode) diff --git a/sse/sse.go b/sse/sse.go index bad4c28a..7c4e1cd3 100644 --- a/sse/sse.go +++ b/sse/sse.go @@ -17,8 +17,12 @@ import ( ) const ( + signedStreamParam = "signed_stream" + publicStreamParam = "stream" + signedStreamChannel = "$pubsub" turboStreamsParam = "turbo_signed_stream_name" turboStreamsChannel = "Turbo::StreamsChannel" + historySinceParam = "history_since" ) func NewSSESession(n *node.Node, w http.ResponseWriter, r *http.Request, info *server.RequestInfo) (*node.Session, error) { @@ -76,6 +80,30 @@ func subscribeCommandFromGetRequest(r *http.Request) (*common.Message, error) { } } + // Check for public stream name + if identifier == "" { + stream := r.URL.Query().Get(publicStreamParam) + + if stream != "" { + identifier = string(utils.ToJSON(map[string]string{ + "channel": signedStreamChannel, + "stream_name": stream, + })) + } + } + + // Check for signed stream name + if identifier == "" { + stream := r.URL.Query().Get(signedStreamParam) + + if stream != "" { + identifier = string(utils.ToJSON(map[string]string{ + "channel": signedStreamChannel, + "signed_stream_name": stream, + })) + } + } + // Then, check for Turbo Streams name if identifier == "" { stream := r.URL.Query().Get(turboStreamsParam) diff --git a/utils/message_verifier.go b/utils/message_verifier.go index d62e2125..75ca1ac1 100644 --- a/utils/message_verifier.go +++ b/utils/message_verifier.go @@ -41,8 +41,8 @@ func (m *MessageVerifier) Generate(payload interface{}) (string, error) { } func (m *MessageVerifier) Verified(msg string) (interface{}, error) { - if !m.isValid(msg) { - return "", errors.New("Invalid message") + if err := m.Validate(msg); err != nil { + return "", errorx.Decorate(err, "failed to verify message") } parts := strings.Split(msg, "--") @@ -64,21 +64,25 @@ func (m *MessageVerifier) Verified(msg string) (interface{}, error) { } // https://github.com/rails/rails/blob/061bf3156fb90ac6b8ec255dfa39492cf22d7b13/activesupport/lib/active_support/message_verifier.rb#L122 -func (m *MessageVerifier) isValid(msg string) bool { +func (m *MessageVerifier) Validate(msg string) error { if msg == "" { - return false + return errors.New("message is empty") } parts := strings.Split(msg, "--") if len(parts) != 2 { - return false + return fmt.Errorf("message must contain 2 parts, got %d", len(parts)) } data := []byte(parts[0]) digest := []byte(parts[1]) - return m.VerifySignature(data, digest) + if m.VerifySignature(data, digest) { + return nil + } else { + return errors.New("invalid signature") + } } func (m *MessageVerifier) Sign(payload []byte) ([]byte, error) {