Add report for unsupported query types. (#20)
@trzysiek migrated PR:
QuesmaOrg/poc-elk-mitmproxy#847

@jakozaur improved it, and it works well:

![Screenshot 2024-05-02 at 21 08 23](https://github.com/QuesmaOrg/quesma/assets/972989/e1db0a3b-f858-486a-915f-a05b8f48b2bc)

Some ideas:
- I prefer to have a single list of requests; that would let us evolve it
  once we have multiple containers and want to centralize the panels.
- Let's try to keep the pages unified and re-use them as much as possible.

---------

Co-authored-by: Krzysztof Kiewicz <[email protected]>
jakozaur and trzysiek authored May 4, 2024
1 parent 768a323 commit 1f02b37
Showing 12 changed files with 1,945 additions and 48 deletions.
33 changes: 28 additions & 5 deletions quesma/logger/logger.go
@@ -25,10 +25,11 @@ var (
)

const (
RID = "request_id" // request id key for the logger
Reason = "reason" // Known error reason key for the logger
Path = "path"
AsyncId = "async_id"
RID = "request_id" // request id key for the logger
Reason = "reason" // Known error reason key for the logger
Path = "path"
AsyncId = "async_id"
ReasonPrefixUnsupportedQueryType = "unsupported_search_query: " // Reason for Error messages for unsupported queries will start with this prefix
)

const (
@@ -38,7 +39,7 @@

var logger zerolog.Logger

// InitLogger returns a channel where log messages will be sent
func InitLogger(cfg config.QuesmaConfiguration, sig chan os.Signal, doneCh chan struct{}, asyncQueryTraceLogger *tracing.AsyncTraceLogger) <-chan tracing.LogWithLevel {
zerolog.TimeFieldFormat = time.RFC3339Nano // without this we don't have milliseconds timestamp precision
var output io.Writer = zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.StampMilli}
@@ -116,6 +117,24 @@ func InitSimpleLoggerForTests() {
Logger()
}

func InitOnlyChannelLoggerForTests(cfg config.QuesmaConfiguration, asyncQueryTraceLogger *tracing.AsyncTraceLogger) <-chan tracing.LogWithLevel {
zerolog.TimeFieldFormat = time.RFC3339Nano // without this we don't have milliseconds timestamp precision
logChannel := make(chan tracing.LogWithLevel, 50000) // a small number like 5 or 10 made the whole of Quesma unresponsive during the few seconds when Kibana spams with messages
chanWriter := channelWriter{ch: logChannel}

logger = zerolog.New(chanWriter).
Level(cfg.Logging.Level).
With().
Timestamp().
Caller().
Logger()

globalError := errorstats.GlobalErrorHook{}
logger = logger.Hook(&globalError)
logger = logger.Hook(asyncQueryTraceLogger)
return logChannel
}
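
For context, a channel-backed zerolog writer like the `chanWriter` used above is just an `io.Writer` that forwards each formatted log line into a buffered channel. A minimal self-contained sketch of the pattern (illustrative only, not Quesma's actual `channelWriter`, which carries `tracing.LogWithLevel` rather than strings):

```go
package main

import (
	"fmt"

	"github.com/rs/zerolog"
)

// chanWriter forwards every formatted log line into a buffered channel.
// The buffer size matters: once the channel is full, Write blocks, and so
// does every goroutine that logs (hence the generous 50000 buffer above).
type chanWriter struct {
	ch chan string
}

func (w chanWriter) Write(p []byte) (int, error) {
	w.ch <- string(p)
	return len(p), nil
}

func main() {
	ch := make(chan string, 1024)
	log := zerolog.New(chanWriter{ch: ch}).With().Timestamp().Logger()

	log.Info().Msg("hello")
	fmt.Print(<-ch) // a JSON log line such as {"level":"info","time":"...","message":"hello"}
}
```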

func openLogFiles(logsPath string) {
var err error
StdLogFile, err = os.OpenFile(
@@ -220,3 +239,7 @@ func Fatal() *zerolog.Event {
func Panic() *zerolog.Event {
return logger.Panic()
}

func ReasonUnsupportedQuery(queryType string) string {
return ReasonPrefixUnsupportedQueryType + queryType
}
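
For context, the prefix makes the reason string machine-parsable: a consumer can recover the offending aggregation type by stripping it back off. A self-contained sketch of that round trip (the `extractQueryType` helper is hypothetical, not part of this commit):

```go
package main

import (
	"fmt"
	"strings"
)

// Mirrors logger.ReasonPrefixUnsupportedQueryType and logger.ReasonUnsupportedQuery above.
const reasonPrefixUnsupportedQueryType = "unsupported_search_query: "

func reasonUnsupportedQuery(queryType string) string {
	return reasonPrefixUnsupportedQueryType + queryType
}

// extractQueryType is the hypothetical inverse: given a reason string, it returns
// the query type and whether the reason marks an unsupported query.
func extractQueryType(reason string) (string, bool) {
	if rest, ok := strings.CutPrefix(reason, reasonPrefixUnsupportedQueryType); ok {
		return rest, true
	}
	return "", false
}

func main() {
	reason := reasonUnsupportedQuery("boxplot")
	queryType, ok := extractQueryType(reason)
	fmt.Println(queryType, ok) // boxplot true
}
```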
92 changes: 92 additions & 0 deletions quesma/model/query_types_list.go
@@ -0,0 +1,92 @@
package model

// AggregationQueryTypes is a list of all aggregation types in Elasticsearch.
var AggregationQueryTypes = []string{
// metrics:
"avg",
"boxplot",
"cardinality",
"extended_stats",
"geo_bounds",
"geo_centroid",
"geo_line",
"cartesian_bounds",
"cartesian_centroid",
"matrix_stats",
"max",
"median_absolute_deviation",
"min",
"percentile_ranks",
"percentiles",
"rate",
"scripted_metric",
"stats",
"string_stats",
"sum",
"t_test",
"top_hits",
"top_metrics",
"value_count",
"weighted_avg",

// bucket:
"adjacency_matrix",
"auto_date_histogram",
"categorize_text",
"children",
"composite",
"date_histogram",
"date_range",
"diversified_sampler",
"filter",
"filters",
"frequent_item_sets",
"geo_distance",
"geohash_grid",
"geohex_grid",
"geotile_grid",
"global",
"histogram",
"ip_prefix",
"ip_range",
"missing",
"multi_terms",
"nested",
"parent",
"random_sampler",
"range",
"rare_terms",
"reverse_nested",
"sampler",
"significant_terms",
"significant_text",
"terms",
"time_series",
"variable_width_histogram",

// pipeline:
"avg_bucket",
"bucket_script",
"bucket_count_ks_test",
"bucket_correlation",
"bucket_selector",
"bucket_sort",
"change_point",
"cumulative_cardinality",
"cumulative_sum",
"derivative",
"extended_stats_bucket",
"inference",
"max_bucket",
"min_bucket",
"moving_avg",
"moving_fn",
"moving_percentiles",
"normalize",
"percentiles_bucket",
"serial_diff",
"stats_bucket",
"sum_bucket",
}

// TODO list of all Query DSL types in Elasticsearch.
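
One plausible use of such a list is a constant-time membership check when classifying incoming queries; a minimal sketch under that assumption (the set construction is illustrative, not part of this commit):

```go
package main

import "fmt"

// A few entries from model.AggregationQueryTypes, trimmed for illustration.
var aggregationQueryTypes = []string{"avg", "terms", "date_histogram", "boxplot"}

// aggregationTypeSet is built once so lookups are O(1) instead of a slice scan.
var aggregationTypeSet = func() map[string]struct{} {
	set := make(map[string]struct{}, len(aggregationQueryTypes))
	for _, t := range aggregationQueryTypes {
		set[t] = struct{}{}
	}
	return set
}()

// isKnownAggregation reports whether name is a known Elasticsearch aggregation type.
func isKnownAggregation(name string) bool {
	_, ok := aggregationTypeSet[name]
	return ok
}

func main() {
	fmt.Println(isKnownAggregation("terms"))      // true
	fmt.Println(isKnownAggregation("not_an_agg")) // false
}
```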
65 changes: 52 additions & 13 deletions quesma/queryparser/aggregation_parser.go
@@ -231,9 +231,56 @@ func (cw *ClickhouseQueryTranslator) ParseAggregationJson(queryAsJson string) ([
}

// 'resultAccumulator' - array where we store results
// 'queryMap' always looks like this:
//
// "aggs": {
// "arbitrary_aggregation_name": {
// ["some aggregation":
// { "arbitrary_aggregation_name_2": { ... },]
// ["some other aggregation": { ... },]
// ["aggs": { ... }]
// }
// }
//
// Notice that on levels 0, 2, ... of nesting we have the "aggs" key or an aggregation type.
// On levels 1, 3, ... we have the names of aggregations, which can be arbitrary strings.
// This function is called on those levels 1, 3, ..., and parses and saves those aggregation names.
func (cw *ClickhouseQueryTranslator) parseAggregationNames(currentAggr *aggrQueryBuilder, queryMap QueryMap, resultAccumulator *[]model.QueryWithAggregation) {
// We process subaggregations, introduced via (k, v), meaning 'aggregation_name': { dict }
for k, v := range queryMap {
// I assume it's a new aggregator name
logger.DebugWithCtx(cw.Ctx).Msgf("names += %s", k)
currentAggr.Aggregators = append(currentAggr.Aggregators, model.NewAggregatorEmpty(k))
if subAggregation, ok := v.(QueryMap); ok {
cw.parseAggregation(currentAggr, subAggregation, resultAccumulator)
} else {
logger.ErrorWithCtxAndReason(cw.Ctx, logger.ReasonUnsupportedQuery("unexpected_type")).
Msgf("unexpected type of subaggregation: (%v: %v), value type: %T. Skipping", k, v, v)
}
logger.DebugWithCtx(cw.Ctx).Msgf("names -= %s", k)
currentAggr.Aggregators = currentAggr.Aggregators[:len(currentAggr.Aggregators)-1]
}
}
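
To make the alternating levels concrete, here is a request body in which the even levels hold `aggs` keys and aggregation types, and the odd levels hold arbitrary user-chosen names (a made-up example, not taken from this commit's test data):

```go
package main

import "fmt"

// Levels 0, 2, ... hold "aggs" keys and aggregation types ("terms", "avg");
// levels 1, 3, ... hold arbitrary aggregation names ("my_outer_name", "my_inner_name").
const exampleAggs = `{
  "aggs": {
    "my_outer_name": {
      "terms": { "field": "host.name" },
      "aggs": {
        "my_inner_name": {
          "avg": { "field": "bytes" }
        }
      }
    }
  }
}`

func main() { fmt.Println(exampleAggs) }
```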

// Builds aggregations recursively. Seems to be working on all examples so far,
// even though it's a pretty simple algorithm.
// When making changes, look at the order in which we parse fields; it is very important for correctness.
//
// 'resultAccumulator' - array where we store results
// 'queryMap' always looks like this:
//
// "aggs": {
// "arbitrary_aggregation_name": {
// ["some aggregation":
// { "arbitrary_aggregation_name_2": { ... },]
// ["some other aggregation": { ... },]
// ["aggs": { ... }]
// }
// }
//
// Notice that on levels 0, 2, ... of nesting we have the "aggs" key or an aggregation type.
// On levels 1, 3, ... we have the names of aggregations, which can be arbitrary strings.
// This function is called on those levels 0, 2, ..., and parses the actual aggregations.
func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuilder, queryMap QueryMap, resultAccumulator *[]model.QueryWithAggregation) {
if len(queryMap) == 0 {
return
@@ -319,7 +366,7 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil
if aggs, ok := queryMap["aggs"].(QueryMap); ok {
aggsCopy, err := deepcopy.Anything(aggs)
if err == nil {
cw.parseAggregationNames(currentAggr, aggsCopy.(QueryMap), resultAccumulator)
} else {
logger.ErrorWithCtx(cw.Ctx).Msgf("deepcopy 'aggs' map error: %v. Skipping current filter: %v, aggs: %v", err, filter, aggs)
}
@@ -332,7 +379,7 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil

aggsHandledSeparately := isRange || isFilters
if aggs, ok := queryMap["aggs"]; ok && !aggsHandledSeparately {
cw.parseAggregationNames(currentAggr, aggs.(QueryMap), resultAccumulator)
}
delete(queryMap, "aggs") // no-op if no "aggs"

@@ -341,18 +388,10 @@ func (cw *ClickhouseQueryTranslator) parseAggregation(currentAggr *aggrQueryBuil
*resultAccumulator = append(*resultAccumulator, currentAggr.buildBucketAggregation(metadata))
}

for k, v := range queryMap {
// should be empty by now. If it's not, it's an unsupported/unrecognized type of aggregation.
logger.ErrorWithCtxAndReason(cw.Ctx, logger.ReasonUnsupportedQuery(k)).
Msgf("unexpected type of subaggregation: (%v: %v), value type: %T. Skipping", k, v, v)
}

// restore current state, removing subaggregation state
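
The detection pattern behind this change — consume every recognized key, delete it from the map, then report whatever is left as unsupported — can be shown in isolation (a simplified sketch, not the translator's actual code):

```go
package main

import "fmt"

func main() {
	queryMap := map[string]any{
		"terms":   map[string]any{"field": "host.name"},
		"boxplot": map[string]any{"field": "load_time"},
	}

	// 1. Recognized aggregations are translated and removed from the map.
	supported := map[string]bool{"terms": true}
	for k := range queryMap {
		if supported[k] {
			// ... translate the aggregation here ...
			delete(queryMap, k)
		}
	}

	// 2. queryMap should be empty by now; anything left is an unsupported
	// aggregation and gets reported with the prefixed reason.
	for k := range queryMap {
		fmt.Println("unsupported_search_query: " + k) // unsupported_search_query: boxplot
	}
}
```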
2 changes: 1 addition & 1 deletion quesma/queryparser/range_aggregation.go
@@ -93,7 +93,7 @@ func (cw *ClickhouseQueryTranslator) processRangeAggregation(currentAggr *aggrQu
aggsCopy, err := deepcopy.Anything(aggs)
if err == nil {
currentAggr.Type = model.NewUnknownAggregationType(cw.Ctx)
cw.parseAggregationNames(currentAggr, aggsCopy.(QueryMap), aggregationsAccumulator)
} else {
logger.ErrorWithCtx(cw.Ctx).Msgf("deepcopy 'aggs' map error: %v. Skipping current range's interval: %v, aggs: %v", err, interval, aggs)
}
92 changes: 92 additions & 0 deletions quesma/quesma/search_test.go
@@ -6,8 +6,10 @@ import (
"fmt"
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"math/rand"
"mitmproxy/quesma/clickhouse"
"mitmproxy/quesma/concurrent"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/model"
"mitmproxy/quesma/queryparser"
"mitmproxy/quesma/quesma/config"
@@ -18,6 +20,7 @@
"strconv"
"strings"
"testing"
"time"
)

const defaultAsyncSearchTimeout = 1000
@@ -390,3 +393,92 @@ func TestNumericFacetsQueries(t *testing.T) {
}
}
}

// TestAllUnsupportedQueryTypesAreProperlyRecorded tests if all unsupported query types are properly recorded.
// It runs |testdata.UnsupportedAggregationsTests| tests, each of them sends one query of unsupported type.
// It ensures that this query type is recorded in the management console, and that all other query types are not.
func TestAllUnsupportedQueryTypesAreProperlyRecorded(t *testing.T) {
for _, tt := range testdata.UnsupportedAggregationsTests {
t.Run(tt.TestName, func(t *testing.T) {
db, _, err := sqlmock.New()
if err != nil {
t.Fatal(err)
}
defer db.Close()

lm := clickhouse.NewLogManagerWithConnection(db, table)
cfg := config.QuesmaConfiguration{}
logChan := logger.InitOnlyChannelLoggerForTests(cfg, &tracing.AsyncTraceLogger{AsyncQueryTrace: concurrent.NewMap[string, tracing.TraceCtx]()})
managementConsole := ui.NewQuesmaManagementConsole(cfg, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent())
go managementConsole.RunOnlyChannelProcessor()

queryRunner := NewQueryRunner()
newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId())
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(tt.QueryRequestJson), cfg, lm, nil, managementConsole)

for _, queryType := range model.AggregationQueryTypes {
if queryType != tt.AggregationName {
assert.Len(t, managementConsole.QueriesWithUnsupportedType(queryType), 0)
}
}

// Update of the count below is done asynchronously in another goroutine
// (go managementConsole.RunOnlyChannelProcessor() above), so we might need to wait a bit
assert.Eventually(t, func() bool {
return len(managementConsole.QueriesWithUnsupportedType(tt.AggregationName)) == 1
}, 50*time.Millisecond, 1*time.Millisecond)
assert.Equal(t, 1, managementConsole.GetTotalUnsupportedQueries())
assert.Equal(t, 1, managementConsole.GetSavedUnsupportedQueries())
assert.Equal(t, 1, len(managementConsole.GetUnsupportedTypesWithCount()))
})
}
}

// TestDifferentUnsupportedQueries tests if different unsupported queries are properly recorded.
// It randomly selects |requestsNr| queries from testdata.UnsupportedAggregationsTests, runs them, and checks
// if all of them are properly recorded in the management console.
func TestDifferentUnsupportedQueries(t *testing.T) {
const maxSavedQueriesPerQueryType = 10
const requestsNr = 50

// generate random |requestsNr| queries to send
testNrs := make([]int, 0, requestsNr)
testCounts := make([]int, len(testdata.UnsupportedAggregationsTests))
for range requestsNr {
randInt := rand.Intn(len(testdata.UnsupportedAggregationsTests))
testNrs = append(testNrs, randInt)
testCounts[randInt]++
}

db, _, err := sqlmock.New()
if err != nil {
t.Fatal(err)
}
defer db.Close()

lm := clickhouse.NewLogManagerWithConnection(db, table)
cfg := config.QuesmaConfiguration{}
logChan := logger.InitOnlyChannelLoggerForTests(cfg, &tracing.AsyncTraceLogger{AsyncQueryTrace: concurrent.NewMap[string, tracing.TraceCtx]()})
managementConsole := ui.NewQuesmaManagementConsole(cfg, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent())
go managementConsole.RunOnlyChannelProcessor()

queryRunner := NewQueryRunner()
for _, testNr := range testNrs {
newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId())
_, _ = queryRunner.handleSearch(newCtx, tableName, []byte(testdata.UnsupportedAggregationsTests[testNr].QueryRequestJson), cfg, lm, nil, managementConsole)
}

for i, tt := range testdata.UnsupportedAggregationsTests {
// Update of the count below is done asynchronously in another goroutine
// (go managementConsole.RunOnlyChannelProcessor() above), so we might need to wait a bit
assert.Eventually(t, func() bool {
return len(managementConsole.QueriesWithUnsupportedType(tt.AggregationName)) == min(testCounts[i], maxSavedQueriesPerQueryType)
}, 500*time.Millisecond, 1*time.Millisecond,
tt.TestName+": wanted: %d, got: %d", min(testCounts[i], maxSavedQueriesPerQueryType),
len(managementConsole.QueriesWithUnsupportedType(tt.AggregationName)),
)
}
}
