diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go index 965077e56..c061c3407 100644 --- a/cmd/icingadb-migrate/convert.go +++ b/cmd/icingadb-migrate/convert.go @@ -50,7 +50,7 @@ type commentRow = struct { func convertCommentRows( env string, envId icingadbTypes.Binary, _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []commentRow, -) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { var commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck []contracts.Entity for _, row := range idoRows { @@ -207,7 +207,9 @@ func convertCommentRows( } } - icingaDbInserts = [][]contracts.Entity{commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck} + stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ + commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck, + }}} return } @@ -236,7 +238,7 @@ type downtimeRow = struct { func convertDowntimeRows( env string, envId icingadbTypes.Binary, _ func(interface{}, string, ...interface{}), _ *sqlx.Tx, idoRows []downtimeRow, -) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { var downtimeHistory, allHistory, sla []contracts.Entity for _, row := range idoRows { @@ -357,7 +359,7 @@ func convertDowntimeRows( sla = append(sla, s) } - icingaDbInserts = [][]contracts.Entity{downtimeHistory, allHistory, sla} + stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{downtimeHistory, allHistory, sla}}} return } @@ -377,7 +379,7 @@ type flappingRow = struct { func convertFlappingRows( env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []flappingRow, -) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { if len(idoRows) < 1 { return } @@ -506,8 +508,10 @@ func convertFlappingRows( } } - icingaDbInserts = [][]contracts.Entity{flappingHistory, allHistory} - icingaDbUpserts = [][]contracts.Entity{flappingHistoryUpserts} + stages = []icingaDbOutputStage{{ + inserts: [][]contracts.Entity{flappingHistory, allHistory}, + upserts: [][]contracts.Entity{flappingHistoryUpserts}, + }} return } @@ -528,7 +532,7 @@ type notificationRow = struct { func convertNotificationRows( env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, idoRows []notificationRow, -) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { if len(idoRows) < 1 { return } @@ -665,7 +669,9 @@ func convertNotificationRows( } } - icingaDbInserts = [][]contracts.Entity{notificationHistory, userNotificationHistory, allHistory} + stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ + notificationHistory, userNotificationHistory, allHistory, + }}} return } @@ -721,7 +727,7 @@ type stateRow = struct { func convertStateRows( env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), _ *sqlx.Tx, idoRows []stateRow, -) (icingaDbInserts, _ [][]contracts.Entity, checkpoint any) { +) (stages []icingaDbOutputStage, checkpoint any) { if len(idoRows) < 1 { return } @@ -819,6 +825,6 @@ func convertStateRows( } } - icingaDbInserts = [][]contracts.Entity{stateHistory, allHistory, sla} + stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{stateHistory, allHistory, sla}}} return } diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go index 9618ec2f8..b567db756 100644 --- a/cmd/icingadb-migrate/main.go +++ b/cmd/icingadb-migrate/main.go @@ -368,7 +368,7 @@ func migrateOneType[IdoRow any]( c *Config, idb *icingadb.DB, envId []byte, ht *historyType, convertRows func(env string, envId icingadbTypes.Binary, selectCache func(dest interface{}, query string, args ...interface{}), ido *sqlx.Tx, - idoRows []IdoRow) (icingaDbInserts, icingaDbUpserts [][]contracts.Entity, checkpoint any), + idoRows []IdoRow) (stages []icingaDbOutputStage, checkpoint any), ) { var lastQuery string var lastStmt *sqlx.Stmt @@ -423,30 +423,35 @@ func migrateOneType[IdoRow any]( ht, ht.migrationQuery, args, ht.lastId, func(idoRows []IdoRow) (checkpoint interface{}) { // ... convert them, ... - inserts, upserts, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows) + stages, lastIdoId := convertRows(c.Icinga2.Env, envId, selectCache, ht.snapshot, idoRows) // ... and insert them: - for _, op := range []struct { - kind string - data [][]contracts.Entity - streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error - }{{"INSERT IGNORE", inserts, idb.CreateIgnoreStreamed}, {"UPSERT", upserts, idb.UpsertStreamed}} { - for _, table := range op.data { - if len(table) < 1 { - continue - } - - ch := make(chan contracts.Entity, len(table)) - for _, row := range table { - ch <- row - } - - close(ch) - - if err := op.streamer(context.Background(), ch); err != nil { - log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])). - Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + for _, stage := range stages { + for _, op := range []struct { + kind string + data [][]contracts.Entity + streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error + }{ + {"INSERT IGNORE", stage.inserts, idb.CreateIgnoreStreamed}, + {"UPSERT", stage.upserts, idb.UpsertStreamed}, + } { + for _, table := range op.data { + if len(table) < 1 { + continue + } + + ch := make(chan contracts.Entity, len(table)) + for _, row := range table { + ch <- row + } + + close(ch) + + if err := op.streamer(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } } } } diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go index c228f3a8e..c1130a975 100644 --- a/cmd/icingadb-migrate/misc.go +++ b/cmd/icingadb-migrate/misc.go @@ -237,6 +237,10 @@ func (hts historyTypes) forEach(f func(*historyType)) { _ = eg.Wait() } +type icingaDbOutputStage struct { + inserts, upserts [][]contracts.Entity +} + var types = historyTypes{ { name: "ack & comment",