Skip to content

Commit

Permalink
feat: smarter fly presets for clusters
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Nov 2, 2023
1 parent 0ec3c62 commit b5b97c2
Show file tree
Hide file tree
Showing 9 changed files with 339 additions and 48 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,10 @@

## master

- Infer default configuration for Fly apps based on the cluster formation. ([@palkan][])

Automatically enable broker (in-memory or embedded NATS-based) depending on the number of VMs in the cluster.

- Add NATS-based broker. ([@palkan][])

## 1.4.6 (2023-10-25)
Expand Down
41 changes: 32 additions & 9 deletions config/presets.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"strconv"
"strings"

"github.com/anycable/anycable-go/fly"
"github.com/apex/log"
)

Expand Down Expand Up @@ -66,6 +67,18 @@ func (c *Config) loadFlyPreset(defaults *Config) error {
return errors.New("FLY_APP_NAME env is missing")
}

// Obtain cluster info
cluster, err := fly.Cluster(appName)

if err != nil {
log.Debugf("Failed to retrieve Fly app info from DNS: %v", err)
}

// Whether we can use embedded NATS broker with this setup
singleNode := cluster != nil && cluster.NumVMs() == 1
multiNode := cluster != nil && cluster.NumVMs() > 1
useNATSBroker := cluster != nil && cluster.NumRegions() == 1 && cluster.NumVMs() >= 3

