diff --git a/internal/controller/atlasschema_controller.go b/internal/controller/atlasschema_controller.go index ac4bf27..f5e0065 100644 --- a/internal/controller/atlasschema_controller.go +++ b/internal/controller/atlasschema_controller.go @@ -22,6 +22,7 @@ import ( "fmt" "io" "net/url" + "path" "path/filepath" "strconv" "strings" @@ -211,64 +212,13 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) To: desiredURL, TxMode: string(data.TxMode), } - if repo := data.repoURL(); repo != "" { - // List the schema plans to check if there are any plans. - switch plans, err := cli.SchemaPlanList(ctx, &atlasexec.SchemaPlanListParams{ - Env: data.EnvName, - Vars: vars, - Repo: repo, - From: []string{"env://url"}, - To: []string{desiredURL}, - }); { - case err != nil: - reason, msg := "ListingPlans", err.Error() - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - if !isSQLErr(err) { - err = transient(err) - } - r.recordErrEvent(res, err) - return result(err) - // There are multiple pending plans. - // This is an unexpected state. - case len(plans) > 1: - planURLs := make([]string, 0, len(plans)) - for _, p := range plans { - planURLs = append(planURLs, p.URL) - } - log.Info("multiple schema plans found", "plans", planURLs) - reason, msg := "ListingPlans", fmt.Sprintf("multiple schema plans found: %s", strings.Join(planURLs, ", ")) - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) - err = errors.New(msg) - r.recordErrEvent(res, err) - return result(err) - // Deploy the changes using the approved plan. - case len(plans) == 1 && plans[0].Status == "APPROVED": - log.Info("found an approved schema plan, applying", "plan", plans[0].URL) - params.PlanURL = plans[0].URL - // The plan is pending approval, show the plan to the user. - case len(plans) == 1 && plans[0].Status == "PENDING": - log.Info("found a pending schema plan, waiting for approval", "plan", plans[0].URL) - res.Status.PlanURL = plans[0].URL - res.Status.PlanLink = plans[0].Link - reason, msg := "ApprovalPending", "Schema plan is waiting for approval" - res.SetNotReady(reason, msg) - r.recorder.Event(res, corev1.EventTypeNormal, reason, msg) - return ctrl.Result{RequeueAfter: time.Second * 5}, nil - } + repo := data.repoURL() + if repo == nil { + // No repository is set, apply the changes directly. + report, err = cli.SchemaApply(ctx, params) + break } - // Try to apply the schema changes with lint policies, - // if the changes are rejected by the review policy, create a plan - // for the pending changes. - report, err = cli.SchemaApply(ctx, params) - // TODO: Better error handling for rejected changes. - if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") { - log.Info("schema changes are rejected by the review policy, creating a new schema plan") - // If the desired state is a file, we need to push the schema to the Atlas Cloud. - // This is to ensure that the schema is in sync with the Atlas Cloud. - // And the schema is available for the Atlas CLI (on local machine) - // to modify or approve the changes. + createPlan := func() (ctrl.Result, error) { if data.Desired.Scheme == dbv1alpha1.SchemaTypeFile { log.Info("schema is a file, pushing the schema to Atlas Cloud") // Using hash of desired state as the tag for the schema. @@ -292,7 +242,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) state, err := cli.SchemaPush(ctx, &atlasexec.SchemaPushParams{ Env: data.EnvName, Vars: vars, - Name: data.Cloud.Repo, + Name: path.Join(repo.Host, repo.Path), Tag: fmt.Sprintf("operator-plan-%.8s", strings.ToLower(tag)), URL: []string{desiredURL}, }) @@ -313,7 +263,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) plan, err := cli.SchemaPlan(ctx, &atlasexec.SchemaPlanParams{ Env: data.EnvName, Vars: vars, - Repo: data.repoURL(), + Repo: repo.String(), From: []string{"env://url"}, To: []string{desiredURL}, Pending: true, @@ -336,6 +286,67 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request) r.recorder.Event(res, corev1.EventTypeNormal, reason, msg) return ctrl.Result{RequeueAfter: time.Second * 5}, nil } + // List the schema plans to check if there are any plans. + switch plans, err := cli.SchemaPlanList(ctx, &atlasexec.SchemaPlanListParams{ + Env: data.EnvName, + Vars: vars, + Repo: repo.String(), + From: []string{"env://url"}, + To: []string{desiredURL}, + }); { + case err != nil: + reason, msg := "ListingPlans", err.Error() + res.SetNotReady(reason, msg) + r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) + if !isSQLErr(err) { + err = transient(err) + } + r.recordErrEvent(res, err) + return result(err) + // There are multiple pending plans. + // This is an unexpected state. + case len(plans) > 1: + planURLs := make([]string, 0, len(plans)) + for _, p := range plans { + planURLs = append(planURLs, p.URL) + } + log.Info("multiple schema plans found", "plans", planURLs) + reason, msg := "ListingPlans", fmt.Sprintf("multiple schema plans found: %s", strings.Join(planURLs, ", ")) + res.SetNotReady(reason, msg) + r.recorder.Event(res, corev1.EventTypeWarning, reason, msg) + err = errors.New(msg) + r.recordErrEvent(res, err) + return result(err) + // Deploy the changes using the approved plan. + case len(plans) == 1 && plans[0].Status == "APPROVED": + log.Info("found an approved schema plan, applying", "plan", plans[0].URL) + params.PlanURL = plans[0].URL + // The plan is pending approval, show the plan to the user. + case len(plans) == 1 && plans[0].Status == "PENDING": + log.Info("found a pending schema plan, waiting for approval", "plan", plans[0].URL) + res.Status.PlanURL = plans[0].URL + res.Status.PlanLink = plans[0].Link + reason, msg := "ApprovalPending", "Schema plan is waiting for approval" + res.SetNotReady(reason, msg) + r.recorder.Event(res, corev1.EventTypeNormal, reason, msg) + return ctrl.Result{RequeueAfter: time.Second * 5}, nil + case len(plans) == 0 && vars["lint_review"] == "ALWAYS": + // No pending plans, but atlas is asked to ALWAYS review the changes. + return createPlan() + } + // Try to apply the schema changes with lint policies, + // if the changes are rejected by the review policy, create a plan + // for the pending changes. + report, err = cli.SchemaApply(ctx, params) + // TODO: Better error handling for rejected changes. + if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") { + log.Info("schema changes are rejected by the review policy, creating a new schema plan") + // If the desired state is a file, we need to push the schema to the Atlas Cloud. + // This is to ensure that the schema is in sync with the Atlas Cloud. + // And the schema is available for the Atlas CLI (on local machine) + // to modify or approve the changes. + return createPlan() + } // Verify the first run doesn't contain destructive changes. case res.Status.LastApplied == 0: err = r.lint(ctx, wd, data, atlasexec.Vars2{ @@ -543,21 +554,21 @@ func (d *managedData) hash() (string, error) { return hex.EncodeToString(h.Sum(nil)), nil } -func (d *managedData) repoURL() string { +func (d *managedData) repoURL() *url.URL { switch { // The user has provided the repository name. case d.Cloud != nil && d.Cloud.Repo != "": return (&url.URL{ Scheme: v1alpha1.SchemaTypeAtlas, Host: d.Cloud.Repo, - }).String() + }) // Fallback to desired URL if it's Cloud URL. case d.Desired.Scheme == dbv1alpha1.SchemaTypeAtlas: c := *d.Desired c.RawQuery = "" - return c.String() + return &c default: - return "" + return nil } }