Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Stream message meta + exclude_socket #194

Merged
merged 5 commits into from
Oct 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
440 changes: 0 additions & 440 deletions .circleci/config.yml

This file was deleted.

2 changes: 1 addition & 1 deletion .github/workflows/benchmark.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ on:
- "**/*.go"
- "go.sum"
workflow_dispatch:
pull_request:

jobs:
benchmark:
Expand All @@ -18,7 +19,6 @@ jobs:
runs-on: ubuntu-latest
env:
GO111MODULE: on
BUNDLE_GEMFILE: .circleci/Gemfile
DEBUG: true
services:
redis:
Expand Down
1 change: 0 additions & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,6 @@ jobs:
GORACE: "halt_on_error=1"
COVERAGE: "true"
GOCOVERDIR: "_icoverdir_"
BUNDLE_GEMFILE: .circleci/Gemfile
BUNDLE_PATH: ./vendor/bundle
# Specify REDIS_URL explicitly, so Makefile doesn't check the presence of Redis
REDIS_URL: redis://localhost:6379/
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## master

- Fix passing RPC context via headers when using HTTP RPC. ([@palkan][])

- Add `exclude_socket` option support to broadcasts. ([@palkan][])

- Add support for batched broadcasts. ([@palkan][])

It's now possible to publish an array of broadcasting messages (e.g., `[{"stream":"a","data":"..."},"stream":"b","data":"..."}]`). The messages will be delivered in the same order as they were published (within a stream).
Expand Down
12 changes: 7 additions & 5 deletions .circleci/Gemfile → Gemfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@ gem "nats-pure", "< 2.3.0"
gem "colorize"
gem "puma"

if File.directory?(File.join(__dir__, "../../anycable"))
gem "activesupport", "~> 7.0.0"

if File.directory?(File.join(__dir__, "../anycable"))
$stdout.puts "\n=== Using local gems for Anyt ===\n\n"
gem "debug"
gem "anycable", path: "../../anycable"
gem "anycable-rails", path: "../../anycable-rails"
gem "anyt", path: "../../anyt"
gem "wsdirector-cli", path: "../../wsdirector"
gem "anycable", path: "../anycable"
gem "anycable-rails", path: "../anycable-rails"
gem "anyt", path: "../anyt"
gem "wsdirector-cli", path: "../wsdirector"
else
gem "anycable", github: "anycable/anycable"
gem "anycable-rails", github: "anycable/anycable-rails"
Expand Down
24 changes: 12 additions & 12 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -119,52 +119,52 @@ test:
go test -count=1 -timeout=30s -race -tags mrb ./... $(TEST_FLAGS)

