diff --git a/.github/workflows/tests_with_database.yml b/.github/workflows/tests_with_database.yml new file mode 100644 index 000000000..2426a2f2e --- /dev/null +++ b/.github/workflows/tests_with_database.yml @@ -0,0 +1,70 @@ +name: Tests with database + +on: + push: + branches: + - main + pull_request: {} + +jobs: + postgresql: + name: ${{ matrix.database.name }} + runs-on: ubuntu-latest + + strategy: + fail-fast: false + matrix: + database: + - {name: PostgreSQL 9.6, image: "postgres:9.6"} + - {name: PostgreSQL 10, image: "postgres:10"} + - {name: PostgreSQL 11, image: "postgres:11"} + - {name: PostgreSQL 12, image: "postgres:12"} + - {name: PostgreSQL 13, image: "postgres:13"} + - {name: PostgreSQL 14, image: "postgres:14"} + - {name: PostgreSQL 15, image: "postgres:15"} + - {name: PostgreSQL latest, image: "postgres:latest"} + + env: + NOTIFICATIONS_TESTS_DB_TYPE: pgsql + NOTIFICATIONS_TESTS_DB: notifications + NOTIFICATIONS_TESTS_DB_USER: postgres + NOTIFICATIONS_TESTS_DB_PASSWORD: notifications + NOTIFICATIONS_TESTS_DB_HOST: 127.0.0.1 + NOTIFICATIONS_TESTS_DB_PORT: 5432 + + services: + postgres: + image: ${{ matrix.database.image }} + env: + POSTGRES_PASSWORD: ${{ env.NOTIFICATIONS_TESTS_DB_PASSWORD }} + POSTGRES_DB: ${{ env.NOTIFICATIONS_TESTS_DB }} + # Wait until postgres becomes ready + options: >- + --health-cmd pg_isready + --health-interval 10s + --health-timeout 5s + --health-retries 5 + ports: + - 5432:5432 + + steps: + - name: Setup Go + uses: actions/setup-go@v4 + with: + go-version: stable + + - name: Checkout code + uses: actions/checkout@v4 + + - name: Importing Schema + env: + PGPASSWORD: ${{ env.NOTIFICATIONS_TESTS_DB_PASSWORD }} + run: | + psql -U postgres -w -h 127.0.0.1 -d ${{ env.NOTIFICATIONS_TESTS_DB }} < ${{ github.workspace }}/schema/pgsql/schema.sql + + - name: Download dependencies + run: go get -v -t -d ./... + + - name: Run tests + timeout-minutes: 10 + run: go test -v -timeout 5m ./... diff --git a/internal/incident/restore_incidents_test.go b/internal/incident/restore_incidents_test.go new file mode 100644 index 000000000..f633fd10d --- /dev/null +++ b/internal/incident/restore_incidents_test.go @@ -0,0 +1,223 @@ +package incident + +import ( + "context" + "crypto/rand" + "fmt" + "github.com/creasty/defaults" + "github.com/icinga/icinga-notifications/internal/config" + "github.com/icinga/icinga-notifications/internal/event" + "github.com/icinga/icinga-notifications/internal/object" + baseConfig "github.com/icinga/icingadb/pkg/config" + "github.com/icinga/icingadb/pkg/icingadb" + "github.com/icinga/icingadb/pkg/logging" + "github.com/icinga/icingadb/pkg/types" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.uber.org/zap/zaptest" + "os" + "strconv" + "strings" + "testing" + "time" +) + +func TestIncidentLoading(t *testing.T) { + ctx := context.Background() + db := getTestDB(ctx, t) + + // Insert a dummy sources for our test cases! + source := config.Source{ID: 1, Type: "notifications", Name: "Icinga Notifications", Icinga2InsecureTLS: types.Bool{Bool: false, Valid: true}} + stmt, _ := db.BuildInsertStmt(source) + _, err := db.NamedExecContext(ctx, stmt, source) + require.NoError(t, err, "populating source table should not fail") + + // Reduce the default placeholders per statement to a meaningful number, so that we can + // test some parallelism when loading the incidents. + db.Options.MaxPlaceholdersPerStatement = 100 + + // Due to the 10*maxPlaceholders constraint, only 10 goroutines are going to process simultaneously. + // Therefore, reduce the default maximum number of connections per table to 4 in order to fully simulate + // semaphore lock wait cycles for a given table. + db.Options.MaxConnectionsPerTable = 4 + + testData := make(map[string]*Incident, 10*db.Options.MaxPlaceholdersPerStatement) + for j := 1; j <= 10*db.Options.MaxPlaceholdersPerStatement; j++ { + i := makeIncident(ctx, db, t, false) + testData[i.ObjectID.String()] = i + } + + t.Run("WithNoRecoveredIncidents", func(t *testing.T) { + assertIncidents(ctx, db, t, testData) + }) + + t.Run("WithSomeRecoveredIncidents", func(t *testing.T) { + tx, err := db.BeginTxx(ctx, nil) + require.NoError(t, err, "starting a transaction should not fail") + + // Drop all cached incidents before re-loading them! + for _, i := range GetCurrentIncidents() { + RemoveCurrent(i.Object) + + // Mark some of the existing incidents as recovered. + if i.Id%20 == 0 { // 1000 / 20 => 50 existing incidents will be marked as recovered! + i.RecoveredAt = types.UnixMilli(time.Now()) + + require.NoError(t, i.Sync(ctx, tx), "failed to update/insert incident") + + // Drop it from our test data as it's recovered + delete(testData, i.ObjectID.String()) + } + } + require.NoError(t, tx.Commit(), "committing a transaction should not fail") + + assert.Len(t, GetCurrentIncidents(), 0, "there should be no cached incidents") + + for j := 1; j <= db.Options.MaxPlaceholdersPerStatement/2; j++ { + // We don't need to cache recovered incidents in memory. + _ = makeIncident(ctx, db, t, true) + + if j%2 == 0 { + // Add some extra new not recovered incidents to fully simulate a daemon reload. + i := makeIncident(ctx, db, t, false) + testData[i.ObjectID.String()] = i + } + } + + assertIncidents(ctx, db, t, testData) + }) +} + +// assertIncidents restores all not recovered incidents from the database and asserts them based on the given testData. +// +// The incident loading process is limited to a maximum duration of 10 seconds and will be +// aborted and causes the entire test suite to fail immediately, if it takes longer. +func assertIncidents(ctx context.Context, db *icingadb.DB, t *testing.T, testData map[string]*Incident) { + logger := logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour) + + // Since we have been using object.FromEvent() to persist the test objects to the database, + // these will be automatically added to the objects cache as well. So clear the cache before + // reloading the incidents, otherwise it will panic in object.RestoreObjects(). + object.ClearCache() + + // The incident loading process may hang due to unknown bugs or semaphore lock waits. + // Therefore, give it maximum time of 10s to finish normally, otherwise give up and fail. + ctx, cancelFunc := context.WithDeadline(ctx, time.Now().Add(10*time.Second)) + defer cancelFunc() + + err := LoadOpenIncidents(ctx, db, logger, &config.RuntimeConfig{}) + require.NoError(t, err, "failed to load not recovered incidents") + + incidents := GetCurrentIncidents() + assert.Len(t, incidents, len(testData), "failed to load all active incidents") + + for _, current := range incidents { + i := testData[current.ObjectID.String()] + assert.NotNilf(t, i, "found mysterious incident that's not part of our test data") + assert.NotNil(t, current.Object, "failed to restore incident object") + + if i != nil { + assert.Equal(t, i.Id, current.Id, "incidents linked to the same object don't have the same ID") + assert.Equal(t, i.Severity, current.Severity, "failed to restore incident severity") + assert.Equal(t, i.StartedAt, current.StartedAt, "failed to restore incident started at") + assert.Equal(t, i.RecoveredAt, current.RecoveredAt, "failed to restore incident recovered at") + + assert.NotNil(t, current.EscalationState, "incident escalation state map should've initialised") + assert.NotNil(t, current.Recipients, "incident recipients map should've initialised") + assert.NotNil(t, current.Rules, "incident rules map should've initialised") + + if current.Object != nil { + assert.Equal(t, i.Object, current.Object, "failed to fully restore incident") + } + } + } +} + +// makeIncident returns a fully initialised recovered/un-recovered incident. +// +// This will firstly create and synchronise a new object from a freshly generated dummy event with distinct +// tags and name, and ensures that no error is returned, otherwise it will cause the entire test suite to fail. +// Once the object has been successfully synchronised, an incident is created and synced with the database. +func makeIncident(ctx context.Context, db *icingadb.DB, t *testing.T, recovered bool) *Incident { + ev := &event.Event{ + Time: time.Time{}, + SourceId: 1, + Name: makeRandomString(t), + Tags: map[string]string{ // Always generate unique object tags not to produce same object ID! + "host": makeRandomString(t), + "service": makeRandomString(t), + }, + ExtraTags: map[string]string{ + "hostgroup/database-server": "", + "servicegroup/webserver": "", + }, + } + + o, err := object.FromEvent(ctx, db, ev) + require.NoError(t, err) + + i := NewIncident(db, o, &config.RuntimeConfig{}, nil) + i.StartedAt = types.UnixMilli(time.Unix(time.Now().Add(-2*time.Hour).Unix(), 0)) + i.Severity = event.SeverityCrit + if recovered { + i.Severity = event.SeverityOK + i.RecoveredAt = types.UnixMilli(time.Now()) + } + + tx, err := db.BeginTxx(ctx, nil) + require.NoError(t, err, "starting a transaction should not fail") + require.NoError(t, i.Sync(ctx, tx), "failed to insert incident") + require.NoError(t, tx.Commit(), "committing a transaction should not fail") + + return i +} + +func makeRandomString(t *testing.T) string { + buf := make([]byte, 20) + _, err := rand.Read(buf) + require.NoError(t, err, "failed to generate random string") + + return fmt.Sprintf("%x", buf) +} + +// getTestDB retrieves the database config from env variables, opens a new database and returns it. +// +// The test suite will be skipped if no environment variable is set, otherwise fails fatally when +// invalid configurations are specified. +func getTestDB(ctx context.Context, t *testing.T) *icingadb.DB { + c := &baseConfig.Database{} + require.NoError(t, defaults.Set(c), "applying config default should not fail") + + if v, ok := os.LookupEnv("NOTIFICATIONS_TESTS_DB_TYPE"); ok { + c.Type = strings.ToLower(v) + } else { + t.Skipf("Environment %q not set, skipping test!", "NOTIFICATIONS_TESTS_DB_TYPE") + } + + if v, ok := os.LookupEnv("NOTIFICATIONS_TESTS_DB"); ok { + c.Database = v + } + if v, ok := os.LookupEnv("NOTIFICATIONS_TESTS_DB_USER"); ok { + c.User = v + } + if v, ok := os.LookupEnv("NOTIFICATIONS_TESTS_DB_PASSWORD"); ok { + c.Password = v + } + if v, ok := os.LookupEnv("NOTIFICATIONS_TESTS_DB_HOST"); ok { + c.Host = v + } + if v, ok := os.LookupEnv("NOTIFICATIONS_TESTS_DB_PORT"); ok { + port, err := strconv.Atoi(v) + require.NoError(t, err, "invalid port provided") + + c.Port = port + } + + require.NoError(t, c.Validate(), "database config validation should not fail") + + db, err := c.Open(logging.NewLogger(zaptest.NewLogger(t).Sugar(), time.Hour)) + require.NoError(t, err, "connecting to database should not fail") + require.NoError(t, db.PingContext(ctx), "pinging the database should not fail") + + return db +} diff --git a/internal/object/object.go b/internal/object/object.go index e74ca18de..e7c936a7a 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -57,6 +57,15 @@ func GetFromCache(id types.Binary) *Object { return cache[id.String()] } +// ClearCache clears the global object cache store. +// Note, this is only used for unit tests not to run into "can't cache already cached object" error. +func ClearCache() { + cacheMu.Lock() + defer cacheMu.Unlock() + + cache = make(map[string]*Object) +} + // RestoreObjects restores all objects and their (extra)tags matching the given IDs from the database. // Returns error on any database failures and panics when trying to cache an object that's already in the cache store. func RestoreObjects(ctx context.Context, db *icingadb.DB, ids []types.Binary) error {