Skip to content

Commit

Permalink
fix: add retries to nats broker, use a random store dir by default
Browse files Browse the repository at this point in the history
- Upgrade NATS deps
  • Loading branch information
palkan committed Nov 2, 2023
1 parent bf54537 commit 9660cc4
Show file tree
Hide file tree
Showing 5 changed files with 156 additions and 53 deletions.
49 changes: 48 additions & 1 deletion broker/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,16 +352,19 @@ func (n *NATS) add(stream string, data string) (uint64, error) {
}

func (n *NATS) addStreamConsumer(stream string) {
attempts := 5

err := n.ensureStreamExists(stream)

if err != nil {
n.log.Errorf("Failed to create JetStream stream %s: %s", stream, err)
return
}

createConsumer:
prefixedStream := streamPrefix + stream

n.jconsumers.fetch(stream, func() (jetstream.Consumer, error) { // nolint:errcheck
_, err = n.jconsumers.fetch(stream, func() (jetstream.Consumer, error) { // nolint:errcheck
cons, err := n.js.CreateConsumer(context.Background(), prefixedStream, jetstream.ConsumerConfig{
AckPolicy: jetstream.AckNonePolicy,
})
Expand Down Expand Up @@ -407,6 +410,17 @@ func (n *NATS) addStreamConsumer(stream string) {
n.streamSync.remove(stream)
n.js.DeleteConsumer(context.Background(), prefixedStream, name) // nolint:errcheck
})

if err != nil {
if context.DeadlineExceeded == err {
if attempts > 0 {
attempts--
n.log.Warnf("failed to create consumer for stream %s, retrying in 500ms...", stream)
time.Sleep(500 * time.Millisecond)
goto createConsumer
}
}
}
}

func (n *NATS) consumeMessage(stream string, msg jetstream.Msg) {
Expand All @@ -430,7 +444,9 @@ func (n *NATS) consumeMessage(stream string, msg jetstream.Msg) {

func (n *NATS) ensureStreamExists(stream string) error {
prefixedStream := streamPrefix + stream
attempts := 5

createStream:
_, err := n.jstreams.fetch(stream, func() (string, error) {
ctx := context.Background()

Expand All @@ -451,16 +467,29 @@ func (n *NATS) ensureStreamExists(stream string) error {
return stream, nil
}, func(stream string) {})

if err != nil {
if context.DeadlineExceeded == err {
if attempts > 0 {
attempts--
n.log.Warnf("failed to create stream %s, retrying in 500ms...", stream)
time.Sleep(500 * time.Millisecond)
goto createStream
}
}
}

return err
}

func (n *NATS) calculateEpoch() (string, error) {
attempts := 5
maybeNewEpoch, _ := nanoid.Nanoid(4)

ttl := time.Duration(10 * int64(math.Max(float64(n.conf.HistoryTTL), float64(n.conf.SessionsTTL))*float64(time.Second)))
// We must use a separate bucket due to a different TTL
bucketKey := epochBucket

fetchEpoch:
kv, err := n.fetchBucketWithTTL(bucketKey, ttl)

if err != nil {
Expand All @@ -478,6 +507,14 @@ func (n *NATS) calculateEpoch() (string, error) {

return maybeNewEpoch, nil
} else if err != nil {
if context.DeadlineExceeded == err {
if attempts > 0 {
attempts--
n.log.Warnf("failed to retrieve epoch, retrying in 1s...")
time.Sleep(1 * time.Second)
goto fetchEpoch
}
}
return "", errorx.Decorate(err, "Failed to retrieve JetStream KV epoch")
}

Expand All @@ -487,6 +524,7 @@ func (n *NATS) calculateEpoch() (string, error) {
func (n *NATS) fetchBucketWithTTL(key string, ttl time.Duration) (jetstream.KeyValue, error) {
var bucket jetstream.KeyValue
newBucket := false
attempts := 5

bucketSetup:
bucket, err := n.js.KeyValue(context.Background(), key)
Expand All @@ -506,6 +544,15 @@ bucketSetup:

newBucket = true
} else if err != nil {
if context.DeadlineExceeded == err {
if attempts > 0 {
attempts--
n.log.Warnf("failed to retrieve bucket %s, retrying in 500ms...", key)
time.Sleep(500 * time.Millisecond)
goto bucketSetup
}
}

return nil, errorx.Decorate(err, "Failed to retrieve JetStream KV bucket: %s", key)
}

Expand Down
4 changes: 4 additions & 0 deletions enats/enats.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"context"
"fmt"
"net/url"
"os"
"path/filepath"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -108,6 +110,8 @@ func (s *Service) Start() error {

if s.config.StoreDir != "" {
opts.StoreDir = s.config.StoreDir
} else {
opts.StoreDir = filepath.Join(os.TempDir(), "nats-data", s.serverName())
}

s.server, err = server.NewServer(opts)
Expand Down
91 changes: 77 additions & 14 deletions features/enats_broker.testfile
Original file line number Diff line number Diff line change
Expand Up @@ -6,17 +6,73 @@ store_path = File.expand_path("../tmp/nats-data", __dir__)
FileUtils.rm_rf(store_path) if File.directory?(store_path)

launch :anycable_1,
"./dist/anycable-go --broker=nats --pubsub=nats --broadcast_adapter=http --embed_nats --enats_addr=nats://localhost:4242 --enats_cluster=nats://localhost:4342 --enats_gateway=nats://localhost:4442 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/one"
"./dist/anycable-go --port 8080 --broker=nats --pubsub=nats --broadcast_adapter=http --embed_nats --enats_addr=nats://localhost:4242 --enats_cluster=nats://localhost:4342 --enats_gateway=nats://localhost:4442 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/one"

sleep 2

launch :anycable_2,
"./dist/anycable-go --port 8081 --broker=nats --pubsub=nats --broadcast_adapter=nats --embed_nats --enats_addr=nats://localhost:4243 --enats_cluster=nats://localhost:4343 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/two"

wait_tcp 8080
wait_tcp 8081

# We need to wait a bit for the NATS servers to find each other
sleep 2

launch :anycable_3,
"./dist/anycable-go --port 8082 --broker=nats --pubsub=nats --broadcast_adapter=nats --embed_nats --enats_addr=nats://localhost:4244 --enats_cluster=nats://localhost:4344 --enats_cluster_routes=nats://localhost:4342 --enats_store_dir=#{store_path}/three"

wait_tcp 8080, timeout: 20
wait_tcp 8081, timeout: 20
wait_tcp 8082, timeout: 20

# We need to obtain epoch first

epoch_scenario = [
{
client: {
protocol: "action_cable",
actions: [
{
subscribe: {
channel: "BenchmarkChannel"
}
},
{
perform: {
channel: "BenchmarkChannel",
action: "broadcast",
data: {
message: "hello"
}
}
},
{
receive: {
channel: "BenchmarkChannel",
"data>": {
message: "hello",
action: "broadcast",
},
stream_id: "all",
print: true
}
}
]
}
}
]

run :wsdirector, "bundle exec wsdirector ws://localhost:8080/cable -i #{epoch_scenario.to_json}"

result = stdout(:wsdirector)

if result !~ /1 clients, 0 failures/
fail "Unexpected scenario result:\n#{result}"
end

epoch = result.match(/"epoch":"([^"]+)"/)[1]

if epoch.nil? || epoch.empty?
fail "Epoch is missing"
end

scenario = [
{
client: {
Expand All @@ -28,6 +84,7 @@ scenario = [
channel: "BenchmarkChannel"
}
},
"wait_all",
{
perform: {
channel: "BenchmarkChannel",
Expand All @@ -44,7 +101,8 @@ scenario = [
message: "hello",
action: "broadcast",
},
stream_id: "all"
stream_id: "all",
epoch: epoch
}
},
{
Expand All @@ -63,23 +121,26 @@ scenario = [
client: {
protocol: "action_cable",
name: "subscriber",
multiplier: ":scale",
connection_options: {
url: "http://localhost:8080/cable"
url: "http://localhost:8081/cable"
},
actions: [
{
subscribe: {
channel: "BenchmarkChannel"
}
},
"wait_all",
{
receive: {
channel: "BenchmarkChannel",
"data>": {
message: "hello",
action: "broadcast"
},
stream_id: "all"
stream_id: "all",
epoch: epoch
}
}
]
Expand All @@ -89,23 +150,26 @@ scenario = [
client: {
protocol: "action_cable",
name: "another_subscriber",
multiplier: ":scale",
connection_options: {
url: "http://localhost:8081/cable"
url: "http://localhost:8082/cable"
},
actions: [
{
subscribe: {
channel: "BenchmarkChannel"
}
},
"wait_all",
{
receive: {
channel: "BenchmarkChannel",
"data>": {
message: "hello",
action: "broadcast"
},
stream_id: "all"
stream_id: "all",
epoch: epoch
}
}
]
Expand All @@ -114,18 +178,17 @@ scenario = [
]

TEST_COMMAND = <<~CMD
bundle exec wsdirector ws://localhost:8080/cable -i #{scenario.to_json}
bundle exec wsdirector ws://localhost:8080/cable -i #{scenario.to_json} -s 4
CMD

# NATS super-cluster may take longer to fully connect
retrying(delay: 2) do
run :wsdirector, TEST_COMMAND

result = stdout(:wsdirector)

unless result.include?("Group publisher: 1 clients, 0 failures") &&
result.include?("Group subscriber: 1 clients, 0 failures") &&
result.include?("Group another_subscriber: 1 clients, 0 failures")
result.include?("Group subscriber: 4 clients, 0 failures") &&
result.include?("Group another_subscriber: 4 clients, 0 failures")
fail "Unexpected scenario result:\n#{result}"
end
end
18 changes: 9 additions & 9 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ require (
github.com/mattn/go-isatty v0.0.14
github.com/mitchellh/go-mruby v0.0.0-20200315023956-207cedc21542
github.com/namsral/flag v1.7.4-pre
github.com/nats-io/nats.go v1.30.0
github.com/nats-io/nats.go v1.31.0
github.com/pkg/errors v0.9.1 // indirect
github.com/posthog/posthog-go v0.0.0-20221221115252-24dfed35d71a
github.com/redis/rueidis v1.0.17
github.com/smira/go-statsd v1.3.2
github.com/stretchr/testify v1.8.4
github.com/urfave/cli/v2 v2.11.1
go.uber.org/automaxprocs v1.5.1
go.uber.org/automaxprocs v1.5.3
golang.org/x/exp v0.0.0-20230626212559-97b1e661b5df
golang.org/x/net v0.12.0
google.golang.org/grpc v1.53.0
Expand All @@ -36,26 +36,26 @@ require (
require github.com/sony/gobreaker v0.5.0

require (
github.com/klauspost/compress v1.17.0 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/minio/highwayhash v1.0.2 // indirect
github.com/nats-io/jwt/v2 v2.3.0 // indirect
github.com/nats-io/jwt/v2 v2.5.2 // indirect
github.com/xtgo/uuid v0.0.0-20140804021211-a0b114877d4c // indirect
golang.org/x/time v0.3.0 // indirect
)

require (
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/jhump/protoreflect v1.5.0 // indirect
github.com/nats-io/nats-server/v2 v2.9.14
github.com/nats-io/nkeys v0.4.5 // indirect
github.com/nats-io/nats-server/v2 v2.10.4
github.com/nats-io/nkeys v0.4.6 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/stretchr/objx v0.5.0 // indirect
github.com/xrash/smetrics v0.0.0-20201216005158-039620a65673 // indirect
golang.org/x/crypto v0.11.0 // indirect
golang.org/x/sys v0.10.0 // indirect
golang.org/x/text v0.11.0 // indirect
golang.org/x/crypto v0.14.0 // indirect
golang.org/x/sys v0.13.0 // indirect
golang.org/x/text v0.13.0 // indirect
google.golang.org/genproto v0.0.0-20230216225411-c8e22ba71e44 // indirect
google.golang.org/protobuf v1.28.1 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
Expand Down
Loading

0 comments on commit 9660cc4

Please sign in to comment.