V2 route handling #1149

Merged · 9 commits · Jan 2, 2025 · Changes from 7 commits

4 changes: 4 additions & 0 deletions quesma/clickhouse/clickhouse.go
@@ -67,6 +67,10 @@ type (
 	}
 )
 
+type LogManagerIFace interface {
+	ResolveIndexPattern(ctx context.Context, schema schema.Registry, pattern string) (results []string, err error)
+}
+
 func NewTableMap() *TableMap {
 	return util.NewSyncMap[string, *Table]()
 }
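
The interface exists so callers that only need index-pattern resolution can accept it instead of the concrete *LogManager (see HandleFieldCaps further down). A cheap way to keep the two in sync is a compile-time assertion; the line below is only a sketch of that idiom, not part of this PR:

```go
// Illustrative compile-time check (not in this diff): the build fails
// if *LogManager ever stops satisfying LogManagerIFace.
var _ LogManagerIFace = (*LogManager)(nil)
```
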
11 changes: 3 additions & 8 deletions quesma/clickhouse/quesma_communicator.go
@@ -29,11 +29,6 @@ const (
 	ExistsAndIsArray
 )
 
-func (lm *LogManager) Query(ctx context.Context, query string) (*sql.Rows, error) {
-	rows, err := lm.chDb.QueryContext(ctx, query)
-	return rows, err
-}
-
 type PerformanceResult struct {
 	QueryID string
 	Duration time.Duration
@@ -106,7 +101,7 @@ var random = rand.New(rand.NewSource(time.Now().UnixNano()))
 const slowQueryThreshold = 30 * time.Second
 const slowQuerySampleRate = 0.1
 
-func (lm *LogManager) shouldExplainQuery(elapsed time.Duration) bool {
+func shouldExplainQuery(elapsed time.Duration) bool {
 	return elapsed > slowQueryThreshold && random.Float64() < slowQuerySampleRate
 }
 
@@ -187,7 +182,7 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field
 
 	ctx = clickhouse.Context(ctx, clickhouse.WithSettings(settings), clickhouse.WithQueryID(queryID))
 
-	rows, err := lm.Query(ctx, queryAsString)
+	rows, err := lm.chDb.QueryContext(ctx, queryAsString)
 	if err != nil {
 		elapsed := span.End(err)
 		performanceResult.Duration = elapsed
@@ -200,7 +195,7 @@ func executeQuery(ctx context.Context, lm *LogManager, query *model.Query, field
 	performanceResult.Duration = elapsed
 	performanceResult.RowsReturned = len(res)
 	if err == nil {
-		if lm.shouldExplainQuery(elapsed) {
+		if shouldExplainQuery(elapsed) {
 			performanceResult.ExplainPlan = lm.explainQuery(ctx, queryAsString, elapsed)
 		}
 	}
8 changes: 8 additions & 0 deletions quesma/elasticsearch/index_management.go
@@ -20,6 +20,7 @@ type (
 		GetSources() Sources
 		GetSourceNames() map[string]bool
 		GetSourceNamesMatching(indexPattern string) map[string]bool
+		Resolve(indexPattern string) (Sources, bool, error)
 	}
 	indexManagement struct {
 		ElasticsearchUrl string
@@ -41,6 +42,10 @@ func NewIndexManagement(elasticsearch config.ElasticsearchConfiguration) IndexMa
 	}
 }
 
+func (im *indexManagement) Resolve(indexPattern string) (Sources, bool, error) {
+	return im.indexResolver.Resolve(indexPattern)
+}
+
 func (im *indexManagement) ReloadIndices() {
 	sources, _, err := im.indexResolver.Resolve("*")
 	if err != nil {
@@ -135,6 +140,9 @@ func (s stubIndexManagement) GetSources() Sources {
 	}
 	return Sources{DataStreams: dataStreams}
 }
+func (s stubIndexManagement) Resolve(_ string) (Sources, bool, error) {
+	return Sources{}, true, nil
+}
 
 func (s stubIndexManagement) GetSourceNames() map[string]bool {
 	var result = make(map[string]bool)
9 changes: 7 additions & 2 deletions quesma/frontend_connectors/router_v2.go
@@ -50,8 +50,13 @@ func responseFromElasticV2(ctx context.Context, elkResponse *http.Response, w ht
 }
 
 func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *quesma_api.Result, zip bool) {
-	id := ctx.Value(tracing.RequestIdCtxKey).(string)
-	logger.Debug().Str(logger.RID, id).Msg("responding from Quesma")
+	var reqIdFromContext string
+	if id := ctx.Value(tracing.RequestIdCtxKey); id != nil {
+		reqIdFromContext = id.(string)
+	} else {
+		reqIdFromContext = "EMPTY_REQUEST_ID_IN_CONTEXT"
+	}
+	logger.Debug().Str(logger.RID, reqIdFromContext).Msg("responding from Quesma")
 
 	for key, value := range quesmaResponse.Meta {
 		if headerStringValue, ok := value.(string); ok {
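
The rewritten block guards against a missing request ID: ctx.Value returns nil when the key was never set, and the previous unconditional type assertion .(string) panics on nil. The same guard can be expressed with a comma-ok assertion; the helper below is a sketch of the pattern with hypothetical names, not code from this PR:

```go
package frontend_connectors // illustrative placement only

import "context"

// requestIDOrPlaceholder shows the nil-safe pattern: a comma-ok type assertion
// covers both a missing key (nil value) and a value of the wrong type.
func requestIDOrPlaceholder(ctx context.Context, key any) string {
	if id, ok := ctx.Value(key).(string); ok {
		return id
	}
	return "EMPTY_REQUEST_ID_IN_CONTEXT"
}
```
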
@@ -41,15 +41,16 @@ func addFieldCapabilityFromSchemaRegistry(fields map[string]map[string]model.Fie
 		fields[colName][fieldTypeName] = fieldCapability
 	}
 }
-func handleFieldCapsIndex(cfg *config.QuesmaConfiguration, schemaRegistry schema.Registry, indexes []string) ([]byte, error) {
+
+func handleFieldCapsIndex(cfg map[string]config.IndexConfiguration, schemaRegistry schema.Registry, indexes []string) ([]byte, error) {
 	fields := make(map[string]map[string]model.FieldCapability)
 	for _, resolvedIndex := range indexes {
 		if len(resolvedIndex) == 0 {
 			continue
 		}
 
 		if schemaDefinition, found := schemaRegistry.FindSchema(schema.IndexName(resolvedIndex)); found {
-			indexConfig, configured := cfg.IndexConfig[resolvedIndex]
+			indexConfig, configured := cfg[resolvedIndex]
 			if configured && !indexConfig.IsClickhouseQueryEnabled() {
 				continue
 			}
@@ -100,7 +101,7 @@ func EmptyFieldCapsResponse() []byte {
 	}
 }
 
-func HandleFieldCaps(ctx context.Context, cfg *config.QuesmaConfiguration, schemaRegistry schema.Registry, index string, lm *clickhouse.LogManager) ([]byte, error) {
+func HandleFieldCaps(ctx context.Context, cfg map[string]config.IndexConfiguration, schemaRegistry schema.Registry, index string, lm clickhouse.LogManagerIFace) ([]byte, error) {
 	indexes, err := lm.ResolveIndexPattern(ctx, schemaRegistry, index)
 	if err != nil {
 		return nil, err
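
With the handler reduced to a plain map[string]config.IndexConfiguration and the clickhouse.LogManagerIFace, HandleFieldCaps can be exercised without a ClickHouse connection. A sketch of such a test; the fake resolver, the package layout and the quesma/... import paths are assumptions for illustration, not part of this PR:

```go
package field_capabilities

import (
	"context"
	"testing"

	"github.com/stretchr/testify/assert"

	"quesma/clickhouse"
	"quesma/quesma/config"
	"quesma/schema"
)

// fakeResolver is a hypothetical stand-in for *clickhouse.LogManager.
type fakeResolver struct{ indices []string }

func (f fakeResolver) ResolveIndexPattern(_ context.Context, _ schema.Registry, _ string) ([]string, error) {
	return f.indices, nil // canned result instead of querying ClickHouse
}

var _ clickhouse.LogManagerIFace = fakeResolver{}

func TestHandleFieldCapsWithFakeResolver(t *testing.T) {
	resp, err := HandleFieldCaps(
		context.Background(),
		map[string]config.IndexConfiguration{
			"logs-generic-default": {QueryTarget: []string{config.ClickhouseTarget}},
		},
		&schema.StaticRegistry{},
		"logs-*",
		fakeResolver{indices: []string{"logs-generic-default"}},
	)
	assert.NoError(t, err)
	_ = resp // payload depends on what the schema registry knows about the index
}
```
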
104 changes: 50 additions & 54 deletions quesma/quesma/functionality/field_capabilities/field_caps_test.go
@@ -76,25 +76,24 @@ func TestFieldCaps(t *testing.T) {
]
}
`)
resp, err := handleFieldCapsIndex(&config.QuesmaConfiguration{
IndexConfig: map[string]config.IndexConfiguration{
resp, err := handleFieldCapsIndex(
map[string]config.IndexConfiguration{
"logs-generic-default": {
QueryTarget: []string{config.ClickhouseTarget},
IngestTarget: []string{config.ClickhouseTarget},
},
},
}, &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"service.name": {PropertyName: "service.name", InternalPropertyName: "service.name", Type: schema.QuesmaTypeKeyword},
"arrayOfArraysOfStrings": {PropertyName: "arrayOfArraysOfStrings", InternalPropertyName: "arrayOfArraysOfStrings", Type: schema.QuesmaTypeKeyword},
"arrayOfTuples": {PropertyName: "arrayOfTuples", InternalPropertyName: "arrayOfTuples", Type: schema.QuesmaTypeObject},
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
}, &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{
"service.name": {PropertyName: "service.name", InternalPropertyName: "service.name", Type: schema.QuesmaTypeKeyword},
"arrayOfArraysOfStrings": {PropertyName: "arrayOfArraysOfStrings", InternalPropertyName: "arrayOfArraysOfStrings", Type: schema.QuesmaTypeKeyword},
"arrayOfTuples": {PropertyName: "arrayOfTuples", InternalPropertyName: "arrayOfTuples", Type: schema.QuesmaTypeObject},
"host.name": {PropertyName: "host.name", InternalPropertyName: "host.name", Type: schema.QuesmaTypeObject},
},
},
},
},
}, []string{"logs-generic-default"})
}, []string{"logs-generic-default"})
assert.NoError(t, err)
expectedResp, err := json.MarshalIndent(expected, "", " ")
assert.NoError(t, err)
@@ -141,16 +140,15 @@ func TestFieldCapsWithAliases(t *testing.T) {
"logs-generic-default"
]
}`)
resp, err := handleFieldCapsIndex(&config.QuesmaConfiguration{
IndexConfig: map[string]config.IndexConfiguration{"logs-generic-default": {QueryTarget: []string{config.ClickhouseTarget}, IngestTarget: []string{config.ClickhouseTarget}}},
}, &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{"@timestamp": {PropertyName: "@timestamp", InternalPropertyName: "@timestamp", Type: schema.QuesmaTypeTimestamp}},
Aliases: map[schema.FieldName]schema.FieldName{"timestamp": "@timestamp"},
resp, err := handleFieldCapsIndex(
map[string]config.IndexConfiguration{"logs-generic-default": {QueryTarget: []string{config.ClickhouseTarget}, IngestTarget: []string{config.ClickhouseTarget}}}, &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"logs-generic-default": {
Fields: map[schema.FieldName]schema.Field{"@timestamp": {PropertyName: "@timestamp", InternalPropertyName: "@timestamp", Type: schema.QuesmaTypeTimestamp}},
Aliases: map[schema.FieldName]schema.FieldName{"timestamp": "@timestamp"},
},
},
},
}, []string{"logs-generic-default"})
}, []string{"logs-generic-default"})
assert.NoError(t, err)
expectedResp, err := json.MarshalIndent(expected, "", " ")
assert.NoError(t, err)
@@ -181,8 +179,8 @@ func TestFieldCapsMultipleIndexes(t *testing.T) {
"foo.bar2": {Name: "foo.bar2", Type: clickhouse.BaseType{Name: "String"}},
},
})
resp, err := handleFieldCapsIndex(&config.QuesmaConfiguration{
IndexConfig: map[string]config.IndexConfiguration{
resp, err := handleFieldCapsIndex(
map[string]config.IndexConfiguration{
"logs-1": {
QueryTarget: []string{config.ClickhouseTarget},
IngestTarget: []string{config.ClickhouseTarget},
@@ -191,21 +189,20 @@ func TestFieldCapsMultipleIndexes(t *testing.T) {
QueryTarget: []string{config.ClickhouseTarget},
IngestTarget: []string{config.ClickhouseTarget},
},
},
}, &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"logs-1": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar1": {PropertyName: "foo.bar1", InternalPropertyName: "foo.bar1", Type: schema.QuesmaTypeKeyword},
}, &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"logs-1": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar1": {PropertyName: "foo.bar1", InternalPropertyName: "foo.bar1", Type: schema.QuesmaTypeKeyword},
},
},
},
"logs-2": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar2": {PropertyName: "foo.bar2", InternalPropertyName: "foo.bar2", Type: schema.QuesmaTypeKeyword},
"logs-2": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar2": {PropertyName: "foo.bar2", InternalPropertyName: "foo.bar2", Type: schema.QuesmaTypeKeyword},
},
},
},
},
}, []string{"logs-1", "logs-2"})
}, []string{"logs-1", "logs-2"})
assert.NoError(t, err)
expectedResp, err := json.MarshalIndent([]byte(`{
"fields": {
@@ -290,8 +287,8 @@ func TestFieldCapsMultipleIndexesConflictingEntries(t *testing.T) {
"foo.bar": {Name: "foo.bar", Type: clickhouse.BaseType{Name: "Boolean"}},
},
})
resp, err := handleFieldCapsIndex(&config.QuesmaConfiguration{
IndexConfig: map[string]config.IndexConfiguration{
resp, err := handleFieldCapsIndex(
map[string]config.IndexConfiguration{
"logs-1": {
QueryTarget: []string{config.ClickhouseTarget},
IngestTarget: []string{config.ClickhouseTarget},
@@ -304,26 +301,25 @@ func TestFieldCapsMultipleIndexesConflictingEntries(t *testing.T) {
QueryTarget: []string{config.ClickhouseTarget},
IngestTarget: []string{config.ClickhouseTarget},
},
},
}, &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"logs-1": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo.bar", Type: schema.QuesmaTypeKeyword},
}, &schema.StaticRegistry{
Tables: map[schema.IndexName]schema.Schema{
"logs-1": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo.bar", Type: schema.QuesmaTypeKeyword},
},
},
},
"logs-2": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo.bar", Type: schema.QuesmaTypeBoolean},
"logs-2": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo.bar", Type: schema.QuesmaTypeBoolean},
},
},
},
"logs-3": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo.bar", Type: schema.QuesmaTypeBoolean},
"logs-3": {
Fields: map[schema.FieldName]schema.Field{
"foo.bar": {PropertyName: "foo.bar", InternalPropertyName: "foo.bar", Type: schema.QuesmaTypeBoolean},
},
},
},
},
}, []string{"logs-1", "logs-2", "logs-3"})
}, []string{"logs-1", "logs-2", "logs-3"})
assert.NoError(t, err)
expectedResp, err := json.MarshalIndent([]byte(`{
"fields": {
4 changes: 2 additions & 2 deletions quesma/quesma/functionality/resolve/resolve.go
@@ -11,12 +11,12 @@ import (
 
 // HandleResolve combines results from both schema.Registry (ClickHouse) and Elasticsearch,
 // This endpoint is used in Kibana/OSD when creating Data Views/Index Patterns.
-func HandleResolve(pattern string, sr schema.Registry, cfg *config.QuesmaConfiguration) (elasticsearch.Sources, error) {
+func HandleResolve(pattern string, sr schema.Registry, ir elasticsearch.IndexResolver) (elasticsearch.Sources, error) {
 	sourcesToShow := &elasticsearch.Sources{}
 
 	normalizedPattern := elasticsearch.NormalizePattern(pattern) // changes `_all` to `*`
 
-	sourcesFromElasticsearch, _, err := elasticsearch.NewIndexResolver(cfg.Elasticsearch).Resolve(normalizedPattern)
+	sourcesFromElasticsearch, _, err := ir.Resolve(normalizedPattern)
 	if err != nil {
 		logger.Warn().Msgf("Failed fetching resolving sources matching `%s`: %v", pattern, err)
 	} else {
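
HandleResolve now receives the elasticsearch.IndexResolver it should query instead of building one from the full configuration, mirroring the Resolve method added to IndexManagement above, so the Elasticsearch side can be substituted in tests. A sketch under the assumption that IndexResolver exposes the same Resolve(pattern) (Sources, bool, error) shape seen elsewhere in this diff; the stub type and the resolve package name are illustrative:

```go
// stubResolver is a hypothetical resolver that reports no Elasticsearch sources,
// so HandleResolve returns only what the schema.Registry contributes.
type stubResolver struct{}

func (stubResolver) Resolve(_ string) (elasticsearch.Sources, bool, error) {
	return elasticsearch.Sources{}, true, nil
}

// Example call (assumed package names):
//   sources, err := resolve.HandleResolve("logs-*", schemaRegistry, stubResolver{})
```
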