// Use the same port for HTTP broadcasts by default
if c.HTTPBroadcast.Port == defaults.HTTPBroadcast.Port {
c.HTTPBroadcast.Port = c.Port
Expand Down Expand Up @@ -96,21 +109,31 @@ func (c *Config) loadFlyPreset(defaults *Config) error {
if c.PubSubAdapter == defaults.PubSubAdapter {
if c.Redis.URL != defaults.Redis.URL {
c.PubSubAdapter = "redis"
} else {
}
}

if multiNode {
if c.PubSubAdapter == defaults.PubSubAdapter {
c.PubSubAdapter = "nats"
}

// NATS hasn't been configured, so we can embed it
if !c.EmbedNats || c.NATS.Servers == defaults.NATS.Servers {
c.EmbedNats = true
c.NATS.Servers = c.EmbeddedNats.ServiceAddr
}

// NATS hasn't been configured, so we can embed it
if !c.EmbedNats || c.NATS.Servers == defaults.NATS.Servers {
c.EmbedNats = true
c.NATS.Servers = c.EmbeddedNats.ServiceAddr
if c.BrokerAdapter == defaults.BrokerAdapter {
if useNATSBroker {
log.WithField("context", "config").Infof("Discovered %d VMs in the same ragion -> enabling NATS broker", cluster.NumVMs())
c.BrokerAdapter = "nats"
}
}
}

if c.BrokerAdapter == defaults.BrokerAdapter {
if c.EmbedNats {
c.BrokerAdapter = "nats"
}
if singleNode {
log.WithField("context", "config").Infof("Discovered a single node cluster -> enabling in-memory broker")
c.BrokerAdapter = "memory"
}

if rpcName, ok := os.LookupEnv("ANYCABLE_FLY_RPC_APP_NAME"); ok {
Expand Down
167 changes: 133 additions & 34 deletions config/presets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"os"
"testing"

"github.com/anycable/anycable-go/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand All @@ -18,7 +19,7 @@ func TestNoPresets(t *testing.T) {
assert.Equal(t, "localhost", config.Host)
}

func TestFlyPresets(t *testing.T) {
func TestFlyPresets_no_vms_discovered(t *testing.T) {
cleanupEnv := setEnv(
"FLY_APP_NAME", "any-test",
"FLY_REGION", "mag",
Expand All @@ -37,6 +38,104 @@ func TestFlyPresets(t *testing.T) {

require.NoError(t, err)

assert.Equal(t, "0.0.0.0", config.Host)
assert.Equal(t, 8989, config.Port)
assert.Equal(t, 8989, config.HTTPBroadcast.Port)
// No cluster info, no broker — sorry
assert.Equal(t, "", config.BrokerAdapter)
assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr)
assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr)
assert.Equal(t, "any-test-mag-cluster", config.EmbeddedNats.ClusterName)
assert.Equal(t, []string{"nats://mag.any-test.internal:5222"}, config.EmbeddedNats.Routes)
assert.Equal(t, "dns:///mag.anycable-web.internal:50051", config.RPC.Host)
}

func TestFlyPresets_when_single_vm_discovered(t *testing.T) {
teardownDNS := mocks.MockDNSServer("vms.any-test.internal.", []string{"1234 mag"})
defer teardownDNS()

cleanupEnv := setEnv(
"FLY_APP_NAME", "any-test",
"FLY_REGION", "mag",
"FLY_ALLOC_ID", "1234",
)
defer cleanupEnv()

config := NewConfig()

config.Port = 8989

require.Equal(t, []string{"fly"}, config.Presets())

err := config.LoadPresets()

require.NoError(t, err)

assert.Equal(t, "0.0.0.0", config.Host)
assert.Equal(t, 8989, config.Port)
assert.Equal(t, 8989, config.HTTPBroadcast.Port)
// In-memory broker is good enough for single node, no pub/sub needed
assert.Equal(t, false, config.EmbedNats)
assert.Equal(t, "", config.PubSubAdapter)
assert.Equal(t, "memory", config.BrokerAdapter)
assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr)
}

func TestFlyPresets_when_two_vms_discovered(t *testing.T) {
teardownDNS := mocks.MockDNSServer("vms.any-test.internal.", []string{"1234 mag", "4567 mag"})
defer teardownDNS()

cleanupEnv := setEnv(
"FLY_APP_NAME", "any-test",
"FLY_REGION", "mag",
"FLY_ALLOC_ID", "1234",
)
defer cleanupEnv()

config := NewConfig()

config.Port = 8989

require.Equal(t, []string{"fly"}, config.Presets())

err := config.LoadPresets()

require.NoError(t, err)

assert.Equal(t, "0.0.0.0", config.Host)
assert.Equal(t, 8989, config.Port)
assert.Equal(t, 8989, config.HTTPBroadcast.Port)
assert.Equal(t, true, config.EmbedNats)
assert.Equal(t, "nats", config.PubSubAdapter)
// We do not enable broker by default, since it requires at least 3 nodes or exactly 1
assert.Equal(t, "", config.BrokerAdapter)
assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr)
assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr)
assert.Equal(t, "any-test-mag-cluster", config.EmbeddedNats.ClusterName)
assert.Equal(t, []string{"nats://mag.any-test.internal:5222"}, config.EmbeddedNats.Routes)
}

func TestFlyPresets_when_three_vms_discovered(t *testing.T) {
teardownDNS := mocks.MockDNSServer("vms.any-test.internal.", []string{"1234 mag", "4567 mag", "8901 mag"})
defer teardownDNS()

cleanupEnv := setEnv(
"FLY_APP_NAME", "any-test",
"FLY_REGION", "mag",
"FLY_ALLOC_ID", "1234",
)
defer cleanupEnv()

config := NewConfig()

config.Port = 8989

require.Equal(t, []string{"fly"}, config.Presets())

err := config.LoadPresets()

require.NoError(t, err)

assert.Equal(t, "0.0.0.0", config.Host)
assert.Equal(t, 8989, config.Port)
assert.Equal(t, 8989, config.HTTPBroadcast.Port)
Expand All @@ -47,7 +146,39 @@ func TestFlyPresets(t *testing.T) {
assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr)
assert.Equal(t, "any-test-mag-cluster", config.EmbeddedNats.ClusterName)
assert.Equal(t, []string{"nats://mag.any-test.internal:5222"}, config.EmbeddedNats.Routes)
assert.Equal(t, "dns:///mag.anycable-web.internal:50051", config.RPC.Host)
}

func TestFlyPresets_when_three_vms_from_different_regions(t *testing.T) {
teardownDNS := mocks.MockDNSServer("vms.any-test.internal.", []string{"1234 mag", "4567 mag", "8901 sea"})
defer teardownDNS()

cleanupEnv := setEnv(
"FLY_APP_NAME", "any-test",
"FLY_REGION", "mag",
"FLY_ALLOC_ID", "1234",
)
defer cleanupEnv()

config := NewConfig()

config.Port = 8989

require.Equal(t, []string{"fly"}, config.Presets())

err := config.LoadPresets()

require.NoError(t, err)

assert.Equal(t, "0.0.0.0", config.Host)
assert.Equal(t, 8989, config.Port)
assert.Equal(t, 8989, config.HTTPBroadcast.Port)
assert.Equal(t, true, config.EmbedNats)
assert.Equal(t, "nats", config.PubSubAdapter)
// Currently, we do not enable broker for multi-region setup; we need to figure this out later
assert.Equal(t, "", config.BrokerAdapter)
assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr)
assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr)
assert.Equal(t, "any-test-mag-cluster", config.EmbeddedNats.ClusterName)
}

