Skip to content

Commit

Permalink
stage save
Browse files Browse the repository at this point in the history
  • Loading branch information
ymakedaq committed Oct 23, 2024
1 parent f1debcc commit 0b73c59
Show file tree
Hide file tree
Showing 5 changed files with 102 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,14 @@ import (
"dbm-services/mysql/db-tools/dbactuator/pkg/util"
)

// ImportSchemaFromLocalSpiderAct 从本地Spider导入表结构到中控节点
type ImportSchemaFromLocalSpiderAct struct {
*subcmd.BaseOptions
Service spiderctl.ImportSchemaFromLocalSpiderComp
}

// NewImportSchemaToTdbctlCommand create new subcommand
func NewImportSchemaToTdbctlCommand() *cobra.Command {
act := ImportSchemaFromLocalSpiderAct{
act := ImportSchemaFromBackendAct{
BaseOptions: subcmd.GBaseOptions,
}
cmd := &cobra.Command{
Use: "import-schema-to-tdbctl",
Short: "从spider节点导入表结构到中控节点",
Short: "从backend分片0导入表结构到中控节点",
Example: fmt.Sprintf(
`dbactuator spiderctl import-schema-to-tdbctl %s %s`,
subcmd.CmdBaseExampleStr, subcmd.ToPrettyJson(act.Service.Example()),
Expand All @@ -48,6 +42,12 @@ func NewImportSchemaToTdbctlCommand() *cobra.Command {
return cmd
}

// ImportSchemaFromLocalSpiderAct 从本地Spider导入表结构到中控节点
type ImportSchemaFromLocalSpiderAct struct {
*subcmd.BaseOptions
Service spiderctl.ImportSchemaFromLocalSpiderComp
}

// Init 初始化
func (d *ImportSchemaFromLocalSpiderAct) Init() (err error) {
logger.Info("InitCLusterRoutingAct Init")
Expand Down Expand Up @@ -79,3 +79,45 @@ func (d *ImportSchemaFromLocalSpiderAct) Run() (err error) {
logger.Info("import schema to empty tdbctl succcess ~")
return nil
}

// ImportSchemaFromBackendAct 从backend导入表结构到中控节点
type ImportSchemaFromBackendAct struct {
*subcmd.BaseOptions
Service spiderctl.ImportSchemaFromBackendComp
}

// Init prepare run env
func (d *ImportSchemaFromBackendAct) Init() (err error) {
logger.Info("InitCLusterRoutingAct Init")
if err = d.Deserialize(&d.Service.Params); err != nil {
logger.Error("DeserializeAndValidate failed, %v", err)
return err
}
d.Service.GeneralParam = subcmd.GeneralRuntimeParam
return
}

// Run 执行
func (d *ImportSchemaFromBackendAct) Run() (err error) {
steps := subcmd.Steps{
{
FunName: "初始化",
Func: d.Service.Init,
},
{
FunName: "从本地spider导出表结构至tdbctl",
Func: d.Service.Migrate,
},
{
FunName: "从本地spider导出存储过程、触发器等dbctl",
Func: d.Service.MigrateRoutinesAndTriger,
},
}

if err = steps.Run(); err != nil {
return err
}

logger.Info("import schema to tdbctl succcess ~")
return nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,12 @@ package spiderctlcmd
import (
"fmt"

"github.com/spf13/cobra"

"dbm-services/common/go-pubpkg/logger"
"dbm-services/mysql/db-tools/dbactuator/internal/subcmd"
"dbm-services/mysql/db-tools/dbactuator/pkg/components/spiderctl"
"dbm-services/mysql/db-tools/dbactuator/pkg/util"

"github.com/spf13/cobra"
)

// InitCLusterRoutingAct 初始化tendb cluster 集群的路由关系
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"os"
"path"
"runtime"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -68,6 +67,20 @@ type importSchemaFromBackendRuntime struct {
spiderAdminPwd string
}

// Example subcommand example input
func (c *ImportSchemaFromBackendComp) Example() interface{} {
comp := ImportSchemaFromBackendComp{
Params: ImportSchemaFromBackendParam{
Host: "1.1.1.1",
Port: 26000,
SpiderPort: 25000,
BackendHost: "1.1.1.1",
BackendPort: 20000,
},
}
return comp
}

// Init prepare run env
func (c *ImportSchemaFromBackendComp) Init() (err error) {
c.spiderAdminUser = c.Params.TdbctlUser
Expand Down Expand Up @@ -138,6 +151,27 @@ func (c *ImportSchemaFromBackendComp) Init() (err error) {
return err
}

// Migrate do migrate
func (c *ImportSchemaFromBackendComp) Migrate() (err error) {
switch {
case c.Params.UseMydumper && c.Params.Stream:
logger.Info("will use mydumper to stream migrate schema")
err = c.streamMigrate()
case c.Params.UseMydumper:
logger.Info("will use mydumper to migrate schema")
err = c.migrateUseMydumper()
default:
logger.Info("will use mysqldump to migrate schema")
err = c.migrateUseMysqlDump()
}
if err != nil {
logger.Error("migrate schema failed %s", err.Error())
return err
}
logger.Info("migrate schema success~")
return nil
}

func (c *ImportSchemaFromBackendComp) streamMigrate() (err error) {
logger.Info("will create mydumper.cnf ...")
mydumperCnf := path.Join(c.tmpDumpDir, "mydumper.cnf")
Expand Down Expand Up @@ -314,7 +348,9 @@ func (c *ImportSchemaFromBackendComp) migrateUseMysqlDump() (err error) {
return errors.Join(errs...)
}

func (c *ImportSchemaFromBackendComp) migrateRoutinesAndTrigeer() (err error) {
// MigrateRoutinesAndTriger TODO
func (c *ImportSchemaFromBackendComp) MigrateRoutinesAndTriger() (err error) {
logger.Info("will import routines and triggers to tdbctl")
var dumper mysqlutil.Dumper
dumpOption := mysqlutil.MySQLDumpOption{
DumpSchema: false,
Expand All @@ -339,7 +375,7 @@ func (c *ImportSchemaFromBackendComp) migrateRoutinesAndTrigeer() (err error) {
},
OutputfileName: c.tmpDumpFile,
}
if err := dumper.Dump(); err != nil {
if err = dumper.Dump(); err != nil {
logger.Error("dump 入存储过程、触发器、event failed: %s", err.Error())
return err
}
Expand All @@ -358,11 +394,7 @@ func (c *ImportSchemaFromBackendComp) migrateRoutinesAndTrigeer() (err error) {
logger.Error("执行导入存储过程、触发器、event的SQL文件:%s 失败:%s", c.tmpDumpFile, err.Error())
return err
}
return
}

func buildMyDumperRe(dbs []string) string {
return fmt.Sprintf("^(%s)", strings.Join(dbs, "|"))
return err
}

func buildBackendDbNames(dbs []string) (beDbs []string) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from backend.components.db_remote_service.client import DRSApi
from backend.configuration.constants import DBType
from backend.constants import IP_PORT_DIVIDER
from backend.db_meta.enums import ClusterType, TenDBClusterSpiderRole
from backend.db_meta.enums import ClusterType, InstanceInnerRole, TenDBClusterSpiderRole
from backend.db_meta.exceptions import ClusterNotExistException
from backend.db_meta.models import Cluster, StorageInstance
from backend.flow.consts import LONG_JOB_TIMEOUT
Expand Down Expand Up @@ -215,6 +215,11 @@ def run(self):
slave_spiders = cluster_obj.proxyinstance_set.filter(
tendbclusterspiderext__spider_role=TenDBClusterSpiderRole.SPIDER_SLAVE.value
)
shard0 = cluster_obj.tendbclusterstorageset_set.get(
shard_id=0, storage_instance_tuple__ejector__instance_inner_role=InstanceInnerRole.MASTER
)
shard0_host = shard0.storage_instance_tuple.receiver.machine.ip
shard0_port = shard0.storage_instance_tuple.receiver.port
master_spider_ips = [c.machine.ip for c in master_spiders]
logging.info("master_spider_ips: %s" % [c.machine.ip for c in master_spiders])
if len(master_spider_ips) < 2:
Expand Down Expand Up @@ -350,6 +355,8 @@ def run(self):
"use_mydumper": self.use_mydumper,
"drop_before": self.drop_before,
"threads": self.threads,
"shard_0_host": shard0_host,
"shard_0_port": shard0_port,
"tdbctl_user": self.tdbctl_user,
"tdbctl_pass": self.tdbctl_pass,
}
Expand Down
2 changes: 2 additions & 0 deletions dbm-ui/backend/flow/utils/mysql/mysql_act_playload.py
Original file line number Diff line number Diff line change
Expand Up @@ -394,6 +394,8 @@ def get_import_schema_to_tdbctl_payload(self, **kwargs):
"extend": {
"host": kwargs["ip"],
"port": self.cluster["ctl_port"],
"backend_host": self.cluster["shard_0_host"],
"backend_port": self.cluster["shard_0_port"],
"spider_port": self.cluster["spider_port"],
"use_mydumper": self.cluster["use_mydumper"],
"stream": self.cluster["stream"],
Expand Down

0 comments on commit 0b73c59

Please sign in to comment.