Skip to content

Commit

Permalink
only-one-copy-in-graph-per-cluster (#286)
Browse files Browse the repository at this point in the history
* only-one-copy-in-graph-per-cluster

* fix split branches

* commenting events creation

* fix linter

* fix typo

* adding clean function in grpc ingestion

* adding trace for cleaning

* fix linter

* readding events
  • Loading branch information
jt-dd authored Nov 14, 2024
1 parent 2d1e729 commit 724c53f
Show file tree
Hide file tree
Showing 6 changed files with 106 additions and 6 deletions.
11 changes: 10 additions & 1 deletion pkg/ingestor/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,10 +54,13 @@ func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notif
}
}

// Close shuts down the API's underlying providers by delegating to
// g.providers.Close with the supplied context.
func (g *IngestorAPI) Close(ctx context.Context) {
g.providers.Close(ctx)
}

// RehydrateLatest is just a GRPC wrapper around the Ingest method from the API package
func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedCluster, error) {
l := log.Logger(ctx)
l.Error("id123")
// first level key are cluster names
directories, errRet := g.puller.ListFiles(ctx, "", false)
if errRet != nil {
Expand Down Expand Up @@ -204,6 +207,12 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error {
}
}

// Keeping only the latest dump for each cluster in memory
err = g.providers.GraphProvider.Clean(runCtx, clusterName) //nolint: contextcheck
if err != nil {
return err
}

err = g.providers.IngestBuildData(runCtx, runCfg) //nolint: contextcheck
if err != nil {
return err
Expand Down
23 changes: 18 additions & 5 deletions pkg/kubehound/core/core_grpc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,13 @@ import (
"github.com/DataDog/KubeHound/pkg/ingestor/notifier/noop"
"github.com/DataDog/KubeHound/pkg/ingestor/puller/blob"
"github.com/DataDog/KubeHound/pkg/kubehound/providers"
"github.com/DataDog/KubeHound/pkg/telemetry/events"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
func initCoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) (*api.IngestorAPI, error) {
l := log.Logger(ctx)
l.Info("Starting KubeHound Distributed Ingestor Service")
span, ctx := span.SpanRunFromContext(ctx, span.IngestorLaunch)
Expand All @@ -28,22 +29,34 @@ func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
l.Info("Initializing providers (graph, cache, store)")
p, err := providers.NewProvidersFactoryConfig(ctx, khCfg)
if err != nil {
return fmt.Errorf("factory config creation: %w", err)
return nil, fmt.Errorf("factory config creation: %w", err)
}
defer p.Close(ctx)

l.Info("Creating Blob Storage provider")
puller, err := blob.NewBlobStorage(khCfg, khCfg.Ingestor.Blob)
if err != nil {
return err
return nil, err
}

l.Info("Creating Noop Notifier")
noopNotifier := noop.NewNoopNotifier()

l.Info("Creating Ingestor API")
ingestorApi := api.NewIngestorAPI(khCfg, puller, noopNotifier, p)

return api.NewIngestorAPI(khCfg, puller, noopNotifier, p), nil
}

func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
ingestorApi, err := initCoreGrpcApi(ctx, khCfg)
if err != nil {
_ = events.PushEvent(ctx, events.IngestorFailed, "")

return err
}
defer ingestorApi.Close(ctx)
_ = events.PushEvent(ctx, events.IngestorInit, "")

l := log.Logger(ctx)
l.Info("Starting Ingestor API")
err = grpc.Listen(ctx, ingestorApi)
if err != nil {
Expand Down
31 changes: 31 additions & 0 deletions pkg/kubehound/storage/graphdb/janusgraph_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"github.com/DataDog/KubeHound/pkg/kubehound/graph/vertex"
"github.com/DataDog/KubeHound/pkg/kubehound/storage/cache"
"github.com/DataDog/KubeHound/pkg/telemetry/log"
"github.com/DataDog/KubeHound/pkg/telemetry/span"
"github.com/DataDog/KubeHound/pkg/telemetry/tag"
gremlin "github.com/apache/tinkerpop/gremlin-go/v3/driver"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

const (
Expand Down Expand Up @@ -145,3 +147,32 @@ func (jgp *JanusGraphProvider) Close(ctx context.Context) error {

return nil
}

// Clean drops every vertex whose "cluster" property equals the given cluster
// name. The drop runs inside a single graph transaction that is committed on
// success; the first error encountered (begin, drop or commit) is returned
// unchanged.
//
// The operation is traced under span.IngestorClean, and the returned error is
// attached to that span when it finishes.
func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error {
	var err error
	span, ctx := span.SpanRunFromContext(ctx, span.IngestorClean)
	// The closure reads err when the function returns, so every exit path below
	// must assign to this same variable for failures to be reported on the span.
	defer func() { span.Finish(tracer.WithError(err)) }()

	l := log.Trace(ctx)
	// Infof is printf-style: the key and value need matching verbs, otherwise
	// they are appended to the message as "%!(EXTRA ...)" noise.
	l.Infof("Cleaning cluster (%v=%v)", log.FieldClusterKey, cluster)

	g := gremlin.Traversal_().WithRemote(jgp.drc)
	tx := g.Tx()
	defer tx.Close()

	gtx, err := tx.Begin()
	if err != nil {
		return err
	}

	// Iterate returns a channel that yields the traversal's final error (or nil).
	err = <-gtx.V().Has("cluster", cluster).Drop().Iterate()
	if err != nil {
		return err
	}

	err = tx.Commit()
	if err != nil {
		return err
	}

	return nil
}
43 changes: 43 additions & 0 deletions pkg/kubehound/storage/graphdb/mocks/graph_provider.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 3 additions & 0 deletions pkg/kubehound/storage/graphdb/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,9 @@ type Provider interface {
// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries.
Raw() any

// Clean drops all assets belonging to the given cluster from the graph database.
Clean(ctx context.Context, cluster string) error

// VertexWriter creates a new AsyncVertexWriter instance to enable asynchronous bulk inserts of vertices.
VertexWriter(ctx context.Context, v vertex.Builder, c cache.CacheProvider, opts ...WriterOption) (AsyncVertexWriter, error)

Expand Down
1 change: 1 addition & 0 deletions pkg/telemetry/span/spans.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ const (
IngestorBlobPut = "kubehound.ingestor.blob.put"
IngestorBlobExtract = "kubehound.ingestor.blob.extract"
IngestorBlobClose = "kubehound.ingestor.blob.close"
IngestorClean = "kubehound.ingestor.clean"

DumperLaunch = "kubehound.dumper.launch"

Expand Down

0 comments on commit 724c53f

Please sign in to comment.