Skip to content

Commit

Permalink
Merge pull request #83 from vshn/fix/default-settings
Browse files Browse the repository at this point in the history
Fix reconcile loop for DBaaS with reported default settings
  • Loading branch information
glrf authored Feb 1, 2023
2 parents d53a5c7 + 0854bdd commit 7fc3c8e
Show file tree
Hide file tree
Showing 19 changed files with 817 additions and 47 deletions.
87 changes: 87 additions & 0 deletions internal/settings/schema.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
package settings

import (
"encoding/json"
"fmt"

"github.com/vshn/provider-exoscale/operator/mapper"
"k8s.io/apimachinery/pkg/runtime"
)

type Schemas interface {
SetDefaults(schema string, input runtime.RawExtension) (runtime.RawExtension, error)
}

// ParseSchemas takes an object containing a map of json schemas and parses it
func ParseSchemas(raw []byte) (Schemas, error) {
req := struct {
Settings schemas
}{}

err := json.Unmarshal(raw, &req)
if err != nil {
return nil, err
}
return req.Settings, nil
}

type schemas map[string]schema

type schema struct {
Default interface{}
Properties schemas
}

// SetDefaults takes a setting for a DBaaS and will set the defaults of the schema with name `name`
func (s schemas) SetDefaults(name string, input runtime.RawExtension) (runtime.RawExtension, error) {
sc, ok := s[name]
if !ok {
return runtime.RawExtension{}, fmt.Errorf("unknown schema: %q", name)
}

inMap, err := mapper.ToMap(input)
if err != nil {
return runtime.RawExtension{}, fmt.Errorf("failed to parse input: %w", err)
}

setDefaults(sc, inMap)

out, err := mapper.ToRawExtension(&inMap)
if err != nil {
return runtime.RawExtension{}, fmt.Errorf("failed to parse defaulted setting: %w", err)
}
return out, nil
}

func setDefaults(sc schema, input map[string]interface{}) bool {
hasSetDefaults := false

for key, val := range sc.Properties {
if len(val.Properties) > 0 {
submap := map[string]interface{}{}

if _, ok := input[key]; ok {
submap, ok = input[key].(map[string]interface{})
if !ok {
continue
}
}

if setDefaults(val, submap) {
input[key] = submap
hasSetDefaults = true
}
} else {
_, ok := input[key]
if ok {
continue
}

if val.Default != nil {
input[key] = val.Default
hasSetDefaults = true
}
}
}
return hasSetDefaults
}
111 changes: 111 additions & 0 deletions internal/settings/schema_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package settings

import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/vshn/provider-exoscale/operator/mapper"
"k8s.io/apimachinery/pkg/runtime"
)

var exampleSchemas = []byte(`{
"settings": {
"simple": {
"type": "object",
"properties": {
"foo": {
"default": true,
"type": "boolean"
},
"bar": {
"default": "buzz",
"type": "string"
},
"count": {
"default": 42,
"type": "integer"
},
"nodefault": {
"type": ["string", "null"]
}
}
},
"nested": {
"type": "object",
"properties": {
"foo": {
"default": true,
"type": "boolean"
},
"obj": {
"type": "object",
"properties": {
"bar": {
"default": "buzz",
"type": "string"
},
"obj": {
"type": "object",
"properties": {
"count": {
"default": 42,
"type": "integer"
}
}
}
}
}
}
}
}
}`)

func TestSetDefaultSimple(t *testing.T) {
schemas, err := ParseSchemas(exampleSchemas)
require.NoError(t, err, "failed to parse example schema")

input := runtime.RawExtension{
Raw: []byte(`{"bar": "blub"}`),
}
out, err := schemas.SetDefaults("simple", input)
require.NoError(t, err, "failed to set defaults")

outMap, err := mapper.ToMap(out)
require.NoError(t, err, "failed to set defaults")

assert.EqualValues(t, true, outMap["foo"])
assert.EqualValues(t, "blub", outMap["bar"])
assert.EqualValues(t, 42, outMap["count"])
_, ok := outMap["nodefault"]
assert.Falsef(t, ok, "should not set values withou defaults")
}

