From 8a18333e1efc1292140279d8ff85d6e4e417a854 Mon Sep 17 00:00:00 2001
From: jt-dd
Date: Wed, 13 Nov 2024 22:32:54 +0100
Subject: [PATCH] only-one-copy-in-graph-per-cluster

Add a Clean method to the graph Provider interface, with its JanusGraph
implementation and generated mock, which drops the vertices of a given
cluster inside a transaction.

Split the provider/puller/notifier wiring of CoreGrpcApi into
initCoreGrpcApi, push an ingestor init/failed event depending on its
result, and expose IngestorAPI.Close so the providers are closed by the
owner of the API.

---
 pkg/ingestor/api/api.go                       |  6 ++-
 pkg/kubehound/core/core_grpc_api.go           | 29 ++++++++---
 .../storage/graphdb/janusgraph_provider.go    | 26 +++++++++++
 .../storage/graphdb/mocks/graph_provider.go   | 43 +++++++++++++++++++
 pkg/kubehound/storage/graphdb/provider.go     |  3 ++
 5 files changed, 101 insertions(+), 6 deletions(-)

diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go
index aa8c248a0..59ee84539 100644
--- a/pkg/ingestor/api/api.go
+++ b/pkg/ingestor/api/api.go
@@ -56,10 +56,14 @@ func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notif
 	}
 }
 
