Skip to content

Commit

Permalink
chore: telemetry improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
palkan committed Oct 16, 2024
1 parent 240ccae commit 3b5b2b7
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 43 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master

- Telemetry improvements. ([@palkan][])

## 1.5.4 (2024-10-08)

- Add `anycable.toml` support. ([@palkan][])
Expand Down
12 changes: 5 additions & 7 deletions cli/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ type Runner struct {
broadcastersFactory broadcastersFactory
websocketEndpoints map[string]websocketHandler

router *router.RouterController
metrics *metricspkg.Metrics
router *router.RouterController
metrics *metricspkg.Metrics

telemetryEnabled bool
telemetryConfig *telemetry.Config

errChan chan error
shutdownables []Shutdownable
Expand Down Expand Up @@ -247,11 +249,7 @@ func (r *Runner) runNode() (*node.Node, error) {
)

if r.telemetryEnabled {
telemetryConfig := telemetry.NewConfig()
if customTelemetryUrl := os.Getenv("ANYCABLE_TELEMETRY_URL"); customTelemetryUrl != "" {
telemetryConfig.Endpoint = customTelemetryUrl
}
tracker := telemetry.NewTracker(metrics, r.config, telemetryConfig)
tracker := telemetry.NewTracker(metrics, r.config, r.telemetryConfig)

r.log.With("context", "telemetry").Info(tracker.Announce())
go tracker.Collect()
Expand Down
21 changes: 19 additions & 2 deletions cli/runner_options.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/anycable/anycable-go/node"
"github.com/anycable/anycable-go/pubsub"
"github.com/anycable/anycable-go/rpc"
"github.com/anycable/anycable-go/telemetry"
"github.com/joomcode/errorx"
)

Expand Down Expand Up @@ -179,10 +180,26 @@ func WithDefaultBroker() Option {
})
}

// WithTelemetry enables AnyCable telemetry unless ANYCABLE_DISABLE_TELEMETRY is set
func WithTelemetry() Option {
// WithTelemetry enables AnyCable telemetry unless ANYCABLE_DISABLE_TELEMETRY is set.
// You can pass custom properties as pairs of key and value.
func WithTelemetry(props ...string) Option {
return func(r *Runner) error {
r.telemetryEnabled = os.Getenv("ANYCABLE_DISABLE_TELEMETRY") != "true"
r.telemetryConfig = telemetry.NewConfig()
if customTelemetryUrl := os.Getenv("ANYCABLE_TELEMETRY_URL"); customTelemetryUrl != "" {
r.telemetryConfig.Endpoint = customTelemetryUrl
}

if len(props) > 0 {
if len(props)%2 != 0 {
return errorx.IllegalArgument.New("telemetry properties should be passed as pairs of key and value")
}

for i := 0; i < len(props); i += 2 {
r.telemetryConfig.CustomProps[props[i]] = props[i+1]
}
}

return nil
}
}
Expand Down
14 changes: 8 additions & 6 deletions telemetry/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,19 @@ package telemetry
import "os"

type Config struct {
Token string
Endpoint string
Debug bool
Token string
Endpoint string
CustomProps map[string]string
Debug bool
}

var authToken = "secret" // make it overridable during build time

func NewConfig() *Config {
return &Config{
Token: authToken,
Endpoint: "https://telemetry.anycable.io",
Debug: os.Getenv("ANYCABLE_TELEMETRY_DEBUG") == "1",
Token: authToken,
Endpoint: "https://telemetry.anycable.io",
Debug: os.Getenv("ANYCABLE_TELEMETRY_DEBUG") == "1",
CustomProps: map[string]string{},
}
}
87 changes: 60 additions & 27 deletions telemetry/telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ const (
)

type Tracker struct {
id string
client *http.Client
id string
client *http.Client
fingerprint string

// Remote service configuration
url string
Expand All @@ -48,7 +49,8 @@ type Tracker struct {
logger *slog.Logger

// Observed metrics values
observations map[string]interface{}
observations map[string]interface{}
customizations map[string]string
}

func NewTracker(instrumenter *metrics.Metrics, c *config.Config, tc *Config) *Tracker {
Expand All @@ -64,15 +66,19 @@ func NewTracker(instrumenter *metrics.Metrics, c *config.Config, tc *Config) *Tr

logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: logLevel}))

fingerprint := clusterFingerprint(c)

return &Tracker{
client: client,
url: tc.Endpoint,
authToken: tc.Token,
logger: logger,
config: c,
instrumenter: instrumenter,
id: id,
observations: make(map[string]interface{}),
client: client,
url: tc.Endpoint,
authToken: tc.Token,
logger: logger,
config: c,
instrumenter: instrumenter,
id: id,
fingerprint: fingerprint,
observations: make(map[string]interface{}),
customizations: tc.CustomProps,
}
}