func TestSetDefaultNested(t *testing.T) {
schemas, err := ParseSchemas(exampleSchemas)
require.NoError(t, err, "failed to parse example schema")

input := runtime.RawExtension{
Raw: []byte(`{"bar": "blub"}`),
}
out, err := schemas.SetDefaults("nested", input)
require.NoError(t, err, "failed to set defaults")

outMap, err := mapper.ToMap(out)
require.NoError(t, err, "failed to set defaults")

assert.EqualValues(t, true, outMap["foo"])
assert.EqualValues(t, "blub", outMap["bar"])

sub1, ok := outMap["obj"]
require.Truef(t, ok, "should set sub object")
sub1Map, ok := sub1.(map[string]interface{})
require.Truef(t, ok, "should set sub object as map")
assert.EqualValues(t, "buzz", sub1Map["bar"])

sub2, ok := sub1Map["obj"]
require.Truef(t, ok, "should set sub-sub object")
sub2Map, ok := sub2.(map[string]interface{})
require.Truef(t, ok, "should set sub-sub object as map")
assert.EqualValues(t, 42, sub2Map["count"])
}
47 changes: 8 additions & 39 deletions operator/kafkacontroller/observe.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
exoscaleapi "github.com/exoscale/egoscale/v2/api"
"github.com/exoscale/egoscale/v2/oapi"
"github.com/google/go-cmp/cmp"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/utils/pointer"
controllerruntime "sigs.k8s.io/controller-runtime"

Expand Down Expand Up @@ -67,7 +66,13 @@ func (c connection) Observe(ctx context.Context, mg resource.Managed) (managed.E
return managed.ExternalObservation{}, fmt.Errorf("failed to get kafka connection details: %w", err)
}

upToDate, diff := diffParameters(external, instance.Spec.ForProvider)
currentParams, err := setSettingsDefaults(ctx, c.exo, &instance.Spec.ForProvider)
if err != nil {
log.Error(err, "unable to set kafka settings schema")
currentParams = &instance.Spec.ForProvider
}

upToDate, diff := diffParameters(external, *currentParams)

return managed.ExternalObservation{
ResourceExists: true,
Expand Down Expand Up @@ -183,7 +188,7 @@ func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaP
return false, err.Error()
}

actualKafkaRestSettings, err := getActualKafkaRestSettings(external.KafkaRestSettings, expected.KafkaRestSettings)
actualKafkaRestSettings, err := mapper.ToRawExtension(external.KafkaRestSettings)
if err != nil {
return false, err.Error()
}
Expand All @@ -209,39 +214,3 @@ func diffParameters(external *oapi.DbaasServiceKafka, expected exoscalev1.KafkaP
settingComparer := cmp.Comparer(mapper.CompareSettings)
return cmp.Equal(expected, actual, settingComparer), cmp.Diff(expected, actual, settingComparer)
}

// getActualKafkaRestSettings reads the Kafa REST settings and strips out all non relevant default settings
// Exoscale always returns all defaults, not just the fields we set, so we need to strip them so that we can compare the actual and expected setting.
func getActualKafkaRestSettings(actual *map[string]interface{}, expected runtime.RawExtension) (runtime.RawExtension, error) {
if actual == nil {
return runtime.RawExtension{}, nil
}
expectedMap, err := mapper.ToMap(expected)
if err != nil {
return runtime.RawExtension{}, fmt.Errorf("error parsing kafka REST settings: %w", err)
}
s := stripRestSettingsDefaults(*actual, expectedMap)
return mapper.ToRawExtension(&s)
}

// defaultRestSettings are the default settings for Kafka REST.
var defaultRestSettings = map[string]interface{}{
"consumer_enable_auto_commit": true,
"producer_acks": "1", // Yes, that's a "1" as a string. I don't know why, that's just how it is..
"consumer_request_max_bytes": float64(67108864), // When parsing json into map[string]interface{} we get floats.
"simpleconsumer_pool_size_max": float64(25),
"producer_linger_ms": float64(0),
"consumer_request_timeout_ms": float64(1000),
}