func TestFlyPresets_When_RedisConfigured(t *testing.T) {
Expand Down Expand Up @@ -147,38 +278,6 @@ func TestBrokerWhenENATSConfigured(t *testing.T) {
assert.Equal(t, "nats", config.PubSubAdapter)
}

func TestFlyWithBrokerPresets(t *testing.T) {
cleanupEnv := setEnv(
"FLY_APP_NAME", "any-test",
"FLY_REGION", "mag",
"FLY_ALLOC_ID", "1234",
)
defer cleanupEnv()

config := NewConfig()
config.UserPresets = []string{"fly", "broker"}
config.Port = 8989

require.Equal(t, []string{"fly", "broker"}, config.Presets())

err := config.LoadPresets()

require.NoError(t, err)

assert.Equal(t, "0.0.0.0", config.Host)
assert.Equal(t, 8989, config.Port)
assert.Equal(t, 8989, config.HTTPBroadcast.Port)
assert.Equal(t, true, config.EmbedNats)
assert.Equal(t, "nats", config.PubSubAdapter)
assert.Equal(t, "nats", config.BrokerAdapter)
assert.Equal(t, "http,nats", config.BroadcastAdapter)
assert.Equal(t, "nats://0.0.0.0:4222", config.EmbeddedNats.ServiceAddr)
assert.Equal(t, "nats://0.0.0.0:5222", config.EmbeddedNats.ClusterAddr)
assert.Equal(t, "any-test-mag-cluster", config.EmbeddedNats.ClusterName)
assert.Equal(t, []string{"nats://mag.any-test.internal:5222"}, config.EmbeddedNats.Routes)
assert.Equal(t, "localhost:50051", config.RPC.Host)
}

func TestOverrideSomePresetSettings(t *testing.T) {
cleanupEnv := setEnv(
"FLY_APP_NAME", "any-test",
Expand Down
2 changes: 1 addition & 1 deletion docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ The preset provide the following defaults:
- `enats_cluster_routes`: "nats://\<FLY_REGION\>.\<FLY_APP_NAME\>.internal:5222"
- `enats_gateway_advertise`: "\<FLY_REGION\>.\<FLY_APP_NAME\>.internal:7222" (**NOTE:** You must set `ANYCABLE_ENATS_GATEWAY` to `nats://0.0.0.0:7222` and configure at least one gateway address manually to enable gateways).

Also, [embedded NATS](./embedded_nats.md) is enabled automatically if no other pub/sub adapter neither Redis is configured. Similarly, pub/sub, broker and broadcast adapters using embedded NATS are configured automatically, too. Thus, by default, AnyCable-Go setups a NATS cluster automatically (within a single region), no configuration is required.
Also, [embedded NATS](./embedded_nats.md) is enabled automatically if no other pub/sub adapter neither Redis is configured. If the number of VMs is at least 3 and all within the same region, the NATS-based broker is enabled, so you can have [reliable streams](./reliable_streams.md) automatically.

If the `ANYCABLE_FLY_RPC_APP_NAME` env variable is provided, the following defaults are configured as well:

Expand Down
71 changes: 71 additions & 0 deletions fly/fly.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Utilities to interact with Fly.io platform
package fly

import (
"context"
"fmt"
"net"
"time"
)

type VMInfo struct {
ID string
Region string
}

// ClusterInfo contains information about the Fly.io cluster
// obtained from the DNS records, such as the number of machines and regions
type ClusterInfo struct {
regions []string
vms []*VMInfo
}

func (c *ClusterInfo) NumRegions() int {
return len(c.regions)
}

func (c *ClusterInfo) Regions() []string {
return c.regions
}

func (c *ClusterInfo) VMs() []string {
ids := make([]string, len(c.vms))
for i, vm := range c.vms {
ids[i] = vm.ID
}
return ids
}

func (c *ClusterInfo) NumVMs() int {
return len(c.vms)
}

func Cluster(appName string) (*ClusterInfo, error) {
addr := fmt.Sprintf("vms.%s.internal.", appName)

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()

textRecords, err := net.DefaultResolver.LookupTXT(ctx, addr)

if err != nil {
return nil, err
}

vms := make([]*VMInfo, len(textRecords))
regionsMap := make(map[string]struct{})
regions := make([]string, 0)

for i, txt := range textRecords {
vm := &VMInfo{}
fmt.Sscanf(txt, "%s %s", &vm.ID, &vm.Region)
vms[i] = vm

if _, ok := regionsMap[vm.Region]; !ok {
regionsMap[vm.Region] = struct{}{}
regions = append(regions, vm.Region)
}
}

return &ClusterInfo{regions: regions, vms: vms}, nil
}
30 changes: 30 additions & 0 deletions fly/fly_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package fly

import (
"testing"

"github.com/anycable/anycable-go/mocks"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

func TestCluster_multiple_nodes_and_regions(t *testing.T) {
teardownDNS := mocks.MockDNSServer("vms.my-fly-app.internal.", []string{"xyz ewr", "abc sea", "def ewr"})
defer teardownDNS()

cluster, err := Cluster("my-fly-app")

require.NoError(t, err)

assert.Equal(t, 2, cluster.NumRegions())
assert.Equal(t, 3, cluster.NumVMs())
assert.EqualValues(t, []string{"ewr", "sea"}, cluster.Regions())
assert.EqualValues(t, []string{"xyz", "abc", "def"}, cluster.VMs())
}

func TestCluster_when_dns_error(t *testing.T) {
cluster, err := Cluster("my-fly-app")

require.Error(t, err)
assert.Nil(t, cluster)
}
Loading

0 comments on commit b5b97c2

Please sign in to comment.