Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reuse existing database connections #596

Draft
wants to merge 2 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion input/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func CollectFull(ctx context.Context, server *state.Server, connection *sql.DB,
logger.PrintError("Error collecting pg_stat_statements: %s", err)
var e *pq.Error
if errors.As(err, &e) && e.Code == "55000" && globalCollectionOpts.TestRun { // object_not_in_prerequisite_state
shared_preload_libraries, _ := postgres.GetPostgresSetting(ctx, "shared_preload_libraries", server, globalCollectionOpts, logger)
shared_preload_libraries, _ := postgres.GetPostgresSetting(ctx, connection, "shared_preload_libraries", server, globalCollectionOpts, logger)
logger.PrintInfo("HINT - Current shared_preload_libraries setting: '%s'. Your Postgres server may need to be restarted for changes to take effect.", shared_preload_libraries)
server.SelfTest.HintCollectionAspect(state.CollectionAspectPgStatStatements, "Current shared_preload_libraries setting: '%s'. Your Postgres server may need to be restarted for changes to take effect.", shared_preload_libraries)
}
Expand Down
8 changes: 2 additions & 6 deletions input/postgres/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,13 +72,9 @@ SELECT setting
FROM pg_settings
WHERE name = '%s'`

func GetPostgresSetting(ctx context.Context, settingName string, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger) (string, error) {
func GetPostgresSetting(ctx context.Context, db *sql.DB, settingName string, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger) (string, error) {
var value string

db, err := EstablishConnection(ctx, server, prefixedLogger, globalCollectionOpts, "")
if err != nil {
return "", fmt.Errorf("Could not connect to database to retrieve \"%s\": %s", settingName, err)
}
var err error

err = db.QueryRow(QueryMarkerSQL + fmt.Sprintf(settingValueSQL, settingName)).Scan(&value)
db.Close()
Expand Down
22 changes: 12 additions & 10 deletions input/system/selfhosted/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package selfhosted
import (
"bufio"
"context"
"database/sql"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -39,15 +40,10 @@ SELECT setting
FROM pg_settings
WHERE name = '%s'`

func getPostgresSetting(ctx context.Context, settingName string, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger) (string, error) {
func getPostgresSetting(ctx context.Context, db *sql.DB, settingName string, server *state.Server, globalCollectionOpts state.CollectionOpts, prefixedLogger *util.Logger) (string, error) {
var value string

db, err := postgres.EstablishConnection(ctx, server, prefixedLogger, globalCollectionOpts, "")
if err != nil {
return "", fmt.Errorf("Could not connect to database to retrieve \"%s\": %s", settingName, err)
}

err = db.QueryRowContext(ctx, postgres.QueryMarkerSQL+fmt.Sprintf(settingValueSQL, settingName)).Scan(&value)
err := db.QueryRowContext(ctx, postgres.QueryMarkerSQL+fmt.Sprintf(settingValueSQL, settingName)).Scan(&value)
db.Close()
if err != nil {
return "", fmt.Errorf("Could not read \"%s\" setting: %s", settingName, err)
Expand All @@ -66,7 +62,13 @@ func DiscoverLogLocation(ctx context.Context, servers []*state.Server, globalCol
prefixedLogger.PrintWarning("WARNING - Database hostname is not localhost - Log Insights requires the collector to run on the database server directly for self-hosted systems")
}

logDestination, err := getPostgresSetting(ctx, "log_destination", server, globalCollectionOpts, prefixedLogger)
db, err := postgres.EstablishConnection(ctx, server, prefixedLogger, globalCollectionOpts, "")
if err != nil {
prefixedLogger.PrintError("Could not connect to database: %s", err)
continue
}

logDestination, err := getPostgresSetting(ctx, db, "log_destination", server, globalCollectionOpts, prefixedLogger)
if err != nil {
prefixedLogger.PrintError("ERROR - %s", err)
continue
Expand All @@ -80,7 +82,7 @@ func DiscoverLogLocation(ctx context.Context, servers []*state.Server, globalCol
continue
}

loggingCollector, err := getPostgresSetting(ctx, "logging_collector", server, globalCollectionOpts, prefixedLogger)
loggingCollector, err := getPostgresSetting(ctx, db, "logging_collector", server, globalCollectionOpts, prefixedLogger)
if err != nil {
prefixedLogger.PrintError("ERROR - %s", err)
continue
Expand All @@ -100,7 +102,7 @@ func DiscoverLogLocation(ctx context.Context, servers []*state.Server, globalCol
}

if loggingCollector == "on" {
logDirectory, err := getPostgresSetting(ctx, "log_directory", server, globalCollectionOpts, prefixedLogger)
logDirectory, err := getPostgresSetting(ctx, db, "log_directory", server, globalCollectionOpts, prefixedLogger)
if err != nil {
prefixedLogger.PrintError("ERROR - Could not retrieve log_directory setting from Postgres: %s", err)
continue
Expand Down
2 changes: 1 addition & 1 deletion runner/activity.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func processActivityForServer(ctx context.Context, server *state.Server, globalC
defer connection.Close()
}

trackActivityQuerySize, err := postgres.GetPostgresSetting(ctx, "track_activity_query_size", server, globalCollectionOpts, logger)
trackActivityQuerySize, err := postgres.GetPostgresSetting(ctx, connection, "track_activity_query_size", server, globalCollectionOpts, logger)
if err != nil {
activity.TrackActivityQuerySize = -1
} else {
Expand Down
35 changes: 12 additions & 23 deletions runner/full.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,28 +19,18 @@ import (
"github.com/pganalyze/collector/util"
)

func collectDiffAndSubmit(ctx context.Context, server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedState, state.CollectionStatus, error) {
func collectDiffAndSubmit(ctx context.Context, db *sql.DB, server *state.Server, globalCollectionOpts state.CollectionOpts, logger *util.Logger) (state.PersistedState, state.CollectionStatus, error) {
var newState state.PersistedState
var err error
var connection *sql.DB

connection, err = postgres.EstablishConnection(ctx, server, logger, globalCollectionOpts, "")
newState, transientState, err := input.CollectFull(ctx, server, db, globalCollectionOpts, logger)
if err != nil {
return newState, state.CollectionStatus{}, fmt.Errorf("Failed to connect to database: %s", err)
}

newState, transientState, err := input.CollectFull(ctx, server, connection, globalCollectionOpts, logger)
if err != nil {
connection.Close()
return newState, state.CollectionStatus{}, err
}
if globalCollectionOpts.TestRun {
logger.PrintInfo(" Test collection successful for %s", transientState.Version.Full)
}

// This is the easiest way to avoid opening multiple connections to different databases on the same instance
connection.Close()

logsDisabled, logsIgnoreStatement, logsIgnoreDuration, logsDisabledReason := logs.ValidateLogCollectionConfig(server, transientState.Settings)
collectionStatus := state.CollectionStatus{
LogSnapshotDisabled: logsDisabled,
Expand Down Expand Up @@ -89,7 +79,13 @@ func processServer(ctx context.Context, server *state.Server, globalCollectionOp
var collectionStatus state.CollectionStatus
var err error

err = checkReplicaCollectionDisabled(ctx, server, globalCollectionOpts, logger)
db, err := postgres.EstablishConnection(ctx, server, logger, globalCollectionOpts, "")
if err != nil {
return state.PersistedState{}, state.Grant{}, state.CollectionStatus{}, fmt.Errorf("Failed to connect to database: %s", err)
}
defer db.Close()

err = checkReplicaCollectionDisabled(ctx, db, server, globalCollectionOpts, logger)
if err != nil {
return state.PersistedState{}, state.Grant{}, state.CollectionStatus{}, err
}
Expand Down Expand Up @@ -119,7 +115,7 @@ func processServer(ctx context.Context, server *state.Server, globalCollectionOp
}

runFunc := func() {
newState, collectionStatus, err = collectDiffAndSubmit(ctx, server, globalCollectionOpts, logger)
newState, collectionStatus, err = collectDiffAndSubmit(ctx, db, server, globalCollectionOpts, logger)
}

var panicErr interface{}
Expand Down Expand Up @@ -152,19 +148,12 @@ func runCompletionCallback(callbackType string, callbackCmd string, sectionName
}
}

func checkReplicaCollectionDisabled(ctx context.Context, server *state.Server, opts state.CollectionOpts, logger *util.Logger) error {
func checkReplicaCollectionDisabled(ctx context.Context, db *sql.DB, server *state.Server, opts state.CollectionOpts, logger *util.Logger) error {
if !server.Config.SkipIfReplica {
return nil
}

connection, err := postgres.EstablishConnection(ctx, server, logger, opts, "")
if err != nil {
return fmt.Errorf("Failed to connect to database: %s", err)
}
defer connection.Close()

var isReplica bool
isReplica, err = postgres.GetIsReplica(ctx, logger, connection)
isReplica, err := postgres.GetIsReplica(ctx, logger, db)
if err != nil {
return fmt.Errorf("Error checking replication status")
}
Expand Down
8 changes: 7 additions & 1 deletion runner/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,13 @@ func TestLogsForAllServers(ctx context.Context, servers []*state.Server, globalC
continue
}

logLinePrefix, err := postgres.GetPostgresSetting(ctx, "log_line_prefix", server, globalCollectionOpts, prefixedLogger)
db, err := postgres.EstablishConnection(ctx, server, prefixedLogger, globalCollectionOpts, "")
if err != nil {
prefixedLogger.PrintError("Could not connect to database: %s", err)
continue
}

logLinePrefix, err := postgres.GetPostgresSetting(ctx, db, "log_line_prefix", server, globalCollectionOpts, prefixedLogger)
if err != nil {
prefixedLogger.PrintError("ERROR - Could not check log_line_prefix for server: %s", err)
hasFailedServers = true
Expand Down
Loading