Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

database_observability: extend parsing of table names in queries #2556

Merged
merged 2 commits into from
Jan 29, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ Main (unreleased)

- (_Experimental_) Fix handling of view table types when detecting schema in `database_observability.mysql` (@matthewnolf)

- (_Experimental_) fix error handling during result set iteration in `database_observability.mysql` (@cristiangreco)
- (_Experimental_) Fix error handling during result set iteration in `database_observability.mysql` (@cristiangreco)

- (_Experimental_) Better support for table name parsing in `database_observability.mysql` (@cristiangreco)

- Add json format support for log export via faro receiver (@ravishankar15)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func (c *QuerySample) fetchQuerySamples(ctx context.Context) error {
},
}

tables := c.tablesFromQuery(stmt)
tables := c.tablesFromQuery(digest, stmt)
for _, table := range tables {
c.entryHandler.Chan() <- loki.Entry{
Labels: model.LabelSet{"job": database_observability.JobName},
Expand Down Expand Up @@ -199,29 +199,49 @@ func (c QuerySample) stmtType(stmt sqlparser.Statement) string {
return "update"
case *sqlparser.Delete:
return "delete"
case *sqlparser.Union:
return "select" // label union as a select
default:
return ""
}
}

func (c QuerySample) tablesFromQuery(stmt sqlparser.Statement) []string {
func (c QuerySample) tablesFromQuery(digest string, stmt sqlparser.Statement) []string {
var parsedTables []string

switch stmt := stmt.(type) {
case *sqlparser.Select:
parsedTables = c.parseTableExprs(stmt.From)
case *sqlparser.Insert:
parsedTables = []string{c.parseTableName(stmt.Table)}
parsedTables = c.parseTableExprs(digest, stmt.From)
case *sqlparser.Update:
parsedTables = c.parseTableExprs(stmt.TableExprs)
parsedTables = c.parseTableExprs(digest, stmt.TableExprs)
case *sqlparser.Delete:
parsedTables = c.parseTableExprs(stmt.TableExprs)
parsedTables = c.parseTableExprs(digest, stmt.TableExprs)
case *sqlparser.Insert:
parsedTables = []string{c.parseTableName(stmt.Table)}
switch insRowsStmt := stmt.Rows.(type) {
case *sqlparser.Select:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, insRowsStmt)...)
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{insRowsStmt.Left, insRowsStmt.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
case *sqlparser.ParenSelect:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, insRowsStmt.Select)...)
default:
level.Error(c.logger).Log("msg", "unknown insert type", "digest", digest)
}
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{stmt.Left, stmt.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
default:
level.Error(c.logger).Log("msg", "unknown statement type", "digest", digest)
}

return parsedTables
}

func (c QuerySample) parseTableExprs(tables sqlparser.TableExprs) []string {
func (c QuerySample) parseTableExprs(digest string, tables sqlparser.TableExprs) []string {
parsedTables := []string{}
for i := 0; i < len(tables); i++ {
t := tables[i]
Expand All @@ -231,16 +251,28 @@ func (c QuerySample) parseTableExprs(tables sqlparser.TableExprs) []string {
case sqlparser.TableName:
parsedTables = append(parsedTables, c.parseTableName(expr))
case *sqlparser.Subquery:
subquery := expr.Select.(*sqlparser.Select)
parsedTables = append(parsedTables, c.parseTableExprs(subquery.From)...)
switch subqueryExpr := expr.Select.(type) {
case *sqlparser.Select:
parsedTables = append(parsedTables, c.parseTableExprs(digest, subqueryExpr.From)...)
case *sqlparser.Union:
for _, side := range []sqlparser.SelectStatement{subqueryExpr.Left, subqueryExpr.Right} {
parsedTables = append(parsedTables, c.tablesFromQuery(digest, side)...)
}
case *sqlparser.ParenSelect:
parsedTables = append(parsedTables, c.tablesFromQuery(digest, subqueryExpr.Select)...)
default:
level.Error(c.logger).Log("msg", "unknown subquery type", "digest", digest)
}
default:
level.Error(c.logger).Log("msg", "unknown nested table expression", "table", tableExpr)
level.Error(c.logger).Log("msg", "unknown nested table expression", "digest", digest, "table", tableExpr)
}
case *sqlparser.JoinTableExpr:
// continue parsing both sides of join
tables = append(tables, tableExpr.LeftExpr, tableExpr.RightExpr)
case *sqlparser.ParenTableExpr:
tables = append(tables, tableExpr.Exprs...)
default:
level.Error(c.logger).Log("msg", "unknown table type", "table", t)
level.Error(c.logger).Log("msg", "unknown table type", "digest", digest, "table", t)
}
}
return parsedTables
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -247,6 +247,86 @@ func TestQuerySample(t *testing.T) {
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="other_schema" digest="abc123" table="some_table"`,
},
},
{
name: "union query",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"SELECT id, name FROM employees_ny UNION SELECT id, name FROM employees_ca UNION SELECT id, name FROM employees_tx",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select id, name from employees_ny union select id, name from employees_ca union select id, name from employees_tx"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_ny"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_ca"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_tx"`,
},
},
{
name: "from subquery with union",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"SELECT COUNT(DISTINCT t.role_id) AS roles, COUNT(DISTINCT r.id) AS fixed_roles FROM (SELECT role_id FROM user_role UNION ALL SELECT role_id FROM team_role) AS t LEFT JOIN (SELECT id FROM role WHERE name LIKE 'prefix%') AS r ON t.role_id = r.id",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select COUNT(distinct t.role_id) as roles, COUNT(distinct r.id) as fixed_roles from (select role_id from user_role union all select role_id from team_role) as t left join (select id from role where name like :redacted1) as r on t.role_id = r.id"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="user_role"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="team_role"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="role"`,
},
},
{
name: "subquery and union",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"SELECT * FROM (SELECT id, name FROM employees_us_east UNION SELECT id, name FROM employees_us_west) as employees_us UNION SELECT id, name FROM employees_emea",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from (select id, name from employees_us_east union select id, name from employees_us_west) as employees_us union select id, name from employees_emea"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us_east"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us_west"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_emea"`,
},
},
{
name: "insert with subquery and union",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"INSERT INTO customers (id, name) SELECT id, name FROM customers_us UNION SELECT id, name FROM customers_eu",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="insert" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="insert into customers(id, name) select id, name from customers_us union select id, name from customers_eu"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers_us"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="customers_eu"`,
},
},
{
name: "join with subquery and union",
rows: [][]driver.Value{{
"abc123",
"some_schema",
"SELECT * FROM departments dep JOIN (SELECT id, name FROM employees_us UNION SELECT id, name FROM employees_eu) employees ON dep.id = employees.id",
"2024-01-01T00:00:00.000Z",
"1000",
}},
logs: []string{
`level=info msg="query samples fetched" op="query_sample" instance="mysql-db" schema="some_schema" digest="abc123" query_type="select" query_sample_seen="2024-01-01T00:00:00.000Z" query_sample_timer_wait="1000" query_sample_redacted="select * from departments as dep join (select id, name from employees_us union select id, name from employees_eu) as employees on dep.id = employees.id"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="departments"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_us"`,
`level=info msg="table name parsed" op="query_parsed_table_name" instance="mysql-db" schema="some_schema" digest="abc123" table="employees_eu"`,
},
},
fridgepoet marked this conversation as resolved.
Show resolved Hide resolved
}

for _, tc := range testcases {
Expand Down
Loading