diff --git a/CHANGELOG.md b/CHANGELOG.md index 06dab959..cb4646b8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,8 @@ ## master +- Telemetry improvements. ([@palkan][]) + ## 1.5.4 (2024-10-08) - Add `anycable.toml` support. ([@palkan][]) diff --git a/cli/cli.go b/cli/cli.go index 56c4198d..7c673b63 100644 --- a/cli/cli.go +++ b/cli/cli.go @@ -63,9 +63,11 @@ type Runner struct { broadcastersFactory broadcastersFactory websocketEndpoints map[string]websocketHandler - router *router.RouterController - metrics *metricspkg.Metrics + router *router.RouterController + metrics *metricspkg.Metrics + telemetryEnabled bool + telemetryConfig *telemetry.Config errChan chan error shutdownables []Shutdownable @@ -247,11 +249,7 @@ func (r *Runner) runNode() (*node.Node, error) { ) if r.telemetryEnabled { - telemetryConfig := telemetry.NewConfig() - if customTelemetryUrl := os.Getenv("ANYCABLE_TELEMETRY_URL"); customTelemetryUrl != "" { - telemetryConfig.Endpoint = customTelemetryUrl - } - tracker := telemetry.NewTracker(metrics, r.config, telemetryConfig) + tracker := telemetry.NewTracker(metrics, r.config, r.telemetryConfig) r.log.With("context", "telemetry").Info(tracker.Announce()) go tracker.Collect() diff --git a/cli/runner_options.go b/cli/runner_options.go index 3c4f1030..57d2a18c 100644 --- a/cli/runner_options.go +++ b/cli/runner_options.go @@ -11,6 +11,7 @@ import ( "github.com/anycable/anycable-go/node" "github.com/anycable/anycable-go/pubsub" "github.com/anycable/anycable-go/rpc" + "github.com/anycable/anycable-go/telemetry" "github.com/joomcode/errorx" ) @@ -179,10 +180,26 @@ func WithDefaultBroker() Option { }) } -// WithTelemetry enables AnyCable telemetry unless ANYCABLE_DISABLE_TELEMETRY is set -func WithTelemetry() Option { +// WithTelemetry enables AnyCable telemetry unless ANYCABLE_DISABLE_TELEMETRY is set. +// You can pass custom properties as pairs of key and value. +func WithTelemetry(props ...string) Option { return func(r *Runner) error { r.telemetryEnabled = os.Getenv("ANYCABLE_DISABLE_TELEMETRY") != "true" + r.telemetryConfig = telemetry.NewConfig() + if customTelemetryUrl := os.Getenv("ANYCABLE_TELEMETRY_URL"); customTelemetryUrl != "" { + r.telemetryConfig.Endpoint = customTelemetryUrl + } + + if len(props) > 0 { + if len(props)%2 != 0 { + return errorx.IllegalArgument.New("telemetry properties should be passed as pairs of key and value") + } + + for i := 0; i < len(props); i += 2 { + r.telemetryConfig.CustomProps[props[i]] = props[i+1] + } + } + return nil } } diff --git a/telemetry/config.go b/telemetry/config.go index 6f05143c..6b507056 100644 --- a/telemetry/config.go +++ b/telemetry/config.go @@ -3,17 +3,19 @@ package telemetry import "os" type Config struct { - Token string - Endpoint string - Debug bool + Token string + Endpoint string + CustomProps map[string]string + Debug bool } var authToken = "secret" // make it overridable during build time func NewConfig() *Config { return &Config{ - Token: authToken, - Endpoint: "https://telemetry.anycable.io", - Debug: os.Getenv("ANYCABLE_TELEMETRY_DEBUG") == "1", + Token: authToken, + Endpoint: "https://telemetry.anycable.io", + Debug: os.Getenv("ANYCABLE_TELEMETRY_DEBUG") == "1", + CustomProps: map[string]string{}, } } diff --git a/telemetry/telemetry.go b/telemetry/telemetry.go index 5a1d5fae..d7c037c4 100644 --- a/telemetry/telemetry.go +++ b/telemetry/telemetry.go @@ -31,8 +31,9 @@ const ( ) type Tracker struct { - id string - client *http.Client + id string + client *http.Client + fingerprint string // Remote service configuration url string @@ -48,7 +49,8 @@ type Tracker struct { logger *slog.Logger // Observed metrics values - observations map[string]interface{} + observations map[string]interface{} + customizations map[string]string } func NewTracker(instrumenter *metrics.Metrics, c *config.Config, tc *Config) *Tracker { @@ -64,15 +66,19 @@ func NewTracker(instrumenter *metrics.Metrics, c *config.Config, tc *Config) *Tr logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{Level: logLevel})) + fingerprint := clusterFingerprint(c) + return &Tracker{ - client: client, - url: tc.Endpoint, - authToken: tc.Token, - logger: logger, - config: c, - instrumenter: instrumenter, - id: id, - observations: make(map[string]interface{}), + client: client, + url: tc.Endpoint, + authToken: tc.Token, + logger: logger, + config: c, + instrumenter: instrumenter, + id: id, + fingerprint: fingerprint, + observations: make(map[string]interface{}), + customizations: tc.CustomProps, } } @@ -116,6 +122,7 @@ func (t *Tracker) Send(event string, props map[string]interface{}) { // Avoid storing IP address props["$ip"] = nil props["distinct_id"] = t.id + props["cluster-fingerprint"] = t.fingerprint props["event"] = event ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) @@ -212,13 +219,7 @@ func (t *Tracker) collectUsage() { } func (t *Tracker) bootProperties() map[string]interface{} { - props := posthog.NewProperties() - - props.Set("version", version.Version()) - props.Set("os", runtime.GOOS) - props.Set("cluster-fingerprint", t.clusterFingerprint()) - - return props + return t.appProperties() } func (t *Tracker) appProperties() map[string]interface{} { @@ -237,7 +238,6 @@ func (t *Tracker) appProperties() map[string]interface{} { } props.Set("deploy", guessPlatform()) - props.Set("cluster-fingerprint", t.clusterFingerprint()) // Features props.Set("has-secret", t.config.Secret != "") @@ -263,6 +263,11 @@ func (t *Tracker) appProperties() map[string]interface{} { props.Set("plus-name", name) } + // Custom properties + for k, v := range t.customizations { + props.Set(k, v) + } + return props } @@ -312,18 +317,32 @@ func guessPlatform() string { // Try to generate a unique cluster fingerprint to identify events // from different instances of the same cluster. -func (t *Tracker) clusterFingerprint() string { +func clusterFingerprint(c *config.Config) string { platformID := platformServiceID() if platformID != "" { - return generateDigest(platformID) + return generateDigest("P", platformID) + } + + platform := guessPlatform() + // Explicitly set env vars + env := anycableEnvVarsList() + // Command line arguments + opts := anycableCLIArgs() + // File configuration as a string + file := anycableFileConfig(c.ConfigFilePath) + + // Likely development environment + if env == "" && opts == "" && file == "" && platform == "" { + return "default" } return generateDigest( - // Explicitly set env vars - anycableEnvVarsList(), - // Command line arguments - anycableCLIArgs(), + "C", + platform, + env, + opts, + file, ) } @@ -351,7 +370,7 @@ func platformServiceID() string { return "" } -func generateDigest(parts ...string) string { +func generateDigest(prefix string, parts ...string) string { h := sha256.New() for _, part := range parts { @@ -360,7 +379,7 @@ func generateDigest(parts ...string) string { } } - return fmt.Sprintf("%x", h.Sum(nil)) + return fmt.Sprintf("%s%x", prefix, h.Sum(nil)) } // Return a sorted list of AnyCable environment variables. @@ -386,3 +405,17 @@ func anycableCLIArgs() string { return strings.Join(args, ",") } + +// Return the contents of AnyCable configuration file if any. +func anycableFileConfig(path string) string { + if path == "" { + return "" + } + + bytes, err := os.ReadFile(path) + if err != nil { + return "" + } + + return string(bytes) +} diff --git a/telemetry/telemetry_test.go b/telemetry/telemetry_test.go index 9fac9d9a..64053f68 100644 --- a/telemetry/telemetry_test.go +++ b/telemetry/telemetry_test.go @@ -56,7 +56,11 @@ func TestTracking(t *testing.T) { defer ts.Close() conf := config.NewConfig() - tracker := NewTracker(metrics, &conf, &Config{Endpoint: ts.URL}) + tconf := NewConfig() + tconf.Endpoint = ts.URL + tconf.CustomProps["distro"] = "testo" + + tracker := NewTracker(metrics, &conf, tconf) defer tracker.Shutdown(context.Background()) // nolint: errcheck tracker.Collect() @@ -66,6 +70,7 @@ func TestTracking(t *testing.T) { assert.Equal(t, "boot", event["event"]) assert.Equal(t, version.Version(), event["version"]) + assert.Equal(t, "testo", event["distro"]) time.Sleep(100 * time.Millisecond) @@ -82,6 +87,7 @@ func TestTracking(t *testing.T) { assert.Equal(t, "ecs-fargate", event["deploy"]) assert.Equal(t, 14, int(event["clients_max"].(float64))) assert.Equal(t, 100, int(event["mem_sys_max"].(float64))) + assert.Equal(t, "testo", event["distro"]) require.NoError(t, tracker.Shutdown(context.Background())) }