diff --git a/cli/command/migrate.go b/cli/command/migrate.go index a025543ef..50acb3c98 100644 --- a/cli/command/migrate.go +++ b/cli/command/migrate.go @@ -39,23 +39,31 @@ import ( var ( MIGRATE_ETCD_STEPS = []int{ - playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE, // only container + playbook.ADD_ETCD_MEMBER, playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, + playbook.AMEND_ETCD_CONFIG, playbook.START_ETCD, + playbook.AMEND_MDS_CONFIG, // add a etcd endpoint + playbook.RESTART_SERVICE, // restart all mds then add a etcd endpoint in mds.conf + playbook.REMOVE_ETCD_MEMBER, + playbook.STOP_SERVICE, + playbook.CLEAN_SERVICE, // only container + // playbook.AMEND_MDS_CONFIG, // remove a etcd endpoint + // playbook.RESTART_SERVICE, // restart all mds then remove a etcd endpoint in mds.conf + // playbook.RELOAD_METASERVER playbook.UPDATE_TOPOLOGY, } // mds MIGRATE_MDS_STEPS = []int{ - playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE, // only container playbook.PULL_IMAGE, playbook.CREATE_CONTAINER, playbook.SYNC_CONFIG, playbook.START_MDS, + playbook.STOP_SERVICE, + playbook.CLEAN_SERVICE, // only container playbook.UPDATE_TOPOLOGY, } @@ -157,7 +165,7 @@ func checkMigrateTopology(curveadm *cli.CurveAdm, data string) error { } else if len(dcs2add) < len(dcs2del) { return errno.ERR_DELETE_SERVICE_WHILE_MIGRATING_IS_DENIED } - // len(dcs2add) == len(dcs2del) + if len(dcs2add) == 0 { return errno.ERR_NO_SERVICES_FOR_MIGRATING } @@ -199,6 +207,7 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, migrates := getMigrates(curveadm, data) role := migrates[0].From.GetRole() steps := MIGRATE_ROLE_STEPS[role] + etcdDCs := curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD) // post clean if options.clean { @@ -221,10 +230,14 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, config := dcs2add switch step { case playbook.STOP_SERVICE, - playbook.CLEAN_SERVICE: + playbook.CLEAN_SERVICE, + playbook.ADD_ETCD_MEMBER: config = dcs2del case playbook.BACKUP_ETCD_DATA: config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_ETCD) + case playbook.AMEND_MDS_CONFIG, + playbook.RESTART_SERVICE: + config = curveadm.FilterDeployConfigByRole(dcs, topology.ROLE_MDS) case playbook.CREATE_PHYSICAL_POOL, playbook.CREATE_LOGICAL_POOL, @@ -251,6 +264,11 @@ func genMigratePlaybook(curveadm *cli.CurveAdm, optionsKV[comm.KEY_POOLSET] = poolset case playbook.UPDATE_TOPOLOGY: optionsKV[comm.KEY_NEW_TOPOLOGY_DATA] = data + case playbook.ADD_ETCD_MEMBER, + playbook.AMEND_ETCD_CONFIG, + playbook.AMEND_MDS_CONFIG: + optionsKV[comm.KEY_MIGRATE_SERVERS] = migrates + optionsKV[comm.KEY_CLUSTER_DCS] = etcdDCs } pb.AddStep(&playbook.PlaybookStep{ diff --git a/client.yaml b/client.yaml new file mode 100644 index 000000000..7f6eb58e2 --- /dev/null +++ b/client.yaml @@ -0,0 +1,16 @@ +kind: curvefs +s3.ak: curve +s3.sk: Netease@2023 +s3.endpoint: 10.182.26.46:19005 +s3.bucket_name: curvefs +container_image: quay.io/opencurve/curve/curvefs:v2.7.0-rc1_d8a1137 +mdsOpt.rpcRetryOpt.addrs: 10.182.26.46:6700,10.182.26.35:6700,10.182.26.36:6700 +log_dir: /home/caoxianfei/client/logs +data_dir: /mnt/v27cache +diskCache.fullRatio: 90 +diskCache.safeRatio: 70 +diskCache.trimRatio: 50 +diskCache.maxUsableSpaceBytes: 10737418240 +diskCache.maxFileNums: 1000000 +diskCache.trimCheckIntervalSec: 5 +client.loglevel: 9 \ No newline at end of file diff --git a/curve b/curve new file mode 100755 index 000000000..2d8d84072 Binary files /dev/null and b/curve differ diff --git a/internal/common/common.go b/internal/common/common.go index 0af1bb19e..ae597ee08 100644 --- a/internal/common/common.go +++ b/internal/common/common.go @@ -57,6 +57,7 @@ const ( // migrate KEY_MIGRATE_STATUS = "MIGRATE_STATUS" KEY_MIGRATE_COMMON_STATUS = "MIGRATE_COMMON_STATUS" + KEY_CLUSTER_DCS = "CLUSTER_DCS" // check KEY_CHECK_WITH_WEAK = "CHECK_WITH_WEAK" diff --git a/internal/configure/topology/dc_get.go b/internal/configure/topology/dc_get.go index ee8e7c2b1..c18eb0225 100644 --- a/internal/configure/topology/dc_get.go +++ b/internal/configure/topology/dc_get.go @@ -121,7 +121,11 @@ func (dc *DeployConfig) GetInstances() int { return dc.instanc func (dc *DeployConfig) GetHostSequence() int { return dc.hostSequence } func (dc *DeployConfig) GetInstancesSequence() int { return dc.instancesSequence } func (dc *DeployConfig) GetServiceConfig() map[string]string { return dc.serviceConfig } -func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables } +func (dc *DeployConfig) SetServiceConfig(key, value string) { + dc.serviceConfig[key] = value +} + +func (dc *DeployConfig) GetVariables() *variable.Variables { return dc.variables } // (2): config item func (dc *DeployConfig) GetPrefix() string { return dc.getString(CONFIG_PREFIX) } diff --git a/internal/configure/topology/variables.go b/internal/configure/topology/variables.go index a2b246805..de123bc33 100644 --- a/internal/configure/topology/variables.go +++ b/internal/configure/topology/variables.go @@ -118,9 +118,9 @@ var ( {name: "cluster_mds_dummy_addr"}, {name: "cluster_mds_dummy_port"}, {name: "cluster_chunkserver_addr", kind: []string{KIND_CURVEBS}}, - {name: "cluster_snapshotclone_addr", kind: []string{KIND_CURVEBS}}, + {name: "cluster_snapshotclone_addr"}, {name: "cluster_snapshotclone_proxy_addr", kind: []string{KIND_CURVEBS}}, - {name: "cluster_snapshotclone_dummy_port", kind: []string{KIND_CURVEBS}}, + {name: "cluster_snapshotclone_dummy_port"}, {name: "cluster_snapshotclone_nginx_upstream", kind: []string{KIND_CURVEBS}}, {name: "cluster_snapshot_addr"}, // tools-v2: compatible with some old version image {name: "cluster_snapshot_dummy_addr"}, // tools-v2 diff --git a/internal/errno/errno.go b/internal/errno/errno.go index 117460bdf..8bfe176b7 100644 --- a/internal/errno/errno.go +++ b/internal/errno/errno.go @@ -404,6 +404,9 @@ var ( ERR_GET_CHUNKSERVER_COPYSET = EC(410026, "failed to get chunkserver copyset") ERR_GET_MIGRATE_COPYSET = EC(410027, "migrate chunkserver copyset info must be 2") ERR_CONTAINER_NOT_REMOVED = EC(410027, "container not removed") + ERR_GET_CLUSTER_ETCD_ADDR = EC(410028, "failed to get cluster_etcd_addr variable") + ERR_ADD_ETCD_MEMEBER = EC(410029, "failed to add etcd member to existing etcd cluster") + ERR_REMOVE_ETCD_MEMBER = EC(410030, "failed to remove etcd member from existing etcd cluster") // 420: common (curvebs client) ERR_VOLUME_ALREADY_MAPPED = EC(420000, "volume already mapped") ERR_VOLUME_CONTAINER_LOSED = EC(420001, "volume container is losed") diff --git a/internal/playbook/factory.go b/internal/playbook/factory.go index b2c699fda..09284ed01 100644 --- a/internal/playbook/factory.go +++ b/internal/playbook/factory.go @@ -84,6 +84,10 @@ const ( INSTALL_CLIENT UNINSTALL_CLIENT ATTACH_LEADER_OR_RANDOM_CONTAINER + ADD_ETCD_MEMBER + AMEND_ETCD_CONFIG + AMEND_MDS_CONFIG + REMOVE_ETCD_MEMBER // bs FORMAT_CHUNKFILE_POOL @@ -251,6 +255,14 @@ func (p *Playbook) createTasks(step *PlaybookStep) (*tasks.Tasks, error) { t, err = comm.NewInstallClientTask(curveadm, config.GetCC(i)) case UNINSTALL_CLIENT: t, err = comm.NewUninstallClientTask(curveadm, nil) + case ADD_ETCD_MEMBER: + t, err = comm.NewAddEtcdMemberTask(curveadm, config.GetDC(i)) + case AMEND_ETCD_CONFIG: + t, err = comm.NewAmendEtcdConfigTask(curveadm, config.GetDC(i)) + case AMEND_MDS_CONFIG: + t, err = comm.NewAmendMdsConfigTask(curveadm, config.GetDC(i)) + case REMOVE_ETCD_MEMBER: + t, err = comm.NewRemoveEtcdMemberTask(curveadm, config.GetDC(i)) // bs case FORMAT_CHUNKFILE_POOL: t, err = bs.NewFormatChunkfilePoolTask(curveadm, config.GetFC(i)) diff --git a/internal/task/scripts/enable_etcd_auth.go b/internal/task/scripts/enable_etcd_auth.go index a18ff157c..33e14102e 100644 --- a/internal/task/scripts/enable_etcd_auth.go +++ b/internal/task/scripts/enable_etcd_auth.go @@ -12,13 +12,13 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ /* * Project: Curveadm * Created Date: 2023-08-02 * Author: wanghai (SeanHai) -*/ + */ package scripts diff --git a/internal/task/scripts/script.go b/internal/task/scripts/script.go index e310d42e6..f8cb7d86e 100644 --- a/internal/task/scripts/script.go +++ b/internal/task/scripts/script.go @@ -39,6 +39,10 @@ var ( WAIT string //go:embed shell/report.sh REPORT string + //go:embed shell/add_etcd.sh + ADD_ETCD string + //go:embed shell/remove_etcd.sh + REMOVE_ETCD string // CurveBS diff --git a/internal/task/scripts/shell/add_etcd.sh b/internal/task/scripts/shell/add_etcd.sh new file mode 100644 index 000000000..f23f549c9 --- /dev/null +++ b/internal/task/scripts/shell/add_etcd.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash + +# Usage: +# Example: +# Created Date: 2023-12-15 +# Author: Caoxianfei + +etcdctl=$1 +endpoints=$2 +old_name=$3 +new_name=$4 +new_peer_url=$5 + +${etcdctl} --endpoints=${endpoints} member add ${new_name} --peer-urls ${new_peer_url} > add_etcd.log 2>&1 +if [ $? -ne 0 ]; then + if cat add_etcd.log | grep -q "Peer URLs already exists"; then + exit 0 + else + exit 1 + fi +fi + +# output=$(${etcdctl} --endpoints=${endpoints} member list) +# if [ $? -ne 0 ]; then +# echo "failed to list all etcd members" +# exit 1 +# fi + +# id=$(echo "$output" | awk -v name="$old_name" -F ', ' '$3 == name {print $1}') +# if [ -z "${id}" ]; then +# echo "failed to get id of member ${old_name}" +# exit 1 +# fi + +# ${etcdctl} --endpoints=${endpoints} member remove ${id} +# if [ $? -ne 0 ]; then +# echo "failed to remove member ${old_name}" +# exit 1 +# fi + + + + diff --git a/internal/task/scripts/shell/remove_etcd.sh b/internal/task/scripts/shell/remove_etcd.sh new file mode 100644 index 000000000..52c8a7f73 --- /dev/null +++ b/internal/task/scripts/shell/remove_etcd.sh @@ -0,0 +1,32 @@ +#!/usr/bin/env bash + +# Usage: +# Example: +# Created Date: 2023-12-15 +# Author: Caoxianfei + +etcdctl=$1 +endpoints=$2 +old_name=$3 + +output=$(${etcdctl} --endpoints=${endpoints} member list) +if [ $? -ne 0 ]; then + echo "failed to list all etcd members" + exit 1 +fi + +id=$(echo "$output" | awk -v name="$old_name" -F ', ' '$3 == name {print $1}') +if [ -z "${id}" ]; then + echo "failed to get id of member ${old_name}" + exit 1 +fi + +${etcdctl} --endpoints=${endpoints} member remove ${id} +if [ $? -ne 0 ]; then + echo "failed to remove member ${old_name}" + exit 1 +fi + + + + diff --git a/internal/task/task/common/add_etcd_mem.go b/internal/task/task/common/add_etcd_mem.go new file mode 100644 index 000000000..2f6cb5653 --- /dev/null +++ b/internal/task/task/common/add_etcd_mem.go @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-12-20 + * Author: Caoxianfei + */ + +package common + +import ( + "fmt" + "strconv" + + "github.com/opencurve/curveadm/cli/cli" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/curveadm/internal/task/context" + "github.com/opencurve/curveadm/internal/task/scripts" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +func checkAddEtcdMemberStatus(success *bool, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if !*success { + return errno.ERR_ADD_ETCD_MEMEBER.S(*out) + } + return nil + } +} + +func NewAddEtcdMemberTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Backup Etcd Data", subname, hc.GetSSHConfig()) + + host, role := dc.GetHost(), dc.GetRole() + script := scripts.ADD_ETCD + layout := dc.GetProjectLayout() + scriptPath := fmt.Sprintf("%s/add_etcd.sh", layout.ServiceBinDir) + etcdctlPath := layout.ServiceBinDir + "/etcdctl" + endpoints, err := dc.GetVariables().Get("cluster_etcd_addr") + if err != nil { + return nil, errno.ERR_GET_CLUSTER_ETCD_ADDR + } + oldName := fmt.Sprint("etcd", strconv.Itoa(dc.GetHostSequence()), strconv.Itoa(dc.GetInstancesSequence())) + newName := fmt.Sprint("etcd", strconv.Itoa(dc.GetHostSequence()+3), strconv.Itoa(dc.GetInstancesSequence())) + migrates := []*configure.MigrateServer{} + if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { + migrates = curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) + } + toService := migrates[0].To + peerUrl := fmt.Sprint("http://", toService.GetListenIp(), ":", strconv.Itoa(toService.GetListenPort())) + addEtcdCmd := fmt.Sprintf("/bin/bash %s %s %s %s %s %s", scriptPath, etcdctlPath, endpoints, oldName, newName, peerUrl) + + var success bool + var out string + t.AddStep(&step.ListContainers{ + ShowAll: true, + Format: `"{{.ID}}"`, + Filter: fmt.Sprintf("id=%s", containerId), + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: CheckContainerExist(host, role, containerId, &out), + }) + t.AddStep(&step.InstallFile{ + ContainerId: &containerId, + ContainerDestPath: scriptPath, + Content: &script, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.ContainerExec{ + ContainerId: &containerId, + Success: &success, + Out: &out, + Command: addEtcdCmd, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: checkAddEtcdMemberStatus(&success, &out), + }) + + return t, nil +} diff --git a/internal/task/task/common/amend_etcd_config.go b/internal/task/task/common/amend_etcd_config.go new file mode 100644 index 000000000..cf36fa45a --- /dev/null +++ b/internal/task/task/common/amend_etcd_config.go @@ -0,0 +1,119 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-12-20 + * Author: Caoxianfei + */ + +package common + +import ( + "fmt" + "strconv" + "strings" + + "github.com/opencurve/curveadm/cli/cli" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +const ( + AMEND_NAME = "name" + AMEND_ENDPOINTS = "initial-cluster" + AMEND_STATE = "initial-cluster-state" +) + +var options = make(map[string]interface{}) + +func mutateEtcdConf(dc *topology.DeployConfig, delimiter string, forceRender bool) step.Mutate { + return func(in, key, value string) (out string, err error) { + if len(key) == 0 { + out = in + return + } + if key == AMEND_NAME { + value = options[AMEND_NAME].(string) + } else if key == AMEND_ENDPOINTS { + value = options[AMEND_ENDPOINTS].(string) + } else if key == AMEND_STATE { + value = "existing" + } + + out = fmt.Sprintf("%s%s%s", key, delimiter, value) + return + } +} + +func NewAmendEtcdConfigTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Override Etcd configure", subname, hc.GetSSHConfig()) + + layout := dc.GetProjectLayout() + migrates := []*configure.MigrateServer{} + if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { + migrates = curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) + } + dcs := []*topology.DeployConfig{} + if curveadm.MemStorage().Get(comm.KEY_CLUSTER_DCS) != nil { + dcs = curveadm.MemStorage().Get(comm.KEY_CLUSTER_DCS).([]*topology.DeployConfig) + } + endpoints := []string{} + for _, dc := range dcs { + ept := fmt.Sprint("etcd", strconv.Itoa(dc.GetHostSequence()), strconv.Itoa(dc.GetInstancesSequence()), + "=", "http://", dc.GetListenIp(), ":", strconv.Itoa(dc.GetListenPort())) + endpoints = append(endpoints, ept) + } + toService := migrates[0].To + newName := fmt.Sprint("etcd", strconv.Itoa(toService.GetHostSequence()+3), strconv.Itoa(toService.GetInstancesSequence())) + toSeriveEndpint := fmt.Sprint(newName, "=", "http://", + toService.GetListenIp(), ":", strconv.Itoa(toService.GetListenPort())) + endpoints = append(endpoints, toSeriveEndpint) + endpointsStr := strings.Join(endpoints, ",") + + options[AMEND_NAME] = newName + options[AMEND_ENDPOINTS] = endpointsStr + + t.AddStep(&step.SyncFile{ // sync etcd.conf config + ContainerSrcId: &containerId, + ContainerSrcPath: layout.ServiceConfPath, + ContainerDestId: &containerId, + ContainerDestPath: layout.ServiceConfPath, + KVFieldSplit: ETCD_CONFIG_DELIMITER, + Mutate: mutateEtcdConf(dc, ETCD_CONFIG_DELIMITER, false), + ExecOptions: curveadm.ExecOptions(), + }) + + return t, nil +} diff --git a/internal/task/task/common/amend_mds_config.go b/internal/task/task/common/amend_mds_config.go new file mode 100644 index 000000000..4ae8bb2ef --- /dev/null +++ b/internal/task/task/common/amend_mds_config.go @@ -0,0 +1,99 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-12-20 + * Author: Caoxianfei + */ + +package common + +import ( + "fmt" + "strconv" + + "github.com/opencurve/curveadm/cli/cli" + comm "github.com/opencurve/curveadm/internal/common" + "github.com/opencurve/curveadm/internal/configure" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +const ( + ETCD_ENDPOINT = "etcd.endpoint" +) + +func mutateMDSConf(dc *topology.DeployConfig, delimiter string, forceRender bool) step.Mutate { + return func(in, key, value string) (out string, err error) { + if len(key) == 0 { + out = in + return + } + if key == ETCD_ENDPOINT { + value = options[ETCD_ENDPOINT].(string) + } + + out = fmt.Sprintf("%s%s%s", key, delimiter, value) + return + } +} + +func NewAmendMdsConfigTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Override Mds configure", subname, hc.GetSSHConfig()) + + layout := dc.GetProjectLayout() + endpoints, err := dc.GetVariables().Get("cluster_etcd_addr") + if err != nil { + return nil, err + } + migrates := []*configure.MigrateServer{} + if curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS) != nil { + migrates = curveadm.MemStorage().Get(comm.KEY_MIGRATE_SERVERS).([]*configure.MigrateServer) + } + toService := migrates[0].To + toSeriveEndpint := fmt.Sprint(toService.GetListenIp(), ":", strconv.Itoa(toService.GetListenClientPort())) + endpoints = fmt.Sprint(endpoints, ",", toSeriveEndpint) + options[ETCD_ENDPOINT] = endpoints + + t.AddStep(&step.SyncFile{ // sync mds.conf config again + ContainerSrcId: &containerId, + ContainerSrcPath: layout.ServiceConfPath, + ContainerDestId: &containerId, + ContainerDestPath: layout.ServiceConfPath, + KVFieldSplit: DEFAULT_CONFIG_DELIMITER, + Mutate: mutateMDSConf(dc, DEFAULT_CONFIG_DELIMITER, false), + ExecOptions: curveadm.ExecOptions(), + }) + + return t, nil +} diff --git a/internal/task/task/common/remove_etcd_mem.go b/internal/task/task/common/remove_etcd_mem.go new file mode 100644 index 000000000..119b34999 --- /dev/null +++ b/internal/task/task/common/remove_etcd_mem.go @@ -0,0 +1,107 @@ +/* + * Copyright (c) 2023 NetEase Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/* + * Project: CurveAdm + * Created Date: 2023-12-20 + * Author: Caoxianfei + */ + +package common + +import ( + "fmt" + "strconv" + + "github.com/opencurve/curveadm/cli/cli" + "github.com/opencurve/curveadm/internal/configure/topology" + "github.com/opencurve/curveadm/internal/errno" + "github.com/opencurve/curveadm/internal/task/context" + "github.com/opencurve/curveadm/internal/task/scripts" + "github.com/opencurve/curveadm/internal/task/step" + "github.com/opencurve/curveadm/internal/task/task" + tui "github.com/opencurve/curveadm/internal/tui/common" +) + +func checkRemoveEtcdMemberStatus(success *bool, out *string) step.LambdaType { + return func(ctx *context.Context) error { + if !*success { + return errno.ERR_REMOVE_ETCD_MEMBER.S(*out) + } + return nil + } +} + +func NewRemoveEtcdMemberTask(curveadm *cli.CurveAdm, dc *topology.DeployConfig) (*task.Task, error) { + serviceId := curveadm.GetServiceId(dc.GetId()) + containerId, err := curveadm.GetContainerId(serviceId) + if curveadm.IsSkip(dc) { + return nil, nil + } else if err != nil { + return nil, err + } + hc, err := curveadm.GetHost(dc.GetHost()) + if err != nil { + return nil, err + } + + subname := fmt.Sprintf("host=%s role=%s containerId=%s", + dc.GetHost(), dc.GetRole(), tui.TrimContainerId(containerId)) + t := task.NewTask("Remove Old Etcd Member", subname, hc.GetSSHConfig()) + + host, role := dc.GetHost(), dc.GetRole() + script := scripts.REMOVE_ETCD + layout := dc.GetProjectLayout() + scriptPath := fmt.Sprintf("%s/remove_etcd.sh", layout.ServiceBinDir) + etcdctlPath := layout.ServiceBinDir + "/etcdctl" + endpoints, err := dc.GetVariables().Get("cluster_etcd_addr") + if err != nil { + return nil, errno.ERR_GET_CLUSTER_ETCD_ADDR + } + oldName := fmt.Sprint("etcd", strconv.Itoa(dc.GetHostSequence()), strconv.Itoa(dc.GetInstancesSequence())) + removeEtcdCmd := fmt.Sprintf("/bin/bash %s %s %s %s", scriptPath, etcdctlPath, endpoints, oldName) + + var success bool + var out string + t.AddStep(&step.ListContainers{ + ShowAll: true, + Format: `"{{.ID}}"`, + Filter: fmt.Sprintf("id=%s", containerId), + Out: &out, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: CheckContainerExist(host, role, containerId, &out), + }) + t.AddStep(&step.InstallFile{ + ContainerId: &containerId, + ContainerDestPath: scriptPath, + Content: &script, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.ContainerExec{ + ContainerId: &containerId, + Success: &success, + Out: &out, + Command: removeEtcdCmd, + ExecOptions: curveadm.ExecOptions(), + }) + t.AddStep(&step.Lambda{ + Lambda: checkRemoveEtcdMemberStatus(&success, &out), + }) + + return t, nil +} diff --git a/topology.yaml b/topology.yaml new file mode 100644 index 000000000..43d38ec0a --- /dev/null +++ b/topology.yaml @@ -0,0 +1,42 @@ +kind: curvefs +global: + container_image: quay.io/opencurve/curve/curvefs:v2.7.0-rc1_d8a1137 + log_dir: ${home}/curvefs27/logs/${service_role} + data_dir: ${home}/curvefs27/data/${service_role} + variable: + home: /data/cxf/chunkserver0 + machine1: curve7 + machine2: curve15 + machine3: curve16 + machine4: curve21 + +etcd_services: + config: + listen.ip: ${service_host} + listen.port: 23800 + listen.client_port: 23790 + deploy: + - host: ${machine1} + - host: ${machine2} + - host: ${machine3} +mds_services: + config: + listen.ip: ${service_host} + listen.port: 6700 + listen.dummy_port: 7700 + deploy: + - host: ${machine1} + - host: ${machine2} + - host: ${machine3} + +metaserver_services: + config: + listen.ip: ${service_host} + listen.port: 16800 + listen.external_port: 17800 + trash.scanPeriodSec: 1 + trash.expiredAfterSec: 1 + deploy: + - host: ${machine1} + - host: ${machine2} + - host: ${machine3}