From dd327ff2d49b45fafb5f6702cceb9fa1f00dd56f Mon Sep 17 00:00:00 2001 From: releng Date: Thu, 31 Oct 2024 14:11:45 -0400 Subject: [PATCH] Sync from server repo (6bbd52ce15e) --- commands/cmd_create_db.go | 10 +- commands/cmd_revive_db.go | 2 +- commands/cmd_start_replication.go | 12 +- commands/vcluster_config.go | 32 ++- vclusterops/nma_poll_replication_status.go | 2 +- vclusterops/replication.go | 301 +++++++++++++++------ 6 files changed, 255 insertions(+), 104 deletions(-) diff --git a/commands/cmd_create_db.go b/commands/cmd_create_db.go index ef84be9..f2972cd 100644 --- a/commands/cmd_create_db.go +++ b/commands/cmd_create_db.go @@ -253,11 +253,11 @@ func (c *CmdCreateDB) validateParse(logger vlog.Printer) error { return err } - if !c.usePassword() { - err = c.getCertFilesFromCertPaths(&c.createDBOptions.DatabaseOptions) - if err != nil { - return err - } + // for creating a database, db password is mandatory input + // we need to read certs for connecting to node management agent + err = c.getCertFilesFromCertPaths(&c.createDBOptions.DatabaseOptions) + if err != nil { + return err } err = c.setDBPassword(&c.createDBOptions.DatabaseOptions) diff --git a/commands/cmd_revive_db.go b/commands/cmd_revive_db.go index edc03ef..63126af 100644 --- a/commands/cmd_revive_db.go +++ b/commands/cmd_revive_db.go @@ -215,7 +215,7 @@ func (c *CmdReviveDB) Run(vcc vclusterops.ClusterCommands) error { // config file already exists. This could happen if we have partially revived the db(sandbox or main cluster) already // In this case, we update the existing config file instead of overwriting it. dbConfig = *dbConfigPtr - updateConfig(vdb, &dbConfig) + UpdateDBConfig(vdb, &dbConfig, c.reviveDBOptions.Sandbox, c.reviveDBOptions.MainCluster) writeErr := dbConfig.write(c.reviveDBOptions.ConfigPath, true /*forceOverwrite*/) if writeErr != nil { vcc.DisplayWarning("Fail to update config file: %s", writeErr) diff --git a/commands/cmd_start_replication.go b/commands/cmd_start_replication.go index 4b5827c..d03d2fd 100644 --- a/commands/cmd_start_replication.go +++ b/commands/cmd_start_replication.go @@ -195,7 +195,12 @@ func (c *CmdStartReplication) validateParse(logger vlog.Printer) error { return err } } - err := c.parseTargetHostList() + err := c.getTargetCertFilesFromCertPaths(&c.startRepOptions.TargetDB) + if err != nil { + return err + } + + err = c.parseTargetHostList() if err != nil { return err } @@ -205,6 +210,11 @@ func (c *CmdStartReplication) validateParse(logger vlog.Printer) error { return err } + err = c.ValidateParseBaseTargetOptions(&c.startRepOptions.TargetDB) + if err != nil { + return err + } + err = c.ValidateParseBaseOptions(&c.startRepOptions.DatabaseOptions) if err != nil { return err diff --git a/commands/vcluster_config.go b/commands/vcluster_config.go index b83f594..00004b9 100644 --- a/commands/vcluster_config.go +++ b/commands/vcluster_config.go @@ -20,6 +20,7 @@ import ( "os" "path/filepath" "sort" + "strings" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -235,7 +236,7 @@ func readVDBToDBConfig(vdb *vclusterops.VCoordinationDatabase) (DatabaseConfig, return dbConfig, fmt.Errorf("cannot find host %s from HostNodeMap", host) } - nodeConfig := buildNodeConfig(vnode, vdb) + nodeConfig := BuildNodeConfig(vnode, vdb) dbConfig.Nodes = append(dbConfig.Nodes, &nodeConfig) } @@ -248,7 +249,7 @@ func readVDBToDBConfig(vdb *vclusterops.VCoordinationDatabase) (DatabaseConfig, return dbConfig, nil } -func buildNodeConfig(vnode *vclusterops.VCoordinationNode, +func BuildNodeConfig(vnode *vclusterops.VCoordinationNode, vdb *vclusterops.VCoordinationDatabase) NodeConfig { nodeConfig := NodeConfig{} nodeConfig.Name = vnode.Name @@ -281,7 +282,14 @@ func updateNodeConfig(vnode *vclusterops.VCoordinationNode, n.Address = vnode.Address n.Subcluster = vnode.Subcluster n.Sandbox = vnode.Sandbox - n.CatalogPath = vnode.CatalogPath + if n.CatalogPath == "" { + if strings.HasSuffix(vnode.CatalogPath, "/Catalog") { + // Remove the "/Catalog" suffix and assign the remaining path to catalogPath + n.CatalogPath = strings.TrimSuffix(vnode.CatalogPath, "/Catalog") + } else { + n.CatalogPath = vnode.CatalogPath + } + } if vdb.DataPrefix == "" && len(vnode.StorageLocations) > 0 && n.DataPath == "" { n.DataPath = vnode.StorageLocations[0] } @@ -289,7 +297,7 @@ func updateNodeConfig(vnode *vclusterops.VCoordinationNode, } // update the input dbConfig -func updateConfig(vdb *vclusterops.VCoordinationDatabase, dbConfig *DatabaseConfig) { +func UpdateDBConfig(vdb *vclusterops.VCoordinationDatabase, dbConfig *DatabaseConfig, sandbox string, mainClusterOnly bool) { var newNodes []*NodeConfig nodeConfigMap := make(map[string]*NodeConfig) for _, n := range dbConfig.Nodes { @@ -297,13 +305,15 @@ func updateConfig(vdb *vclusterops.VCoordinationDatabase, dbConfig *DatabaseConf } for _, vnode := range vdb.HostNodeMap { - if n, exists := nodeConfigMap[vnode.Name]; exists { - // If found, update the existing node configuration - updateNodeConfig(vnode, vdb, n) - } else { - // If not found, build and append a new node configuration - n := buildNodeConfig(vnode, vdb) - newNodes = append(newNodes, &n) + if sandbox == vnode.Sandbox || (mainClusterOnly && vnode.Sandbox == util.MainClusterSandbox) { + if n, exists := nodeConfigMap[vnode.Name]; exists { + // If found, update the existing node configuration + updateNodeConfig(vnode, vdb, n) + } else { + // If not found, build and append a new node configuration + n := BuildNodeConfig(vnode, vdb) + newNodes = append(newNodes, &n) + } } } dbConfig.Nodes = append(dbConfig.Nodes, newNodes...) diff --git a/vclusterops/nma_poll_replication_status.go b/vclusterops/nma_poll_replication_status.go index 05b6689..45e485f 100644 --- a/vclusterops/nma_poll_replication_status.go +++ b/vclusterops/nma_poll_replication_status.go @@ -45,13 +45,13 @@ func makeNMAPollReplicationStatusOp(targetDBOpt *DatabaseOptions, targetUsePassw op.vdb = vdb op.existingTransactionIDs = existingTransactionIDs op.newTransactionID = newTransactionID + op.TargetDB.UserName = targetDBOpt.UserName if targetUsePassword { err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, targetDBOpt.UserName) if err != nil { return op, err } - op.TargetDB.UserName = targetDBOpt.UserName op.TargetDB.Password = targetDBOpt.Password } diff --git a/vclusterops/replication.go b/vclusterops/replication.go index 12d7a45..b08a197 100644 --- a/vclusterops/replication.go +++ b/vclusterops/replication.go @@ -189,12 +189,6 @@ func (options *VReplicationDatabaseOptions) validateAnalyzeOptions(logger vlog.P // VReplicateDatabase can copy all table data and metadata from this cluster to another func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOptions) (int64, error) { - /* - * - Produce Instructions - * - Create a VClusterOpEngine - * - Give the instructions to the VClusterOpEngine to run - */ - // validate and analyze options err := options.validateAnalyzeOptions(vcc.Log) if err != nil { @@ -209,11 +203,202 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti } asyncReplicationTransactionID := new(int64) + if options.Async { + err := vcc.replicateDatabaseAsync(options, &vdb, asyncReplicationTransactionID) + if err != nil { + return 0, err + } + } else { + err := vcc.replicateDatabaseSync(options, &vdb) + if err != nil { + return 0, err + } + } + return *asyncReplicationTransactionID, nil +} + +// Perform asynchronous database replication +func (vcc VClusterCommands) replicateDatabaseAsync(options *VReplicationDatabaseOptions, + vdb *VCoordinationDatabase, asyncReplicationTransactionID *int64) error { + /* + * Async replication steps: + * - (on target) Run NMA health check, get a list of existing transaction IDs + * - (on source) Run NMA health check, start asynchronous replication + * - (on target) Poll NMA for a new transaction ID - this is the ID for the new asynchronous replication operation + * + * Since source and target NMA certs can be different (VER-96992), we have to create multiple VClusterOpEngines to + * perform these steps. For each step: + * - Produce Instructions + * - Create a VClusterOpEngine with the correct certs (source or target) + * - Give the instructions to the VClusterOpEngine to run + */ + + // need username for https operations in source database + err := options.setUsePasswordAndValidateUsernameIfNeeded(vcc.Log) + if err != nil { + return err + } + + // verify the username for connecting to the target database + targetUsePassword := false + if options.TargetDB.Password != nil { + targetUsePassword = true + if options.TargetDB.UserName == "" { + username, e := util.GetCurrentUsername() + if e != nil { + return e + } + options.TargetDB.UserName = username + } + vcc.Log.Info("Current target username", "username", options.TargetDB.UserName) + } + + // Produce instructions for target NMA health check and getting a list of existing transaction IDs + transactionIDs := &[]int64{} + instructions, err := vcc.produceGetTransactionIDsInstructions(options, targetUsePassword, transactionIDs) + if err != nil { + return fmt.Errorf("fail to produce instructions for getting existing transaction IDs, %w", err) + } + + // Create a VClusterOpEngine, and add target certs to the engine + clusterOpEngine := makeClusterOpEngine(instructions, &options.TargetDB) + + // Give the instructions to the VClusterOpEngine to run + runError := clusterOpEngine.run(vcc.Log) + if runError != nil { + return fmt.Errorf("fail to get existing transaction IDs: %w", runError) + } + + // Produce instructions for starting async replication + instructions, err = vcc.produceStartAsyncReplicationInstructions(options, vdb, targetUsePassword) + if err != nil { + return fmt.Errorf("fail to produce instructions for starting replication, %w", err) + } + + // create a VClusterOpEngine, and add source certs to the engine + clusterOpEngine = makeClusterOpEngine(instructions, &options.DatabaseOptions) + + // give the instructions to the VClusterOpEngine to run + runError = clusterOpEngine.run(vcc.Log) + if runError != nil { + return fmt.Errorf("fail to start replication: %w", runError) + } + + // Produce instructions for getting a new transaction ID to identify the async replication operation + instructions, err = vcc.produceGetNewTransactionIDInstructions(options, vdb, targetUsePassword, + transactionIDs, asyncReplicationTransactionID) + if err != nil { + return fmt.Errorf("fail to produce instructions for getting transaction ID, %w", err) + } + + // Create a VClusterOpEngine, and add target certs to the engine + clusterOpEngine = makeClusterOpEngine(instructions, &options.TargetDB) + + // Give the instructions to the VClusterOpEngine to run + runError = clusterOpEngine.run(vcc.Log) + if runError != nil { + return fmt.Errorf("fail to get transaction ID: %w", runError) + } + + return nil +} + +func (vcc VClusterCommands) produceGetTransactionIDsInstructions(options *VReplicationDatabaseOptions, + targetUsePassword bool, transactionIDs *[]int64) ([]clusterOp, error) { + var instructions []clusterOp + + nmaHealthOp := makeNMAHealthOp(options.TargetDB.Hosts) + + // Retrieve a list of transaction IDs before async replication starts + nmaReplicationStatusData := nmaReplicationStatusRequestData{} + nmaReplicationStatusData.DBName = options.TargetDB.DBName + nmaReplicationStatusData.ExcludedTransactionIDs = []int64{} // Get all transaction IDs + nmaReplicationStatusData.GetTransactionIDsOnly = true // We only care about transaction IDs here + nmaReplicationStatusData.TransactionID = 0 // Set this to 0 so NMA returns all IDs + nmaReplicationStatusData.UserName = options.TargetDB.UserName + nmaReplicationStatusData.Password = options.TargetDB.Password + + nmaReplicationStatusOp, err := makeNMAReplicationStatusOp(options.TargetDB.Hosts, targetUsePassword, + &nmaReplicationStatusData, transactionIDs, nil) + if err != nil { + return instructions, err + } + + instructions = append(instructions, + &nmaHealthOp, + &nmaReplicationStatusOp, + ) + + return instructions, nil +} + +func (vcc VClusterCommands) produceStartAsyncReplicationInstructions(options *VReplicationDatabaseOptions, + vdb *VCoordinationDatabase, targetUsePassword bool) ([]clusterOp, error) { + var instructions []clusterOp + + initiatorTargetHost := getInitiator(options.TargetDB.Hosts) + + nmaHealthOp := makeNMAHealthOp(options.Hosts) + + nmaReplicationData := nmaStartReplicationRequestData{} + nmaReplicationData.DBName = options.DBName + nmaReplicationData.ExcludePattern = options.ExcludePattern + nmaReplicationData.IncludePattern = options.IncludePattern + nmaReplicationData.TableOrSchemaName = options.TableOrSchemaName + nmaReplicationData.Username = options.UserName + nmaReplicationData.Password = options.Password + nmaReplicationData.TargetDBName = options.TargetDB.DBName + nmaReplicationData.TargetHost = initiatorTargetHost + nmaReplicationData.TargetNamespace = options.TargetNamespace + nmaReplicationData.TargetUserName = options.TargetDB.UserName + nmaReplicationData.TargetPassword = options.TargetDB.Password + nmaReplicationData.TLSConfig = options.SourceTLSConfig + + nmaStartReplicationOp, err := makeNMAReplicationStartOp(options.Hosts, options.usePassword, targetUsePassword, + &nmaReplicationData, vdb) + if err != nil { + return instructions, err + } + + instructions = append(instructions, + &nmaHealthOp, + &nmaStartReplicationOp, + ) + + return instructions, nil +} + +func (vcc VClusterCommands) produceGetNewTransactionIDInstructions(options *VReplicationDatabaseOptions, + vdb *VCoordinationDatabase, targetUsePassword bool, transactionIDs *[]int64, + asyncReplicationTransactionID *int64) ([]clusterOp, error) { + var instructions []clusterOp + + nmaPollReplicationStatusOp, err := makeNMAPollReplicationStatusOp(&options.TargetDB, targetUsePassword, + options.SandboxName, vdb, transactionIDs, asyncReplicationTransactionID) + if err != nil { + return instructions, err + } + + instructions = append(instructions, + &nmaPollReplicationStatusOp, + ) + + return instructions, nil +} + +// Perform synchronous database replication +func (vcc VClusterCommands) replicateDatabaseSync(options *VReplicationDatabaseOptions, + vdb *VCoordinationDatabase) error { + /* + * - Produce Instructions + * - Create a VClusterOpEngine + * - Give the instructions to the VClusterOpEngine to run + */ // produce database replication instructions - instructions, err := vcc.produceDBReplicationInstructions(options, &vdb, asyncReplicationTransactionID) + instructions, err := vcc.produceSyncDBReplicationInstructions(options, vdb) if err != nil { - return 0, fmt.Errorf("fail to produce instructions, %w", err) + return fmt.Errorf("fail to produce instructions, %w", err) } // create a VClusterOpEngine, and add certs to the engine @@ -228,18 +413,17 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti "2. set EnableConnectCredentialForwarding to True in source database using vsql " + "3. configure a Trust Authentication in target database using vsql") } - return 0, fmt.Errorf("fail to replicate database: %w", runError) + return fmt.Errorf("fail to replicate database: %w", runError) } - return *asyncReplicationTransactionID, nil + + return nil } -// The generated instructions will later perform the following operations necessary -// for a successful replication. -// - Check NMA connectivity -// - Check Vertica versions -// - Replicate database -func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicationDatabaseOptions, - vdb *VCoordinationDatabase, asyncReplicationTransactionID *int64) ([]clusterOp, error) { +// The generated instructions will later perform the following operations necessary for synchronous replication: +// - Disallow multiple namespaces +// - Replicate database (synchronous) +func (vcc VClusterCommands) produceSyncDBReplicationInstructions(options *VReplicationDatabaseOptions, + vdb *VCoordinationDatabase) ([]clusterOp, error) { var instructions []clusterOp // need username for https operations in source database @@ -263,77 +447,24 @@ func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicati } initiatorTargetHost := getInitiator(options.TargetDB.Hosts) - if options.Async { - nmaHealthOp := makeNMAHealthOp(append(options.Hosts, options.TargetDB.Hosts...)) - - transactionIDs := &[]int64{} - - // Retrieve a list of transaction IDs before async replication starts - nmaReplicationStatusData := nmaReplicationStatusRequestData{} - nmaReplicationStatusData.DBName = options.TargetDB.DBName - nmaReplicationStatusData.ExcludedTransactionIDs = []int64{} // Get all transaction IDs - nmaReplicationStatusData.GetTransactionIDsOnly = true // We only care about transaction IDs here - nmaReplicationStatusData.TransactionID = 0 // Set this to 0 so NMA returns all IDs - nmaReplicationStatusData.UserName = options.TargetDB.UserName - nmaReplicationStatusData.Password = options.TargetDB.Password - - nmaReplicationStatusOp, err := makeNMAReplicationStatusOp(options.TargetDB.Hosts, targetUsePassword, - &nmaReplicationStatusData, transactionIDs, nil) - if err != nil { - return instructions, err - } - - nmaReplicationData := nmaStartReplicationRequestData{} - nmaReplicationData.DBName = options.DBName - nmaReplicationData.ExcludePattern = options.ExcludePattern - nmaReplicationData.IncludePattern = options.IncludePattern - nmaReplicationData.TableOrSchemaName = options.TableOrSchemaName - nmaReplicationData.Username = options.UserName - nmaReplicationData.Password = options.Password - nmaReplicationData.TargetDBName = options.TargetDB.DBName - nmaReplicationData.TargetHost = initiatorTargetHost - nmaReplicationData.TargetNamespace = options.TargetNamespace - nmaReplicationData.TargetUserName = options.TargetDB.UserName - nmaReplicationData.TargetPassword = options.TargetDB.Password - nmaReplicationData.TLSConfig = options.SourceTLSConfig - - nmaStartReplicationOp, err := makeNMAReplicationStartOp(options.Hosts, options.usePassword, targetUsePassword, - &nmaReplicationData, vdb) - if err != nil { - return instructions, err - } - - nmaPollReplicationStatusOp, err := makeNMAPollReplicationStatusOp(&options.TargetDB, targetUsePassword, - options.SandboxName, vdb, transactionIDs, asyncReplicationTransactionID) - if err != nil { - return instructions, err - } - - instructions = append(instructions, - &nmaHealthOp, - &nmaReplicationStatusOp, - &nmaStartReplicationOp, - &nmaPollReplicationStatusOp, - ) - } else { - httpsDisallowMultipleNamespacesOp, err := makeHTTPSDisallowMultipleNamespacesOp(options.Hosts, - options.usePassword, options.UserName, options.Password, options.SandboxName, vdb) - if err != nil { - return instructions, err - } - httpsStartReplicationOp, err := makeHTTPSStartReplicationOp(options.DBName, options.Hosts, options.usePassword, - options.UserName, options.Password, targetUsePassword, &options.TargetDB, initiatorTargetHost, - options.SourceTLSConfig, options.SandboxName, vdb) - if err != nil { - return instructions, err - } + httpsDisallowMultipleNamespacesOp, err := makeHTTPSDisallowMultipleNamespacesOp(options.Hosts, + options.usePassword, options.UserName, options.Password, options.SandboxName, vdb) + if err != nil { + return instructions, err + } - instructions = append(instructions, - &httpsDisallowMultipleNamespacesOp, - &httpsStartReplicationOp, - ) + httpsStartReplicationOp, err := makeHTTPSStartReplicationOp(options.DBName, options.Hosts, options.usePassword, + options.UserName, options.Password, targetUsePassword, &options.TargetDB, initiatorTargetHost, + options.SourceTLSConfig, options.SandboxName, vdb) + if err != nil { + return instructions, err } + instructions = append(instructions, + &httpsDisallowMultipleNamespacesOp, + &httpsStartReplicationOp, + ) + return instructions, nil }