func stripRestSettingsDefaults(actual map[string]interface{}, expected map[string]interface{}) map[string]interface{} {
res := map[string]interface{}{}
for k, v := range actual {
d, isDefault := defaultRestSettings[k]
_, isExpected := expected[k]
if !isDefault || d != v || isExpected {
res[k] = v
}
}
return res
}
27 changes: 27 additions & 0 deletions operator/kafkacontroller/observe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func TestObserve_UpToDate_ConnectionDetails(t *testing.T) {
found.ConnectionInfo.AccessKey = pointer.String("KEY")

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -108,6 +109,7 @@ func TestObserve_UpToDate_ConnectionDetails_with_REST(t *testing.T) {
found.KafkaRestEnabled = pointer.Bool(true)
found.ConnectionInfo.RestUri = pointer.String("https://admin:[email protected]:21701")
mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -153,6 +155,7 @@ func TestObserve_UpToDate_Status(t *testing.T) {
}

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -183,6 +186,7 @@ func TestObserve_UpToDate_Condition_NotReady(t *testing.T) {
found.State = &state

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -211,6 +215,7 @@ func TestObserve_UpToDate_Condition_Ready(t *testing.T) {
found.State = &state

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -238,6 +243,7 @@ func TestObserve_UpToDate_WithVersion(t *testing.T) {
found.Version = pointer.String("3.2.1")

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand All @@ -262,6 +268,7 @@ func TestObserve_UpToDate_EmptyRestSettings(t *testing.T) {
found.KafkaRestEnabled = pointer.Bool(true)

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -291,6 +298,7 @@ func TestObserve_UpToDate_RestSettings(t *testing.T) {
found.KafkaRestEnabled = pointer.Bool(true)

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand All @@ -313,6 +321,7 @@ func TestObserve_Outdated(t *testing.T) {
found.Maintenance.Dow = "tuesday"

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -343,6 +352,7 @@ func TestObserve_Outdated_Settings(t *testing.T) {
}

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -371,6 +381,7 @@ func TestObserve_Outdated_RestSettings(t *testing.T) {
found.KafkaRestEnabled = pointer.Bool(true)

mockGetKafkaCall(exoMock, "foo", found, nil)
mockGetKafkaSettingsCall(exoMock, nil)
mockCACall(exoMock)

assert.NotPanics(t, func() {
Expand Down Expand Up @@ -444,6 +455,15 @@ func sampleAPIKafka(name string) *oapi.DbaasServiceKafka {
return &res
}

var defaultRestSettings = map[string]interface{}{
"consumer_enable_auto_commit": true,
"producer_acks": "1", // Yes, that's a "1" as a string. I don't know why, that's just how it is..
"consumer_request_max_bytes": float64(67108864), // When parsing json into map[string]interface{} we get floats.
"simpleconsumer_pool_size_max": float64(25),
"producer_linger_ms": float64(0),
"consumer_request_timeout_ms": float64(1000),
}

func mockGetKafkaCall(m *operatortest.ClientWithResponsesInterface, name string, found *oapi.DbaasServiceKafka, err error) {
m.On("GetDbaasServiceKafkaWithResponse", mock.Anything, oapi.DbaasServiceName(name)).
Return(&oapi.GetDbaasServiceKafkaResponse{
Expand All @@ -453,6 +473,13 @@ func mockGetKafkaCall(m *operatortest.ClientWithResponsesInterface, name string,
Once()

}
func mockGetKafkaSettingsCall(m *operatortest.ClientWithResponsesInterface, err error) {
m.On("GetDbaasSettingsKafkaWithResponse", mock.Anything).
Return(&oapi.GetDbaasSettingsKafkaResponse{
Body: rawSettingsResponse,
}, err).
Once()
}
func mockCACall(m *operatortest.ClientWithResponsesInterface) {
m.On("GetDbaasCaCertificateWithResponse", mock.Anything).
Return(&oapi.GetDbaasCaCertificateResponse{
Expand Down
Loading

0 comments on commit 7fc3c8e

Please sign in to comment.