Skip to content

Commit

Permalink
redpanda: allow using SASL and TLS together (#2140)
Browse files Browse the repository at this point in the history
The Admin HTTP client used in the user setup would before fail when trying to
talk to the admin API via HTTP. With TLS enabled, it'll have to use https.

Signed-off-by: Stephan Renatus <[email protected]>
Co-authored-by: Manuel de la Peña <[email protected]>
  • Loading branch information
srenatus and mdelapenya authored Feb 2, 2024
1 parent 9a9da1a commit 082b32b
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 8 deletions.
13 changes: 11 additions & 2 deletions modules/redpanda/admin_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,19 @@ import (

type AdminAPIClient struct {
BaseURL string
client *http.Client
}

func NewAdminAPIClient(baseURL string) *AdminAPIClient {
return &AdminAPIClient{BaseURL: baseURL}
return &AdminAPIClient{
BaseURL: baseURL,
client: http.DefaultClient,
}
}

func (cl *AdminAPIClient) WithHTTPClient(c *http.Client) *AdminAPIClient {
cl.client = c
return cl
}

type createUserRequest struct {
Expand Down Expand Up @@ -46,7 +55,7 @@ func (cl *AdminAPIClient) CreateUser(ctx context.Context, username, password str
}
req.Header.Set("Content-Type", "application/json")

resp, err := http.DefaultClient.Do(req)
resp, err := cl.client.Do(req)
if err != nil {
return fmt.Errorf("request failed: %w", err)
}
Expand Down
34 changes: 28 additions & 6 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,12 @@ package redpanda
import (
"bytes"
"context"
"crypto/tls"
"crypto/x509"
_ "embed"
"fmt"
"math"
"net/http"
"os"
"path/filepath"
"text/template"
Expand Down Expand Up @@ -193,15 +196,39 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
return nil, fmt.Errorf("failed to wait for Redpanda readiness: %w", err)
}

scheme := "http"
if settings.EnableTLS {
scheme += "s"
}

// 9. Create Redpanda Service Accounts if configured to do so.
if len(settings.ServiceAccounts) > 0 {
adminAPIPort, err := container.MappedPort(ctx, nat.Port(defaultAdminAPIPort))
if err != nil {
return nil, fmt.Errorf("failed to get mapped Admin API port: %w", err)
}

adminAPIUrl := fmt.Sprintf("http://%v:%d", hostIP, adminAPIPort.Int())
adminAPIUrl := fmt.Sprintf("%s://%v:%d", scheme, hostIP, adminAPIPort.Int())
adminCl := NewAdminAPIClient(adminAPIUrl)
if settings.EnableTLS {
cert, err := tls.X509KeyPair(settings.cert, settings.key)
if err != nil {
return nil, fmt.Errorf("failed to create admin client with cert: %w", err)
}
caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(settings.cert)
adminCl = adminCl.WithHTTPClient(&http.Client{
Timeout: 5 * time.Second,
Transport: &http.Transport{
ForceAttemptHTTP2: true,
TLSHandshakeTimeout: 10 * time.Second,
TLSClientConfig: &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
},
},
})
}

for username, password := range settings.ServiceAccounts {
if err := adminCl.CreateUser(ctx, username, password); err != nil {
Expand All @@ -210,11 +237,6 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
}
}

scheme := "http"
if settings.EnableTLS {
scheme += "s"
}

return &Container{Container: container, urlScheme: scheme}, nil
}

Expand Down
44 changes: 44 additions & 0 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,50 @@ func TestRedpandaWithTLS(t *testing.T) {
require.Error(t, results.FirstErr(), kerr.UnknownTopicOrPartition)
}

func TestRedpandaWithTLSAndSASL(t *testing.T) {
cert, err := tls.X509KeyPair(localhostCert, localhostKey)
require.NoError(t, err, "failed to load key pair")

ctx := context.Background()

container, err := RunContainer(ctx,
WithTLS(localhostCert, localhostKey),
WithEnableSASL(),
WithEnableKafkaAuthorization(),
WithNewServiceAccount("superuser-1", "test"),
WithSuperusers("superuser-1"),
)
require.NoError(t, err)

t.Cleanup(func() {
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

caCertPool := x509.NewCertPool()
caCertPool.AppendCertsFromPEM(localhostCert)

tlsConfig := &tls.Config{
Certificates: []tls.Certificate{cert},
RootCAs: caCertPool,
}

broker, err := container.KafkaSeedBroker(ctx)
require.NoError(t, err)

kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(broker),
kgo.DialTLSConfig(tlsConfig),
kgo.SASL(scram.Auth{
User: "superuser-1",
Pass: "test",
}.AsSha256Mechanism()),
)
require.NoError(t, err)
defer kafkaCl.Close()
}

func TestRedpandaListener_Simple(t *testing.T) {
ctx := context.Background()

Expand Down

0 comments on commit 082b32b

Please sign in to comment.