diff --git a/datadog/span-metrics.tf b/datadog/span-metrics.tf index e5aea673c..68e8d6125 100644 --- a/datadog/span-metrics.tf +++ b/datadog/span-metrics.tf @@ -31,7 +31,7 @@ resource "datadog_spans_metric" "kubehound_ingest_duration" { } filter { - query = "service:kubehound operation_name:kubehound.ingestData" + query = "service:kubehound-ingestor operation_name:kubehound.ingestData" } dynamic "group_by" { @@ -63,7 +63,7 @@ resource "datadog_spans_metric" "kubehound_graph_duration" { } filter { - query = "service:kubehound operation_name:kubehound.buildGraph" + query = "service:kubehound-ingestor operation_name:kubehound.buildGraph" } dynamic "group_by" { @@ -103,7 +103,7 @@ resource "datadog_spans_metric" "kubehound_collector_stream_duration" { } filter { - query = "service:kubehound operation_name:kubehound.collector.stream" + query = "service:kubehound-collector operation_name:kubehound.collector.stream" } dynamic "group_by" { @@ -144,7 +144,7 @@ resource "datadog_spans_metric" "kubehound_graph_builder_edge_duration" { } filter { - query = "service:kubehound operation_name:kubehound.graph.builder.edge" + query = "service:kubehound-ingestor operation_name:kubehound.graph.builder.edge" } dynamic "group_by" { @@ -154,4 +154,4 @@ resource "datadog_spans_metric" "kubehound_graph_builder_edge_duration" { path = group_by.value } } -} \ No newline at end of file +} diff --git a/pkg/collector/file.go b/pkg/collector/file.go index 09172c60e..089431f58 100644 --- a/pkg/collector/file.go +++ b/pkg/collector/file.go @@ -124,7 +124,7 @@ func (c *FileCollector) streamPodsNamespace(ctx context.Context, fp string, inge } for _, item := range list.Items { - _ = statsd.Incr(metric.CollectorCount, c.tags.pod, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.pod, 1) i := item err = ingestor.IngestPod(ctx, &i) if err != nil { @@ -176,7 +176,7 @@ func (c *FileCollector) streamRolesNamespace(ctx context.Context, fp string, ing } for _, item := range list.Items { - _ = statsd.Incr(metric.CollectorCount, c.tags.role, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.role, 1) i := item err = ingestor.IngestRole(ctx, &i) if err != nil { @@ -228,7 +228,7 @@ func (c *FileCollector) streamRoleBindingsNamespace(ctx context.Context, fp stri } for _, item := range list.Items { - _ = statsd.Incr(metric.CollectorCount, c.tags.rolebinding, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.rolebinding, 1) i := item err = ingestor.IngestRoleBinding(ctx, &i) if err != nil { @@ -280,7 +280,7 @@ func (c *FileCollector) streamEndpointsNamespace(ctx context.Context, fp string, } for _, item := range list.Items { - _ = statsd.Incr(metric.CollectorCount, c.tags.endpoint, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.endpoint, 1) i := item err = ingestor.IngestEndpoint(ctx, &i) if err != nil { @@ -339,7 +339,7 @@ func (c *FileCollector) StreamNodes(ctx context.Context, ingestor NodeIngestor) } for _, item := range list.Items { - _ = statsd.Incr(metric.CollectorCount, c.tags.node, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.node, 1) i := item err = ingestor.IngestNode(ctx, &i) if err != nil { @@ -366,7 +366,7 @@ func (c *FileCollector) StreamClusterRoles(ctx context.Context, ingestor Cluster } for _, item := range list.Items { - _ = statsd.Incr(metric.CollectorCount, c.tags.clusterrole, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrole, 1) i := item err = ingestor.IngestClusterRole(ctx, &i) if err != nil { @@ -393,7 +393,7 @@ func (c *FileCollector) 
StreamClusterRoleBindings(ctx context.Context, ingestor } for _, item := range list.Items { - _ = statsd.Incr(metric.CollectorCount, c.tags.clusterrolebinding, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrolebinding, 1) i := item err = ingestor.IngestClusterRoleBinding(ctx, &i) if err != nil { diff --git a/pkg/collector/k8s_api.go b/pkg/collector/k8s_api.go index f6cc31996..ac88dbc6c 100644 --- a/pkg/collector/k8s_api.go +++ b/pkg/collector/k8s_api.go @@ -162,7 +162,7 @@ func (c *k8sAPICollector) wait(ctx context.Context, resourceType string, tags [] } // entity := tag.Entity(resourceType) - err := statsd.Gauge(metric.CollectorWait, float64(c.waitTime[resourceType]), tags, 1) + err := statsd.Gauge(ctx, metric.CollectorWait, float64(c.waitTime[resourceType]), tags, 1) if err != nil { l.Error("could not send gauge", log.ErrorField(err)) } @@ -213,19 +213,19 @@ func (c *k8sAPICollector) computeMetrics(ctx context.Context) (Metrics, error) { } runDuration := time.Since(c.startTime) - err := statsd.Gauge(metric.CollectorRunWait, float64(runTotalWaitTime), c.tags.baseTags, 1) + err := statsd.Gauge(ctx, metric.CollectorRunWait, float64(runTotalWaitTime), c.tags.baseTags, 1) if err != nil { errMetric = errors.Join(errMetric, err) l.Error("could not send gauge", log.ErrorField(err)) } - err = statsd.Gauge(metric.CollectorRunDuration, float64(runDuration), c.tags.baseTags, 1) + err = statsd.Gauge(ctx, metric.CollectorRunDuration, float64(runDuration), c.tags.baseTags, 1) if err != nil { errMetric = errors.Join(errMetric, err) l.Error("could not send gauge", log.ErrorField(err)) } runThrottlingPercentage := 1 - (float64(runDuration-runTotalWaitTime) / float64(runDuration)) - err = statsd.Gauge(metric.CollectorRunThrottling, runThrottlingPercentage, c.tags.baseTags, 1) + err = statsd.Gauge(ctx, metric.CollectorRunThrottling, runThrottlingPercentage, c.tags.baseTags, 1) if err != nil { errMetric = errors.Join(errMetric, err) l.Error("could not send gauge", log.ErrorField(err)) @@ -288,7 +288,7 @@ func (c *k8sAPICollector) streamPodsNamespace(ctx context.Context, namespace str c.setPagerConfig(pager) return pager.EachListItem(ctx, opts, func(obj runtime.Object) error { - _ = statsd.Incr(metric.CollectorCount, c.tags.pod, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.pod, 1) c.wait(ctx, entity, c.tags.pod) item, ok := obj.(*corev1.Pod) if !ok { @@ -343,7 +343,7 @@ func (c *k8sAPICollector) streamRolesNamespace(ctx context.Context, namespace st c.setPagerConfig(pager) return pager.EachListItem(ctx, opts, func(obj runtime.Object) error { - _ = statsd.Incr(metric.CollectorCount, c.tags.role, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.role, 1) c.wait(ctx, entity, c.tags.role) item, ok := obj.(*rbacv1.Role) if !ok { @@ -398,7 +398,7 @@ func (c *k8sAPICollector) streamRoleBindingsNamespace(ctx context.Context, names c.setPagerConfig(pager) return pager.EachListItem(ctx, opts, func(obj runtime.Object) error { - _ = statsd.Incr(metric.CollectorCount, c.tags.rolebinding, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.rolebinding, 1) c.wait(ctx, entity, c.tags.rolebinding) item, ok := obj.(*rbacv1.RoleBinding) if !ok { @@ -453,7 +453,7 @@ func (c *k8sAPICollector) streamEndpointsNamespace(ctx context.Context, namespac c.setPagerConfig(pager) return pager.EachListItem(ctx, opts, func(obj runtime.Object) error { - _ = statsd.Incr(metric.CollectorCount, c.tags.endpoint, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.endpoint, 1) c.wait(ctx, entity, 
c.tags.endpoint) item, ok := obj.(*discoveryv1.EndpointSlice) if !ok { @@ -507,7 +507,7 @@ func (c *k8sAPICollector) StreamNodes(ctx context.Context, ingestor NodeIngestor c.setPagerConfig(pager) err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error { - _ = statsd.Incr(metric.CollectorCount, c.tags.node, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.node, 1) c.wait(ctx, entity, c.tags.node) item, ok := obj.(*corev1.Node) if !ok { @@ -550,7 +550,7 @@ func (c *k8sAPICollector) StreamClusterRoles(ctx context.Context, ingestor Clust c.setPagerConfig(pager) err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error { - _ = statsd.Incr(metric.CollectorCount, c.tags.clusterrole, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrole, 1) c.wait(ctx, entity, c.tags.clusterrole) item, ok := obj.(*rbacv1.ClusterRole) if !ok { @@ -593,7 +593,7 @@ func (c *k8sAPICollector) StreamClusterRoleBindings(ctx context.Context, ingesto c.setPagerConfig(pager) err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error { - _ = statsd.Incr(metric.CollectorCount, c.tags.clusterrolebinding, 1) + _ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrolebinding, 1) c.wait(ctx, entity, c.tags.clusterrolebinding) item, ok := obj.(*rbacv1.ClusterRoleBinding) if !ok { diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go index aa8c248a0..3304e98ef 100644 --- a/pkg/ingestor/api/api.go +++ b/pkg/ingestor/api/api.go @@ -11,21 +11,19 @@ import ( "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" "github.com/DataDog/KubeHound/pkg/dump" - "github.com/DataDog/KubeHound/pkg/ingestor" grpc "github.com/DataDog/KubeHound/pkg/ingestor/api/grpc/pb" "github.com/DataDog/KubeHound/pkg/ingestor/notifier" "github.com/DataDog/KubeHound/pkg/ingestor/puller" - "github.com/DataDog/KubeHound/pkg/kubehound/graph" "github.com/DataDog/KubeHound/pkg/kubehound/graph/adapter" "github.com/DataDog/KubeHound/pkg/kubehound/providers" "github.com/DataDog/KubeHound/pkg/kubehound/store/collections" "github.com/DataDog/KubeHound/pkg/telemetry/events" "github.com/DataDog/KubeHound/pkg/telemetry/log" "github.com/DataDog/KubeHound/pkg/telemetry/span" - "github.com/DataDog/KubeHound/pkg/telemetry/tag" gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver" "go.mongodb.org/mongo-driver/bson" "google.golang.org/protobuf/types/known/timestamppb" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) @@ -153,27 +151,24 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error { runCtx := context.Background() runCtx = context.WithValue(runCtx, log.ContextFieldCluster, clusterName) runCtx = context.WithValue(runCtx, log.ContextFieldRunID, runID) - l = log.Logger(runCtx) //nolint: contextcheck - spanJob, runCtx := span.SpanRunFromContext(runCtx, span.IngestorStartJob) - defer func() { spanJob.Finish(tracer.WithError(err)) }() - - events.PushEvent( - fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), - fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID), - []string{ - tag.IngestionRunID(runID), - }, - ) - + l = log.Logger(runCtx) //nolint: contextcheck alreadyIngested, err := g.isAlreadyIngestedInGraph(runCtx, clusterName, runID) //nolint: contextcheck if err != nil { return err } if alreadyIngested { + _ = events.PushEvent(runCtx, events.IngestSkip, "") //nolint: contextcheck + return fmt.Errorf("%w [%s:%s]", ErrAlreadyIngested, clusterName, runID) } + spanJob, runCtx 
:= span.SpanRunFromContext(runCtx, span.IngestorStartJob) + spanJob.SetTag(ext.ManualKeep, true) + defer func() { spanJob.Finish(tracer.WithError(err)) }() + + _ = events.PushEvent(runCtx, events.IngestStarted, "") //nolint: contextcheck + // We need to flush the cache to prevent warnings/errors when overwriting elements in cache from the previous ingestion // This avoid conflicts from previous ingestion (there is no need to reuse the cache from a previous ingestion) l.Info("Preparing cache provider") @@ -209,15 +204,11 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error { } } - err = ingestor.IngestData(runCtx, runCfg, collect, g.providers.CacheProvider, g.providers.StoreProvider, g.providers.GraphProvider) //nolint: contextcheck - if err != nil { - return fmt.Errorf("raw data ingest: %w", err) - } - - err = graph.BuildGraph(runCtx, runCfg, g.providers.StoreProvider, g.providers.GraphProvider, g.providers.CacheProvider) //nolint: contextcheck + err = g.providers.IngestBuildData(runCtx, runCfg) //nolint: contextcheck if err != nil { return err } + err = g.notifier.Notify(runCtx, clusterName, runID) //nolint: contextcheck if err != nil { return fmt.Errorf("notifying: %w", err) diff --git a/pkg/ingestor/puller/blob/blob.go b/pkg/ingestor/puller/blob/blob.go index 0afae671f..808602a5d 100644 --- a/pkg/ingestor/puller/blob/blob.go +++ b/pkg/ingestor/puller/blob/blob.go @@ -23,6 +23,8 @@ import ( _ "gocloud.dev/blob/gcsblob" _ "gocloud.dev/blob/memblob" "gocloud.dev/blob/s3blob" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace" + "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) @@ -51,6 +53,9 @@ func NewBlobStorage(cfg *config.KubehoundConfig, blobConfig *config.BlobConfig) } func (bs *BlobStore) openBucket(ctx context.Context) (*blob.Bucket, error) { + l := log.Logger(ctx) + l.Info("Opening bucket", log.String("bucket_name", bs.bucketName)) + urlStruct, err := url.Parse(bs.bucketName) if err != nil { return nil, err @@ -132,10 +137,15 @@ func (bs *BlobStore) ListFiles(ctx context.Context, prefix string, recursive boo // Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName string, runID string) error { l := log.Logger(outer) - l.Info("Putting data on blob store bucket", log.String("bucket_name", bs.bucketName), log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID)) - spanPut, ctx := span.SpanRunFromContext(outer, span.IngestorBlobPull) var err error - defer func() { spanPut.Finish(tracer.WithError(err)) }() + + // Triggering a span only when it is an actual run and not the rehydration process (download the kubehound dump to get the metadata) + if log.GetRunIDFromContext(outer) != "" { + var spanPut ddtrace.Span + spanPut, outer = span.SpanRunFromContext(outer, span.IngestorBlobPull) + defer func() { spanPut.Finish(tracer.WithError(err)) }() + } + l.Info("Putting data on blob store bucket", log.String("bucket_name", bs.bucketName), log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID)) dumpResult, err := dump.NewDumpResult(clusterName, runID, true) if err != nil { @@ -143,7 +153,7 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName } key := dumpResult.GetFullPath() l.Info("Opening bucket", log.String("bucket_name", bs.bucketName)) - b, err := bs.openBucket(ctx) + b, err := bs.openBucket(outer) if err != nil { return err } @@ -157,7 +167,7 @@ func (bs 
*BlobStore) Put(outer context.Context, archivePath string, clusterName l.Info("Uploading archive from blob store", log.String("key", key)) w := bufio.NewReader(f) - err = b.Upload(ctx, key, w, &blob.WriterOptions{ + err = b.Upload(outer, key, w, &blob.WriterOptions{ ContentType: "application/gzip", }) if err != nil { @@ -175,13 +185,15 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName // Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) { l := log.Logger(outer) - l.Info("Pulling data from blob store bucket", log.String("bucket_name", bs.bucketName), log.String("key", key)) - spanPull, ctx := span.SpanRunFromContext(outer, span.IngestorBlobPull) var err error - defer func() { spanPull.Finish(tracer.WithError(err)) }() + if log.GetRunIDFromContext(outer) != "" { + var spanPull ddtrace.Span + spanPull, outer = span.SpanRunFromContext(outer, span.IngestorBlobPull) + defer func() { spanPull.Finish(tracer.WithError(err)) }() + } + l.Info("Pulling data from blob store bucket", log.String("bucket_name", bs.bucketName), log.String("key", key)) - l.Info("Opening bucket", log.String("bucket_name", bs.bucketName)) - b, err := bs.openBucket(ctx) + b, err := bs.openBucket(outer) if err != nil { return "", err } @@ -207,7 +219,7 @@ func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) { l.Info("Downloading archive from blob store", log.String("key", key)) w := bufio.NewWriter(f) - err = b.Download(ctx, key, w, nil) + err = b.Download(outer, key, w, nil) if err != nil { return archivePath, err } @@ -221,9 +233,12 @@ func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) { } func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error { - spanExtract, _ := span.SpanRunFromContext(ctx, span.IngestorBlobExtract) var err error - defer func() { spanExtract.Finish(tracer.WithError(err)) }() + if log.GetRunIDFromContext(ctx) != "" { + var spanPull ddtrace.Span + spanPull, ctx = span.SpanRunFromContext(ctx, span.IngestorBlobExtract) + defer func() { spanPull.Finish(tracer.WithError(err)) }() + } basePath := filepath.Dir(archivePath) err = puller.CheckSanePath(archivePath, basePath) @@ -243,9 +258,12 @@ func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error { // Once downloaded and processed, we should cleanup the disk so we can reduce the disk usage // required for large infrastructure func (bs *BlobStore) Close(ctx context.Context, archivePath string) error { - spanClose, _ := span.SpanRunFromContext(ctx, span.IngestorBlobClose) var err error - defer func() { spanClose.Finish(tracer.WithError(err)) }() + if log.GetRunIDFromContext(ctx) != "" { + var spanClose ddtrace.Span + spanClose, _ = span.SpanRunFromContext(ctx, span.IngestorBlobClose) + defer func() { spanClose.Finish(tracer.WithError(err)) }() + } path := filepath.Dir(archivePath) err = puller.CheckSanePath(archivePath, bs.cfg.Ingestor.TempDir) diff --git a/pkg/kubehound/core/core_dump.go b/pkg/kubehound/core/core_dump.go index 4c638b221..c6e1e588b 100644 --- a/pkg/kubehound/core/core_dump.go +++ b/pkg/kubehound/core/core_dump.go @@ -13,7 +13,6 @@ import ( "github.com/DataDog/KubeHound/pkg/telemetry/events" "github.com/DataDog/KubeHound/pkg/telemetry/log" "github.com/DataDog/KubeHound/pkg/telemetry/span" - "github.com/DataDog/KubeHound/pkg/telemetry/tag" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext" 
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) @@ -43,13 +42,7 @@ func DumpCore(ctx context.Context, khCfg *config.KubehoundConfig, upload bool) ( khCfg.Collector.Type = config.CollectorTypeK8sAPI - events.PushEvent( - fmt.Sprintf("Starting KubeHound dump for %s", clusterName), - fmt.Sprintf("Starting KubeHound dump for %s", clusterName), - []string{ - tag.ActionType(events.DumperRun), - }, - ) + _ = events.PushEvent(ctx, events.DumpStarted, "") filePath, err := runLocalDump(ctx, khCfg) if err != nil { @@ -76,13 +69,8 @@ func DumpCore(ctx context.Context, khCfg *config.KubehoundConfig, upload bool) ( } } - events.PushEvent( - fmt.Sprintf("Finish KubeHound dump for %s", clusterName), - fmt.Sprintf("KubeHound dump run has been completed in %s", time.Since(start)), - []string{ - tag.ActionType(events.DumperRun), - }, - ) + text := fmt.Sprintf("KubeHound dump run has been completed in %s", time.Since(start)) + _ = events.PushEvent(ctx, events.DumpFinished, text) l.Info("KubeHound dump run has been completed", log.Duration("duration", time.Since(start))) return filePath, nil diff --git a/pkg/kubehound/graph/builder.go b/pkg/kubehound/graph/builder.go index 801316782..2cb89827c 100644 --- a/pkg/kubehound/graph/builder.go +++ b/pkg/kubehound/graph/builder.go @@ -54,7 +54,7 @@ func (b *Builder) HealthCheck(ctx context.Context) error { // buildEdge inserts a class of edges into the graph database. func (b *Builder) buildEdge(ctx context.Context, label string, e edge.Builder, oic *converter.ObjectIDConverter) error { - span, ctx := tracer.StartSpanFromContext(ctx, span.BuildEdge, tracer.Measured(), tracer.ResourceName(e.Label())) + span, ctx := span.StartSpanFromContext(ctx, span.BuildEdge, tracer.Measured(), tracer.ResourceName(e.Label())) span.SetTag(tag.LabelTag, e.Label()) var err error defer func() { span.Finish(tracer.WithError(err)) }() diff --git a/pkg/kubehound/providers/providers.go b/pkg/kubehound/providers/providers.go index 5d476d4ae..6b63b5da3 100644 --- a/pkg/kubehound/providers/providers.go +++ b/pkg/kubehound/providers/providers.go @@ -3,6 +3,7 @@ package providers import ( "context" "fmt" + "time" "github.com/DataDog/KubeHound/pkg/collector" "github.com/DataDog/KubeHound/pkg/config" @@ -11,7 +12,11 @@ import ( "github.com/DataDog/KubeHound/pkg/kubehound/storage/cache" "github.com/DataDog/KubeHound/pkg/kubehound/storage/graphdb" "github.com/DataDog/KubeHound/pkg/kubehound/storage/storedb" + "github.com/DataDog/KubeHound/pkg/telemetry/events" "github.com/DataDog/KubeHound/pkg/telemetry/log" + "github.com/DataDog/KubeHound/pkg/telemetry/metric" + "github.com/DataDog/KubeHound/pkg/telemetry/statsd" + "github.com/DataDog/KubeHound/pkg/telemetry/tag" ) type ProvidersFactoryConfig struct { @@ -81,6 +86,7 @@ func (p *ProvidersFactoryConfig) IngestBuildData(ctx context.Context, khCfg *con l := log.Logger(ctx) // Create the collector instance l.Info("Loading Kubernetes data collector client") + start := time.Now() collect, err := collector.ClientFactory(ctx, khCfg) if err != nil { return fmt.Errorf("collector client creation: %w", err) @@ -94,11 +100,23 @@ func (p *ProvidersFactoryConfig) IngestBuildData(ctx context.Context, khCfg *con if err != nil { return fmt.Errorf("raw data ingest: %w", err) } + // Metric for IngestData + _ = statsd.Gauge(ctx, metric.IngestionIngestDuration, float64(time.Since(start)), tag.GetDefaultTags(ctx), 1) + startBuild := time.Now() err = graph.BuildGraph(ctx, khCfg, p.StoreProvider, p.GraphProvider, p.CacheProvider) if err != nil { return err } + // Metric 
for BuildGraph + _ = statsd.Gauge(ctx, metric.IngestionBuildDuration, float64(time.Since(startBuild)), tag.GetDefaultTags(ctx), 1) + + // Metric for IngestBuildData + _ = statsd.Gauge(ctx, metric.IngestionRunDuration, float64(time.Since(start)), tag.GetDefaultTags(ctx), 1) + + text := fmt.Sprintf("KubeHound ingestion has been completed in %s", time.Since(start)) + _ = events.PushEvent(ctx, events.IngestFinished, text) + return nil } diff --git a/pkg/kubehound/storage/cache/memcache_provider.go b/pkg/kubehound/storage/cache/memcache_provider.go index 929a3b36d..40e0d43f6 100644 --- a/pkg/kubehound/storage/cache/memcache_provider.go +++ b/pkg/kubehound/storage/cache/memcache_provider.go @@ -61,10 +61,10 @@ func (m *MemCacheProvider) Get(ctx context.Context, key cachekey.CacheKey) *Cach data, ok := m.data[computeKey(key)] tagCacheKey := tag.GetBaseTagsWith(tag.CacheKey(key.Shard())) if !ok { - _ = statsd.Incr(metric.CacheMiss, tagCacheKey, 1) + _ = statsd.Incr(ctx, metric.CacheMiss, tagCacheKey, 1) log.Trace(ctx).Debugf("entry not found in cache: %s", computeKey(key)) } else { - _ = statsd.Incr(metric.CacheHit, tagCacheKey, 1) + _ = statsd.Incr(ctx, metric.CacheHit, tagCacheKey, 1) } return &CacheResult{ diff --git a/pkg/kubehound/storage/cache/memcache_writer.go b/pkg/kubehound/storage/cache/memcache_writer.go index 031aa5bee..ffbb3f4ff 100644 --- a/pkg/kubehound/storage/cache/memcache_writer.go +++ b/pkg/kubehound/storage/cache/memcache_writer.go @@ -22,7 +22,7 @@ func (m *MemCacheAsyncWriter) Queue(ctx context.Context, key cachekey.CacheKey, m.mu.Lock() defer m.mu.Unlock() tagCacheKey := tag.GetBaseTagsWith(tag.CacheKey(key.Shard())) - _ = statsd.Incr(metric.CacheWrite, tagCacheKey, 1) + _ = statsd.Incr(ctx, metric.CacheWrite, tagCacheKey, 1) keyId := computeKey(key) entry, ok := m.data[keyId] if ok { @@ -33,7 +33,7 @@ func (m *MemCacheAsyncWriter) Queue(ctx context.Context, key cachekey.CacheKey, if !m.opts.ExpectOverwrite { // if overwrite is expected (e.g fast tracking of existence regardless of value), suppress metrics and logs - _ = statsd.Incr(metric.CacheDuplicate, tagCacheKey, 1) + _ = statsd.Incr(ctx, metric.CacheDuplicate, tagCacheKey, 1) log.Trace(ctx).Warnf("overwriting cache entry key=%s old=%#v new=%#v", keyId, entry, value) } } diff --git a/pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go b/pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go index 93bf15088..4818a5e43 100644 --- a/pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go +++ b/pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go @@ -77,13 +77,13 @@ func (jgv *JanusGraphEdgeWriter) startBackgroundWriter(ctx context.Context) { return } - _ = statsd.Count(metric.BackgroundWriterCall, 1, jgv.tags, 1) + _ = statsd.Count(ctx, metric.BackgroundWriterCall, 1, jgv.tags, 1) err := jgv.batchWrite(ctx, data) if err != nil { log.Trace(ctx).Errorf("write data in background batch writer: %v", err) } - _ = statsd.Decr(metric.QueueSize, jgv.tags, 1) + _ = statsd.Decr(ctx, metric.QueueSize, jgv.tags, 1) case <-ctx.Done(): log.Trace(ctx).Info("Closed background janusgraph worker on context cancel") @@ -103,7 +103,7 @@ func (jgv *JanusGraphEdgeWriter) batchWrite(ctx context.Context, data []any) err defer jgv.writingInFlight.Done() datalen := len(data) - _ = statsd.Count(metric.EdgeWrite, int64(datalen), jgv.tags, 1) + _ = statsd.Count(ctx, metric.EdgeWrite, int64(datalen), jgv.tags, 1) log.Trace(ctx).Debugf("Batch write JanusGraphEdgeWriter with %d elements", datalen) atomic.AddInt32(&jgv.wcounter, int32(datalen)) 
@@ -139,7 +139,7 @@ func (jgv *JanusGraphEdgeWriter) Flush(ctx context.Context) error { } if len(jgv.inserts) != 0 { - _ = statsd.Incr(metric.FlushWriterCall, jgv.tags, 1) + _ = statsd.Incr(ctx, metric.FlushWriterCall, jgv.tags, 1) jgv.writingInFlight.Add(1) err = jgv.batchWrite(ctx, jgv.inserts) @@ -175,7 +175,7 @@ func (jgv *JanusGraphEdgeWriter) Queue(ctx context.Context, v any) error { jgv.writingInFlight.Add(1) jgv.consumerChan <- copied - _ = statsd.Incr(metric.QueueSize, jgv.tags, 1) + _ = statsd.Incr(ctx, metric.QueueSize, jgv.tags, 1) // cleanup the ops array after we have copied it to the channel jgv.inserts = nil diff --git a/pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go b/pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go index 55a92ca05..ec7c8c9ad 100644 --- a/pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go +++ b/pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go @@ -81,13 +81,13 @@ func (jgv *JanusGraphVertexWriter) startBackgroundWriter(ctx context.Context) { return } - _ = statsd.Count(metric.BackgroundWriterCall, 1, jgv.tags, 1) + _ = statsd.Count(ctx, metric.BackgroundWriterCall, 1, jgv.tags, 1) err := jgv.batchWrite(ctx, data) if err != nil { log.Trace(ctx).Errorf("Write data in background batch writer: %v", err) } - _ = statsd.Decr(metric.QueueSize, jgv.tags, 1) + _ = statsd.Decr(ctx, metric.QueueSize, jgv.tags, 1) case <-ctx.Done(): log.Trace(ctx).Info("Closed background janusgraph worker on context cancel") @@ -130,7 +130,7 @@ func (jgv *JanusGraphVertexWriter) batchWrite(ctx context.Context, data []any) e defer jgv.writingInFlight.Done() datalen := len(data) - _ = statsd.Count(metric.VertexWrite, int64(datalen), jgv.tags, 1) + _ = statsd.Count(ctx, metric.VertexWrite, int64(datalen), jgv.tags, 1) log.Trace(ctx).Debugf("Batch write JanusGraphVertexWriter with %d elements", datalen) atomic.AddInt32(&jgv.wcounter, int32(datalen)) @@ -174,7 +174,7 @@ func (jgv *JanusGraphVertexWriter) Flush(ctx context.Context) error { } if len(jgv.inserts) != 0 { - _ = statsd.Incr(metric.FlushWriterCall, jgv.tags, 1) + _ = statsd.Incr(ctx, metric.FlushWriterCall, jgv.tags, 1) jgv.writingInFlight.Add(1) err = jgv.batchWrite(ctx, jgv.inserts) @@ -215,7 +215,7 @@ func (jgv *JanusGraphVertexWriter) Queue(ctx context.Context, v any) error { jgv.writingInFlight.Add(1) jgv.consumerChan <- copied - _ = statsd.Incr(metric.QueueSize, jgv.tags, 1) + _ = statsd.Incr(ctx, metric.QueueSize, jgv.tags, 1) // cleanup the ops array after we have copied it to the channel jgv.inserts = nil diff --git a/pkg/kubehound/storage/storedb/mongo_writer.go b/pkg/kubehound/storage/storedb/mongo_writer.go index ad606852b..dbf0860ce 100644 --- a/pkg/kubehound/storage/storedb/mongo_writer.go +++ b/pkg/kubehound/storage/storedb/mongo_writer.go @@ -67,13 +67,13 @@ func (maw *MongoAsyncWriter) startBackgroundWriter(ctx context.Context) { return } - _ = statsd.Count(metric.BackgroundWriterCall, 1, maw.tags, 1) + _ = statsd.Count(ctx, metric.BackgroundWriterCall, 1, maw.tags, 1) err := maw.batchWrite(ctx, data) if err != nil { log.Trace(ctx).Errorf("write data in background batch writer: %v", err) } - _ = statsd.Decr(metric.QueueSize, maw.tags, 1) + _ = statsd.Decr(ctx, metric.QueueSize, maw.tags, 1) case <-ctx.Done(): log.Trace(ctx).Debug("Closed background mongodb worker") @@ -91,7 +91,7 @@ func (maw *MongoAsyncWriter) batchWrite(ctx context.Context, ops []mongo.WriteMo defer func() { span.Finish(tracer.WithError(err)) }() defer maw.writingInFlight.Done() - _ = 
statsd.Count(metric.ObjectWrite, int64(len(ops)), maw.tags, 1) + _ = statsd.Count(ctx, metric.ObjectWrite, int64(len(ops)), maw.tags, 1) bulkWriteOpts := options.BulkWrite().SetOrdered(false) _, err = maw.dbWriter.BulkWrite(ctx, ops, bulkWriteOpts) @@ -114,7 +114,7 @@ func (maw *MongoAsyncWriter) Queue(ctx context.Context, model any) error { maw.writingInFlight.Add(1) maw.consumerChan <- copied - _ = statsd.Incr(metric.QueueSize, maw.tags, 1) + _ = statsd.Incr(ctx, metric.QueueSize, maw.tags, 1) // cleanup the ops array after we have copied it to the channel maw.ops = nil diff --git a/pkg/telemetry/events/events.go b/pkg/telemetry/events/events.go index f4e2b363b..5e008e3fa 100644 --- a/pkg/telemetry/events/events.go +++ b/pkg/telemetry/events/events.go @@ -1,19 +1,95 @@ package events import ( + "context" + "fmt" + + "github.com/DataDog/KubeHound/pkg/telemetry/log" kstatsd "github.com/DataDog/KubeHound/pkg/telemetry/statsd" + "github.com/DataDog/KubeHound/pkg/telemetry/tag" "github.com/DataDog/datadog-go/v5/statsd" ) const ( - DumperRun = "kubehound.dumper.run" - DumperStop = "kubehound.dumper.stop" + IngestSkip = iota + IngestStarted + IngestFinished + IngestorInit + IngestorFailed + DumpStarted + DumpFinished +) + +const ( + EventActionFail = "fail" + EventActionInit = "init" + EventActionStart = "start" + EventActionSkip = "skip" + EventActionFinish = "finish" ) -func PushEvent(title string, text string, tags []string) { - _ = kstatsd.Event(&statsd.Event{ - Title: title, - Text: text, - Tags: tags, +type EventAction int + +type EventActionDetails struct { + Title string + Text string + Level statsd.EventAlertType + Action string +} + +// Could also be a format stirng template in this case, if needed? +var map2msg = map[EventAction]EventActionDetails{ + IngestorFailed: {Title: "Ingestor/grpc endpoint init failed", Level: statsd.Error, Action: EventActionFail}, + IngestorInit: {Title: "Ingestor/grpc endpoint initiated", Level: statsd.Info, Action: EventActionInit}, + IngestStarted: {Title: "Ingestion started", Level: statsd.Info, Action: EventActionStart}, + IngestSkip: {Title: "Ingestion skipped", Level: statsd.Info, Action: EventActionSkip}, + IngestFinished: {Title: "Ingestion finished", Level: statsd.Info, Action: EventActionFinish}, + + DumpStarted: {Title: "Dump started", Level: statsd.Info, Action: EventActionStart}, + DumpFinished: {Title: "Dump finished", Level: statsd.Info, Action: EventActionFinish}, +} + +func (ea EventAction) Tags(ctx context.Context) []string { + tags := tag.GetDefaultTags(ctx) + tags = append(tags, fmt.Sprintf("%s:%s", tag.ActionTypeTag, map2msg[ea].Action)) + + return tags +} + +func (ea EventAction) Level() statsd.EventAlertType { + return map2msg[ea].Level +} + +func (ea EventAction) Title(ctx context.Context) string { + title, _ := getTitleTextMsg(ctx, map2msg[ea].Title) + + return title +} + +func (ea EventAction) DefaultMessage(ctx context.Context) string { + _, msg := getTitleTextMsg(ctx, map2msg[ea].Title) + + return msg +} + +func getTitleTextMsg(ctx context.Context, actionMsg string) (string, string) { + cluster := log.GetClusterFromContext(ctx) + runId := log.GetRunIDFromContext(ctx) + title := fmt.Sprintf("%s for %s", actionMsg, cluster) + text := fmt.Sprintf("%s for %s with run_id %s", actionMsg, cluster, runId) + + return title, text +} + +func PushEvent(ctx context.Context, action EventAction, text string) error { + if text == "" { + text = action.DefaultMessage(ctx) + } + + return kstatsd.Event(&statsd.Event{ + Title: action.Title(ctx), + 
Text: text, + Tags: action.Tags(ctx), + AlertType: action.Level(), }) } diff --git a/pkg/telemetry/log/fields.go b/pkg/telemetry/log/fields.go index 01915c3f7..07fec0f4d 100644 --- a/pkg/telemetry/log/fields.go +++ b/pkg/telemetry/log/fields.go @@ -47,6 +47,18 @@ func convertField(value any) string { return val } +func GetRunIDFromContext(ctx context.Context) string { + return convertField(ctx.Value(ContextFieldRunID)) +} + +func GetClusterFromContext(ctx context.Context) string { + return convertField(ctx.Value(ContextFieldCluster)) +} + +func GetComponentFromContext(ctx context.Context) string { + return convertField(ctx.Value(ContextFieldComponent)) +} + func SpanSetDefaultField(ctx context.Context, span ddtrace.Span) { runID := convertField(ctx.Value(ContextFieldRunID)) if runID != "" { diff --git a/pkg/telemetry/metric/metrics.go b/pkg/telemetry/metric/metrics.go index e2040bcb6..afb927704 100644 --- a/pkg/telemetry/metric/metrics.go +++ b/pkg/telemetry/metric/metrics.go @@ -37,3 +37,10 @@ var ( CacheWrite = "kubehound.cache.write" CacheDuplicate = "kubehound.cache.duplicate" ) + +// Ingestion metrics +const ( + IngestionRunDuration = "kubehound.ingestion.run.duration" + IngestionBuildDuration = "kubehound.ingestion.build.duration" + IngestionIngestDuration = "kubehound.ingestion.ingest.duration" +) diff --git a/pkg/telemetry/span/spans.go b/pkg/telemetry/span/spans.go index 3d1537e8d..bd4ec114d 100644 --- a/pkg/telemetry/span/spans.go +++ b/pkg/telemetry/span/spans.go @@ -64,15 +64,6 @@ const ( BuildEdge = "kubehound.graph.builder.edge" ) -// to avoid the following lint error -// should not use built-in type string as key for value; define your own type to avoid collisions (SA1029) -type contextKey int - -const ( - ContextLogFieldClusterName contextKey = iota - ContextLogFieldRunID -) - func convertTag(value any) string { val, err := value.(string) if !err { @@ -82,6 +73,13 @@ func convertTag(value any) string { return val } +func StartSpanFromContext(runCtx context.Context, operationName string, opts ...tracer.StartSpanOption) (tracer.Span, context.Context) { + spanJob, runCtx := tracer.StartSpanFromContext(runCtx, operationName, opts...) + spanIngestRunSetDefaultTag(runCtx, spanJob) + + return spanJob, runCtx +} + func SpanRunFromContext(runCtx context.Context, spanName string) (ddtrace.Span, context.Context) { spanJob, runCtx := tracer.StartSpanFromContext(runCtx, spanName, tracer.ResourceName(convertTag(runCtx.Value(log.ContextFieldCluster))), tracer.Measured()) spanIngestRunSetDefaultTag(runCtx, spanJob) diff --git a/pkg/telemetry/statsd/statsd.go b/pkg/telemetry/statsd/statsd.go index 1f6b02e3a..0114dfcf2 100644 --- a/pkg/telemetry/statsd/statsd.go +++ b/pkg/telemetry/statsd/statsd.go @@ -50,46 +50,51 @@ func Setup(ctx context.Context, cfg *config.KubehoundConfig) error { } // Count tracks how many times something happened per second. -func Count(name string, value int64, tags []string, rate float64) error { +func Count(ctx context.Context, name string, value int64, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) return statsdClient.Count(name, value, tags, rate) } // Gauge measures the value of a metric at a particular time. -func Gauge(name string, value float64, tags []string, rate float64) error { +func Gauge(ctx context.Context, name string, value float64, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) 
return statsdClient.Gauge(name, value, tags, rate) } // Incr is just Count of 1 -func Incr(name string, tags []string, rate float64) error { +func Incr(ctx context.Context, name string, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) return statsdClient.Incr(name, tags, rate) } // Decr is just Count of -1 -func Decr(name string, tags []string, rate float64) error { +func Decr(ctx context.Context, name string, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) return statsdClient.Decr(name, tags, rate) } // Histogram tracks the statistical distribution of a set of values. -func Histogram(name string, value float64, tags []string, rate float64) error { +func Histogram(ctx context.Context, name string, value float64, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) return statsdClient.Histogram(name, value, tags, rate) } @@ -113,28 +118,31 @@ func SimpleEvent(title string, text string) error { } // Set counts the number of unique elements in a group. -func Set(name string, value string, tags []string, rate float64) error { +func Set(ctx context.Context, name string, value string, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) return statsdClient.Set(name, value, tags, rate) } // Timing sends timing information, it is an alias for TimeInMilliseconds -func Timing(name string, value time.Duration, tags []string, rate float64) error { +func Timing(ctx context.Context, name string, value time.Duration, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) return statsdClient.Timing(name, value, tags, rate) } // TimingDist sends dt in milliseconds as a distribution (p50-p99) -func TimingDist(name string, dt time.Duration, tags []string, rate float64) error { +func TimingDist(ctx context.Context, name string, dt time.Duration, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) const secToMillis = 1000 @@ -142,19 +150,21 @@ func TimingDist(name string, dt time.Duration, tags []string, rate float64) erro } // TimeInMilliseconds sends timing information in milliseconds. -func TimeInMilliseconds(name string, value float64, tags []string, rate float64) error { +func TimeInMilliseconds(ctx context.Context, name string, value float64, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) return statsdClient.TimeInMilliseconds(name, value, tags, rate) } // Distribution tracks accurate global percentiles of a set of values. -func Distribution(name string, value float64, tags []string, rate float64) error { +func Distribution(ctx context.Context, name string, value float64, tags []string, rate float64) error { if statsdClient == nil { return nil } + tags = append(tags, tag.GetDefaultTags(ctx)...) 
 	return statsdClient.Distribution(name, value, tags, rate)
 }
diff --git a/pkg/telemetry/tag/tags.go b/pkg/telemetry/tag/tags.go
index f9340a60c..3e3b6693d 100644
--- a/pkg/telemetry/tag/tags.go
+++ b/pkg/telemetry/tag/tags.go
@@ -1,20 +1,20 @@
 package tag
 
-import "sync"
+import (
+	"context"
+	"sync"
+
+	"github.com/DataDog/KubeHound/pkg/telemetry/log"
+)
 
 const (
 	ActionTypeTag         = "action"
 	CollectorTag          = "collector"
-	CollectorClusterTag   = "cluster"
 	DumperS3BucketTag     = "s3_bucket"
 	DumperS3keyTag        = "s3_key"
-	DumperFilePathTag     = "file_path"
 	DumperWorkerNumberTag = "worker_number"
 	DumperWriterTypeTag   = "writer_type"
-	EntityTag             = "entity"
 	WaitTag               = "wait"
-	RunIdTag              = "run_id"
-	IngestionRunIdTag     = "ingestion_run_id"
 	LabelTag              = "label"
 	CollectionTag         = "collection"
 	BuilderTag            = "builder"
@@ -23,6 +23,14 @@ const (
 	EdgeTypeTag           = "edge_type"
 )
 
+var (
+	RunIdTag            = log.FieldRunIDKey
+	CollectorClusterTag = log.FieldClusterKey
+	DumperFilePathTag   = log.FieldPathKey
+	EntityTag           = log.FieldEntityKey
+	CompenentTag        = log.FieldComponentKey
+)
+
 const (
 	StorageJanusGraph = "janusgraph"
 	StorageMongoDB    = "mongodb"
@@ -83,10 +91,6 @@ func RunID(uuid string) string {
 	return MakeTag(RunIdTag, uuid)
 }
 
-func IngestionRunID(uuid string) string {
-	return MakeTag(IngestionRunIdTag, uuid)
-}
-
 func Collector(collector string) string {
 	return MakeTag(CollectorTag, collector)
 }
@@ -134,3 +138,27 @@ func S3Bucket(bucket string) string {
 func S3Key(key string) string {
 	return MakeTag(DumperS3keyTag, key)
 }
+
+func ComponentName(component string) string {
+	return MakeTag(CompenentTag, component)
+}
+
+func GetDefaultTags(ctx context.Context) []string {
+	defaultTags := []string{}
+	runID := log.GetRunIDFromContext(ctx)
+	if runID != "" {
+		defaultTags = append(defaultTags, RunID(runID))
+	}
+
+	cluster := log.GetClusterFromContext(ctx)
+	if cluster != "" {
+		defaultTags = append(defaultTags, ClusterName(cluster))
+	}
+
+	component := log.GetComponentFromContext(ctx)
+	if component != "" {
+		defaultTags = append(defaultTags, ComponentName(component))
+	}
+
+	return defaultTags
+}
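
The statsd wrappers now take a context.Context as their first argument and append tag.GetDefaultTags(ctx) to the caller-supplied tags, so the run ID, cluster name, and component recorded on the context ride along on every metric automatically. A minimal usage sketch of that pattern, based on the signatures in this patch; the emitPodCount helper, the placeholder run ID/cluster values, and the extra entity tag are illustrative, and the sketch assumes the statsd client was initialized via statsd.Setup (the wrappers are no-ops otherwise):

package main

import (
	"context"

	"github.com/DataDog/KubeHound/pkg/telemetry/log"
	"github.com/DataDog/KubeHound/pkg/telemetry/metric"
	"github.com/DataDog/KubeHound/pkg/telemetry/statsd"
)

func emitPodCount(ctx context.Context) {
	// The wrapper appends tag.GetDefaultTags(ctx) internally, so only the
	// entity-specific tags need to be passed here (illustrative value).
	_ = statsd.Incr(ctx, metric.CollectorCount, []string{"entity:pods"}, 1)
}

func main() {
	ctx := context.Background()
	// Mirrors how the ingestor API seeds the context before streaming.
	ctx = context.WithValue(ctx, log.ContextFieldRunID, "example-run-id")
	ctx = context.WithValue(ctx, log.ContextFieldCluster, "demo-cluster")

	emitPodCount(ctx)
}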
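
PushEvent now derives the event title, default text, and tags (action, run ID, cluster) from the context and an EventAction, replacing the hand-built title/text/tag slices at each call site. A sketch of the two call shapes used in this patch; the cluster and run ID values are placeholders:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/DataDog/KubeHound/pkg/telemetry/events"
	"github.com/DataDog/KubeHound/pkg/telemetry/log"
)

func main() {
	ctx := context.Background()
	ctx = context.WithValue(ctx, log.ContextFieldCluster, "demo-cluster")
	ctx = context.WithValue(ctx, log.ContextFieldRunID, "example-run-id")

	start := time.Now()

	// Empty text: PushEvent falls back to the action's default message,
	// which already embeds the cluster name and run ID from the context.
	_ = events.PushEvent(ctx, events.IngestStarted, "")

	// ... ingestion work ...

	// Custom text: the title and tags are still derived from the action and context.
	_ = events.PushEvent(ctx, events.IngestFinished,
		fmt.Sprintf("KubeHound ingestion has been completed in %s", time.Since(start)))
}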
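
Blob store operations now open a span only when the context carries a run ID, which keeps the metadata-rehydration path (downloading a dump just to read its metadata) out of the run traces. The same guard sketched as a standalone helper; pullArchive and its body are illustrative, only the guard and span calls come from this patch:

package main

import (
	"context"

	"github.com/DataDog/KubeHound/pkg/telemetry/log"
	"github.com/DataDog/KubeHound/pkg/telemetry/span"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"
	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

// pullArchive shows the guard used in blob.go: a span is opened only when the
// context carries a run ID, i.e. for a real ingestion run.
func pullArchive(ctx context.Context) (err error) {
	if log.GetRunIDFromContext(ctx) != "" {
		var s ddtrace.Span
		s, ctx = span.SpanRunFromContext(ctx, span.IngestorBlobPull)
		defer func() { s.Finish(tracer.WithError(err)) }()
	}

	// ... download and extract the dump using ctx ...
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}

func main() {
	_ = pullArchive(context.Background()) // no run ID in context: no span emitted
}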