Skip to content

Commit

Permalink
Enhance Kafka pubsub OIDC auth support to support oauthbearer.extensi…
Browse files Browse the repository at this point in the history
…ons (#3124)

Signed-off-by: ItalyPaleAle <[email protected]>
Co-authored-by: Abhinav Mishra <[email protected]>
  • Loading branch information
ItalyPaleAle and Abhinav Mishra authored Sep 12, 2023
1 parent 304ad6b commit 2cb3c1c
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 31 deletions.
6 changes: 6 additions & 0 deletions bindings/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ authenticationProfiles:
Although not required, this field is recommended.
example: '"openid,kafka-prod"'
default: '"openid"'
- name: oidcExtensions
description: |
String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.
example: |
{"cluster":"kafka","poolid":"kafkapool"}
type: string
- title: "SASL Authentication"
description: |
Authenticate using SASL.
Expand Down
6 changes: 2 additions & 4 deletions internal/component/kafka/auth.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func updateTLSConfig(config *sarama.Config, metadata *KafkaMetadata) error {
}

func updateOidcAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error {
tokenProvider := newOAuthTokenSource(metadata.OidcTokenEndpoint, metadata.OidcClientID, metadata.OidcClientSecret, metadata.internalOidcScopes)
tokenProvider := metadata.getOAuthTokenSource()

if metadata.TLSCaCert != "" {
err := tokenProvider.addCa(metadata.TLSCaCert)
Expand All @@ -82,11 +82,9 @@ func updateOidcAuthInfo(config *sarama.Config, metadata *KafkaMetadata) error {
}
}

tokenProvider.skipCaVerify = metadata.TLSSkipVerify

config.Net.SASL.Enable = true
config.Net.SASL.Mechanism = sarama.SASLTypeOAuth
config.Net.SASL.TokenProvider = &tokenProvider
config.Net.SASL.TokenProvider = tokenProvider

return nil
}
59 changes: 34 additions & 25 deletions internal/component/kafka/metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ limitations under the License.
package kafka

import (
"encoding/json"
"errors"
"fmt"
"strconv"
Expand Down Expand Up @@ -42,31 +43,33 @@ const (
)

type KafkaMetadata struct {
Brokers string `mapstructure:"brokers"`
internalBrokers []string `mapstructure:"-"`
ConsumerGroup string `mapstructure:"consumerGroup"`
ClientID string `mapstructure:"clientId"`
AuthType string `mapstructure:"authType"`
SaslUsername string `mapstructure:"saslUsername"`
SaslPassword string `mapstructure:"saslPassword"`
SaslMechanism string `mapstructure:"saslMechanism"`
InitialOffset string `mapstructure:"initialOffset"`
internalInitialOffset int64 `mapstructure:"-"`
MaxMessageBytes int `mapstructure:"maxMessageBytes"`
OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
OidcClientID string `mapstructure:"oidcClientID"`
OidcClientSecret string `mapstructure:"oidcClientSecret"`
OidcScopes string `mapstructure:"oidcScopes"`
internalOidcScopes []string `mapstructure:"-"`
TLSDisable bool `mapstructure:"disableTls"`
TLSSkipVerify bool `mapstructure:"skipVerify"`
TLSCaCert string `mapstructure:"caCert"`
TLSClientCert string `mapstructure:"clientCert"`
TLSClientKey string `mapstructure:"clientKey"`
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
Version string `mapstructure:"version"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
Brokers string `mapstructure:"brokers"`
internalBrokers []string `mapstructure:"-"`
ConsumerGroup string `mapstructure:"consumerGroup"`
ClientID string `mapstructure:"clientId"`
AuthType string `mapstructure:"authType"`
SaslUsername string `mapstructure:"saslUsername"`
SaslPassword string `mapstructure:"saslPassword"`
SaslMechanism string `mapstructure:"saslMechanism"`
InitialOffset string `mapstructure:"initialOffset"`
internalInitialOffset int64 `mapstructure:"-"`
MaxMessageBytes int `mapstructure:"maxMessageBytes"`
OidcTokenEndpoint string `mapstructure:"oidcTokenEndpoint"`
OidcClientID string `mapstructure:"oidcClientID"`
OidcClientSecret string `mapstructure:"oidcClientSecret"`
OidcScopes string `mapstructure:"oidcScopes"`
OidcExtensions string `mapstructure:"oidcExtensions"`
internalOidcScopes []string `mapstructure:"-"`
TLSDisable bool `mapstructure:"disableTls"`
TLSSkipVerify bool `mapstructure:"skipVerify"`
TLSCaCert string `mapstructure:"caCert"`
TLSClientCert string `mapstructure:"clientCert"`
TLSClientKey string `mapstructure:"clientKey"`
ConsumeRetryEnabled bool `mapstructure:"consumeRetryEnabled"`
ConsumeRetryInterval time.Duration `mapstructure:"consumeRetryInterval"`
Version string `mapstructure:"version"`
internalVersion sarama.KafkaVersion `mapstructure:"-"`
internalOidcExtensions map[string]string `mapstructure:"-"`
}

// upgradeMetadata updates metadata properties based on deprecated usage.
Expand Down Expand Up @@ -180,6 +183,12 @@ func (k *Kafka) getKafkaMetadata(meta map[string]string) (*KafkaMetadata, error)
k.logger.Warn("Warning: no OIDC scopes specified, using default 'openid' scope only. This is a security risk for token reuse.")
m.internalOidcScopes = []string{"openid"}
}
if m.OidcExtensions != "" {
err = json.Unmarshal([]byte(m.OidcExtensions), &m.internalOidcExtensions)
if err != nil || len(m.internalOidcExtensions) < 1 {
return nil, errors.New("kafka error: improper OIDC Extensions format for authType 'oidc'")
}
}
k.logger.Debug("Configuring SASL token authentication via OIDC.")
case mtlsAuthType:
if m.TLSClientCert != "" {
Expand Down
11 changes: 9 additions & 2 deletions internal/component/kafka/sasl_oauthbearer.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,15 @@ type OAuthTokenSource struct {
skipCaVerify bool
}

func newOAuthTokenSource(oidcTokenEndpoint, oidcClientID, oidcClientSecret string, oidcScopes []string) OAuthTokenSource {
return OAuthTokenSource{TokenEndpoint: oauth2.Endpoint{TokenURL: oidcTokenEndpoint}, ClientID: oidcClientID, ClientSecret: oidcClientSecret, Scopes: oidcScopes}
func (m KafkaMetadata) getOAuthTokenSource() *OAuthTokenSource {
return &OAuthTokenSource{
TokenEndpoint: oauth2.Endpoint{TokenURL: m.OidcTokenEndpoint},
ClientID: m.OidcClientID,
ClientSecret: m.OidcClientSecret,
Scopes: m.internalOidcScopes,
Extensions: m.internalOidcExtensions,
skipCaVerify: m.TLSSkipVerify,
}
}

var tokenRequestTimeout, _ = time.ParseDuration("30s")
Expand Down
6 changes: 6 additions & 0 deletions pubsub/kafka/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ authenticationProfiles:
Although not required, this field is recommended.
example: '"openid,kafka-prod"'
default: '"openid"'
- name: oidcExtensions
description: |
String containing a JSON-encoded dictionary of OAuth2/OIDC extensions to request with the access token.
example: |
{"cluster":"kafka","poolid":"kafkapool"}
type: string
- title: "SASL Authentication"
description: |
Authenticate using SASL.
Expand Down

0 comments on commit 2cb3c1c

Please sign in to comment.