Skip to content

Commit

Permalink
feat: add getServiceOperations api for observability #290
Browse files Browse the repository at this point in the history
  • Loading branch information
sunface committed Oct 17, 2023
1 parent 50cdb9b commit 2659c0f
Show file tree
Hide file tree
Showing 7 changed files with 125 additions and 28 deletions.
14 changes: 8 additions & 6 deletions query/internal/plugins/builtin/observability/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,14 @@ import (
)

const (
TestDatasourceAPI = "testDatasource"
GetServiceInfoListAPI = "getServiceInfoList"
GetServiceNamesAPI = "getServiceNames"
TestDatasourceAPI = "testDatasource"
GetServiceInfoListAPI = "getServiceInfoList"
GetServiceNamesAPI = "getServiceNames"
GetServiceOperationsAPI = "getServiceOperations"
)

var APIRoutes = map[string]func(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult{
GetServiceInfoListAPI: GetServiceInfoList,
GetServiceNamesAPI: GetServiceNames,
var APIRoutes = map[string]func(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult{
GetServiceInfoListAPI: GetServiceInfoList,
GetServiceNamesAPI: GetServiceNames,
GetServiceOperationsAPI: GetServiceOperations,
}
51 changes: 36 additions & 15 deletions query/internal/plugins/builtin/observability/api/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,53 @@ type ServiceNameRes struct {
ServiceName string `ch:"serviceName"`
}

func GetServiceNames(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult {
start := c.Query("start")
end := c.Query("end")
query := fmt.Sprintf("SELECT serviceName FROM signoz_traces.distributed_signoz_index_v2 WHERE timestamp >= %s AND timestamp <= %s GROUP BY serviceName", start, end)
func GetServiceNames(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult {
query := "SELECT DISTINCT serviceName FROM signoz_traces.distributed_top_level_operations"

var res []ServiceNameRes
err := conn.Select(c.Request.Context(), &res, query)
rows, err := conn.Query(c.Request.Context(), query)
if err != nil {
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}
defer rows.Close()

logger.Info("Query service names", "query", query)

columns := []string{"service"}
data := make([][]interface{}, 0)
for _, v := range res {
data = append(data, []interface{}{v.ServiceName})
res, err := models.ConvertDbRowsToPluginData(rows)
if err != nil {
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}

return models.GenPluginResult(models.PluginStatusSuccess, "", res)
}

func GetServiceOperations(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult {
serviceI := params["service"]
fmt.Println("ere33333:", serviceI)
if serviceI == nil {
return models.GenPluginResult(models.PluginStatusError, "service is required", nil)
}

service := serviceI.(string)

query := fmt.Sprintf("SELECT DISTINCT name FROM signoz_traces.distributed_top_level_operations WHERE serviceName='%s'", service)
rows, err := conn.Query(c.Request.Context(), query)
if err != nil {
logger.Warn("Error Query service operations", "query", query, "error", err)
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}
defer rows.Close()

logger.Info("Query service operations", "query", query)

res, err := models.ConvertDbRowsToPluginData(rows)
if err != nil {
return models.GenPluginResult(models.PluginStatusError, err.Error(), nil)
}

return models.GenPluginResult(models.PluginStatusSuccess, "", models.PluginResultData{
Columns: columns,
Data: data,
})
return models.GenPluginResult(models.PluginStatusSuccess, "", res)
}

func GetServiceInfoList(c *gin.Context, ds *models.Datasource, conn ch.Conn) models.PluginResult {
func GetServiceInfoList(c *gin.Context, ds *models.Datasource, conn ch.Conn, params map[string]interface{}) models.PluginResult {
fmt.Println("here33333")
// rows, err := conn.Query(c.Request.Context(), query)
// if err != nil {
Expand Down
15 changes: 14 additions & 1 deletion query/internal/plugins/builtin/observability/observability.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,10 @@
package clickhouse

import (
"encoding/json"
"fmt"
"sync"
"time"

ch "github.com/ClickHouse/clickhouse-go/v2"
"github.com/DataObserve/datav/query/internal/plugins/builtin/observability/api"
Expand Down Expand Up @@ -41,7 +44,17 @@ func (p *ObservabilityPlugin) Query(c *gin.Context, ds *models.Datasource) model
}
route, ok := api.APIRoutes[query]
if ok {
res := route(c, ds, conn)
paramStr := c.Query("params")
params := make(map[string]interface{})
err := json.Unmarshal([]byte(paramStr), &params)
if err != nil {
return models.GenPluginResult(models.PluginStatusError, fmt.Sprintf("decode params error: %s", err.Error()), nil)
}

start := time.Now()
res := route(c, ds, conn, params)

colorlog.RootLogger.Info("Excecute observability query api", "query", query, "time", time.Since(start).String())
return models.PluginResult{
Status: models.PluginStatusSuccess,
Error: "",
Expand Down
2 changes: 0 additions & 2 deletions query/internal/proxy/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ package proxy

import (
"bytes"
"fmt"
"io"
"net"
"net/http"
Expand Down Expand Up @@ -48,7 +47,6 @@ func ProxyDatasource(c *gin.Context) {
}

queryPlugin := models.GetPlugin(ds.Type)
fmt.Println(ds.Type, queryPlugin)
if queryPlugin != nil {
result := queryPlugin.Query(c, ds)
c.JSON(http.StatusOK, result)
Expand Down
47 changes: 46 additions & 1 deletion query/pkg/models/plugin.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
package models

import "github.com/gin-gonic/gin"
import (
"reflect"
"time"

"github.com/ClickHouse/clickhouse-go/v2/lib/driver"
"github.com/gin-gonic/gin"
)

const (
PluginStatusSuccess = "success"
Expand Down Expand Up @@ -47,3 +53,42 @@ func GenPluginResult(status, err string, data interface{}) PluginResult {
Data: data,
}
}

func ConvertDbRowsToPluginData(rows driver.Rows) (*PluginResultData, error) {
columns := rows.Columns()
columnTypes := rows.ColumnTypes()
types := make(map[string]string)
data := make([][]interface{}, 0)
for rows.Next() {
v := make([]interface{}, len(columns))
for i := range v {
t := columnTypes[i].ScanType()
v[i] = reflect.New(t).Interface()

tp := t.String()
if tp == "time.Time" {
types[columns[i]] = "time"
}
}

err := rows.Scan(v...)
if err != nil {
return nil, err
}

for i, v0 := range v {
v1, ok := v0.(*time.Time)
if ok {
v[i] = v1.Unix()
}
}

data = append(data, v)
}

return &PluginResultData{
Columns: columns,
Data: data,
ColumnTypes: types,
}, nil
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,5 +128,15 @@ const apiList = [{
}`,
paramsDesc: [["env", "environment name, such as dev, test, prod etc"]],
format: DataFormat.Table
},
{
name: "getServiceOperations",
desc: "get service operations",
params: `{
"env": "test",
"service": "datav"
}`,
paramsDesc: [["env", "environment name, such as dev, test, prod etc"]],
format: DataFormat.Table
}
]
]
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,23 @@ export const runQuery = async (panel: Panel, q: PanelQuery, range: TimeRange, ds
status: string,
error: string,
data: QueryPluginResult
} = await requestApi.get(`/proxy/${ds.id}?query=${replaceWithVariables(q.metrics)}&params=${q.data.params}&start=${start}&end=${end}&step=${q.interval}`)
} = await requestApi.get(`/proxy/${ds.id}?query=${replaceWithVariables(q.metrics)}&params=${q.data[q.metrics].params}&start=${start}&end=${end}&step=${q.interval}`)

if (res.status !== "success") {
if (res.status !== "success" ) {
console.log("Failed to fetch data from target datasource", res)
return {
error: res.error,
data: []
}
}

if (res.data && res.data.status != "success") {
return {
error: res.data.error,
data: []
}
}

let data;
switch (q.data["format"]) {
case DataFormat.TimeSeries:
Expand Down

0 comments on commit 2659c0f

Please sign in to comment.