Skip to content

Commit

Permalink
Sync from server repo (81076026838)
Browse files Browse the repository at this point in the history
  • Loading branch information
Matt Spilchen committed Jan 17, 2024
1 parent a05178d commit 288b84d
Show file tree
Hide file tree
Showing 7 changed files with 695 additions and 25 deletions.
21 changes: 15 additions & 6 deletions commands/cmd_sandbox.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,20 +72,29 @@ func (c *CmdSandboxSubcluster) CommandType() string {

func (c *CmdSandboxSubcluster) Parse(inputArgv []string, logger vlog.Printer) error {
c.argv = inputArgv
// from now on we use the internal copy of argv
err := c.ValidateParseArgv(c.CommandType(), logger)
if err != nil {
return err
}
return c.parseInternal(logger)
}

func (c *CmdSandboxSubcluster) parseInternal(logger vlog.Printer) error {
logger.Info("Called parseInternal()")
if c.parser == nil {
return fmt.Errorf("unexpected nil for CmdSandboxSubcluster.parser")
}
logger.PrintInfo("Parsing sandboxing command input")
parseError := c.ParseArgv()
if parseError != nil {
return parseError
if !util.IsOptionSet(c.parser, "password") {
c.sbOptions.Password = nil
}
return nil
if !util.IsOptionSet(c.parser, "ipv6") {
c.CmdBase.ipv6 = nil
}
if !util.IsOptionSet(c.parser, "config-directory") {
c.sbOptions.ConfigDirectory = nil
}

return c.ValidateParseBaseOptions(&c.sbOptions.DatabaseOptions)
}

func (c *CmdSandboxSubcluster) Analyze(logger vlog.Printer) error {
Expand Down
17 changes: 11 additions & 6 deletions vclusterops/cluster_op_engine_context.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,17 @@ package vclusterops
import "github.com/vertica/vcluster/vclusterops/vlog"

type opEngineExecContext struct {
dispatcher requestDispatcher
networkProfiles map[string]networkProfile
nmaVDatabase nmaVDatabase
upHosts []string // a sorted host list that contains all up nodes
nodesInfo []NodeInfo
defaultSCName string // store the default subcluster name of the database
dispatcher requestDispatcher
networkProfiles map[string]networkProfile
nmaVDatabase nmaVDatabase
upHosts []string // a sorted host list that contains all up nodes
nodesInfo []NodeInfo

// This field is specifically used for sandboxing
// as sandboxing requires all nodes in the subcluster to be sandboxed to be UP.
upScInfo map[string]string // map with UP hosts as keys and their subcluster names as values.
sandboxingHosts []string // List of hosts that should run sandboxing command
defaultSCName string // store the default subcluster name of the database
hostsWithLatestCatalog []string
primaryHostsWithLatestCatalog []string
startupCommandMap map[string][]string // store start up command map to restart nodes
Expand Down
207 changes: 207 additions & 0 deletions vclusterops/https_check_subcluster_sandbox_op.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
/*
(c) Copyright [2023] Open Text.
Licensed under the Apache License, Version 2.0 (the "License");
You may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package vclusterops

import (
"errors"
"fmt"

"github.com/vertica/vcluster/vclusterops/vlog"
)

type httpsCheckSubclusterSandboxOp struct {
opBase
opHTTPSBase
ScToSandbox string
Sandbox string
}

// makeHTTPSCheckSubclusterSandboxOp initializes an op to find
// all subclusters and record their sandboxing information.
func makeHTTPSCheckSubclusterSandboxOp(logger vlog.Printer, hosts []string, scName string, sandbox string,
useHTTPPassword bool, userName string, httpsPassword *string) (httpsCheckSubclusterSandboxOp, error) {
op := httpsCheckSubclusterSandboxOp{}
op.name = "HTTPSCheckSubclusterSandboxOp"
op.logger = logger.WithName(op.name)
op.hosts = hosts
op.ScToSandbox = scName
op.Sandbox = sandbox

err := op.validateAndSetUsernameAndPassword(op.name, useHTTPPassword, userName,
httpsPassword)

return op, err
}

func (op *httpsCheckSubclusterSandboxOp) setupClusterHTTPRequest(hosts []string) error {
for _, host := range hosts {
httpRequest := hostHTTPRequest{}
httpRequest.Method = GetMethod
httpRequest.buildHTTPSEndpoint("subclusters")
if op.useHTTPPassword {
httpRequest.Password = op.httpsPassword
httpRequest.Username = op.userName
}
op.clusterHTTPRequest.RequestCollection[host] = httpRequest
}

return nil
}

func (op *httpsCheckSubclusterSandboxOp) prepare(execContext *opEngineExecContext) error {
execContext.dispatcher.setup(op.hosts)

return op.setupClusterHTTPRequest(op.hosts)
}

func (op *httpsCheckSubclusterSandboxOp) execute(execContext *opEngineExecContext) error {
if err := op.runExecute(execContext); err != nil {
return err
}

return op.processResult(execContext)
}

// the following struct will store a subcluster's information for this op
type subclusterSandboxInfo struct {
SCName string `json:"subcluster_name"`
Sandbox string `json:"sandbox"`
}

type scResps struct {
SCInfoList []subclusterSandboxInfo `json:"subcluster_list"`
}

func (op *httpsCheckSubclusterSandboxOp) processResult(execContext *opEngineExecContext) error {
var allErrs error
keysToRemove := make(map[string]struct{})
existingSandboxedHosts := make(map[string]struct{})
mainClusterHosts := make(map[string]struct{})

for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)

if result.isUnauthorizedRequest() {
// skip checking response from other nodes because we will get the same error there
return result.err
}
if !result.isPassing() {
allErrs = errors.Join(allErrs, result.err)
// try processing other hosts' responses when the current host has some server errors
continue
}

// decode the json-format response
// A successful response object will be like below:
/*
{
"subcluster_list": [
{
"subcluster_name": "default_subcluster",
"control_set_size": -1,
"is_secondary": false,
"is_default": true,
"sandbox": ""
},
{
"subcluster_name": "sc1",
"control_set_size": 2,
"is_secondary": true,
"is_default": false,
"sandbox": ""
}
]
}
*/
subclusterResp := scResps{}
err := op.parseAndCheckResponse(host, result.content, &subclusterResp)
if err != nil {
err = fmt.Errorf(`[%s] fail to parse result on host %s, details: %w`, op.name, host, err)
allErrs = errors.Join(allErrs, err)
return allErrs
}

// Process sandboxing info
for _, scInfo := range subclusterResp.SCInfoList {
mainHosts, removeHosts, sandboxedHosts := op.processScInfo(scInfo, execContext)
// Accumulate maincluster hosts, hosts to be removed
// and hosts that are sandboxed
for _, host := range mainHosts {
mainClusterHosts[host] = struct{}{}
}
for h := range removeHosts {
keysToRemove[h] = struct{}{}
}
for h := range sandboxedHosts {
existingSandboxedHosts[h] = struct{}{}
}
}
}

// Use updated scInfo
for host := range existingSandboxedHosts {
// Just need one up host from the existing sandbox
// This will be used to add new subcluster to an existing sandbox
execContext.sandboxingHosts = append(execContext.sandboxingHosts, host)
break
}

for host := range mainClusterHosts {
if _, exists := keysToRemove[host]; !exists {
// Just one up host from main cluster
execContext.sandboxingHosts = append(execContext.sandboxingHosts, host)
break
}
}
return allErrs
}
func (op *httpsCheckSubclusterSandboxOp) processScInfo(scInfo subclusterSandboxInfo,
execContext *opEngineExecContext) (mainClusterHosts []string, keysToRemove, existingSandboxedHosts map[string]struct{}) {
keysToRemove = make(map[string]struct{})
for host, sc := range execContext.upScInfo {
if scInfo.Sandbox != "" && scInfo.SCName == sc {
keysToRemove, existingSandboxedHosts = op.processSandboxedSCInfo(scInfo, sc, host)
} else {
if scInfo.SCName == sc {
mainClusterHosts = append(mainClusterHosts, host)
}
// We do not want a host from the sc to be sandboxed to be the initiator
if sc == op.ScToSandbox {
keysToRemove[host] = struct{}{}
}
}
}
return
}

func (op *httpsCheckSubclusterSandboxOp) processSandboxedSCInfo(scInfo subclusterSandboxInfo,
sc, host string) (keysToRemove, existingSandboxedHosts map[string]struct{}) {
keysToRemove = make(map[string]struct{})
existingSandboxedHosts = make(map[string]struct{})
if scInfo.Sandbox != op.Sandbox {
op.logger.Info("subcluster " + sc + " is sandboxed")
if scInfo.SCName == sc {
keysToRemove[host] = struct{}{}
}
} else {
existingSandboxedHosts[host] = struct{}{}
}
return
}

func (op *httpsCheckSubclusterSandboxOp) finalize(_ *opEngineExecContext) error {
return nil
}
13 changes: 5 additions & 8 deletions vclusterops/https_get_up_nodes_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,13 @@ func (op *httpsGetUpNodesOp) execute(execContext *opEngineExecContext) error {

func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) error {
var allErrs error
// golang does not have set data structure, use a map to simulate it
upHosts := make(map[string]struct{})
upScInfo := make(map[string]string)
exceptionHosts := []string{}
downHosts := []string{}

for host, result := range op.clusterHTTPRequest.ResultCollection {
op.logResponse(host, result)

if !result.isPassing() {
allErrs = errors.Join(allErrs, result.err)
}
Expand Down Expand Up @@ -154,15 +153,12 @@ func (op *httpsGetUpNodesOp) processResult(execContext *opEngineExecContext) err
}
if node.State == util.NodeUpState {
upHosts[node.Address] = struct{}{}
upScInfo[node.Address] = node.Subcluster
}
}

if len(upHosts) > 0 {
break
}
}

ignoreErrors := op.processHostLists(upHosts, exceptionHosts, downHosts, execContext)
ignoreErrors := op.processHostLists(upHosts, upScInfo, exceptionHosts, downHosts, execContext)
if ignoreErrors {
return nil
}
Expand All @@ -176,9 +172,10 @@ func (op *httpsGetUpNodesOp) finalize(_ *opEngineExecContext) error {

// processHostLists stashes the up hosts, and if there are no up hosts, prints and logs
// down or erratic hosts. Additionally, it determines if the op should fail or not.
func (op *httpsGetUpNodesOp) processHostLists(upHosts map[string]struct{},
func (op *httpsGetUpNodesOp) processHostLists(upHosts map[string]struct{}, upScInfo map[string]string,
exceptionHosts, downHosts []string,
execContext *opEngineExecContext) (ignoreErrors bool) {
execContext.upScInfo = upScInfo
if len(upHosts) > 0 {
for host := range upHosts {
execContext.upHosts = append(execContext.upHosts, host)
Expand Down
Loading

0 comments on commit 288b84d

Please sign in to comment.