Skip to content

Commit

Permalink
add protobuf object handling to validating round tripper
Browse files Browse the repository at this point in the history
Signed-off-by: Per Goncalves da Silva <[email protected]>
  • Loading branch information
Per Goncalves da Silva committed Jan 13, 2025
1 parent 74a356d commit cc4930e
Show file tree
Hide file tree
Showing 4 changed files with 80 additions and 21 deletions.
2 changes: 1 addition & 1 deletion cmd/olm/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@ func main() {
config := mgr.GetConfig()

// create a config that validates we're creating objects with labels
validatingConfig := validatingroundtripper.Wrap(config)
validatingConfig := validatingroundtripper.Wrap(config, mgr.GetScheme())

versionedConfigClient, err := configclientset.NewForConfig(config)
if err != nil {
Expand Down
6 changes: 5 additions & 1 deletion pkg/controller/operators/catalog/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

apiregistrationv1 "k8s.io/kube-aggregator/pkg/apis/apiregistration/v1"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/labeller"
"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/operators/validatingroundtripper"
errorwrap "github.com/pkg/errors"
Expand Down Expand Up @@ -149,7 +151,9 @@ func NewOperator(ctx context.Context, kubeconfigPath string, clock utilclock.Clo
}

// create a config that validates we're creating objects with labels
validatingConfig := validatingroundtripper.Wrap(config)
_ = apiextensionsv1.AddToScheme(scheme) // required by opClient
_ = apiregistrationv1.AddToScheme(scheme) // required by opClient
validatingConfig := validatingroundtripper.Wrap(config, scheme)

// Create a new client for dynamic types (CRs)
dynamicClient, err := dynamic.NewForConfig(validatingConfig)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,13 @@ package validatingroundtripper

import (
"fmt"
"io"
"net/http"
"os"

"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/serializer"

"github.com/operator-framework/operator-lifecycle-manager/pkg/controller/install"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/util/yaml"
Expand All @@ -13,23 +17,71 @@ import (

type validatingRoundTripper struct {
delegate http.RoundTripper
codecs serializer.CodecFactory
}

func (rt *validatingRoundTripper) decodeYAMLOrJSON(body io.Reader) (*unstructured.Unstructured, error) {
dec := yaml.NewYAMLOrJSONDecoder(body, 10)
unstructuredObject := &unstructured.Unstructured{}
if err := dec.Decode(unstructuredObject); err != nil {
return nil, fmt.Errorf("error decoding yaml/json object to an unstructured object: %w", err)
}
return unstructuredObject, nil
}

func (rt *validatingRoundTripper) decodeProtobuf(body io.Reader) (*unstructured.Unstructured, error) {
data, err := io.ReadAll(body)
if err != nil {
panic(fmt.Errorf("failed to read request body: %w", err))
}

decoder := rt.codecs.UniversalDeserializer()
obj, _, err := decoder.Decode(data, nil, nil)
if err != nil {
return nil, fmt.Errorf("failed to decode protobuf data: %w", err)
}

unstructuredObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(obj)
if err != nil {
return nil, fmt.Errorf("failed to convert object to unstructured: %w", err)
}

return &unstructured.Unstructured{Object: unstructuredObj}, nil
}

func (rt *validatingRoundTripper) decodeRequestBody(req *http.Request) *unstructured.Unstructured {
b, err := req.GetBody()
if err != nil {
panic(fmt.Errorf("failed to get request body: %w", err))
}
defer b.Close()

var unstructuredObject *unstructured.Unstructured
switch req.Header.Get("Content-Type") {
case "application/vnd.kubernetes.protobuf":
unstructuredObject, err = rt.decodeProtobuf(b)
default:
unstructuredObject, err = rt.decodeYAMLOrJSON(b)
}

if err != nil {
panic(fmt.Errorf("failed to decode request body: %w", err))
}

return unstructuredObject
}

func (rt *validatingRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
if req.Method == "POST" {
b, err := req.GetBody()
if err != nil {
panic(err)
}
dec := yaml.NewYAMLOrJSONDecoder(b, 10)
unstructuredObject := &unstructured.Unstructured{}
if err := dec.Decode(unstructuredObject); err != nil {
panic(fmt.Errorf("error decoding object to an unstructured object: %w", err))
}
unstructuredObject := rt.decodeRequestBody(req)
gvk := unstructuredObject.GroupVersionKind()
if gvk.Kind != "Event" {
if labels := unstructuredObject.GetLabels(); labels[install.OLMManagedLabelKey] != install.OLMManagedLabelValue {
panic(fmt.Errorf("%s.%s/%v %s/%s does not have labels[%s]=%s", gvk.Kind, gvk.Group, gvk.Version, unstructuredObject.GetNamespace(), unstructuredObject.GetName(), install.OLMManagedLabelKey, install.OLMManagedLabelValue))
labels := unstructuredObject.GetLabels()
if labels[install.OLMManagedLabelKey] != install.OLMManagedLabelValue {
panic(fmt.Errorf("%s.%s/%v %s/%s does not have labels[%s]=%s",
gvk.Kind, gvk.Group, gvk.Version,
unstructuredObject.GetNamespace(), unstructuredObject.GetName(),
install.OLMManagedLabelKey, install.OLMManagedLabelValue))
}
}
}
Expand All @@ -40,14 +92,17 @@ var _ http.RoundTripper = (*validatingRoundTripper)(nil)

// Wrap is meant to be used in developer environments and CI to make it easy to find places
// where we accidentally create Kubernetes objects without our management label.
func Wrap(cfg *rest.Config) *rest.Config {
func Wrap(cfg *rest.Config, scheme *runtime.Scheme) *rest.Config {
if _, set := os.LookupEnv("CI"); !set {
return cfg
}

cfgCopy := *cfg
cfgCopy.Wrap(func(rt http.RoundTripper) http.RoundTripper {
return &validatingRoundTripper{delegate: rt}
return &validatingRoundTripper{
delegate: rt,
codecs: serializer.NewCodecFactory(scheme),
}
})
return &cfgCopy
}
12 changes: 6 additions & 6 deletions pkg/lib/operatorclient/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,25 +172,25 @@ func NewClientFromConfig(kubeconfig string, logger *logrus.Logger) ClientInterfa
}

func NewClientFromRestConfig(config *rest.Config) (client ClientInterface, err error) {
kubernetes, err := kubernetes.NewForConfig(config)
k8s, err := kubernetes.NewForConfig(config)
if err != nil {
return
}

apiextensions, err := apiextensions.NewForConfig(config)
apiext, err := apiextensions.NewForConfig(config)
if err != nil {
return
}

apiregistration, err := apiregistration.NewForConfig(config)
apireg, err := apiregistration.NewForConfig(config)
if err != nil {
return
}

client = &Client{
kubernetes,
apiextensions,
apiregistration,
k8s,
apiext,
apireg,
}

return
Expand Down

0 comments on commit cc4930e

Please sign in to comment.