Skip to content
This repository has been archived by the owner on Dec 28, 2024. It is now read-only.

Commit

Permalink
feat: add --disable_disconnect parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Jun 10, 2020
1 parent 8bfee91 commit 632458f
Show file tree
Hide file tree
Showing 7 changed files with 71 additions and 27 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## 1.0.0-dev

- Add `--disable_disconnect` option. ([@palkan][])

Allows you to avoid calling `Disconnect` RPC method completely if you don't need it.

- Add channel state support. ([@palkan][])

- Add stopped streams support. ([@palkan][])
Expand Down
1 change: 0 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ test-conformance-ssl: tmp/anycable-go-test
BUNDLE_GEMFILE=.circleci/Gemfile bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token --ssl_key=etc/ssl/server.key --ssl_cert=etc/ssl/server.crt --port=8443" --target-url="wss://localhost:8443/cable"

test-conformance-http: tmp/anycable-go-test
go build -tags mrb -o tmp/anycable-go-test cmd/anycable-go/main.go
BUNDLE_GEMFILE=.circleci/Gemfile ANYCABLE_BROADCAST_ADAPTER=http ANYCABLE_HTTP_BROADCAST_SECRET=any_secret bundle exec anyt -c "tmp/anycable-go-test --headers=cookie,x-api-token" --target-url="ws://localhost:8080/cable"

test-conformance-all: test-conformance test-conformance-ssl test-conformance-http
Expand Down
2 changes: 2 additions & 0 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ func init() {

fs.IntVar(&defaults.DisconnectQueue.Rate, "disconnect_rate", 100, "")
fs.IntVar(&defaults.DisconnectQueue.ShutdownTimeout, "disconnect_timeout", 5, "")
fs.BoolVar(&defaults.DisconnectorDisabled, "disable_disconnect", false, "")

fs.StringVar(&defaults.LogLevel, "log_level", "info", "")
fs.StringVar(&defaults.LogFormat, "log_format", "text", "")
Expand Down Expand Up @@ -149,6 +150,7 @@ OPTIONS
--disconnect_rate Max number of Disconnect calls per second, default: 100, env: ANYCABLE_DISCONNECT_RATE
--disconnect_timeout Graceful shutdown timeouts (in seconds), default: 5, env: ANYCABLE_DISCONNECT_TIMEOUT
--disable_disconnect Disable calling Disconnect callback, default: false, env: ANYCABLE_DISABLE_DISCONNECT
--log_level Set logging level (debug/info/warn/error/fatal), default: info, env: ANYCABLE_LOG_LEVEL
--log_format Set logging format (text, json), default: text, env: ANYCABLE_LOG_FORMAT
Expand Down
10 changes: 8 additions & 2 deletions cmd/anycable-go/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,9 +73,15 @@ func main() {
appNode := node.NewNode(controller, metrics)
appNode.Start()

disconnector := node.NewDisconnectQueue(appNode, &config.DisconnectQueue)
go disconnector.Run() // nolint:errcheck
var disconnector node.Disconnector

if config.DisconnectorDisabled {
disconnector = node.NewNoopDisconnector()
} else {
disconnector = node.NewDisconnectQueue(appNode, &config.DisconnectQueue)
}

go disconnector.Run() // nolint:errcheck
appNode.SetDisconnector(disconnector)

subscriber, err := pubsub.NewSubscriber(appNode, config.BroadcastAdapter, &config.Redis, &config.HTTPPubSub)
Expand Down
33 changes: 17 additions & 16 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,23 @@ import (

// Config contains main application configuration
type Config struct {
RPC rpc.Config
Redis pubsub.RedisConfig
HTTPPubSub pubsub.HTTPConfig
Host string
Port int
BroadcastAdapter string
Path string
HealthPath string
Headers []string
SSL server.SSLConfig
WS node.WSConfig
MaxMessageSize int64
DisconnectQueue node.DisconnectQueueConfig
LogLevel string
LogFormat string
Metrics metrics.Config
RPC rpc.Config
Redis pubsub.RedisConfig
HTTPPubSub pubsub.HTTPConfig
Host string
Port int
BroadcastAdapter string
Path string
HealthPath string
Headers []string
SSL server.SSLConfig
WS node.WSConfig
MaxMessageSize int64
DisconnectorDisabled bool
DisconnectQueue node.DisconnectQueueConfig
LogLevel string
LogFormat string
Metrics metrics.Config
}

// New returns a new empty config
Expand Down
40 changes: 40 additions & 0 deletions node/disconnector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package node

import "github.com/apex/log"

// Disconnector is an interface for disconnect queue implementation
type Disconnector interface {
Run() error
Shutdown() error
Enqueue(*Session) error
Size() int
}

// NoopDisconnectQueue is non-operational disconnect queue implementation
type NoopDisconnectQueue struct{}

// Run does nothing
func (d *NoopDisconnectQueue) Run() error {
log.WithField("context", "disconnector").Info("Disconnect events are turned off")
return nil
}

// Shutdown does nothing
func (d *NoopDisconnectQueue) Shutdown() error {
return nil
}

// Size returns 0
func (d *NoopDisconnectQueue) Size() int {
return 0
}

// Enqueue does nothing
func (d *NoopDisconnectQueue) Enqueue(s *Session) error {
return nil
}

// NewNoopDisconnector returns new NoopDisconnectQueue
func NewNoopDisconnector() *NoopDisconnectQueue {
return &NoopDisconnectQueue{}
}
8 changes: 0 additions & 8 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,6 @@ func (d *disconnectMessage) toJSON() []byte {
return jsonStr
}

// Disconnector is an interface for disconnect queue implementation
type Disconnector interface {
Run() error
Shutdown() error
Enqueue(*Session) error
Size() int
}

// AppNode describes a basic node interface
type AppNode interface {
HandlePubSub(msg []byte)
Expand Down

0 comments on commit 632458f

Please sign in to comment.