-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathaggregator.go
591 lines (483 loc) · 17.1 KB
/
aggregator.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
/*
Copyright © 2021, 2022, 2023 Red Hat, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Entry point to the insights results aggregator service.
//
// The service contains consumer (usually Kafka consumer) that consumes
// messages from given source, processes those messages and stores them
// in configured data store. It also starts REST API servers with
// endpoints that expose several types of information: list of organizations,
// list of clusters for given organization, and cluster health.
package main
import (
"context"
"database/sql"
"encoding/json"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"
"github.com/RedHatInsights/insights-operator-utils/logger"
"github.com/rs/zerolog/log"
"golang.org/x/sync/errgroup"
"github.com/RedHatInsights/insights-results-aggregator/conf"
"github.com/RedHatInsights/insights-results-aggregator/metrics"
"github.com/RedHatInsights/insights-results-aggregator/migration"
"github.com/RedHatInsights/insights-results-aggregator/storage"
"github.com/RedHatInsights/insights-results-aggregator/types"
)
// Exit codes returned by the service. The values are assigned with iota, so
// ExitStatusOK is 0 and every following constant is one greater than the
// previous one (ExitStatusMigrationError is 5).
//
// NOTE(review): stopService adds ExitStatusServerError and
// ExitStatusConsumerError together when both stop operations fail, which
// yields a value (7) that does not correspond to any constant below.
const (
	// ExitStatusOK means that the tool finished with success
	ExitStatusOK = iota
	// ExitStatusError is a general error code
	ExitStatusError
	// ExitStatusPrepareDbError is returned when the DB preparation (including rule content loading) fails
	ExitStatusPrepareDbError
	// ExitStatusConsumerError is returned in case of any consumer-related error
	ExitStatusConsumerError
	// ExitStatusServerError is returned in case of any REST API server-related error
	ExitStatusServerError
	// ExitStatusMigrationError is returned in case of an error while attempting to perform DB migrations
	ExitStatusMigrationError
)

// Messages
const (
	// databasePreparationMessage is a format string taking one argument: the
	// non-OK exit code returned by prepareDB.
	databasePreparationMessage = "database preparation exited with error code %v"
)

// Other constants
const (
	// defaultConfigFilename is the configuration file name passed to
	// conf.LoadConfiguration from main.
	defaultConfigFilename = "config"
	// typeStr is the structured-log field key used by initInfoLog.
	typeStr = "type"
)

var (
	// BuildVersion contains the major.minor version of this service.
	// It is set up during build process.
	BuildVersion = "*not set*"

	// BuildTime contains timestamp when the service has been built.
	// It is set up during build process.
	BuildTime = "*not set*"

	// BuildBranch contains Git branch used to build this application.
	// It is set up during build process.
	BuildBranch = "*not set*"

	// BuildCommit contains Git commit used to build this application.
	// It is set up during build process.
	BuildCommit = "*not set*"

	// UtilsVersion contains currently used version of
	// github.com/RedHatInsights/insights-operator-utils package
	UtilsVersion = "*not set*"

	// autoMigrate determines if the prepareDB function upgrades
	// the database to the latest migration version. This is necessary
	// for unit tests that work with an empty DB. When it is false,
	// prepareDBMigrations only verifies that the DB is already at the
	// latest version.
	autoMigrate = false
)
// fillInInfoParams stores the build metadata reported by the /info endpoint
// handler into params, overwriting any entries with the same keys.
// params must be a non-nil map.
func fillInInfoParams(params map[string]string) {
	buildInfo := []struct {
		key   string
		value string
	}{
		{"BuildVersion", BuildVersion},
		{"BuildTime", BuildTime},
		{"BuildBranch", BuildBranch},
		{"BuildCommit", BuildCommit},
		{"UtilsVersion", UtilsVersion},
	}

	for _, item := range buildInfo {
		params[item.key] = item.value
	}
}
// createStorage function initializes connection to preconfigured storage,
// usually PostgreSQL or AWS RDS.
//
// Each storage (OCP recommendations, DVO recommendations) is created only
// when its configuration has a non-empty Type; an unconfigured storage is
// returned as nil. On error both returned storages are nil, and any storage
// that was already opened by this function is closed so it does not leak.
func createStorage() (storage.OCPRecommendationsStorage, storage.DVORecommendationsStorage, error) {
	ocpStorageCfg := conf.GetOCPRecommendationsStorageConfiguration()
	// Redis configuration needs to be present in ocpStorageCfg, as the connection is created in the same function
	ocpStorageCfg.RedisConfiguration = conf.GetRedisConfiguration()
	dvoStorageCfg := conf.GetDVORecommendationsStorageConfiguration()

	var ocpStorage storage.OCPRecommendationsStorage
	var dvoStorage storage.DVORecommendationsStorage
	var err error

	// create any storage we have configured
	if ocpStorageCfg.Type != "" {
		ocpStorage, err = storage.NewOCPRecommendationsStorage(ocpStorageCfg)
		if err != nil {
			log.Error().Err(err).Msg("storage.NewOCPRecommendationsStorage")
			return nil, nil, err
		}
	}

	if dvoStorageCfg.Type != "" {
		dvoStorage, err = storage.NewDVORecommendationsStorage(dvoStorageCfg)
		if err != nil {
			log.Error().Err(err).Msg("storage.NewDVORecommendationsStorage")
			// The OCP storage may already be open at this point and the
			// caller never receives it, so release it here.
			if ocpStorage != nil {
				if closeErr := ocpStorage.Close(); closeErr != nil {
					log.Error().Err(closeErr).Msg("Error during closing storage connection")
				}
			}
			return nil, nil, err
		}
	}

	return ocpStorage, dvoStorage, nil
}
// closeStorage closes the given storage and logs any error reported by
// Close; the error is not propagated to the caller. A nil storage is
// silently ignored, which lets callers pass storages that were never
// configured.
func closeStorage(s storage.Storage) {
	if s != nil {
		if closeErr := s.Close(); closeErr != nil {
			// TODO: error state might be returned from this function
			log.Error().Err(closeErr).Msg("Error during closing storage connection")
		}
	}
}
// prepareDBMigrations function checks the actual database version and when
// autoMigrate is set performs migration to the latest schema version
// available.
//
// Only PostgreSQL storages are handled; other driver types are skipped and
// ExitStatusOK is returned. The DB schema is created when missing. When
// autoMigrate is false, the DB must already be at the maximum migration
// version known to dbStorage, otherwise ExitStatusPrepareDbError is
// returned.
//
// The caller owns dbStorage and is responsible for closing it; this function
// never closes it (prepareDB defers the close itself, so closing here would
// close the storage twice).
func prepareDBMigrations(dbStorage storage.Storage) int {
	if dbStorage.GetDBDriverType() != types.DBDriverPostgres {
		log.Info().Msg("Skipping migration for non-SQL database type")
		return ExitStatusOK
	}

	dbConn, dbSchema := dbStorage.GetConnection(), dbStorage.GetDBSchema()

	// ensure DB schema exists
	if err := migration.InitDBSchema(dbConn, dbSchema); err != nil {
		log.Error().Err(err).Msg("Unable to initialize DB schema")
		return ExitStatusPrepareDbError
	}
	log.Debug().Msgf("%v DB schema found", dbSchema)

	// This is only used by some unit tests.
	if autoMigrate {
		if err := dbStorage.MigrateToLatest(); err != nil {
			log.Error().Err(err).Msg("unable to migrate DB to latest version")
			return ExitStatusPrepareDbError
		}
		return ExitStatusOK
	}

	currentVersion, err := migration.GetDBVersion(dbConn, dbSchema)
	if err != nil {
		log.Error().Err(err).Msg("unable to check DB migration version")
		return ExitStatusPrepareDbError
	}
	log.Debug().Msgf("%v DB schema current migration %v", dbSchema, currentVersion)

	maxVersion := dbStorage.GetMaxVersion()
	if currentVersion != maxVersion {
		log.Error().Msgf("old DB migration version (current: %d, latest: %d)", currentVersion, maxVersion)
		return ExitStatusPrepareDbError
	}
	log.Debug().Msgf("%v DB schema maximum migration %v", dbSchema, maxVersion)

	return ExitStatusOK
}
// prepareDB function opens a connection to database and loads all available
// rule content into it.
//
// Every configured storage is checked with prepareDBMigrations. The OCP
// storage is additionally initialized (Init) unless the storage backend in
// use is the DVO one, and its rule-disable debug info is printed. All
// storages opened here are closed before the function returns.
//
// It returns ExitStatusOK on success, or the first non-OK exit code
// encountered.
func prepareDB() int {
	// TODO: when aggregator supports both storages at once, update the code below
	// task to support both storages at once: https://issues.redhat.com/browse/CCXDEV-12316
	ocpRecommendationsStorage, dvoRecommendationsStorage, err := createStorage()
	if err != nil {
		log.Error().Err(err).Msg("Error creating storage")
		return ExitStatusPrepareDbError
	}

	// Register cleanup for both storages up front, so that an early return
	// while preparing the OCP storage cannot leak the DVO one. closeStorage
	// ignores nil storages, so unconfigured backends are fine. Deferred calls
	// run LIFO: the DVO storage is closed first, then the OCP one.
	defer closeStorage(ocpRecommendationsStorage)
	defer closeStorage(dvoRecommendationsStorage)

	if ocpRecommendationsStorage != nil {
		log.Debug().Msg("checking OCP storage migrations")

		// Ensure that the DB is at the latest migration version.
		if exitCode := prepareDBMigrations(ocpRecommendationsStorage); exitCode != ExitStatusOK {
			return exitCode
		}

		// do not initialize last_checked_at map if we're running as dvo-writer
		if conf.GetStorageBackendConfiguration().Use != types.DVORecommendationsStorage {
			// Initialize the database.
			if err := ocpRecommendationsStorage.Init(); err != nil {
				log.Error().Err(err).Msg("DB initialization error")
				return ExitStatusPrepareDbError
			}
		}

		// temporarily print some information from DB because of limited access to DB
		ocpRecommendationsStorage.PrintRuleDisableDebugInfo()
	}

	if dvoRecommendationsStorage != nil {
		log.Debug().Msg("checking DVO storage migrations")

		// Ensure that the DB is at the latest migration version.
		if exitCode := prepareDBMigrations(dvoRecommendationsStorage); exitCode != ExitStatusOK {
			return exitCode
		}
	}

	return ExitStatusOK
}
// startService prepares the database and then runs the consumer (only when
// the broker is enabled) and the REST API server concurrently. As soon as
// either of them returns, with or without an error, the service is stopped
// via stopService.
//
// Returned codes:
//   - the prepareDB exit code when DB preparation fails,
//   - the stopService exit code when stopping fails,
//   - ExitStatusError when any of the goroutines returned an error,
//   - ExitStatusOK otherwise.
func startService() int {
	if namespace := conf.GetMetricsConfiguration().Namespace; namespace != "" {
		metrics.AddMetricsWithNamespace(namespace)
	}

	if dbExitCode := prepareDB(); dbExitCode != ExitStatusOK {
		log.Info().Msgf(databasePreparationMessage, dbExitCode)
		return dbExitCode
	}
	log.Debug().Msg("DB initialized")

	ctx, cancel := context.WithCancel(context.Background())
	var group errgroup.Group

	// launch runs task inside the error group. Whichever task finishes first
	// cancels ctx, which unblocks the wait below.
	launch := func(failureMsg string, task func() error) {
		group.Go(func() error {
			defer cancel()
			if taskErr := task(); taskErr != nil {
				log.Error().Err(taskErr).Msg(failureMsg)
				return taskErr
			}
			return nil
		})
	}

	brokerCfg := conf.GetBrokerConfiguration()
	if !brokerCfg.Enabled {
		// if broker is disabled, simply don't start it
		log.Info().Msg("Broker is disabled, not starting it")
	} else {
		launch("Consumer start failure", func() error {
			return startConsumer(brokerCfg)
		})
	}

	launch("Server start failure", func() error {
		return startServer()
	})

	// block until at least one of the goroutines has finished
	<-ctx.Done()

	if stopExitCode := stopService(); stopExitCode != ExitStatusOK {
		return stopExitCode
	}

	// the error itself was already logged by the goroutine that returned it
	if group.Wait() != nil {
		return ExitStatusError
	}
	return ExitStatusOK
}
// stopService stops the REST API server and, when the broker is enabled,
// the consumer. Failures are logged, and their exit codes are added
// together into the returned value; ExitStatusOK means both stopped cleanly.
func stopService() int {
	exitCode := ExitStatusOK

	if serverErr := stopServer(); serverErr != nil {
		log.Error().Err(serverErr).Msg("Server stop failure")
		exitCode += ExitStatusServerError
	}

	if !conf.GetBrokerConfiguration().Enabled {
		return exitCode
	}

	if consumerErr := stopConsumer(); consumerErr != nil {
		log.Error().Err(consumerErr).Msg("Consumer stop failure")
		exitCode += ExitStatusConsumerError
	}
	return exitCode
}
// initInfoLog writes msg as an info-level log entry tagged with the field
// typeStr="init".
func initInfoLog(msg string) {
	entry := log.Info().Str(typeStr, "init")
	entry.Msg(msg)
}
// printVersionInfo function prints basic information about service version
// into log file. Each item is logged as a separate "init" entry in the
// form "Label: value".
func printVersionInfo() {
	initInfoLog("Version: " + BuildVersion)
	initInfoLog("Build time: " + BuildTime)
	initInfoLog("Branch: " + BuildBranch)
	initInfoLog("Commit: " + BuildCommit)
	// the label/value separator matches the other entries above
	initInfoLog("Utils version: " + UtilsVersion)
}
// helpMessageTemplate is the usage text; its single %+v verb is replaced by
// the executable name.
const helpMessageTemplate = `
Aggregator service for insights results
Usage:
%+v [command]
The commands are:
<EMPTY> starts aggregator
start-service starts aggregator
help prints help
print-help prints help
print-config prints current configuration set by files & env variables
print-env prints env variables
print-version-info prints version info
migration prints information about migrations (current, latest)
migration <version> migrates database to the specified version
`

// printHelp writes the usage text, with the name of the running executable
// substituted, to the standard output and always reports success.
func printHelp() int {
	executable := os.Args[0]
	fmt.Fprintf(os.Stdout, helpMessageTemplate, executable)
	return ExitStatusOK
}
// printConfig function prints the actual service configuration (set by
// configuration files and environment variables) to the standard output as
// indented JSON.
//
// It returns ExitStatusOK on success and ExitStatusError when the
// configuration cannot be marshalled.
func printConfig() int {
	configBytes, err := json.MarshalIndent(conf.Config, "", " ")
	if err != nil {
		log.Error().Err(err).Msg("printConfig: marshall config failure")
		// use the named exit code instead of a magic number (same value, 1)
		return ExitStatusError
	}

	fmt.Println(string(configBytes))
	return ExitStatusOK
}
// printEnv writes every environment variable of the process, one
// "KEY=value" pair per line, to the standard output and reports success.
func printEnv() int {
	environment := os.Environ()
	for i := range environment {
		fmt.Println(environment[i])
	}
	return ExitStatusOK
}
// getDBForMigrations function opens a DB connection and prepares the DB for
// migrations. Non-OK exit code is returned as the last return value in case
// of an error. Otherwise, database and connection pointers are returned.
//
// The storage is chosen by the configured storage backend (OCP or DVO
// recommendations); any other backend is rejected with
// ExitStatusMigrationError. The DB schema and the migration info table are
// created when missing.
//
// On success the caller owns the returned storage and must close it. On
// failure every storage opened by this function has already been closed.
func getDBForMigrations() (storage.Storage, *sql.DB, int) {
	ocpStorage, dvoStorage, err := createStorage()
	if err != nil {
		log.Error().Err(err).Msg("Unable to prepare DB for migrations")
		return nil, nil, ExitStatusPrepareDbError
	}

	// Migrations are allowed only if a single storage backend is selected.
	// The storage that is not used is closed right away so that it does not
	// leak; closeStorage ignores nil (unconfigured) storages.
	var db storage.Storage
	switch conf.GetStorageBackendConfiguration().Use {
	case types.OCPRecommendationsStorage:
		db = ocpStorage
		closeStorage(dvoStorage)
	case types.DVORecommendationsStorage:
		db = dvoStorage
		closeStorage(ocpStorage)
	default:
		closeStorage(ocpStorage)
		closeStorage(dvoStorage)
		log.Error().Msg("storage backend does not support database migrations")
		return nil, nil, ExitStatusMigrationError
	}

	// createStorage returns nil for a storage whose configured type is empty;
	// calling methods on it below would panic.
	if db == nil {
		log.Error().Msg("storage selected for migrations is not configured")
		return nil, nil, ExitStatusPrepareDbError
	}

	dbConn, dbSchema := db.GetConnection(), db.GetDBSchema()

	// ensure DB schema is created
	if err := migration.InitDBSchema(dbConn, dbSchema); err != nil {
		closeStorage(db)
		log.Error().Err(err).Msg("Unable to initialize DB schema")
		return nil, nil, ExitStatusPrepareDbError
	}

	if err := migration.InitInfoTable(dbConn, dbSchema); err != nil {
		closeStorage(db)
		log.Error().Err(err).Msg("Unable to initialize migration info table")
		return nil, nil, ExitStatusPrepareDbError
	}

	return db, dbConn, ExitStatusOK
}
// printMigrationInfo logs the current migration version of the DB together
// with the highest version the storage knows about. The database is not
// modified. ExitStatusMigrationError is returned when the current version
// cannot be read.
func printMigrationInfo(db storage.Storage, dbConn *sql.DB) int {
	current, versionErr := migration.GetDBVersion(dbConn, db.GetDBSchema())
	if versionErr != nil {
		log.Error().Err(versionErr).Msg("Unable to get current DB version")
		return ExitStatusMigrationError
	}

	log.Info().Msgf("Current DB version: %d", current)
	latest := db.GetMaxVersion()
	log.Info().Msgf("Maximum available version: %d", latest)

	return ExitStatusOK
}
// setMigrationVersion migrates the DB to the version named by versStr. The
// value is either a non-negative decimal integer, or "latest"/"max"
// (case-insensitive) for the newest version the storage provides.
// ExitStatusMigrationError is returned when the version cannot be parsed or
// the migration fails.
func setMigrationVersion(db storage.Storage, dbConn *sql.DB, versStr string) int {
	var target migration.Version

	switch strings.ToLower(versStr) {
	case "latest", "max":
		target = db.GetMaxVersion()
	default:
		parsed, parseErr := strconv.ParseUint(versStr, 10, 64)
		if parseErr != nil {
			log.Error().Err(parseErr).Msg("Unable to parse target migration version")
			return ExitStatusMigrationError
		}
		target = migration.Version(parsed)
	}

	migrationErr := migration.SetDBVersion(
		dbConn,
		db.GetDBDriverType(),
		db.GetDBSchema(),
		target,
		db.GetMigrations(),
	)
	if migrationErr != nil {
		log.Error().Err(migrationErr).Msg("Unable to perform migration")
		return ExitStatusMigrationError
	}

	log.Info().Msgf("Database version is now %d", target)
	return ExitStatusOK
}
// performMigrations function handles migrations subcommand. This can be used
// to either print the current DB migration version or to migrate to a
// different version.
func performMigrations() int {
migrationArgs := os.Args[2:]
db, dbConn, exitCode := getDBForMigrations()
if exitCode != ExitStatusOK {
return exitCode
}
defer closeStorage(db)
switch len(migrationArgs) {
case 0:
return printMigrationInfo(db, dbConn)
case 1:
return setMigrationVersion(db, dbConn, migrationArgs[0])
default:
log.Error().Msg("Unexpected number of arguments to migrations command (expected 0-1)")
return ExitStatusMigrationError
}
}
// stopServiceOnProcessStopSignal installs a handler for SIGINT and SIGTERM.
// On the first such signal it prints a notice and calls stopService. If
// stopping fails, the process exits with the returned code; otherwise the
// handler goroutine just returns.
func stopServiceOnProcessStopSignal() {
	stopSignals := make(chan os.Signal, 1)
	signal.Notify(stopSignals, syscall.SIGINT, syscall.SIGTERM)

	go func(received <-chan os.Signal) {
		<-received
		fmt.Println("SIGINT or SIGTERM was sent, stopping the service...")

		exitCode := stopService()
		if exitCode == 0 {
			return
		}
		log.Error().Msgf("unable to stop the service, code is %v", exitCode)
		os.Exit(exitCode)
	}(stopSignals)
}
// main is the entry point of the service. It performs these steps:
//  1. Loads the configuration, panicking on failure.
//  2. Initializes logging.
//  3. Dispatches the command given as the first CLI argument, which is
//     trimmed and lower-cased, defaulting to "start-service".
//  4. Exits the process with the command's code when that code is non-zero.
func main() {
	if configErr := conf.LoadConfiguration(defaultConfigFilename); configErr != nil {
		panic(configErr)
	}

	loggingErr := logger.InitZerolog(
		conf.GetLoggingConfiguration(),
		conf.GetCloudWatchConfiguration(),
		conf.GetSentryLoggingConfiguration(),
		conf.GetKafkaZerologConfiguration(),
	)
	if loggingErr != nil {
		// logging setup failure is reported but is not fatal
		log.Error().Err(loggingErr).Msg("Unable to init ZeroLog")
	}

	command := "start-service"
	if len(os.Args) > 1 {
		command = strings.ToLower(strings.TrimSpace(os.Args[1]))
	}

	if exitCode := handleCommand(command); exitCode != 0 {
		log.Error().Msgf("Service exited with non-zero code %v", exitCode)
		os.Exit(exitCode)
	}
}
// handleCommand function recognizes command provided via CLI and call the
// relevant code. Unknown commands are handled properly.
func handleCommand(command string) int {
switch command {
case "start-service":
printVersionInfo()
stopServiceOnProcessStopSignal()
return startService()
case "help", "print-help":
return printHelp()
case "print-config":
return printConfig()
case "print-env":
return printEnv()
case "print-version-info":
printVersionInfo()
case "migrations", "migration", "migrate":
return performMigrations()
default:
fmt.Printf("\nCommand '%v' not found\n", command)
return printHelp()
}
return ExitStatusOK
}