From 328a20fe7a95e30dd9c1ec12f4451dd65f751c0d Mon Sep 17 00:00:00 2001 From: "Giau. Tran Minh" Date: Mon, 6 Jan 2025 13:35:53 +0700 Subject: [PATCH] controller: added status builder for `AtlasSchema` --- .../controller/atlasmigration_controller.go | 6 +- internal/controller/atlasschema_controller.go | 40 ++--- internal/controller/atlasschema_status.go | 152 ++++++++++++++++++ internal/controller/common.go | 2 +- internal/result/errors.go | 36 +++++ internal/result/result.go | 46 ++---- 6 files changed, 221 insertions(+), 61 deletions(-) create mode 100644 internal/controller/atlasschema_status.go create mode 100644 internal/result/errors.go diff --git a/internal/controller/atlasmigration_controller.go b/internal/controller/atlasmigration_controller.go index 3f9b5ac..0f3106c 100644 --- a/internal/controller/atlasmigration_controller.go +++ b/internal/controller/atlasmigration_controller.go @@ -134,7 +134,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque } res.SetNotReady(reason, err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. @@ -156,7 +156,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { res.SetNotReady("GettingDevDB", err.Error()) - return result(err) + return xresult(err) } } // Reconcile given resource @@ -164,7 +164,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque if err != nil { r.recordErrEvent(res, err) } - return result(err) + return xresult(err) } func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.Object) (migrate.Dir, error) { diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index faa61f8..2469417 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -134,7 +134,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { res.SetNotReady("ReadSchema", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } if data.hasTargets() { res.SetNotReady("ReadSchema", "Multiple targets are not supported") @@ -144,7 +144,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { res.SetNotReady("CalculatingHash", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } // We need to update the ready condition immediately before doing // any heavy jobs if the hash is different from the last applied. @@ -163,7 +163,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL) if err != nil { res.SetNotReady("GettingDevDB", err.Error()) - return result(err) + return xresult(err) } } opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)} @@ -190,14 +190,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { res.SetNotReady("CreatingWorkingDir", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } defer wd.Close() cli, err := r.atlasClient(wd.Path(), data.Cloud) if err != nil { res.SetNotReady("CreatingAtlasClient", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } var whoami *atlasexec.WhoAmI switch whoami, err = cli.WhoAmI(ctx); { @@ -207,12 +207,12 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = errors.New("login is required to use custom atlas.hcl config") res.SetNotReady("WhoAmI", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } case err != nil: res.SetNotReady("WhoAmI", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) default: log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org) } @@ -221,7 +221,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err != nil { res.SetNotReady("LintPolicyError", err.Error()) r.recordErrEvent(res, err) - return result(err) + return xresult(err) } switch desiredURL := data.Desired.String(); { // The resource is connected to Atlas Cloud. @@ -231,7 +231,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) m.setLintReview(dbv1alpha1.LintReviewError, false) }) if err != nil { - return result(err) + return xresult(err) } params := &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -268,7 +268,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{ Env: data.EnvName, @@ -285,7 +285,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } desiredURL = state.URL } @@ -315,7 +315,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) default: log.Info("created a new schema plan", "plan", plan.File.URL, "desiredURL", desiredURL) res.Status.PlanURL = plan.File.URL @@ -342,7 +342,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) // There are multiple pending plans. This is an unexpected state. case len(plans) > 1: planURLs := make([]string, 0, len(plans)) @@ -355,7 +355,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) err = errors.New(msg) r.recordErrEvent(res, err) - return result(err) + return xresult(err) // There are no pending plans, but Atlas has been asked to review the changes ALWAYS. case len(plans) == 0 && data.Policy.Lint.Review == dbv1alpha1.LintReviewAlways: // Create a plan for the pending changes. @@ -390,7 +390,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) if err = editAtlasHCL(func(m *managedData) { m.enableDestructive(true) }); err != nil { - return result(err) + return xresult(err) } err = r.lint(ctx, wd, data, data.Vars) switch d := (*destructiveErr)(nil); { @@ -408,13 +408,13 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } // Revert the destructive linting policy back to the original value. if err = editAtlasHCL(func(m *managedData) { m.Policy.Lint.Destructive.Error = false }); err != nil { - return result(err) + return xresult(err) } reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -433,7 +433,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{ Env: data.EnvName, @@ -459,14 +459,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) err = transient(err) } r.recordErrEvent(res, err) - return result(err) + return xresult(err) } s := dbv1alpha1.AtlasSchemaStatus{ LastApplied: time.Now().Unix(), ObservedHash: hash, } if len(reports) != 1 { - return result(fmt.Errorf("unexpected number of schema reports: %d", len(reports))) + return xresult(fmt.Errorf("unexpected number of schema reports: %d", len(reports))) } log.Info("schema changes are applied", "applied", len(reports[0].Changes.Applied)) // Truncate the applied and pending changes to 1024 bytes. diff --git a/internal/controller/atlasschema_status.go b/internal/controller/atlasschema_status.go new file mode 100644 index 0000000..b2b9dd5 --- /dev/null +++ b/internal/controller/atlasschema_status.go @@ -0,0 +1,152 @@ +// Copyright 2025 The Atlas Operator Authors. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "encoding/json" + "fmt" + + "ariga.io/atlas-go-sdk/atlasexec" + "k8s.io/apimachinery/pkg/api/meta" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "sigs.k8s.io/controller-runtime/pkg/reconcile" + + "github.com/ariga/atlas-operator/api/v1alpha1" + "github.com/ariga/atlas-operator/internal/result" + "github.com/ariga/atlas-operator/internal/status" +) + +type optionBuilder struct { + opts []status.Option[*v1alpha1.AtlasSchema] +} + +// statusOptions returns an initialized optionBuilder +func statusOptions() *optionBuilder { + return &optionBuilder{} +} + +// GetOptions implements the OptionBuilder interface +func (o *optionBuilder) GetOptions() []status.Option[*v1alpha1.AtlasSchema] { + return o.opts +} + +func (o *optionBuilder) withCondition(condition metav1.Condition) *optionBuilder { + o.opts = append(o.opts, conditionOption{cond: condition}) + return o +} + +func (o *optionBuilder) withObservedHash(h string) *optionBuilder { + o.opts = append(o.opts, observedHashOption{hash: h}) + return o +} + +func (o *optionBuilder) withPlanFile(f *atlasexec.SchemaPlanFile) *optionBuilder { + o.opts = append(o.opts, planFileOption{file: f}) + return o +} + +func (o *optionBuilder) withReport(p *atlasexec.SchemaApply) *optionBuilder { + // Truncate the applied and pending changes to 1024 bytes. + p.Changes.Applied = truncateSQL(p.Changes.Applied, sqlLimitSize) + p.Changes.Pending = truncateSQL(p.Changes.Pending, sqlLimitSize) + o.opts = append(o.opts, reportOption{report: p}) + if plan := p.Plan; plan != nil { + return o.withPlanFile(plan.File) + } + return o +} + +type observedHashOption struct { + hash string +} + +func (o observedHashOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + obj.Status.ObservedHash = o.hash +} + +func (o observedHashOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +type reportOption struct { + report *atlasexec.SchemaApply +} + +func (m reportOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + var msg string + if j, err := json.Marshal(m.report); err != nil { + msg = fmt.Sprintf("Error marshalling apply response: %v", err) + } else { + msg = fmt.Sprintf("The schema has been applied successfully. Apply response: %s", j) + } + obj.Status.LastApplied = m.report.End.Unix() + meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionTrue, + Reason: "Applied", + Message: msg, + }) +} + +func (m reportOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +func (o *optionBuilder) withNotReady(reason string, err error) *optionBuilder { + err = IgnoreNonTransient(err) + o.opts = append(o.opts, conditionOption{ + cond: metav1.Condition{ + Type: "Ready", + Status: metav1.ConditionFalse, + Reason: reason, + Message: err.Error(), + }, + err: err, + }) + return o +} + +type planFileOption struct { + file *atlasexec.SchemaPlanFile +} + +func (m planFileOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + obj.Status.PlanURL = m.file.URL + obj.Status.PlanLink = m.file.Link +} + +func (m planFileOption) GetResult() (reconcile.Result, error) { + return result.OK() +} + +type conditionOption struct { + cond metav1.Condition + err error +} + +func (m conditionOption) ApplyOption(obj *v1alpha1.AtlasSchema) { + meta.SetStatusCondition(&obj.Status.Conditions, m.cond) +} + +func (m conditionOption) GetResult() (reconcile.Result, error) { + return result.Transient(m.err) +} + +func IgnoreNonTransient(err error) error { + if err == nil || result.IsTransient(err) { + return nil + } + return err +} diff --git a/internal/controller/common.go b/internal/controller/common.go index 4c1ba89..edb9f17 100644 --- a/internal/controller/common.go +++ b/internal/controller/common.go @@ -185,7 +185,7 @@ func isTransient(err error) bool { // Permanent errors are not returned as errors because they cause // the controller to requeue indefinitely. Instead, they should be // reported as a status condition. -func result(err error) (r ctrl.Result, _ error) { +func xresult(err error) (r ctrl.Result, _ error) { if t := (*transientError)(nil); errors.As(err, &t) { r.RequeueAfter = t.after } diff --git a/internal/result/errors.go b/internal/result/errors.go new file mode 100644 index 0000000..c7c4000 --- /dev/null +++ b/internal/result/errors.go @@ -0,0 +1,36 @@ +package result + +import ( + "errors" +) + +// TransientError is an error that should be retried. +type TransientError struct { + Err error + After int +} + +// Error implements the error interface +func (t *TransientError) Error() string { + return t.Err.Error() +} + +// Unwrap implements the errors.Wrapper interface +func (t *TransientError) Unwrap() error { + return t.Err +} + +// TransientErrorAfter wraps an error to indicate that it should be retried after +// the given duration. +func TransientErrorAfter(err error, after int) error { + if err == nil { + return nil + } + return &TransientError{Err: err, After: after} +} + +// IsTransient checks if the error is transient +func IsTransient(err error) bool { + var t *TransientError + return errors.As(err, &t) +} diff --git a/internal/result/result.go b/internal/result/result.go index d88cf07..4136d73 100644 --- a/internal/result/result.go +++ b/internal/result/result.go @@ -21,18 +21,6 @@ import ( ctrl "sigs.k8s.io/controller-runtime" ) -// Transient checks if the error is transient and returns a result -// that indicates whether the request should be retried. -func Transient(err error) (r ctrl.Result, _ error) { - if t := (*transientError)(nil); errors.As(err, &t) { - return Retry(t.after) - } - // Permanent errors are not returned as errors because they cause - // the controller to requeue indefinitely. Instead, they should be - // reported as a status condition. - return OK() -} - // OK returns a successful result func OK() (ctrl.Result, error) { return ctrl.Result{}, nil @@ -51,30 +39,14 @@ func Retry(after int) (ctrl.Result, error) { }, nil } -// transientError is an error that should be retried. -type transientError struct { - err error - after int -} - -func (t *transientError) Error() string { return t.err.Error() } -func (t *transientError) Unwrap() error { return t.err } - -// TransientError wraps an error to indicate that it should be retried. -func TransientError(err error) error { - return TransientErrorAfter(err, 5) -} - -// TransientErrorAfter wraps an error to indicate that it should be retried after -// the given duration. -func TransientErrorAfter(err error, after int) error { - if err == nil { - return nil +// Transient checks if the error is transient and returns a result +// that indicates whether the request should be retried. +func Transient(err error) (ctrl.Result, error) { + if t := (*TransientError)(nil); errors.As(err, &t) { + return Retry(t.After) } - return &transientError{err: err, after: after} -} - -func isTransient(err error) bool { - var t *transientError - return errors.As(err, &t) + // Permanent errors are not returned as errors because they cause + // the controller to requeue indefinitely. Instead, they should be + // reported as a status condition. + return OK() }