Skip to content

Commit

Permalink
fix(mysql): delete after sync according to batch
Browse files Browse the repository at this point in the history
  • Loading branch information
hantmac committed Jan 17, 2025
1 parent b077a9d commit bd972b7
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 7 deletions.
6 changes: 4 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,13 @@ func main() {
panic(err)
}
} else {
dbs, err := src.GetDatabasesAccordingToSourceDbRegex(cfg.SourceDB)
dbName := fmt.Sprintf("^%s$", cfg.SourceDB)
dbs, err := src.GetDatabasesAccordingToSourceDbRegex(dbName)
if err != nil {
panic(err)
}
dbTables, err = src.GetTablesAccordingToSourceTableRegex(cfg.SourceTable, dbs)
tableName := fmt.Sprintf("^%s$", cfg.SourceTable)
dbTables, err = src.GetTablesAccordingToSourceTableRegex(tableName, dbs)
if err != nil {
panic(err)
}
Expand Down
44 changes: 39 additions & 5 deletions source/mysql.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,50 @@ func (s *MysqlSource) GetMinMaxTimeSplitKey() (string, string, error) {
}

func (s *MysqlSource) DeleteAfterSync() error {
if s.cfg.DeleteAfterSync {
_, err := s.db.Exec(fmt.Sprintf("delete from %s.%s where %s", s.cfg.SourceDB,
s.cfg.SourceTable, s.cfg.SourceWhereCondition))
if err != nil {
return err
if !s.cfg.DeleteAfterSync {
return nil
}

dbTables, err := s.GetDbTablesAccordingToSourceDbTables()
if err != nil {
return err
}

for db, tables := range dbTables {
for _, table := range tables {
count, err := s.GetSourceReadRowsCount()
if err != nil {
log.Printf("Error getting row count for table %s.%s: %v", db, table, err)
continue
}

// Delete in batches
for count > 0 {
limit := min(int(s.cfg.BatchSize), count)
query := fmt.Sprintf("DELETE FROM %s.%s WHERE %s LIMIT %d", db, table, s.cfg.SourceWhereCondition, limit)
_, err := s.db.Exec(query)
if err != nil {
log.Printf("Error deleting rows from table %s.%s: %v", db, table, err)
break
}
count -= limit
log.Printf("Deleted %d rows from table %s.%s\n", limit, db, table)
time.Sleep(time.Duration(s.cfg.BatchMaxInterval) * time.Second)
}
}
}

return nil
}

// Utility function to get the smaller of two integers
func min(a, b int) int {
if a < b {
return a
}
return b
}

func (s *MysqlSource) QueryTableData(threadNum int, conditionSql string) ([][]interface{}, []string, error) {
startTime := time.Now()
execSql := fmt.Sprintf("SELECT * FROM %s.%s WHERE %s", s.cfg.SourceDB,
Expand Down

0 comments on commit bd972b7

Please sign in to comment.