Skip to content

Commit

Permalink
feat: querier delete live_view
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochaoren1 committed Nov 28, 2024
1 parent edd8f13 commit 036ae82
Show file tree
Hide file tree
Showing 6 changed files with 44 additions and 28 deletions.
15 changes: 8 additions & 7 deletions server/controller/db/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,13 +27,14 @@ var log = logging.MustGetLogger("db.clickhouse")
var Db *sqlx.DB

type ClickHouseConfig struct {
Database string `default:"flow_tag" yaml:"database"`
Host string `default:"clickhouse" yaml:"host"`
Port uint32 `default:"9000" yaml:"port"`
UserName string `default:"default" yaml:"user-name"`
UserPassword string `default:"" yaml:"user-password"`
TimeOut uint32 `default:"30" yaml:"timeout"`
EndpointTcpPortName string `default:"tcp-port" yaml:"endpoint-tcp-port-name"`
Database string `default:"flow_tag" yaml:"database"`
Host string `default:"clickhouse" yaml:"host"`
Port uint32 `default:"9000" yaml:"port"`
UserName string `default:"default" yaml:"user-name"`
UserPassword string `default:"" yaml:"user-password"`
TimeOut uint32 `default:"30" yaml:"timeout"`
EndpointTcpPortName string `default:"tcp-port" yaml:"endpoint-tcp-port-name"`
UseLiveView bool `default:"true" yaml:"use-live-view"`
}

