Skip to content

Commit

Permalink
feat: make oidc discovery url configurable (knative#8145)
Browse files Browse the repository at this point in the history
* feat: make oidc discovery url configurable

Signed-off-by: Calum Murray <[email protected]>

* Use Trustbundles and public CAs, when issuer URL is not kubernetes.default.svc

* refactor: pass configmap watcher to auth verifier constructor instead of feature flags

Signed-off-by: Cali0707 <[email protected]>

* feat: add rw lock for provider access

Signed-off-by: Cali0707 <[email protected]>

* Fix verifier rwmutex

---------

Signed-off-by: Calum Murray <[email protected]>
Signed-off-by: Cali0707 <[email protected]>
Co-authored-by: Christoph Stäbler <[email protected]>
  • Loading branch information
Cali0707 and creydr authored Oct 15, 2024
1 parent eff8465 commit 33a9027
Show file tree
Hide file tree
Showing 11 changed files with 198 additions and 57 deletions.
8 changes: 4 additions & 4 deletions cmd/broker/filter/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,8 @@ func main() {
// Watch the observability config map and dynamically update request logs.
configMapWatcher.Watch(logging.ConfigMapName(), logging.UpdateLevelFromConfigMap(sl, atomicLevel, component))

trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

var featureStore *feature.Store
var handler *filter.Handler

Expand All @@ -134,7 +136,6 @@ func main() {
}
handler.EventTypeCreator = autoCreate
}

})
featureStore.WatchConfigs(configMapWatcher)

Expand All @@ -154,9 +155,8 @@ func main() {
oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
// We are running both the receiver (takes messages in from the Broker) and the dispatcher (send
// the messages to the triggers' subscribers) in this binary.
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister())
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapInformer, ctxFunc)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher)
handler, err = filter.NewHandler(logger, authVerifier, oidcTokenProvider, triggerinformer.Get(ctx), brokerinformer.Get(ctx), subscriptioninformer.Get(ctx), reporter, trustBundleConfigMapLister, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
7 changes: 4 additions & 3 deletions cmd/broker/ingress/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,8 @@ func main() {
logger.Fatal("Error setting up trace publishing", zap.Error(err))
}

trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())

var featureStore *feature.Store
var handler *ingress.Handler

Expand All @@ -168,9 +170,8 @@ func main() {
reporter := ingress.NewStatsReporter(env.ContainerName, kmeta.ChildName(env.PodName, uuid.New().String()))

oidcTokenProvider := auth.NewOIDCTokenProvider(ctx)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister())
trustBundleConfigMapInformer := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapInformer, ctxFunc)
authVerifier := auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher)
handler, err = ingress.NewHandler(logger, reporter, broker.TTLDefaulter(logger, int32(env.MaxTTL)), brokerInformer, authVerifier, oidcTokenProvider, trustBundleConfigMapLister, ctxFunc)
if err != nil {
logger.Fatal("Error creating Handler", zap.Error(err))
}
Expand Down
18 changes: 13 additions & 5 deletions cmd/jobsink/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ import (
"net/http"
"strings"

configmapinformer "knative.dev/pkg/client/injection/kube/informers/core/v1/configmap/filtered"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"

"github.com/cloudevents/sdk-go/v2/binding"
cehttp "github.com/cloudevents/sdk-go/v2/protocol/http"
"go.uber.org/zap"
Expand Down Expand Up @@ -70,9 +73,13 @@ func main() {

cfg := injection.ParseAndGetRESTConfigOrDie()
ctx = injection.WithConfig(ctx, cfg)
ctx = filteredFactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
)

ctx, informers := injection.Default.SetupInformers(ctx, cfg)
ctx = injection.WithConfig(ctx, cfg)

loggingConfig, err := cmdbroker.GetLoggingConfig(ctx, system.Namespace(), logging.ConfigMapName())
if err != nil {
log.Fatal("Error loading/parsing logging configuration:", err)
Expand Down Expand Up @@ -104,21 +111,22 @@ func main() {

logger.Info("Starting the JobSink Ingress")

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
logger.Info("Updated", zap.String("name", name), zap.Any("value", value))
})
trustBundleConfigMapLister := configmapinformer.Get(ctx, eventingtls.TrustBundleLabelSelector).Lister().ConfigMaps(system.Namespace())
var h *Handler

featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"))
featureStore.WatchConfigs(configMapWatcher)

// Decorate contexts with the current state of the feature config.
ctxFunc := func(ctx context.Context) context.Context {
return logging.WithLogger(featureStore.ToContext(ctx), sl)
}

h := &Handler{
h = &Handler{
k8s: kubeclient.Get(ctx),
lister: jobsink.Get(ctx).Lister(),
withContext: ctxFunc,
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister()),
authVerifier: auth.NewVerifier(ctx, eventpolicyinformer.Get(ctx).Lister(), trustBundleConfigMapLister, configMapWatcher),
}

tlsConfig, err := getServerTLSConfig(ctx)
Expand Down
19 changes: 18 additions & 1 deletion pkg/apis/feature/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ const (
// This configuration is applied when there is no EventPolicy with a "to" referencing a given
// resource.
AuthorizationAllowSameNamespace Flag = "Allow-Same-Namespace"

// DefaultOIDCDiscoveryURL is the default OIDC Discovery URL used in most Kubernetes clusters.
DefaultOIDCDiscoveryBaseURL Flag = "https://kubernetes.default.svc"
)

// Flags is a map containing all the enabled/disabled flags for the experimental features.
Expand All @@ -81,6 +84,7 @@ func newDefaults() Flags {
EvenTypeAutoCreate: Disabled,
NewAPIServerFilters: Disabled,
AuthorizationDefaultMode: AuthorizationAllowSameNamespace,
OIDCDiscoveryBaseURL: DefaultOIDCDiscoveryBaseURL,
}
}

