Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: unify the ObjectRef and GroupingKey types #158

Merged
merged 4 commits into from
Aug 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 9 additions & 9 deletions pkg/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import (
"github.com/kubewharf/kelemetry/pkg/aggregator/tracer"
"github.com/kubewharf/kelemetry/pkg/manager"
"github.com/kubewharf/kelemetry/pkg/metrics"
"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
"github.com/kubewharf/kelemetry/pkg/util/zconstants"
)

Expand Down Expand Up @@ -109,7 +109,7 @@ type Aggregator interface {
// it waits for the primary event to be created and takes it as the parent.
// If the primary event does not get created after options.subObjectPrimaryBackoff, this event is promoted as primary.
// If multiple primary events are sent, the slower one (by SpanCache-authoritative timing) is demoted.
Send(ctx context.Context, object util.ObjectRef, event *aggregatorevent.Event, subObjectId *SubObjectId) error
Send(ctx context.Context, object utilobject.Rich, event *aggregatorevent.Event, subObjectId *SubObjectId) error
}

type SubObjectId struct {
Expand Down Expand Up @@ -183,7 +183,7 @@ func (aggregator *aggregator) Close(ctx context.Context) error { return nil }

func (aggregator *aggregator) Send(
ctx context.Context,
object util.ObjectRef,
object utilobject.Rich,
event *aggregatorevent.Event,
subObjectId *SubObjectId,
) (err error) {
Expand Down Expand Up @@ -370,12 +370,12 @@ func (aggregator *aggregator) Send(

func (aggregator *aggregator) ensureObjectSpan(
ctx context.Context,
object util.ObjectRef,
object utilobject.Rich,
eventTime time.Time,
) (tracer.SpanContext, error) {
return aggregator.getOrCreateSpan(ctx, object, eventTime, func() (_ tracer.SpanContext, err error) {
// try to associate a parent object
var parent *util.ObjectRef
var parent *utilobject.Rich

for _, linker := range aggregator.Linkers.Impls {
parent = linker.Lookup(ctx, object)
Expand All @@ -395,7 +395,7 @@ func (aggregator *aggregator) ensureObjectSpan(

func (aggregator *aggregator) getOrCreateSpan(
ctx context.Context,
object util.ObjectRef,
object utilobject.Rich,
eventTime time.Time,
parentGetter func() (tracer.SpanContext, error),
) (tracer.SpanContext, error) {
Expand Down Expand Up @@ -530,7 +530,7 @@ func (aggregator *aggregator) getOrCreateSpan(

func (aggregator *aggregator) createSpan(
ctx context.Context,
object util.ObjectRef,
object utilobject.Rich,
nestLevel string,
eventTime time.Time,
parent tracer.SpanContext,
Expand Down Expand Up @@ -580,11 +580,11 @@ func (aggregator *aggregator) createSpan(
return spanContext, nil
}

func (aggregator *aggregator) expiringSpanCacheKey(object util.ObjectRef, timestamp time.Time) string {
func (aggregator *aggregator) expiringSpanCacheKey(object utilobject.Rich, timestamp time.Time) string {
expiringWindow := timestamp.Unix() / int64(aggregator.options.spanTtl.Seconds())
return aggregator.spanCacheKey(object, fmt.Sprintf("field=object,window=%d", expiringWindow))
}

func (aggregator *aggregator) spanCacheKey(object util.ObjectRef, subObjectId string) string {
func (aggregator *aggregator) spanCacheKey(object utilobject.Rich, subObjectId string) string {
return fmt.Sprintf("%s/%s", object.String(), subObjectId)
}
4 changes: 2 additions & 2 deletions pkg/aggregator/eventdecorator/decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@ import (
"context"

"github.com/kubewharf/kelemetry/pkg/aggregator/aggregatorevent"
"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
)

type Decorator interface {
Decorate(ctx context.Context, object util.ObjectRef, event *aggregatorevent.Event)
Decorate(ctx context.Context, object utilobject.Rich, event *aggregatorevent.Event)
}
4 changes: 2 additions & 2 deletions pkg/aggregator/eventdecorator/eventtagger/eventtagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"github.com/kubewharf/kelemetry/pkg/aggregator/eventdecorator"
"github.com/kubewharf/kelemetry/pkg/aggregator/resourcetagger"
"github.com/kubewharf/kelemetry/pkg/manager"
"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
)

func init() {
Expand Down Expand Up @@ -71,7 +71,7 @@ func (d *eventTagDecorator) Init() error {
func (d *eventTagDecorator) Start(ctx context.Context) error { return nil }
func (d *eventTagDecorator) Close(ctx context.Context) error { return nil }

func (d *eventTagDecorator) Decorate(ctx context.Context, object util.ObjectRef, event *aggregatorevent.Event) {
func (d *eventTagDecorator) Decorate(ctx context.Context, object utilobject.Rich, event *aggregatorevent.Event) {
if event == nil {
return
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/aggregator/linker/linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package linker
import (
"context"

"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
)

type Linker interface {
Lookup(ctx context.Context, object util.ObjectRef) *util.ObjectRef
Lookup(ctx context.Context, object utilobject.Rich) *utilobject.Rich
}
4 changes: 2 additions & 2 deletions pkg/aggregator/objectspandecorator/decorator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,9 @@ package objectspandecorator
import (
"context"

"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
)

type Decorator interface {
Decorate(ctx context.Context, object util.ObjectRef, traceSource string, tags map[string]string)
Decorate(ctx context.Context, object utilobject.Rich, traceSource string, tags map[string]string)
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ import (
"github.com/kubewharf/kelemetry/pkg/aggregator/objectspandecorator"
"github.com/kubewharf/kelemetry/pkg/aggregator/resourcetagger"
"github.com/kubewharf/kelemetry/pkg/manager"
"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
)

func init() {
Expand Down Expand Up @@ -54,7 +54,7 @@ func (d *ObjectSpanTag) Init() error { return nil }
func (d *ObjectSpanTag) Start(ctx context.Context) error { return nil }
func (d *ObjectSpanTag) Close(ctx context.Context) error { return nil }

func (d *ObjectSpanTag) Decorate(ctx context.Context, object util.ObjectRef, traceSource string, tags map[string]string) {
func (d *ObjectSpanTag) Decorate(ctx context.Context, object utilobject.Rich, traceSource string, tags map[string]string) {
if tags == nil {
return
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/aggregator/resourcetagger/resource_tagger.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/kubewharf/kelemetry/pkg/k8s/objectcache"
"github.com/kubewharf/kelemetry/pkg/manager"
"github.com/kubewharf/kelemetry/pkg/metrics"
"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
)

func init() {
Expand Down Expand Up @@ -133,7 +133,7 @@ func (d *ResourceTagger) registerResource(gr schema.GroupResource, tagPathMappin
}
}

func (d *ResourceTagger) DecorateTag(ctx context.Context, object util.ObjectRef, traceSource string, tags map[string]any) {
func (d *ResourceTagger) DecorateTag(ctx context.Context, object utilobject.Rich, traceSource string, tags map[string]any) {
if tags == nil {
return
}
Expand All @@ -153,7 +153,7 @@ func (d *ResourceTagger) DecorateTag(ctx context.Context, object util.ObjectRef,
logger.Debug("Fetching dynamic object for tag decorator")

var err error
raw, err = d.ObjectCache.Get(ctx, object)
raw, err = d.ObjectCache.Get(ctx, object.VersionedKey)
if err != nil {
tagMetric.Result = "FetchErr"
logger.WithError(err).Error("cannot fetch object value")
Expand Down
21 changes: 5 additions & 16 deletions pkg/annotationlinker/linker.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,13 @@ import (

"github.com/sirupsen/logrus"
"github.com/spf13/pflag"
"k8s.io/apimachinery/pkg/runtime/schema"

"github.com/kubewharf/kelemetry/pkg/aggregator/linker"
"github.com/kubewharf/kelemetry/pkg/k8s"
"github.com/kubewharf/kelemetry/pkg/k8s/discovery"
"github.com/kubewharf/kelemetry/pkg/k8s/objectcache"
"github.com/kubewharf/kelemetry/pkg/manager"
"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
)

func init() {
Expand Down Expand Up @@ -59,7 +58,7 @@ func (ctrl *controller) Init() error { return nil }
func (ctrl *controller) Start(ctx context.Context) error { return nil }
func (ctrl *controller) Close(ctx context.Context) error { return nil }

func (ctrl *controller) Lookup(ctx context.Context, object util.ObjectRef) *util.ObjectRef {
func (ctrl *controller) Lookup(ctx context.Context, object utilobject.Rich) *utilobject.Rich {
raw := object.Raw

logger := ctrl.Logger.WithFields(object.AsFields("object"))
Expand All @@ -68,7 +67,7 @@ func (ctrl *controller) Lookup(ctx context.Context, object util.ObjectRef) *util
logger.Debug("Fetching dynamic object")

var err error
raw, err = ctrl.ObjectCache.Get(ctx, object)
raw, err = ctrl.ObjectCache.Get(ctx, object.VersionedKey)

if err != nil {
logger.WithError(err).Error("cannot fetch object value")
Expand All @@ -93,20 +92,10 @@ func (ctrl *controller) Lookup(ctx context.Context, object util.ObjectRef) *util
ref.Cluster = object.Cluster
}

objectRef := &util.ObjectRef{
Cluster: ref.Cluster,
GroupVersionResource: schema.GroupVersionResource{
Group: ref.GroupVersionResource.Group,
Version: ref.GroupVersionResource.Version,
Resource: ref.GroupVersionResource.Resource,
},
Namespace: ref.Namespace,
Name: ref.Name,
Uid: ref.Uid,
}
objectRef := ref.ToRich()
logger.WithField("parent", objectRef).Debug("Resolved parent")

return objectRef
return &objectRef
}

return nil
Expand Down
18 changes: 18 additions & 0 deletions pkg/annotationlinker/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package annotationlinker
import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/types"

utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
)

const LinkAnnotation = "kelemetry.kubewharf.io/parent-link"
Expand All @@ -31,3 +33,19 @@ type ParentLink struct {

Uid types.UID `json:"uid"`
}

func (ln ParentLink) ToRich() utilobject.Rich {
return utilobject.Rich{
VersionedKey: utilobject.VersionedKey{
Key: utilobject.Key{
Cluster: ln.Cluster,
Group: ln.Group,
Resource: ln.Resource,
Namespace: ln.Namespace,
Name: ln.Name,
},
Version: ln.Version,
},
Uid: ln.Uid,
}
}
14 changes: 2 additions & 12 deletions pkg/audit/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ import (
"github.com/kubewharf/kelemetry/pkg/k8s/discovery"
"github.com/kubewharf/kelemetry/pkg/manager"
"github.com/kubewharf/kelemetry/pkg/metrics"
"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
"github.com/kubewharf/kelemetry/pkg/util/shutdown"
"github.com/kubewharf/kelemetry/pkg/util/zconstants"
)
Expand Down Expand Up @@ -210,17 +210,7 @@ func (recv *receiver) handleItem(
}
}

objectRef := util.ObjectRef{
Cluster: message.Cluster,
GroupVersionResource: schema.GroupVersionResource{
Group: message.ObjectRef.APIGroup,
Version: message.ObjectRef.APIVersion,
Resource: message.ObjectRef.Resource,
},
Namespace: message.ObjectRef.Namespace,
Name: message.ObjectRef.Name,
Uid: message.ObjectRef.UID,
}
objectRef := utilobject.RichFromAudit(message.ObjectRef, message.Cluster)

if message.ResponseObject != nil {
objectRef.Raw = &unstructured.Unstructured{
Expand Down
42 changes: 21 additions & 21 deletions pkg/diff/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"github.com/kubewharf/kelemetry/pkg/k8s/objectcache"
"github.com/kubewharf/kelemetry/pkg/manager"
"github.com/kubewharf/kelemetry/pkg/metrics"
"github.com/kubewharf/kelemetry/pkg/util"
utilobject "github.com/kubewharf/kelemetry/pkg/util/object"
"github.com/kubewharf/kelemetry/pkg/util/shutdown"
)

Expand Down Expand Up @@ -114,15 +114,15 @@ func (api *api) handleGet(ctx *gin.Context) error {
cluster = clusterQuery
}

raw, err := api.ObjectCache.Get(ctx, util.ObjectRef{
Cluster: cluster,
GroupVersionResource: schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
raw, err := api.ObjectCache.Get(ctx, utilobject.VersionedKey{
Key: utilobject.Key{
Cluster: cluster,
Group: group,
Resource: resource,
Namespace: namespace,
Name: name,
},
Namespace: namespace,
Name: name,
Version: version,
})
if err != nil {
return err
Expand All @@ -131,13 +131,13 @@ func (api *api) handleGet(ctx *gin.Context) error {
return ctx.AbortWithError(404, fmt.Errorf("object does not exist"))
}

object := util.ObjectRefFromUnstructured(raw, cluster, schema.GroupVersionResource{
object := utilobject.RichFromUnstructured(raw, cluster, schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
})

patch, err := api.DiffCache.Fetch(ctx, object, rv, &rv)
patch, err := api.DiffCache.Fetch(ctx, object.Key, rv, &rv)
if err != nil || patch == nil {
return ctx.AbortWithError(404, fmt.Errorf("patch not found for rv: %w", err))
}
Expand Down Expand Up @@ -165,15 +165,15 @@ func (api *api) handleScan(ctx *gin.Context) error {
limit = parsedLimit
}

raw, err := api.ObjectCache.Get(ctx, util.ObjectRef{
Cluster: cluster,
GroupVersionResource: schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
raw, err := api.ObjectCache.Get(ctx, utilobject.VersionedKey{
Key: utilobject.Key{
Cluster: cluster,
Group: group,
Resource: resource,
Namespace: namespace,
Name: name,
},
Namespace: namespace,
Name: name,
Version: version,
})
if err != nil {
return err
Expand All @@ -182,12 +182,12 @@ func (api *api) handleScan(ctx *gin.Context) error {
return ctx.AbortWithError(404, fmt.Errorf("object does not exist"))
}

object := util.ObjectRefFromUnstructured(raw, cluster, schema.GroupVersionResource{
object := utilobject.RichFromUnstructured(raw, cluster, schema.GroupVersionResource{
Group: group,
Version: version,
Resource: resource,
})
list, err := api.DiffCache.List(ctx, object, limit)
list, err := api.DiffCache.List(ctx, object.Key, limit)
if err != nil {
return err
}
Expand Down
Loading
Loading