Skip to content

Commit

Permalink
Merge pull request #4 from ag-ramachandran/bugfix/tracetablename
Browse files Browse the repository at this point in the history
User defined trace tablename supprted
  • Loading branch information
ag-ramachandran authored Jan 31, 2023
2 parents 1a6caf5 + 6d09290 commit 4cacb34
Show file tree
Hide file tree
Showing 10 changed files with 175 additions and 66 deletions.
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ Then, you should create json config file:
"clientSecret": "",
"database": "<database>",
"endpoint": "https://<cluster>.<region>.kusto.windows.net",
"tenantId": ""
"tenantId": "",
"traceTableName":"<trace_table>" // defaults to `OTELTraces` if not provided
}
```

Expand Down
4 changes: 2 additions & 2 deletions build/plugin/jaeger-kusto-plugin-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,6 @@
"kustoConfigPath": "/config/jaeger-kusto-config.json",
"logLevel": "info",
"logJson": true,
"tracingSamplerPercentage": 1.0,
"tracingRPCMetrics": true
"tracingSamplerPercentage": 0.0,
"tracingRPCMetrics": false
}
4 changes: 2 additions & 2 deletions build/server/jaeger-kusto-plugin-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,6 @@
"logLevel": "info",
"logJson": true,
"remoteMode": true,
"tracingSamplerPercentage": 1.0,
"tracingRPCMetrics": true
"tracingSamplerPercentage": 0.0,
"tracingRPCMetrics": false
}
15 changes: 10 additions & 5 deletions config/kusto_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ import (

// KustoConfig contains AzureAD service principal and Kusto cluster configs
type KustoConfig struct {
ClientID string `json:"clientId"`
ClientSecret string `json:"clientSecret"`
TenantID string `json:"tenantId"`
Endpoint string `json:"endpoint"`
Database string `json:"database"`
ClientID string `json:"clientId"`
ClientSecret string `json:"clientSecret"`
TenantID string `json:"tenantId"`
Endpoint string `json:"endpoint"`
Database string `json:"database"`
TraceTableName string `json:"traceTableName"`
}

// ParseKustoConfig reads file at path and returns instance of KustoConfig or error
Expand Down Expand Up @@ -39,5 +40,9 @@ func (kc *KustoConfig) Validate() error {
if kc.ClientID == "" || kc.ClientSecret == "" || kc.TenantID == "" {
return errors.New("missing client configuration (ClientId, ClientSecret, TenantId) for kusto")
}
//if no Tracetable name provided, default to OTELTraces.
if kc.TraceTableName == "" {
kc.TraceTableName = "OTELTraces"
}
return nil
}
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ package main

import (
"flag"
"github.com/dodopizza/jaeger-kusto/runner"
"os"

"github.com/dodopizza/jaeger-kusto/runner"

"github.com/dodopizza/jaeger-kusto/config"
"github.com/dodopizza/jaeger-kusto/store"
)
Expand Down
4 changes: 2 additions & 2 deletions store/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ type kustoFactory struct {
client *kusto.Client
}

func newKustoFactory(client *kusto.Client, pc *config.PluginConfig, database string) *kustoFactory {
func newKustoFactory(client *kusto.Client, pc *config.PluginConfig, database string, table string) *kustoFactory {
return &kustoFactory{
client: client,
Database: database,
Table: "OTELTraces",
Table: table,
PluginConfig: pc,
}
}
Expand Down
38 changes: 38 additions & 0 deletions store/queryUtils.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,44 @@ var (
ErrStartAndEndTimeNotSet = errors.New("start and End Time must be set")
)

var (
getTrace = `getTrace`
getTraceQuery = `%s | where TraceID == ParamTraceID | extend Duration=totimespan(datetime_diff('microsecond',EndTime,StartTime)) , ProcessServiceName=tostring(ResourceAttributes.['service.name']) | project-rename Tags=TraceAttributes,Logs=Events,ProcessTags=ResourceAttributes| extend References=iff(isempty(ParentID),todynamic("[]"),pack_array(bag_pack("refType","CHILD_OF","traceID",TraceID,"spanID",ParentID)))`

getServices = `getServices`
getServicesQuery = `set query_results_cache_max_age = time(5m); %s
| extend ProcessServiceName=tostring(ResourceAttributes.['service.name'])
| summarize by ProcessServiceName
| sort by ProcessServiceName asc`

getOpsWithNoParams = `getOpsWithNoParams`
getOpsWithNoParamsQuery = `%s
| summarize count() by SpanName
| sort by count_
| project-away count_`

getOpsWithParams = `getOpsWithParams`
getOpsWithParamsQuery = `%s | extend ProcessServiceName=tostring(ResourceAttributes.['service.name'])
| where ProcessServiceName == ParamProcessServiceName
| summarize count() by SpanName
| sort by count_
| project-away count_`

getDependencies = `getDependencies`
getDependenciesQuery = `%s | extend ProcessServiceName=tostring(ResourceAttributes.['service.name'])
| where StartTime < ParamEndTs and StartTime > (ParamEndTs-ParamLookBack)
| project ProcessServiceName, SpanID, ChildOfSpanId = ParentID | join (%s | extend ProcessServiceName=tostring(ResourceAttributes.['service.name'])
| project ChildOfSpanId=SpanID, ParentService=ProcessServiceName) on ChildOfSpanId | where ProcessServiceName != ParentService
| extend Call=pack('Parent', ParentService, 'Child', ProcessServiceName) | summarize CallCount=count() by tostring(Call) | extend Call=parse_json(Call)
| evaluate bag_unpack(Call)`

getTraceIdBase = `getTraceIdBase`
getTraceIdBaseQuery = `%s | extend Duration=totimespan(datetime_diff('microsecond',EndTime,StartTime)) , ProcessServiceName=tostring(ResourceAttributes.['service.name'])`

getTracesBase = `getTracesBase`
getTracesBaseQuery = `%s | extend ProcessServiceName=tostring(ResourceAttributes.['service.name']),Duration=totimespan(datetime_diff('millisecond',EndTime,StartTime))`
)

// taken from https://github.com/logzio/jaeger-logzio/blob/master/store/queryUtils.go
func validateQuery(p *spanstore.TraceQueryParameters) error {
if p == nil {
Expand Down
120 changes: 75 additions & 45 deletions store/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package store
import (
"context"
"fmt"
"log"
"strings"
"time"

Expand All @@ -12,6 +13,7 @@ import (
"github.com/Azure/azure-kusto-go/kusto/unsafe"

"github.com/Azure/azure-kusto-go/kusto"
"github.com/Azure/azure-kusto-go/kusto/data/errors"
"github.com/Azure/azure-kusto-go/kusto/data/table"
"github.com/Azure/azure-kusto-go/kusto/data/types"
"github.com/jaegertracing/jaeger/model"
Expand All @@ -28,14 +30,29 @@ type kustoReaderClient interface {
Query(ctx context.Context, db string, query kusto.Stmt, options ...kusto.QueryOption) (*kusto.RowIterator, error)
}

var queryMap = map[string]string{}

func newKustoSpanReader(factory *kustoFactory, logger hclog.Logger) (*kustoSpanReader, error) {
prepareReaderStatements(factory.Table)
return &kustoSpanReader{
factory.Reader(),
factory.Database,
logger,
}, nil
}

// Prepares reader queries parts beforehand
func prepareReaderStatements(tableName string) {

queryMap[getTrace] = fmt.Sprintf(getTraceQuery, tableName)
queryMap[getServices] = fmt.Sprintf(getServicesQuery, tableName)
queryMap[getOpsWithNoParams] = fmt.Sprintf(getOpsWithNoParamsQuery, tableName)
queryMap[getOpsWithParams] = fmt.Sprintf(getOpsWithParamsQuery, tableName)
queryMap[getDependencies] = fmt.Sprintf(getDependenciesQuery, tableName, tableName)
queryMap[getTraceIdBase] = fmt.Sprintf(getTraceIdBaseQuery, tableName)
queryMap[getTracesBase] = fmt.Sprintf(getTracesBaseQuery, tableName)
}

const defaultNumTraces = 20

var safetySwitch = unsafe.Stmt{
Expand All @@ -45,21 +62,27 @@ var safetySwitch = unsafe.Stmt{

// GetTrace finds trace by TraceID
func (r *kustoSpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (*model.Trace, error) {
kustoStmt := kusto.NewStmt(`OTELTraces | where TraceID == ParamTraceID | extend Duration=totimespan(datetime_diff('microsecond',EndTime,StartTime)) , ProcessServiceName=tostring(ResourceAttributes.['service.name']) | project-rename Tags=TraceAttributes,Logs=Events,ProcessTags=ResourceAttributes|extend References=iff(isempty(ParentID),todynamic("[]"),pack_array(bag_pack("refType","CHILD_OF","traceID",TraceID,"spanID",ParentID)))`).MustDefinitions(
kustoStmt := kusto.NewStmt("", kusto.UnsafeStmt(safetySwitch)).UnsafeAdd(queryMap[getTrace]).MustDefinitions(
kusto.NewDefinitions().Must(
kusto.ParamTypes{
"ParamTraceID": kusto.ParamType{Type: types.String},
},
)).MustParameters(kusto.NewParameters().Must(kusto.QueryValues{"ParamTraceID": traceID.String()}))

log.Default().Println(kustoStmt.String())

iter, err := r.client.Query(ctx, r.database, kustoStmt)
if err != nil {
return nil, err
}
defer iter.Stop()

var spans []*model.Span
err = iter.Do(
func(row *table.Row) error {
err = iter.DoOnRowOrError(
func(row *table.Row, e *errors.Error) error {
if e != nil {
return e
}
rec := kustoSpan{}
if err := row.ToStruct(&rec); err != nil {
return err
Expand All @@ -81,7 +104,11 @@ func (r *kustoSpanReader) GetTrace(ctx context.Context, traceID model.TraceID) (

// GetServices finds all possible services that spanstore contains
func (r *kustoSpanReader) GetServices(ctx context.Context) ([]string, error) {
iter, err := r.client.Query(ctx, r.database, kusto.NewStmt("set query_results_cache_max_age = time(5m); OTELTraces | extend ProcessServiceName=tostring(ResourceAttributes.['service.name']) | summarize by ProcessServiceName | sort by ProcessServiceName asc"))

kustoStmt := kusto.NewStmt("", kusto.UnsafeStmt(safetySwitch)).UnsafeAdd(queryMap[getServices])
log.Default().Println(kustoStmt.String())
iter, err := r.client.Query(ctx, r.database, kustoStmt)

if err != nil {
return nil, err
}
Expand All @@ -92,8 +119,11 @@ func (r *kustoSpanReader) GetServices(ctx context.Context) ([]string, error) {
}

var services []string
err = iter.Do(
func(row *table.Row) error {
err = iter.DoOnRowOrError(
func(row *table.Row, e *errors.Error) error {
if e != nil {
return e
}
service := Service{}
if err := row.ToStruct(&service); err != nil {
return err
Expand All @@ -118,23 +148,17 @@ func (r *kustoSpanReader) GetOperations(ctx context.Context, query spanstore.Ope

var kustoStmt kusto.Stmt
if query.ServiceName == "" && query.SpanKind == "" {
kustoStmt = kusto.NewStmt(`OTELTraces
| summarize count() by SpanName
| sort by count_
| project-away count_`)
kustoStmt = kusto.NewStmt("", kusto.UnsafeStmt(safetySwitch)).UnsafeAdd(queryMap[getOpsWithNoParams])
}

if query.ServiceName != "" && query.SpanKind == "" {
kustoStmt = kusto.NewStmt(`OTELTraces | extend ProcessServiceName=tostring(ResourceAttributes.['service.name'])
| where ProcessServiceName == ParamProcessServiceName
| summarize count() by SpanName
| sort by count_
| project-away count_`).MustDefinitions(
kusto.NewDefinitions().Must(
kusto.ParamTypes{
"ParamProcessServiceName": kusto.ParamType{Type: types.String},
},
)).MustParameters(kusto.NewParameters().Must(kusto.QueryValues{"ParamProcessServiceName": query.ServiceName}))
kustoStmt = kusto.NewStmt("", kusto.UnsafeStmt(safetySwitch)).UnsafeAdd(queryMap[getOpsWithParams]).
MustDefinitions(
kusto.NewDefinitions().Must(
kusto.ParamTypes{
"ParamProcessServiceName": kusto.ParamType{Type: types.String},
},
)).MustParameters(kusto.NewParameters().Must(kusto.QueryValues{"ParamProcessServiceName": query.ServiceName}))
}

iter, err := r.client.Query(ctx, r.database, kustoStmt)
Expand All @@ -144,8 +168,11 @@ func (r *kustoSpanReader) GetOperations(ctx context.Context, query spanstore.Ope
defer iter.Stop()

operations := []spanstore.Operation{}
err = iter.Do(
func(row *table.Row) error {
err = iter.DoOnRowOrError(
func(row *table.Row, e *errors.Error) error {
if e != nil {
return e
}
operation := Operation{}
if err := row.ToStruct(&operation); err != nil {
return err
Expand Down Expand Up @@ -175,7 +202,7 @@ func (r *kustoSpanReader) FindTraceIDs(ctx context.Context, query *spanstore.Tra
TraceID string `kusto:"TraceID"`
}

kustoStmt := kusto.NewStmt("OTELTraces | extend Duration=totimespan(datetime_diff('microsecond',EndTime,StartTime)) , ProcessServiceName=tostring(ResourceAttributes.['service.name'])", kusto.UnsafeStmt(safetySwitch))
kustoStmt := kusto.NewStmt("", kusto.UnsafeStmt(safetySwitch)).UnsafeAdd(queryMap[getTraceIdBase])
kustoDefinitions := make(kusto.ParamTypes)
kustoParameters := make(kusto.QueryValues)

Expand Down Expand Up @@ -236,8 +263,11 @@ func (r *kustoSpanReader) FindTraceIDs(ctx context.Context, query *spanstore.Tra
defer iter.Stop()

var traceIds []model.TraceID
err = iter.Do(
func(row *table.Row) error {
err = iter.DoOnRowOrError(
func(row *table.Row, e *errors.Error) error {
if e != nil {
return e
}
rec := TraceID{}
if err := row.ToStruct(&rec); err != nil {
return err
Expand All @@ -264,7 +294,7 @@ func (r *kustoSpanReader) FindTraces(ctx context.Context, query *spanstore.Trace
query.NumTraces = defaultNumTraces
}

kustoStmt := kusto.NewStmt("let TraceIDs = (OTELTraces | extend ProcessServiceName=tostring(ResourceAttributes.['service.name']),Duration=totimespan(datetime_diff('millisecond',EndTime,StartTime))", kusto.UnsafeStmt(safetySwitch))
kustoStmt := kusto.NewStmt("", kusto.UnsafeStmt(safetySwitch)).UnsafeAdd(fmt.Sprintf(`let TraceIDs = (%s`, queryMap[getTracesBase]))
kustoDefinitions := make(kusto.ParamTypes)
kustoParameters := make(kusto.QueryValues)

Expand Down Expand Up @@ -314,7 +344,7 @@ func (r *kustoSpanReader) FindTraces(ctx context.Context, query *spanstore.Trace
kustoDefinitions["ParamNumTraces"] = kusto.ParamType{Type: types.Int}
kustoParameters["ParamNumTraces"] = int32(query.NumTraces)

kustoStmt = kustoStmt.Add("); OTELTraces | extend ProcessServiceName=tostring(ResourceAttributes.['service.name']),Duration=totimespan(datetime_diff('millisecond',EndTime,StartTime))")
kustoStmt = kustoStmt.UnsafeAdd(fmt.Sprintf(`); %s`, queryMap[getTracesBase]))

kustoStmt = kustoStmt.Add(` | where StartTime > ParamStartTimeMin`)
kustoDefinitions["ParamStartTimeMin"] = kusto.ParamType{Type: types.DateTime}
Expand All @@ -339,9 +369,11 @@ func (r *kustoSpanReader) FindTraces(ctx context.Context, query *spanstore.Trace

m := make(map[model.TraceID][]*model.Span)

err = iter.Do(
func(row *table.Row) error {

err = iter.DoOnRowOrError(
func(row *table.Row, e *errors.Error) error {
if e != nil {
return e
}
rec := kustoSpan{}
if err := row.ToStruct(&rec); err != nil {
return err
Expand Down Expand Up @@ -377,28 +409,26 @@ func (r *kustoSpanReader) GetDependencies(ctx context.Context, endTs time.Time,
CallCount value.Long `kusto:"CallCount"`
}

kustoStmt := kusto.NewStmt(`OTELTraces | extend ProcessServiceName=tostring(ResourceAttributes.['service.name'])
| where StartTime < ParamEndTs and StartTime > (ParamEndTs-ParamLookBack)
| project ProcessServiceName, SpanID, ChildOfSpanId = ParentID | join (OTELTraces | extend ProcessServiceName=tostring(ResourceAttributes.['service.name'])
| project ChildOfSpanId=SpanID, ParentService=ProcessServiceName) on ChildOfSpanId | where ProcessServiceName != ParentService
| extend Call=pack('Parent', ParentService, 'Child', ProcessServiceName) | summarize CallCount=count() by tostring(Call) | extend Call=parse_json(Call)
| evaluate bag_unpack(Call)
`).MustDefinitions(
kusto.NewDefinitions().Must(
kusto.ParamTypes{
"ParamEndTs": kusto.ParamType{Type: types.DateTime},
"ParamLookBack": kusto.ParamType{Type: types.Timespan},
},
)).MustParameters(kusto.NewParameters().Must(kusto.QueryValues{"ParamEndTs": endTs, "ParamLookBack": lookback}))
kustoStmt := kusto.NewStmt("", kusto.UnsafeStmt(safetySwitch)).UnsafeAdd(queryMap[getDependencies]).
MustDefinitions(
kusto.NewDefinitions().Must(
kusto.ParamTypes{
"ParamEndTs": kusto.ParamType{Type: types.DateTime},
"ParamLookBack": kusto.ParamType{Type: types.Timespan},
},
)).MustParameters(kusto.NewParameters().Must(kusto.QueryValues{"ParamEndTs": endTs, "ParamLookBack": lookback}))
iter, err := r.client.Query(ctx, r.database, kustoStmt)
if err != nil {
return nil, err
}
defer iter.Stop()

var dependencyLinks []model.DependencyLink
err = iter.Do(
func(row *table.Row) error {
err = iter.DoOnRowOrError(
func(row *table.Row, e *errors.Error) error {
if e != nil {
return e
}
rec := kustoDependencyLink{}
if err := row.ToStruct(&rec); err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ func NewStore(pc *config.PluginConfig, kc *config.KustoConfig, logger hclog.Logg
return nil, err
}

factory := newKustoFactory(client, pc, kc.Database)
// create factory for trace table opertations
factory := newKustoFactory(client, pc, kc.Database, kc.TraceTableName)

reader, err := newKustoSpanReader(factory, logger)
if err != nil {
Expand Down
Loading

0 comments on commit 4cacb34

Please sign in to comment.