From 2a149b79a0dfddf86f1c9acc4b3228ee34d55e5d Mon Sep 17 00:00:00 2001 From: Michal Grandys Date: Fri, 27 Sep 2024 14:55:46 +0200 Subject: [PATCH] [receiver/sqlquery] fail on missing value for logs, collect errors (#35189) **Description:** * [breaking] Fail if a log column is not found in the result set (for consistency with metrics behaviour) * Instead of failing fast, collect all errors that occur when transforming a row to a metric or log **Link to tracking Issue:** #35068 **Testing:** Added/updated unit tests **Documentation:** n/a Closes #35068 --- .../sqlqueryreceiver-collect-errors.yaml | 27 +++++++++++++++++ internal/sqlquery/metrics.go | 19 +++++++----- internal/sqlquery/scraper_test.go | 29 +++++++++++++++++++ receiver/sqlqueryreceiver/logs_receiver.go | 13 +++++++-- .../sqlqueryreceiver/logs_receiver_test.go | 10 ++++--- 5 files changed, 83 insertions(+), 15 deletions(-) create mode 100644 .chloggen/sqlqueryreceiver-collect-errors.yaml diff --git a/.chloggen/sqlqueryreceiver-collect-errors.yaml b/.chloggen/sqlqueryreceiver-collect-errors.yaml new file mode 100644 index 000000000000..7db33f01a012 --- /dev/null +++ b/.chloggen/sqlqueryreceiver-collect-errors.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: sqlqueryreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Fail if value for log column in result set is missing, collect errors + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35068] + +# (Optional) One or more lines of additional information to render under the primary note. 
+# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/internal/sqlquery/metrics.go b/internal/sqlquery/metrics.go index 01cb8dbae856..4ffcde6d3f62 100644 --- a/internal/sqlquery/metrics.go +++ b/internal/sqlquery/metrics.go @@ -4,6 +4,7 @@ package sqlquery // import "github.com/open-telemetry/opentelemetry-collector-contrib/internal/sqlquery" import ( + "errors" "fmt" "strconv" @@ -18,36 +19,38 @@ func rowToMetric(row StringMap, cfg MetricCfg, dest pmetric.Metric, startTime pc dest.SetUnit(cfg.Unit) dataPointSlice := setMetricFields(cfg, dest) dataPoint := dataPointSlice.AppendEmpty() + var errs []error if cfg.StartTsColumn != "" { if val, found := row[cfg.StartTsColumn]; found { timestamp, err := strconv.ParseInt(val, 10, 64) if err != nil { - return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err) + errs = append(errs, fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.StartTsColumn, val, err)) } startTime = pcommon.Timestamp(timestamp) } else { - return fmt.Errorf("rowToMetric: start_ts_column not found") + errs = append(errs, fmt.Errorf("rowToMetric: start_ts_column not found")) } } if cfg.TsColumn != "" { if val, found := row[cfg.TsColumn]; found { timestamp, err := strconv.ParseInt(val, 10, 64) if err != nil { - return fmt.Errorf("failed to parse uint64 for %q, value was %q: %w", cfg.TsColumn, val, err) + errs = append(errs, fmt.Errorf("failed to parse uint64 for %q, 
value was %q: %w", cfg.TsColumn, val, err)) } ts = pcommon.Timestamp(timestamp) } else { - return fmt.Errorf("rowToMetric: ts_column not found") + errs = append(errs, fmt.Errorf("rowToMetric: ts_column not found")) } } setTimestamp(cfg, dataPoint, startTime, ts, scrapeCfg) value, found := row[cfg.ValueColumn] if !found { - return fmt.Errorf("rowToMetric: value_column '%s' not found in result set", cfg.ValueColumn) + errs = append(errs, fmt.Errorf("rowToMetric: value_column '%s' not found in result set", cfg.ValueColumn)) } + err := setDataPointValue(cfg, value, dataPoint) if err != nil { - return fmt.Errorf("rowToMetric: %w", err) + errs = append(errs, fmt.Errorf("rowToMetric: %w", err)) } attrs := dataPoint.Attributes() for k, v := range cfg.StaticAttributes { @@ -57,10 +60,10 @@ func rowToMetric(row StringMap, cfg MetricCfg, dest pmetric.Metric, startTime pc if attrVal, found := row[columnName]; found { attrs.PutStr(columnName, attrVal) } else { - return fmt.Errorf("rowToMetric: attribute_column not found: '%s'", columnName) + errs = append(errs, fmt.Errorf("rowToMetric: attribute_column '%s' not found in result set", columnName)) } } - return nil + return errors.Join(errs...) 
} func setTimestamp(cfg MetricCfg, dp pmetric.NumberDataPoint, startTime pcommon.Timestamp, ts pcommon.Timestamp, scrapeCfg scraperhelper.ControllerConfig) { diff --git a/internal/sqlquery/scraper_test.go b/internal/sqlquery/scraper_test.go index e4567a189459..208657c7c3e7 100644 --- a/internal/sqlquery/scraper_test.go +++ b/internal/sqlquery/scraper_test.go @@ -456,6 +456,35 @@ func TestScraper_StartAndTS_ErrorOnColumnNotFound(t *testing.T) { assert.Error(t, err) } +func TestScraper_CollectRowToMetricsErrors(t *testing.T) { + client := &FakeDBClient{ + StringMaps: [][]StringMap{{ + { + "mycol": "42", + }, + }}, + } + scrpr := Scraper{ + Client: client, + Query: Query{ + Metrics: []MetricCfg{{ + MetricName: "my.name", + ValueColumn: "mycol_na", + TsColumn: "Ts", + StartTsColumn: "StartTs", + AttributeColumns: []string{"attr_na"}, + DataType: MetricTypeSum, + Aggregation: MetricAggregationCumulative, + }}, + }, + } + _, err := scrpr.Scrape(context.Background()) + assert.ErrorContains(t, err, "rowToMetric: start_ts_column not found") + assert.ErrorContains(t, err, "rowToMetric: ts_column not found") + assert.ErrorContains(t, err, "rowToMetric: value_column 'mycol_na' not found in result set") + assert.ErrorContains(t, err, "rowToMetric: attribute_column 'attr_na' not found in result set") +} + func TestScraper_StartAndTS_ErrorOnParse(t *testing.T) { client := &FakeDBClient{ StringMaps: [][]StringMap{{ diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index 20d6beea0825..f6d68978487c 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -316,16 +316,23 @@ func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, } func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.LogRecord) error { - logRecord.Body().SetStr(row[config.BodyColumn]) + var errs []error + value, found := row[config.BodyColumn] + if !found { + errs = 
append(errs, fmt.Errorf("rowToLog: body_column '%s' not found in result set", config.BodyColumn)) + } else { + logRecord.Body().SetStr(value) + } attrs := logRecord.Attributes() + for _, columnName := range config.AttributeColumns { if attrVal, found := row[columnName]; found { attrs.PutStr(columnName, attrVal) } else { - return fmt.Errorf("rowToLog: attribute_column not found: '%s'", columnName) + errs = append(errs, fmt.Errorf("rowToLog: attribute_column '%s' not found in result set", columnName)) } } - return nil + return errors.Join(errs...) } func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) error { diff --git a/receiver/sqlqueryreceiver/logs_receiver_test.go b/receiver/sqlqueryreceiver/logs_receiver_test.go index 7aeb9129609c..83f0d29ee270 100644 --- a/receiver/sqlqueryreceiver/logs_receiver_test.go +++ b/receiver/sqlqueryreceiver/logs_receiver_test.go @@ -52,7 +52,7 @@ func TestLogsQueryReceiver_Collect(t *testing.T) { ) } -func TestLogsQueryReceiver_MissingColumnInResultSetForAttributeColumn(t *testing.T) { +func TestLogsQueryReceiver_MissingColumnInResultSet(t *testing.T) { fakeClient := &sqlquery.FakeDBClient{ StringMaps: [][]sqlquery.StringMap{ {{"col1": "42"}}, @@ -63,12 +63,14 @@ func TestLogsQueryReceiver_MissingColumnInResultSetForAttributeColumn(t *testing query: sqlquery.Query{ Logs: []sqlquery.LogsCfg{ { - BodyColumn: "col1", - AttributeColumns: []string{"expected_column"}, + BodyColumn: "expected_body_column", + AttributeColumns: []string{"expected_column", "expected_column_2"}, }, }, }, } _, err := queryReceiver.collect(context.Background()) - assert.ErrorContains(t, err, "rowToLog: attribute_column not found: 'expected_column'") + assert.ErrorContains(t, err, "rowToLog: attribute_column 'expected_column' not found in result set") + assert.ErrorContains(t, err, "rowToLog: attribute_column 'expected_column_2' not found in result set") + assert.ErrorContains(t, err, "rowToLog: body_column 'expected_body_column' not found in 
result set") }