Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

VReplication: Optimize replication on target tablets #17166

Open
wants to merge 18 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/common/scripts/vttablet-up.sh
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ vttablet \
--service_map 'grpc-queryservice,grpc-tabletmanager,grpc-updatestream' \
--pid_file $VTDATAROOT/$tablet_dir/vttablet.pid \
--heartbeat_on_demand_duration=5s \
--pprof-http \
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change will be reverted before merging.

Copy link
Contributor Author

@mattlord mattlord Nov 10, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

After thinking about it more, I'd like to keep this enabled for the examples. Seems like a good case for it. I can remove/revert the change though if others disagree. If we do keep it here, I could also add it for vtgate and vtctld.

> $VTDATAROOT/$tablet_dir/vttablet.out 2>&1 &

# Block waiting for the tablet to be listening
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vtcombo.txt
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ Flags:
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
--vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 7)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
Expand Down
2 changes: 1 addition & 1 deletion go/flags/endtoend/vttablet.txt
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ Flags:
--vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
--vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
--vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3)
--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 7)
--vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
--vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
--vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,6 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)
Expand Down Expand Up @@ -145,9 +144,6 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "2s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ import (
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type WriteMetrics struct {
Expand Down Expand Up @@ -184,9 +183,6 @@ func TestMain(m *testing.M) {
"--heartbeat_on_demand_duration", "5s",
"--migration_check_interval", "5s",
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ import (
"vitess.io/vitess/go/timer"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/schema"
vttablet "vitess.io/vitess/go/vt/vttablet/common"
)

type testcase struct {
Expand Down Expand Up @@ -436,9 +435,6 @@ func TestMain(m *testing.M) {
"--migration_check_interval", "5s",
"--vstream_packet_size", "4096", // Keep this value small and below 10k to ensure multilple vstream iterations
"--watch_replication_stream",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
clusterInstance.VtGateExtraArgs = []string{
"--ddl_strategy", "online",
Expand Down
13 changes: 0 additions & 13 deletions go/test/endtoend/vreplication/cluster_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@ import (
"vitess.io/vitess/go/vt/mysqlctl"
"vitess.io/vitess/go/vt/sqlparser"
"vitess.io/vitess/go/vt/vtgate/planbuilder/plancontext"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
)
Expand Down Expand Up @@ -101,18 +100,6 @@ func (cc *ClusterConfig) enableGTIDCompression() func() {
}
}

// setAllVTTabletExperimentalFlags sets all the experimental flags for vttablet and returns a function
// that can be used to reset them in a defer.
func setAllVTTabletExperimentalFlags() func() {
experimentalArgs := fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching)
oldArgs := extraVTTabletArgs
extraVTTabletArgs = append(extraVTTabletArgs, experimentalArgs)
return func() {
extraVTTabletArgs = oldArgs
}
}

// VitessCluster represents all components within the test cluster
type VitessCluster struct {
t *testing.T
Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/log"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
)
Expand All @@ -43,9 +42,6 @@ func TestFKWorkflow(t *testing.T) {
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}
defer func() { extraVTTabletArgs = nil }()

Expand Down
4 changes: 0 additions & 4 deletions go/test/endtoend/vreplication/vdiff2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ import (
"vitess.io/vitess/go/test/endtoend/cluster"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/sqlparser"
vttablet "vitess.io/vitess/go/vt/vttablet/common"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
Expand Down Expand Up @@ -140,9 +139,6 @@ func TestVDiff2(t *testing.T) {
extraVTTabletArgs = []string{
// This forces us to use multiple vstream packets even with small test tables.
"--vstream_packet_size=1",
// Test VPlayer batching mode.
fmt.Sprintf("--vreplication_experimental_flags=%d",
vttablet.VReplicationExperimentalFlagAllowNoBlobBinlogRowImage|vttablet.VReplicationExperimentalFlagOptimizeInserts|vttablet.VReplicationExperimentalFlagVPlayerBatching),
}

vc = NewVitessCluster(t, &clusterOptions{cells: strings.Split(cellNames, ",")})
Expand Down
7 changes: 4 additions & 3 deletions go/test/endtoend/vreplication/vdiff_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
)

const (
vdiffTimeout = 120 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
vdiffTimeout = 180 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
Copy link
Contributor Author

@mattlord mattlord Nov 11, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can see the reason for the vdiff test changes in the commit message: f0f61db

vdiffRetryTimeout = 30 * time.Second
vdiffStatusCheckInterval = 5 * time.Second
vdiffRetryInterval = 5 * time.Second
Expand Down Expand Up @@ -71,7 +71,8 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctlclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats")
uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry",
"--update-table-stats", fmt.Sprintf("--filtered_replication_wait_time=%v", vdiffTimeout/2))
info := waitForVDiff2ToComplete(t, true, ksWorkflow, cells, uuid, time.Time{})
require.NotNil(t, info)
require.Equal(t, workflow, info.Workflow)
Expand Down Expand Up @@ -164,7 +165,7 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e
ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
t.Run(fmt.Sprintf("vtctldclient vdiff %s", ksWorkflow), func(t *testing.T) {
// update-table-stats is needed in order to test progress reports.
flags := []string{"--auto-retry", "--update-table-stats"}
flags := []string{"--auto-retry", "--update-table-stats", fmt.Sprintf("--filtered-replication-wait-time=%v", vdiffTimeout/2)}
if len(extraFlags) > 0 {
flags = append(flags, extraFlags...)
}
Expand Down
3 changes: 0 additions & 3 deletions go/test/endtoend/vreplication/vreplication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,6 @@ func TestVreplicationCopyThrottling(t *testing.T) {
}

func TestBasicVreplicationWorkflow(t *testing.T) {
defer setAllVTTabletExperimentalFlags()
sourceKsOpts["DBTypeVersion"] = "mysql-8.0"
targetKsOpts["DBTypeVersion"] = "mysql-8.0"
testBasicVreplicationWorkflow(t, "noblob")
Expand Down Expand Up @@ -595,8 +594,6 @@ func TestCellAliasVreplicationWorkflow(t *testing.T) {
cells := []string{"zone1", "zone2"}
resetCompression := mainClusterConfig.enableGTIDCompression()
defer resetCompression()
resetExperimentalFlags := setAllVTTabletExperimentalFlags()
defer resetExperimentalFlags()
vc = NewVitessCluster(t, &clusterOptions{cells: cells})
defer vc.TearDown()

Expand Down
3 changes: 1 addition & 2 deletions go/vt/vttablet/common/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,7 @@ const (
)

var (
// Default flags: currently VReplicationExperimentalFlagVPlayerBatching is not enabled by default.
vreplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage
vreplicationExperimentalFlags = VReplicationExperimentalFlagOptimizeInserts | VReplicationExperimentalFlagAllowNoBlobBinlogRowImage | VReplicationExperimentalFlagVPlayerBatching
vreplicationNetReadTimeout = 300
vreplicationNetWriteTimeout = 600
vreplicationCopyPhaseDuration = 1 * time.Hour
Expand Down
72 changes: 34 additions & 38 deletions go/vt/vttablet/tabletmanager/vreplication/replicator_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,67 +617,59 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
// on the source: sum/count for aggregation queries, for example.
func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
bindLocations := tp.BulkInsertValues.BindLocations()
if len(tp.Fields) < len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
len(tp.Fields), len(bindLocations))
}

type colInfo struct {
typ querypb.Type
length int64
offset int64
field *querypb.Field
}
rowInfo := make([]*colInfo, 0)

offset := int64(0)
for i, field := range tp.Fields { // collect info required for fields to be bound
length := row.Lengths[i]
if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
rowInfo = append(rowInfo, &colInfo{
typ: field.Type,
length: length,
offset: offset,
field: field,
})
}
if length > 0 {
offset += row.Lengths[i]
usedFieldCnt := len(tp.Fields) - len(tp.FieldsToSkip)
if usedFieldCnt != len(bindLocations) {
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations",
usedFieldCnt, len(bindLocations))
}

// Bind field values to locations.
var (
offset int64
offsetQuery int
fieldsIndex int
field *querypb.Field
)
for i, loc := range bindLocations {
field = tp.Fields[fieldsIndex]
length := row.Lengths[fieldsIndex]
for tp.FieldsToSkip[strings.ToLower(field.Name)] {
if length > 0 {
offset += length
}
fieldsIndex++
field = tp.Fields[fieldsIndex]
length = row.Lengths[fieldsIndex]
}
}

// bind field values to locations
var offsetQuery int
for i, loc := range bindLocations {
col := rowInfo[i]
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
typ := col.typ
typ := field.Type

switch typ {
case querypb.Type_TUPLE:
return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
case querypb.Type_JSON:
if col.length < 0 { // An SQL NULL and not an actual JSON value
if length < 0 { // An SQL NULL and not an actual JSON value
buf.WriteString(sqltypes.NullStr)
} else { // A JSON value (which may be a JSON null literal value)
buf2 := row.Values[col.offset : col.offset+col.length]
buf2 := row.Values[offset : offset+length]
vv, err := vjson.MarshalSQLValue(buf2)
if err != nil {
return err
}
buf.WriteString(vv.RawStr())
}
default:
if col.length < 0 {
if length < 0 {
// -1 means a null variable; serialize it directly
buf.WriteString(sqltypes.NullStr)
} else {
raw := row.Values[col.offset : col.offset+col.length]
raw := row.Values[offset : offset+length]
var vv sqltypes.Value

if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
if conversion, ok := tp.ConvertCharset[field.Name]; ok && length > 0 {
// Non-null string value, for which we have a charset conversion instruction
out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
out, err := tp.convertStringCharset(raw, conversion, field.Name)
if err != nil {
return err
}
Expand All @@ -690,6 +682,10 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
}
}
offsetQuery = loc.Offset + loc.Length
if length > 0 {
offset += length
}
fieldsIndex++
}
buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
return nil
Expand Down
Loading
Loading