Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

internal/controller: handle the case of Atlas returning multiple results #249

Merged
merged 1 commit into from
Dec 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions internal/controller/atlasmigration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,7 +358,7 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
log.Info("applying pending migrations", "count", len(status.Pending))
// There are pending migrations
// Execute Atlas CLI migrate command
report, err := c.MigrateApply(ctx, &atlasexec.MigrateApplyParams{
reports, err := c.MigrateApplySlice(ctx, &atlasexec.MigrateApplyParams{
Env: data.EnvName,
Context: &atlasexec.DeployRunContext{
TriggerType: atlasexec.TriggerTypeKubernetes,
Expand All @@ -372,12 +372,15 @@ func (r *AtlasMigrationReconciler) reconcile(ctx context.Context, data *migratio
}
return err
}
if len(reports) != 1 {
return fmt.Errorf("unexpected number of reports: %d", len(reports))
}
res.SetReady(dbv1alpha1.AtlasMigrationStatus{
ObservedHash: data.ObservedHash,
LastApplied: report.End.Unix(),
LastAppliedVersion: report.Target,
LastApplied: reports[0].End.Unix(),
LastAppliedVersion: reports[0].Target,
})
r.recordApplied(res, report.Target)
r.recordApplied(res, reports[0].Target)
}
if data.Dir != nil {
// Compress the migration directory then store it in the secret
Expand Down
27 changes: 15 additions & 12 deletions internal/controller/atlasschema_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
default:
log.Info("the resource is connected to Atlas Cloud", "org", whoami.Org)
}
var report *atlasexec.SchemaApply
var reports []*atlasexec.SchemaApply
switch desiredURL := data.Desired.String(); {
// The resource is connected to Atlas Cloud.
case whoami != nil:
Expand All @@ -221,7 +221,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
repo := data.repoURL()
if repo == nil {
// No repository is set, apply the changes directly.
report, err = cli.SchemaApply(ctx, params)
reports, err = cli.SchemaApplySlice(ctx, params)
break
}
createPlan := func() (ctrl.Result, error) {
Expand Down Expand Up @@ -352,7 +352,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
// Try to apply the schema changes with lint policies,
// if the changes are rejected by the review policy, create a plan
// for the pending changes.
report, err = cli.SchemaApply(ctx, params)
reports, err = cli.SchemaApplySlice(ctx, params)
// TODO: Better error handling for rejected changes.
if err != nil && strings.HasPrefix(err.Error(), "Rejected by review policy") {
log.Info("schema changes are rejected by the review policy, creating a new schema plan")
Expand Down Expand Up @@ -391,7 +391,7 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
}); err != nil {
return result(err)
}
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
Expand All @@ -409,15 +409,15 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
}
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
AutoApprove: true,
})
// No linting policy is set.
default:
report, err = cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
reports, err = cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
To: desiredURL,
TxMode: string(data.TxMode),
Expand All @@ -433,20 +433,23 @@ func (r *AtlasSchemaReconciler) Reconcile(ctx context.Context, req ctrl.Request)
r.recordErrEvent(res, err)
return result(err)
}
log.Info("schema changes are applied", "applied", len(report.Changes.Applied))
// Truncate the applied and pending changes to 1024 bytes.
report.Changes.Applied = truncateSQL(report.Changes.Applied, sqlLimitSize)
report.Changes.Pending = truncateSQL(report.Changes.Pending, sqlLimitSize)
s := dbv1alpha1.AtlasSchemaStatus{
LastApplied: time.Now().Unix(),
ObservedHash: hash,
}
if len(reports) != 1 {
return result(fmt.Errorf("unexpected number of schema reports: %d", len(reports)))
}
log.Info("schema changes are applied", "applied", len(reports[0].Changes.Applied))
// Truncate the applied and pending changes to 1024 bytes.
reports[0].Changes.Applied = truncateSQL(reports[0].Changes.Applied, sqlLimitSize)
reports[0].Changes.Pending = truncateSQL(reports[0].Changes.Pending, sqlLimitSize)
// Set the plan URL if it exists.
if p := report.Plan; p != nil {
if p := reports[0].Plan; p != nil {
s.PlanLink = p.File.Link
s.PlanURL = p.File.URL
}
res.SetReady(s, report)
res.SetReady(s, reports[0])
r.recorder.Event(res, corev1.EventTypeNormal, "Applied", "Applied schema")
return ctrl.Result{}, nil
}
Expand Down
2 changes: 1 addition & 1 deletion internal/controller/atlasschema_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -672,7 +672,7 @@ func (t *test) initDB(statement string) {
require.NoError(t, err)
cli, err := atlasexec.NewClient(wd.Path(), "atlas")
require.NoError(t, err)
_, err = cli.SchemaApply(context.Background(), &atlasexec.SchemaApplyParams{
_, err = cli.SchemaApplySlice(context.Background(), &atlasexec.SchemaApplyParams{
URL: t.dburl,
DevURL: "sqlite://file2/?mode=memory",
To: "file://./schema.sql",
Expand Down
9 changes: 4 additions & 5 deletions internal/controller/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,17 +43,16 @@ type (
}
// AtlasExec is the interface for the atlas exec client.
AtlasExec interface {
// MigrateApply runs the `migrate apply` command and returns the successful runs.
MigrateApply(context.Context, *atlasexec.MigrateApplyParams) (*atlasexec.MigrateApply, error)
// MigrateApplySlice runs the `migrate apply` command and returns the successful runs.
MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error)
// MigrateDown runs the `migrate down` command.
MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error)
// MigrateLint runs the `migrate lint` command.
MigrateLint(context.Context, *atlasexec.MigrateLintParams) (*atlasexec.SummaryReport, error)
// MigrateStatus runs the `migrate status` command.
MigrateStatus(context.Context, *atlasexec.MigrateStatusParams) (*atlasexec.MigrateStatus, error)

// SchemaApply runs the `schema apply` command.
SchemaApply(context.Context, *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error)
// SchemaApplySlice runs the `schema apply` command and returns the successful runs.
SchemaApplySlice(context.Context, *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error)
// SchemaInspect runs the `schema inspect` command.
SchemaInspect(ctx context.Context, params *atlasexec.SchemaInspectParams) (string, error)
// SchemaPush runs the `schema push` command.
Expand Down
7 changes: 5 additions & 2 deletions internal/controller/lint.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingD
if err != nil {
return err
}
plan, err := cli.SchemaApply(ctx, &atlasexec.SchemaApplyParams{
plans, err := cli.SchemaApplySlice(ctx, &atlasexec.SchemaApplyParams{
Env: data.EnvName,
Vars: vars,
To: data.Desired.String(),
Expand All @@ -63,9 +63,12 @@ func (r *AtlasSchemaReconciler) lint(ctx context.Context, wd *atlasexec.WorkingD
"unable to remove temporary directory", "dir", dir)
}
}()
if len(plans) != 1 {
return fmt.Errorf("unexpected number of schema plans: %d", len(plans))
}
dir, err := memDir(map[string]string{
"1.sql": current,
"2.sql": strings.Join(plan.Changes.Pending, ";\n"),
"2.sql": strings.Join(plans[0].Changes.Pending, ";\n"),
})
if err != nil {
return err
Expand Down
11 changes: 8 additions & 3 deletions internal/controller/testhelper.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,9 @@ func (m *mockAtlasExec) WhoAmI(context.Context) (*atlasexec.WhoAmI, error) {
return m.whoami.res, m.whoami.err
}

// SchemaApply implements AtlasExec.
func (m *mockAtlasExec) SchemaApply(ctx context.Context, params *atlasexec.SchemaApplyParams) (*atlasexec.SchemaApply, error) {
return m.schemaApply.res, m.schemaApply.err
// SchemaAppleSlice implements AtlasExec.
func (m *mockAtlasExec) SchemaApplySlice(ctx context.Context, params *atlasexec.SchemaApplyParams) ([]*atlasexec.SchemaApply, error) {
return []*atlasexec.SchemaApply{m.schemaApply.res}, m.schemaApply.err
}

// SchemaInspect implements AtlasExec.
Expand All @@ -97,6 +97,11 @@ func (m *mockAtlasExec) MigrateApply(context.Context, *atlasexec.MigrateApplyPar
return m.apply.res, m.apply.err
}

// MigrateApplySlice implements AtlasExec.
func (m *mockAtlasExec) MigrateApplySlice(context.Context, *atlasexec.MigrateApplyParams) ([]*atlasexec.MigrateApply, error) {
return []*atlasexec.MigrateApply{m.apply.res}, m.apply.err
}

// MigrateDown implements AtlasExec.
func (m *mockAtlasExec) MigrateDown(context.Context, *atlasexec.MigrateDownParams) (*atlasexec.MigrateDown, error) {
return m.down.res, m.down.err
Expand Down
Loading