Skip to content

Commit

Permalink
minor fixes (#238)
Browse files Browse the repository at this point in the history
- fix tidb rename ddl npe
- fix scheduler potential dead lock
  • Loading branch information
Ryan-Git authored Nov 26, 2019
1 parent d6a14f9 commit 20964d9
Show file tree
Hide file tree
Showing 3 changed files with 32 additions and 16 deletions.
33 changes: 23 additions & 10 deletions pkg/inputs/mysqlstream/binlog_tailer.go
Original file line number Diff line number Diff line change
Expand Up @@ -436,6 +436,8 @@ func (tailer *BinlogTailer) Start() error {
log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err))
}

sent := 0

for i := range dbNames {
dbName := dbNames[i]
table := tables[i]
Expand Down Expand Up @@ -473,23 +475,34 @@ func (tailer *BinlogTailer) Start() error {
ddlSQL,
int64(e.Header.Timestamp),
received)

// do not send messages without router to the system
if consts.IsInternalDBTraffic(dbName) ||
(tailer.router != nil && !tailer.router.Exists(ddlMsg)) {
continue
}

if err := tailer.emitter.Emit(ddlMsg); err != nil {
log.Fatalf("failed to emit ddl msg: %v", errors.ErrorStack(err))
}
sent++
}

// emit barrier msg
barrierMsg = NewBarrierMsg(tailer.AfterMsgCommit)
barrierMsg.InputContext = inputContext{op: ddl, position: currentPosition}
if err := tailer.emitter.Emit(barrierMsg); err != nil {
log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err))
}
<-barrierMsg.Done
if err := tailer.positionCache.Flush(); err != nil {
log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err))
if sent > 0 {
// emit barrier msg
barrierMsg = NewBarrierMsg(tailer.AfterMsgCommit)
barrierMsg.InputContext = inputContext{op: ddl, position: currentPosition}
if err := tailer.emitter.Emit(barrierMsg); err != nil {
log.Fatalf("failed to emit barrier msg: %v", errors.ErrorStack(err))
}
<-barrierMsg.Done
if err := tailer.positionCache.Flush(); err != nil {
log.Fatalf("[binlogTailer] failed to flush position cache, err: %v", errors.ErrorStack(err))
}

log.Infof("[binlogTailer] ddl done with gtid: %v, stmt: %s", ev.GSet.String(), string(ev.Query))
}

log.Infof("[binlogTailer] ddl done with gtid: %v, stmt: %s", ev.GSet.String(), string(ev.Query))
case *replication.GTIDEvent:
// GTID stands for Global Transaction IDentifier
// It is composed of two parts:
Expand Down
13 changes: 8 additions & 5 deletions pkg/outputs/mysql/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,12 +352,15 @@ func (output *MySQLOutput) Execute(msgs []*core.Msg) error {

if output.isTiDB {
a := &ast.RenameTableStmt{
OldTable: toTableName(os, ot),
NewTable: toTableName(ns, nt),
TableToTables: make([]*ast.TableToTable, 1),
OldTable: toTableName(os, ot),
NewTable: toTableName(ns, nt),
TableToTables: []*ast.TableToTable{
{
OldTable: toTableName(os, ot),
NewTable: toTableName(ns, nt),
},
},
}
a.TableToTables[0].OldTable = a.OldTable
a.TableToTables[0].NewTable = a.NewTable
targetDDLs = append(targetDDLs, restore(a))
} else {
tmp.TableToTables[i].OldTable = toTableName(os, ot)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ func (scheduler *batchScheduler) dispatchMsg(msg *core.Msg) error {

func (scheduler *batchScheduler) startTableDispatcher(tableKey string) {
scheduler.tableBuffers[tableKey] = make(chan *core.Msg, scheduler.cfg.MaxBatchPerWorker*10)
scheduler.tableLatchC[tableKey] = make(chan uint64, scheduler.cfg.MaxBatchPerWorker*10)
scheduler.tableLatchC[tableKey] = make(chan uint64, scheduler.cfg.SlidingWindowSize)
scheduler.tableBufferWg.Add(1)

go func(c chan *core.Msg, tableLatchC chan uint64, key string) {
Expand Down

0 comments on commit 20964d9

Please sign in to comment.