VReplication: Optimize replication on target tablets #17166
base: main
Changes from all commits
23024bd
a5472f4
ab65f0c
4371f80
ac27fd5
ad8233e
bf930af
11b3493
ed81da1
fcfa376
2e3334f
aa359bc
5f5c76a
f0f61db
21564fd
b8a3271
5530030
5f15e3f
@@ -413,7 +413,7 @@ Flags:
 --vreplication_copy_phase_duration duration Duration for each copy phase loop (before running the next catchup: default 1h) (default 1h0m0s)
 --vreplication_copy_phase_max_innodb_history_list_length int The maximum InnoDB transaction history that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 1000000)
 --vreplication_copy_phase_max_mysql_replication_lag int The maximum MySQL replication lag (in seconds) that can exist on a vstreamer (source) before starting another round of copying rows. This helps to limit the impact on the source tablet. (default 43200)
---vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 3)
+--vreplication_experimental_flags int (Bitmask) of experimental features in vreplication to enable (default 7)

[Review comment: 👍]

 --vreplication_heartbeat_update_interval int Frequency (in seconds, default 1, max 60) at which the time_updated column of a vreplication stream when idling (default 1)
 --vreplication_max_time_to_retry_on_error duration stop automatically retrying when we've had consecutive failures with the same error for this long after the first occurrence
 --vreplication_net_read_timeout int Session value of net_read_timeout for vreplication, in seconds (default 300)
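Reading this hunk: the default for --vreplication_experimental_flags moves from 3 (binary 011) to 7 (binary 111), so one additional experimental feature bit (value 4) is now enabled by default on target tablets. Below is a minimal sketch of how such a bitmask default is typically interpreted; the constant names are hypothetical placeholders for illustration only, not the actual vttablet identifiers.

package main

import "fmt"

// Hypothetical bit values for illustration only; the real constant names
// live in the vttablet code and are not shown in this diff.
const (
	flagBitA = int64(1) // bit 0
	flagBitB = int64(2) // bit 1
	flagBitC = int64(4) // bit 2 -- the bit newly covered by a default of 7
)

// enabled reports whether a given experimental bit is set in the bitmask flag value.
func enabled(bitmask, bit int64) bool {
	return bitmask&bit != 0
}

func main() {
	oldDefault, newDefault := int64(3), int64(7)
	for _, bit := range []int64{flagBitA, flagBitB, flagBitC} {
		fmt.Printf("bit %d: old=%v new=%v\n", bit, enabled(oldDefault, bit), enabled(newDefault, bit))
	}
	// Output:
	// bit 1: old=true new=true
	// bit 2: old=true new=true
	// bit 4: old=false new=true
}

The only behavioral difference between the two defaults is that the bit with value 4 flips from disabled to enabled; the two previously enabled bits stay on.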
@@ -35,7 +35,7 @@ import (
 )

 const (
-    vdiffTimeout             = 120 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout
+    vdiffTimeout             = 180 * time.Second // We can leverage auto retry on error with this longer-than-usual timeout

[Review comment: You can see the reason for the vdiff test changes in the commit message: f0f61db]

     vdiffRetryTimeout        = 30 * time.Second
     vdiffStatusCheckInterval = 5 * time.Second
     vdiffRetryInterval       = 5 * time.Second
@@ -71,7 +71,8 @@ func doVtctlclientVDiff(t *testing.T, keyspace, workflow, cells string, want *ex
     ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
     t.Run(fmt.Sprintf("vtctlclient vdiff %s", ksWorkflow), func(t *testing.T) {
         // update-table-stats is needed in order to test progress reports.
-        uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry", "--update-table-stats")
+        uuid, _ := performVDiff2Action(t, true, ksWorkflow, cells, "create", "", false, "--auto-retry",
+            "--update-table-stats", fmt.Sprintf("--filtered_replication_wait_time=%v", vdiffTimeout/2))
         info := waitForVDiff2ToComplete(t, true, ksWorkflow, cells, uuid, time.Time{})
         require.NotNil(t, info)
         require.Equal(t, workflow, info.Workflow)
@@ -164,7 +165,7 @@ func doVtctldclientVDiff(t *testing.T, keyspace, workflow, cells string, want *e
     ksWorkflow := fmt.Sprintf("%s.%s", keyspace, workflow)
     t.Run(fmt.Sprintf("vtctldclient vdiff %s", ksWorkflow), func(t *testing.T) {
         // update-table-stats is needed in order to test progress reports.
-        flags := []string{"--auto-retry", "--update-table-stats"}
+        flags := []string{"--auto-retry", "--update-table-stats", fmt.Sprintf("--filtered-replication-wait-time=%v", vdiffTimeout/2)}
         if len(extraFlags) > 0 {
             flags = append(flags, extraFlags...)
         }
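Reading these test changes together: vdiffTimeout grows from 120s to 180s, and both the vtctlclient and vtctldclient paths now pass half of that timeout as the filtered replication wait time (note the flag spelling differs between the two clients: --filtered_replication_wait_time vs --filtered-replication-wait-time). A small illustration of the flag value this produces, assuming only standard time.Duration formatting:

package main

import (
	"fmt"
	"time"
)

func main() {
	vdiffTimeout := 180 * time.Second
	// Half the overall vdiff timeout is passed as the wait time, leaving
	// headroom within vdiffTimeout for the diff itself plus auto-retries.
	flag := fmt.Sprintf("--filtered-replication-wait-time=%v", vdiffTimeout/2)
	fmt.Println(flag) // --filtered-replication-wait-time=1m30s
}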
@@ -618,66 +618,57 @@ func valsEqual(v1, v2 sqltypes.Value) bool {
 func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
     bindLocations := tp.BulkInsertValues.BindLocations()
     if len(tp.Fields) < len(bindLocations) {
-        return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations ",
+        return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "wrong number of fields: got %d fields for %d bind locations",
             len(tp.Fields), len(bindLocations))
     }

-    type colInfo struct {
-        typ    querypb.Type
-        length int64
-        offset int64
-        field  *querypb.Field
-    }
-    rowInfo := make([]*colInfo, 0)
-
-    offset := int64(0)
-    for i, field := range tp.Fields { // collect info required for fields to be bound
-        length := row.Lengths[i]
-        if !tp.FieldsToSkip[strings.ToLower(field.Name)] {
-            rowInfo = append(rowInfo, &colInfo{
-                typ:    field.Type,
-                length: length,
-                offset: offset,
-                field:  field,
-            })
-        }
-        if length > 0 {
-            offset += row.Lengths[i]
+    // Bind field values to locations.
+    var (
+        offset      int64
+        offsetQuery int
+        fieldsIndex int
+        field       *querypb.Field
+    )
+    for i, loc := range bindLocations {
+        field = tp.Fields[fieldsIndex]
+        length := row.Lengths[fieldsIndex]
+        for tp.FieldsToSkip[strings.ToLower(field.Name)] {
+            if length > 0 {
+                offset += length
+            }
+            fieldsIndex++
+            field = tp.Fields[fieldsIndex]
+            length = row.Lengths[fieldsIndex]

[Review comment: This looks fine. I don't have the context to understand why this change is necessarily faster. Can you please explain?]

         }
     }
-
-    // bind field values to locations
-    var offsetQuery int
-    for i, loc := range bindLocations {
-        col := rowInfo[i]
         buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:loc.Offset])
-        typ := col.typ
+        typ := field.Type

         switch typ {
         case querypb.Type_TUPLE:
             return vterrors.Errorf(vtrpcpb.Code_INTERNAL, "unexpected Type_TUPLE for value %d", i)
         case querypb.Type_JSON:
-            if col.length < 0 { // An SQL NULL and not an actual JSON value
+            if length < 0 { // An SQL NULL and not an actual JSON value
                 buf.WriteString(sqltypes.NullStr)
             } else { // A JSON value (which may be a JSON null literal value)
-                buf2 := row.Values[col.offset : col.offset+col.length]
+                buf2 := row.Values[offset : offset+length]
                 vv, err := vjson.MarshalSQLValue(buf2)
                 if err != nil {
                     return err
                 }
                 buf.WriteString(vv.RawStr())
             }
         default:
-            if col.length < 0 {
+            if length < 0 {
                 // -1 means a null variable; serialize it directly
                 buf.WriteString(sqltypes.NullStr)
             } else {
-                raw := row.Values[col.offset : col.offset+col.length]
+                raw := row.Values[offset : offset+length]
                 var vv sqltypes.Value

-                if conversion, ok := tp.ConvertCharset[col.field.Name]; ok && col.length > 0 {
+                if conversion, ok := tp.ConvertCharset[field.Name]; ok && length > 0 {
                     // Non-null string value, for which we have a charset conversion instruction
-                    out, err := tp.convertStringCharset(raw, conversion, col.field.Name)
+                    out, err := tp.convertStringCharset(raw, conversion, field.Name)
                     if err != nil {
                         return err
                     }
@@ -690,6 +681,10 @@ func (tp *TablePlan) appendFromRow(buf *bytes2.Buffer, row *querypb.Row) error {
             }
         }
         offsetQuery = loc.Offset + loc.Length
+        if length > 0 {
+            offset += length
+        }
+        fieldsIndex++
     }
     buf.WriteString(tp.BulkInsertValues.Query[offsetQuery:])
     return nil
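On the reviewer's question above, the diff itself suggests a plausible reading of the speedup: the old code built a per-row []*colInfo slice, allocating the slice plus one small struct per column for every row before binding values, while the new code tracks offset and fieldsIndex directly as it walks the bind locations and skips FieldsToSkip columns inline, so those per-row intermediate allocations disappear from a path that runs once per copied row. The sketch below only illustrates that general pattern; the row type and helpers are simplified stand-ins, not the actual Vitess/querypb types.

package main

import (
	"fmt"
	"strings"
)

// Simplified stand-in for a packed row: all column values live in one byte
// slice, with lengths[i] giving each column's byte length (-1 meaning SQL NULL).
type row struct {
	lengths []int64
	values  []byte
}

// oldStyle mirrors the removed approach: materialize a per-row slice of column
// infos (offset/length per column), then walk that slice to emit values. The
// slice and one struct per column are allocated for every row processed.
func oldStyle(r row) string {
	type colInfo struct{ offset, length int64 }
	infos := make([]*colInfo, 0, len(r.lengths))
	var offset int64
	for _, l := range r.lengths {
		infos = append(infos, &colInfo{offset: offset, length: l})
		if l > 0 {
			offset += l
		}
	}
	var sb strings.Builder
	for _, c := range infos {
		if c.length < 0 {
			sb.WriteString("NULL,")
			continue
		}
		sb.Write(r.values[c.offset : c.offset+c.length])
		sb.WriteByte(',')
	}
	return sb.String()
}

// newStyle mirrors the added approach: keep a running offset and emit values in
// a single pass, so the per-row intermediate allocations go away entirely.
func newStyle(r row) string {
	var sb strings.Builder
	var offset int64
	for _, l := range r.lengths {
		if l < 0 {
			sb.WriteString("NULL,")
			continue
		}
		sb.Write(r.values[offset : offset+l])
		sb.WriteByte(',')
		offset += l
	}
	return sb.String()
}

func main() {
	r := row{lengths: []int64{3, -1, 2}, values: []byte("foo42")}
	fmt.Println(oldStyle(r) == newStyle(r), newStyle(r)) // true foo,NULL,42,
}

Both functions produce the same output for the same packed row; the difference is purely in how much garbage the per-row path generates, which matters when millions of rows flow through the copy phase.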
[Review comment: This change will be reverted before merging.]
[Follow-up review comment: After thinking about it more, I'd like to keep this enabled for the examples. Seems like a good case for it. I can remove/revert the change though if others disagree. If we do keep it here, I could also add it for vtgate and vtctld.]