Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix observability #285

Merged
merged 9 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions datadog/span-metrics.tf
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ resource "datadog_spans_metric" "kubehound_ingest_duration" {
}

filter {
query = "service:kubehound operation_name:kubehound.ingestData"
query = "service:kubehound-ingestor operation_name:kubehound.ingestData"
}

dynamic "group_by" {
Expand Down Expand Up @@ -63,7 +63,7 @@ resource "datadog_spans_metric" "kubehound_graph_duration" {
}

filter {
query = "service:kubehound operation_name:kubehound.buildGraph"
query = "service:kubehound-ingestor operation_name:kubehound.buildGraph"
}

dynamic "group_by" {
Expand Down Expand Up @@ -103,7 +103,7 @@ resource "datadog_spans_metric" "kubehound_collector_stream_duration" {
}

filter {
query = "service:kubehound operation_name:kubehound.collector.stream"
query = "service:kubehound-collector operation_name:kubehound.collector.stream"
}

dynamic "group_by" {
Expand Down Expand Up @@ -144,7 +144,7 @@ resource "datadog_spans_metric" "kubehound_graph_builder_edge_duration" {
}

filter {
query = "service:kubehound operation_name:kubehound.graph.builder.edge"
query = "service:kubehound-ingestor operation_name:kubehound.graph.builder.edge"
}

dynamic "group_by" {
Expand All @@ -154,4 +154,4 @@ resource "datadog_spans_metric" "kubehound_graph_builder_edge_duration" {
path = group_by.value
}
}
}
}
14 changes: 7 additions & 7 deletions pkg/collector/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ func (c *FileCollector) streamPodsNamespace(ctx context.Context, fp string, inge
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.pod, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.pod, 1)
i := item
err = ingestor.IngestPod(ctx, &i)
if err != nil {
Expand Down Expand Up @@ -176,7 +176,7 @@ func (c *FileCollector) streamRolesNamespace(ctx context.Context, fp string, ing
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.role, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.role, 1)
i := item
err = ingestor.IngestRole(ctx, &i)
if err != nil {
Expand Down Expand Up @@ -228,7 +228,7 @@ func (c *FileCollector) streamRoleBindingsNamespace(ctx context.Context, fp stri
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.rolebinding, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.rolebinding, 1)
i := item
err = ingestor.IngestRoleBinding(ctx, &i)
if err != nil {
Expand Down Expand Up @@ -280,7 +280,7 @@ func (c *FileCollector) streamEndpointsNamespace(ctx context.Context, fp string,
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.endpoint, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.endpoint, 1)
i := item
err = ingestor.IngestEndpoint(ctx, &i)
if err != nil {
Expand Down Expand Up @@ -339,7 +339,7 @@ func (c *FileCollector) StreamNodes(ctx context.Context, ingestor NodeIngestor)
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.node, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.node, 1)
i := item
err = ingestor.IngestNode(ctx, &i)
if err != nil {
Expand All @@ -366,7 +366,7 @@ func (c *FileCollector) StreamClusterRoles(ctx context.Context, ingestor Cluster
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.clusterrole, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrole, 1)
i := item
err = ingestor.IngestClusterRole(ctx, &i)
if err != nil {
Expand All @@ -393,7 +393,7 @@ func (c *FileCollector) StreamClusterRoleBindings(ctx context.Context, ingestor
}

for _, item := range list.Items {
_ = statsd.Incr(metric.CollectorCount, c.tags.clusterrolebinding, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrolebinding, 1)
i := item
err = ingestor.IngestClusterRoleBinding(ctx, &i)
if err != nil {
Expand Down
22 changes: 11 additions & 11 deletions pkg/collector/k8s_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func (c *k8sAPICollector) wait(ctx context.Context, resourceType string, tags []
}

// entity := tag.Entity(resourceType)
err := statsd.Gauge(metric.CollectorWait, float64(c.waitTime[resourceType]), tags, 1)
err := statsd.Gauge(ctx, metric.CollectorWait, float64(c.waitTime[resourceType]), tags, 1)
if err != nil {
l.Error("could not send gauge", log.ErrorField(err))
}
Expand Down Expand Up @@ -213,19 +213,19 @@ func (c *k8sAPICollector) computeMetrics(ctx context.Context) (Metrics, error) {
}

runDuration := time.Since(c.startTime)
err := statsd.Gauge(metric.CollectorRunWait, float64(runTotalWaitTime), c.tags.baseTags, 1)
err := statsd.Gauge(ctx, metric.CollectorRunWait, float64(runTotalWaitTime), c.tags.baseTags, 1)
if err != nil {
errMetric = errors.Join(errMetric, err)
l.Error("could not send gauge", log.ErrorField(err))
}
err = statsd.Gauge(metric.CollectorRunDuration, float64(runDuration), c.tags.baseTags, 1)
err = statsd.Gauge(ctx, metric.CollectorRunDuration, float64(runDuration), c.tags.baseTags, 1)
if err != nil {
errMetric = errors.Join(errMetric, err)
l.Error("could not send gauge", log.ErrorField(err))
}

runThrottlingPercentage := 1 - (float64(runDuration-runTotalWaitTime) / float64(runDuration))
err = statsd.Gauge(metric.CollectorRunThrottling, runThrottlingPercentage, c.tags.baseTags, 1)
err = statsd.Gauge(ctx, metric.CollectorRunThrottling, runThrottlingPercentage, c.tags.baseTags, 1)
if err != nil {
errMetric = errors.Join(errMetric, err)
l.Error("could not send gauge", log.ErrorField(err))
Expand Down Expand Up @@ -288,7 +288,7 @@ func (c *k8sAPICollector) streamPodsNamespace(ctx context.Context, namespace str
c.setPagerConfig(pager)

return pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.pod, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.pod, 1)
c.wait(ctx, entity, c.tags.pod)
item, ok := obj.(*corev1.Pod)
if !ok {
Expand Down Expand Up @@ -343,7 +343,7 @@ func (c *k8sAPICollector) streamRolesNamespace(ctx context.Context, namespace st
c.setPagerConfig(pager)

return pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.role, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.role, 1)
c.wait(ctx, entity, c.tags.role)
item, ok := obj.(*rbacv1.Role)
if !ok {
Expand Down Expand Up @@ -398,7 +398,7 @@ func (c *k8sAPICollector) streamRoleBindingsNamespace(ctx context.Context, names
c.setPagerConfig(pager)

return pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.rolebinding, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.rolebinding, 1)
c.wait(ctx, entity, c.tags.rolebinding)
item, ok := obj.(*rbacv1.RoleBinding)
if !ok {
Expand Down Expand Up @@ -453,7 +453,7 @@ func (c *k8sAPICollector) streamEndpointsNamespace(ctx context.Context, namespac
c.setPagerConfig(pager)

return pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.endpoint, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.endpoint, 1)
c.wait(ctx, entity, c.tags.endpoint)
item, ok := obj.(*discoveryv1.EndpointSlice)
if !ok {
Expand Down Expand Up @@ -507,7 +507,7 @@ func (c *k8sAPICollector) StreamNodes(ctx context.Context, ingestor NodeIngestor
c.setPagerConfig(pager)

err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.node, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.node, 1)
c.wait(ctx, entity, c.tags.node)
item, ok := obj.(*corev1.Node)
if !ok {
Expand Down Expand Up @@ -550,7 +550,7 @@ func (c *k8sAPICollector) StreamClusterRoles(ctx context.Context, ingestor Clust
c.setPagerConfig(pager)

err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.clusterrole, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrole, 1)
c.wait(ctx, entity, c.tags.clusterrole)
item, ok := obj.(*rbacv1.ClusterRole)
if !ok {
Expand Down Expand Up @@ -593,7 +593,7 @@ func (c *k8sAPICollector) StreamClusterRoleBindings(ctx context.Context, ingesto
c.setPagerConfig(pager)

err = pager.EachListItem(ctx, opts, func(obj runtime.Object) error {
_ = statsd.Incr(metric.CollectorCount, c.tags.clusterrolebinding, 1)
_ = statsd.Incr(ctx, metric.CollectorCount, c.tags.clusterrolebinding, 1)
c.wait(ctx, entity, c.tags.clusterrolebinding)
item, ok := obj.(*rbacv1.ClusterRoleBinding)
if !ok {
Expand Down
33 changes: 12 additions & 21 deletions pkg/ingestor/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,19 @@ import (
"github.com/DataDog/KubeHound/pkg/collector"
"github.com/DataDog/KubeHound/pkg/config"
"github.com/DataDog/KubeHound/pkg/dump"
"github.com/DataDog/KubeHound/pkg/ingestor"
grpc "github.com/DataDog/KubeHound/pkg/ingestor/api/grpc/pb"
"github.com/DataDog/KubeHound/pkg/ingestor/notifier"
"github.com/DataDog/KubeHound/pkg/ingestor/puller"
"github.com/DataDog/KubeHound/pkg/kubehound/graph"
"github.com/DataDog/KubeHound/pkg/kubehound/graph/adapter"
"github.com/DataDog/KubeHound/pkg/kubehound/providers"
"github.com/DataDog/KubeHound/pkg/kubehound/store/collections"
"github.com/DataDog/KubeHound/pkg/telemetry/events"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"github.com/DataDog/KubeHound/pkg/telemetry/tag"
gremlingo "github.com/apache/tinkerpop/gremlin-go/v3/driver"
"go.mongodb.org/mongo-driver/bson"
"google.golang.org/protobuf/types/known/timestamppb"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/ext"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

Expand Down Expand Up @@ -153,27 +151,24 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error {
runCtx := context.Background()
runCtx = context.WithValue(runCtx, log.ContextFieldCluster, clusterName)
runCtx = context.WithValue(runCtx, log.ContextFieldRunID, runID)
l = log.Logger(runCtx) //nolint: contextcheck
spanJob, runCtx := span.SpanRunFromContext(runCtx, span.IngestorStartJob)
defer func() { spanJob.Finish(tracer.WithError(err)) }()

events.PushEvent(
fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID),
fmt.Sprintf("Ingesting cluster %s with runID %s", clusterName, runID),
[]string{
tag.IngestionRunID(runID),
},
)

l = log.Logger(runCtx) //nolint: contextcheck
alreadyIngested, err := g.isAlreadyIngestedInGraph(runCtx, clusterName, runID) //nolint: contextcheck
if err != nil {
return err
}

if alreadyIngested {
events.PushEventIngestSkip(runCtx) //nolint: contextcheck

return fmt.Errorf("%w [%s:%s]", ErrAlreadyIngested, clusterName, runID)
}

spanJob, runCtx := span.SpanRunFromContext(runCtx, span.IngestorStartJob)
spanJob.SetTag(ext.ManualKeep, true)
defer func() { spanJob.Finish(tracer.WithError(err)) }()

events.PushEventIngestStarted(runCtx) //nolint: contextcheck

// We need to flush the cache to prevent warnings/errors when overwriting elements in cache from the previous ingestion
// This avoids conflicts with data from a previous ingestion (there is no need to reuse the cache from a previous ingestion)
l.Info("Preparing cache provider")
Expand Down Expand Up @@ -209,15 +204,11 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error {
}
}

err = ingestor.IngestData(runCtx, runCfg, collect, g.providers.CacheProvider, g.providers.StoreProvider, g.providers.GraphProvider) //nolint: contextcheck
if err != nil {
return fmt.Errorf("raw data ingest: %w", err)
}

err = graph.BuildGraph(runCtx, runCfg, g.providers.StoreProvider, g.providers.GraphProvider, g.providers.CacheProvider) //nolint: contextcheck
err = g.providers.IngestBuildData(runCtx, runCfg) //nolint: contextcheck
if err != nil {
return err
}

err = g.notifier.Notify(runCtx, clusterName, runID) //nolint: contextcheck
if err != nil {
return fmt.Errorf("notifying: %w", err)
Expand Down
46 changes: 31 additions & 15 deletions pkg/ingestor/puller/blob/blob.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
_ "gocloud.dev/blob/gcsblob"
_ "gocloud.dev/blob/memblob"
"gocloud.dev/blob/s3blob"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace"

"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

Expand Down Expand Up @@ -51,6 +53,9 @@ func NewBlobStorage(cfg *config.KubehoundConfig, blobConfig *config.BlobConfig)
}

func (bs *BlobStore) openBucket(ctx context.Context) (*blob.Bucket, error) {
l := log.Logger(ctx)
l.Info("Opening bucket", log.String("bucket_name", bs.bucketName))

urlStruct, err := url.Parse(bs.bucketName)
if err != nil {
return nil, err
Expand Down Expand Up @@ -132,18 +137,21 @@ func (bs *BlobStore) ListFiles(ctx context.Context, prefix string, recursive boo
// Put uploads the archive to the blob store (e.g., s3) under a key derived from the cluster name and run ID
func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName string, runID string) error {
l := log.Logger(outer)
l.Info("Putting data on blob store bucket", log.String("bucket_name", bs.bucketName), log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID))
spanPut, ctx := span.SpanRunFromContext(outer, span.IngestorBlobPull)
var err error
defer func() { spanPut.Finish(tracer.WithError(err)) }()
if log.GetRunIDFromContext(outer) != "" {
var spanPut ddtrace.Span
spanPut, outer = span.SpanRunFromContext(outer, span.IngestorBlobPull)
defer func() { spanPut.Finish(tracer.WithError(err)) }()
}
Comment on lines +143 to +147
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Two comments (I may be misunderstanding something):

  • GetRunIDFromContext feels weird to be in the log package?
  • Can't we "automatically" do that in the custom span package directly? Since you already set default tags in there? This would also simplify adding other custom fields.

It would simplify that call in every places to only the 2 lines:

spanPut, outer = span.SpanRunFromContext(outer, span.IngestorBlobPull)
defer func() { spanPut.Finish(tracer.WithError(err)) }()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a special use case for the blob store. The guard is there to avoid creating too many spans (adding a comment to explain this).

Where would you prefer GetRunIDFromContext() to be ?

l.Info("Putting data on blob store bucket", log.String("bucket_name", bs.bucketName), log.String(log.FieldClusterKey, clusterName), log.String(log.FieldRunIDKey, runID))

dumpResult, err := dump.NewDumpResult(clusterName, runID, true)
if err != nil {
return err
}
key := dumpResult.GetFullPath()
l.Info("Opening bucket", log.String("bucket_name", bs.bucketName))
b, err := bs.openBucket(ctx)
b, err := bs.openBucket(outer)
if err != nil {
return err
}
Expand All @@ -157,7 +165,7 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName

l.Info("Uploading archive from blob store", log.String("key", key))
w := bufio.NewReader(f)
err = b.Upload(ctx, key, w, &blob.WriterOptions{
err = b.Upload(outer, key, w, &blob.WriterOptions{
ContentType: "application/gzip",
})
if err != nil {
Expand All @@ -175,13 +183,15 @@ func (bs *BlobStore) Put(outer context.Context, archivePath string, clusterName
// Pull pulls the data from the blob store (e.g: s3) and returns the path of the folder containing the archive
func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) {
l := log.Logger(outer)
l.Info("Pulling data from blob store bucket", log.String("bucket_name", bs.bucketName), log.String("key", key))
spanPull, ctx := span.SpanRunFromContext(outer, span.IngestorBlobPull)
var err error
defer func() { spanPull.Finish(tracer.WithError(err)) }()
if log.GetRunIDFromContext(outer) != "" {
var spanPull ddtrace.Span
spanPull, outer = span.SpanRunFromContext(outer, span.IngestorBlobPull)
defer func() { spanPull.Finish(tracer.WithError(err)) }()
}
l.Info("Pulling data from blob store bucket", log.String("bucket_name", bs.bucketName), log.String("key", key))

l.Info("Opening bucket", log.String("bucket_name", bs.bucketName))
b, err := bs.openBucket(ctx)
b, err := bs.openBucket(outer)
if err != nil {
return "", err
}
Expand All @@ -207,7 +217,7 @@ func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) {

l.Info("Downloading archive from blob store", log.String("key", key))
w := bufio.NewWriter(f)
err = b.Download(ctx, key, w, nil)
err = b.Download(outer, key, w, nil)
if err != nil {
return archivePath, err
}
Expand All @@ -221,9 +231,12 @@ func (bs *BlobStore) Pull(outer context.Context, key string) (string, error) {
}

func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error {
spanExtract, _ := span.SpanRunFromContext(ctx, span.IngestorBlobExtract)
var err error
defer func() { spanExtract.Finish(tracer.WithError(err)) }()
if log.GetRunIDFromContext(ctx) != "" {
var spanPull ddtrace.Span
spanPull, ctx = span.SpanRunFromContext(ctx, span.IngestorBlobExtract)
defer func() { spanPull.Finish(tracer.WithError(err)) }()
}

basePath := filepath.Dir(archivePath)
err = puller.CheckSanePath(archivePath, basePath)
Expand All @@ -243,9 +256,12 @@ func (bs *BlobStore) Extract(ctx context.Context, archivePath string) error {
// Once downloaded and processed, we should cleanup the disk so we can reduce the disk usage
// required for large infrastructure
func (bs *BlobStore) Close(ctx context.Context, archivePath string) error {
spanClose, _ := span.SpanRunFromContext(ctx, span.IngestorBlobClose)
var err error
defer func() { spanClose.Finish(tracer.WithError(err)) }()
if log.GetRunIDFromContext(ctx) != "" {
var spanClose ddtrace.Span
spanClose, _ = span.SpanRunFromContext(ctx, span.IngestorBlobClose)
defer func() { spanClose.Finish(tracer.WithError(err)) }()
}

path := filepath.Dir(archivePath)
err = puller.CheckSanePath(archivePath, bs.cfg.Ingestor.TempDir)
Expand Down
Loading
Loading