diff --git a/.chloggen/sqlqueryreceiver-logs-attrbutes.yaml b/.chloggen/sqlqueryreceiver-logs-attrbutes.yaml new file mode 100644 index 000000000000..8c94e335a434 --- /dev/null +++ b/.chloggen/sqlqueryreceiver-logs-attrbutes.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: sqlqueryreceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Support populating log attributes from sql query + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [24459] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/internal/sqlquery/config.go b/internal/sqlquery/config.go index c679d81db4d7..7412a8069bc0 100644 --- a/internal/sqlquery/config.go +++ b/internal/sqlquery/config.go @@ -68,7 +68,8 @@ func (q Query) Validate() error { } type LogsCfg struct { - BodyColumn string `mapstructure:"body_column"` + BodyColumn string `mapstructure:"body_column"` + AttributeColumns []string `mapstructure:"attribute_columns"` } func (config LogsCfg) Validate() error { diff --git a/receiver/sqlqueryreceiver/README.md b/receiver/sqlqueryreceiver/README.md index fe87655e1630..935c40d572ea 100644 --- a/receiver/sqlqueryreceiver/README.md +++ b/receiver/sqlqueryreceiver/README.md @@ -54,6 +54,8 @@ Additionally, each `query` section supports the following properties: See the below section [Tracking processed results](#tracking-processed-results). - `tracking_start_value` (optional, default `""`) Applies only to logs. In case of a parameterized query, defines the initial value for the parameter. See the below section [Tracking processed results](#tracking-processed-results). +- `attribute_columns`(optional): a list of column names in the returned dataset used to set attributes on the signal. + These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB). Example: @@ -104,8 +106,6 @@ Each _metric_ in the configuration will produce one OTel metric per row returned - `metric_name`(required): the name assigned to the OTel metric. - `value_column`(required): the column name in the returned dataset used to set the value of the metric's datapoint. This may be case-sensitive, depending on the driver (e.g. Oracle DB). -- `attribute_columns`(optional): a list of column names in the returned dataset used to set attibutes on the datapoint. - These attributes may be case-sensitive, depending on the driver (e.g. Oracle DB). - `data_type` (optional): can be `gauge` or `sum`; defaults to `gauge`. - `value_type` (optional): can be `int` or `double`; defaults to `int`. - `monotonic` (optional): boolean; whether a cumulative sum's value is monotonically increasing (i.e. never rolls over @@ -135,6 +135,7 @@ receivers: tracking_column: log_id logs: - body_column: log_body + attribute_columns: [ "log_attribute_1", "log_attribute_2" ] - sql: "select count(*) as count, genre from movie group by genre" metrics: - metric_name: movie.genres diff --git a/receiver/sqlqueryreceiver/config_test.go b/receiver/sqlqueryreceiver/config_test.go index 14931601ad9b..50ea2d7ee91a 100644 --- a/receiver/sqlqueryreceiver/config_test.go +++ b/receiver/sqlqueryreceiver/config_test.go @@ -126,7 +126,8 @@ func TestLoadConfig(t *testing.T) { TrackingStartValue: "10", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "log_body", + BodyColumn: "log_body", + AttributeColumns: []string{"log_attribute_1", "log_attribute_2"}, }, }, }, diff --git a/receiver/sqlqueryreceiver/integration_test.go b/receiver/sqlqueryreceiver/integration_test.go index a58c6fcf31d3..801625c19f20 100644 --- a/receiver/sqlqueryreceiver/integration_test.go +++ b/receiver/sqlqueryreceiver/integration_test.go @@ -57,7 +57,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -93,7 +94,8 @@ func TestPostgresIntegrationLogsTrackingWithoutStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -145,7 +147,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -187,7 +190,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -220,7 +224,8 @@ func TestPostgresIntegrationLogsTrackingWithStorage(t *testing.T) { SQL: "select * from simple_logs where id > $1", Logs: []sqlquery.LogsCfg{ { - BodyColumn: "body", + BodyColumn: "body", + AttributeColumns: []string{"attribute"}, }, }, TrackingColumn: "id", @@ -316,7 +321,7 @@ func printLogs(allLogs []plog.Logs) { func insertPostgresSimpleLogs(t *testing.T, container testcontainers.Container, existingLogID, newLogCount int) { for newLogID := existingLogID + 1; newLogID <= existingLogID+newLogCount; newLogID++ { - query := fmt.Sprintf("insert into simple_logs (id, insert_time, body) values (%d, now(), 'another log %d');", newLogID, newLogID) + query := fmt.Sprintf("insert into simple_logs (id, insert_time, body, attribute) values (%d, now(), 'another log %d', 'TLSv1.2');", newLogID, newLogID) returnValue, returnMessageReader, err := container.Exec(context.Background(), []string{ "psql", "-U", "otel", "-c", query, }) @@ -610,15 +615,25 @@ func testAllSimpleLogs(t *testing.T, logs []plog.Logs) { assert.Len(t, logs, 1) assert.Equal(t, 1, logs[0].ResourceLogs().Len()) assert.Equal(t, 1, logs[0].ResourceLogs().At(0).ScopeLogs().Len()) - expectedEntries := []string{ + expectedLogBodies := []string{ "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6197 4 \"-\" \"-\" 445af8e6c428303f -", "- - - [03/Jun/2022:21:59:26 +0000] \"GET /api/health HTTP/1.1\" 200 6205 5 \"-\" \"-\" 3285f43cd4baa202 -", "- - - [03/Jun/2022:21:59:29 +0000] \"GET /api/health HTTP/1.1\" 200 6233 4 \"-\" \"-\" 579e8362d3185b61 -", "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6207 5 \"-\" \"-\" 8c6ac61ae66e509f -", "- - - [03/Jun/2022:21:59:31 +0000] \"GET /api/health HTTP/1.1\" 200 6200 4 \"-\" \"-\" c163495861e873d8 -", } - assert.Equal(t, len(expectedEntries), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) - for i := range expectedEntries { - assert.Equal(t, expectedEntries[i], logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i).Body().Str()) + expectedLogAttributes := []string{ + "TLSv1.2", + "TLSv1", + "TLSv1.2", + "TLSv1", + "TLSv1.2", + } + assert.Equal(t, len(expectedLogBodies), logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().Len()) + for i := range expectedLogBodies { + logRecord := logs[0].ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(i) + assert.Equal(t, expectedLogBodies[i], logRecord.Body().Str()) + logAttribute, _ := logRecord.Attributes().Get("attribute") + assert.Equal(t, expectedLogAttributes[i], logAttribute.Str()) } } diff --git a/receiver/sqlqueryreceiver/logs_receiver.go b/receiver/sqlqueryreceiver/logs_receiver.go index cacdfad7c43f..20d6beea0825 100644 --- a/receiver/sqlqueryreceiver/logs_receiver.go +++ b/receiver/sqlqueryreceiver/logs_receiver.go @@ -291,7 +291,7 @@ func (queryReceiver *logsQueryReceiver) collect(ctx context.Context) (plog.Logs, for logsConfigIndex, logsConfig := range queryReceiver.query.Logs { for _, row := range rows { logRecord := scopeLogs.AppendEmpty() - rowToLog(row, logsConfig, logRecord) + errs = append(errs, rowToLog(row, logsConfig, logRecord)) logRecord.SetObservedTimestamp(observedAt) if logsConfigIndex == 0 { errs = append(errs, queryReceiver.storeTrackingValue(ctx, row)) @@ -315,8 +315,17 @@ func (queryReceiver *logsQueryReceiver) storeTrackingValue(ctx context.Context, return nil } -func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.LogRecord) { +func rowToLog(row sqlquery.StringMap, config sqlquery.LogsCfg, logRecord plog.LogRecord) error { logRecord.Body().SetStr(row[config.BodyColumn]) + attrs := logRecord.Attributes() + for _, columnName := range config.AttributeColumns { + if attrVal, found := row[columnName]; found { + attrs.PutStr(columnName, attrVal) + } else { + return fmt.Errorf("rowToLog: attribute_column not found: '%s'", columnName) + } + } + return nil } func (queryReceiver *logsQueryReceiver) shutdown(_ context.Context) error { diff --git a/receiver/sqlqueryreceiver/logs_receiver_test.go b/receiver/sqlqueryreceiver/logs_receiver_test.go index 34877c23ce23..7aeb9129609c 100644 --- a/receiver/sqlqueryreceiver/logs_receiver_test.go +++ b/receiver/sqlqueryreceiver/logs_receiver_test.go @@ -51,3 +51,24 @@ func TestLogsQueryReceiver_Collect(t *testing.T) { "Observed timestamps of all log records collected in a single scrape should be equal", ) } + +func TestLogsQueryReceiver_MissingColumnInResultSetForAttributeColumn(t *testing.T) { + fakeClient := &sqlquery.FakeDBClient{ + StringMaps: [][]sqlquery.StringMap{ + {{"col1": "42"}}, + }, + } + queryReceiver := logsQueryReceiver{ + client: fakeClient, + query: sqlquery.Query{ + Logs: []sqlquery.LogsCfg{ + { + BodyColumn: "col1", + AttributeColumns: []string{"expected_column"}, + }, + }, + }, + } + _, err := queryReceiver.collect(context.Background()) + assert.ErrorContains(t, err, "rowToLog: attribute_column not found: 'expected_column'") +} diff --git a/receiver/sqlqueryreceiver/testdata/config-logs.yaml b/receiver/sqlqueryreceiver/testdata/config-logs.yaml index 9fdf2db2e733..fd5d082efc18 100644 --- a/receiver/sqlqueryreceiver/testdata/config-logs.yaml +++ b/receiver/sqlqueryreceiver/testdata/config-logs.yaml @@ -7,4 +7,5 @@ sqlquery: tracking_start_value: '10' tracking_column: log_id logs: - - body_column: log_body + - body_column: log_body + attribute_columns: [ "log_attribute_1", "log_attribute_2" ] diff --git a/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql b/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql index d8b0bc4905d9..7c562c635294 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/mysql/init.sql @@ -23,12 +23,13 @@ create table simple_logs id integer, insert_time timestamp, body text, + attribute text, primary key (id) ); -insert into simple_logs (id, insert_time, body) values -(1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'), -(2, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'), -(3, '2022-06-03 21:59:29', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'), -(4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'), -(5, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); +insert into simple_logs (id, insert_time, body, attribute) values +(1, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'), +(2, '2022-06-03 21:59:26', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'), +(3, '2022-06-03 21:59:29', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'), +(4, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'), +(5, '2022-06-03 21:59:31', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2'); diff --git a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql index c3581a8b8cbb..213f3beebac1 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/oracle/init.sql @@ -27,17 +27,18 @@ create table simple_logs ( id number primary key, insert_time timestamp with time zone, - body varchar2(4000) + body varchar2(4000), + attribute varchar2(100) ); grant select on simple_logs to otel; -insert into simple_logs (id, insert_time, body) values -(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'); -insert into simple_logs (id, insert_time, body) values -(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'); -insert into simple_logs (id, insert_time, body) values -(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'); -insert into simple_logs (id, insert_time, body) values -(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'); -insert into simple_logs (id, insert_time, body) values -(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); +insert into simple_logs (id, insert_time, body, attribute) values +(1, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'); +insert into simple_logs (id, insert_time, body, attribute) values +(2, TIMESTAMP '2022-06-03 21:59:26 +00:00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'); +insert into simple_logs (id, insert_time, body, attribute) values +(3, TIMESTAMP '2022-06-03 21:59:29 +00:00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'); +insert into simple_logs (id, insert_time, body, attribute) values +(4, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'); +insert into simple_logs (id, insert_time, body, attribute) values +(5, TIMESTAMP '2022-06-03 21:59:31 +00:00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2'); diff --git a/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql b/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql index 5f0ebfd2bed0..0379e43136a6 100644 --- a/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql +++ b/receiver/sqlqueryreceiver/testdata/integration/postgresql/init.sql @@ -25,14 +25,15 @@ create table simple_logs ( id integer primary key, insert_time timestamp, - body text + body text, + attribute text ); grant select, insert on simple_logs to otel; -insert into simple_logs (id, insert_time, body) values -(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -'), -(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -'), -(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -'), -(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -'), -(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -'); +insert into simple_logs (id, insert_time, body, attribute) values +(1, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6197 4 "-" "-" 445af8e6c428303f -', 'TLSv1.2'), +(2, '2022-06-03 21:59:26+00', '- - - [03/Jun/2022:21:59:26 +0000] "GET /api/health HTTP/1.1" 200 6205 5 "-" "-" 3285f43cd4baa202 -', 'TLSv1'), +(3, '2022-06-03 21:59:29+00', '- - - [03/Jun/2022:21:59:29 +0000] "GET /api/health HTTP/1.1" 200 6233 4 "-" "-" 579e8362d3185b61 -', 'TLSv1.2'), +(4, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6207 5 "-" "-" 8c6ac61ae66e509f -', 'TLSv1'), +(5, '2022-06-03 21:59:31+00', '- - - [03/Jun/2022:21:59:31 +0000] "GET /api/health HTTP/1.1" 200 6200 4 "-" "-" c163495861e873d8 -', 'TLSv1.2');