From 2659c0feec2149114dd78dd3fb908b659eb9a61c Mon Sep 17 00:00:00 2001 From: sunface Date: Tue, 17 Oct 2023 18:10:54 +0800 Subject: [PATCH] feat: add getServiceOperations api for observability #290 --- .../plugins/builtin/observability/api/api.go | 14 ++--- .../builtin/observability/api/service.go | 51 +++++++++++++------ .../builtin/observability/observability.go | 15 +++++- query/internal/proxy/datasource.go | 2 - query/pkg/models/plugin.go | 47 ++++++++++++++++- .../datasource/observability/QueryEditor.tsx | 12 ++++- .../datasource/observability/query_runner.ts | 12 ++++- 7 files changed, 125 insertions(+), 28 deletions(-) diff --git a/query/internal/plugins/builtin/observability/api/api.go b/query/internal/plugins/builtin/observability/api/api.go index f72373184..24dfffb15 100644 --- a/query/internal/plugins/builtin/observability/api/api.go +++ b/query/internal/plugins/builtin/observability/api/api.go @@ -7,12 +7,14 @@ import ( ) const ( - TestDatasourceAPI = "testDatasource" - GetServiceInfoListAPI = "getServiceInfoList" - GetServiceNamesAPI = "getServiceNames" + TestDatasourceAPI = "testDatasource" + GetServiceInfoListAPI = "getServiceInfoList" + GetServiceNamesAPI = "getServiceNames" + GetServiceOperationsAPI = "getServiceOperations" ) -var APIRoutes = map[string]func(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult{ - GetServiceInfoListAPI: GetServiceInfoList, - GetServiceNamesAPI: GetServiceNames, +var APIRoutes = map[string]func(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult{ + GetServiceInfoListAPI: GetServiceInfoList, + GetServiceNamesAPI: GetServiceNames, + GetServiceOperationsAPI: GetServiceOperations, } diff --git a/query/internal/plugins/builtin/observability/api/service.go b/query/internal/plugins/builtin/observability/api/service.go index 7cd34c974..9683c7474 100644 --- a/query/internal/plugins/builtin/observability/api/service.go +++ b/query/internal/plugins/builtin/observability/api/service.go @@ -15,32 +15,53 @@ type ServiceNameRes struct { ServiceName string `ch:"serviceName"` } -func GetServiceNames(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult { - start := c.Query("start") - end := c.Query("end") - query := fmt.Sprintf("SELECT serviceName FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= %s AND timestamp <= %s GROUP BY serviceName", start, end) +func GetServiceNames(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult { + query := "SELECT DISTINCT serviceName FROM signoz_traces.distributed_top_level_operations" - var res []ServiceNameRes - err := conn.Select(c.Request.Context(), &res, query) + rows, err := conn.Query(c.Request.Context(), query) if err != nil { return models.GenPluginResult(models.PluginStatusError, err.Error(), nil) } + defer rows.Close() logger.Info("Query service names", "query", query) - columns := []string{"service"} - data := make([][]interface{}, 0) - for _, v := range res { - data = append(data, []interface{}{v.ServiceName}) + res, err := models.ConvertDbRowsToPluginData(rows) + if err != nil { + return models.GenPluginResult(models.PluginStatusError, err.Error(), nil) + } + + return models.GenPluginResult(models.PluginStatusSuccess, "", res) +} + +func GetServiceOperations(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult { + serviceI := params["service"] + fmt.Println("ere33333:", serviceI) + if serviceI == nil { + return models.GenPluginResult(models.PluginStatusError, "service is required", nil) + } + + service := serviceI.(string) + + query := fmt.Sprintf("SELECT DISTINCT name FROM signoz_traces.distributed_top_level_operations WHERE serviceName='%s'", service) + rows, err := conn.Query(c.Request.Context(), query) + if err != nil { + logger.Warn("Error Query service operations", "query", query, "error", err) + return models.GenPluginResult(models.PluginStatusError, err.Error(), nil) + } + defer rows.Close() + + logger.Info("Query service operations", "query", query) + + res, err := models.ConvertDbRowsToPluginData(rows) + if err != nil { + return models.GenPluginResult(models.PluginStatusError, err.Error(), nil) } - return models.GenPluginResult(models.PluginStatusSuccess, "", models.PluginResultData{ - Columns: columns, - Data: data, - }) + return models.GenPluginResult(models.PluginStatusSuccess, "", res) } -func GetServiceInfoList(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult { +func GetServiceInfoList(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult { fmt.Println("here33333") // rows, err := conn.Query(c.Request.Context(), query) // if err != nil { diff --git a/query/internal/plugins/builtin/observability/observability.go b/query/internal/plugins/builtin/observability/observability.go index 4f468d112..a67ece1cf 100644 --- a/query/internal/plugins/builtin/observability/observability.go +++ b/query/internal/plugins/builtin/observability/observability.go @@ -1,7 +1,10 @@ package clickhouse import ( + "encoding/json" + "fmt" "sync" + "time" ch "github.com/ClickHouse/clickhouse-go/v2" "github.com/DataObserve/datav/query/internal/plugins/builtin/observability/api" @@ -41,7 +44,17 @@ func (p *ObservabilityPlugin) Query(c *gin.Context, ds *models.Datasource) model } route, ok := api.APIRoutes[query] if ok { - res := route(c, ds, conn) + paramStr := c.Query("params") + params := make(map[string]interface{}) + err := json.Unmarshal([]byte(paramStr), ¶ms) + if err != nil { + return models.GenPluginResult(models.PluginStatusError, fmt.Sprintf("decode params error: %s", err.Error()), nil) + } + + start := time.Now() + res := route(c, ds, conn, params) + + colorlog.RootLogger.Info("Excecute observability query api", "query", query, "time", time.Since(start).String()) return models.PluginResult{ Status: models.PluginStatusSuccess, Error: "", diff --git a/query/internal/proxy/datasource.go b/query/internal/proxy/datasource.go index 2d1021d7d..91d054313 100644 --- a/query/internal/proxy/datasource.go +++ b/query/internal/proxy/datasource.go @@ -14,7 +14,6 @@ package proxy import ( "bytes" - "fmt" "io" "net" "net/http" @@ -48,7 +47,6 @@ func ProxyDatasource(c *gin.Context) { } queryPlugin := models.GetPlugin(ds.Type) - fmt.Println(ds.Type, queryPlugin) if queryPlugin != nil { result := queryPlugin.Query(c, ds) c.JSON(http.StatusOK, result) diff --git a/query/pkg/models/plugin.go b/query/pkg/models/plugin.go index d255a86f6..9dea7a737 100644 --- a/query/pkg/models/plugin.go +++ b/query/pkg/models/plugin.go @@ -1,6 +1,12 @@ package models -import "github.com/gin-gonic/gin" +import ( + "reflect" + "time" + + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" + "github.com/gin-gonic/gin" +) const ( PluginStatusSuccess = "success" @@ -47,3 +53,42 @@ func GenPluginResult(status, err string, data interface{}) PluginResult { Data: data, } } + +func ConvertDbRowsToPluginData(rows driver.Rows) (*PluginResultData, error) { + columns := rows.Columns() + columnTypes := rows.ColumnTypes() + types := make(map[string]string) + data := make([][]interface{}, 0) + for rows.Next() { + v := make([]interface{}, len(columns)) + for i := range v { + t := columnTypes[i].ScanType() + v[i] = reflect.New(t).Interface() + + tp := t.String() + if tp == "time.Time" { + types[columns[i]] = "time" + } + } + + err := rows.Scan(v...) + if err != nil { + return nil, err + } + + for i, v0 := range v { + v1, ok := v0.(*time.Time) + if ok { + v[i] = v1.Unix() + } + } + + data = append(data, v) + } + + return &PluginResultData{ + Columns: columns, + Data: data, + ColumnTypes: types, + }, nil +} diff --git a/ui/src/views/dashboard/plugins/built-in/datasource/observability/QueryEditor.tsx b/ui/src/views/dashboard/plugins/built-in/datasource/observability/QueryEditor.tsx index 1f6d6c018..e9873473b 100644 --- a/ui/src/views/dashboard/plugins/built-in/datasource/observability/QueryEditor.tsx +++ b/ui/src/views/dashboard/plugins/built-in/datasource/observability/QueryEditor.tsx @@ -128,5 +128,15 @@ const apiList = [{ }`, paramsDesc: [["env", "environment name, such as dev, test, prod etc"]], format: DataFormat.Table +}, +{ + name: "getServiceOperations", + desc: "get service operations", + params: `{ + "env": "test", + "service": "datav" +}`, + paramsDesc: [["env", "environment name, such as dev, test, prod etc"]], + format: DataFormat.Table } -] \ No newline at end of file +] diff --git a/ui/src/views/dashboard/plugins/built-in/datasource/observability/query_runner.ts b/ui/src/views/dashboard/plugins/built-in/datasource/observability/query_runner.ts index e34b28e89..54cf6ed29 100644 --- a/ui/src/views/dashboard/plugins/built-in/datasource/observability/query_runner.ts +++ b/ui/src/views/dashboard/plugins/built-in/datasource/observability/query_runner.ts @@ -45,15 +45,23 @@ export const runQuery = async (panel: Panel, q: PanelQuery, range: TimeRange, ds status: string, error: string, data: QueryPluginResult - } = await requestApi.get(`/proxy/${ds.id}?query=${replaceWithVariables(q.metrics)}¶ms=${q.data.params}&start=${start}&end=${end}&step=${q.interval}`) + } = await requestApi.get(`/proxy/${ds.id}?query=${replaceWithVariables(q.metrics)}¶ms=${q.data[q.metrics].params}&start=${start}&end=${end}&step=${q.interval}`) - if (res.status !== "success") { + if (res.status !== "success" ) { console.log("Failed to fetch data from target datasource", res) return { error: res.error, data: [] } } + + if (res.data && res.data.status != "success") { + return { + error: res.data.error, + data: [] + } + } + let data; switch (q.data["format"]) { case DataFormat.TimeSeries: