Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(redpanda): temporary file use #2884

Merged
merged 1 commit into from
Nov 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 15 additions & 41 deletions modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"math"
"net/http"
"os"
"path/filepath"
"strings"
"text/template"
Expand Down Expand Up @@ -61,12 +60,6 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize

// Run creates an instance of the Redpanda container type
func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustomizer) (*Container, error) {
tmpDir, err := os.MkdirTemp("", "redpanda")
if err != nil {
return nil, fmt.Errorf("create temporary directory: %w", err)
}
defer os.RemoveAll(tmpDir)

// 1. Create container request.
// Some (e.g. Image) may be overridden by providing an option argument to this function.
req := testcontainers.GenericContainerRequest{
Expand Down Expand Up @@ -114,66 +107,47 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
settings.EnableWasmTransform = false
}

// 3. Create temporary entrypoint file. We need a custom entrypoint that waits
// until the actual Redpanda node config is mounted. Once the redpanda config is
// mounted we will call the original entrypoint with the same parameters.
// We have to do this kind of two-step process, because we need to know the mapped
// port, so that we can use this in Redpanda's advertised listeners configuration for
// the Kafka API.
entrypointPath := filepath.Join(tmpDir, entrypointFile)
if err := os.WriteFile(entrypointPath, entrypoint, 0o700); err != nil {
return nil, fmt.Errorf("write entrypoint file: %w", err)
}

// 4. Register extra kafka listeners if provided, network aliases will be
// 3. Register extra kafka listeners if provided, network aliases will be
// set
if err := registerListeners(settings, req); err != nil {
return nil, fmt.Errorf("register listeners: %w", err)
}

// Bootstrap config file contains cluster configurations which will only be considered
// the very first time you start a cluster.
bootstrapConfigPath := filepath.Join(tmpDir, bootstrapConfigFile)
bootstrapConfig, err := renderBootstrapConfig(settings)
if err != nil {
return nil, err
}
if err := os.WriteFile(bootstrapConfigPath, bootstrapConfig, 0o600); err != nil {
return nil, fmt.Errorf("write bootstrap config: %w", err)
}

// We need a custom entrypoint that waits until the actual Redpanda node config is mounted.
// Once the redpanda config is mounted we will call the original entrypoint with the same parameters.
// We have to do this kind of two-step process, because we need to know the mapped
// port, so that we can use this in Redpanda's advertised listeners configuration for
// the Kafka API.
req.Files = append(req.Files,
testcontainers.ContainerFile{
HostFilePath: entrypointPath,
Reader: bytes.NewReader(entrypoint),
ContainerFilePath: entrypointFile,
FileMode: 700,
},
testcontainers.ContainerFile{
HostFilePath: bootstrapConfigPath,
Reader: bytes.NewReader(bootstrapConfig),
ContainerFilePath: filepath.Join(redpandaDir, bootstrapConfigFile),
FileMode: 600,
},
)

// 5. Create certificate and key for TLS connections.
// 4. Create certificate and key for TLS connections.
if settings.EnableTLS {
certPath := filepath.Join(tmpDir, certFile)
if err := os.WriteFile(certPath, settings.cert, 0o600); err != nil {
return nil, fmt.Errorf("write certificate file: %w", err)
}
keyPath := filepath.Join(tmpDir, keyFile)
if err := os.WriteFile(keyPath, settings.key, 0o600); err != nil {
return nil, fmt.Errorf("write key file: %w", err)
}

req.Files = append(req.Files,
testcontainers.ContainerFile{
HostFilePath: certPath,
Reader: bytes.NewReader(settings.cert),
ContainerFilePath: filepath.Join(redpandaDir, certFile),
FileMode: 600,
},
testcontainers.ContainerFile{
HostFilePath: keyPath,
Reader: bytes.NewReader(settings.key),
ContainerFilePath: filepath.Join(redpandaDir, keyFile),
FileMode: 600,
},
Expand All @@ -189,7 +163,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return c, fmt.Errorf("generic container: %w", err)
}

// 6. Get mapped port for the Kafka API, so that we can render and then mount
// 5. Get mapped port for the Kafka API, so that we can render and then mount
// the Redpanda config with the advertised Kafka address.
hostIP, err := ctr.Host(ctx)
if err != nil {
Expand All @@ -201,7 +175,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return c, fmt.Errorf("mapped kafka port: %w", err)
}

// 7. Render redpanda.yaml config and mount it.
// 6. Render redpanda.yaml config and mount it.
nodeConfig, err := renderNodeConfig(settings, hostIP, kafkaPort.Int())
if err != nil {
return c, err
Expand All @@ -212,7 +186,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
return c, fmt.Errorf("copy to container: %w", err)
}

// 8. Wait until Redpanda is ready to serve requests.
// 7. Wait until Redpanda is ready to serve requests.
waitHTTP := wait.ForHTTP(defaultAdminAPIPort).
WithStatusCodeMatcher(func(status int) bool {
// Redpanda's admin API returns 404 for requests to "/".
Expand Down Expand Up @@ -248,7 +222,7 @@ func Run(ctx context.Context, img string, opts ...testcontainers.ContainerCustom
c.urlScheme += "s"
}

// 9. Create Redpanda Service Accounts if configured to do so.
// 8. Create Redpanda Service Accounts if configured to do so.
if len(settings.ServiceAccounts) > 0 {
adminAPIPort, err := ctr.MappedPort(ctx, nat.Port(defaultAdminAPIPort))
if err != nil {
Expand Down