benchmarks: build
BUNDLE_GEMFILE=.circleci/Gemfile ruby features/runner.rb features/*.benchfile
ruby features/runner.rb features/*.benchfile

tmp/anycable-go-test:
go build $(TEST_BUILD_FLAGS) -tags mrb -race -o tmp/anycable-go-test cmd/anycable-go/main.go

test-conformance: tmp/anycable-go-test
BUNDLE_GEMFILE=.circleci/Gemfile bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable"
bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable"

test-conformance-ssl: tmp/anycable-go-test
ANYCABLE_RPC_TLS_CERT=etc/ssl/server.crt \
ANYCABLE_RPC_TLS_KEY=etc/ssl/server.key \
BUNDLE_GEMFILE=.circleci/Gemfile bundle exec anyt -c \
bundle exec anyt -c \
"tmp/anycable-go-test --headers=cookie,x-api-token --rpc_enable_tls --rpc_tls_verify=false --ssl_key=etc/ssl/server.key --ssl_cert=etc/ssl/server.crt --port=8443" \
--target-url="wss://localhost:8443/cable"

test-conformance-http: tmp/anycable-go-test
BUNDLE_GEMFILE=.circleci/Gemfile \
\
ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret \
ANYCABLE_HTTP_RPC_SECRET=rpc_secret ANYCABLE_HTTP_RPC_MOUNT_PATH=/_anycable \
bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token --rpc_impl=http --rpc_host=http://localhost:9292/_anycable" --target-url="ws://localhost:8080/cable"
bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token --rpc_impl=http --rpc_host=http://localhost:9292/_anycable" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb

test-conformance-nats: tmp/anycable-go-test
BUNDLE_GEMFILE=.circleci/Gemfile ANYCABLE_BROADCAST_ADAPTER=nats bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable"
ANYCABLE_BROADCAST_ADAPTER=nats bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb

test-conformance-nats-embedded: tmp/anycable-go-test
BUNDLE_GEMFILE=.circleci/Gemfile ANYCABLE_BROADCAST_ADAPTER=nats ANYCABLE_NATS_SERVERS=nats://127.0.0.1:4242 ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4242 bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable"
ANYCABLE_BROADCAST_ADAPTER=nats ANYCABLE_NATS_SERVERS=nats://127.0.0.1:4242 ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4242 bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broadcast_tests/*.rb

test-conformance-broker-http: tmp/anycable-go-test
BUNDLE_GEMFILE=.circleci/Gemfile ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broker_tests/*.rb
ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-broker-redis: tmp/anycable-go-test
BUNDLE_GEMFILE=.circleci/Gemfile ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=redisx ANYCABLE_HTTP_BROADCAST_SECRET=any_secret ANYCABLE_PUBSUB=redis bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broker_tests/*.rb
ANYCABLE_BROKER=memory ANYCABLE_BROADCAST_ADAPTER=redisx ANYCABLE_HTTP_BROADCAST_SECRET=any_secret ANYCABLE_PUBSUB=redis bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-broker-nats: tmp/anycable-go-test
BUNDLE_GEMFILE=.circleci/Gemfile ANYCABLE_BROKER=memory ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/broker_tests/*.rb
ANYCABLE_BROKER=memory ANYCABLE_EMBED_NATS=true ANYCABLE_ENATS_ADDR=nats://127.0.0.1:4343 ANYCABLE_PUBSUB=nats ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable" --require=etc/anyt/**/*.rb

test-conformance-all: test-conformance test-conformance-ssl test-conformance-http