Expand Down Expand Up @@ -116,6 +122,7 @@ func (t *Tracker) Send(event string, props map[string]interface{}) {
// Avoid storing IP address
props["$ip"] = nil
props["distinct_id"] = t.id
props["cluster-fingerprint"] = t.fingerprint
props["event"] = event

ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
Expand Down Expand Up @@ -212,13 +219,7 @@ func (t *Tracker) collectUsage() {
}

func (t *Tracker) bootProperties() map[string]interface{} {
props := posthog.NewProperties()

props.Set("version", version.Version())
props.Set("os", runtime.GOOS)
props.Set("cluster-fingerprint", t.clusterFingerprint())

return props
return t.appProperties()
}

func (t *Tracker) appProperties() map[string]interface{} {
Expand All @@ -237,7 +238,6 @@ func (t *Tracker) appProperties() map[string]interface{} {
}

props.Set("deploy", guessPlatform())
props.Set("cluster-fingerprint", t.clusterFingerprint())

// Features
props.Set("has-secret", t.config.Secret != "")
Expand All @@ -263,6 +263,11 @@ func (t *Tracker) appProperties() map[string]interface{} {
props.Set("plus-name", name)
}

// Custom properties
for k, v := range t.customizations {
props.Set(k, v)
}

return props
}

Expand Down Expand Up @@ -312,18 +317,32 @@ func guessPlatform() string {

// Try to generate a unique cluster fingerprint to identify events
// from different instances of the same cluster.
func (t *Tracker) clusterFingerprint() string {
func clusterFingerprint(c *config.Config) string {
platformID := platformServiceID()

if platformID != "" {
return generateDigest(platformID)
return generateDigest("P", platformID)
}

platform := guessPlatform()
// Explicitly set env vars
env := anycableEnvVarsList()
// Command line arguments
opts := anycableCLIArgs()
// File configuration as a string
file := anycableFileConfig(c.ConfigFilePath)

// Likely development environment
if env == "" && opts == "" && file == "" && platform == "" {
return "default"
}

return generateDigest(
// Explicitly set env vars
anycableEnvVarsList(),
// Command line arguments
anycableCLIArgs(),
"C",
platform,
env,
opts,
file,
)
}

Expand Down Expand Up @@ -351,7 +370,7 @@ func platformServiceID() string {
return ""
}

func generateDigest(parts ...string) string {
func generateDigest(prefix string, parts ...string) string {
h := sha256.New()

for _, part := range parts {
Expand All @@ -360,7 +379,7 @@ func generateDigest(parts ...string) string {
}
}

return fmt.Sprintf("%x", h.Sum(nil))
return fmt.Sprintf("%s%x", prefix, h.Sum(nil))
}

// Return a sorted list of AnyCable environment variables.
Expand All @@ -386,3 +405,17 @@ func anycableCLIArgs() string {

return strings.Join(args, ",")
}

// Return the contents of AnyCable configuration file if any.
func anycableFileConfig(path string) string {
if path == "" {
return ""
}

bytes, err := os.ReadFile(path)
if err != nil {
return ""
}

return string(bytes)
}
8 changes: 7 additions & 1 deletion telemetry/telemetry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,11 @@ func TestTracking(t *testing.T) {
defer ts.Close()

conf := config.NewConfig()
tracker := NewTracker(metrics, &conf, &Config{Endpoint: ts.URL})
tconf := NewConfig()
tconf.Endpoint = ts.URL
tconf.CustomProps["distro"] = "testo"

tracker := NewTracker(metrics, &conf, tconf)
defer tracker.Shutdown(context.Background()) // nolint: errcheck

tracker.Collect()
Expand All @@ -66,6 +70,7 @@ func TestTracking(t *testing.T) {

assert.Equal(t, "boot", event["event"])
assert.Equal(t, version.Version(), event["version"])
assert.Equal(t, "testo", event["distro"])

time.Sleep(100 * time.Millisecond)

Expand All @@ -82,6 +87,7 @@ func TestTracking(t *testing.T) {
assert.Equal(t, "ecs-fargate", event["deploy"])
assert.Equal(t, 14, int(event["clients_max"].(float64)))
assert.Equal(t, 100, int(event["mem_sys_max"].(float64)))
assert.Equal(t, "testo", event["distro"])

require.NoError(t, tracker.Shutdown(context.Background()))
}

0 comments on commit 3b5b2b7

Please sign in to comment.