From 71c1d2fa4dbb8c3042edf2ac51265f21159a6e3e Mon Sep 17 00:00:00 2001 From: Julian Brost Date: Wed, 26 Jul 2023 15:46:22 +0200 Subject: [PATCH] Migration: refactor output/processing of converted entities This commit simplifies the `icingaDbOutputStage` type to contain only one entity slice to be insert/upsert. This allows to simplify the handling in `migrateOneType()` by removing nested loops. Additionally, a bit of code inside that function is outsourced into a new `utils.ChanFromSlice()` function. This makes the body of the loop over the insert/upsert operation (the loop using the `op` variable) simple enough so that it can just be unrolled which saves the inline struct and slice definition for that loop. --- cmd/icingadb-migrate/convert.go | 37 ++++++++++++++-------- cmd/icingadb-migrate/main.go | 40 +++++++++--------------- cmd/icingadb-migrate/misc.go | 2 +- pkg/utils/utils.go | 13 ++++++++ pkg/utils/utils_test.go | 54 +++++++++++++++++++++++++++++++++ 5 files changed, 107 insertions(+), 39 deletions(-) create mode 100644 pkg/utils/utils_test.go diff --git a/cmd/icingadb-migrate/convert.go b/cmd/icingadb-migrate/convert.go index 1d5dd5bad..b5ed47a88 100644 --- a/cmd/icingadb-migrate/convert.go +++ b/cmd/icingadb-migrate/convert.go @@ -207,9 +207,12 @@ func convertCommentRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ - commentHistory, acknowledgementHistory, allHistoryComment, allHistoryAck, - }}} + stages = []icingaDbOutputStage{ + {insert: commentHistory}, + {insert: acknowledgementHistory}, + {insert: allHistoryComment}, + {insert: allHistoryAck}, + } return } @@ -359,7 +362,11 @@ func convertDowntimeRows( sla = append(sla, s) } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{downtimeHistory, allHistory, sla}}} + stages = []icingaDbOutputStage{ + {insert: downtimeHistory}, + {insert: allHistory}, + {insert: sla}, + } return } @@ -509,11 +516,9 @@ func convertFlappingRows( } stages = []icingaDbOutputStage{ - { - inserts: [][]contracts.Entity{flappingHistory}, - upserts: [][]contracts.Entity{flappingHistoryUpserts}, - }, - {inserts: [][]contracts.Entity{allHistory}}, + {insert: flappingHistory}, + {upsert: flappingHistoryUpserts}, + {insert: allHistory}, } return } @@ -672,9 +677,11 @@ func convertNotificationRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{ - notificationHistory, userNotificationHistory, allHistory, - }}} + stages = []icingaDbOutputStage{ + {insert: notificationHistory}, + {insert: userNotificationHistory}, + {insert: allHistory}, + } return } @@ -828,6 +835,10 @@ func convertStateRows( } } - stages = []icingaDbOutputStage{{inserts: [][]contracts.Entity{stateHistory, allHistory, sla}}} + stages = []icingaDbOutputStage{ + {insert: stateHistory}, + {insert: allHistory}, + {insert: sla}, + } return } diff --git a/cmd/icingadb-migrate/main.go b/cmd/icingadb-migrate/main.go index b567db756..0bb219106 100644 --- a/cmd/icingadb-migrate/main.go +++ b/cmd/icingadb-migrate/main.go @@ -9,7 +9,6 @@ import ( "github.com/creasty/defaults" "github.com/goccy/go-yaml" "github.com/icinga/icingadb/pkg/config" - "github.com/icinga/icingadb/pkg/contracts" "github.com/icinga/icingadb/pkg/icingadb" "github.com/icinga/icingadb/pkg/logging" icingadbTypes "github.com/icinga/icingadb/pkg/types" @@ -428,30 +427,21 @@ func migrateOneType[IdoRow any]( // ... and insert them: for _, stage := range stages { - for _, op := range []struct { - kind string - data [][]contracts.Entity - streamer func(context.Context, <-chan contracts.Entity, ...icingadb.OnSuccess[contracts.Entity]) error - }{ - {"INSERT IGNORE", stage.inserts, idb.CreateIgnoreStreamed}, - {"UPSERT", stage.upserts, idb.UpsertStreamed}, - } { - for _, table := range op.data { - if len(table) < 1 { - continue - } - - ch := make(chan contracts.Entity, len(table)) - for _, row := range table { - ch <- row - } - - close(ch) - - if err := op.streamer(context.Background(), ch); err != nil { - log.With("backend", "Icinga DB", "op", op.kind, "table", utils.TableName(table[0])). - Fatalf("%+v", errors.Wrap(err, "can't perform DML")) - } + if len(stage.insert) > 0 { + ch := utils.ChanFromSlice(stage.insert) + + if err := idb.CreateIgnoreStreamed(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", "INSERT IGNORE", "table", utils.TableName(stage.insert[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) + } + } + + if len(stage.upsert) > 0 { + ch := utils.ChanFromSlice(stage.upsert) + + if err := idb.UpsertStreamed(context.Background(), ch); err != nil { + log.With("backend", "Icinga DB", "op", "UPSERT", "table", utils.TableName(stage.upsert[0])). + Fatalf("%+v", errors.Wrap(err, "can't perform DML")) } } } diff --git a/cmd/icingadb-migrate/misc.go b/cmd/icingadb-migrate/misc.go index c1130a975..f1db20cbe 100644 --- a/cmd/icingadb-migrate/misc.go +++ b/cmd/icingadb-migrate/misc.go @@ -238,7 +238,7 @@ func (hts historyTypes) forEach(f func(*historyType)) { } type icingaDbOutputStage struct { - inserts, upserts [][]contracts.Entity + insert, upsert []contracts.Entity } var types = historyTypes{ diff --git a/pkg/utils/utils.go b/pkg/utils/utils.go index b5a963f9a..8ccf29b33 100644 --- a/pkg/utils/utils.go +++ b/pkg/utils/utils.go @@ -216,3 +216,16 @@ func JoinHostPort(host string, port int) string { return net.JoinHostPort(host, fmt.Sprint(port)) } + +// ChanFromSlice takes a slice of values and returns a channel from which these values can be received. +// This channel is closed after the last value was sent. +func ChanFromSlice[T any](values []T) <-chan T { + ch := make(chan T, len(values)) + for _, value := range values { + ch <- value + } + + close(ch) + + return ch +} diff --git a/pkg/utils/utils_test.go b/pkg/utils/utils_test.go new file mode 100644 index 000000000..b0ea54b8f --- /dev/null +++ b/pkg/utils/utils_test.go @@ -0,0 +1,54 @@ +package utils + +import ( + "github.com/stretchr/testify/require" + "testing" +) + +func TestChanFromSlice(t *testing.T) { + t.Run("Nil", func(t *testing.T) { + ch := ChanFromSlice[int](nil) + require.NotNil(t, ch) + requireClosedEmpty(t, ch) + }) + + t.Run("Empty", func(t *testing.T) { + ch := ChanFromSlice([]int{}) + require.NotNil(t, ch) + requireClosedEmpty(t, ch) + }) + + t.Run("NonEmpty", func(t *testing.T) { + ch := ChanFromSlice([]int{42, 23, 1337}) + require.NotNil(t, ch) + requireReceive(t, ch, 42) + requireReceive(t, ch, 23) + requireReceive(t, ch, 1337) + requireClosedEmpty(t, ch) + }) +} + +// requireReceive is a helper function to check if a value can immediately be received from a channel. +func requireReceive(t *testing.T, ch <-chan int, expected int) { + t.Helper() + + select { + case v, ok := <-ch: + require.True(t, ok, "receiving should return a value") + require.Equal(t, expected, v) + default: + require.Fail(t, "receiving should not block") + } +} + +// requireReceive is a helper function to check if the channel is closed and empty. +func requireClosedEmpty(t *testing.T, ch <-chan int) { + t.Helper() + + select { + case _, ok := <-ch: + require.False(t, ok, "receiving from channel should not return anything") + default: + require.Fail(t, "receiving should not block") + } +}