Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
PTEUDO-1991 drop database prior to restoring it
Browse files Browse the repository at this point in the history
    When a database is partially or fully restored, we fail
    to migrate to it. To support migration to existing dbs,
    we will drop the database. This requires dropping all
    current connections to the database in the process.

    Add a migration test to a previously migrated database.
Drew Wells committed Nov 10, 2024
1 parent 600e772 commit b589c76
Showing 13 changed files with 1,217 additions and 177 deletions.
169 changes: 161 additions & 8 deletions internal/controller/databasecontroller_migrate_test.go
Original file line number Diff line number Diff line change
@@ -41,7 +41,7 @@ import (
"github.com/infobloxopen/db-controller/pkg/pgctl"
)

var _ = Describe("claim migrate", func() {
var _ = Describe("Migrate", func() {

// Define utility constants for object names and testing timeouts/durations and intervals.

@@ -93,12 +93,11 @@ var _ = Describe("claim migrate", func() {
// Namespace: "default",
// }

claim := &persistancev1.DatabaseClaim{}

kctx := context.Background()

BeforeEach(func() {

claim := &persistancev1.DatabaseClaim{}
By("ensuring the resource does not exist")
Expect(k8sClient.Get(kctx, typeNamespacedName, claim)).To(HaveOccurred())

@@ -119,7 +118,7 @@ var _ = Describe("claim migrate", func() {
},
Spec: persistancev1.DatabaseClaimSpec{
Class: ptr.To(""),
DatabaseName: "postgres",
DatabaseName: "sample_app",
SecretName: claimSecretName,
EnableSuperUser: ptr.To(false),
EnableReplicationRole: ptr.To(false),
@@ -128,6 +127,7 @@ var _ = Describe("claim migrate", func() {
SourceDataFrom: &persistancev1.SourceDataFrom{
Type: "database",
Database: &persistancev1.Database{
DSN: testDSN,
SecretRef: &persistancev1.SecretRef{
Name: sourceSecretName,
Namespace: "default",
@@ -190,10 +190,12 @@ var _ = Describe("claim migrate", func() {

_, err := controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).NotTo(HaveOccurred())
var claim persistancev1.DatabaseClaim
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &claim)).NotTo(HaveOccurred())
Expect(claim.Status.Error).To(Equal(""))
By("Ensuring the active db connection info is set")
Eventually(func() *persistancev1.DatabaseClaimConnectionInfo {
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, claim)).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &claim)).NotTo(HaveOccurred())
return claim.Status.ActiveDB.ConnectionInfo
}).ShouldNot(BeNil())

@@ -223,16 +225,17 @@ var _ = Describe("claim migrate", func() {
Expect(dsn.Redacted()).To(Equal(redacted))
})

It("Migrate", func() {
It("Populate a new database", func() {

Expect(controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})).To(Equal(success))
var dbc persistancev1.DatabaseClaim
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(claim.Status.Error).To(Equal(""))
Expect(dbc.Status.Error).To(Equal(""))
By("Ensuring the active db connection info is set")
Expect(controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})).To(Equal(success))
Eventually(func() *persistancev1.DatabaseClaimConnectionInfo {
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
return claim.Status.ActiveDB.ConnectionInfo
return dbc.Status.ActiveDB.ConnectionInfo
}).ShouldNot(BeNil())

hostParams, err := hostparams.New(controllerReconciler.Config.Viper, &dbc)
@@ -316,5 +319,155 @@ var _ = Describe("claim migrate", func() {

})

It("Recreate existing database", func() {

Expect(controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})).To(Equal(success))
var dbc persistancev1.DatabaseClaim
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))
By("Ensuring the active db connection info is set")
Eventually(func() *persistancev1.DatabaseClaimConnectionInfo {
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
return dbc.Status.ActiveDB.ConnectionInfo
}).ShouldNot(BeNil())

oldDB, err := url.Parse(testDSN)
Expect(err).NotTo(HaveOccurred())
aConn := dbc.Status.ActiveDB.ConnectionInfo
Expect(aConn.Port).To(Equal(oldDB.Port()))

logger.Info("what", "status", dbc.Status.ActiveDB.ConnectionInfo.Uri())

hostParams, err := hostparams.New(controllerReconciler.Config.Viper, &dbc)
Expect(err).ToNot(HaveOccurred())

fakeCPSecretName := fmt.Sprintf("%s-%s-%s", env, resourceName, hostParams.Hash())

By(fmt.Sprintf("Mocking a RDS pod to look like crossplane set it up: %s", fakeCPSecretName))
fakeCli, fakeDSN, fakeCancel := dockerdb.MockRDS(GinkgoT(), ctxLogger, k8sClient, fakeCPSecretName, "migrate", dbc.Spec.DatabaseName)
_ = fakeCancel
DeferCleanup(fakeCancel)

fakeCPSecret := corev1.Secret{
ObjectMeta: metav1.ObjectMeta{
Name: fakeCPSecretName,
Namespace: "default",
},
}
nname := types.NamespacedName{
Name: fakeCPSecretName,
Namespace: "default",
}
Eventually(k8sClient.Get(ctxLogger, nname, &fakeCPSecret)).Should(Succeed())
logger.Info("debugsecret", "rdssecret", fakeCPSecret)

By("Disabling UseExistingSource")
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
dbc.Spec.UseExistingSource = ptr.To(false)
Expect(k8sClient.Update(ctxLogger, &dbc)).NotTo(HaveOccurred())

res, err := controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(dbc.Status.Error).To(Equal(""))
Expect(res.Requeue).To(BeTrue())

By("Check source DSN looks roughly correct")
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
activeDB := dbc.Status.ActiveDB
Expect(activeDB.ConnectionInfo).NotTo(BeNil())
compareDSN := strings.Replace(testDSN, "//postgres:postgres", fmt.Sprintf("//%s_a:", migratedowner), 1)
Expect(activeDB.ConnectionInfo.Uri()).To(Equal(compareDSN))

By("Check target DSN looks roughly correct")
newDB := dbc.Status.NewDB
compareDSN = strings.Replace(fakeDSN, "//migrate:postgres", fmt.Sprintf("//%s_a:", migratedowner), 1)
Expect(newDB.ConnectionInfo).NotTo(BeNil())
Expect(newDB.ConnectionInfo.Uri()).To(Equal(compareDSN))
Expect(dbc.Status.MigrationState).To(Equal(pgctl.S_MigrationInProgress.String()))
logger.Info("what newDSN", "status", dbc.Status.ActiveDB.ConnectionInfo.Uri())
logger.Info("what newDSN", "status", dbc.Status)

var tempCreds corev1.Secret
// temp-migrate-dbclaim-creds
Expect(k8sClient.Get(ctxLogger, types.NamespacedName{Name: "temp-" + claimSecretName, Namespace: "default"}, &tempCreds)).NotTo(HaveOccurred())
for k, v := range tempCreds.Data {
logger.Info("tempcreds", k, string(v))
}

By("CR reconciles but must be requeued to perform migration, reconcile manually for test")
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(res.Requeue).To(BeFalse())
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))

By("Waiting to disable source, reconcile manually again")
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(res.RequeueAfter).To(Equal(time.Duration(60 * time.Second)))
By("Verify migration is complete on this reconcile")
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(res.Requeue).To(BeFalse())
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))
Expect(dbc.Status.MigrationState).To(Equal(pgctl.S_Completed.String()))
activeDB = dbc.Status.ActiveDB
Expect(activeDB.ConnectionInfo).NotTo(BeNil())
Expect(activeDB.ConnectionInfo.Uri()).To(Equal(compareDSN))

By(fmt.Sprintf("Verify owner of migrated DB is %s", migratedowner))
row := fakeCli.QueryRowContext(ctxLogger, "select tableowner from pg_tables where tablename = 'users' AND schemaname = 'public';")
var owner string
Expect(row.Scan(&owner)).NotTo(HaveOccurred())
Expect(owner).To(Equal(migratedowner))

// Re-Enable and Disable `UseExistingSource` to trigger a second migration
By("Enable UseExistingSource to switch back to original database")
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
dbc.Spec.UseExistingSource = ptr.To(true)
Expect(k8sClient.Update(ctxLogger, &dbc)).NotTo(HaveOccurred())
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))
Expect(res.Requeue).To(BeFalse())
Expect(dbc.Status.MigrationState).To(BeEmpty())

By("Checking active db connection info is reverted to original dsn")
Expect(activeDB.ConnectionInfo).NotTo(BeNil())
aConn = dbc.Status.ActiveDB.ConnectionInfo
Expect(aConn.Port).To(Equal(oldDB.Port()))
logger.Info("what oldDSN", "status", dbc.Status.ActiveDB.ConnectionInfo.Uri())

By("Disabling UseExistingSource Again")
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
dbc.Spec.UseExistingSource = ptr.To(false)
Expect(k8sClient.Update(ctxLogger, &dbc)).NotTo(HaveOccurred())
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())
Expect(dbc.Status.Error).To(Equal(""))

By("Checking that migration completes")
Eventually(func() string {
res, err = controllerReconciler.Reconcile(ctxLogger, reconcile.Request{NamespacedName: typeNamespacedName})
Expect(err).To(BeNil())

Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
Expect(dbc.Status.Error).To(Equal(""))
return dbc.Status.MigrationState
}).Should(Equal(pgctl.S_Completed.String()))

// Keep requeuing until migration is complete

By("Checking active db moves to new db")
fakeURI, err := url.Parse(fakeDSN)
Expect(err).NotTo(HaveOccurred())
Expect(k8sClient.Get(ctxLogger, typeNamespacedName, &dbc)).NotTo(HaveOccurred())
aConn = dbc.Status.ActiveDB.ConnectionInfo
Expect(aConn.Port).To(Equal(fakeURI.Port()))
logger.Info("what newDSN", "status", dbc.Status)
})

})
})
5 changes: 3 additions & 2 deletions internal/controller/suite_test.go
Original file line number Diff line number Diff line change
@@ -75,7 +75,7 @@ func NewGinkgoLogger() logr.Logger {
// Create a new Zap logger, routing output to GinkgoWriter
return zap.New(zap.WriteTo(GinkgoWriter), zap.UseFlagOptions(&zap.Options{
Encoder: zapcore.NewConsoleEncoder(uberzap.NewDevelopmentEncoderConfig()),
Development: true,
Development: false,
TimeEncoder: zapcore.RFC3339TimeEncoder,
}))
}
@@ -130,7 +130,8 @@ var _ = BeforeSuite(func() {
Expect(k8sClient).NotTo(BeNil())

now := time.Now()
testdb, testDSN, cleanupTestDB = dockerdb.Run(dockerdb.Config{
logger.Info("start postgres setup")
testdb, testDSN, cleanupTestDB = dockerdb.Run(logger, dockerdb.Config{
Database: "postgres",
Username: "postgres",
Password: "postgres",
Loading

0 comments on commit b589c76

Please sign in to comment.