Skip to content

Commit

Permalink
Dynamic statement length.
Browse files Browse the repository at this point in the history
Signed-off-by: Jakub Štiller <[email protected]>
  • Loading branch information
Jakub Štiller committed Oct 25, 2023
1 parent e5cca16 commit 63e3c83
Show file tree
Hide file tree
Showing 3 changed files with 33 additions and 19 deletions.
7 changes: 4 additions & 3 deletions collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,9 @@ const (
// Namespace for all metrics.
namespace = "pg"

defaultEnabled = true
defaultDisabled = false
collectorFlagPrefix = "collector."
defaultEnabled = true
defaultDisabled = false
)

var (
Expand Down Expand Up @@ -75,7 +76,7 @@ func registerCollector(name string, isDefaultEnabled bool, createFunc func(colle
}

// Create flag for this collector
flagName := fmt.Sprintf("collector.%s", name)
flagName := fmt.Sprint(collectorFlagPrefix, name)
flagHelp := fmt.Sprintf("Enable the %s collector (default: %s).", name, helpDefaultState)
defaultValue := fmt.Sprintf("%v", isDefaultEnabled)

Expand Down
27 changes: 20 additions & 7 deletions collector/pg_stat_statements.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,29 +25,40 @@ import (

const statStatementsSubsystem = "stat_statements"

var includeQueryFlag *bool = nil
var (
includeQueryFlag *bool = nil
statementLengthFlag *uint = nil
)

func init() {
// WARNING:
// Disabled by default because this set of metrics can be quite expensive on a busy server
// Every unique query will cause a new timeseries to be created
registerCollector(statStatementsSubsystem, defaultDisabled, NewPGStatStatementsCollector)

flagName := fmt.Sprintf("collector.%s.include_query", statStatementsSubsystem)
flagHelp := "Enable selecting statement query together with queryId. (default: false)"
defaultValue := fmt.Sprintf("%v", defaultDisabled)
includeQueryFlag = kingpin.Flag(flagName, flagHelp).Default(defaultValue).Bool()
includeQueryFlag = kingpin.Flag(
fmt.Sprint(collectorFlagPrefix, statStatementsSubsystem, ".include_query"),
"Enable selecting statement query together with queryId. (default: disabled)").
Default(fmt.Sprintf("%v", defaultDisabled)).
Bool()
statementLengthFlag = kingpin.Flag(
fmt.Sprint(collectorFlagPrefix, statStatementsSubsystem, ".query_length"),
"Maximum length of the statement text.").
Default("120").
Uint()
}

type PGStatStatementsCollector struct {
log log.Logger
includeQueryStatement bool
statementLength uint
}

func NewPGStatStatementsCollector(config collectorConfig) (Collector, error) {
return &PGStatStatementsCollector{
log: config.logger,
includeQueryStatement: *includeQueryFlag,
statementLength: *statementLengthFlag,
}, nil
}

Expand Down Expand Up @@ -89,8 +100,10 @@ var (
[]string{"queryid", "query"},
prometheus.Labels{},
)
)

pgStatStatementQuerySelect = "LEFT(pg_stat_statements.query, 120) as query,"
const (
pgStatStatementQuerySelect = `LEFT(pg_stat_statements.query, %d) as query,`

pgStatStatementsQuery = `SELECT
pg_get_userbyid(userid) as user,
Expand Down Expand Up @@ -144,7 +157,7 @@ func (c PGStatStatementsCollector) Update(ctx context.Context, instance *instanc
}
var querySelect = ""
if c.includeQueryStatement {
querySelect = pgStatStatementQuerySelect
querySelect = fmt.Sprintf(pgStatStatementQuerySelect, c.statementLength)
}
query := fmt.Sprintf(queryTemplate, querySelect)

Expand Down
18 changes: 9 additions & 9 deletions collector/pg_stat_statements_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,15 +76,15 @@ func TestPGStateStatementsCollectorWithStatement(t *testing.T) {

inst := &instance{db: db, version: semver.MustParse("12.0.0")}

columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 100) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsQuery, fmt.Sprintf(pgStatStatementQuerySelect, 100)))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{includeQueryStatement: true}
c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 100}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
Expand Down Expand Up @@ -163,15 +163,15 @@ func TestPGStateStatementsCollectorNullWithStatement(t *testing.T) {

inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 200) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow(nil, nil, nil, nil, nil, nil, nil, nil, nil)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, fmt.Sprintf(pgStatStatementQuerySelect, 200)))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{includeQueryStatement: true}
c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 200}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
Expand Down Expand Up @@ -250,15 +250,15 @@ func TestPGStateStatementsCollectorNewPGWithStatement(t *testing.T) {

inst := &instance{db: db, version: semver.MustParse("13.3.7")}

columns := []string{"user", "datname", "queryid", "query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
columns := []string{"user", "datname", "queryid", "LEFT(pg_stat_statements.query, 300) as query", "calls_total", "seconds_total", "rows_total", "block_read_seconds_total", "block_write_seconds_total"}
rows := sqlmock.NewRows(columns).
AddRow("postgres", "postgres", 1500, "select 1 from foo", 5, 0.4, 100, 0.1, 0.2)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, pgStatStatementQuerySelect))).WillReturnRows(rows)
mock.ExpectQuery(sanitizeQuery(fmt.Sprintf(pgStatStatementsNewQuery, fmt.Sprintf(pgStatStatementQuerySelect, 300)))).WillReturnRows(rows)

ch := make(chan prometheus.Metric)
go func() {
defer close(ch)
c := PGStatStatementsCollector{includeQueryStatement: true}
c := PGStatStatementsCollector{includeQueryStatement: true, statementLength: 300}

if err := c.Update(context.Background(), inst, ch); err != nil {
t.Errorf("Error calling PGStatStatementsCollector.Update: %s", err)
Expand Down

0 comments on commit 63e3c83

Please sign in to comment.