diff --git a/docs/2.0/example-mysql2kafka.toml b/docs/2.0/example-mysql2kafka.toml index 4b0d9c66..37e5bdbc 100644 --- a/docs/2.0/example-mysql2kafka.toml +++ b/docs/2.0/example-mysql2kafka.toml @@ -54,7 +54,7 @@ id = "another_id" [output] type = "async-kafka" -[output.config] +[output.config.kafka-global-config] broker-addrs = ["1.2.3.4:9002"] mode = "async" diff --git a/pkg/inputs/mysqlstream/binlog_tailer.go b/pkg/inputs/mysqlstream/binlog_tailer.go index d3b1c50f..1e79c0fc 100644 --- a/pkg/inputs/mysqlstream/binlog_tailer.go +++ b/pkg/inputs/mysqlstream/binlog_tailer.go @@ -627,6 +627,12 @@ func (tailer *BinlogTailer) AppendMsgTxnBuffer(msg *core.Msg) { if len(tailer.msgTxnBuffer) >= config.TxnBufferLimit { tailer.FlushMsgTxnBuffer() tailer.ClearMsgTxnBuffer() + + // send heartbeat to prevent sliding window timeout + heartbeat := NewHeartbeatMsg(tailer.AfterMsgCommit) + if err := tailer.emitter.Emit(heartbeat); err != nil { + log.Fatalf("failed to emit heartbeat msg: %v", errors.ErrorStack(err)) + } } } diff --git a/pkg/inputs/mysqlstream/msg.go b/pkg/inputs/mysqlstream/msg.go index 741618ff..fa4a2a4e 100644 --- a/pkg/inputs/mysqlstream/msg.go +++ b/pkg/inputs/mysqlstream/msg.go @@ -30,6 +30,7 @@ const ( ddl binlogOp = "ddl" xid binlogOp = "xid" barrier binlogOp = "barrier" + heartbeat binlogOp = "heartbeat" ) type inputContext struct { @@ -369,6 +370,20 @@ func NewBarrierMsg(callback core.MsgCallbackFunc) *core.Msg { } } +func NewHeartbeatMsg(callback core.MsgCallbackFunc) *core.Msg { + return &core.Msg{ + Type: core.MsgCtl, + Timestamp: time.Now(), + Done: make(chan struct{}), + InputContext: inputContext{op: heartbeat}, + InputStreamKey: utils.NewStringPtr(inputStreamKey), + AfterCommitCallback: callback, + Phase: core.Phase{ + Start: time.Now(), + }, + } +} + func NewXIDMsg(ts int64, received time.Time, callback core.MsgCallbackFunc, position config.MySQLBinlogPosition) *core.Msg { return &core.Msg{ Phase: core.Phase{ diff --git a/pkg/outputs/mysql/mysql_test.go b/pkg/outputs/mysql/mysql_test.go index e2342a1b..c14d6ceb 100644 --- a/pkg/outputs/mysql/mysql_test.go +++ b/pkg/outputs/mysql/mysql_test.go @@ -2,6 +2,8 @@ package mysql import ( "bytes" + "fmt" + "strings" "testing" "github.com/pingcap/parser/ast" @@ -124,3 +126,17 @@ func TestDDL(t *testing.T) { require.Equal(t, expected[i], b.String()) } } + +func TestVarchar0(t *testing.T) { + s := "CREATE TABLE a (`scene` varchar(0) NOT NULL DEFAULT ' ' COMMENT '场景')" + + p := parser.New() + stmt, err := p.ParseOneStmt(s, "", "") + require.NoError(t, err) + + writer := &strings.Builder{} + ctx := format.NewRestoreCtx(format.RestoreStringSingleQuotes|format.RestoreKeyWordLowercase|format.RestoreNameBackQuotes, writer) + err = stmt.Restore(ctx) + require.NoError(t, err) + fmt.Println(writer.String()) +}