Expand Down Expand Up @@ -134,6 +138,19 @@ func (e Flags) IsAuthorizationDefaultModeSameNamespace() bool {
return e != nil && e[AuthorizationDefaultMode] == AuthorizationAllowSameNamespace
}

func (e Flags) OIDCDiscoveryBaseURL() string {
if e == nil {
return string(DefaultOIDCDiscoveryBaseURL)
}

discoveryUrl, ok := e[OIDCDiscoveryBaseURL]
if !ok {
return string(DefaultOIDCDiscoveryBaseURL)
}

return string(discoveryUrl)
}

func (e Flags) String() string {
return fmt.Sprintf("%+v", map[string]Flag(e))
}
Expand Down Expand Up @@ -183,7 +200,7 @@ func NewFlagsConfigFromMap(data map[string]string) (Flags, error) {
flags[sanitizedKey] = AuthorizationDenyAll
} else if sanitizedKey == AuthorizationDefaultMode && strings.EqualFold(v, string(AuthorizationAllowSameNamespace)) {
flags[sanitizedKey] = AuthorizationAllowSameNamespace
} else if strings.Contains(k, NodeSelectorLabel) {
} else if strings.Contains(k, NodeSelectorLabel) || sanitizedKey == OIDCDiscoveryBaseURL {
flags[sanitizedKey] = Flag(v)
} else {
flags[k] = Flag(v)
Expand Down
2 changes: 2 additions & 0 deletions pkg/apis/feature/features_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ func TestGetFlags(t *testing.T) {
nodeSelector := flags.NodeSelector()
expectedNodeSelector := map[string]string{"testkey": "testvalue", "testkey1": "testvalue1", "testkey2": "testvalue2"}
require.Equal(t, expectedNodeSelector, nodeSelector)

require.Equal(t, flags.OIDCDiscoveryBaseURL(), "https://oidc.eks.eu-west-1.amazonaws.com/id/1")
}

func TestShouldNotOverrideDefaults(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/feature/flag_names.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,4 +28,5 @@ const (
CrossNamespaceEventLinks = "cross-namespace-event-links"
NewAPIServerFilters = "new-apiserversource-filters"
AuthorizationDefaultMode = "default-authorization-mode"
OIDCDiscoveryBaseURL = "oidc-discovery-base-url"
)
1 change: 1 addition & 0 deletions pkg/apis/feature/testdata/config-features.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,4 @@ data:
apiserversources-nodeselector-testkey: testvalue
apiserversources-nodeselector-testkey1: testvalue1
apiserversources-nodeselector-testkey2: testvalue2
oidc-discovery-base-url: "https://oidc.eks.eu-west-1.amazonaws.com/id/1"
103 changes: 77 additions & 26 deletions pkg/auth/verifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,19 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"net/http"
"strings"
"sync"
"time"

"go.opencensus.io/plugin/ochttp"
corev1listers "k8s.io/client-go/listers/core/v1"
"knative.dev/eventing/pkg/eventingtls"
"knative.dev/pkg/configmap"
"knative.dev/pkg/network"
"knative.dev/pkg/tracing/propagation/tracecontextb3"

duckv1 "knative.dev/eventing/pkg/apis/duck/v1"
"knative.dev/eventing/pkg/client/listers/eventing/v1alpha1"

Expand All @@ -41,15 +50,13 @@ import (
"knative.dev/pkg/logging"
)

const (
kubernetesOIDCDiscoveryBaseURL = "https://kubernetes.default.svc"
)

type Verifier struct {
logger *zap.SugaredLogger
restConfig *rest.Config
provider *oidc.Provider
eventPolicyLister v1alpha1.EventPolicyLister
logger *zap.SugaredLogger
restConfig *rest.Config
eventPolicyLister v1alpha1.EventPolicyLister
trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister
m sync.RWMutex
provider *oidc.Provider
}

type IDToken struct {
Expand All @@ -61,14 +68,24 @@ type IDToken struct {
AccessTokenHash string
}

func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister) *Verifier {
func NewVerifier(ctx context.Context, eventPolicyLister listerseventingv1alpha1.EventPolicyLister, trustBundleConfigMapLister corev1listers.ConfigMapNamespaceLister, cmw configmap.Watcher) *Verifier {
tokenHandler := &Verifier{
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
eventPolicyLister: eventPolicyLister,
logger: logging.FromContext(ctx).With("component", "oidc-token-handler"),
restConfig: injection.GetConfig(ctx),
eventPolicyLister: eventPolicyLister,
trustBundleConfigMapLister: trustBundleConfigMapLister,
}

if err := tokenHandler.initOIDCProvider(ctx); err != nil {
featureStore := feature.NewStore(logging.FromContext(ctx).Named("feature-config-store"), func(name string, value interface{}) {
if features, ok := value.(feature.Flags); ok {
if err := tokenHandler.initOIDCProvider(ctx, features); err != nil {
tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider after config update. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err))
}
}
})
featureStore.WatchConfigs(cmw)

if err := tokenHandler.initOIDCProvider(ctx, featureStore.Load()); err != nil {
tokenHandler.logger.Error(fmt.Sprintf("could not initialize provider. You can ignore this message, when the %s feature is disabled", feature.OIDCAuthentication), zap.Error(err))
}

Expand Down Expand Up @@ -196,6 +213,9 @@ func (v *Verifier) verifyAuthZ(ctx context.Context, features feature.Flags, idTo

// verifyJWT verifies the given JWT for the expected audience and returns the parsed ID token.
func (v *Verifier) verifyJWT(ctx context.Context, jwt, audience string) (*IDToken, error) {
v.m.RLock()
defer v.m.RUnlock()

if v.provider == nil {
return nil, fmt.Errorf("provider is nil. Is the OIDC provider config correct?")
}
Expand All @@ -219,29 +239,35 @@ func (v *Verifier) verifyJWT(ctx context.Context, jwt, audience string) (*IDToke
}, nil
}

func (v *Verifier) initOIDCProvider(ctx context.Context) error {
discovery, err := v.getKubernetesOIDCDiscovery()
func (v *Verifier) initOIDCProvider(ctx context.Context, features feature.Flags) error {
httpClient, err := v.getHTTPClient(features)
if err != nil {
return fmt.Errorf("could not get HTTP client: %w", err)
}

discovery, err := v.getKubernetesOIDCDiscovery(features, httpClient)
if err != nil {
return fmt.Errorf("could not load Kubernetes OIDC discovery information: %w", err)
}

if discovery.Issuer != kubernetesOIDCDiscoveryBaseURL {
if discovery.Issuer != features.OIDCDiscoveryBaseURL() {
// in case we have another issuer as the api server:
ctx = oidc.InsecureIssuerURLContext(ctx, discovery.Issuer)
}

httpClient, err := v.getHTTPClientForKubeAPIServer()
if err != nil {
return fmt.Errorf("could not get HTTP client with TLS certs of API server: %w", err)
}
ctx = oidc.ClientContext(ctx, httpClient)

// get OIDC provider
v.provider, err = oidc.NewProvider(ctx, kubernetesOIDCDiscoveryBaseURL)
provider, err := oidc.NewProvider(ctx, features.OIDCDiscoveryBaseURL())
if err != nil {
return fmt.Errorf("could not get OIDC provider: %w", err)
}

// provider is valid, update it
v.m.Lock()
defer v.m.Unlock()
v.provider = provider

v.logger.Debug("updated OIDC provider config", zap.Any("discovery-config", discovery))

return nil
Expand All @@ -256,13 +282,38 @@ func (v *Verifier) getHTTPClientForKubeAPIServer() (*http.Client, error) {
return client, nil
}

func (v *Verifier) getKubernetesOIDCDiscovery() (*openIDMetadata, error) {
client, err := v.getHTTPClientForKubeAPIServer()
if err != nil {
return nil, fmt.Errorf("could not get HTTP client for API server: %w", err)
func (v *Verifier) getHTTPClient(features feature.Flags) (*http.Client, error) {
if features.OIDCDiscoveryBaseURL() == "https://kubernetes.default.svc" {
return v.getHTTPClientForKubeAPIServer()
}

var base = http.DefaultTransport.(*http.Transport).Clone()

clientConfig := eventingtls.ClientConfig{
TrustBundleConfigMapLister: v.trustBundleConfigMapLister,
}

resp, err := client.Get(kubernetesOIDCDiscoveryBaseURL + "/.well-known/openid-configuration")
base.DialTLSContext = func(ctx context.Context, net, addr string) (net.Conn, error) {
tlsConfig, err := eventingtls.GetTLSClientConfig(clientConfig)
if err != nil {
return nil, fmt.Errorf("could not get tls client config: %w", err)
}
return network.DialTLSWithBackOff(ctx, net, addr, tlsConfig)
}

client := &http.Client{
// Add output tracing.
Transport: &ochttp.Transport{
Base: base,
Propagation: tracecontextb3.TraceContextEgress,
},
}

return client, nil
}

func (v *Verifier) getKubernetesOIDCDiscovery(features feature.Flags, client *http.Client) (*openIDMetadata, error) {
resp, err := client.Get(features.OIDCDiscoveryBaseURL() + "/.well-known/openid-configuration")
if err != nil {
return nil, fmt.Errorf("could not get response: %w", err)
}
Expand Down
Loading

0 comments on commit 33a9027

Please sign in to comment.