diff --git a/commands/cluster_command_launcher.go b/commands/cluster_command_launcher.go index 1c8dad9..c9961b3 100644 --- a/commands/cluster_command_launcher.go +++ b/commands/cluster_command_launcher.go @@ -121,6 +121,16 @@ const ( targetPasswordFileKey = "targetPasswordFile" targetConnFlag = "target-conn" targetConnKey = "targetConn" + targetKeyFileFlag = "target-key-file" + targetKeyFileKey = "targetKeyFile" + targetCertFileFlag = "target-cert-file" + targetCertFileKey = "targetCertFile" + targetCaCertFileFlag = "target-ca-cert-file" + targetCaCertFileKey = "targetCaCertFile" + targetTLSModeFlag = "target-tls-mode" + targetTLSModeKey = "targetTLSMode" + targetIPv6Flag = "target-ipv6" + targetIPv6Key = "targetIPv6" asyncFlag = "async" asyncKey = "async" sourceTLSConfigFlag = "source-tlsconfig" @@ -133,6 +143,8 @@ const ( excludePatternKey = "excludePattern" targetNamespaceFlag = "target-namespace" targetNamespaceKey = "targetNamespace" + transactionIDFlag = "transaction-id" + transactionIDKey = "transactionID" ) // flags to viper key map @@ -164,12 +176,18 @@ var flagKeyMap = map[string]string{ targetHostsFlag: targetHostsKey, targetUserNameFlag: targetUserNameKey, targetPasswordFileFlag: targetPasswordFileKey, + targetKeyFileFlag: targetKeyFileKey, + targetCertFileFlag: targetCertFileKey, + targetCaCertFileFlag: targetCaCertFileKey, + targetTLSModeFlag: targetTLSModeKey, + targetIPv6Flag: targetIPv6Key, asyncFlag: asyncKey, sourceTLSConfigFlag: sourceTLSConfigKey, tableOrSchemaNameFlag: tableOrSchemaNameKey, includePatternFlag: includePatternKey, excludePatternFlag: excludePatternKey, targetNamespaceFlag: targetNamespaceKey, + transactionIDFlag: transactionIDKey, } // target database flags to viper key map @@ -178,6 +196,11 @@ var targetFlagKeyMap = map[string]string{ targetHostsFlag: targetHostsKey, targetUserNameFlag: targetUserNameKey, targetPasswordFileFlag: targetPasswordFileKey, + targetKeyFileFlag: targetKeyFileKey, + targetCertFileFlag: targetCertFileKey, + targetCaCertFileFlag: targetCaCertFileKey, + targetTLSModeFlag: targetTLSModeKey, + targetIPv6Flag: targetIPv6Key, } // map of viper keys to environment variables @@ -199,6 +222,7 @@ const ( configShowSubCmd = "show" replicationSubCmd = "replication" startReplicationSubCmd = "start" + replicationStatusSubCmd = "status" listAllNodesSubCmd = "list_all_nodes" startDBSubCmd = "start_db" dropDBSubCmd = "drop_db" @@ -239,6 +263,11 @@ type cmdGlobals struct { targetDB string targetUserName string connFile string + targetKeyFile string + targetCertFile string + targetCaCertFile string + targetTLSMode string + targetIPv6 bool } var ( @@ -360,6 +389,18 @@ func setTargetDBOptionsUsingViper(flag string) error { globals.targetUserName = viper.GetString(targetUserNameKey) case targetPasswordFileFlag: globals.targetPasswordFile = viper.GetString(targetPasswordFileKey) + case targetKeyFileFlag: + globals.targetKeyFile = viper.GetString(targetKeyFileKey) + case targetCertFileFlag: + globals.targetCertFile = viper.GetString(targetCertFileKey) + case targetCaCertFileFlag: + globals.targetCaCertFile = viper.GetString(targetCaCertFileKey) + case targetTLSModeFlag: + globals.targetTLSMode = viper.GetString(targetTLSModeKey) + case targetIPv6Flag: + globals.targetIPv6 = viper.GetBool(targetIPv6Key) + case verboseFlag: + globals.verbose = viper.GetBool(verboseKey) default: return fmt.Errorf("cannot find the relevant target database option for flag %q", flag) } @@ -373,7 +414,7 @@ func configViper(cmd *cobra.Command, flagsInConfig []string) error { initConfig() // target-flags are only available for replication start command - if cmd.CalledAs() == startReplicationSubCmd { + if cmd.CalledAs() == startReplicationSubCmd || cmd.CalledAs() == replicationStatusSubCmd { for targetFlag := range targetFlagKeyMap { flagsInConfig = append(flagsInConfig, targetFlag) } @@ -441,7 +482,7 @@ func loadConfig(cmd *cobra.Command) (err error) { // load target db options from connection file to viper // conn file is only available for replication subcommand - if cmd.CalledAs() == startReplicationSubCmd { + if cmd.CalledAs() == startReplicationSubCmd || cmd.CalledAs() == replicationStatusSubCmd { err := loadConnToViper() if err != nil { return err @@ -582,6 +623,7 @@ func constructCmds() []*cobra.Command { makeCmdScrutinize(), makeCmdManageConfig(), makeCmdReplication(), + makeCmdGetReplicationStatus(), makeCmdCreateConnection(), // hidden cmds (for internal testing only) makeCmdGetDrainingStatus(), diff --git a/commands/cmd_base.go b/commands/cmd_base.go index 6d1b409..a5d6992 100644 --- a/commands/cmd_base.go +++ b/commands/cmd_base.go @@ -71,8 +71,37 @@ func (c *CmdBase) ValidateParseBaseOptions(opt *vclusterops.DatabaseOptions) err // parse TLS mode. vclusterops allows different behavior for NMA and HTTPS conns, but // for simplicity and lack of use case outside k8s, vcluster does not. - if globals.tlsMode != "" { - switch tlsMode := strings.ToLower(globals.tlsMode); tlsMode { + err := validateParseTLSMode(opt, globals.tlsMode) + if err != nil { + return err + } + + return nil +} + +// ValidateParseBaseTargetOptions will validate and parse the required target options in each command +func (c *CmdBase) ValidateParseBaseTargetOptions(opt *vclusterops.DatabaseOptions) error { + // parse raw hosts + if len(opt.Hosts) > 0 { + err := util.ParseHostList(&opt.Hosts) + if err != nil { + return err + } + } + + // parse TLS mode. vclusterops allows different behavior for NMA and HTTPS conns, but + // for simplicity and lack of use case outside k8s, vcluster does not. + err := validateParseTLSMode(opt, globals.targetTLSMode) + if err != nil { + return err + } + + return nil +} + +func validateParseTLSMode(opt *vclusterops.DatabaseOptions, tlsMode string) error { + if tlsMode != "" { + switch tlsModeLower := strings.ToLower(tlsMode); tlsModeLower { case tlsModeEnable: opt.DoVerifyHTTPSServerCert = false opt.DoVerifyNMAServerCert = false @@ -87,7 +116,7 @@ func (c *CmdBase) ValidateParseBaseOptions(opt *vclusterops.DatabaseOptions) err opt.DoVerifyPeerCertHostname = true default: return fmt.Errorf("unrecognized TLS mode: %s. Allowed values are: '%s', '%s'", - globals.tlsMode, tlsModeEnable, tlsModeVerifyCA) + tlsMode, tlsModeEnable, tlsModeVerifyCA) } } @@ -132,6 +161,10 @@ func (c *CmdBase) setCommonFlags(cmd *cobra.Command, flags []string) { c.setTLSFlags(cmd) } + if cmd.Name() == startReplicationSubCmd || cmd.Name() == replicationStatusSubCmd { + c.setTargetDBFlags(cmd) + } + if util.StringInArray(outputFileFlag, flags) { cmd.Flags().StringVarP( &c.output, @@ -282,6 +315,79 @@ func (c *CmdBase) setTLSFlags(cmd *cobra.Command) { ) } +func (c *CmdBase) setTargetDBFlags(cmd *cobra.Command) { + cmd.Flags().StringSliceVar( + &globals.targetHosts, + targetHostsFlag, + []string{}, + "A comma-separated list of hosts in target database.", + ) + cmd.Flags().StringVar( + &globals.targetUserName, + targetUserNameFlag, + "", + "The name of a user in the target database.", + ) + cmd.Flags().StringVar( + &globals.targetPasswordFile, + targetPasswordFileFlag, + "", + "The absolute path to a file containing the password for the target database. ", + ) + cmd.Flags().StringVar( + &globals.connFile, + targetConnFlag, + "", + "[Required] The absolute path to the connection file created with the create_connection command, "+ + "containing the database name, hosts, and password (if any) for the target database. "+ + "Alternatively, you can provide this information manually with --target-db-name, "+ + "--target-hosts, and --target-password-file", + ) + markFlagsFileName(cmd, map[string][]string{targetConnFlag: {"yaml"}}) + + cmd.Flags().StringVar( + &globals.targetKeyFile, + targetKeyFileFlag, + "", + fmt.Sprintf("Path to the key file for the target database, the default value is %s", + filepath.Join(vclusterops.CertPathBase, "{username}.key")), + ) + markFlagsFileName(cmd, map[string][]string{targetKeyFileFlag: {"key"}}) + + cmd.Flags().StringVar( + &globals.targetCertFile, + targetCertFileFlag, + "", + fmt.Sprintf("Path to the cert file for the target database, the default value is %s", + filepath.Join(vclusterops.CertPathBase, "{username}.pem")), + ) + markFlagsFileName(cmd, map[string][]string{targetCertFileFlag: {"pem", "crt"}}) + cmd.MarkFlagsRequiredTogether(targetKeyFileFlag, targetCertFileFlag) + + cmd.Flags().StringVar( + &globals.targetCaCertFile, + targetCaCertFileFlag, + "", + fmt.Sprintf("Path to the trusted CA cert file for the target database, the default value is %s", + filepath.Join(vclusterops.CertPathBase, "rootca.pem")), + ) + markFlagsFileName(cmd, map[string][]string{caCertFileFlag: {"pem", "crt"}}) + + cmd.Flags().StringVar( + &globals.targetTLSMode, + targetTLSModeFlag, + "", + fmt.Sprintf("Mode for TLS validation for the target database. "+ + "Allowed values '%s', '%s', and '%s'. Default value is '%s'.", + tlsModeEnable, tlsModeVerifyCA, tlsModeVerifyFull, tlsModeEnable), + ) + cmd.Flags().BoolVar( + &globals.targetIPv6, + targetIPv6Flag, + false, + "Whether the target hosts use IPv6 addresses. Hostnames resolve to IPv4 by default.") +} + func (c *CmdBase) initConfigParam() error { // We need to find the path to the config param. The order of precedence is as follows: // 1. Option @@ -519,23 +625,44 @@ func (c *CmdBase) initCmdOutputFile() (*os.File, error) { // getCertFilesFromPaths will update cert and key file from cert path options func (c *CmdBase) getCertFilesFromCertPaths(opt *vclusterops.DatabaseOptions) error { + err := getCertFilesFromCertPaths(opt, globals.certFile, globals.keyFile, globals.caCertFile) + if err != nil { + return err + } + + return nil +} + +// getTargetCertFilesFromPaths will update target cert and key file from cert path options +func (c *CmdBase) getTargetCertFilesFromCertPaths(opt *vclusterops.DatabaseOptions) error { + err := getCertFilesFromCertPaths(opt, globals.targetCertFile, globals.targetKeyFile, globals.targetCaCertFile) + if err != nil { + return err + } + + return nil +} + +// getCertFilesFromPaths will update cert and key file from cert path options +func getCertFilesFromCertPaths(opt *vclusterops.DatabaseOptions, + certFile string, keyFile string, caCertFile string) error { // TODO don't make this conditional on not using a PW for auth (see callers) - if globals.certFile != "" { - certData, err := os.ReadFile(globals.certFile) + if certFile != "" { + certData, err := os.ReadFile(certFile) if err != nil { return fmt.Errorf("failed to read certificate file: %w", err) } opt.Cert = string(certData) } - if globals.keyFile != "" { - keyData, err := os.ReadFile(globals.keyFile) + if keyFile != "" { + keyData, err := os.ReadFile(keyFile) if err != nil { return fmt.Errorf("failed to read private key file: %w", err) } opt.Key = string(keyData) } - if globals.caCertFile != "" { - caCertData, err := os.ReadFile(globals.caCertFile) + if caCertFile != "" { + caCertData, err := os.ReadFile(caCertFile) if err != nil { return fmt.Errorf("failed to read trusted CA certificate file: %w", err) } diff --git a/commands/cmd_create_connection.go b/commands/cmd_create_connection.go index 1dad6ae..afb8d40 100644 --- a/commands/cmd_create_connection.go +++ b/commands/cmd_create_connection.go @@ -36,7 +36,7 @@ func makeCmdCreateConnection() *cobra.Command { newCmd := &CmdCreateConnection{} opt := vclusterops.VReplicationDatabaseFactory() newCmd.connectionOptions = &opt - opt.TargetPassword = new(string) + opt.TargetDB.Password = new(string) cmd := makeBasicCobraCmd( newCmd, @@ -63,25 +63,25 @@ Examples: // setLocalFlags will set the local flags the command has func (c *CmdCreateConnection) setLocalFlags(cmd *cobra.Command) { cmd.Flags().StringVar( - &c.connectionOptions.TargetDB, + &c.connectionOptions.TargetDB.DBName, dbNameFlag, "", "The name of the database. You should only use this option if you want to override the database name in your configuration file.", ) cmd.Flags().StringSliceVar( - &c.connectionOptions.TargetHosts, + &c.connectionOptions.TargetDB.Hosts, hostsFlag, []string{}, "A comma-separated list of hosts in database.") cmd.Flags().StringVar( - &c.connectionOptions.TargetUserName, + &c.connectionOptions.TargetDB.UserName, dbUserFlag, "", "The name of the user in the target database.", ) // password flags cmd.Flags().StringVar( - c.connectionOptions.TargetPassword, + c.connectionOptions.TargetDB.Password, passwordFileFlag, "", "The absolute path to a file containing the password to the target database.", diff --git a/commands/cmd_get_replication_status.go b/commands/cmd_get_replication_status.go new file mode 100644 index 0000000..61e99f3 --- /dev/null +++ b/commands/cmd_get_replication_status.go @@ -0,0 +1,195 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package commands + +import ( + "encoding/json" + "fmt" + + "github.com/spf13/cobra" + "github.com/spf13/viper" + "github.com/vertica/vcluster/vclusterops" + "github.com/vertica/vcluster/vclusterops/util" + "github.com/vertica/vcluster/vclusterops/vlog" +) + +/* CmdGetReplicationStatus + * + * Implements ClusterCommand interface + */ +type CmdGetReplicationStatus struct { + replicationStatusOptions *vclusterops.VReplicationStatusDatabaseOptions + CmdBase + targetPasswordFile string +} + +func makeCmdGetReplicationStatus() *cobra.Command { + newCmd := &CmdGetReplicationStatus{} + opt := vclusterops.VReplicationStatusFactory() + newCmd.replicationStatusOptions = &opt + + cmd := makeBasicCobraCmd( + newCmd, + replicationStatusSubCmd, + "Get the status of an asynchronous replication job.", + `Get the status of an asynchronous replication job + +Examples: + # Get replication status with connection file + vcluster replication status --config --target-conn /opt/vertica/config/target_connection.yaml \ + --transaction-id 12345678901234567 + + # Get replication status without connection file + # option and password-based authentication + vcluster replication status --target-db-name platform_db --target-hosts 10.20.30.43 \ + --target-db-user dbadmin --target-password-file /path/to/password-file \ + --transaction-id 12345678901234567 +`, + []string{outputFileFlag, targetIPv6Flag, targetHostsFlag, targetUserNameFlag, targetPasswordFileFlag, targetConnFlag, + targetKeyFileFlag, targetCertFileFlag, targetCaCertFileFlag, targetTLSModeFlag}, + ) + + // local flags + newCmd.setLocalFlags(cmd) + + // Must provide a connection file or target database/hosts/credentials arguments + markFlagsOneRequired(cmd, []string{targetConnFlag, targetDBNameFlag}) + markFlagsOneRequired(cmd, []string{targetConnFlag, targetHostsFlag}) + markFlagsOneRequired(cmd, []string{targetConnFlag, targetUserNameFlag}) + markFlagsOneRequired(cmd, []string{targetConnFlag, targetPasswordFileFlag}) + + markFlagsRequired(cmd, transactionIDFlag) + + return cmd +} + +// setLocalFlags will set the local flags the command has +func (c *CmdGetReplicationStatus) setLocalFlags(cmd *cobra.Command) { + cmd.Flags().StringVar( + &c.replicationStatusOptions.TargetDB.DBName, + targetDBNameFlag, + "", + "The target database where data was replicated to.", + ) + + cmd.Flags().Int64Var( + &c.replicationStatusOptions.TransactionID, + transactionIDFlag, + 0, + "[Required] The transaction ID of the asynchronous replication job output by the replication start command.", + ) +} + +func (c *CmdGetReplicationStatus) Parse(inputArgv []string, logger vlog.Printer) error { + c.argv = inputArgv + logger.LogMaskedArgParse(c.argv) + + return c.validateParse(logger) +} + +// all validations of the arguments should go in here +func (c *CmdGetReplicationStatus) validateParse(logger vlog.Printer) error { + logger.Info("Called method validateParse()") + if !c.usePassword() { + err := c.getTargetCertFilesFromCertPaths(&c.replicationStatusOptions.TargetDB) + if err != nil { + return err + } + } + + err := c.parseTargetHostList() + if err != nil { + return err + } + + err = c.parseTargetPassword() + if err != nil { + return err + } + + return c.ValidateParseBaseTargetOptions(&c.replicationStatusOptions.TargetDB) +} + +func (c *CmdGetReplicationStatus) parseTargetHostList() error { + if len(c.replicationStatusOptions.TargetDB.Hosts) == 0 { + return fmt.Errorf("you must specify at least one target host") + } + + err := util.ParseHostList(&c.replicationStatusOptions.TargetDB.Hosts) + if err != nil { + return fmt.Errorf("you must specify at least one target host") + } + return nil +} + +func (c *CmdGetReplicationStatus) parseTargetPassword() error { + options := c.replicationStatusOptions + if !viper.IsSet(targetPasswordFileKey) { + // reset password option to nil if password is not provided in cli + options.TargetDB.Password = nil + return nil + } + if c.replicationStatusOptions.TargetDB.Password == nil { + options.TargetDB.Password = new(string) + } + + if c.targetPasswordFile == "" { + return fmt.Errorf("target password file path is empty") + } + password, err := c.passwordFileHelper(c.targetPasswordFile) + if err != nil { + return err + } + *options.TargetDB.Password = password + return nil +} + +func (c *CmdGetReplicationStatus) Run(vcc vclusterops.ClusterCommands) error { + vcc.LogInfo("Called method Run()") + + options := c.replicationStatusOptions + + replicationStatus, err := vcc.VReplicationStatus(options) + if err != nil { + vcc.LogError(err, "failed to get replication status", "targetDB", options.TargetDB.DBName) + return err + } + + // Output replication status as JSON + if replicationStatus != nil { + bytes, err := json.MarshalIndent(replicationStatus, "", " ") + if err != nil { + return err + } + c.writeCmdOutputToFile(globals.file, bytes, vcc.GetLog()) + // If writing into stdout, add a new line + if c.output == "" { + fmt.Println("") + } + } + + vcc.DisplayInfo("Successfully retrieved replication status") + return nil +} + +// SetDatabaseOptions will assign a vclusterops.DatabaseOptions instance +func (c *CmdGetReplicationStatus) SetDatabaseOptions(_ *vclusterops.DatabaseOptions) { + c.replicationStatusOptions.TargetDB.UserName = globals.targetUserName + c.replicationStatusOptions.TargetDB.DBName = globals.targetDB + c.replicationStatusOptions.TargetDB.Hosts = globals.targetHosts + c.replicationStatusOptions.TargetDB.IPv6 = globals.targetIPv6 + c.targetPasswordFile = globals.targetPasswordFile +} diff --git a/commands/cmd_replication.go b/commands/cmd_replication.go index 4579072..9cfdc32 100644 --- a/commands/cmd_replication.go +++ b/commands/cmd_replication.go @@ -27,5 +27,6 @@ func makeCmdReplication() *cobra.Command { on-going replication operation.`) cmd.AddCommand(makeCmdStartReplication()) + cmd.AddCommand(makeCmdGetReplicationStatus()) return cmd } diff --git a/commands/cmd_start_replication.go b/commands/cmd_start_replication.go index 6f38d2b..4b5827c 100644 --- a/commands/cmd_start_replication.go +++ b/commands/cmd_start_replication.go @@ -103,7 +103,7 @@ Examples: // setLocalFlags will set the local flags the command has func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { cmd.Flags().StringVar( - &c.startRepOptions.TargetDB, + &c.startRepOptions.TargetDB.DBName, targetDBNameFlag, "", "The target database to replicate to.", @@ -114,17 +114,6 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { "", "The source sandbox to replicate from.", ) - cmd.Flags().StringSliceVar( - &c.startRepOptions.TargetHosts, - targetHostsFlag, - []string{}, - "A comma-separated list of hosts in target database.") - cmd.Flags().StringVar( - &c.startRepOptions.TargetUserName, - targetUserNameFlag, - "", - "The name of a user in the target database.", - ) cmd.Flags().StringVar( &c.startRepOptions.SourceTLSConfig, sourceTLSConfigFlag, @@ -132,15 +121,6 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { "The TLS configuration to use when connecting to the target database.\n "+ "This TLS configuration must also exist in the source database.", ) - cmd.Flags().StringVar( - &globals.connFile, - targetConnFlag, - "", - "[Required] The absolute path to the connection file created with the create_connection command, "+ - "containing the database name, hosts, and password (if any) for the target database. "+ - "Alternatively, you can provide this information manually with --target-db-name, "+ - "--target-hosts, and --target-password-file", - ) cmd.Flags().BoolVar( &c.startRepOptions.Async, asyncFlag, @@ -186,14 +166,6 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) { " tables in the public schema to the default_namespace in the target"+ " cluster.", ) - markFlagsFileName(cmd, map[string][]string{targetConnFlag: {"yaml"}}) - // password flags - cmd.Flags().StringVar( - &c.targetPasswordFile, - targetPasswordFileFlag, - "", - "The absolute path to a file containing the password for the target database. ", - ) } func (c *CmdStartReplication) Parse(inputArgv []string, logger vlog.Printer) error { @@ -242,8 +214,8 @@ func (c *CmdStartReplication) validateParse(logger vlog.Printer) error { } func (c *CmdStartReplication) parseTargetHostList() error { - if len(c.startRepOptions.TargetHosts) > 0 { - err := util.ParseHostList(&c.startRepOptions.TargetHosts) + if len(c.startRepOptions.TargetDB.Hosts) > 0 { + err := util.ParseHostList(&c.startRepOptions.TargetDB.Hosts) if err != nil { return fmt.Errorf("you must specify at least one target host to replicate to") } @@ -255,11 +227,11 @@ func (c *CmdStartReplication) parseTargetPassword() error { options := c.startRepOptions if !viper.IsSet(targetPasswordFileKey) { // reset password option to nil if password is not provided in cli - options.TargetPassword = nil + options.TargetDB.Password = nil return nil } - if c.startRepOptions.TargetPassword == nil { - options.TargetPassword = new(string) + if c.startRepOptions.TargetDB.Password == nil { + options.TargetDB.Password = new(string) } if c.targetPasswordFile == "" { @@ -269,7 +241,7 @@ func (c *CmdStartReplication) parseTargetPassword() error { if err != nil { return err } - *options.TargetPassword = password + *options.TargetDB.Password = password return nil } @@ -280,14 +252,14 @@ func (c *CmdStartReplication) Run(vcc vclusterops.ClusterCommands) error { transactionID, err := vcc.VReplicateDatabase(options) if err != nil { - vcc.LogError(err, "failed to replicate to database", "targetDB", options.TargetDB) + vcc.LogError(err, "failed to replicate to database", "targetDB", options.TargetDB.DBName) return err } if options.Async { - vcc.DisplayInfo("Successfully started replication to database %s. Transaction ID: %d", options.TargetDB, transactionID) + vcc.DisplayInfo("Successfully started replication to database %s. Transaction ID: %d", options.TargetDB.DBName, transactionID) } else { - vcc.DisplayInfo("Successfully replicated to database %s", options.TargetDB) + vcc.DisplayInfo("Successfully replicated to database %s", options.TargetDB.DBName) } return nil @@ -296,8 +268,9 @@ func (c *CmdStartReplication) Run(vcc vclusterops.ClusterCommands) error { // SetDatabaseOptions will assign a vclusterops.DatabaseOptions instance func (c *CmdStartReplication) SetDatabaseOptions(opt *vclusterops.DatabaseOptions) { c.startRepOptions.DatabaseOptions = *opt - c.startRepOptions.TargetUserName = globals.targetUserName - c.startRepOptions.TargetDB = globals.targetDB - c.startRepOptions.TargetHosts = globals.targetHosts + c.startRepOptions.TargetDB.UserName = globals.targetUserName + c.startRepOptions.TargetDB.DBName = globals.targetDB + c.startRepOptions.TargetDB.Hosts = globals.targetHosts + c.startRepOptions.TargetDB.IPv6 = globals.targetIPv6 c.targetPasswordFile = globals.targetPasswordFile } diff --git a/commands/vcluster_connection.go b/commands/vcluster_connection.go index 8712fb7..ac546ae 100644 --- a/commands/vcluster_connection.go +++ b/commands/vcluster_connection.go @@ -52,10 +52,10 @@ func writeConn(targetdb *vclusterops.VReplicationDatabaseOptions) error { // readTargetDBToDBConn converts target database to DatabaseConnection func readTargetDBToDBConn(cnn *vclusterops.VReplicationDatabaseOptions) DatabaseConnection { targetDBconn := MakeTargetDatabaseConn() - targetDBconn.TargetDBName = cnn.TargetDB - targetDBconn.TargetHosts = cnn.TargetHosts - targetDBconn.TargetPasswordFile = *cnn.TargetPassword - targetDBconn.TargetDBUser = cnn.TargetUserName + targetDBconn.TargetDBName = cnn.TargetDB.DBName + targetDBconn.TargetHosts = cnn.TargetDB.Hosts + targetDBconn.TargetPasswordFile = *cnn.TargetDB.Password + targetDBconn.TargetDBUser = cnn.TargetDB.UserName return targetDBconn } diff --git a/vclusterops/cluster_op.go b/vclusterops/cluster_op.go index 84e08f0..e04ee79 100644 --- a/vclusterops/cluster_op.go +++ b/vclusterops/cluster_op.go @@ -564,6 +564,7 @@ type ClusterCommands interface { VRemoveSubcluster(removeScOpt *VRemoveScOptions) (VCoordinationDatabase, error) VRenameSubcluster(options *VRenameSubclusterOptions) error VReplicateDatabase(options *VReplicationDatabaseOptions) (int64, error) + VReplicationStatus(options *VReplicationStatusDatabaseOptions) (*ReplicationStatusResponse, error) VReviveDatabase(options *VReviveDatabaseOptions) (dbInfo string, vdbPtr *VCoordinationDatabase, err error) VSandbox(options *VSandboxOptions) error VScrutinize(options *VScrutinizeOptions) error diff --git a/vclusterops/https_start_replication_op.go b/vclusterops/https_start_replication_op.go index 2f590cc..0f0228b 100644 --- a/vclusterops/https_start_replication_op.go +++ b/vclusterops/https_start_replication_op.go @@ -26,7 +26,7 @@ import ( type httpsStartReplicationOp struct { opBase opHTTPSBase - TargetDatabaseOptions + TargetDB DatabaseOptions hostRequestBodyMap map[string]string sourceDB string targetHost string @@ -37,7 +37,7 @@ type httpsStartReplicationOp struct { func makeHTTPSStartReplicationOp(dbName string, sourceHosts []string, sourceUseHTTPPassword bool, sourceUserName string, - sourceHTTPPassword *string, targetUseHTTPPassword bool, targetDBOpt *TargetDatabaseOptions, + sourceHTTPPassword *string, targetUseHTTPPassword bool, targetDBOpt *DatabaseOptions, targetHost string, tlsConfig, sandbox string, vdb *VCoordinationDatabase) (httpsStartReplicationOp, error) { op := httpsStartReplicationOp{} op.name = "HTTPSStartReplicationOp" @@ -45,7 +45,7 @@ func makeHTTPSStartReplicationOp(dbName string, sourceHosts []string, op.sourceDB = dbName op.hosts = sourceHosts op.useHTTPPassword = sourceUseHTTPPassword - op.TargetDB = targetDBOpt.TargetDB + op.TargetDB.DBName = targetDBOpt.DBName op.targetHost = targetHost op.tlsConfig = tlsConfig op.sandbox = sandbox @@ -60,12 +60,12 @@ func makeHTTPSStartReplicationOp(dbName string, sourceHosts []string, op.httpsPassword = sourceHTTPPassword } if targetUseHTTPPassword { - err := util.ValidateUsernameAndPassword(op.name, targetUseHTTPPassword, targetDBOpt.TargetUserName) + err := util.ValidateUsernameAndPassword(op.name, targetUseHTTPPassword, targetDBOpt.UserName) if err != nil { return op, err } - op.TargetUserName = targetDBOpt.TargetUserName - op.TargetPassword = targetDBOpt.TargetPassword + op.TargetDB.UserName = targetDBOpt.UserName + op.TargetDB.Password = targetDBOpt.Password } return op, nil @@ -85,9 +85,9 @@ func (op *httpsStartReplicationOp) setupRequestBody(hosts []string) error { for _, host := range hosts { replicateData := replicateRequestData{} replicateData.TargetHost = op.targetHost - replicateData.TargetDB = op.TargetDB - replicateData.TargetUserName = op.TargetUserName - replicateData.TargetPassword = op.TargetPassword + replicateData.TargetDB = op.TargetDB.DBName + replicateData.TargetUserName = op.TargetDB.UserName + replicateData.TargetPassword = op.TargetDB.Password replicateData.TLSConfig = op.tlsConfig dataBytes, err := json.Marshal(replicateData) diff --git a/vclusterops/nma_poll_replication_status.go b/vclusterops/nma_poll_replication_status.go index 0029987..05b6689 100644 --- a/vclusterops/nma_poll_replication_status.go +++ b/vclusterops/nma_poll_replication_status.go @@ -26,7 +26,7 @@ import ( type nmaPollReplicationStatusOp struct { opBase - TargetDatabaseOptions + TargetDB DatabaseOptions hostRequestBodyMap map[string]string sandbox string vdb *VCoordinationDatabase @@ -34,25 +34,25 @@ type nmaPollReplicationStatusOp struct { newTransactionID *int64 } -func makeNMAPollReplicationStatusOp(targetDBOpt *TargetDatabaseOptions, targetUsePassword bool, +func makeNMAPollReplicationStatusOp(targetDBOpt *DatabaseOptions, targetUsePassword bool, sandbox string, vdb *VCoordinationDatabase, existingTransactionIDs *[]int64, newTransactionID *int64) (nmaPollReplicationStatusOp, error) { op := nmaPollReplicationStatusOp{} op.name = "NMAPollReplicationStatusOp" op.description = "Retrieve asynchronous replication transaction ID" - op.TargetDB = targetDBOpt.TargetDB - op.TargetHosts = targetDBOpt.TargetHosts + op.TargetDB.DBName = targetDBOpt.DBName + op.TargetDB.Hosts = targetDBOpt.Hosts op.sandbox = sandbox op.vdb = vdb op.existingTransactionIDs = existingTransactionIDs op.newTransactionID = newTransactionID if targetUsePassword { - err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, targetDBOpt.TargetUserName) + err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, targetDBOpt.UserName) if err != nil { return op, err } - op.TargetUserName = targetDBOpt.TargetUserName - op.TargetPassword = targetDBOpt.TargetPassword + op.TargetDB.UserName = targetDBOpt.UserName + op.TargetDB.Password = targetDBOpt.Password } return op, nil @@ -63,12 +63,12 @@ func (op *nmaPollReplicationStatusOp) updateRequestBody(hosts []string) error { for _, host := range hosts { requestData := nmaReplicationStatusRequestData{} - requestData.DBName = op.TargetDB + requestData.DBName = op.TargetDB.DBName requestData.ExcludedTransactionIDs = *op.existingTransactionIDs requestData.GetTransactionIDsOnly = true requestData.TransactionID = 0 - requestData.UserName = op.TargetUserName - requestData.Password = op.TargetPassword + requestData.UserName = op.TargetDB.UserName + requestData.Password = op.TargetDB.Password dataBytes, err := json.Marshal(requestData) if err != nil { @@ -94,14 +94,14 @@ func (op *nmaPollReplicationStatusOp) setupClusterHTTPRequest(hosts []string) er } func (op *nmaPollReplicationStatusOp) prepare(execContext *opEngineExecContext) error { - err := op.updateRequestBody(op.TargetHosts) + err := op.updateRequestBody(op.TargetDB.Hosts) if err != nil { return err } - execContext.dispatcher.setup(op.TargetHosts) + execContext.dispatcher.setup(op.TargetDB.Hosts) - return op.setupClusterHTTPRequest(op.TargetHosts) + return op.setupClusterHTTPRequest(op.TargetDB.Hosts) } func (op *nmaPollReplicationStatusOp) execute(execContext *opEngineExecContext) error { @@ -145,7 +145,7 @@ func (op *nmaPollReplicationStatusOp) shouldStopPolling() (bool, error) { continue } - responseObj := []replicationStatusResponse{} + responseObj := []ReplicationStatusResponse{} err := op.parseAndCheckResponse(host, result.content, &responseObj) if err != nil { return true, errors.Join(allErrs, err) diff --git a/vclusterops/nma_replication_status.go b/vclusterops/nma_replication_status.go index daf6656..7a21b62 100644 --- a/vclusterops/nma_replication_status.go +++ b/vclusterops/nma_replication_status.go @@ -29,22 +29,20 @@ type nmaReplicationStatusOp struct { nmaReplicationStatusRequestData TargetHosts []string hostRequestBodyMap map[string]string - sandbox string - vdb *VCoordinationDatabase transactionIDs *[]int64 + replicationStatus *[]ReplicationStatusResponse } func makeNMAReplicationStatusOp(targetHosts []string, targetUsePassword bool, - replicationStatusData *nmaReplicationStatusRequestData, sandbox string, vdb *VCoordinationDatabase, - transactionIDs *[]int64) (nmaReplicationStatusOp, error) { + replicationStatusData *nmaReplicationStatusRequestData, + transactionIDs *[]int64, replicationStatus *[]ReplicationStatusResponse) (nmaReplicationStatusOp, error) { op := nmaReplicationStatusOp{} op.name = "NMAReplicationStatusOp" op.description = "Get asynchronous replication status" op.TargetHosts = targetHosts op.nmaReplicationStatusRequestData = *replicationStatusData - op.sandbox = sandbox - op.vdb = vdb op.transactionIDs = transactionIDs + op.replicationStatus = replicationStatus if targetUsePassword { err := util.ValidateUsernameAndPassword(op.name, targetUsePassword, replicationStatusData.UserName) @@ -117,17 +115,6 @@ func (op *nmaReplicationStatusOp) finalize(_ *opEngineExecContext) error { return nil } -type replicationStatusResponse struct { - EndTime string `json:"end_time"` - NodeName string `json:"node_name"` - OpName string `json:"op_name"` - SentBytes int64 `json:"sent_bytes"` - StartTime string `json:"start_time"` - Status string `json:"status"` - TotalBytes int64 `json:"total_bytes"` - TransactionID int64 `json:"txn_id"` -} - func (op *nmaReplicationStatusOp) processResult(_ *opEngineExecContext) error { var allErrs error @@ -145,7 +132,7 @@ func (op *nmaReplicationStatusOp) processResult(_ *opEngineExecContext) error { continue } - responseObj := []replicationStatusResponse{} + responseObj := []ReplicationStatusResponse{} err := op.parseAndCheckResponse(host, result.content, &responseObj) if err != nil { allErrs = errors.Join(allErrs, err) @@ -159,7 +146,13 @@ func (op *nmaReplicationStatusOp) processResult(_ *opEngineExecContext) error { // If we're here, we've successfully received a status from one of the target hosts. // We don't need to check responses from other hosts as they should be the same - *op.transactionIDs = transactionIDs.ToSlice() + if op.transactionIDs != nil { + *op.transactionIDs = transactionIDs.ToSlice() + } + if op.replicationStatus != nil { + *op.replicationStatus = responseObj + } + return nil } diff --git a/vclusterops/replication.go b/vclusterops/replication.go index 8c949dc..12d7a45 100644 --- a/vclusterops/replication.go +++ b/vclusterops/replication.go @@ -23,13 +23,6 @@ import ( "github.com/vertica/vcluster/vclusterops/vlog" ) -type TargetDatabaseOptions struct { - TargetHosts []string - TargetDB string - TargetUserName string - TargetPassword *string -} - type ReplicationOptions struct { TableOrSchemaName string IncludePattern string @@ -42,7 +35,7 @@ type VReplicationDatabaseOptions struct { DatabaseOptions /* part 2: replication info */ - TargetDatabaseOptions + TargetDB DatabaseOptions SourceTLSConfig string SandboxName string Async bool @@ -72,22 +65,22 @@ func (options *VReplicationDatabaseOptions) validateEonOptions() error { } func (options *VReplicationDatabaseOptions) validateExtraOptions() error { - if len(options.TargetHosts) == 0 { + if len(options.TargetDB.Hosts) == 0 { return fmt.Errorf("must specify a target host or target host list") } // valiadate target database - if options.TargetDB == "" { + if options.TargetDB.DBName == "" { return fmt.Errorf("must specify a target database name") } - err := util.ValidateDBName(options.TargetDB) + err := util.ValidateDBName(options.TargetDB.DBName) if err != nil { return err } // need to provide a password or TLSconfig if source and target username are different - if options.TargetUserName != options.UserName { - if options.TargetPassword == nil && options.SourceTLSConfig == "" { + if options.TargetDB.UserName != options.UserName { + if options.TargetDB.Password == nil && options.SourceTLSConfig == "" { return fmt.Errorf("only trust authentication can support username without password or TLSConfig") } } @@ -168,9 +161,9 @@ func (options *VReplicationDatabaseOptions) validateParseOptions(logger vlog.Pri // analyzeOptions will modify some options based on what is chosen func (options *VReplicationDatabaseOptions) analyzeOptions() (err error) { - if len(options.TargetHosts) > 0 { + if len(options.TargetDB.Hosts) > 0 { // resolve RawHosts to be IP addresses - options.TargetHosts, err = util.ResolveRawHostsToAddresses(options.TargetHosts, options.IPv6) + options.TargetDB.Hosts, err = util.ResolveRawHostsToAddresses(options.TargetDB.Hosts, options.TargetDB.IPv6) if err != nil { return err } @@ -224,7 +217,7 @@ func (vcc VClusterCommands) VReplicateDatabase(options *VReplicationDatabaseOpti } // create a VClusterOpEngine, and add certs to the engine - clusterOpEngine := makeClusterOpEngine(instructions, options) + clusterOpEngine := makeClusterOpEngine(instructions, &options.DatabaseOptions) // give the instructions to the VClusterOpEngine to run runError := clusterOpEngine.run(vcc.Log) @@ -257,34 +250,35 @@ func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicati // verify the username for connecting to the target database targetUsePassword := false - if options.TargetPassword != nil { + if options.TargetDB.Password != nil { targetUsePassword = true - if options.TargetUserName == "" { + if options.TargetDB.UserName == "" { username, e := util.GetCurrentUsername() if e != nil { return instructions, e } - options.TargetUserName = username + options.TargetDB.UserName = username } - vcc.Log.Info("Current target username", "username", options.TargetUserName) + vcc.Log.Info("Current target username", "username", options.TargetDB.UserName) } - initiatorTargetHost := getInitiator(options.TargetHosts) + initiatorTargetHost := getInitiator(options.TargetDB.Hosts) if options.Async { - nmaHealthOp := makeNMAHealthOp(append(options.Hosts, options.TargetHosts...)) + nmaHealthOp := makeNMAHealthOp(append(options.Hosts, options.TargetDB.Hosts...)) transactionIDs := &[]int64{} + // Retrieve a list of transaction IDs before async replication starts nmaReplicationStatusData := nmaReplicationStatusRequestData{} - nmaReplicationStatusData.DBName = options.TargetDB - nmaReplicationStatusData.ExcludedTransactionIDs = []int64{} - nmaReplicationStatusData.GetTransactionIDsOnly = true - nmaReplicationStatusData.TransactionID = 0 - nmaReplicationStatusData.UserName = options.TargetUserName - nmaReplicationStatusData.Password = options.TargetPassword - - nmaReplicationStatusOp, err := makeNMAReplicationStatusOp(options.TargetHosts, targetUsePassword, - &nmaReplicationStatusData, options.SandboxName, vdb, transactionIDs) + nmaReplicationStatusData.DBName = options.TargetDB.DBName + nmaReplicationStatusData.ExcludedTransactionIDs = []int64{} // Get all transaction IDs + nmaReplicationStatusData.GetTransactionIDsOnly = true // We only care about transaction IDs here + nmaReplicationStatusData.TransactionID = 0 // Set this to 0 so NMA returns all IDs + nmaReplicationStatusData.UserName = options.TargetDB.UserName + nmaReplicationStatusData.Password = options.TargetDB.Password + + nmaReplicationStatusOp, err := makeNMAReplicationStatusOp(options.TargetDB.Hosts, targetUsePassword, + &nmaReplicationStatusData, transactionIDs, nil) if err != nil { return instructions, err } @@ -296,11 +290,11 @@ func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicati nmaReplicationData.TableOrSchemaName = options.TableOrSchemaName nmaReplicationData.Username = options.UserName nmaReplicationData.Password = options.Password - nmaReplicationData.TargetDBName = options.TargetDB + nmaReplicationData.TargetDBName = options.TargetDB.DBName nmaReplicationData.TargetHost = initiatorTargetHost nmaReplicationData.TargetNamespace = options.TargetNamespace - nmaReplicationData.TargetUserName = options.TargetUserName - nmaReplicationData.TargetPassword = options.TargetPassword + nmaReplicationData.TargetUserName = options.TargetDB.UserName + nmaReplicationData.TargetPassword = options.TargetDB.Password nmaReplicationData.TLSConfig = options.SourceTLSConfig nmaStartReplicationOp, err := makeNMAReplicationStartOp(options.Hosts, options.usePassword, targetUsePassword, @@ -309,7 +303,7 @@ func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicati return instructions, err } - nmaPollReplicationStatusOp, err := makeNMAPollReplicationStatusOp(&options.TargetDatabaseOptions, targetUsePassword, + nmaPollReplicationStatusOp, err := makeNMAPollReplicationStatusOp(&options.TargetDB, targetUsePassword, options.SandboxName, vdb, transactionIDs, asyncReplicationTransactionID) if err != nil { return instructions, err @@ -329,7 +323,7 @@ func (vcc VClusterCommands) produceDBReplicationInstructions(options *VReplicati } httpsStartReplicationOp, err := makeHTTPSStartReplicationOp(options.DBName, options.Hosts, options.usePassword, - options.UserName, options.Password, targetUsePassword, &options.TargetDatabaseOptions, initiatorTargetHost, + options.UserName, options.Password, targetUsePassword, &options.TargetDB, initiatorTargetHost, options.SourceTLSConfig, options.SandboxName, vdb) if err != nil { return instructions, err diff --git a/vclusterops/replication_status.go b/vclusterops/replication_status.go new file mode 100644 index 0000000..518129f --- /dev/null +++ b/vclusterops/replication_status.go @@ -0,0 +1,249 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "fmt" + "sort" + "time" + + "github.com/vertica/vcluster/vclusterops/util" + "github.com/vertica/vcluster/vclusterops/vlog" +) + +type VReplicationStatusDatabaseOptions struct { + TargetDB DatabaseOptions + TransactionID int64 +} + +type ReplicationStatusResponse struct { + // Time replication was started + StartTime string `json:"start_time"` + + // End time, if replication has completed + EndTime string `json:"end_time"` + + // Current replication operation name. Possible values in order: + // - 'load_snapshot_prep' + // - 'data_transfer' - optional if source and target communal storage + // are the same + // - 'load_snapshot' - replication is complete if this op has a + // status of 'completed' + OpName string `json:"op_name"` + + // Current replication operation status. Possible values: + // 'started', 'failed', 'completed' + Status string `json:"status"` + + // Node the current replication operation is on + NodeName string `json:"node_name"` + + // Number of bytes transferred as part of replication + SentBytes int64 `json:"sent_bytes"` + + // Total number of bytes to be transferred as part of replication + TotalBytes int64 `json:"total_bytes"` + TransactionID int64 `json:"txn_id"` +} + +func VReplicationStatusFactory() VReplicationStatusDatabaseOptions { + options := VReplicationStatusDatabaseOptions{} + return options +} + +func (options *VReplicationStatusDatabaseOptions) validateRequiredOptions(_ vlog.Printer) error { + if len(options.TargetDB.Hosts) == 0 { + return fmt.Errorf("must specify a target host or target host list") + } + + // validate target database + if options.TargetDB.DBName == "" { + return fmt.Errorf("must specify a target database name") + } + err := util.ValidateDBName(options.TargetDB.DBName) + if err != nil { + return err + } + + // need to provide a password or TLSconfig if source and target username are different + if options.TargetDB.Password == nil { + return fmt.Errorf("must specify a target password") + } + + if options.TransactionID <= 0 { + return fmt.Errorf("must specify a valid transaction ID") + } + + return nil +} + +func (options *VReplicationStatusDatabaseOptions) validateParseOptions(logger vlog.Printer) error { + // batch 1: validate required params + err := options.validateRequiredOptions(logger) + if err != nil { + return err + } + + return nil +} + +// analyzeOptions will modify some options based on what is chosen +func (options *VReplicationStatusDatabaseOptions) analyzeOptions() (err error) { + if len(options.TargetDB.Hosts) > 0 { + // resolve RawHosts to be IP addresses + options.TargetDB.Hosts, err = util.ResolveRawHostsToAddresses(options.TargetDB.Hosts, options.TargetDB.IPv6) + if err != nil { + return err + } + } + return nil +} + +func (options *VReplicationStatusDatabaseOptions) validateAnalyzeOptions(logger vlog.Printer) error { + if err := options.validateParseOptions(logger); err != nil { + return err + } + return options.analyzeOptions() +} + +// VReplicationStatus returns the status of an asynchronous replication job based on a transaction ID +func (vcc VClusterCommands) VReplicationStatus(options *VReplicationStatusDatabaseOptions) (*ReplicationStatusResponse, error) { + /* + * - Produce Instructions + * - Create a VClusterOpEngine + * - Give the instructions to the VClusterOpEngine to run + */ + + // validate and analyze options + err := options.validateAnalyzeOptions(vcc.Log) + if err != nil { + return nil, err + } + + // produce database replication status instructions + replicationStatus := []ReplicationStatusResponse{} + instructions, err := vcc.produceReplicationStatusInstructions(options, &replicationStatus) + if err != nil { + return nil, fmt.Errorf("fail to produce instructions, %w", err) + } + + // create a VClusterOpEngine, and add certs to the engine + clusterOpEngine := makeClusterOpEngine(instructions, &options.TargetDB) + + // give the instructions to the VClusterOpEngine to run + runError := clusterOpEngine.run(vcc.Log) + if runError != nil { + return nil, fmt.Errorf("fail to get replication status: %w", runError) + } + + if len(replicationStatus) == 0 { + return nil, fmt.Errorf("invalid transaction ID") + } + finalReplicationStatus := getFinalReplicationStatus(replicationStatus) + return finalReplicationStatus, nil +} + +// The generated instructions will later perform the following operations necessary +// for a successful replication status retrieval +// - Check NMA connectivity +// - Get replication status +func (vcc VClusterCommands) produceReplicationStatusInstructions(options *VReplicationStatusDatabaseOptions, + replicationStatus *[]ReplicationStatusResponse) ([]clusterOp, error) { + var instructions []clusterOp + + // verify the username for connecting to the target database + targetUsePassword := false + if options.TargetDB.Password != nil { + targetUsePassword = true + if options.TargetDB.UserName == "" { + username, e := util.GetCurrentUsername() + if e != nil { + return instructions, e + } + options.TargetDB.UserName = username + } + vcc.Log.Info("Current target username", "username", options.TargetDB.UserName) + } + + nmaHealthOp := makeNMAHealthOp(options.TargetDB.Hosts) + + nmaReplicationStatusData := nmaReplicationStatusRequestData{} + nmaReplicationStatusData.DBName = options.TargetDB.DBName + nmaReplicationStatusData.ExcludedTransactionIDs = []int64{} // Doesn't matter since we specify a transaction ID + nmaReplicationStatusData.GetTransactionIDsOnly = false // Get all replication status info + nmaReplicationStatusData.TransactionID = options.TransactionID + nmaReplicationStatusData.UserName = options.TargetDB.UserName + nmaReplicationStatusData.Password = options.TargetDB.Password + + nmaReplicationStatusOp, err := makeNMAReplicationStatusOp(options.TargetDB.Hosts, targetUsePassword, + &nmaReplicationStatusData, nil, replicationStatus) + if err != nil { + return instructions, err + } + + instructions = append(instructions, + &nmaHealthOp, + &nmaReplicationStatusOp, + ) + + return instructions, nil +} + +func getFinalReplicationStatus(replicationStatus []ReplicationStatusResponse) *ReplicationStatusResponse { + if len(replicationStatus) == 0 { + return nil + } + + // Used to determine replication op order - replication ops with higher int values happen later + opOrder := make(map[string]int) + opOrder["load_snapshot_prep"] = 0 + opOrder["data_transfer"] = 1 + opOrder["load_snapshot"] = 2 + + // Sort statuses by start time, node name, then op name. This lets us search chronologically through the statuses + // to find the first failure or in-progress op if there is one + sort.Slice(replicationStatus, func(i, j int) bool { + iStatus := replicationStatus[i] + jStatus := replicationStatus[j] + + if iStatus.StartTime != jStatus.StartTime { + iStart, _ := time.Parse(time.UnixDate, iStatus.StartTime) + jStart, _ := time.Parse(time.UnixDate, jStatus.StartTime) + return iStart.Before(jStart) + } else if iStatus.NodeName != jStatus.NodeName { + return iStatus.NodeName < jStatus.NodeName + } + return opOrder[iStatus.OpName] < opOrder[jStatus.OpName] + }) + + // Get basic status info from the first op that was started + firstOp := replicationStatus[0] + finalReplicationStatus := ReplicationStatusResponse{} + finalReplicationStatus.TransactionID = firstOp.TransactionID + finalReplicationStatus.StartTime = firstOp.StartTime + + // Get the rest of the status info from the current op (the last op in the sorted list) + currentOp := replicationStatus[len(replicationStatus)-1] + + finalReplicationStatus.Status = currentOp.Status + finalReplicationStatus.EndTime = currentOp.EndTime + finalReplicationStatus.OpName = currentOp.OpName + finalReplicationStatus.SentBytes = currentOp.SentBytes + finalReplicationStatus.TotalBytes = currentOp.TotalBytes + finalReplicationStatus.NodeName = currentOp.NodeName + + return &finalReplicationStatus +} diff --git a/vclusterops/replication_status_test.go b/vclusterops/replication_status_test.go new file mode 100644 index 0000000..43b70eb --- /dev/null +++ b/vclusterops/replication_status_test.go @@ -0,0 +1,227 @@ +/* + (c) Copyright [2023-2024] Open Text. + Licensed under the Apache License, Version 2.0 (the "License"); + You may not use this file except in compliance with the License. + You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. +*/ + +package vclusterops + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +const ( + node1 = "node1" + node2 = "node2" + loadSnapshotPrepOp = "load_snapshot_prep" + dataTransferOp = "data_transfer" + loadSnapshotOp = "load_snapshot" + startedStatus = "started" + failedStatus = "failed" + completedStatus = "completed" + transactionID = 12345678901234567 +) + +// Setup - status objects for testing +var ( + node1LoadSnapshotPrepStarted = ReplicationStatusResponse{ + OpName: loadSnapshotPrepOp, + Status: startedStatus, + NodeName: node1, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "", + SentBytes: 0, + TotalBytes: 0, + TransactionID: transactionID, + } + node1LoadSnapshotPrep = ReplicationStatusResponse{ + OpName: loadSnapshotPrepOp, + Status: completedStatus, + NodeName: node1, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "Mon Sep 23 16:08:12 EDT 2024", + SentBytes: 0, + TotalBytes: 0, + TransactionID: transactionID, + } + node1DataTransfer = ReplicationStatusResponse{ + OpName: dataTransferOp, + Status: completedStatus, + NodeName: node1, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "Mon Sep 23 16:08:13 EDT 2024", + SentBytes: 1024, + TotalBytes: 1024, + TransactionID: transactionID, + } + node1LoadSnapshotFailed = ReplicationStatusResponse{ + OpName: loadSnapshotOp, + Status: failedStatus, + NodeName: node1, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "Mon Sep 23 16:08:13 EDT 2024", + SentBytes: 1024, + TotalBytes: 1024, + TransactionID: transactionID, + } + node1LoadSnapshotCompleted = ReplicationStatusResponse{ + OpName: loadSnapshotOp, + Status: completedStatus, + NodeName: node1, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "Mon Sep 23 16:08:13 EDT 2024", + SentBytes: 1024, + TotalBytes: 1024, + TransactionID: transactionID, + } + + node2LoadSnapshotPrep = ReplicationStatusResponse{ + OpName: loadSnapshotPrepOp, + Status: completedStatus, + NodeName: node2, + StartTime: "Mon Sep 23 16:08:12 EDT 2024", + EndTime: "Mon Sep 23 16:08:13 EDT 2024", + SentBytes: 0, + TotalBytes: 0, + TransactionID: transactionID, + } + node2DataTransferStarted = ReplicationStatusResponse{ + OpName: dataTransferOp, + Status: startedStatus, + NodeName: node2, + StartTime: "Mon Sep 23 16:08:12 EDT 2024", + EndTime: "", + SentBytes: 128, + TotalBytes: 2048, + TransactionID: transactionID, + } + node2DataTransfer = ReplicationStatusResponse{ + OpName: dataTransferOp, + Status: completedStatus, + NodeName: node2, + StartTime: "Mon Sep 23 16:08:12 EDT 2024", + EndTime: "Mon Sep 23 16:08:14 EDT 2024", + SentBytes: 2048, + TotalBytes: 2048, + TransactionID: transactionID, + } + node2LoadSnapshotCompleted = ReplicationStatusResponse{ + OpName: loadSnapshotOp, + Status: completedStatus, + NodeName: node2, + StartTime: "Mon Sep 23 16:08:12 EDT 2024", + EndTime: "Mon Sep 23 16:08:14 EDT 2024", + SentBytes: 2048, + TotalBytes: 2048, + TransactionID: transactionID, + } +) + +func TestGetFinalReplicationStatus(t *testing.T) { + // Negative - 1 node target DB, empty status list + replicationStatus := []ReplicationStatusResponse{} + actualStatus := getFinalReplicationStatus(replicationStatus) + assert.Nil(t, actualStatus) + + // Positive - 1 node target DB, "load_snapshot_prep" in progress + replicationStatus = []ReplicationStatusResponse{node1LoadSnapshotPrepStarted} + + expectedStatus := ReplicationStatusResponse{ + OpName: loadSnapshotPrepOp, + Status: startedStatus, + NodeName: node1, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "", + SentBytes: 0, + TotalBytes: 0, + TransactionID: transactionID, + } + + actualStatus = getFinalReplicationStatus(replicationStatus) + assert.Equal(t, expectedStatus, *actualStatus) + + // Positive - 1 node target DB, "load_snapshot" op failed + replicationStatus = []ReplicationStatusResponse{ + node1LoadSnapshotPrep, node1DataTransfer, node1LoadSnapshotFailed} + + expectedStatus = ReplicationStatusResponse{ + OpName: loadSnapshotOp, + Status: failedStatus, + NodeName: node1, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "Mon Sep 23 16:08:13 EDT 2024", + SentBytes: 1024, + TotalBytes: 1024, + TransactionID: transactionID, + } + + actualStatus = getFinalReplicationStatus(replicationStatus) + assert.Equal(t, expectedStatus, *actualStatus) + + // Positive - 1 node target DB, all ops complete + replicationStatus = []ReplicationStatusResponse{ + node1LoadSnapshotPrep, node1DataTransfer, node1LoadSnapshotCompleted} + + expectedStatus = ReplicationStatusResponse{ + OpName: loadSnapshotOp, + Status: completedStatus, + NodeName: node1, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "Mon Sep 23 16:08:13 EDT 2024", + SentBytes: 1024, + TotalBytes: 1024, + TransactionID: transactionID, + } + + actualStatus = getFinalReplicationStatus(replicationStatus) + assert.Equal(t, expectedStatus, *actualStatus) + + // Positive - 2 node target DB, node 2 data transfer still in progress + replicationStatus = []ReplicationStatusResponse{ + node1LoadSnapshotPrep, node1DataTransfer, node1LoadSnapshotCompleted, + node2LoadSnapshotPrep, node2DataTransferStarted} + + expectedStatus = ReplicationStatusResponse{ + OpName: dataTransferOp, + Status: startedStatus, + NodeName: node2, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "", + SentBytes: 128, + TotalBytes: 2048, + TransactionID: transactionID, + } + + actualStatus = getFinalReplicationStatus(replicationStatus) + assert.Equal(t, expectedStatus, *actualStatus) + + // Positive - 2 node target DB, all ops completed on all nodes + replicationStatus = []ReplicationStatusResponse{ + node1LoadSnapshotPrep, node1DataTransfer, node1LoadSnapshotCompleted, + node2LoadSnapshotPrep, node2DataTransfer, node2LoadSnapshotCompleted} + + expectedStatus = ReplicationStatusResponse{ + OpName: loadSnapshotOp, + Status: completedStatus, + NodeName: node2, + StartTime: "Mon Sep 23 16:08:11 EDT 2024", + EndTime: "Mon Sep 23 16:08:14 EDT 2024", + SentBytes: 2048, + TotalBytes: 2048, + TransactionID: transactionID, + } + + actualStatus = getFinalReplicationStatus(replicationStatus) + assert.Equal(t, expectedStatus, *actualStatus) +}