Skip to content

Commit

Permalink
Add Support for WASM Transforms to Redpanda Module (#2170)
Browse files Browse the repository at this point in the history
* feat: add redpanda wasm transforms

* fix: switch to standard comparison

* fix: reduce version

* chore: run make lint

---------

Co-authored-by: Manuel de la Peña <[email protected]>
  • Loading branch information
gene-redpanda and mdelapenya authored Feb 4, 2024
1 parent bc67dc3 commit 5e6736f
Show file tree
Hide file tree
Showing 7 changed files with 216 additions and 3 deletions.
1 change: 1 addition & 0 deletions modules/redpanda/examples_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ func ExampleRunContainer() {
redpandaContainer, err := redpanda.RunContainer(ctx,
redpanda.WithEnableSASL(),
redpanda.WithEnableKafkaAuthorization(),
redpanda.WithEnableWasmTransform(),
redpanda.WithNewServiceAccount("superuser-1", "test"),
redpanda.WithNewServiceAccount("superuser-2", "test"),
redpanda.WithNewServiceAccount("no-superuser", "test"),
Expand Down
4 changes: 2 additions & 2 deletions modules/redpanda/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ require (
github.com/testcontainers/testcontainers-go v0.27.0
github.com/twmb/franz-go v1.15.4
github.com/twmb/franz-go/pkg/kadm v1.10.0
golang.org/x/mod v0.14.0
)

require (
Expand Down Expand Up @@ -58,9 +59,8 @@ require (
go.opentelemetry.io/otel/trace v1.19.0 // indirect
golang.org/x/crypto v0.17.0 // indirect
golang.org/x/exp v0.0.0-20230510235704-dd950f8aeaea // indirect
golang.org/x/mod v0.11.0 // indirect
golang.org/x/sys v0.16.0 // indirect
golang.org/x/tools v0.10.0 // indirect
golang.org/x/tools v0.13.0 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230711160842-782d3b101e98 // indirect
google.golang.org/grpc v1.58.3 // indirect
google.golang.org/protobuf v1.31.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions modules/redpanda/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk=
github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0=
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161/go.mod h1:xomTg63KZ2rFqZQzSB4Vz2SUXa1BpHTVz9L5PTmPC4E=
github.com/Masterminds/semver/v3 v3.2.1 h1:RN9w6+7QoMeJVGyfmbcgs28Br8cvmnucEXnY0rYXWg0=
github.com/Masterminds/semver/v3 v3.2.1/go.mod h1:qvl/7zhW3nngYb5+80sSMF+FG2BjYrf8m9wsX0PNOMQ=
github.com/Microsoft/go-winio v0.6.1 h1:9/kr64B9VUZrLm5YYwbGtUJnMgqWVOdUAXu6Migciow=
github.com/Microsoft/go-winio v0.6.1/go.mod h1:LRdKpFKfdobln8UmuiYcKPot9D2v6svN5+sAH+4kjUM=
github.com/Microsoft/hcsshim v0.11.4 h1:68vKo2VN8DE9AdN4tnkWnmdhqdbpUFM8OF3Airm7fz8=
Expand Down Expand Up @@ -137,6 +139,8 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.11.0 h1:bUO06HqtnRcc/7l71XBe4WcqTZ+3AH1J59zWDDwLKgU=
golang.org/x/mod v0.11.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs=
golang.org/x/mod v0.14.0 h1:dGoOF9QVLYng8IHTm7BAyWqCqSheQ5pYWGhzW00YJr0=
golang.org/x/mod v0.14.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
Expand Down Expand Up @@ -168,6 +172,8 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
golang.org/x/tools v0.10.0/go.mod h1:UJwyiVBsOA2uwvK/e5OY3GTpDUJriEd+/YlqAwLPmyM=
golang.org/x/tools v0.13.0 h1:Iey4qkscZuv0VvIt8E0neZjtPVQFSc870HQ448QgEmQ=
golang.org/x/tools v0.13.0/go.mod h1:HvlwmtVNQAhOuCjW7xxvovg8wbNq7LwfXh/k7wXUl58=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
4 changes: 4 additions & 0 deletions modules/redpanda/mounts/bootstrap.yaml.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ superusers:
kafka_enable_authorization: true
{{- end }}

{{- if .EnableWasmTransform }}
data_transforms_enabled: true
{{- end }}

{{- if .AutoCreateTopics }}
auto_create_topics_enabled: true
{{- end }}
11 changes: 11 additions & 0 deletions modules/redpanda/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type options struct {
// or "http_basic" for HTTP basic authentication.
SchemaRegistryAuthenticationMethod string

// EnableWasmTransform is a flag to enable wasm transform.
EnableWasmTransform bool

// ServiceAccounts is a map of username (key) to password (value) of users
// that shall be created, so that you can use these to authenticate against
// Redpanda (either for the Kafka API or Schema Registry HTTP access).
Expand Down Expand Up @@ -97,6 +100,14 @@ func WithEnableKafkaAuthorization() Option {
}
}

// WithEnableWasmTransform enables wasm transform.
// Should not be used with RP versions before 23.3
func WithEnableWasmTransform() Option {
return func(o *options) {
o.EnableWasmTransform = true
}
}

// WithEnableSchemaRegistryHTTPBasicAuth enables HTTP basic authentication for
// Schema Registry.
func WithEnableSchemaRegistryHTTPBasicAuth() Option {
Expand Down
31 changes: 30 additions & 1 deletion modules/redpanda/redpanda.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,12 @@ import (
"net/http"
"os"
"path/filepath"
"strings"
"text/template"
"time"

"github.com/docker/go-connections/nat"
"golang.org/x/mod/semver"

"github.com/testcontainers/testcontainers-go"
"github.com/testcontainers/testcontainers-go/wait"
Expand Down Expand Up @@ -62,7 +64,7 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
// Some (e.g. Image) may be overridden by providing an option argument to this function.
req := testcontainers.GenericContainerRequest{
ContainerRequest: testcontainers.ContainerRequest{
Image: "docker.redpanda.com/redpandadata/redpanda:v23.1.7",
Image: "docker.redpanda.com/redpandadata/redpanda:v23.3.3",
User: "root:root",
// Files: Will be added later after we've rendered our YAML templates.
ExposedPorts: []string{
Expand Down Expand Up @@ -92,6 +94,11 @@ func RunContainer(ctx context.Context, opts ...testcontainers.ContainerCustomize
opt.Customize(&req)
}

// 2.1. If the image is not at least v23.3, disable wasm transform
if !isAtLeastVersion(req.ContainerRequest.Image, "23.3") {
settings.EnableWasmTransform = false
}

// 3. Create temporary entrypoint file. We need a custom entrypoint that waits
// until the actual Redpanda node config is mounted. Once the redpanda config is
// mounted we will call the original entrypoint with the same parameters.
Expand Down Expand Up @@ -267,6 +274,7 @@ func renderBootstrapConfig(settings options) ([]byte, error) {
Superusers: settings.Superusers,
KafkaAPIEnableAuthorization: settings.KafkaEnableAuthorization,
AutoCreateTopics: settings.AutoCreateTopics,
EnableWasmTransform: settings.EnableWasmTransform,
}

tpl, err := template.New("bootstrap.yaml").Parse(bootstrapConfigTpl)
Expand Down Expand Up @@ -340,6 +348,7 @@ type redpandaBootstrapConfigTplParams struct {
Superusers []string
KafkaAPIEnableAuthorization bool
AutoCreateTopics bool
EnableWasmTransform bool
}

type redpandaConfigTplParams struct {
Expand All @@ -366,3 +375,23 @@ type listener struct {
Port int
AuthenticationMethod string
}

// isAtLeastVersion returns true if the base image (without tag) is in a version or above
func isAtLeastVersion(image, major string) bool {
parts := strings.Split(image, ":")
version := parts[len(parts)-1]

if version == "latest" {
return true
}

if !strings.HasPrefix(version, "v") {
version = fmt.Sprintf("v%s", version)
}

if semver.IsValid(version) {
return semver.Compare(version, fmt.Sprintf("v%s", major)) >= 0 // version >= v8.x
}

return false
}
162 changes: 162 additions & 0 deletions modules/redpanda/redpanda_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,119 @@ func TestRedpandaWithAuthentication(t *testing.T) {
container, err := RunContainer(ctx,
WithEnableSASL(),
WithEnableKafkaAuthorization(),
WithEnableWasmTransform(),
WithNewServiceAccount("superuser-1", "test"),
WithNewServiceAccount("superuser-2", "test"),
WithNewServiceAccount("no-superuser", "test"),
WithSuperusers("superuser-1", "superuser-2"),
WithEnableSchemaRegistryHTTPBasicAuth(),
)
require.NoError(t, err)
// }

// Clean up the container after the test is complete
t.Cleanup(func() {
if err := container.Terminate(ctx); err != nil {
t.Fatalf("failed to terminate container: %s", err)
}
})

// kafkaSeedBroker {
seedBroker, err := container.KafkaSeedBroker(ctx)
// }
require.NoError(t, err)

// Test successful authentication & authorization with all created superusers
serviceAccounts := map[string]string{
"superuser-1": "test",
"superuser-2": "test",
}

for user, password := range serviceAccounts {
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: user,
Pass: password,
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.CreateTopic(ctx, 1, 1, nil, fmt.Sprintf("test-%v", user))
require.NoError(t, err)
kafkaCl.Close()
}

// Test successful authentication, but failed authorization with a non-superuser account
{
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: "no-superuser",
Pass: "test",
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.CreateTopic(ctx, 1, 1, nil, "test-2")
require.Error(t, err)
require.ErrorContains(t, err, "TOPIC_AUTHORIZATION_FAILED")
kafkaCl.Close()
}

// Test failed authentication
{
kafkaCl, err := kgo.NewClient(
kgo.SeedBrokers(seedBroker),
kgo.SASL(scram.Auth{
User: "wrong",
Pass: "wrong",
}.AsSha256Mechanism()),
)
require.NoError(t, err)

kafkaAdmCl := kadm.NewClient(kafkaCl)
_, err = kafkaAdmCl.Metadata(ctx)
require.Error(t, err)
require.ErrorContains(t, err, "SASL_AUTHENTICATION_FAILED")
}

// Test Schema Registry API
httpCl := &http.Client{Timeout: 5 * time.Second}
// schemaRegistryAddress {
schemaRegistryURL, err := container.SchemaRegistryAddress(ctx)
// }
require.NoError(t, err)

// Failed authentication
req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("%s/subjects", schemaRegistryURL), nil)
require.NoError(t, err)
resp, err := httpCl.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusUnauthorized, resp.StatusCode)
resp.Body.Close()

// Successful authentication
for user, password := range serviceAccounts {
req.SetBasicAuth(user, password)
resp, err = httpCl.Do(req)
require.NoError(t, err)
assert.Equal(t, http.StatusOK, resp.StatusCode)
resp.Body.Close()
}
}

func TestRedpandaWithOldVersionAndWasm(t *testing.T) {
ctx := context.Background()
// redpandaCreateContainer {
// this would fail to start if we weren't ignoring wasm transforms for older versions
container, err := RunContainer(ctx,
testcontainers.WithImage("redpandadata/redpanda:v23.2.18"),
WithEnableSASL(),
WithEnableKafkaAuthorization(),
WithEnableWasmTransform(),
WithNewServiceAccount("superuser-1", "test"),
WithNewServiceAccount("superuser-2", "test"),
WithNewServiceAccount("no-superuser", "test"),
Expand Down Expand Up @@ -512,3 +625,52 @@ D4ZNvyXf/6E27Ibu6v2p/vs=
-----END TESTING KEY-----`))

func testingKey(s string) string { return strings.ReplaceAll(s, "TESTING KEY", "PRIVATE KEY") }

func Test_isAtLeastVersion(t *testing.T) {
type args struct {
image string
major string
}
tests := []struct {
name string
args args
want bool
}{
{
name: "v21.5.6",
args: args{
image: "redpandadata/redpanda:v21.5.6",
major: "23.3",
},
want: false,
},
{
name: "v23.3.3",
args: args{
image: "redpandadata/redpanda:v23.3.3",
major: "23.3",
},
want: true,
},
{
name: "v23.3.3-rc1",
args: args{
image: "redpandadata/redpanda:v23.3.3-rc1",
major: "23.3",
},
want: true,
},
{
name: "v21.3.3-rc1",
args: args{
image: "redpandadata/redpanda:v21.3.3-rc1",
major: "23.3",
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
assert.Equalf(t, tt.want, isAtLeastVersion(tt.args.image, tt.args.major), "isAtLeastVersion(%v, %v)", tt.args.image, tt.args.major)
})
}
}

0 comments on commit 5e6736f

Please sign in to comment.