Skip to content

Commit

Permalink
atlas/schema: support ALWAYS for review.policy
Browse files Browse the repository at this point in the history
  • Loading branch information
giautm committed Nov 11, 2024
1 parent 9069a76 commit 64ce036
Showing 1 changed file with 74 additions and 63 deletions.
137 changes: 74 additions & 63 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"fmt"
"io"
"net/url"
"path"
"path/filepath"
"strconv"
"strings"
Expand Down Expand Up @@ -211,64 +212,13 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
To: desiredURL,
TxMode: string(data.TxMode),
}
if repo := data.repoURL(); repo != "" {
// List the schema plans to check if there are any plans.
switch plans, err := cli.SchemaPlanList(ctx, &atlasexec.SchemaPlanListParams{
Env: data.EnvName,
Vars: vars,
Repo: repo,
From: []string{"env://url"},
To: []string{desiredURL},
}); {
case err != nil:
reason, msg := "ListingPlans", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
// There are multiple pending plans.
// This is an unexpected state.
case len(plans) > 1:
planURLs := make([]string, 0, len(plans))
for _, p := range plans {
planURLs = append(planURLs, p.URL)
}
log.Info("multiple schema plans found", "plans", planURLs)
reason, msg := "ListingPlans", fmt.Sprintf("multiple schema plans found: %s", strings.Join(planURLs, ", "))
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
err = errors.New(msg)
r.recordErrEvent(res, err)
return result(err)
// Deploy the changes using the approved plan.
case len(plans) == 1 && plans[0].Status == "APPROVED":
log.Info("found an approved schema plan, applying", "plan", plans[0].URL)
params.PlanURL = plans[0].URL
// The plan is pending approval, show the plan to the user.
case len(plans) == 1 && plans[0].Status == "PENDING":
log.Info("found a pending schema plan, waiting for approval", "plan", plans[0].URL)
res.Status.PlanURL = plans[0].URL
res.Status.PlanLink = plans[0].Link
reason, msg := "ApprovalPending", "Schema plan is waiting for approval"
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeNormal, reason, msg)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
repo := data.repoURL()
if repo == nil {
// No repository is set, apply the changes directly.
report, err = cli.SchemaApply(ctx, params)
break
}
// Try to apply the schema changes with lint policies,
// if the changes are rejected by the review policy, create a plan
// for the pending changes.
report, err = cli.SchemaApply(ctx, params)
// TODO: Better error handling for rejected changes.
if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") {
log.Info("schema changes are rejected by the review policy, creating a new schema plan")
// If the desired state is a file, we need to push the schema to the Atlas Cloud.
// This is to ensure that the schema is in sync with the Atlas Cloud.
// And the schema is available for the Atlas CLI (on local machine)
// to modify or approve the changes.
createPlan := func() (ctrl.Result, error) {
if data.Desired.Scheme == dbv1alpha1.SchemaTypeFile {
log.Info("schema is a file, pushing the schema to Atlas Cloud")
// Using hash of desired state as the tag for the schema.
Expand All @@ -292,7 +242,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{
Env: data.EnvName,
Vars: vars,
Name: data.Cloud.Repo,
Name: path.Join(repo.Host, repo.Path),
Tag: fmt.Sprintf("operator-plan-%.8s", strings.ToLower(tag)),
URL: []string{desiredURL},
})
Expand All @@ -313,7 +263,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
plan, err := cli.SchemaPlan(ctx, &atlasexec.SchemaPlanParams{
Env: data.EnvName,
Vars: vars,
Repo: data.repoURL(),
Repo: repo.String(),
From: []string{"env://url"},
To: []string{desiredURL},
Pending: true,
Expand All @@ -336,6 +286,67 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recorder.Event(res, corev1.EventTypeNormal, reason, msg)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
}
// List the schema plans to check if there are any plans.
switch plans, err := cli.SchemaPlanList(ctx, &atlasexec.SchemaPlanListParams{
Env: data.EnvName,
Vars: vars,
Repo: repo.String(),
From: []string{"env://url"},
To: []string{desiredURL},
}); {
case err != nil:
reason, msg := "ListingPlans", err.Error()
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
if !isSQLErr(err) {
err = transient(err)
}
r.recordErrEvent(res, err)
return result(err)
// There are multiple pending plans.
// This is an unexpected state.
case len(plans) > 1:
planURLs := make([]string, 0, len(plans))
for _, p := range plans {
planURLs = append(planURLs, p.URL)
}
log.Info("multiple schema plans found", "plans", planURLs)
reason, msg := "ListingPlans", fmt.Sprintf("multiple schema plans found: %s", strings.Join(planURLs, ", "))
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeWarning, reason, msg)
err = errors.New(msg)
r.recordErrEvent(res, err)
return result(err)
// Deploy the changes using the approved plan.
case len(plans) == 1 && plans[0].Status == "APPROVED":
log.Info("found an approved schema plan, applying", "plan", plans[0].URL)
params.PlanURL = plans[0].URL
// The plan is pending approval, show the plan to the user.
case len(plans) == 1 && plans[0].Status == "PENDING":
log.Info("found a pending schema plan, waiting for approval", "plan", plans[0].URL)
res.Status.PlanURL = plans[0].URL
res.Status.PlanLink = plans[0].Link
reason, msg := "ApprovalPending", "Schema plan is waiting for approval"
res.SetNotReady(reason, msg)
r.recorder.Event(res, corev1.EventTypeNormal, reason, msg)
return ctrl.Result{RequeueAfter: time.Second * 5}, nil
case len(plans) == 0 && vars["lint_review"] == "ALWAYS":
// No pending plans, but atlas is asked to ALWAYS review the changes.
return createPlan()
}
// Try to apply the schema changes with lint policies,
// if the changes are rejected by the review policy, create a plan
// for the pending changes.
report, err = cli.SchemaApply(ctx, params)
// TODO: Better error handling for rejected changes.
if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") {
log.Info("schema changes are rejected by the review policy, creating a new schema plan")
// If the desired state is a file, we need to push the schema to the Atlas Cloud.
// This is to ensure that the schema is in sync with the Atlas Cloud.
// And the schema is available for the Atlas CLI (on local machine)
// to modify or approve the changes.
return createPlan()
}
// Verify the first run doesn't contain destructive changes.
case res.Status.LastApplied == 0:
err = r.lint(ctx, wd, data, atlasexec.Vars2{
Expand Down Expand Up @@ -543,21 +554,21 @@ func (d *managedData) hash() (string, error) {
return hex.EncodeToString(h.Sum(nil)), nil
}

func (d *managedData) repoURL() string {
func (d *managedData) repoURL() *url.URL {
switch {
// The user has provided the repository name.
case d.Cloud != nil && d.Cloud.Repo != "":
return (&url.URL{
Scheme: v1alpha1.SchemaTypeAtlas,
Host: d.Cloud.Repo,
}).String()
})
// Fallback to desired URL if it's Cloud URL.
case d.Desired.Scheme == dbv1alpha1.SchemaTypeAtlas:
c := *d.Desired
c.RawQuery = ""
return c.String()
return &c
default:
return ""
return nil
}
}

Expand Down

0 comments on commit 64ce036

Please sign in to comment.