Skip to content

Commit

Permalink
database_observability: fix error handling during result set iteration (
Browse files Browse the repository at this point in the history
#2467)

database_observability: fix error handling during result set iteration

We were previously checking for errors after the rs iteration loop,
but rs.Err() should be checked after rs.Next() returns false.

This moves the error check outside the loop to ensure proper handling.

---------

Co-authored-by: MattNolf <[email protected]>
  • Loading branch information
cristiangreco and matthewnolf authored Jan 22, 2025
1 parent 7dcb41a commit 5bfbaf8
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 15 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ Main (unreleased)

- (_Experimental_) Fix handling of view table types when detecting schema in `database_observability.mysql` (@matthewnolf)

- (_Experimental_) fix error handling during result set iteration in `database_observability.mysql` (@cristiangreco)

- Add json format support for log export via faro receiver (@ravishankar15)

- Add livedebugging support for `prometheus.remote_write` (@ravishankar15)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,11 +123,6 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
defer rs.Close()

for rs.Next() {
if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "failed to iterate rs", "err", err)
break
}

var digest, schemaName, sampleText, sampleSeen, sampleTimerWait string
err := rs.Scan(&digest, &schemaName, &sampleText, &sampleSeen, &sampleTimerWait)
if err != nil {
Expand Down Expand Up @@ -186,6 +181,11 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
}
}

if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "error during iterating over samples result set", "err", err)
return err
}

return nil
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package collector
import (
"context"
"database/sql/driver"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -402,4 +403,50 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) {
err = mock.ExpectationsWereMet()
require.NoError(t, err)
})

t.Run("result set iteration error", func(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.NoError(t, err)
defer db.Close()

lokiClient := loki_fake.NewClient(func() {})

collector, err := NewQuerySample(QuerySampleArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Second,
EntryHandler: lokiClient,
Logger: log.NewLogfmtLogger(os.Stderr),
})
require.NoError(t, err)
require.NotNil(t, collector)

mock.ExpectQuery(selectQuerySamples).WithoutArgs().RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"digest",
"query_sample_text",
"query_sample_seen",
"query_sample_timer_wait",
}).AddRow(
"abc123",
"SELECT 1",
"2024-01-01",
"1000",
).RowError(0, fmt.Errorf("rs error")),
)

err = collector.Start(context.Background())
require.NoError(t, err)

require.Eventually(t, func() bool {
return collector.Stopped()
}, 5*time.Second, 100*time.Millisecond)

collector.Stop()
lokiClient.Stop()

err = mock.ExpectationsWereMet()
require.NoError(t, err)
})
}
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,6 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {

var schemas []string
for rs.Next() {
if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "failed to iterate rs", "err", err)
break
}

var schema string
if err := rs.Scan(&schema); err != nil {
level.Error(c.logger).Log("msg", "failed to scan schemata", "err", err)
Expand All @@ -176,6 +171,11 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
}
}

if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "error during iterating over schemas result set", "err", err)
return err
}

if len(schemas) == 0 {
level.Info(c.logger).Log("msg", "no schema detected from information_schema.schemata")
return nil
Expand All @@ -192,11 +192,6 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
defer rs.Close()

for rs.Next() {
if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "failed to iterate rs", "err", err)
break
}

var tableName, tableType string
var createTime, updateTime time.Time
if err := rs.Scan(&tableName, &tableType, &createTime, &updateTime); err != nil {
Expand All @@ -219,6 +214,11 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error {
},
}
}

if err := rs.Err(); err != nil {
level.Error(c.logger).Log("msg", "error during iterating over tables result set", "err", err)
return err
}
}

// TODO(cristian): consider moving this into the loop above
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package collector

import (
"context"
"fmt"
"os"
"testing"
"time"
Expand Down Expand Up @@ -164,6 +165,110 @@ func TestSchemaTable(t *testing.T) {
require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line)
require.Equal(t, `level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="CREATE VIEW some_view (id INT)"`, lokiEntries[2].Line)

err = mock.ExpectationsWereMet()
require.NoError(t, err)
})
t.Run("schemas result set iteration error", func(t *testing.T) {
// The goroutine which deletes expired entries runs indefinitely,
// see https://github.com/hashicorp/golang-lru/blob/v2.0.7/expirable/expirable_lru.go#L79-L80
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1"))

db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.NoError(t, err)
defer db.Close()

lokiClient := loki_fake.NewClient(func() {})

collector, err := NewSchemaTable(SchemaTableArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Second,
EntryHandler: lokiClient,
CacheTTL: time.Minute,
Logger: log.NewLogfmtLogger(os.Stderr),
})
require.NoError(t, err)
require.NotNil(t, collector)

mock.ExpectQuery(selectSchemaName).WithoutArgs().WillReturnRows(
sqlmock.NewRows(
[]string{"schema_name"},
).AddRow(
"some_schema",
).RowError(0, fmt.Errorf("rs error")))

err = collector.Start(context.Background())
require.NoError(t, err)

require.Eventually(t, func() bool {
return collector.Stopped()
}, 5*time.Second, 100*time.Millisecond)

collector.Stop()
lokiClient.Stop()

err = mock.ExpectationsWereMet()
require.NoError(t, err)
})
t.Run("tables result set iteration error", func(t *testing.T) {
// The goroutine which deletes expired entries runs indefinitely,
// see https://github.com/hashicorp/golang-lru/blob/v2.0.7/expirable/expirable_lru.go#L79-L80
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1"))

db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
require.NoError(t, err)
defer db.Close()

lokiClient := loki_fake.NewClient(func() {})

collector, err := NewSchemaTable(SchemaTableArguments{
DB: db,
InstanceKey: "mysql-db",
CollectInterval: time.Second,
EntryHandler: lokiClient,
CacheTTL: time.Minute,
Logger: log.NewLogfmtLogger(os.Stderr),
})
require.NoError(t, err)
require.NotNil(t, collector)

mock.ExpectQuery(selectSchemaName).WithoutArgs().WillReturnRows(
sqlmock.NewRows([]string{
"schema_name",
}).AddRow(
"some_schema",
),
)
mock.ExpectQuery(selectTableName).WithArgs("some_schema").WillReturnRows(
sqlmock.NewRows([]string{
"table_name",
"table_type",
"create_time",
"update_time",
}).AddRow(
"some_table",
"BASE TABLE",
time.Date(2024, 1, 1, 0, 0, 0, 0, time.UTC),
time.Date(2024, 2, 2, 0, 0, 0, 0, time.UTC),
).RowError(0, fmt.Errorf("rs error")),
)

err = collector.Start(context.Background())
require.NoError(t, err)

require.Eventually(t, func() bool {
return len(lokiClient.Received()) == 1
}, 5*time.Second, 100*time.Millisecond)

collector.Stop()
lokiClient.Stop()

lokiEntries := lokiClient.Received()
for _, entry := range lokiEntries {
require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels)
}
require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line)

err = mock.ExpectationsWereMet()
require.NoError(t, err)
})
Expand Down

0 comments on commit 5bfbaf8

Please sign in to comment.