Skip to content

Commit

Permalink
Sync from server repo (d931a24a3ac)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Spilchen committed Apr 10, 2024
1 parent 0e3c670 commit cf2e31d
Show file tree
Hide file tree
Showing 6 changed files with 295 additions and 25 deletions.
6 changes: 3 additions & 3 deletions commands/cluster_command_launcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,8 @@ const (
const (
targetDBNameFlag = "target-db-name"
targetDBNameKey = "targetDBName"
targetHostFlag = "target-hosts"
targetHostKey = "targetHosts"
targetHostsFlag = "target-hosts"
targetHostsKey = "targetHosts"
targetUserNameFlag = "target-db-user"
targetUserNameKey = "targetDBUser"
targetPasswordFileFlag = "target-password-file"
Expand Down Expand Up @@ -120,7 +120,7 @@ var flagKeyMap = map[string]string{
verboseFlag: verboseKey,
outputFileFlag: outputFileKey,
targetDBNameFlag: targetDBNameKey,
targetHostFlag: targetHostKey,
targetHostsFlag: targetHostsKey,
targetUserNameFlag: targetUserNameKey,
targetPasswordFileFlag: targetPasswordFileKey,
sourceTLSConfigFlag: sourceTLSConfigKey,
Expand Down
2 changes: 1 addition & 1 deletion commands/cmd_base.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,7 +308,7 @@ func (c *CmdBase) setDBPassword(opt *vclusterops.DatabaseOptions) error {
}

func (c *CmdBase) passwordFileHelper(passwordFile string) (string, error) {
if c.passwordFile == "" {
if passwordFile == "" {
return "", fmt.Errorf("password file path is empty")
}
// hyphen(`-`) is used to indicate that input should come
Expand Down
25 changes: 18 additions & 7 deletions commands/cmd_start_replication.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,12 +47,16 @@ func makeCmdStartReplication() *cobra.Command {
"Start database replication",
`This subcommand starts a database replication.
This subcommand copies table or schema data directly from one Eon Mode database's communal storage
to another.
This subcommand copies table or schema data directly from one Eon Mode
database's communal storage to another.
The --target-conn option serves as a collection file for gathering necessary target
information for replication. You need to run vcluster manage_connection to generate this
connection file in order to use this option.
The --target-conn option serves as a collection file for gathering necessary
target information for replication. You need to run vcluster manage_connection
to generate this connection file in order to use this option.
If the source database has EnableConnectCredentialForwarding enabled, the
target username and password can be ignored. If the target database uses trust
authentication, the password can be ignored.
Examples:
# Start database replication with config and connection file
Expand All @@ -75,12 +79,19 @@ Examples:
--target-hosts 10.20.30.43 --password-file /path/to/password-file --target-db-user dbadmin \
--target-password-file /path/to/password-file
`,
// Temporarily, the Vcluster CLI doesn't support a config file for this subcommand.
// It will include all hosts from the config file.
// VER-93450 will add 2 options for sandboxes, "source-sandbox" and "target-sandbox", to get the correct sourceHosts
[]string{dbNameFlag, hostsFlag, ipv6Flag, configFlag, passwordFlag, dbUserFlag, eonModeFlag},
)

// local flags
newCmd.setLocalFlags(cmd)

// Temporarily, targetDBName and targetHost are required.
// They will be removed after target-conn is implemented in VER-93130
markFlagsRequired(cmd, []string{targetDBNameFlag, targetHostsFlag})

// hide eon mode flag since we expect it to come from config file, not from user input
hideLocalFlags(cmd, []string{eonModeFlag})
return cmd
Expand All @@ -96,7 +107,7 @@ func (c *CmdStartReplication) setLocalFlags(cmd *cobra.Command) {
)
cmd.Flags().StringSliceVar(
&c.startRepOptions.TargetHosts,
targetHostFlag,
targetHostsFlag,
[]string{},
"Comma-separated list of hosts in target database")
cmd.Flags().StringVar(
Expand Down Expand Up @@ -184,7 +195,7 @@ func (c *CmdStartReplication) parseTargetHostList() error {

func (c *CmdStartReplication) parseTargetPassword() error {
options := c.startRepOptions
if !c.usePassword() {
if !c.parser.Changed(targetPasswordFileFlag) {
// reset password option to nil if password is not provided in cli
options.TargetPassword = nil
return nil
Expand Down
11 changes: 1 addition & 10 deletions commands/user_input_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,14 +88,5 @@ func TestStartReplication(t *testing.T) {
// vcluster replication start should succeed
// since there is no op for this subcommand
err := simulateVClusterCli("vcluster replication start")
assert.NoError(t, err)

var passwordFilePath = os.TempDir() + "/password.txt"
tempConfig, _ := os.Create(passwordFilePath)
tempConfig.Close()
defer os.Remove(passwordFilePath)
err = simulateVClusterCli("vcluster replication start --db-name platform_test_db --hosts" +
" 192.168.1.101 --target-db-name test_db --target-hosts 192.168.1.103 --target-password-file " + passwordFilePath +
" --password-file " + passwordFilePath)
assert.NoError(t, err)
assert.ErrorContains(t, err, `required flag(s) "target-db-name", "target-hosts" not set`)
}
188 changes: 188 additions & 0 deletions vclusterops/https_start_replication_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
(c) Copyright [2023-2024] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"encoding/json"
"errors"
"fmt"

"github.com/vertica/vcluster/vclusterops/util"
)

type httpsStartReplicationOp struct {
opBase
opHTTPSBase
hostRequestBodyMap map[string]string
sourceDB string
targetHosts string
targetDB string
targetUserName string
targetPassword *string
tlsConfig string
}

func makeHTTPSStartReplicationOp(dbName string, sourceHosts []string,
sourceUseHTTPPassword bool, sourceUserName string,
sourceHTTPPassword *string, targetUseHTTPPassword bool, targetDB, targetUserName, targetHosts string,
targetHTTPSPassword *string, tlsConfig string) (httpsStartReplicationOp, error) {
op := httpsStartReplicationOp{}
op.name = "HTTPSStartReplicationOp"
op.description = "Start database replication"
op.sourceDB = dbName
op.hosts = sourceHosts
op.useHTTPPassword = sourceUseHTTPPassword
op.targetDB = targetDB
op.targetHosts = targetHosts
op.tlsConfig = tlsConfig

if sourceUseHTTPPassword {
err := util.ValidateUsernameAndPassword(op.name, sourceUseHTTPPassword, sourceUserName)
if err != nil {
return op, err
}
op.userName = sourceUserName
op.httpsPassword = sourceHTTPPassword
}
if targetUseHTTPPassword {
err := util.ValidateUsernameAndPassword(op.name, targetUseHTTPPassword, targetUserName)
if err != nil {
return op, err
}
op.targetUserName = targetUserName
op.targetPassword = targetHTTPSPassword
}

return op, nil
}

type replicateRequestData struct {
TargetHost string `json:"host"`
TargetDB string `json:"dbname"`
TargetUserName string `json:"user,omitempty"`
TargetPassword *string `json:"password,omitempty"`
TLSConfig string `json:"tls_config,omitempty"`
}

func (op *httpsStartReplicationOp) setupRequestBody(hosts []string) error {
op.hostRequestBodyMap = make(map[string]string)

for _, host := range hosts {
replicateData := replicateRequestData{}
replicateData.TargetHost = op.targetHosts
replicateData.TargetDB = op.targetDB
replicateData.TargetUserName = op.targetUserName
replicateData.TargetPassword = op.targetPassword
replicateData.TLSConfig = op.tlsConfig

dataBytes, err := json.Marshal(replicateData)
if err != nil {
return fmt.Errorf("[%s] fail to marshal request data to JSON string, detail %w", op.name, err)
}

op.hostRequestBodyMap[host] = string(dataBytes)
}

return nil
}

func (op *httpsStartReplicationOp) setupClusterHTTPRequest(hosts []string) error {
for _, host := range hosts {
httpRequest := hostHTTPRequest{}
httpRequest.Method = PostMethod
httpRequest.buildHTTPSEndpoint("replicate/start")
if op.useHTTPPassword {
httpRequest.Password = op.httpsPassword
httpRequest.Username = op.userName
}
httpRequest.RequestData = op.hostRequestBodyMap[host]
op.clusterHTTPRequest.RequestCollection[host] = httpRequest
}
return nil
}

func (op *httpsStartReplicationOp) prepare(execContext *opEngineExecContext) error {
if len(execContext.nodesInfo) == 0 {
return fmt.Errorf(`[%s] cannot find any hosts in OpEngineExecContext`, op.name)
}
// a source host will be an up host based on the input provided for hosts
var sourceHosts []string
for _, node := range execContext.nodesInfo {
if node.State != util.NodeDownState {
sourceHosts = append(sourceHosts, node.Address)
}
}
sourceHosts = util.SliceCommon(op.hosts, sourceHosts)
if len(sourceHosts) == 0 {
return fmt.Errorf(`[%s] cannot find any up hosts from source database %s`, op.name, op.sourceDB)
}
op.hosts = []string{sourceHosts[0]}

err := op.setupRequestBody(op.hosts)
if err != nil {
return err
}
execContext.dispatcher.setup(op.hosts)
return op.setupClusterHTTPRequest(op.hosts)
}

func (op *httpsStartReplicationOp) execute(execContext *opEngineExecContext) error {
if err := op.runExecute(execContext); err != nil {
return err
}

return op.processResult(execContext)
}

func (op *httpsStartReplicationOp) processResult(_ *opEngineExecContext) error {
var allErrs error

for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)

if result.isUnauthorizedRequest() {
// skip checking response from other nodes because we will get the same error there
return result.err
}
if !result.isPassing() {
allErrs = errors.Join(allErrs, result.err)
continue
}

// decode the json-format response
// The successful response object will be a dictionary as below:
// {"detail": "REPLICATE"}
startRepRsp, err := op.parseAndCheckMapResponse(host, result.content)
if err != nil {
err = fmt.Errorf("[%s] fail to parse result on host %s, details: %w", op.name, host, err)
allErrs = errors.Join(allErrs, err)
continue
}

// verify if the response's content is correct
const startReplicationOpSuccMsg = "REPLICATE"
if startRepRsp["detail"] != startReplicationOpSuccMsg {
err = fmt.Errorf(`[%s] response detail should be '%s' but got '%s'`, op.name, startReplicationOpSuccMsg, startRepRsp["detail"])
allErrs = errors.Join(allErrs, err)
}
}

return allErrs
}

func (op *httpsStartReplicationOp) finalize(_ *opEngineExecContext) error {
return nil
}
Loading

0 comments on commit cf2e31d

Please sign in to comment.