func Connect(cfg ClickHouseConfig) (*sqlx.DB, error) {
Expand Down
4 changes: 3 additions & 1 deletion server/controller/tagrecorder/dictionary.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,9 @@ func (c *Dictionary) update(clickHouseCfg *clickhouse.ClickHouseConfig) {
if updateDictError != nil {
return
}

if !c.cfg.ClickHouseCfg.UseLiveView {
continue
}
// Get the current view in the database
views := []string{}
if err := ckDb.Select(&views, fmt.Sprintf("SHOW TABLES FROM %s LIKE '%%view'", ckDatabaseName)); err != nil {
Expand Down
20 changes: 11 additions & 9 deletions server/querier/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,17 @@ type TraceIdWithIndex struct {
}

type Clickhouse struct {
User string `default:"default" yaml:"user-name"`
Password string `default:"" yaml:"user-password"`
Host string `default:"clickhouse" yaml:"host"`
Port int `default:"9000" yaml:"port"`
Timeout int `default:"60" yaml:"timeout"`
ConnectTimeout int `default:"2" yaml:"connect-timeout"`
MaxConnection int `default:"20" yaml:"max-connection"`
UseQueryCache bool `default:"true" yaml:"use-query-cache"`
QueryCacheTTL string `default:"600" yaml:"query-cache-ttl"`
User string `default:"default" yaml:"user-name"`
Password string `default:"" yaml:"user-password"`
Host string `default:"clickhouse" yaml:"host"`
Port int `default:"9000" yaml:"port"`
Timeout int `default:"60" yaml:"timeout"`
ConnectTimeout int `default:"2" yaml:"connect-timeout"`
MaxConnection int `default:"20" yaml:"max-connection"`
UseQueryCache bool `default:"true" yaml:"use-query-cache"`
QueryCacheTTL string `default:"600" yaml:"query-cache-ttl"`
UseLiveView bool `default:"true" yaml:"use-live-view"`
UseNewCacheParameter bool `default:"false" yaml:"use-new-cache-parameter"`
}

type AutoCustomTags struct {
Expand Down
7 changes: 6 additions & 1 deletion server/querier/engine/clickhouse/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,12 @@ func (c *Client) DoQuery(params *QueryParams) (result *common.Result, err error)
sqlstr, callbacks, query_uuid, columnSchemaMap, simpleSql := params.Sql, params.Callbacks, params.QueryUUID, params.ColumnSchemaMap, params.SimpleSql
queryCacheStr := ""
if params.UseQueryCache {
queryCacheStr = " SETTINGS use_query_cache = true, query_cache_store_results_of_queries_with_nondeterministic_functions = 1"
queryCacheStr = " SETTINGS use_query_cache = true, query_cache_nondeterministic_function_handling = 'save'"
if config.Cfg.Clickhouse.UseNewCacheParameter {
queryCacheStr += ", query_cache_nondeterministic_function_handling = 'save'"
} else {
queryCacheStr += ", query_cache_store_results_of_queries_with_nondeterministic_functions = 1"
}
if params.QueryCacheTTL != "" {
queryCacheStr += fmt.Sprintf(", query_cache_ttl = %s", params.QueryCacheTTL)
}
Expand Down
22 changes: 14 additions & 8 deletions server/querier/engine/clickhouse/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -1153,6 +1153,12 @@ func GetPrometheusFilter(promTag, table, op, value string, e *CHEngine) (string,
}
// Determine whether the tag is app_label or target_label
isAppLabel := false
appViewName := "app_label_live_view"
targetViewName := "target_label_live_view"
if !config.Cfg.Clickhouse.UseLiveView {
appViewName = "app_label_map"
targetViewName = "target_label_map"
}
if appLabels, ok := trans_prometheus.ORGPrometheus[e.ORGID].MetricAppLabelLayout[table]; ok {
for _, appLabel := range appLabels {
if appLabel.AppLabelName == nameNoPreffix {
Expand All @@ -1162,19 +1168,19 @@ func GetPrometheusFilter(promTag, table, op, value string, e *CHEngine) (string,
return filter, nil
}
if strings.Contains(op, "match") {
filter = fmt.Sprintf("toUInt64(app_label_value_id_%d) GLOBAL IN (SELECT label_value_id FROM flow_tag.app_label_live_view WHERE label_name_id=%d and %s(label_value,%s))", appLabel.AppLabelColumnIndex, labelNameID, op, value)
filter = fmt.Sprintf("toUInt64(app_label_value_id_%d) GLOBAL IN (SELECT label_value_id FROM flow_tag.app_label_map WHERE label_name_id=%d and %s(label_value,%s))", appLabel.AppLabelColumnIndex, labelNameID, op, value)
} else {
filter = fmt.Sprintf("toUInt64(app_label_value_id_%d) GLOBAL IN (SELECT label_value_id FROM flow_tag.app_label_live_view WHERE label_name_id=%d and label_value %s %s)", appLabel.AppLabelColumnIndex, labelNameID, op, value)
filter = fmt.Sprintf("toUInt64(app_label_value_id_%d) GLOBAL IN (SELECT label_value_id FROM flow_tag.app_label_map WHERE label_name_id=%d and label_value %s %s)", appLabel.AppLabelColumnIndex, labelNameID, op, value)
}
break
}
}
}
if !isAppLabel {
if strings.Contains(op, "match") {
filter = fmt.Sprintf("toUInt64(target_id) GLOBAL IN (SELECT target_id FROM flow_tag.target_label_live_view WHERE metric_id=%d and label_name_id=%d and %s(label_value,%s))", metricID, labelNameID, op, value)
filter = fmt.Sprintf("toUInt64(target_id) GLOBAL IN (SELECT target_id FROM flow_tag.target_label_map WHERE metric_id=%d and label_name_id=%d and %s(label_value,%s))", metricID, labelNameID, op, value)
} else {
filter = fmt.Sprintf("toUInt64(target_id) GLOBAL IN (SELECT target_id FROM flow_tag.target_label_live_view WHERE metric_id=%d and label_name_id=%d and label_value %s %s)", metricID, labelNameID, op, value)
filter = fmt.Sprintf("toUInt64(target_id) GLOBAL IN (SELECT target_id FROM flow_tag.target_label_map WHERE metric_id=%d and label_name_id=%d and label_value %s %s)", metricID, labelNameID, op, value)
}
}
return filter, nil
Expand Down Expand Up @@ -1226,9 +1232,9 @@ func GetRemoteReadFilter(promTag, table, op, value, originFilter string, e *CHEn

// lru timeout
if strings.Contains(op, "match") {
sql = fmt.Sprintf("SELECT label_value_id FROM flow_tag.app_label_live_view WHERE label_name_id=%d and %s(label_value,%s) GROUP BY label_value_id", labelNameID, op, value)
sql = fmt.Sprintf("SELECT label_value_id FROM flow_tag.app_label_map WHERE label_name_id=%d and %s(label_value,%s) GROUP BY label_value_id", labelNameID, op, value)
} else {
sql = fmt.Sprintf("SELECT label_value_id FROM flow_tag.app_label_live_view WHERE label_name_id=%d and label_value %s %s GROUP BY label_value_id", labelNameID, op, value)
sql = fmt.Sprintf("SELECT label_value_id FROM flow_tag.app_label_map WHERE label_name_id=%d and label_value %s %s GROUP BY label_value_id", labelNameID, op, value)
}
chClient := client.Client{
Host: config.Cfg.Clickhouse.Host,
Expand Down Expand Up @@ -1263,9 +1269,9 @@ func GetRemoteReadFilter(promTag, table, op, value, originFilter string, e *CHEn
if !isAppLabel {
transFilter := ""
if strings.Contains(op, "match") {
transFilter = fmt.Sprintf("SELECT target_id FROM flow_tag.target_label_live_view WHERE metric_id=%d and label_name_id=%d and %s(label_value,%s) GROUP BY target_id", metricID, labelNameID, op, value)
transFilter = fmt.Sprintf("SELECT target_id FROM flow_tag.target_label_map WHERE metric_id=%d and label_name_id=%d and %s(label_value,%s) GROUP BY target_id", metricID, labelNameID, op, value)
} else {
transFilter = fmt.Sprintf("SELECT target_id FROM flow_tag.target_label_live_view WHERE metric_id=%d and label_name_id=%d and label_value %s %s GROUP BY target_id", metricID, labelNameID, op, value)
transFilter = fmt.Sprintf("SELECT target_id FROM flow_tag.target_label_map WHERE metric_id=%d and label_name_id=%d and label_value %s %s GROUP BY target_id", metricID, labelNameID, op, value)
}
targetLabelFilter := TargetLabelFilter{OriginFilter: originFilter, TransFilter: transFilter}
e.TargetLabelFilters = append(e.TargetLabelFilters, targetLabelFilter)
Expand Down
4 changes: 2 additions & 2 deletions server/querier/engine/clickhouse/group.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,13 +165,13 @@ func GetPrometheusNotNullFilter(name string, e *CHEngine) (view.Node, bool) {
for _, appLabel := range appLabels {
if appLabel.AppLabelName == nameNoPreffix {
isAppLabel = true
filter = fmt.Sprintf("toUInt64(app_label_value_id_%d) GLOBAL IN (SELECT label_value_id FROM flow_tag.app_label_live_view WHERE label_name_id=%d)", appLabel.AppLabelColumnIndex, labelNameID)
filter = fmt.Sprintf("toUInt64(app_label_value_id_%d) GLOBAL IN (SELECT label_value_id FROM flow_tag.app_label_map WHERE label_name_id=%d)", appLabel.AppLabelColumnIndex, labelNameID)
break
}
}
}
if !isAppLabel {
filter = fmt.Sprintf("toUInt64(target_id) GLOBAL IN (SELECT target_id FROM flow_tag.target_label_live_view WHERE metric_id=%d and label_name_id=%d)", metricID, labelNameID)
filter = fmt.Sprintf("toUInt64(target_id) GLOBAL IN (SELECT target_id FROM flow_tag.target_label_map WHERE metric_id=%d and label_name_id=%d)", metricID, labelNameID)
}
return &view.Expr{Value: "(" + filter + ")"}, true
}
Expand Down

0 comments on commit 036ae82

Please sign in to comment.