TESTFILE ?= features/*.testfile
test-features: build
BUNDLE_GEMFILE=.circleci/Gemfile ruby features/runner.rb $(TESTFILE)
ruby features/runner.rb $(TESTFILE)

test-ci: prepare prepare-mruby test test-conformance

prepare:
BUNDLE_GEMFILE=.circleci/Gemfile bundle install
bundle install

gen-ssl:
mkdir -p tmp/ssl
Expand Down
1 change: 0 additions & 1 deletion Readme.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
[![Latest Release](https://img.shields.io/github/release/anycable/anycable-go.svg?include_prereleases)](https://github.com/anycable/anycable-go/releases/latest?include_prereleases)
[![Build](https://github.com/anycable/anycable-go/workflows/Test/badge.svg)](https://github.com/anycable/anycable-go/actions)
[![CircleCI](https://img.shields.io/circleci/project/github/anycable/anycable-go.svg?label=CircleCI)](https://circleci.com/gh/anycable/anycable-go)
[![Docker](https://img.shields.io/docker/pulls/anycable/anycable-go.svg)](https://hub.docker.com/r/anycable/anycable-go/)
[![Documentation](https://img.shields.io/badge/docs-link-brightgreen.svg)](https://docs.anycable.io/anycable-go/getting_started)
# AnyCable-Go WebSocket Server
Expand Down
11 changes: 9 additions & 2 deletions common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,10 +224,17 @@ type Message struct {
History HistoryRequest `json:"history,omitempty"`
}

// StreamMessageMetadata describes additional information about a stream message
// which can be used to modify delivery behavior
type StreamMessageMetadata struct {
ExcludeSocket string `json:"exclude_socket,omitempty"`
}

// StreamMessage represents a pub/sub message to be sent to stream
type StreamMessage struct {
Stream string `json:"stream"`
Data string `json:"data"`
Stream string `json:"stream"`
Data string `json:"data"`
Meta *StreamMessageMetadata `json:"meta,omitempty"`

// Offset is the position of this message in the stream
Offset uint64
Expand Down
28 changes: 14 additions & 14 deletions docs/binary_formats.md
Original file line number Diff line number Diff line change
Expand Up @@ -224,11 +224,11 @@ export default createCable({protocol: 'actioncable-v1-ext-protobuf', encoder: ne

Here is the in/out traffic comparison:

Encoder | Sent | Rcvd
--------|------|-------
protobuf | 315.32MB | 327.1KB
msgpack | 339.58MB | 473.6KB
json | 502.45MB | 571.8KB
| Encoder | Sent | Rcvd |
|----------|------|-------|
| protobuf | 315.32MB | 327.1KB |
| msgpack | 339.58MB | 473.6KB |
| json | 502.45MB | 571.8KB |

The data above were captured while running a [websocket-bench][] benchmark with the following parameters:

Expand All @@ -240,15 +240,15 @@ websocket-bench broadcast ws://0.0.0.0:8080/cable —server-type=actioncable —

Here is the encode/decode speed comparison:

Encoder | Decode (ns/op) | Encode (ns/op)
--------|------|-------
protobuf (base) | 425 | 1153
msgpack (base) | 676 | 1512
json (base) | 1386 | 1266
||
protobuf (long) | 479 | 2370
msgpack (long) | 763 | 2506
json (long) | 2457 | 2319
| Encoder | Decode (ns/op) | Encode (ns/op) |
|--------|------|-------|
| protobuf (base) | 425 | 1153 |
| msgpack (base) | 676 | 1512 |
| json (base) | 1386 | 1266 |
||||
| protobuf (long) | 479 | 2370 |
| msgpack (long) | 763 | 2506 |
| json (long) | 2457 | 2319 |

Where base payload is:

Expand Down
54 changes: 54 additions & 0 deletions etc/anyt/broadcast_tests/options_test.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
# frozen_string_literal: true

feature "Broadcast with options" do
channel do
def subscribed
stream_from "a"
end

def speak(data)
data.delete("action")
ActionCable.server.broadcast("a", data, to_others: true)
end
end

let(:client2) { build_client(ignore: %w[ping welcome]) }
let(:client3) { build_client(ignore: %w[ping welcome]) }

before do
subscribe_request = {command: "subscribe", identifier: {channel: channel}.to_json}

client.send(subscribe_request)
client2.send(subscribe_request)
client3.send(subscribe_request)

ack = {
"identifier" => {channel: channel}.to_json, "type" => "confirm_subscription"
}

assert_message ack, client.receive
assert_message ack, client2.receive
assert_message ack, client3.receive
end

scenario %(
Only other clients receive the message when broadcasted to others
) do
perform_request = {
:command => "message",
:identifier => {channel: channel}.to_json,
"data" => {"action" => "speak", "content" => "The Other Side"}.to_json
}

client.send(perform_request)

msg = {"identifier" => {channel: channel}.to_json, "message" => {"content" => "The Other Side"}}

assert_message msg, client2.receive
assert_message msg, client3.receive
assert_raises(Anyt::Client::TimeoutError) do
msg = client.receive(timeout: 0.5)
raise "Client 1 should not receive the message: #{msg}"
end
end
end
2 changes: 1 addition & 1 deletion features/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

gem "childprocess", "~> 4.1"
gem "jwt"
gem "activesupport", "~> 7.0"
gem "activesupport", "~> 7.0.0"
end
rescue
raise if retried
Expand Down
4 changes: 4 additions & 0 deletions hub/gate.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,10 @@ func (g *Gate) performBroadcast(streamMsg *common.StreamMessage) {
g.mu.RUnlock()

for session, ids := range streamSessions {
if streamMsg.Meta != nil && streamMsg.Meta.ExcludeSocket == session.GetID() {
continue
}

for _, id := range ids {
if msg, ok := buf[id]; ok {
bdata = msg
Expand Down
34 changes: 34 additions & 0 deletions hub/hub_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/anycable/anycable-go/common"
"github.com/anycable/anycable-go/encoders"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type MockSession struct {
Expand Down Expand Up @@ -250,6 +251,39 @@ func TestBroadcastMessage(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, "{\"identifier\":\"test_channel\",\"message\":\"ciao\",\"stream_id\":\"test\",\"epoch\":\"xyz\",\"offset\":2022}", string(msg))
})

t.Run("Broadcast with exclude_socket", func(t *testing.T) {
session2 := NewMockSession("234")
hub.AddSession(session2)
hub.SubscribeSession(session2, "test", "test_channel")

hub.BroadcastMessage(&common.StreamMessage{Stream: "test", Data: "\"ciao\""})

msg, err := session.Read()
assert.Nil(t, err)
assert.Equal(t, "{\"identifier\":\"test_channel\",\"message\":\"ciao\"}", string(msg))

msg, err = session2.Read()
assert.Nil(t, err)
assert.Equal(t, "{\"identifier\":\"test_channel\",\"message\":\"ciao\"}", string(msg))

hub.BroadcastMessage(&common.StreamMessage{
Stream: "test",
Data: "\"hoi!\"",
Meta: &common.StreamMessageMetadata{
ExcludeSocket: "234",
},
})

msg, err = session.Read()
assert.Nil(t, err)
assert.Equal(t, "{\"identifier\":\"test_channel\",\"message\":\"hoi!\"}", string(msg))

msg, err = session2.Read()
assert.Nil(t, msg)
require.Error(t, err)
assert.Contains(t, err.Error(), "hasn't received any messages")
})
}

func TestBroadcastOrder(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion rpc/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func (s *HTTPService) performRequest(ctx context.Context, path string, payload [
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", s.conf.Secret))
}

if md, _, ok := metadata.FromOutgoingContextRaw(ctx); ok {
if md, ok := metadata.FromIncomingContext(ctx); ok {
// Set headers from metadata
for k, v := range md {
req.Header.Set(fmt.Sprintf("x-anycable-meta-%s", k), v[0])
Expand Down
6 changes: 3 additions & 3 deletions rpc/http_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestHTTPServiceRPC(t *testing.T) {
}

md := metadata.Pairs("album", "Kamni", "year", "2008")
ctx := metadata.NewOutgoingContext(context.Background(), md)
ctx := metadata.NewIncomingContext(context.Background(), md)
res, err := service.Connect(ctx, protocol.NewConnectMessage(
common.NewSessionEnv("ws://anycable.io/cable", &map[string]string{"cookie": "foo=bar"}),
))
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestHTTPServiceRPC(t *testing.T) {
}

md := metadata.Pairs("error", "test error")
ctx := metadata.NewOutgoingContext(context.Background(), md)
ctx := metadata.NewIncomingContext(context.Background(), md)
res, err := service.Disconnect(ctx, protocol.NewDisconnectMessage(
common.NewSessionEnv("ws://anycable.io/cable", &map[string]string{"cookie": "foo=bar"}),
"test-session",
Expand Down Expand Up @@ -147,7 +147,7 @@ func TestHTTPServiceRPC(t *testing.T) {
}

md := metadata.Pairs("track", "easy-way-out")
ctx := metadata.NewOutgoingContext(context.Background(), md)
ctx := metadata.NewIncomingContext(context.Background(), md)
res, err := service.Command(ctx, protocol.NewCommandMessage(
common.NewSessionEnv("ws://anycable.io/cable", &map[string]string{"cookie": "foo=bar"}),
"subscribe",
Expand Down
12 changes: 12 additions & 0 deletions rpc/rpc_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package rpc

import (
"context"
"errors"
"testing"

Expand All @@ -11,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"google.golang.org/grpc/metadata"
)

type MockState struct {
Expand Down Expand Up @@ -478,5 +480,15 @@ func TestCustomDialFun(t *testing.T) {
assert.Equal(t, "user=john", res.Identifier)
assert.Equal(t, map[string]string{"_s_": "test-session"}, res.CState)
assert.Empty(t, res.Broadcasts)

call := service.Calls[0]
requestCtx, ok := call.Arguments[0].(context.Context)

require.True(t, ok)

md, ok := metadata.FromIncomingContext(requestCtx)
require.True(t, ok)

assert.Equal(t, []string{"42"}, md.Get("sid"))
})
}
Loading