+// Close closes the providers held by the IngestorAPI.
+func (g *IngestorAPI) Close(ctx context.Context) {
+	g.providers.Close(ctx)
+}
+
 // RehydrateLatest is just a GRPC wrapper around the Ingest method from the API package
 func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedCluster, error) {
 	l := log.Logger(ctx)
-	l.Error("id123")
 	// first level key are cluster names
 	directories, errRet := g.puller.ListFiles(ctx, "", false)
 	if errRet != nil {
diff --git a/pkg/kubehound/core/core_grpc_api.go b/pkg/kubehound/core/core_grpc_api.go
index 12bfaaec7..490bdd57d 100644
--- a/pkg/kubehound/core/core_grpc_api.go
+++ b/pkg/kubehound/core/core_grpc_api.go
@@ -10,12 +10,15 @@ import (
 	"github.com/DataDog/KubeHound/pkg/ingestor/notifier/noop"
 	"github.com/DataDog/KubeHound/pkg/ingestor/puller/blob"
 	"github.com/DataDog/KubeHound/pkg/kubehound/providers"
+	"github.com/DataDog/KubeHound/pkg/telemetry/events"
 	"github.com/DataDog/KubeHound/pkg/telemetry/log"
 	"github.com/DataDog/KubeHound/pkg/telemetry/span"
 	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
 )
 
-func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
+// initCoreGrpcApi builds the IngestorAPI from freshly created providers, a blob
+// storage puller and a noop notifier. The caller must Close the returned API.
+func initCoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) (*api.IngestorAPI, error) {
 	l := log.Logger(ctx)
 	l.Info("Starting KubeHound Distributed Ingestor Service")
 	span, ctx := span.SpanRunFromContext(ctx, span.IngestorLaunch)
@@ -28,22 +31,38 @@ func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
 	l.Info("Initializing providers (graph, cache, store)")
 	p, err := providers.NewProvidersFactoryConfig(ctx, khCfg)
 	if err != nil {
-		return fmt.Errorf("factory config creation: %w", err)
+		return nil, fmt.Errorf("factory config creation: %w", err)
 	}
-	defer p.Close(ctx)
 
 	l.Info("Creating Blob Storage provider")
 	puller, err := blob.NewBlobStorage(khCfg, khCfg.Ingestor.Blob)
 	if err != nil {
-		return err
+		// The providers are handed over to the IngestorAPI only on success, so
+		// close them here rather than leaking them.
+		p.Close(ctx)
+		return nil, err
 	}
 
 	l.Info("Creating Noop Notifier")
 	noopNotifier := noop.NewNoopNotifier()
 
 	l.Info("Creating Ingestor API")
-	ingestorApi := api.NewIngestorAPI(khCfg, puller, noopNotifier, p)
+	return api.NewIngestorAPI(khCfg, puller, noopNotifier, p), nil
+}
 
+// CoreGrpcApi initialises the ingestor API and starts the gRPC listener on it.
+func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
+	ingestorApi, err := initCoreGrpcApi(ctx, khCfg)
+	if err != nil {
+		events.PushEventIngestorFailed(ctx)
+		return err
+	}
+	// Register the cleanup only after a successful init: on error ingestorApi
+	// is nil and Close would dereference it.
+	defer ingestorApi.Close(ctx)
+	events.PushEventIngestorInit(ctx)
+
+	l := log.Logger(ctx)
 	l.Info("Starting Ingestor API")
 	err = grpc.Listen(ctx, ingestorApi)
 	if err != nil {
diff --git a/pkg/kubehound/storage/graphdb/janusgraph_provider.go b/pkg/kubehound/storage/graphdb/janusgraph_provider.go
index ad883dadd..bff5ad207 100644
--- a/pkg/kubehound/storage/graphdb/janusgraph_provider.go
+++ b/pkg/kubehound/storage/graphdb/janusgraph_provider.go
@@ -145,3 +145,29 @@ func (jgp *JanusGraphProvider) Close(ctx context.Context) error {
 
 	return nil
 }
+
+// Clean drops every vertex whose "cluster" property equals cluster, inside a single transaction.
+func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error {
+	l := log.Trace(ctx)
+	l.Infof("Cleaning cluster (%v=%v)", log.FieldClusterKey, cluster)
+	g := gremlin.Traversal_().WithRemote(jgp.drc)
+	tx := g.Tx()
+	defer tx.Close()
+
+	gtx, err := tx.Begin()
+	if err != nil {
+		return err
+	}
+
+	err = <-gtx.V().Has("cluster", cluster).Drop().Iterate()
+	if err != nil {
+		return err
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/pkg/kubehound/storage/graphdb/mocks/graph_provider.go b/pkg/kubehound/storage/graphdb/mocks/graph_provider.go
index 68d040b74..6dd9561bb 100644
--- a/pkg/kubehound/storage/graphdb/mocks/graph_provider.go
+++ b/pkg/kubehound/storage/graphdb/mocks/graph_provider.go
@@ -29,6 +29,49 @@ func (_m *Provider) EXPECT() *Provider_Expecter {
 	return &Provider_Expecter{mock: &_m.Mock}
 }
 
+// Clean provides a mock function with given fields: ctx, cluster
+func (_m *Provider) Clean(ctx context.Context, cluster string) error {
+	ret := _m.Called(ctx, cluster)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
+		r0 = rf(ctx, cluster)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// Provider_Clean_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Clean'
+type Provider_Clean_Call struct {
+	*mock.Call
+}
+
+// Clean is a helper method to define mock.On call
+//   - ctx context.Context
+//   - cluster string
+func (_e *Provider_Expecter) Clean(ctx interface{}, cluster interface{}) *Provider_Clean_Call {
+	return &Provider_Clean_Call{Call: _e.mock.On("Clean", ctx, cluster)}
+}
+
+func (_c *Provider_Clean_Call) Run(run func(ctx context.Context, cluster string)) *Provider_Clean_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(string))
+	})
+	return _c
+}
+
+func (_c *Provider_Clean_Call) Return(_a0 error) *Provider_Clean_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *Provider_Clean_Call) RunAndReturn(run func(context.Context, string) error) *Provider_Clean_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // Close provides a mock function with given fields: ctx
 func (_m *Provider) Close(ctx context.Context) error {
 	ret := _m.Called(ctx)
diff --git a/pkg/kubehound/storage/graphdb/provider.go b/pkg/kubehound/storage/graphdb/provider.go
index eb37eb22d..bd82a7e11 100644
--- a/pkg/kubehound/storage/graphdb/provider.go
+++ b/pkg/kubehound/storage/graphdb/provider.go
@@ -35,6 +35,9 @@ type Provider interface {
 	// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries.
 	Raw() any
 
+	// Clean drops all the assets of the given cluster from the graph database.
+	Clean(ctx context.Context, cluster string) error
+
 	// VertexWriter creates a new AsyncVertexWriter instance to enable asynchronous bulk inserts of vertices.
 	VertexWriter(ctx context.Context, v vertex.Builder, c cache.CacheProvider, opts ...WriterOption) (AsyncVertexWriter, error)
 