From 10a61cb0eceed6e9927541d56b45d9e9682a1a5c Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Mon, 10 Feb 2025 09:43:00 +0100 Subject: [PATCH] database_observability: pass `op` and `instance` as labels in loki.Entry (#2647) database_observability: pass `op` and `instance` as labels in loki.Entry Move these fields to labels in loki.Entry, to allow indexing and relabeling. --- CHANGELOG.md | 1 + .../mysql/collector/query_sample.go | 20 +- .../mysql/collector/query_sample_test.go | 311 ++++++++++++------ .../mysql/collector/schema_table.go | 26 +- .../mysql/collector/schema_table_test.go | 84 ++--- 5 files changed, 281 insertions(+), 161 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ba76fad8a7..b8dc657ec8 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -38,6 +38,7 @@ Main (unreleased) - Added table columns parsing (@cristiagreco) - Add enable/disable collector configurability to `database_observability.mysql`. This removes the `query_samples_enabled` argument, now configurable via enable/disable collector. (@fridgepoet) - Refactor cache config in schema_table collector (@cristiangreco) + - Use labels for some indexed logs elements (@cristiangreco) ### Bugfixes diff --git a/internal/component/database_observability/mysql/collector/query_sample.go b/internal/component/database_observability/mysql/collector/query_sample.go index 5f0d644791..2e83c9ec1d 100644 --- a/internal/component/database_observability/mysql/collector/query_sample.go +++ b/internal/component/database_observability/mysql/collector/query_sample.go @@ -155,12 +155,16 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error { } c.entryHandler.Chan() <- loki.Entry{ - Labels: model.LabelSet{"job": database_observability.JobName}, + Labels: model.LabelSet{ + "job": database_observability.JobName, + "op": OP_QUERY_SAMPLE, + "instance": model.LabelValue(c.instanceKey), + }, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), Line: fmt.Sprintf( - `level=info msg="query samples fetched" op="%s" instance="%s" schema="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`, - OP_QUERY_SAMPLE, c.instanceKey, schemaName, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText, + `level=info msg="query samples fetched" schema="%s" digest="%s" query_type="%s" query_sample_seen="%s" query_sample_timer_wait="%s" query_sample_redacted="%s"`, + schemaName, digest, c.stmtType(stmt), sampleSeen, sampleTimerWait, sampleRedactedText, ), }, } @@ -168,12 +172,16 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error { tables := c.tablesFromQuery(digest, stmt) for _, table := range tables { c.entryHandler.Chan() <- loki.Entry{ - Labels: model.LabelSet{"job": database_observability.JobName}, + Labels: model.LabelSet{ + "job": database_observability.JobName, + "op": OP_QUERY_PARSED_TABLE_NAME, + "instance": model.LabelValue(c.instanceKey), + }, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), Line: fmt.Sprintf( - `level=info msg="table name parsed" op="%s" instance="%s" schema="%s" digest="%s" table="%s"`, - OP_QUERY_PARSED_TABLE_NAME, c.instanceKey, schemaName, digest, table, + `level=info msg="table name parsed" schema="%s" digest="%s" table="%s"`, + schemaName, digest, table, ), }, } diff --git a/internal/component/database_observability/mysql/collector/query_sample_test.go b/internal/component/database_observability/mysql/collector/query_sample_test.go index 5ae80706e1..8cb869c4e3 100644 --- a/internal/component/database_observability/mysql/collector/query_sample_test.go +++ b/internal/component/database_observability/mysql/collector/query_sample_test.go @@ -22,9 +22,10 @@ func TestQuerySample(t *testing.T) { defer goleak.VerifyNone(t) testcases := []struct { - name string - rows [][]driver.Value - logs []string + name string + rows [][]driver.Value + logsLabels []model.LabelSet + logsLines []string }{ { name: "select query", @@ -35,9 +36,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -49,9 +54,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table(id, name) values (:redacted1, :redacted2)"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table(id, name) values (:redacted1, :redacted2)"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -63,9 +72,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="update" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="update some_table set active = false, reason = null where id = :redacted1 and name = :redacted2"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="update" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="update some_table set active = false, reason = null where id = :redacted1 and name = :redacted2"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -77,9 +90,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="delete" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="delete from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="delete" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="delete from some_table where id = :redacted1"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -91,10 +108,15 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select t.id, t.val1, o.val2 from some_table as t join other_table as o on t.id = o.id where o.val2 = :redacted1 order by t.val1 desc"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="other_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select t.id, t.val1, o.val2 from some_table as t join other_table as o on t.id = o.id where o.val2 = :redacted1 order by t.val1 desc"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="other_table"`, }, }, { @@ -109,13 +131,17 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" ` + + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" ` + `query_sample_redacted="select ifnull(schema_name, :redacted1) as schema_name, digest, count_star from (select * from ` + `performance_schema.events_statements_summary_by_digest where schema_name not in ::redacted2 ` + `and last_seen > date_sub(now(), interval :redacted3 second) order by last_seen desc) as q ` + `group by q.schema_name, q.digest, q.count_star limit :redacted4"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="performance_schema.events_statements_summary_by_digest"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="performance_schema.events_statements_summary_by_digest"`, }, }, { @@ -127,9 +153,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select val1, val3 from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select val1, val3 from some_table where id = :redacted1"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -147,9 +177,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -161,9 +195,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -175,8 +213,11 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="begin"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="begin"`, }, }, { @@ -188,8 +229,11 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="commit"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="commit"`, }, }, { @@ -201,9 +245,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="alter table some_table"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="alter table some_table"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -221,9 +269,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, { @@ -241,11 +293,17 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="other_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="other_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, + `level=info msg="query samples fetched" schema="other_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, + `level=info msg="table name parsed" schema="other_schema" digest="abc123" table="some_table"`, }, }, { @@ -257,11 +315,17 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select id, name from employees_ny union select id, name from employees_ca union select id, name from employees_tx"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_ny"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_ca"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_tx"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select id, name from employees_ny union select id, name from employees_ca union select id, name from employees_tx"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_ny"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_ca"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_tx"`, }, }, { @@ -273,11 +337,17 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select COUNT(distinct t.role_id) as roles, COUNT(distinct r.id) as fixed_roles from (select role_id from user_role union all select role_id from team_role) as t left join (select id from role where name like :redacted1) as r on t.role_id = r.id"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="user_role"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="team_role"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="role"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select COUNT(distinct t.role_id) as roles, COUNT(distinct r.id) as fixed_roles from (select role_id from user_role union all select role_id from team_role) as t left join (select id from role where name like :redacted1) as r on t.role_id = r.id"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="user_role"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="team_role"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="role"`, }, }, { @@ -289,11 +359,17 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from (select id, name from employees_us_east union select id, name from employees_us_west) as employees_us union select id, name from employees_emea"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us_east"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us_west"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_emea"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from (select id, name from employees_us_east union select id, name from employees_us_west) as employees_us union select id, name from employees_emea"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_us_east"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_us_west"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_emea"`, }, }, { @@ -305,11 +381,17 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into customers(id, name) select id, name from customers_us union select id, name from customers_eu"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers_us"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers_eu"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into customers(id, name) select id, name from customers_us union select id, name from customers_eu"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="customers"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="customers_us"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="customers_eu"`, }, }, { @@ -321,11 +403,17 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from departments as dep join (select id, name from employees_us union select id, name from employees_eu) as employees on dep.id = employees.id"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="departments"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_eu"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from departments as dep join (select id, name from employees_us union select id, name from employees_eu) as employees on dep.id = employees.id"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="departments"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_us"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_eu"`, }, }, { @@ -337,12 +425,19 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table select * from departments as dep join (select id, name from employees_us union select id, name from employees_eu) as employees on dep.id = employees.id"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="departments"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_eu"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into some_table select * from departments as dep join (select id, name from employees_us union select id, name from employees_eu) as employees on dep.id = employees.id"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="departments"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_us"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="employees_eu"`, }, }, { @@ -354,8 +449,11 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="show create table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="show create table"`, }, }, { @@ -367,8 +465,11 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="show variables"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="show variables"`, }, }, { @@ -380,9 +481,13 @@ func TestQuerySample(t *testing.T) { "2024-01-01T00:00:00.000Z", "1000", }}, - logs: []string{ - `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="drop table if exists some_table"`, - `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, + logsLabels: []model.LabelSet{ + {"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, + {"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, + }, + logsLines: []string{ + `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="drop table if exists some_table"`, + `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, }, }, } @@ -424,7 +529,7 @@ func TestQuerySample(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - return len(lokiClient.Received()) == len(tc.logs) + return len(lokiClient.Received()) == len(tc.logsLines) }, 5*time.Second, 100*time.Millisecond) collector.Stop() @@ -438,10 +543,10 @@ func TestQuerySample(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - require.Equal(t, len(tc.logs), len(lokiEntries)) + require.Equal(t, len(tc.logsLines), len(lokiEntries)) for i, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - require.Equal(t, tc.logs[i], entry.Line) + require.Equal(t, tc.logsLabels[i], entry.Labels) + require.Equal(t, tc.logsLines[i], entry.Line) } }) } @@ -512,12 +617,10 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - - require.Equal(t, `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line) }) t.Run("result set iteration error", func(t *testing.T) { @@ -580,12 +683,10 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - - require.Equal(t, `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line) }) t.Run("connection error recovery", func(t *testing.T) { @@ -644,11 +745,9 @@ func TestQuerySampleSQLDriverErrors(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - - require.Equal(t, `level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_QUERY_SAMPLE, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="query samples fetched" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from some_table where id = :redacted1"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_QUERY_PARSED_TABLE_NAME, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table name parsed" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line) }) } diff --git a/internal/component/database_observability/mysql/collector/schema_table.go b/internal/component/database_observability/mysql/collector/schema_table.go index f186ec8cb3..e11cbd717d 100644 --- a/internal/component/database_observability/mysql/collector/schema_table.go +++ b/internal/component/database_observability/mysql/collector/schema_table.go @@ -200,10 +200,14 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error { schemas = append(schemas, schema) c.entryHandler.Chan() <- loki.Entry{ - Labels: model.LabelSet{"job": database_observability.JobName}, + Labels: model.LabelSet{ + "job": database_observability.JobName, + "op": OP_SCHEMA_DETECTION, + "instance": model.LabelValue(c.instanceKey), + }, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), - Line: fmt.Sprintf(`level=info msg="schema detected" op="%s" instance="%s" schema="%s"`, OP_SCHEMA_DETECTION, c.instanceKey, schema), + Line: fmt.Sprintf(`level=info msg="schema detected" schema="%s"`, schema), }, } } @@ -246,10 +250,14 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error { }) c.entryHandler.Chan() <- loki.Entry{ - Labels: model.LabelSet{"job": database_observability.JobName}, + Labels: model.LabelSet{ + "job": database_observability.JobName, + "op": OP_TABLE_DETECTION, + "instance": model.LabelValue(c.instanceKey), + }, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), - Line: fmt.Sprintf(`level=info msg="table detected" op="%s" instance="%s" schema="%s" table="%s"`, OP_TABLE_DETECTION, c.instanceKey, schema, tableName), + Line: fmt.Sprintf(`level=info msg="table detected" schema="%s" table="%s"`, schema, tableName), }, } } @@ -290,12 +298,16 @@ func (c *SchemaTable) extractSchema(ctx context.Context) error { } c.entryHandler.Chan() <- loki.Entry{ - Labels: model.LabelSet{"job": database_observability.JobName}, + Labels: model.LabelSet{ + "job": database_observability.JobName, + "op": OP_CREATE_STATEMENT, + "instance": model.LabelValue(c.instanceKey), + }, Entry: logproto.Entry{ Timestamp: time.Unix(0, time.Now().UnixNano()), Line: fmt.Sprintf( - `level=info msg="create table" op="%s" instance="%s" schema="%s" table="%s" create_statement="%s" table_spec="%s"`, - OP_CREATE_STATEMENT, c.instanceKey, table.schema, table.tableName, base64.StdEncoding.EncodeToString([]byte(table.createStmt)), base64.StdEncoding.EncodeToString([]byte(table.tableSpec)), + `level=info msg="create table" schema="%s" table="%s" create_statement="%s" table_spec="%s"`, + table.schema, table.tableName, base64.StdEncoding.EncodeToString([]byte(table.createStmt)), base64.StdEncoding.EncodeToString([]byte(table.tableSpec)), ), }, } diff --git a/internal/component/database_observability/mysql/collector/schema_table_test.go b/internal/component/database_observability/mysql/collector/schema_table_test.go index 6a9862fc05..abb7de8b90 100644 --- a/internal/component/database_observability/mysql/collector/schema_table_test.go +++ b/internal/component/database_observability/mysql/collector/schema_table_test.go @@ -114,12 +114,12 @@ func TestSchemaTable(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) }) t.Run("detect table schema, cache enabled (write)", func(t *testing.T) { t.Parallel() @@ -218,12 +218,12 @@ func TestSchemaTable(t *testing.T) { require.Equal(t, 1, collector.cache.Len()) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) }) t.Run("detect table schema, cache enabled (write and read)", func(t *testing.T) { t.Parallel() @@ -347,15 +347,18 @@ func TestSchemaTable(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) - require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[3].Line) - require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[4].Line) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[5].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[3].Labels) + require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[3].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[4].Labels) + require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[4].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[5].Labels) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[5].Line) }) t.Run("detect view schema", func(t *testing.T) { t.Parallel() @@ -454,12 +457,12 @@ func TestSchemaTable(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE VIEW some_view (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) }) t.Run("schemas result set iteration error", func(t *testing.T) { t.Parallel() @@ -508,10 +511,8 @@ func TestSchemaTable(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) }) t.Run("tables result set iteration error", func(t *testing.T) { t.Parallel() @@ -577,11 +578,10 @@ func TestSchemaTable(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) }) t.Run("connection error recovery", func(t *testing.T) { t.Parallel() @@ -669,11 +669,11 @@ func TestSchemaTable(t *testing.T) { }, 5*time.Second, 100*time.Millisecond) lokiEntries := lokiClient.Received() - for _, entry := range lokiEntries { - require.Equal(t, model.LabelSet{"job": database_observability.JobName}, entry.Labels) - } - require.Equal(t, `level=info msg="schema detected" op="schema_detection" instance="mysql-db" schema="some_schema"`, lokiEntries[0].Line) - require.Equal(t, `level=info msg="table detected" op="table_detection" instance="mysql-db" schema="some_schema" table="some_table"`, lokiEntries[1].Line) - require.Equal(t, fmt.Sprintf(`level=info msg="create table" op="create_statement" instance="mysql-db" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_SCHEMA_DETECTION, "instance": "mysql-db"}, lokiEntries[0].Labels) + require.Equal(t, `level=info msg="schema detected" schema="some_schema"`, lokiEntries[0].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_TABLE_DETECTION, "instance": "mysql-db"}, lokiEntries[1].Labels) + require.Equal(t, `level=info msg="table detected" schema="some_schema" table="some_table"`, lokiEntries[1].Line) + require.Equal(t, model.LabelSet{"job": database_observability.JobName, "op": OP_CREATE_STATEMENT, "instance": "mysql-db"}, lokiEntries[2].Labels) + require.Equal(t, fmt.Sprintf(`level=info msg="create table" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, base64.StdEncoding.EncodeToString([]byte("CREATE TABLE some_table (id INT)")), base64.StdEncoding.EncodeToString([]byte(`{"columns":[{"name":"id","type":"int","not_null":true,"auto_increment":true,"primary_key":true,"default_value":"null"}]}`))), lokiEntries[2].Line) }) }