diff --git a/pkg/aggregator/aggregator.go b/pkg/aggregator/aggregator.go index d19acbb0..4c7c976c 100644 --- a/pkg/aggregator/aggregator.go +++ b/pkg/aggregator/aggregator.go @@ -33,7 +33,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/aggregator/tracer" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/zconstants" ) @@ -109,7 +109,7 @@ type Aggregator interface { // it waits for the primary event to be created and takes it as the parent. // If the primary event does not get created after options.subObjectPrimaryBackoff, this event is promoted as primary. // If multiple primary events are sent, the slower one (by SpanCache-authoritative timing) is demoted. - Send(ctx context.Context, object util.ObjectRef, event *aggregatorevent.Event, subObjectId *SubObjectId) error + Send(ctx context.Context, object utilobject.Rich, event *aggregatorevent.Event, subObjectId *SubObjectId) error } type SubObjectId struct { @@ -183,7 +183,7 @@ func (aggregator *aggregator) Close(ctx context.Context) error { return nil } func (aggregator *aggregator) Send( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, event *aggregatorevent.Event, subObjectId *SubObjectId, ) (err error) { @@ -370,12 +370,12 @@ func (aggregator *aggregator) Send( func (aggregator *aggregator) ensureObjectSpan( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, eventTime time.Time, ) (tracer.SpanContext, error) { return aggregator.getOrCreateSpan(ctx, object, eventTime, func() (_ tracer.SpanContext, err error) { // try to associate a parent object - var parent *util.ObjectRef + var parent *utilobject.Rich for _, linker := range aggregator.Linkers.Impls { parent = linker.Lookup(ctx, object) @@ -395,7 +395,7 @@ func (aggregator *aggregator) ensureObjectSpan( func (aggregator *aggregator) getOrCreateSpan( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, eventTime time.Time, parentGetter func() (tracer.SpanContext, error), ) (tracer.SpanContext, error) { @@ -530,7 +530,7 @@ func (aggregator *aggregator) getOrCreateSpan( func (aggregator *aggregator) createSpan( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, nestLevel string, eventTime time.Time, parent tracer.SpanContext, @@ -580,11 +580,11 @@ func (aggregator *aggregator) createSpan( return spanContext, nil } -func (aggregator *aggregator) expiringSpanCacheKey(object util.ObjectRef, timestamp time.Time) string { +func (aggregator *aggregator) expiringSpanCacheKey(object utilobject.Rich, timestamp time.Time) string { expiringWindow := timestamp.Unix() / int64(aggregator.options.spanTtl.Seconds()) return aggregator.spanCacheKey(object, fmt.Sprintf("field=object,window=%d", expiringWindow)) } -func (aggregator *aggregator) spanCacheKey(object util.ObjectRef, subObjectId string) string { +func (aggregator *aggregator) spanCacheKey(object utilobject.Rich, subObjectId string) string { return fmt.Sprintf("%s/%s", object.String(), subObjectId) } diff --git a/pkg/aggregator/eventdecorator/decorator.go b/pkg/aggregator/eventdecorator/decorator.go index b5d627d9..fadbe09d 100644 --- a/pkg/aggregator/eventdecorator/decorator.go +++ b/pkg/aggregator/eventdecorator/decorator.go @@ -18,9 +18,9 @@ import ( "context" "github.com/kubewharf/kelemetry/pkg/aggregator/aggregatorevent" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) type Decorator interface { - Decorate(ctx context.Context, object util.ObjectRef, event *aggregatorevent.Event) + Decorate(ctx context.Context, object utilobject.Rich, event *aggregatorevent.Event) } diff --git a/pkg/aggregator/eventdecorator/eventtagger/eventtagger.go b/pkg/aggregator/eventdecorator/eventtagger/eventtagger.go index fd89376a..f6a97ad5 100644 --- a/pkg/aggregator/eventdecorator/eventtagger/eventtagger.go +++ b/pkg/aggregator/eventdecorator/eventtagger/eventtagger.go @@ -24,7 +24,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/aggregator/eventdecorator" "github.com/kubewharf/kelemetry/pkg/aggregator/resourcetagger" "github.com/kubewharf/kelemetry/pkg/manager" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -71,7 +71,7 @@ func (d *eventTagDecorator) Init() error { func (d *eventTagDecorator) Start(ctx context.Context) error { return nil } func (d *eventTagDecorator) Close(ctx context.Context) error { return nil } -func (d *eventTagDecorator) Decorate(ctx context.Context, object util.ObjectRef, event *aggregatorevent.Event) { +func (d *eventTagDecorator) Decorate(ctx context.Context, object utilobject.Rich, event *aggregatorevent.Event) { if event == nil { return } diff --git a/pkg/aggregator/linker/linker.go b/pkg/aggregator/linker/linker.go index 4cf2458f..f648be25 100644 --- a/pkg/aggregator/linker/linker.go +++ b/pkg/aggregator/linker/linker.go @@ -17,9 +17,9 @@ package linker import ( "context" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) type Linker interface { - Lookup(ctx context.Context, object util.ObjectRef) *util.ObjectRef + Lookup(ctx context.Context, object utilobject.Rich) *utilobject.Rich } diff --git a/pkg/aggregator/objectspandecorator/decorator.go b/pkg/aggregator/objectspandecorator/decorator.go index 3443a96d..3c6662e9 100644 --- a/pkg/aggregator/objectspandecorator/decorator.go +++ b/pkg/aggregator/objectspandecorator/decorator.go @@ -17,9 +17,9 @@ package objectspandecorator import ( "context" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) type Decorator interface { - Decorate(ctx context.Context, object util.ObjectRef, traceSource string, tags map[string]string) + Decorate(ctx context.Context, object utilobject.Rich, traceSource string, tags map[string]string) } diff --git a/pkg/aggregator/objectspandecorator/resourcetagger/objecttagger.go b/pkg/aggregator/objectspandecorator/resourcetagger/objecttagger.go index 0cf723d7..ee4a00b7 100644 --- a/pkg/aggregator/objectspandecorator/resourcetagger/objecttagger.go +++ b/pkg/aggregator/objectspandecorator/resourcetagger/objecttagger.go @@ -23,7 +23,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/aggregator/objectspandecorator" "github.com/kubewharf/kelemetry/pkg/aggregator/resourcetagger" "github.com/kubewharf/kelemetry/pkg/manager" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -54,7 +54,7 @@ func (d *ObjectSpanTag) Init() error { return nil } func (d *ObjectSpanTag) Start(ctx context.Context) error { return nil } func (d *ObjectSpanTag) Close(ctx context.Context) error { return nil } -func (d *ObjectSpanTag) Decorate(ctx context.Context, object util.ObjectRef, traceSource string, tags map[string]string) { +func (d *ObjectSpanTag) Decorate(ctx context.Context, object utilobject.Rich, traceSource string, tags map[string]string) { if tags == nil { return } diff --git a/pkg/aggregator/resourcetagger/resource_tagger.go b/pkg/aggregator/resourcetagger/resource_tagger.go index b83e4722..0646494e 100644 --- a/pkg/aggregator/resourcetagger/resource_tagger.go +++ b/pkg/aggregator/resourcetagger/resource_tagger.go @@ -31,7 +31,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/k8s/objectcache" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -133,7 +133,7 @@ func (d *ResourceTagger) registerResource(gr schema.GroupResource, tagPathMappin } } -func (d *ResourceTagger) DecorateTag(ctx context.Context, object util.ObjectRef, traceSource string, tags map[string]any) { +func (d *ResourceTagger) DecorateTag(ctx context.Context, object utilobject.Rich, traceSource string, tags map[string]any) { if tags == nil { return } @@ -153,7 +153,7 @@ func (d *ResourceTagger) DecorateTag(ctx context.Context, object util.ObjectRef, logger.Debug("Fetching dynamic object for tag decorator") var err error - raw, err = d.ObjectCache.Get(ctx, object) + raw, err = d.ObjectCache.Get(ctx, object.VersionedKey) if err != nil { tagMetric.Result = "FetchErr" logger.WithError(err).Error("cannot fetch object value") diff --git a/pkg/annotationlinker/linker.go b/pkg/annotationlinker/linker.go index c2b5a362..46841d36 100644 --- a/pkg/annotationlinker/linker.go +++ b/pkg/annotationlinker/linker.go @@ -20,14 +20,13 @@ import ( "github.com/sirupsen/logrus" "github.com/spf13/pflag" - "k8s.io/apimachinery/pkg/runtime/schema" "github.com/kubewharf/kelemetry/pkg/aggregator/linker" "github.com/kubewharf/kelemetry/pkg/k8s" "github.com/kubewharf/kelemetry/pkg/k8s/discovery" "github.com/kubewharf/kelemetry/pkg/k8s/objectcache" "github.com/kubewharf/kelemetry/pkg/manager" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -59,7 +58,7 @@ func (ctrl *controller) Init() error { return nil } func (ctrl *controller) Start(ctx context.Context) error { return nil } func (ctrl *controller) Close(ctx context.Context) error { return nil } -func (ctrl *controller) Lookup(ctx context.Context, object util.ObjectRef) *util.ObjectRef { +func (ctrl *controller) Lookup(ctx context.Context, object utilobject.Rich) *utilobject.Rich { raw := object.Raw logger := ctrl.Logger.WithFields(object.AsFields("object")) @@ -68,7 +67,7 @@ func (ctrl *controller) Lookup(ctx context.Context, object util.ObjectRef) *util logger.Debug("Fetching dynamic object") var err error - raw, err = ctrl.ObjectCache.Get(ctx, object) + raw, err = ctrl.ObjectCache.Get(ctx, object.VersionedKey) if err != nil { logger.WithError(err).Error("cannot fetch object value") @@ -93,20 +92,10 @@ func (ctrl *controller) Lookup(ctx context.Context, object util.ObjectRef) *util ref.Cluster = object.Cluster } - objectRef := &util.ObjectRef{ - Cluster: ref.Cluster, - GroupVersionResource: schema.GroupVersionResource{ - Group: ref.GroupVersionResource.Group, - Version: ref.GroupVersionResource.Version, - Resource: ref.GroupVersionResource.Resource, - }, - Namespace: ref.Namespace, - Name: ref.Name, - Uid: ref.Uid, - } + objectRef := ref.ToRich() logger.WithField("parent", objectRef).Debug("Resolved parent") - return objectRef + return &objectRef } return nil diff --git a/pkg/annotationlinker/schema.go b/pkg/annotationlinker/schema.go index c9517d67..ce13026d 100644 --- a/pkg/annotationlinker/schema.go +++ b/pkg/annotationlinker/schema.go @@ -17,6 +17,8 @@ package annotationlinker import ( metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/types" + + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) const LinkAnnotation = "kelemetry.kubewharf.io/parent-link" @@ -31,3 +33,19 @@ type ParentLink struct { Uid types.UID `json:"uid"` } + +func (ln ParentLink) ToRich() utilobject.Rich { + return utilobject.Rich{ + VersionedKey: utilobject.VersionedKey{ + Key: utilobject.Key{ + Cluster: ln.Cluster, + Group: ln.Group, + Resource: ln.Resource, + Namespace: ln.Namespace, + Name: ln.Name, + }, + Version: ln.Version, + }, + Uid: ln.Uid, + } +} diff --git a/pkg/audit/consumer/consumer.go b/pkg/audit/consumer/consumer.go index 94ec24d3..a937cb70 100644 --- a/pkg/audit/consumer/consumer.go +++ b/pkg/audit/consumer/consumer.go @@ -38,7 +38,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/k8s/discovery" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/shutdown" "github.com/kubewharf/kelemetry/pkg/util/zconstants" ) @@ -210,17 +210,7 @@ func (recv *receiver) handleItem( } } - objectRef := util.ObjectRef{ - Cluster: message.Cluster, - GroupVersionResource: schema.GroupVersionResource{ - Group: message.ObjectRef.APIGroup, - Version: message.ObjectRef.APIVersion, - Resource: message.ObjectRef.Resource, - }, - Namespace: message.ObjectRef.Namespace, - Name: message.ObjectRef.Name, - Uid: message.ObjectRef.UID, - } + objectRef := utilobject.RichFromAudit(message.ObjectRef, message.Cluster) if message.ResponseObject != nil { objectRef.Raw = &unstructured.Unstructured{ diff --git a/pkg/diff/api/api.go b/pkg/diff/api/api.go index 111ad774..9423facf 100644 --- a/pkg/diff/api/api.go +++ b/pkg/diff/api/api.go @@ -31,7 +31,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/k8s/objectcache" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/shutdown" ) @@ -114,15 +114,15 @@ func (api *api) handleGet(ctx *gin.Context) error { cluster = clusterQuery } - raw, err := api.ObjectCache.Get(ctx, util.ObjectRef{ - Cluster: cluster, - GroupVersionResource: schema.GroupVersionResource{ - Group: group, - Version: version, - Resource: resource, + raw, err := api.ObjectCache.Get(ctx, utilobject.VersionedKey{ + Key: utilobject.Key{ + Cluster: cluster, + Group: group, + Resource: resource, + Namespace: namespace, + Name: name, }, - Namespace: namespace, - Name: name, + Version: version, }) if err != nil { return err @@ -131,13 +131,13 @@ func (api *api) handleGet(ctx *gin.Context) error { return ctx.AbortWithError(404, fmt.Errorf("object does not exist")) } - object := util.ObjectRefFromUnstructured(raw, cluster, schema.GroupVersionResource{ + object := utilobject.RichFromUnstructured(raw, cluster, schema.GroupVersionResource{ Group: group, Version: version, Resource: resource, }) - patch, err := api.DiffCache.Fetch(ctx, object, rv, &rv) + patch, err := api.DiffCache.Fetch(ctx, object.Key, rv, &rv) if err != nil || patch == nil { return ctx.AbortWithError(404, fmt.Errorf("patch not found for rv: %w", err)) } @@ -165,15 +165,15 @@ func (api *api) handleScan(ctx *gin.Context) error { limit = parsedLimit } - raw, err := api.ObjectCache.Get(ctx, util.ObjectRef{ - Cluster: cluster, - GroupVersionResource: schema.GroupVersionResource{ - Group: group, - Version: version, - Resource: resource, + raw, err := api.ObjectCache.Get(ctx, utilobject.VersionedKey{ + Key: utilobject.Key{ + Cluster: cluster, + Group: group, + Resource: resource, + Namespace: namespace, + Name: name, }, - Namespace: namespace, - Name: name, + Version: version, }) if err != nil { return err @@ -182,12 +182,12 @@ func (api *api) handleScan(ctx *gin.Context) error { return ctx.AbortWithError(404, fmt.Errorf("object does not exist")) } - object := util.ObjectRefFromUnstructured(raw, cluster, schema.GroupVersionResource{ + object := utilobject.RichFromUnstructured(raw, cluster, schema.GroupVersionResource{ Group: group, Version: version, Resource: resource, }) - list, err := api.DiffCache.List(ctx, object, limit) + list, err := api.DiffCache.List(ctx, object.Key, limit) if err != nil { return err } diff --git a/pkg/diff/cache/etcd/etcd.go b/pkg/diff/cache/etcd/etcd.go index cd83272c..2c6d229d 100644 --- a/pkg/diff/cache/etcd/etcd.go +++ b/pkg/diff/cache/etcd/etcd.go @@ -29,7 +29,7 @@ import ( k8sconfig "github.com/kubewharf/kelemetry/pkg/k8s/config" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/shutdown" ) @@ -110,7 +110,7 @@ func (cache *Etcd) GetCommonOptions() *diffcache.CommonOptions { return cache.GetAdditionalOptions().(*diffcache.CommonOptions) } -func (cache *Etcd) Store(ctx context.Context, object util.ObjectRef, patch *diffcache.Patch) { +func (cache *Etcd) Store(ctx context.Context, object utilobject.Key, patch *diffcache.Patch) { patchJson, err := json.Marshal(patch) if err != nil { cache.Logger.WithError(err).Error("cannot marshal patch") @@ -133,7 +133,7 @@ func (cache *Etcd) Store(ctx context.Context, object util.ObjectRef, patch *diff func (cache *Etcd) Fetch( ctx context.Context, - object util.ObjectRef, + object utilobject.Key, oldResourceVersion string, newResourceVersion *string, ) (*diffcache.Patch, error) { @@ -164,7 +164,7 @@ func (cache *Etcd) Fetch( return patch, nil } -func (cache *Etcd) StoreSnapshot(ctx context.Context, object util.ObjectRef, snapshotName string, snapshot *diffcache.Snapshot) { +func (cache *Etcd) StoreSnapshot(ctx context.Context, object utilobject.Key, snapshotName string, snapshot *diffcache.Snapshot) { snapshotJson, err := json.Marshal(snapshot) if err != nil { cache.Logger.WithError(err).Error("cannot marshal snapshot") @@ -185,7 +185,7 @@ func (cache *Etcd) StoreSnapshot(ctx context.Context, object util.ObjectRef, sna } } -func (cache *Etcd) FetchSnapshot(ctx context.Context, object util.ObjectRef, snapshotName string) (*diffcache.Snapshot, error) { +func (cache *Etcd) FetchSnapshot(ctx context.Context, object utilobject.Key, snapshotName string) (*diffcache.Snapshot, error) { key := cache.snapshotKey(object, snapshotName) resp, err := cache.client.KV.Get(ctx, key) if err != nil { @@ -207,7 +207,7 @@ func (cache *Etcd) FetchSnapshot(ctx context.Context, object util.ObjectRef, sna return snapshot, nil } -func (cache *Etcd) List(ctx context.Context, object util.ObjectRef, limit int) ([]string, error) { +func (cache *Etcd) List(ctx context.Context, object utilobject.Key, limit int) ([]string, error) { resp, err := cache.client.KV.Get( ctx, cache.cacheKeyPrefix(object), @@ -232,11 +232,11 @@ func (cache *Etcd) List(ctx context.Context, object util.ObjectRef, limit int) ( return keys, nil } -func (cache *Etcd) cacheKeyPrefix(object util.ObjectRef) string { +func (cache *Etcd) cacheKeyPrefix(object utilobject.Key) string { return fmt.Sprintf("%s%s/", cache.options.prefix, object.String()) } -func (cache *Etcd) cacheKey(object util.ObjectRef, keyRv string) string { +func (cache *Etcd) cacheKey(object utilobject.Key, keyRv string) string { whichRv := "newRv" if cluster := cache.ClusterConfigs.Provide(object.Cluster); cluster != nil && cluster.UseOldResourceVersion { whichRv = "oldRv" @@ -245,6 +245,6 @@ func (cache *Etcd) cacheKey(object util.ObjectRef, keyRv string) string { return cache.cacheKeyPrefix(object) + fmt.Sprintf("%s/%s", whichRv, keyRv) } -func (cache *Etcd) snapshotKey(object util.ObjectRef, snapshotName string) string { +func (cache *Etcd) snapshotKey(object utilobject.Key, snapshotName string) string { return cache.cacheKeyPrefix(object) + snapshotName } diff --git a/pkg/diff/cache/interface.go b/pkg/diff/cache/interface.go index 663f0bc2..be7d0464 100644 --- a/pkg/diff/cache/interface.go +++ b/pkg/diff/cache/interface.go @@ -27,7 +27,7 @@ import ( k8sconfig "github.com/kubewharf/kelemetry/pkg/k8s/config" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -68,13 +68,13 @@ func (options *CommonOptions) Setup(fs *pflag.FlagSet) { type Cache interface { GetCommonOptions() *CommonOptions - Store(ctx context.Context, object util.ObjectRef, patch *Patch) - Fetch(ctx context.Context, object util.ObjectRef, oldResourceVersion string, newResourceVersion *string) (*Patch, error) + Store(ctx context.Context, object utilobject.Key, patch *Patch) + Fetch(ctx context.Context, object utilobject.Key, oldResourceVersion string, newResourceVersion *string) (*Patch, error) - StoreSnapshot(ctx context.Context, object util.ObjectRef, snapshotName string, snapshot *Snapshot) - FetchSnapshot(ctx context.Context, object util.ObjectRef, snapshotName string) (*Snapshot, error) + StoreSnapshot(ctx context.Context, object utilobject.Key, snapshotName string, snapshot *Snapshot) + FetchSnapshot(ctx context.Context, object utilobject.Key, snapshotName string) (*Snapshot, error) - List(ctx context.Context, object util.ObjectRef, limit int) ([]string, error) + List(ctx context.Context, object utilobject.Key, limit int) ([]string, error) } type mux struct { @@ -160,12 +160,12 @@ func (mux *mux) GetCommonOptions() *CommonOptions { return mux.options } -func (mux *mux) Store(ctx context.Context, object util.ObjectRef, patch *Patch) { +func (mux *mux) Store(ctx context.Context, object utilobject.Key, patch *Patch) { defer mux.StoreDiffMetric.DeferCount(mux.Clock.Now(), &storeDiffMetric{Redacted: patch.Redacted}) mux.Impl().(Cache).Store(ctx, object, patch) } -func (mux *mux) Fetch(ctx context.Context, object util.ObjectRef, oldResourceVersion string, newResourceVersion *string) (*Patch, error) { +func (mux *mux) Fetch(ctx context.Context, object utilobject.Key, oldResourceVersion string, newResourceVersion *string) (*Patch, error) { metric := &fetchDiffMetric{} defer mux.FetchDiffMetric.DeferCount(mux.Clock.Now(), metric) @@ -179,12 +179,12 @@ func (mux *mux) Fetch(ctx context.Context, object util.ObjectRef, oldResourceVer return patch, nil } -func (mux *mux) StoreSnapshot(ctx context.Context, object util.ObjectRef, snapshotName string, snapshot *Snapshot) { +func (mux *mux) StoreSnapshot(ctx context.Context, object utilobject.Key, snapshotName string, snapshot *Snapshot) { defer mux.StoreSnapshotMetric.DeferCount(mux.Clock.Now(), &storeSnapshotMetric{Redacted: snapshot.Redacted}) mux.Impl().(Cache).StoreSnapshot(ctx, object, snapshotName, snapshot) } -func (mux *mux) FetchSnapshot(ctx context.Context, object util.ObjectRef, snapshotName string) (*Snapshot, error) { +func (mux *mux) FetchSnapshot(ctx context.Context, object utilobject.Key, snapshotName string) (*Snapshot, error) { metric := &fetchSnapshotMetric{} defer mux.FetchSnapshotMetric.DeferCount(mux.Clock.Now(), metric) @@ -198,7 +198,7 @@ func (mux *mux) FetchSnapshot(ctx context.Context, object util.ObjectRef, snapsh return snapshot, nil } -func (mux *mux) List(ctx context.Context, object util.ObjectRef, limit int) ([]string, error) { +func (mux *mux) List(ctx context.Context, object utilobject.Key, limit int) ([]string, error) { defer mux.ListMetric.DeferCount(mux.Clock.Now(), &listMetric{}) return mux.Impl().(Cache).List(ctx, object, limit) } diff --git a/pkg/diff/cache/local/local.go b/pkg/diff/cache/local/local.go index af298914..bd30518b 100644 --- a/pkg/diff/cache/local/local.go +++ b/pkg/diff/cache/local/local.go @@ -26,8 +26,8 @@ import ( diffcache "github.com/kubewharf/kelemetry/pkg/diff/cache" k8sconfig "github.com/kubewharf/kelemetry/pkg/k8s/config" "github.com/kubewharf/kelemetry/pkg/manager" - "github.com/kubewharf/kelemetry/pkg/util" "github.com/kubewharf/kelemetry/pkg/util/cache" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/shutdown" ) @@ -111,7 +111,7 @@ func (cache *localCache) GetCommonOptions() *diffcache.CommonOptions { return cache.GetAdditionalOptions().(*diffcache.CommonOptions) } -func (cache *localCache) Store(ctx context.Context, object util.ObjectRef, patch *diffcache.Patch) { +func (cache *localCache) Store(ctx context.Context, object utilobject.Key, patch *diffcache.Patch) { cache.dataLock.Lock() defer cache.dataLock.Unlock() @@ -128,7 +128,7 @@ func (cache *localCache) Store(ctx context.Context, object util.ObjectRef, patch func (cache *localCache) Fetch( ctx context.Context, - object util.ObjectRef, + object utilobject.Key, oldResourceVersion string, newResourceVersion *string, ) (*diffcache.Patch, error) { @@ -160,13 +160,13 @@ func (cache *localCache) Fetch( return nil, nil } -func (cache *localCache) StoreSnapshot(ctx context.Context, object util.ObjectRef, snapshotName string, value *diffcache.Snapshot) { +func (cache *localCache) StoreSnapshot(ctx context.Context, object utilobject.Key, snapshotName string, value *diffcache.Snapshot) { cache.snapshotCache.Add(fmt.Sprintf("%v/%s", object, snapshotName), value) } func (cache *localCache) FetchSnapshot( ctx context.Context, - object util.ObjectRef, + object utilobject.Key, snapshotName string, ) (*diffcache.Snapshot, error) { if value, ok := cache.snapshotCache.Get(fmt.Sprintf("%v/%s", object, snapshotName)); ok { @@ -176,7 +176,7 @@ func (cache *localCache) FetchSnapshot( return nil, nil } -func (cache *localCache) List(ctx context.Context, object util.ObjectRef, limit int) ([]string, error) { +func (cache *localCache) List(ctx context.Context, object utilobject.Key, limit int) ([]string, error) { cache.dataLock.RLock() defer cache.dataLock.RUnlock() diff --git a/pkg/diff/cache/memory_wrapper.go b/pkg/diff/cache/memory_wrapper.go index b809bda8..cc872d24 100644 --- a/pkg/diff/cache/memory_wrapper.go +++ b/pkg/diff/cache/memory_wrapper.go @@ -22,8 +22,8 @@ import ( k8sconfig "github.com/kubewharf/kelemetry/pkg/k8s/config" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" "github.com/kubewharf/kelemetry/pkg/util/cache" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) type CacheWrapper struct { @@ -77,7 +77,7 @@ func (wrapper *CacheWrapper) GetCommonOptions() *CommonOptions { return wrapper.options } -func (wrapper *CacheWrapper) Store(ctx context.Context, object util.ObjectRef, patch *Patch) { +func (wrapper *CacheWrapper) Store(ctx context.Context, object utilobject.Key, patch *Patch) { wrapper.delegate.Store(ctx, object, patch) wrapper.patchCache.Add(cacheWrapperKey(object, patch.NewResourceVersion), patch) @@ -85,7 +85,7 @@ func (wrapper *CacheWrapper) Store(ctx context.Context, object util.ObjectRef, p func (wrapper *CacheWrapper) Fetch( ctx context.Context, - object util.ObjectRef, + object utilobject.Key, oldResourceVersion string, newResourceVersion *string, ) (*Patch, error) { @@ -113,7 +113,7 @@ func (wrapper *CacheWrapper) Fetch( func (wrapper *CacheWrapper) StoreSnapshot( ctx context.Context, - object util.ObjectRef, + object utilobject.Key, snapshotName string, snapshot *Snapshot, ) { @@ -123,7 +123,7 @@ func (wrapper *CacheWrapper) StoreSnapshot( func (wrapper *CacheWrapper) FetchSnapshot( ctx context.Context, - object util.ObjectRef, + object utilobject.Key, snapshotName string, ) (*Snapshot, error) { penetrateMetric := &penetrateMetric{Type: fmt.Sprintf("snapshot/%s", snapshotName)} @@ -140,10 +140,10 @@ func (wrapper *CacheWrapper) FetchSnapshot( } // List always penetrates the cache because we cannot get notified of new keys -func (wrapper *CacheWrapper) List(ctx context.Context, object util.ObjectRef, limit int) ([]string, error) { +func (wrapper *CacheWrapper) List(ctx context.Context, object utilobject.Key, limit int) ([]string, error) { return wrapper.delegate.List(ctx, object, limit) } -func cacheWrapperKey(object util.ObjectRef, subkey string) string { +func cacheWrapperKey(object utilobject.Key, subkey string) string { return fmt.Sprintf("%s/%s", object.String(), subkey) } diff --git a/pkg/diff/controller/controller.go b/pkg/diff/controller/controller.go index 6cf66e1b..8dc8d78b 100644 --- a/pkg/diff/controller/controller.go +++ b/pkg/diff/controller/controller.go @@ -43,9 +43,9 @@ import ( "github.com/kubewharf/kelemetry/pkg/k8s/multileader" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" "github.com/kubewharf/kelemetry/pkg/util/channel" informerutil "github.com/kubewharf/kelemetry/pkg/util/informer" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/shutdown" ) @@ -585,7 +585,7 @@ func (monitor *monitor) onUpdate( patch.DiffList = diffcmp.Compare(oldObj.Object, newObj.Object) } - objectRef := util.ObjectRefFromUnstructured(newObj, monitor.ctrl.Clients.TargetCluster().ClusterName(), monitor.gvr).Clone() + objectRef := utilobject.RichFromUnstructured(newObj, monitor.ctrl.Clients.TargetCluster().ClusterName(), monitor.gvr).Clone() return func(ctx context.Context) { ctx, cancelFunc := context.WithTimeout(ctx, monitor.ctrl.options.storeTimeout) @@ -593,7 +593,7 @@ func (monitor *monitor) onUpdate( monitor.ctrl.Cache.Store( ctx, - objectRef, + objectRef.Key, patch, ) } @@ -617,7 +617,7 @@ func (monitor *monitor) onNeedSnapshot( Error("cannot re-marshal unstructured object") } - objectRef := util.ObjectRefFromUnstructured(obj, monitor.ctrl.Clients.TargetCluster().ClusterName(), monitor.gvr).Clone() + objectRef := utilobject.RichFromUnstructured(obj, monitor.ctrl.Clients.TargetCluster().ClusterName(), monitor.gvr).Clone() snapshotRv := obj.GetResourceVersion() return func(ctx context.Context) { @@ -626,7 +626,7 @@ func (monitor *monitor) onNeedSnapshot( monitor.ctrl.Cache.StoreSnapshot( ctx, - objectRef, + objectRef.Key, snapshotName, &diffcache.Snapshot{ ResourceVersion: snapshotRv, diff --git a/pkg/diff/decorator/decorator.go b/pkg/diff/decorator/decorator.go index e0dd3ab2..7537438f 100644 --- a/pkg/diff/decorator/decorator.go +++ b/pkg/diff/decorator/decorator.go @@ -33,7 +33,7 @@ import ( diffcache "github.com/kubewharf/kelemetry/pkg/diff/cache" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/zconstants" ) @@ -211,7 +211,7 @@ func (decorator *decorator) tryDecorate( } // NOTE: UID may be empty, but we don't use it anyway - object := util.ObjectRefFromAudit(message.ObjectRef, message.Cluster, message.ObjectRef.UID) + object := utilobject.RichFromAudit(message.ObjectRef, message.Cluster) var tryOnce func(context.Context) (bool, error) @@ -306,14 +306,14 @@ func decoratesResource(message *audit.Message) bool { func (decorator *decorator) tryUpdateOnce( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, oldRv string, newRv *string, event *aggregatorevent.Event, message *audit.Message, ) (bool, error) { var err error - patch, err := decorator.Cache.Fetch(ctx, object, oldRv, newRv) + patch, err := decorator.Cache.Fetch(ctx, object.Key, oldRv, newRv) if err != nil || patch == nil { return false, err } @@ -347,11 +347,11 @@ func (decorator *decorator) tryUpdateOnce( func (decorator *decorator) tryCreateDeleteOnce( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, snapshotName string, event *aggregatorevent.Event, ) (bool, error) { - snapshot, err := decorator.Cache.FetchSnapshot(ctx, object, snapshotName) + snapshot, err := decorator.Cache.FetchSnapshot(ctx, object.Key, snapshotName) if err != nil || snapshot == nil { return false, err } diff --git a/pkg/event/controller.go b/pkg/event/controller.go index cc286db6..e77fd5b6 100644 --- a/pkg/event/controller.go +++ b/pkg/event/controller.go @@ -40,8 +40,8 @@ import ( "github.com/kubewharf/kelemetry/pkg/k8s/multileader" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" informerutil "github.com/kubewharf/kelemetry/pkg/util/informer" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/shutdown" "github.com/kubewharf/kelemetry/pkg/util/zconstants" ) @@ -312,12 +312,18 @@ func (ctrl *controller) handleEvent(ctx context.Context, event *corev1.Event) { Resource: gvr.Resource, }).Summary(float64(ctrl.Clock.Since(eventTime).Nanoseconds())) - if err := ctrl.Aggregator.Send(ctx, util.ObjectRef{ - Cluster: clusterName, - GroupVersionResource: gvr, - Namespace: event.InvolvedObject.Namespace, - Name: event.InvolvedObject.Name, - Uid: event.InvolvedObject.UID, + if err := ctrl.Aggregator.Send(ctx, utilobject.Rich{ + VersionedKey: utilobject.VersionedKey{ + Key: utilobject.Key{ + Cluster: clusterName, + Group: gvr.Group, + Resource: gvr.Resource, + Namespace: event.InvolvedObject.Namespace, + Name: event.InvolvedObject.Name, + }, + Version: gvr.Version, + }, + Uid: event.InvolvedObject.UID, }, aggregatorEvent, nil); err != nil { logger.WithError(err).Error("Cannot send trace") metric.Error = metrics.LabelError(err, "SendTrace") diff --git a/pkg/frontend/backend/jaeger-storage/backend.go b/pkg/frontend/backend/jaeger-storage/backend.go index f454c6e2..8cea1ee2 100644 --- a/pkg/frontend/backend/jaeger-storage/backend.go +++ b/pkg/frontend/backend/jaeger-storage/backend.go @@ -37,6 +37,7 @@ import ( tftree "github.com/kubewharf/kelemetry/pkg/frontend/tf/tree" "github.com/kubewharf/kelemetry/pkg/manager" utiljaeger "github.com/kubewharf/kelemetry/pkg/util/jaeger" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/zconstants" ) @@ -194,11 +195,11 @@ func (backend *Backend) List( // exclusive mode, each object under trace should have a list entry type objectInTrace struct { traceId model.TraceID - key tftree.GroupingKey + key utilobject.Key } seenObjects := sets.New[objectInTrace]() deduplicator = func(span *model.Span) bool { - key, hasKey := tftree.GroupingKeyFromSpan(span) + key, hasKey := utilobject.FromSpan(span) if !hasKey { return false // not a root } diff --git a/pkg/frontend/extension/httptrace/httptrace.go b/pkg/frontend/extension/httptrace/httptrace.go index 022374ae..4fd68f7a 100644 --- a/pkg/frontend/extension/httptrace/httptrace.go +++ b/pkg/frontend/extension/httptrace/httptrace.go @@ -30,8 +30,8 @@ import ( "github.com/kubewharf/kelemetry/pkg/frontend/extension" "github.com/kubewharf/kelemetry/pkg/manager" - "github.com/kubewharf/kelemetry/pkg/util" filterutil "github.com/kubewharf/kelemetry/pkg/util/filter" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -128,7 +128,7 @@ func (provider *Provider) MaxConcurrency() int { return provider.maxConc func (provider *Provider) FetchForObject( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, mainTags model.KeyValues, start, end time.Time, ) (*extension.FetchResult, error) { @@ -151,7 +151,7 @@ func (provider *Provider) FetchForObject( func (provider *Provider) FetchForVersion( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, resourceVersion string, mainTags model.KeyValues, start, end time.Time, @@ -202,14 +202,14 @@ func (provider *Provider) LoadCache(ctx context.Context, jsonBuf []byte) ([]*mod return spans, nil } -func objectTemplateArgs(object util.ObjectRef) map[string]any { +func objectTemplateArgs(object utilobject.Rich) map[string]any { return map[string]any{ "cluster": object.Cluster, "group": object.Group, "version": object.Version, "resource": object.Resource, - "groupVersion": object.GroupVersionResource.GroupVersion().String(), - "groupResource": object.GroupVersionResource.GroupResource().String(), + "groupVersion": object.GroupVersion().String(), + "groupResource": object.GroupResource().String(), "namespace": object.Namespace, "name": object.Name, } @@ -218,7 +218,7 @@ func objectTemplateArgs(object util.ObjectRef) map[string]any { // fetch returns traces search from first successful traceBackend func (provider *Provider) fetch( ctx context.Context, - _ util.ObjectRef, + _ utilobject.Rich, mainTags model.KeyValues, start, end time.Time, templateArgs map[string]any, diff --git a/pkg/frontend/extension/jaeger-storage/storage.go b/pkg/frontend/extension/jaeger-storage/storage.go index 823d8aff..3fecb2a3 100644 --- a/pkg/frontend/extension/jaeger-storage/storage.go +++ b/pkg/frontend/extension/jaeger-storage/storage.go @@ -34,8 +34,8 @@ import ( "github.com/kubewharf/kelemetry/pkg/frontend/extension" "github.com/kubewharf/kelemetry/pkg/manager" - "github.com/kubewharf/kelemetry/pkg/util" utiljaeger "github.com/kubewharf/kelemetry/pkg/util/jaeger" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -195,7 +195,7 @@ func (provider *Provider) MaxConcurrency() int { return provider.maxConc func (provider *Provider) FetchForObject( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, mainTags model.KeyValues, start, end time.Time, ) (*extension.FetchResult, error) { @@ -211,7 +211,7 @@ func (provider *Provider) FetchForObject( func (provider *Provider) FetchForVersion( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, resourceVersion string, mainTags model.KeyValues, start, end time.Time, @@ -229,7 +229,7 @@ func (provider *Provider) FetchForVersion( ) } -func objectTemplateArgs(object util.ObjectRef) map[string]any { +func objectTemplateArgs(object utilobject.Rich) map[string]any { var objectApiPath string if object.Group == "" { objectApiPath = fmt.Sprintf("/api/%s", object.Version) @@ -248,8 +248,8 @@ func objectTemplateArgs(object util.ObjectRef) map[string]any { "group": object.Group, "version": object.Version, "resource": object.Resource, - "groupVersion": object.GroupVersionResource.GroupVersion().String(), - "groupResource": object.GroupVersionResource.GroupResource().String(), + "groupVersion": object.GroupVersion().String(), + "groupResource": object.GroupResource().String(), "namespace": object.Namespace, "name": object.Name, "objectApiPath": objectApiPath, diff --git a/pkg/frontend/extension/provider.go b/pkg/frontend/extension/provider.go index 2108478a..865a6ec4 100644 --- a/pkg/frontend/extension/provider.go +++ b/pkg/frontend/extension/provider.go @@ -21,7 +21,7 @@ import ( "github.com/jaegertracing/jaeger/model" "github.com/kubewharf/kelemetry/pkg/manager" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) // Produces new extension providers based on the configuration. @@ -58,7 +58,7 @@ type Provider interface { // `tags` contains the tags in the object pseudospan in the main trace. FetchForObject( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, tags model.KeyValues, start, end time.Time, ) (*FetchResult, error) @@ -73,7 +73,7 @@ type Provider interface { // `tags` contains the tags in the object pseudospan in the main trace. FetchForVersion( ctx context.Context, - object util.ObjectRef, + object utilobject.Rich, resourceVersion string, tags model.KeyValues, start, end time.Time, diff --git a/pkg/frontend/reader/merge.go b/pkg/frontend/reader/merge.go index 3a95e551..d35d5c38 100644 --- a/pkg/frontend/reader/merge.go +++ b/pkg/frontend/reader/merge.go @@ -19,21 +19,21 @@ import ( "k8s.io/apimachinery/pkg/util/sets" jaegerbackend "github.com/kubewharf/kelemetry/pkg/frontend/backend" - tftree "github.com/kubewharf/kelemetry/pkg/frontend/tf/tree" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) type mergeMap struct { ptrSet sets.Set[*mergeEntry] - fromKeys map[tftree.GroupingKey]*mergeEntry + fromKeys map[utilobject.Key]*mergeEntry } type mergeEntry struct { - keys sets.Set[tftree.GroupingKey] + keys sets.Set[utilobject.Key] identifiers []any spans []*model.Span } -func singletonMerged(keys sets.Set[tftree.GroupingKey], thumbnail *jaegerbackend.TraceThumbnail) *mergeEntry { +func singletonMerged(keys sets.Set[utilobject.Key], thumbnail *jaegerbackend.TraceThumbnail) *mergeEntry { return &mergeEntry{ keys: keys, identifiers: []any{thumbnail.Identifier}, @@ -51,7 +51,7 @@ func (entry *mergeEntry) join(other *mergeEntry) { } // add a thumbnail with a preferred root key. -func (m *mergeMap) add(keys sets.Set[tftree.GroupingKey], thumbnail *jaegerbackend.TraceThumbnail) { +func (m *mergeMap) add(keys sets.Set[utilobject.Key], thumbnail *jaegerbackend.TraceThumbnail) { entry := singletonMerged(keys.Clone(), thumbnail) m.ptrSet.Insert(entry) @@ -77,11 +77,11 @@ func (m *mergeMap) add(keys sets.Set[tftree.GroupingKey], thumbnail *jaegerbacke func mergeSegments(thumbnails []*jaegerbackend.TraceThumbnail) []*mergeEntry { m := mergeMap{ ptrSet: sets.New[*mergeEntry](), - fromKeys: map[tftree.GroupingKey]*mergeEntry{}, + fromKeys: map[utilobject.Key]*mergeEntry{}, } for _, thumbnail := range thumbnails { - keys := tftree.GroupingKeysFromSpans(thumbnail.Spans) + keys := utilobject.FromSpans(thumbnail.Spans) m.add(keys, thumbnail) } diff --git a/pkg/frontend/reader/reader.go b/pkg/frontend/reader/reader.go index 6484016a..08b86ebf 100644 --- a/pkg/frontend/reader/reader.go +++ b/pkg/frontend/reader/reader.go @@ -31,9 +31,9 @@ import ( "github.com/kubewharf/kelemetry/pkg/frontend/clusterlist" transform "github.com/kubewharf/kelemetry/pkg/frontend/tf" tfconfig "github.com/kubewharf/kelemetry/pkg/frontend/tf/config" - tftree "github.com/kubewharf/kelemetry/pkg/frontend/tf/tree" "github.com/kubewharf/kelemetry/pkg/frontend/tracecache" "github.com/kubewharf/kelemetry/pkg/manager" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/zconstants" ) @@ -135,8 +135,8 @@ func (reader *spanReader) FindTraces(ctx context.Context, query *spanstore.Trace return nil, err } - var rootKey *tftree.GroupingKey - if rootKeyValue, ok := tftree.GroupingKeyFromMap(query.Tags); ok { + var rootKey *utilobject.Key + if rootKeyValue, ok := utilobject.FromMap(query.Tags); ok { rootKey = &rootKeyValue } diff --git a/pkg/frontend/tf/extension.go b/pkg/frontend/tf/extension.go index 48cffd4b..f3a0e252 100644 --- a/pkg/frontend/tf/extension.go +++ b/pkg/frontend/tf/extension.go @@ -26,7 +26,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/frontend/extension" "github.com/kubewharf/kelemetry/pkg/frontend/tracecache" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" "github.com/kubewharf/kelemetry/pkg/util/semaphore" "github.com/kubewharf/kelemetry/pkg/util/zconstants" ) @@ -170,8 +170,8 @@ func (x *FetchExtensionsAndStoreCache) ProcessExtensions( return newSpans, nil } -func objectRefFromTags(tags model.KeyValues) (util.ObjectRef, bool) { - var object util.ObjectRef +func objectRefFromTags(tags model.KeyValues) (utilobject.Rich, bool) { + var object utilobject.Rich assign := func(name string, field *string) bool { value, ok := tags.FindByKey(name) diff --git a/pkg/frontend/tf/transform.go b/pkg/frontend/tf/transform.go index 2af85caa..8472a0f8 100644 --- a/pkg/frontend/tf/transform.go +++ b/pkg/frontend/tf/transform.go @@ -29,6 +29,7 @@ import ( tfconfig "github.com/kubewharf/kelemetry/pkg/frontend/tf/config" tftree "github.com/kubewharf/kelemetry/pkg/frontend/tf/tree" "github.com/kubewharf/kelemetry/pkg/manager" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -57,7 +58,7 @@ func (transformer *Transformer) Close(ctx context.Context) error { return nil } func (transformer *Transformer) Transform( ctx context.Context, trace *model.Trace, - rootObject *tftree.GroupingKey, + rootObject *utilobject.Key, configId tfconfig.Id, extensionProcessor ExtensionProcessor, start, end time.Time, @@ -80,7 +81,7 @@ func (transformer *Transformer) Transform( hasRootSpan := false for _, span := range tree.GetSpans() { - if key, hasKey := tftree.GroupingKeyFromSpan(span); hasKey && key == *rootObject { + if key, hasKey := utilobject.FromSpan(span); hasKey && key == *rootObject { rootSpan = span.SpanID hasRootSpan = true } @@ -120,10 +121,10 @@ func (transformer *Transformer) Transform( // merge spans of the same object from multiple traces func (transformer *Transformer) groupDuplicates(tree *tftree.SpanTree) { - commonSpans := map[tftree.GroupingKey][]model.SpanID{} + commonSpans := map[utilobject.Key][]model.SpanID{} for _, span := range tree.GetSpans() { - if key, hasKey := tftree.GroupingKeyFromSpan(span); hasKey { + if key, hasKey := utilobject.FromSpan(span); hasKey { commonSpans[key] = append(commonSpans[key], span.SpanID) } } diff --git a/pkg/frontend/tf/tree/grouping.go b/pkg/frontend/tf/tree/grouping.go deleted file mode 100644 index 8536ed1d..00000000 --- a/pkg/frontend/tf/tree/grouping.go +++ /dev/null @@ -1,89 +0,0 @@ -// Copyright 2023 The Kelemetry Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package tftree - -import ( - "github.com/jaegertracing/jaeger/model" - "k8s.io/apimachinery/pkg/util/sets" - - "github.com/kubewharf/kelemetry/pkg/util/zconstants" -) - -type GroupingKey struct { - Cluster string `json:"cluster"` - Group string `json:"group"` - Resource string `json:"resource"` - Namespace string `json:"namespace"` - Name string `json:"name"` - Field string `json:"field"` -} - -func GroupingKeyFromMap(tags map[string]string) (key GroupingKey, ok bool) { - for mapKey, field := range map[string]*string{ - "cluster": &key.Cluster, - "group": &key.Group, - "resource": &key.Resource, - "namespace": &key.Namespace, - "name": &key.Name, - } { - *field, ok = tags[mapKey] - if !ok { - return key, false - } - } - - if field, hasField := tags["field"]; hasField { - key.Field = field - } else { - key.Field = "object" - } - - return key, true -} - -func GroupingKeyFromSpan(span *model.Span) (GroupingKey, bool) { - tags := model.KeyValues(span.Tags) - traceSource, hasTraceSource := tags.FindByKey(zconstants.TraceSource) - if !hasTraceSource || traceSource.VStr != zconstants.TraceSourceObject { - return GroupingKey{}, false - } - - cluster, _ := tags.FindByKey("cluster") - group, _ := tags.FindByKey("group") - resource, _ := tags.FindByKey("resource") - namespace, _ := tags.FindByKey("namespace") - name, _ := tags.FindByKey("name") - field, _ := tags.FindByKey(zconstants.NestLevel) - key := GroupingKey{ - Cluster: cluster.VStr, - Group: group.VStr, - Resource: resource.VStr, - Namespace: namespace.VStr, - Name: name.VStr, - Field: field.VStr, - } - return key, true -} - -func GroupingKeysFromSpans(spans []*model.Span) sets.Set[GroupingKey] { - keys := sets.New[GroupingKey]() - - for _, span := range spans { - if key, ok := GroupingKeyFromSpan(span); ok { - keys.Insert(key) - } - } - return keys -} diff --git a/pkg/frontend/tracecache/interface.go b/pkg/frontend/tracecache/interface.go index 0b65b947..6e061403 100644 --- a/pkg/frontend/tracecache/interface.go +++ b/pkg/frontend/tracecache/interface.go @@ -21,8 +21,8 @@ import ( "github.com/jaegertracing/jaeger/model" - tftree "github.com/kubewharf/kelemetry/pkg/frontend/tf/tree" "github.com/kubewharf/kelemetry/pkg/manager" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -42,10 +42,10 @@ type Entry struct { } type EntryValue struct { - Identifiers []json.RawMessage `json:"identifiers"` - StartTime time.Time `json:"startTime"` - EndTime time.Time `json:"endTime"` - RootObject *tftree.GroupingKey `json:"rootObject"` + Identifiers []json.RawMessage `json:"identifiers"` + StartTime time.Time `json:"startTime"` + EndTime time.Time `json:"endTime"` + RootObject *utilobject.Key `json:"rootObject"` Extensions []ExtensionCache `json:"extensions"` } diff --git a/pkg/k8s/objectcache/objectcache.go b/pkg/k8s/objectcache/objectcache.go index 451f851c..1036242a 100644 --- a/pkg/k8s/objectcache/objectcache.go +++ b/pkg/k8s/objectcache/objectcache.go @@ -32,7 +32,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/k8s" "github.com/kubewharf/kelemetry/pkg/manager" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -95,11 +95,11 @@ func (oc *ObjectCache) Start(ctx context.Context) error { return nil } func (oc *ObjectCache) Close(ctx context.Context) error { return nil } -func (oc *ObjectCache) Get(ctx context.Context, object util.ObjectRef) (*unstructured.Unstructured, error) { +func (oc *ObjectCache) Get(ctx context.Context, object utilobject.VersionedKey) (*unstructured.Unstructured, error) { metric := &CacheRequestMetric{Cluster: object.Cluster, Error: "Unknown"} defer oc.CacheRequestMetric.DeferCount(oc.Clock.Now(), metric) - key := objectKey(object) + key := objectKey(object.Key) for { fetchCtx, cancelFunc := context.WithTimeout(ctx, oc.options.fetchTimeout) @@ -173,7 +173,7 @@ func decodeCached(cached []byte, metric *CacheRequestMetric) (*unstructured.Unst func (oc *ObjectCache) penetrate( ctx context.Context, - object util.ObjectRef, + object utilobject.VersionedKey, ) (_raw *unstructured.Unstructured, _err error, _metricCode string) { // first, try fetching from server clusterClient, err := oc.Clients.Cluster(object.Cluster) @@ -181,7 +181,7 @@ func (oc *ObjectCache) penetrate( return nil, fmt.Errorf("cannot initialize clients for cluster %q: %w", object.Cluster, err), "UnknownCluster" } - nsClient := clusterClient.DynamicClient().Resource(object.GroupVersionResource) + nsClient := clusterClient.DynamicClient().Resource(object.GroupVersionResource()) var client dynamic.ResourceInterface = nsClient if object.Namespace != "" { client = nsClient.Namespace(object.Namespace) @@ -199,7 +199,7 @@ func (oc *ObjectCache) penetrate( } // not found from apiserver, try deletion snapshot instead - snapshot, err := oc.DiffCache.FetchSnapshot(ctx, object, diffcache.SnapshotNameDeletion) + snapshot, err := oc.DiffCache.FetchSnapshot(ctx, object.Key, diffcache.SnapshotNameDeletion) if err != nil { return nil, metrics.LabelError(fmt.Errorf("cannot fallback to snapshot: %w", err), "SnapshotFetch"), "SnapshotFetch" } @@ -218,6 +218,6 @@ func (oc *ObjectCache) penetrate( return nil, nil, "NotFoundAnywhere" } -func objectKey(object util.ObjectRef) []byte { +func objectKey(object utilobject.Key) []byte { return []byte(fmt.Sprintf("%s/%s/%s/%s", object.Group, object.Resource, object.Namespace, object.Name)) } diff --git a/pkg/k8s/objectcache/objectcache_test.go b/pkg/k8s/objectcache/objectcache_test.go index 01fa7a76..5c2c9088 100644 --- a/pkg/k8s/objectcache/objectcache_test.go +++ b/pkg/k8s/objectcache/objectcache_test.go @@ -29,7 +29,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/k8s" "github.com/kubewharf/kelemetry/pkg/k8s/objectcache" "github.com/kubewharf/kelemetry/pkg/metrics" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func TestGet(t *testing.T) { @@ -70,11 +70,15 @@ func TestGet(t *testing.T) { assert.NoError(cache.Init()) for i := 0; i < 2; i++ { - uns, err := cache.Get(context.Background(), util.ObjectRef{ - Cluster: "test-cluster", - GroupVersionResource: corev1.SchemeGroupVersion.WithResource("configmaps"), - Namespace: "default", - Name: "test-cm", + uns, err := cache.Get(context.Background(), utilobject.VersionedKey{ + Key: utilobject.Key{ + Cluster: "test-cluster", + Namespace: "default", + Name: "test-cm", + Group: corev1.GroupName, + Resource: "configmaps", + }, + Version: corev1.SchemeGroupVersion.Version, }) assert.NoError(err) diff --git a/pkg/ownerlinker/linker.go b/pkg/ownerlinker/linker.go index 03d1ceea..eaf0bc7b 100644 --- a/pkg/ownerlinker/linker.go +++ b/pkg/ownerlinker/linker.go @@ -26,7 +26,7 @@ import ( "github.com/kubewharf/kelemetry/pkg/k8s/discovery" "github.com/kubewharf/kelemetry/pkg/k8s/objectcache" "github.com/kubewharf/kelemetry/pkg/manager" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) func init() { @@ -58,7 +58,7 @@ func (ctrl *Controller) Init() error { return nil } func (ctrl *Controller) Start(ctx context.Context) error { return nil } func (ctrl *Controller) Close(ctx context.Context) error { return nil } -func (ctrl *Controller) Lookup(ctx context.Context, object util.ObjectRef) *util.ObjectRef { +func (ctrl *Controller) Lookup(ctx context.Context, object utilobject.Rich) *utilobject.Rich { raw := object.Raw logger := ctrl.Logger.WithFields(object.AsFields("object")) @@ -67,7 +67,7 @@ func (ctrl *Controller) Lookup(ctx context.Context, object util.ObjectRef) *util logger.Debug("Fetching dynamic object") var err error - raw, err = ctrl.ObjectCache.Get(ctx, object) + raw, err = ctrl.ObjectCache.Get(ctx, object.VersionedKey) if err != nil { logger.WithError(err).Error("cannot fetch object value") @@ -101,12 +101,18 @@ func (ctrl *Controller) Lookup(ctx context.Context, object util.ObjectRef) *util continue } - ret := &util.ObjectRef{ - Cluster: object.Cluster, // inherited from the same cluster - GroupVersionResource: gvr, - Namespace: object.Namespace, - Name: owner.Name, - Uid: owner.UID, + ret := &utilobject.Rich{ + VersionedKey: utilobject.VersionedKey{ + Key: utilobject.Key{ + Cluster: object.Cluster, + Group: gvr.Group, + Resource: gvr.Group, + Namespace: object.Namespace, + Name: owner.Name, + }, + Version: gvr.Version, + }, + Uid: owner.UID, } logger.WithField("owner", ret).Debug("Resolved owner") diff --git a/pkg/util/filter/object_filter.go b/pkg/util/filter/object_filter.go index 1668e162..576c6711 100644 --- a/pkg/util/filter/object_filter.go +++ b/pkg/util/filter/object_filter.go @@ -17,7 +17,7 @@ package filter import ( "github.com/dlclark/regexp2" - "github.com/kubewharf/kelemetry/pkg/util" + utilobject "github.com/kubewharf/kelemetry/pkg/util/object" ) type ObjectFilters struct { @@ -44,7 +44,7 @@ func (regex *Regex) MatchString(s string) bool { return match } -func (f *ObjectFilters) Check(object util.ObjectRef) bool { +func (f *ObjectFilters) Check(object utilobject.Rich) bool { if f.Cluster.Pattern != nil && !f.Cluster.MatchString(object.Cluster) { return false } diff --git a/pkg/util/object/key.go b/pkg/util/object/key.go new file mode 100644 index 00000000..1208c076 --- /dev/null +++ b/pkg/util/object/key.go @@ -0,0 +1,147 @@ +// Copyright 2023 The Kelemetry Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utilobject + +import ( + "fmt" + "strings" + + "github.com/jaegertracing/jaeger/model" + "github.com/sirupsen/logrus" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/util/sets" + + "github.com/kubewharf/kelemetry/pkg/util/zconstants" +) + +type Key struct { + Cluster string `json:"cluster"` + Group string `json:"group"` + Resource string `json:"resource"` + Namespace string `json:"namespace"` + Name string `json:"name"` +} + +func (key Key) Clone() Key { + return Key{ + Cluster: strings.Clone(key.Cluster), + Group: strings.Clone(key.Group), + Resource: strings.Clone(key.Resource), + Namespace: strings.Clone(key.Namespace), + Name: strings.Clone(key.Name), + } +} + +func (key Key) GroupResource() schema.GroupResource { + return schema.GroupResource{Group: key.Group, Resource: key.Resource} +} + +func (key Key) String() string { + return fmt.Sprintf("%s/%s/%s/%s/%s", key.Cluster, key.Group, key.Resource, key.Namespace, key.Name) +} + +func (key Key) AsFields(prefix string) logrus.Fields { + return logrus.Fields{ + prefix + "Cluster": key.Cluster, + prefix + "Group": key.Group, + prefix + "Resource": key.Resource, + prefix + "Namespace": key.Namespace, + prefix + "Name": key.Name, + } +} + +func FromMap(tags map[string]string) (key Key, ok bool) { + for mapKey, field := range map[string]*string{ + "cluster": &key.Cluster, + "group": &key.Group, + "resource": &key.Resource, + "namespace": &key.Namespace, + "name": &key.Name, + } { + *field, ok = tags[mapKey] + if !ok { + return key, false + } + } + + return key, true +} + +func FromSpan(span *model.Span) (Key, bool) { + tags := model.KeyValues(span.Tags) + traceSource, hasTraceSource := tags.FindByKey(zconstants.TraceSource) + if !hasTraceSource || traceSource.VStr != zconstants.TraceSourceObject { + return Key{}, false + } + + cluster, _ := tags.FindByKey("cluster") + group, _ := tags.FindByKey("group") + resource, _ := tags.FindByKey("resource") + namespace, _ := tags.FindByKey("namespace") + name, _ := tags.FindByKey("name") + key := Key{ + Cluster: cluster.VStr, + Group: group.VStr, + Resource: resource.VStr, + Namespace: namespace.VStr, + Name: name.VStr, + } + return key, true +} + +func FromSpans(spans []*model.Span) sets.Set[Key] { + keys := sets.New[Key]() + + for _, span := range spans { + if key, ok := FromSpan(span); ok { + keys.Insert(key) + } + } + return keys +} + +type VersionedKey struct { + Key + Version string `json:"version"` +} + +func (key VersionedKey) Clone() VersionedKey { + return VersionedKey{ + Key: key.Key.Clone(), + Version: strings.Clone(key.Version), + } +} + +func (key VersionedKey) GroupVersionResource() schema.GroupVersionResource { + return key.GroupResource().WithVersion(key.Version) +} + +func (key VersionedKey) GroupVersion() schema.GroupVersion { + return schema.GroupVersion{Group: key.Group, Version: key.Version} +} + +func FromObject(object metav1.Object, cluster string, gvr schema.GroupVersionResource) VersionedKey { + return VersionedKey{ + Key: Key{ + Cluster: cluster, + Group: gvr.Group, + Resource: gvr.Resource, + Namespace: object.GetNamespace(), + Name: object.GetName(), + }, + Version: gvr.Version, + } +} diff --git a/pkg/util/object/rich.go b/pkg/util/object/rich.go new file mode 100644 index 00000000..fd30b84f --- /dev/null +++ b/pkg/util/object/rich.go @@ -0,0 +1,82 @@ +// Copyright 2023 The Kelemetry Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package utilobject + +import ( + "strings" + + "github.com/sirupsen/logrus" + "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" + "k8s.io/apimachinery/pkg/runtime/schema" + "k8s.io/apimachinery/pkg/types" + auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" +) + +type Rich struct { + VersionedKey + + Uid types.UID + + Raw *unstructured.Unstructured `json:"-"` +} + +// Returns a new Rich reference with all strings cloned again +// to ensure this object does not reference more data than it needs. +// +// Does not copy the Raw field. +func (ref Rich) Clone() Rich { + return Rich{ + VersionedKey: ref.VersionedKey.Clone(), + Uid: types.UID(strings.Clone(string(ref.Uid))), + } +} + +func (ref Rich) String() string { + return ref.Key.String() +} + +func (ref Rich) AsFields(prefix string) logrus.Fields { + fields := ref.Key.AsFields(prefix) + fields[prefix+"Uid"] = ref.Uid + return fields +} + +func RichFromUnstructured( + uns *unstructured.Unstructured, + cluster string, + gvr schema.GroupVersionResource, +) Rich { + return Rich{ + VersionedKey: FromObject(uns, cluster, gvr), + Uid: uns.GetUID(), + Raw: uns, + } +} + +func RichFromAudit(object *auditv1.ObjectReference, cluster string) Rich { + return Rich{ + VersionedKey: VersionedKey{ + Key: Key{ + Cluster: cluster, + Group: object.APIGroup, + Resource: object.Resource, + Namespace: object.Namespace, + Name: object.Name, + }, + Version: object.APIVersion, + }, + Uid: object.UID, + } +} diff --git a/pkg/util/object_ref.go b/pkg/util/object_ref.go deleted file mode 100644 index 6bd9dbcc..00000000 --- a/pkg/util/object_ref.go +++ /dev/null @@ -1,101 +0,0 @@ -// Copyright 2023 The Kelemetry Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package util - -import ( - "fmt" - "strings" - - "github.com/sirupsen/logrus" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" - "k8s.io/apimachinery/pkg/types" - auditv1 "k8s.io/apiserver/pkg/apis/audit/v1" -) - -type ObjectRef struct { - Cluster string - - schema.GroupVersionResource - - Namespace string - Name string - - Uid types.UID - - Raw *unstructured.Unstructured `json:"-"` -} - -// Returns a new ObjectRef with all strings cloned again -// to ensure this object does not reference more data than it needs. -// -// Does not copy the Raw field. -func (ref ObjectRef) Clone() ObjectRef { - return ObjectRef{ - Cluster: strings.Clone(ref.Cluster), - GroupVersionResource: schema.GroupVersionResource{ - Group: strings.Clone(ref.Group), - Version: strings.Clone(ref.Version), - Resource: strings.Clone(ref.Resource), - }, - Namespace: strings.Clone(ref.Namespace), - Name: strings.Clone(ref.Name), - Uid: types.UID(strings.Clone(string(ref.Uid))), - } -} - -func (ref ObjectRef) String() string { - return fmt.Sprintf("%s/%s/%s/%s/%s", ref.Cluster, ref.Group, ref.Resource, ref.Namespace, ref.Name) -} - -func (ref ObjectRef) AsFields(prefix string) logrus.Fields { - return logrus.Fields{ - prefix + "Cluster": ref.Cluster, - prefix + "Group": ref.Group, - prefix + "Resource": ref.Resource, - prefix + "Namespace": ref.Namespace, - prefix + "Name": ref.Name, - prefix + "Uid": ref.Uid, - } -} - -func ObjectRefFromUnstructured( - uns *unstructured.Unstructured, - cluster string, - gvr schema.GroupVersionResource, -) ObjectRef { - return ObjectRef{ - Cluster: cluster, - GroupVersionResource: gvr, - Namespace: uns.GetNamespace(), - Name: uns.GetName(), - Uid: uns.GetUID(), - Raw: uns, - } -} - -func ObjectRefFromAudit(object *auditv1.ObjectReference, cluster string, uid types.UID) ObjectRef { - return ObjectRef{ - Cluster: cluster, - GroupVersionResource: schema.GroupVersionResource{ - Group: object.APIGroup, - Version: object.APIVersion, - Resource: object.Resource, - }, - Namespace: object.Namespace, - Name: object.Name, - Uid: uid, - } -}