From 082b32b78d70626765165374638fceaa8179281f Mon Sep 17 00:00:00 2001 From: Stephan Renatus Date: Fri, 2 Feb 2024 16:33:19 +0100 Subject: [PATCH] redpanda: allow using SASL and TLS together (#2140) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The Admin HTTP client used in the user setup would before fail when trying to talk to the admin API via HTTP. With TLS enabled, it'll have to use https. Signed-off-by: Stephan Renatus Co-authored-by: Manuel de la Peña --- modules/redpanda/admin_api.go | 13 +++++++-- modules/redpanda/redpanda.go | 34 +++++++++++++++++++----- modules/redpanda/redpanda_test.go | 44 +++++++++++++++++++++++++++++++ 3 files changed, 83 insertions(+), 8 deletions(-) diff --git a/modules/redpanda/admin_api.go b/modules/redpanda/admin_api.go index dbdcf0435c..4f99d5cd08 100644 --- a/modules/redpanda/admin_api.go +++ b/modules/redpanda/admin_api.go @@ -12,10 +12,19 @@ import ( type AdminAPIClient struct { BaseURL string + client *http.Client } func NewAdminAPIClient(baseURL string) *AdminAPIClient { - return &AdminAPIClient{BaseURL: baseURL} + return &AdminAPIClient{ + BaseURL: baseURL, + client: http.DefaultClient, + } +} + +func (cl *AdminAPIClient) WithHTTPClient(c *http.Client) *AdminAPIClient { + cl.client = c + return cl } type createUserRequest struct { @@ -46,7 +55,7 @@ func (cl *AdminAPIClient) CreateUser(ctx context.Context, username, password str } req.Header.Set("Content-Type", "application/json") - resp, err := http.DefaultClient.Do(req) + resp, err := cl.client.Do(req) if err != nil { return fmt.Errorf("request failed: %w", err) } diff --git a/modules/redpanda/redpanda.go b/modules/redpanda/redpanda.go index 3a30a24e75..9160524226 100644 --- a/modules/redpanda/redpanda.go +++ b/modules/redpanda/redpanda.go @@ -3,9 +3,12 @@ package redpanda import ( "bytes" "context" + "crypto/tls" + "crypto/x509" _ "embed" "fmt" "math" + "net/http" "os" "path/filepath" "text/template" @@ -193,6 +196,11 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to wait for Redpanda readiness: %w", err) } + scheme := "http" + if settings.EnableTLS { + scheme += "s" + } + // 9. Create Redpanda Service Accounts if configured to do so. if len(settings.ServiceAccounts) > 0 { adminAPIPort, err := container.MappedPort(ctx, nat.Port(defaultAdminAPIPort)) @@ -200,8 +208,27 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize return nil, fmt.Errorf("failed to get mapped Admin API port: %w", err) } - adminAPIUrl := fmt.Sprintf("http://%v:%d", hostIP, adminAPIPort.Int()) + adminAPIUrl := fmt.Sprintf("%s://%v:%d", scheme, hostIP, adminAPIPort.Int()) adminCl := NewAdminAPIClient(adminAPIUrl) + if settings.EnableTLS { + cert, err := tls.X509KeyPair(settings.cert, settings.key) + if err != nil { + return nil, fmt.Errorf("failed to create admin client with cert: %w", err) + } + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(settings.cert) + adminCl = adminCl.WithHTTPClient(&http.Client{ + Timeout: 5 * time.Second, + Transport: &http.Transport{ + ForceAttemptHTTP2: true, + TLSHandshakeTimeout: 10 * time.Second, + TLSClientConfig: &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + }, + }, + }) + } for username, password := range settings.ServiceAccounts { if err := adminCl.CreateUser(ctx, username, password); err != nil { @@ -210,11 +237,6 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize } } - scheme := "http" - if settings.EnableTLS { - scheme += "s" - } - return &Container{Container: container, urlScheme: scheme}, nil } diff --git a/modules/redpanda/redpanda_test.go b/modules/redpanda/redpanda_test.go index 2a6ed8dde6..668038196d 100644 --- a/modules/redpanda/redpanda_test.go +++ b/modules/redpanda/redpanda_test.go @@ -300,6 +300,50 @@ func TestRedpandaWithTLS(t *testing.T) { require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition) } +func TestRedpandaWithTLSAndSASL(t *testing.T) { + cert, err := tls.X509KeyPair(localhostCert, localhostKey) + require.NoError(t, err, "failed to load key pair") + + ctx := context.Background() + + container, err := RunContainer(ctx, + WithTLS(localhostCert, localhostKey), + WithEnableSASL(), + WithEnableKafkaAuthorization(), + WithNewServiceAccount("superuser-1", "test"), + WithSuperusers("superuser-1"), + ) + require.NoError(t, err) + + t.Cleanup(func() { + if err := container.Terminate(ctx); err != nil { + t.Fatalf("failed to terminate container: %s", err) + } + }) + + caCertPool := x509.NewCertPool() + caCertPool.AppendCertsFromPEM(localhostCert) + + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{cert}, + RootCAs: caCertPool, + } + + broker, err := container.KafkaSeedBroker(ctx) + require.NoError(t, err) + + kafkaCl, err := kgo.NewClient( + kgo.SeedBrokers(broker), + kgo.DialTLSConfig(tlsConfig), + kgo.SASL(scram.Auth{ + User: "superuser-1", + Pass: "test", + }.AsSha256Mechanism()), + ) + require.NoError(t, err) + defer kafkaCl.Close() +} + func TestRedpandaListener_Simple(t *testing.T) { ctx := context.Background()