diff --git a/pkg/ingestor/api/api.go b/pkg/ingestor/api/api.go
index 3304e98ef..7aeef285a 100644
--- a/pkg/ingestor/api/api.go
+++ b/pkg/ingestor/api/api.go
@@ -54,10 +54,13 @@ func NewIngestorAPI(cfg *config.KubehoundConfig, puller puller.DataPuller, notif
 	}
 }
 
+// Close releases the providers (graph, cache, store) held by the ingestor API.
+func (g *IngestorAPI) Close(ctx context.Context) {
+	g.providers.Close(ctx)
+}
+
 // RehydrateLatest is just a GRPC wrapper around the Ingest method from the API package
 func (g *IngestorAPI) RehydrateLatest(ctx context.Context) ([]*grpc.IngestedCluster, error) {
 	l := log.Logger(ctx)
-	l.Error("id123")
 	// first level key are cluster names
 	directories, errRet := g.puller.ListFiles(ctx, "", false)
 	if errRet != nil {
@@ -204,6 +207,12 @@ func (g *IngestorAPI) Ingest(ctx context.Context, path string) error {
 		}
 	}
 
+	// Keeping only the latest dump for each cluster in memory
+	err = g.providers.GraphProvider.Clean(runCtx, clusterName) //nolint: contextcheck
+	if err != nil {
+		return err
+	}
+
 	err = g.providers.IngestBuildData(runCtx, runCfg) //nolint: contextcheck
 	if err != nil {
 		return err
diff --git a/pkg/kubehound/core/core_grpc_api.go b/pkg/kubehound/core/core_grpc_api.go
index 12bfaaec7..ad9f1e9c2 100644
--- a/pkg/kubehound/core/core_grpc_api.go
+++ b/pkg/kubehound/core/core_grpc_api.go
@@ -10,12 +10,13 @@ import (
 	"github.com/DataDog/KubeHound/pkg/ingestor/notifier/noop"
 	"github.com/DataDog/KubeHound/pkg/ingestor/puller/blob"
 	"github.com/DataDog/KubeHound/pkg/kubehound/providers"
+	"github.com/DataDog/KubeHound/pkg/telemetry/events"
 	"github.com/DataDog/KubeHound/pkg/telemetry/log"
 	"github.com/DataDog/KubeHound/pkg/telemetry/span"
 	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
 )
 
-func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
+func initCoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) (*api.IngestorAPI, error) {
 	l := log.Logger(ctx)
 	l.Info("Starting KubeHound Distributed Ingestor Service")
 	span, ctx := span.SpanRunFromContext(ctx, span.IngestorLaunch)
@@ -28,22 +29,34 @@ func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
 	l.Info("Initializing providers (graph, cache, store)")
 	p, err := providers.NewProvidersFactoryConfig(ctx, khCfg)
 	if err != nil {
-		return fmt.Errorf("factory config creation: %w", err)
+		return nil, fmt.Errorf("factory config creation: %w", err)
 	}
-	defer p.Close(ctx)
 
 	l.Info("Creating Blob Storage provider")
 	puller, err := blob.NewBlobStorage(khCfg, khCfg.Ingestor.Blob)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	l.Info("Creating Noop Notifier")
 	noopNotifier := noop.NewNoopNotifier()
 
 	l.Info("Creating Ingestor API")
-	ingestorApi := api.NewIngestorAPI(khCfg, puller, noopNotifier, p)
+	return api.NewIngestorAPI(khCfg, puller, noopNotifier, p), nil
+}
+
+func CoreGrpcApi(ctx context.Context, khCfg *config.KubehoundConfig) error {
+	ingestorApi, err := initCoreGrpcApi(ctx, khCfg)
+	if err != nil {
+		_ = events.PushEvent(ctx, events.IngestorFailed, "")
+
+		return err
+	}
+	defer ingestorApi.Close(ctx)
+	_ = events.PushEvent(ctx, events.IngestorInit, "")
+
+	l := log.Logger(ctx)
 	l.Info("Starting Ingestor API")
 	err = grpc.Listen(ctx, ingestorApi)
 	if err != nil {
diff --git a/pkg/kubehound/storage/graphdb/janusgraph_provider.go b/pkg/kubehound/storage/graphdb/janusgraph_provider.go
index ad883dadd..8a4d04dd9 100644
--- a/pkg/kubehound/storage/graphdb/janusgraph_provider.go
+++ b/pkg/kubehound/storage/graphdb/janusgraph_provider.go
@@ -10,8 +10,10 @@ import (
 	"github.com/DataDog/KubeHound/pkg/kubehound/graph/vertex"
 	"github.com/DataDog/KubeHound/pkg/kubehound/storage/cache"
 	"github.com/DataDog/KubeHound/pkg/telemetry/log"
+	"github.com/DataDog/KubeHound/pkg/telemetry/span"
 	"github.com/DataDog/KubeHound/pkg/telemetry/tag"
 	gremlin "github.com/apache/tinkerpop/gremlin-go/v3/driver"
+	"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
 )
 
 const (
@@ -145,3 +147,32 @@ func (jgp *JanusGraphProvider) Close(ctx context.Context) error {
 
 	return nil
 }
+
+// Clean drops every vertex tagged with the provided cluster name from the graph, within a single transaction.
+func (jgp *JanusGraphProvider) Clean(ctx context.Context, cluster string) error {
+	var err error
+	span, ctx := span.SpanRunFromContext(ctx, span.IngestorClean)
+	defer func() { span.Finish(tracer.WithError(err)) }()
+	l := log.Trace(ctx)
+	l.Infof("Cleaning cluster %s", cluster)
+	g := gremlin.Traversal_().WithRemote(jgp.drc)
+	tx := g.Tx()
+	defer tx.Close()
+
+	gtx, err := tx.Begin()
+	if err != nil {
+		return err
+	}
+
+	err = <-gtx.V().Has("cluster", cluster).Drop().Iterate()
+	if err != nil {
+		return err
+	}
+
+	err = tx.Commit()
+	if err != nil {
+		return err
+	}
+
+	return nil
+}
diff --git a/pkg/kubehound/storage/graphdb/mocks/graph_provider.go b/pkg/kubehound/storage/graphdb/mocks/graph_provider.go
index 68d040b74..6dd9561bb 100644
--- a/pkg/kubehound/storage/graphdb/mocks/graph_provider.go
+++ b/pkg/kubehound/storage/graphdb/mocks/graph_provider.go
@@ -29,6 +29,49 @@ func (_m *Provider) EXPECT() *Provider_Expecter {
 	return &Provider_Expecter{mock: &_m.Mock}
 }
 
+// Clean provides a mock function with given fields: ctx, cluster
+func (_m *Provider) Clean(ctx context.Context, cluster string) error {
+	ret := _m.Called(ctx, cluster)
+
+	var r0 error
+	if rf, ok := ret.Get(0).(func(context.Context, string) error); ok {
+		r0 = rf(ctx, cluster)
+	} else {
+		r0 = ret.Error(0)
+	}
+
+	return r0
+}
+
+// Provider_Clean_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Clean'
+type Provider_Clean_Call struct {
+	*mock.Call
+}
+
+// Clean is a helper method to define mock.On call
+//   - ctx context.Context
+//   - cluster string
+func (_e *Provider_Expecter) Clean(ctx interface{}, cluster interface{}) *Provider_Clean_Call {
+	return &Provider_Clean_Call{Call: _e.mock.On("Clean", ctx, cluster)}
+}
+
+func (_c *Provider_Clean_Call) Run(run func(ctx context.Context, cluster string)) *Provider_Clean_Call {
+	_c.Call.Run(func(args mock.Arguments) {
+		run(args[0].(context.Context), args[1].(string))
+	})
+	return _c
+}
+
+func (_c *Provider_Clean_Call) Return(_a0 error) *Provider_Clean_Call {
+	_c.Call.Return(_a0)
+	return _c
+}
+
+func (_c *Provider_Clean_Call) RunAndReturn(run func(context.Context, string) error) *Provider_Clean_Call {
+	_c.Call.Return(run)
+	return _c
+}
+
 // Close provides a mock function with given fields: ctx
 func (_m *Provider) Close(ctx context.Context) error {
 	ret := _m.Called(ctx)
diff --git a/pkg/kubehound/storage/graphdb/provider.go b/pkg/kubehound/storage/graphdb/provider.go
index eb37eb22d..bd82a7e11 100644
--- a/pkg/kubehound/storage/graphdb/provider.go
+++ b/pkg/kubehound/storage/graphdb/provider.go
@@ -35,6 +35,9 @@ type Provider interface {
 	// Raw returns a handle to the underlying provider to allow implementation specific operations e.g graph queries.
 	Raw() any
 
+	// Clean drops all assets belonging to the given cluster name from the graph database.
+	Clean(ctx context.Context, cluster string) error
+
 	// VertexWriter creates a new AsyncVertexWriter instance to enable asynchronous bulk inserts of vertices.
 	VertexWriter(ctx context.Context, v vertex.Builder, c cache.CacheProvider, opts ...WriterOption) (AsyncVertexWriter, error)
diff --git a/pkg/telemetry/span/spans.go b/pkg/telemetry/span/spans.go
index bd4ec114d..3d1c732a5 100644
--- a/pkg/telemetry/span/spans.go
+++ b/pkg/telemetry/span/spans.go
@@ -39,6 +39,7 @@ const (
 	IngestorBlobPut     = "kubehound.ingestor.blob.put"
 	IngestorBlobExtract = "kubehound.ingestor.blob.extract"
 	IngestorBlobClose   = "kubehound.ingestor.blob.close"
+	IngestorClean       = "kubehound.ingestor.clean"
 
 	DumperLaunch = "kubehound.dumper.launch"