Skip to content

Commit

Permalink
feat(mysqlstream): add heartbeat msg to prevent sliding window timeout (
Browse files Browse the repository at this point in the history
#275)

* feat(mysqlstream): add heartbeat msg to prevent sliding window timeout when handling large transaction
  • Loading branch information
Ryan-Git authored Apr 20, 2020
1 parent e15326c commit 5b01dfc
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 1 deletion.
2 changes: 1 addition & 1 deletion docs/2.0/example-mysql2kafka.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ id = "another_id"

[output]
type = "async-kafka"
[output.config]
[output.config.kafka-global-config]
broker-addrs = ["1.2.3.4:9002"]
mode = "async"

Expand Down
6 changes: 6 additions & 0 deletions pkg/inputs/mysqlstream/binlog_tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -627,6 +627,12 @@ func (tailer *BinlogTailer) AppendMsgTxnBuffer(msg *core.Msg) {
if len(tailer.msgTxnBuffer) >= config.TxnBufferLimit {
tailer.FlushMsgTxnBuffer()
tailer.ClearMsgTxnBuffer()

// send heartbeat to prevent sliding window timeout
heartbeat := NewHeartbeatMsg(tailer.AfterMsgCommit)
if err := tailer.emitter.Emit(heartbeat); err != nil {
log.Fatalf("failed to emit heartbeat msg: %v", errors.ErrorStack(err))
}
}
}

Expand Down
15 changes: 15 additions & 0 deletions pkg/inputs/mysqlstream/msg.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ const (
ddl binlogOp = "ddl"
xid binlogOp = "xid"
barrier binlogOp = "barrier"
heartbeat binlogOp = "heartbeat"
)

type inputContext struct {
Expand Down Expand Up @@ -369,6 +370,20 @@ func NewBarrierMsg(callback core.MsgCallbackFunc) *core.Msg {
}
}

func NewHeartbeatMsg(callback core.MsgCallbackFunc) *core.Msg {
return &core.Msg{
Type: core.MsgCtl,
Timestamp: time.Now(),
Done: make(chan struct{}),
InputContext: inputContext{op: heartbeat},
InputStreamKey: utils.NewStringPtr(inputStreamKey),
AfterCommitCallback: callback,
Phase: core.Phase{
Start: time.Now(),
},
}
}

func NewXIDMsg(ts int64, received time.Time, callback core.MsgCallbackFunc, position config.MySQLBinlogPosition) *core.Msg {
return &core.Msg{
Phase: core.Phase{
Expand Down
16 changes: 16 additions & 0 deletions pkg/outputs/mysql/mysql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package mysql

import (
"bytes"
"fmt"
"strings"
"testing"

"github.com/pingcap/parser/ast"
Expand Down Expand Up @@ -124,3 +126,17 @@ func TestDDL(t *testing.T) {
require.Equal(t, expected[i], b.String())
}
}

func TestVarchar0(t *testing.T) {
s := "CREATE TABLE a (`scene` varchar(0) NOT NULL DEFAULT ' ' COMMENT '场景')"

p := parser.New()
stmt, err := p.ParseOneStmt(s, "", "")
require.NoError(t, err)

writer := &strings.Builder{}
ctx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreKeyWordLowercase|format.RestoreNameBackQuotes, writer)
err = stmt.Restore(ctx)
require.NoError(t, err)
fmt.Println(writer.String())
}

0 comments on commit 5b01dfc

Please sign in to comment.