From 3afb802951fee331d3ca957f4c60ef7aecfe532b Mon Sep 17 00:00:00 2001 From: Michal Grandys Date: Wed, 4 Sep 2024 03:13:09 +0200 Subject: [PATCH] [receiver/sqlquery] support attributes for logs (#34599) **Description:** Introduced a new configuration option, attribute_columns, for log queries. This option was already available for metric queries. Note: If an attribute column is missing in the result set, the collection process fails with an error to maintain consistency with metric query behaviour. Conversely, in the existing implementation, if the body column is absent, log body will be populated with an empty string. For consistency, it might be preferable for both behaviours to align. However, since this could introduce a breaking change, it may be more appropriate to implement this adjustment in a separate PR. **Link to tracking Issue:** [24459](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/24459) **Testing:** Added column `attribute` in test set, and verification in the integration test **Documentation:** Migrated description for `attribute_columns` from metrics queries section to general "Queries" section, updated example configuration --------- Co-authored-by: Curtis Robert --- .../sqlqueryreceiver-logs-attrbutes.yaml | 27 ++++++++++++++ internal/sqlquery/config.go | 3 +- receiver/sqlqueryreceiver/README.md | 5 +-- receiver/sqlqueryreceiver/config_test.go | 3 +- receiver/sqlqueryreceiver/integration_test.go | 35 +++++++++++++------ receiver/sqlqueryreceiver/logs_receiver.go | 13 +++++-- .../sqlqueryreceiver/logs_receiver_test.go | 21 +++++++++++ .../testdata/config-logs.yaml | 3 +- .../testdata/integration/mysql/init.sql | 13 +++---- .../testdata/integration/oracle/init.sql | 23 ++++++------ .../testdata/integration/postgresql/init.sql | 15 ++++---- 11 files changed, 120 insertions(+), 41 deletions(-) create mode 100644 .chloggen/sqlqueryreceiver-logs-attrbutes.yaml diff --git a/.chloggen/sqlqueryreceiver-logs-attrbutes.yaml b/.chloggen/sqlqueryreceiver-logs-attrbutes.yaml new file mode 100644 index 000000000000..8c94e335a434 --- /dev/null +++ b/.chloggen/sqlqueryreceiver-logs-attrbutes.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: sqlqueryreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support populating log attributes from sql query + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24459] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/internal/sqlquery/config.go b/internal/sqlquery/config.go index c679d81db4d7..7412a8069bc0 100644 --- a/internal/sqlquery/config.go +++ b/internal/sqlquery/config.go @@ -68,7 +68,8 @@ func (q Query) Validate() error { } type LogsCfg struct { - BodyColumn string `mapstructure:"body_column"` + BodyColumn string `mapstructure:"body_column"` + AttributeColumns []string `mapstructure:"attribute_columns"` } func (config LogsCfg) Validate() error { diff --git a/receiver/sqlqueryreceiver/README.md b/receiver/sqlqueryreceiver/README.md index fe87655e1630..935c40d572ea 100644 --- a/receiver/sqlqueryreceiver/README.md +++ b/receiver/sqlqueryreceiver/README.md @@ -54,6 +54,8 @@ Additionally, each `query` section supports the following properties: See the below section [Tracking processed results](#tracking-processed-results). - `tracking_start_value` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter. See the below section [Tracking processed results](#tracking-processed-results). +- `attribute_columns`(optional): a list of column names in the returned dataset used to set attributes on the signal. + These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB). Example: @@ -104,8 +106,6 @@ Each _metric_ in the configuration will produce one OTel metric per row returned - `metric_name`(required): the name assigned to the OTel metric. - `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint. This may be case-sensitive, depending on the driver (e.g. Oracle DB). -- `attribute_columns`(optional): a list of column names in the returned dataset used to set attibutes on the datapoint. - These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB). - `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`. - `value_type` (optional): can be `int` or `double`; defaults to `int`. - `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over @@ -135,6 +135,7 @@ receivers: tracking_column: log_id logs: - body_column: log_body + attribute_columns: [ "log_attribute_1", "log_attribute_2" ] - sql: "select count(*) as count, genre from movie group by genre" metrics: - metric_name: movie.genres diff --git a/receiver/sqlqueryreceiver/config_test.go b/receiver/sqlqueryreceiver/config_test.go index 14931601ad9b..50ea2d7ee91a 100644 --- a/receiver/sqlqueryreceiver/config_test.go +++ b/receiver/sqlqueryreceiver/config_test.go @@ -126,7 +126,8 @@ func TestLoadConfig(t *testing.T) { TrackingStartValue: "10", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "log_body", + BodyColumn: "log_body", + AttributeColumns: []string{"log_attribute_1", "log_attribute_2"}, }, }, }, diff --git a/receiver/sqlqueryreceiver/integration_test.go b/receiver/sqlqueryreceiver/integration_test.go index a58c6fcf31d3..801625c19f20 100644 --- a/receiver/sqlqueryreceiver/integration_test.go +++ b/receiver/sqlqueryreceiver/integration_test.go @@ -57,7 +57,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -93,7 +94,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -145,7 +147,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -187,7 +190,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -220,7 +224,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -316,7 +321,7 @@ func printLogs(allLogs []plog.Logs) { func insertPostgresSimpleLogs(t *testing.T, container testcontainers.Container, existingLogID, newLogCount int) { for newLogID := existingLogID + 1; newLogID <= existingLogID+newLogCount; newLogID++ { - query := fmt.Sprintf("insert into simple_logs (id, insert_time, body) values (%d, now(), 'another log %d');", newLogID, newLogID) + query := fmt.Sprintf("insert into simple_logs (id, insert_time, body, attribute) values (%d, now(), 'another log %d', 'TLSv1.2');", newLogID, newLogID) returnValue, returnMessageReader, err := container.Exec(context.Background(), []string{ "psql", "-U", "otel", "-c", query, }) @@ -610,15 +615,25 @@ func testAllSimpleLogs(t *testing.T, logs []plog.Logs) { assert.Len(t, logs, 1) assert.Equal(t, 1, logs[0].ResourceLogs().Len()) assert.Equal(t, 1, logs[0].ResourceLogs().At(0).ScopeLogs().Len()) - expectedEntries := []string{ + expectedLogBodies := []string{ "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6197 4 \"-\" \"-\" 445af8e6c428303f -", "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6205 5 \"-\" \"-\" 3285f43cd4baa202 -", "- - - [03/Jun/2022:21:59:29 +0000] \"GET /api/health HTTP/1.1\" 200 6233 4 \"-\" \"-\" 579e8362d3185b61 -", "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6207 5 \"-\" \"-\" 8c6ac61ae66e509f -", "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6200 4 \"-\" \"-\" c163495861e873d8 -", } - assert.Equal(t, len(expectedEntries), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) - for i := range expectedEntries { - assert.Equal(t, expectedEntries[i], logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Body().Str()) + expectedLogAttributes := []string{ + "TLSv1.2", + "TLSv1", + "TLSv1.2", + "TLSv1", + "TLSv1.2", + } + assert.Equal(t, len(expectedLogBodies), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + for i := range expectedLogBodies { + logRecord := logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i) + assert.Equal(t, expectedLogBodies[i], logRecord.Body().Str()) + logAttribute, _ := logRecord.Attributes().Get("attribute") + assert.Equal(t, expectedLogAttributes[i], logAttribute.Str()) } } diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index cacdfad7c43f..20d6beea0825 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -291,7 +291,7 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, for logsConfigIndex, logsConfig := range queryReceiver.query.Logs { for _, row := range rows { logRecord := scopeLogs.AppendEmpty() - rowToLog(row, logsConfig, logRecord) + errs = append(errs, rowToLog(row, logsConfig, logRecord)) logRecord.SetObservedTimestamp(observedAt) if logsConfigIndex == 0 { errs = append(errs, queryReceiver.storeTrackingValue(ctx, row)) @@ -315,8 +315,17 @@ func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, return nil } -func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.LogRecord) { +func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.LogRecord) error { logRecord.Body().SetStr(row[config.BodyColumn]) + attrs := logRecord.Attributes() + for _, columnName := range config.AttributeColumns { + if attrVal, found := row[columnName]; found { + attrs.PutStr(columnName, attrVal) + } else { + return fmt.Errorf("rowToLog: attribute_column not found: '%s'", columnName) + } + } + return nil } func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) error { diff --git a/receiver/sqlqueryreceiver/logs_receiver_test.go b/receiver/sqlqueryreceiver/logs_receiver_test.go index 34877c23ce23..7aeb9129609c 100644 --- a/receiver/sqlqueryreceiver/logs_receiver_test.go +++ b/receiver/sqlqueryreceiver/logs_receiver_test.go @@ -51,3 +51,24 @@ func TestLogsQueryReceiver_Collect(t *testing.T) { "Observed timestamps of all log records collected in a single scrape should be equal", ) } + +func TestLogsQueryReceiver_MissingColumnInResultSetForAttributeColumn(t *testing.T) { + fakeClient := &sqlquery.FakeDBClient{ + StringMaps: [][]sqlquery.StringMap{ + {{"col1": "42"}}, + }, + } + queryReceiver := logsQueryReceiver{ + client: fakeClient, + query: sqlquery.Query{ + Logs: []sqlquery.LogsCfg{ + { + BodyColumn: "col1", + AttributeColumns: []string{"expected_column"}, + }, + }, + }, + } + _, err := queryReceiver.collect(context.Background()) + assert.ErrorContains(t, err, "rowToLog: attribute_column not found: 'expected_column'") +} diff --git a/receiver/sqlqueryreceiver/testdata/config-logs.yaml b/receiver/sqlqueryreceiver/testdata/config-logs.yaml index 9fdf2db2e733..fd5d082efc18 100644 --- a/receiver/sqlqueryreceiver/testdata/config-logs.yaml +++ b/receiver/sqlqueryreceiver/testdata/config-logs.yaml @@ -7,4 +7,5 @@ sqlquery: tracking_start_value: '10' tracking_column: log_id logs: - - body_column: log_body + - body_column: log_body + attribute_columns: [ "log_attribute_1", "log_attribute_2" ] diff --git a/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql b/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql index d8b0bc4905d9..7c562c635294 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql @@ -23,12 +23,13 @@ create table simple_logs id integer, insert_time timestamp, body text, + attribute text, primary key (id) ); -insert into simple_logs (id, insert_time, body) values -(1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'), -(2, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'), -(3, '2022-06-03 21:59:29', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'), -(4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'), -(5, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); +insert into simple_logs (id, insert_time, body, attribute) values +(1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'), +(2, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'), +(3, '2022-06-03 21:59:29', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'), +(4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'), +(5, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2'); diff --git a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql index c3581a8b8cbb..213f3beebac1 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql @@ -27,17 +27,18 @@ create table simple_logs ( id number primary key, insert_time timestamp with time zone, - body varchar2(4000) + body varchar2(4000), + attribute varchar2(100) ); grant select on simple_logs to otel; -insert into simple_logs (id, insert_time, body) values -(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'); -insert into simple_logs (id, insert_time, body) values -(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'); -insert into simple_logs (id, insert_time, body) values -(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'); -insert into simple_logs (id, insert_time, body) values -(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'); -insert into simple_logs (id, insert_time, body) values -(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); +insert into simple_logs (id, insert_time, body, attribute) values +(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'); +insert into simple_logs (id, insert_time, body, attribute) values +(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'); +insert into simple_logs (id, insert_time, body, attribute) values +(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'); +insert into simple_logs (id, insert_time, body, attribute) values +(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'); +insert into simple_logs (id, insert_time, body, attribute) values +(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2'); diff --git a/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql b/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql index 5f0ebfd2bed0..0379e43136a6 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql @@ -25,14 +25,15 @@ create table simple_logs ( id integer primary key, insert_time timestamp, - body text + body text, + attribute text ); grant select, insert on simple_logs to otel; -insert into simple_logs (id, insert_time, body) values -(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'), -(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'), -(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'), -(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'), -(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); +insert into simple_logs (id, insert_time, body, attribute) values +(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'), +(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'), +(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'), +(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'), +(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');