Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: 优化 bksql 查询逻辑,增加 thedate 条件过滤 --story=121169369 #664

Open
wants to merge 3 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pkg/unify-query/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ lint:

.PHONY: addlicense
addlicense:
find ./ -type f \( -iname \*.go -o -iname \*.py -iname \*.sh \)|xargs addlicense -v -f ../../scripts/license.txt -ignore vendor/*
find ./ -type f \( -iname \*.go -o -iname \*.py -iname \*.sh \) | xargs addlicense -v -f ../../scripts/license.txt -ignore vendor/*

.PHONY: imports
imports: addlicense
goimports-reviser -rm-unused -set-alias -format ./...
goimports-reviser -rm-unused -set-alias -format -project-name "github.com/TencentBlueKing/bkmonitor-datalink/pkg" ./...
6 changes: 6 additions & 0 deletions pkg/unify-query/influxdb/router_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,12 @@ func MockSpaceRouter(ctx context.Context) {
"false": false
},
"targeting": [
{
"query": "spaceUID in [\"bkdata\"]",
"percentage": {
"false": 100
}
}
],
"defaultRule": {
"variation": "true"
Expand Down
39 changes: 39 additions & 0 deletions pkg/unify-query/internal/function/function.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
package function

import (
"time"

"github.com/prometheus/prometheus/model/labels"
)

Expand All @@ -24,3 +26,40 @@ func MatcherToMetricName(matchers ...*labels.Matcher) string {

return ""
}

func RangeDateWithUnit(unit string, start, end time.Time, step int) (dates []string) {
var (
addYear int
addMonth int
addDay int
toDate func(t time.Time) time.Time
format string
)

switch unit {
case "year":
addYear = step
format = "2006"
toDate = func(t time.Time) time.Time {
return time.Date(t.Year(), 1, 1, 0, 0, 0, 0, t.Location())
}
case "month":
addMonth = step
format = "200601"
toDate = func(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), 1, 0, 0, 0, 0, t.Location())
}
default:
addDay = step
format = "20060102"
toDate = func(t time.Time) time.Time {
return time.Date(t.Year(), t.Month(), t.Day(), 0, 0, 0, 0, t.Location())
}
}

for d := toDate(start); !d.After(toDate(end)); d = d.AddDate(addYear, addMonth, addDay) {
dates = append(dates, d.Format(format))
}

return dates
}
3 changes: 2 additions & 1 deletion pkg/unify-query/metadata/featureFlag.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ func GetMustVmQueryFeatureFlag(ctx context.Context, tableID string) bool {

span.Set("ff-user-custom", ffUser.GetCustom())

status := featureFlag.BoolVariation(ctx, ffUser, "must-vm-query", false)
// 如果匹配不到,则默认查询 vm
status := featureFlag.BoolVariation(ctx, ffUser, "must-vm-query", true)

// 根据查询时间范围判断是否满足当前时间配置
vmDataTime := featureFlag.IntVariation(ctx, ffUser, "range-vm-query", 0)
Expand Down
45 changes: 45 additions & 0 deletions pkg/unify-query/metadata/featureFlag_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,51 @@ import (
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log"
)

func TestGetBkDataTableIDCheck(t *testing.T) {
ctx := InitHashID(context.Background())
InitMetadata()

featureFlag.MockFeatureFlag(ctx, `{
"bk-data-table-id-auth": {
"variations": {
"Default": true,
"true": true,
"false": false
},
"targeting": [
{
"query": "spaceUid in [\"bkdata\", \"bkdata_1\"]",
"percentage": {
"false": 100,
"true": 0
}
}
],
"defaultRule": {
"variation": "Default"
}
}
}`)

var actual bool

SetUser(ctx, "", "bkdata_1", "")
actual = GetBkDataTableIDCheck(ctx)
assert.Equal(t, false, actual)

SetUser(ctx, "", "bkmonitor", "")
actual = GetBkDataTableIDCheck(ctx)
assert.Equal(t, true, actual)

SetUser(ctx, "", "bkdata_1_1", "")
actual = GetBkDataTableIDCheck(ctx)
assert.Equal(t, true, actual)

SetUser(ctx, "", "bkdata", "")
actual = GetBkDataTableIDCheck(ctx)
assert.Equal(t, false, actual)
}

func TestGetMustVmQueryFeatureFlag(t *testing.T) {
ctx := context.Background()

Expand Down
27 changes: 27 additions & 0 deletions pkg/unify-query/tsdb/bksql/format.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/prometheus/prometheus/prompb"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/internal/function"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata"
)

Expand Down Expand Up @@ -131,6 +132,25 @@ func (f *QueryFactory) ParserQuery() (err error) {
return
}

func (f *QueryFactory) getTheDateFilters() (theDateFilter string) {
// bkbase 使用 时区东八区 转换为 thedate
loc, _ := time.LoadLocation("Asia/ShangHai")
start := f.start.In(loc)
end := f.end.In(loc)

dates := function.RangeDateWithUnit("day", start, end, 1)

if len(dates) == 0 {
return ""
}

if len(dates) == 1 {
return fmt.Sprintf("`%s` = '%s'", theDate, dates[0])
}

return fmt.Sprintf("`%s` >= '%s' AND `%s` <= '%s'", theDate, dates[0], theDate, dates[len(dates)-1])
}

func (f *QueryFactory) SQL() (sql string, err error) {
f.sql.Reset()
err = f.ParserQuery()
Expand All @@ -151,6 +171,13 @@ func (f *QueryFactory) SQL() (sql string, err error) {
f.write(db)
f.write("WHERE")
f.write(fmt.Sprintf("`%s` >= %d AND `%s` < %d", f.timeField, f.start.UnixMilli(), f.timeField, f.end.UnixMilli()))

theDateFilter := f.getTheDateFilters()
if theDateFilter != "" {
f.write("AND")
f.write(theDateFilter)
}

if f.query.BkSqlCondition != "" {
f.write("AND")
f.write("(" + f.query.BkSqlCondition + ")")
Expand Down
46 changes: 41 additions & 5 deletions pkg/unify-query/tsdb/bksql/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

"github.com/stretchr/testify/assert"

"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/log"
"github.com/TencentBlueKing/bkmonitor-datalink/pkg/unify-query/metadata"
)

Expand All @@ -26,6 +27,9 @@ func TestNewSqlFactory(t *testing.T) {
for name, c := range map[string]struct {
query *metadata.Query
expected string

start time.Time
end time.Time
}{
"sum-count_over_time-with-promql-1": {
query: &metadata.Query{
Expand All @@ -46,7 +50,7 @@ func TestNewSqlFactory(t *testing.T) {
Size: 0,
Orders: metadata.Orders{"ip": true},
},
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC, `ip` ASC",
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND `thedate` = '20240531' AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC, `ip` ASC",
},
"sum-with-promql-1": {
query: &metadata.Query{
Expand All @@ -66,9 +70,33 @@ func TestNewSqlFactory(t *testing.T) {
Size: 10,
Orders: nil,
},
expected: "SELECT `ip`, SUM(`gseIndex`) AS `_value_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND (gseIndex > 0) GROUP BY `ip` LIMIT 10",
expected: "SELECT `ip`, SUM(`gseIndex`) AS `_value_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND `thedate` = '20240531' AND (gseIndex > 0) GROUP BY `ip` LIMIT 10",
},
"count-with-count-promql-1": {
query: &metadata.Query{
DB: "100133_ieod_logsearch4_errorlog_p",
Measurement: "doris",
Field: "gseIndex",
Aggregates: metadata.Aggregates{
{
Name: "count",
Dimensions: []string{
"ip",
},
Window: time.Minute,
},
},
BkSqlCondition: "gseIndex > 0",
},
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND `thedate` = '20240531' AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC",
},
"count-without-promql-1": {
"count-with-count-promql-2": {
// 2024-12-07 21:36:40 UTC
// 2024-12-08 05:36:40 Asia/ShangHai
start: time.Unix(1733607400, 0),
// 2024-12-11 17:49:35 UTC
// 2024-12-12 01:49:35 Asia/ShangHai
end: time.Unix(1733939375, 0),
query: &metadata.Query{
DB: "100133_ieod_logsearch4_errorlog_p",
Measurement: "doris",
Expand All @@ -84,12 +112,20 @@ func TestNewSqlFactory(t *testing.T) {
},
BkSqlCondition: "gseIndex > 0",
},
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1717144141000 AND `dtEventTimeStamp` < 1717147741000 AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC",
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000))) AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1733607400000 AND `dtEventTimeStamp` < 1733939375000 AND `thedate` >= '20241208' AND `thedate` <= '20241212' AND (gseIndex > 0) GROUP BY `ip`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 60000)) ORDER BY `_timestamp_` ASC",
},
} {
t.Run(name, func(t *testing.T) {
ctx := metadata.InitHashID(context.Background())
fact := NewQueryFactory(ctx, c.query).WithRangeTime(start, end)
if c.start.Unix() == 0 {
c.start = start
}
if c.end.Unix() == 0 {
c.end = end
}

log.Infof(ctx, "start: %s, end: %s", c.start, c.end)
fact := NewQueryFactory(ctx, c.query).WithRangeTime(c.start, c.end)
sql, err := fact.SQL()
assert.Nil(t, err)
assert.Equal(t, c.expected, sql)
Expand Down
52 changes: 46 additions & 6 deletions pkg/unify-query/tsdb/bksql/instance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,8 @@ func TestInstance_bkSql(t *testing.T) {
end := time.UnixMilli(1718193555000)

testCases := []struct {
start time.Time
end time.Time
query *metadata.Query

expected string
Expand All @@ -143,7 +145,7 @@ func TestInstance_bkSql(t *testing.T) {
},
},
},
expected: "SELECT `namespace`, COUNT(`login_rate`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 15000))) AS `_timestamp_` FROM `132_lol_new_login_queue_login_1min` WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND (namespace REGEXP '^(bgp2\\-new|gz100)$') GROUP BY `namespace`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 15000)) ORDER BY `_timestamp_` ASC",
expected: "SELECT `namespace`, COUNT(`login_rate`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 15000))) AS `_timestamp_` FROM `132_lol_new_login_queue_login_1min` WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612' AND (namespace REGEXP '^(bgp2\\-new|gz100)$') GROUP BY `namespace`, (`dtEventTimeStamp` - (`dtEventTimeStamp` % 15000)) ORDER BY `_timestamp_` ASC",
},
{
query: &metadata.Query{
Expand All @@ -156,7 +158,7 @@ func TestInstance_bkSql(t *testing.T) {
},
},

expected: "SELECT SUM(`value`) AS `_value_` FROM `132_hander_opmon_avg` WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000",
expected: "SELECT SUM(`value`) AS `_value_` FROM `132_hander_opmon_avg` WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612'",
},
{
query: &metadata.Query{
Expand All @@ -165,7 +167,7 @@ func TestInstance_bkSql(t *testing.T) {
Field: "value",
Size: 5,
},
expected: "SELECT *, `value` AS `_value_`, `dtEventTimeStamp` AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 LIMIT 5",
expected: "SELECT *, `value` AS `_value_`, `dtEventTimeStamp` AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612' LIMIT 5",
},
{
query: &metadata.Query{
Expand All @@ -176,7 +178,7 @@ func TestInstance_bkSql(t *testing.T) {
"_time": false,
},
},
expected: "SELECT *, `value` AS `_value_`, `dtEventTimeStamp` AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 ORDER BY `_timestamp_` DESC",
expected: "SELECT *, `value` AS `_value_`, `dtEventTimeStamp` AS `_timestamp_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612' ORDER BY `_timestamp_` DESC",
},
{
query: &metadata.Query{
Expand All @@ -194,14 +196,52 @@ func TestInstance_bkSql(t *testing.T) {
Size: 5,
},

expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 GROUP BY `ip` LIMIT 5",
expected: "SELECT `ip`, COUNT(`gseIndex`) AS `_value_` FROM `100133_ieod_logsearch4_errorlog_p`.doris WHERE `dtEventTimeStamp` >= 1718189940000 AND `dtEventTimeStamp` < 1718193555000 AND `thedate` = '20240612' GROUP BY `ip` LIMIT 5",
},
{
start: time.Unix(1733756400, 0),
end: time.Unix(1733846399, 0),
query: &metadata.Query{
DB: "101068_MatchFullLinkTimeConsumptionFlow_CostTime",
Field: "matchstep_start_to_fail_0_100",
Aggregates: metadata.Aggregates{
{
Name: "count",
},
},
},

expected: "SELECT COUNT(`matchstep_start_to_fail_0_100`) AS `_value_` FROM `101068_MatchFullLinkTimeConsumptionFlow_CostTime` WHERE `dtEventTimeStamp` >= 1733756400000 AND `dtEventTimeStamp` < 1733846399000 AND `thedate` >= '20241209' AND `thedate` <= '20241210'",
},
{
start: time.Unix(1733756400, 0),
end: time.Unix(1733846399, 0),
query: &metadata.Query{
DB: "101068_MatchFullLinkTimeConsumptionFlow_CostTime",
Field: "matchstep_start_to_fail_0_100",
Aggregates: metadata.Aggregates{
{
Name: "count",
Window: time.Hour,
},
},
},

expected: "SELECT COUNT(`matchstep_start_to_fail_0_100`) AS `_value_`, MAX((`dtEventTimeStamp` - (`dtEventTimeStamp` % 3600000))) AS `_timestamp_` FROM `101068_MatchFullLinkTimeConsumptionFlow_CostTime` WHERE `dtEventTimeStamp` >= 1733756400000 AND `dtEventTimeStamp` < 1733846399000 AND `thedate` >= '20241209' AND `thedate` <= '20241210' GROUP BY (`dtEventTimeStamp` - (`dtEventTimeStamp` % 3600000)) ORDER BY `_timestamp_` ASC",
},
}

for i, c := range testCases {
t.Run(fmt.Sprintf("%d", i), func(t *testing.T) {
ctx := metadata.InitHashID(context.Background())
sql, err := NewQueryFactory(ctx, c.query).WithRangeTime(start, end).SQL()
if c.start.Unix() == 0 {
c.start = start
}
if c.end.Unix() == 0 {
c.end = end
}

sql, err := NewQueryFactory(ctx, c.query).WithRangeTime(c.start, c.end).SQL()
assert.Nil(t, err)
if err == nil {
assert.Equal(t, c.expected, sql)
Expand Down
Loading