Skip to content

Commit

Permalink
controller: added status builder for AtlasSchema
Browse files Browse the repository at this point in the history
  • Loading branch information
giautm committed Jan 6, 2025
1 parent 99a9286 commit 328a20f
Show file tree
Hide file tree
Showing 6 changed files with 221 additions and 61 deletions.
6 changes: 3 additions & 3 deletions internal/controller/atlasmigration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
}
res.SetNotReady(reason, err.Error())
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
// We need to update the ready condition immediately before doing
// any heavy jobs if the hash is different from the last applied.
Expand All @@ -156,15 +156,15 @@ func (r *AtlasMigrationReconciler) Reconcile(ctx context.Context, req ctrl.Reque
data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL)
if err != nil {
res.SetNotReady("GettingDevDB", err.Error())
return result(err)
return xresult(err)
}
}
// Reconcile given resource
err = r.reconcile(ctx, data, res)
if err != nil {
r.recordErrEvent(res, err)
}
return result(err)
return xresult(err)
}

func (r *AtlasMigrationReconciler) readDirState(ctx context.Context, obj client.Object) (migrate.Dir, error) {
Expand Down
40 changes: 20 additions & 20 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err != nil {
res.SetNotReady("ReadSchema", err.Error())
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
if data.hasTargets() {
res.SetNotReady("ReadSchema", "Multiple targets are not supported")
Expand All @@ -144,7 +144,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err != nil {
res.SetNotReady("CalculatingHash", err.Error())
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
// We need to update the ready condition immediately before doing
// any heavy jobs if the hash is different from the last applied.
Expand All @@ -163,7 +163,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
data.DevURL, err = r.devDB.devURL(ctx, res, *data.URL)
if err != nil {
res.SetNotReady("GettingDevDB", err.Error())
return result(err)
return xresult(err)
}
}
opts := []atlasexec.Option{atlasexec.WithAtlasHCL(data.render)}
Expand All @@ -190,14 +190,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err != nil {
res.SetNotReady("CreatingWorkingDir", err.Error())
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
defer wd.Close()
cli, err := r.atlasClient(wd.Path(), data.Cloud)
if err != nil {
res.SetNotReady("CreatingAtlasClient", err.Error())
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
var whoami *atlasexec.WhoAmI
switch whoami, err = cli.WhoAmI(ctx); {
Expand All @@ -207,12 +207,12 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = errors.New("login is required to use custom atlas.hcl config")
res.SetNotReady("WhoAmI", err.Error())
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
case err != nil:
res.SetNotReady("WhoAmI", err.Error())
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
default:
log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org)
}
Expand All @@ -221,7 +221,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err != nil {
res.SetNotReady("LintPolicyError", err.Error())
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
switch desiredURL := data.Desired.String(); {
// The resource is connected to Atlas Cloud.
Expand All @@ -231,7 +231,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
m.setLintReview(dbv1alpha1.LintReviewError, false)
})
if err != nil {
return result(err)
return xresult(err)
}
params := &atlasexec.SchemaApplyParams{
Env: data.EnvName,
Expand Down Expand Up @@ -268,7 +268,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
Env: data.EnvName,
Expand All @@ -285,7 +285,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
desiredURL = state.URL
}
Expand Down Expand Up @@ -315,7 +315,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
default:
log.Info("created a new schema plan", "plan", plan.File.URL, "desiredURL", desiredURL)
res.Status.PlanURL = plan.File.URL
Expand All @@ -342,7 +342,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
// There are multiple pending plans. This is an unexpected state.
case len(plans) > 1:
planURLs := make([]string, 0, len(plans))
Expand All @@ -355,7 +355,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
err = errors.New(msg)
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
// There are no pending plans, but Atlas has been asked to review the changes ALWAYS.
case len(plans) == 0 && data.Policy.Lint.Review == dbv1alpha1.LintReviewAlways:
// Create a plan for the pending changes.
Expand Down Expand Up @@ -390,7 +390,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
if err = editAtlasHCL(func(m *managedData) {
m.enableDestructive(true)
}); err != nil {
return result(err)
return xresult(err)
}
err = r.lint(ctx, wd, data, data.Vars)
switch d := (*destructiveErr)(nil); {
Expand All @@ -408,13 +408,13 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
// Revert the destructive linting policy back to the original value.
if err = editAtlasHCL(func(m *managedData) {
m.Policy.Lint.Destructive.Error = false
}); err != nil {
return result(err)
return xresult(err)
}
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
Expand All @@ -433,7 +433,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
Expand All @@ -459,14 +459,14 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
return xresult(err)
}
s := dbv1alpha1.AtlasSchemaStatus{
LastApplied: time.Now().Unix(),
ObservedHash: hash,
}
if len(reports) != 1 {
return result(fmt.Errorf("unexpected number of schema reports: %d", len(reports)))
return xresult(fmt.Errorf("unexpected number of schema reports: %d", len(reports)))
}
log.Info("schema changes are applied", "applied", len(reports[0].Changes.Applied))
// Truncate the applied and pending changes to 1024 bytes.
Expand Down
152 changes: 152 additions & 0 deletions internal/controller/atlasschema_status.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2025 The Atlas Operator Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"encoding/json"
"fmt"

"ariga.io/atlas-go-sdk/atlasexec"
"k8s.io/apimachinery/pkg/api/meta"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"sigs.k8s.io/controller-runtime/pkg/reconcile"

"github.com/ariga/atlas-operator/api/v1alpha1"
"github.com/ariga/atlas-operator/internal/result"
"github.com/ariga/atlas-operator/internal/status"
)

type optionBuilder struct {
opts []status.Option[*v1alpha1.AtlasSchema]
}

// statusOptions returns an initialized optionBuilder
func statusOptions() *optionBuilder {
return &optionBuilder{}
}

// GetOptions implements the OptionBuilder interface
func (o *optionBuilder) GetOptions() []status.Option[*v1alpha1.AtlasSchema] {
return o.opts
}

func (o *optionBuilder) withCondition(condition metav1.Condition) *optionBuilder {
o.opts = append(o.opts, conditionOption{cond: condition})
return o
}

func (o *optionBuilder) withObservedHash(h string) *optionBuilder {
o.opts = append(o.opts, observedHashOption{hash: h})
return o
}

func (o *optionBuilder) withPlanFile(f *atlasexec.SchemaPlanFile) *optionBuilder {
o.opts = append(o.opts, planFileOption{file: f})
return o
}

func (o *optionBuilder) withReport(p *atlasexec.SchemaApply) *optionBuilder {
// Truncate the applied and pending changes to 1024 bytes.
p.Changes.Applied = truncateSQL(p.Changes.Applied, sqlLimitSize)
p.Changes.Pending = truncateSQL(p.Changes.Pending, sqlLimitSize)
o.opts = append(o.opts, reportOption{report: p})
if plan := p.Plan; plan != nil {
return o.withPlanFile(plan.File)
}
return o
}

type observedHashOption struct {
hash string
}

func (o observedHashOption) ApplyOption(obj *v1alpha1.AtlasSchema) {
obj.Status.ObservedHash = o.hash
}

func (o observedHashOption) GetResult() (reconcile.Result, error) {
return result.OK()
}

type reportOption struct {
report *atlasexec.SchemaApply
}

func (m reportOption) ApplyOption(obj *v1alpha1.AtlasSchema) {
var msg string
if j, err := json.Marshal(m.report); err != nil {
msg = fmt.Sprintf("Error marshalling apply response: %v", err)
} else {
msg = fmt.Sprintf("The schema has been applied successfully. Apply response: %s", j)
}
obj.Status.LastApplied = m.report.End.Unix()
meta.SetStatusCondition(&obj.Status.Conditions, metav1.Condition{
Type: "Ready",
Status: metav1.ConditionTrue,
Reason: "Applied",
Message: msg,
})
}

func (m reportOption) GetResult() (reconcile.Result, error) {
return result.OK()
}

func (o *optionBuilder) withNotReady(reason string, err error) *optionBuilder {
err = IgnoreNonTransient(err)
o.opts = append(o.opts, conditionOption{
cond: metav1.Condition{
Type: "Ready",
Status: metav1.ConditionFalse,
Reason: reason,
Message: err.Error(),
},
err: err,
})
return o
}

type planFileOption struct {
file *atlasexec.SchemaPlanFile
}

func (m planFileOption) ApplyOption(obj *v1alpha1.AtlasSchema) {
obj.Status.PlanURL = m.file.URL
obj.Status.PlanLink = m.file.Link
}

func (m planFileOption) GetResult() (reconcile.Result, error) {
return result.OK()
}

type conditionOption struct {
cond metav1.Condition
err error
}

func (m conditionOption) ApplyOption(obj *v1alpha1.AtlasSchema) {
meta.SetStatusCondition(&obj.Status.Conditions, m.cond)
}

func (m conditionOption) GetResult() (reconcile.Result, error) {
return result.Transient(m.err)
}

func IgnoreNonTransient(err error) error {
if err == nil || result.IsTransient(err) {
return nil
}
return err
}
2 changes: 1 addition & 1 deletion internal/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func isTransient(err error) bool {
// Permanent errors are not returned as errors because they cause
// the controller to requeue indefinitely. Instead, they should be
// reported as a status condition.
func result(err error) (r ctrl.Result, _ error) {
func xresult(err error) (r ctrl.Result, _ error) {
if t := (*transientError)(nil); errors.As(err, &t) {
r.RequeueAfter = t.after
}
Expand Down
36 changes: 36 additions & 0 deletions internal/result/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
package result

import (
"errors"
)

// TransientError is an error that should be retried.
type TransientError struct {
Err error
After int
}

// Error implements the error interface
func (t *TransientError) Error() string {
return t.Err.Error()
}

// Unwrap implements the errors.Wrapper interface
func (t *TransientError) Unwrap() error {
return t.Err
}

// TransientErrorAfter wraps an error to indicate that it should be retried after
// the given duration.
func TransientErrorAfter(err error, after int) error {
if err == nil {
return nil
}
return &TransientError{Err: err, After: after}
}

// IsTransient checks if the error is transient
func IsTransient(err error) bool {
var t *TransientError
return errors.As(err, &t)
}
Loading

0 comments on commit 328a20f

Please sign in to comment.