Remove quesma/logger from utils package (#1021)
A step towards untangling dependency hell with our modules.

Mainly it's about having functions from `utils` return an `error`
and logging it (or sometimes not, whenever I've thought it's not that
significant) in the places where they're called.
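Schematically, the change looks like this. A minimal sketch with a made-up helper (`ParseFlag` is hypothetical, not a real `util` function):

```go
package util

import (
	"fmt"
	"strconv"
)

// Before (schematically): the helper logged the error itself, which is
// what pulled quesma/logger into the utils package:
//
//	func ParseFlag(s string) bool {
//		v, err := strconv.ParseBool(s)
//		if err != nil {
//			logger.Error().Msgf("invalid flag %q: %v", s, err)
//			return false
//		}
//		return v
//	}

// After: the helper stays logger-free and returns the error instead.
func ParseFlag(s string) (bool, error) {
	v, err := strconv.ParseBool(s)
	if err != nil {
		return false, fmt.Errorf("invalid flag %q: %v", s, err)
	}
	return v, nil
}
```

Each call site then decides whether the error is worth logging or can be dropped with `_`, which is the trade-off visible in the diffs below.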

Related to #1017
mieciu authored Nov 21, 2024
1 parent c083bdd commit cf7ba3e
Showing 14 changed files with 133 additions and 83 deletions.
7 changes: 5 additions & 2 deletions quesma/clickhouse/clickhouse.go
@@ -13,7 +13,6 @@ import (
"quesma/persistence"
"quesma/quesma/config"
"quesma/quesma/recovery"

"quesma/telemetry"
"quesma/util"
"slices"
@@ -165,7 +164,11 @@ func (lm *LogManager) ResolveIndexPattern(ctx context.Context, pattern string) (
} else {
lm.tableDiscovery.TableDefinitions().
Range(func(tableName string, v *Table) bool {
if util.IndexPatternMatches(pattern, tableName) {
matches, err := util.IndexPatternMatches(pattern, tableName)
if err != nil {
logger.Error().Msgf("error matching index pattern: %v", err)
}
if matches {
results = append(results, tableName)
}
return true
4 changes: 3 additions & 1 deletion quesma/elasticsearch/index_management.go
@@ -147,8 +147,10 @@ func (s stubIndexManagement) GetSourceNames() map[string]bool {
func (s stubIndexManagement) GetSourceNamesMatching(indexPattern string) map[string]bool {
var result = make(map[string]bool)
for _, index := range s.indexes {
if util.IndexPatternMatches(indexPattern, index) {
if matches, err := util.IndexPatternMatches(indexPattern, index); err != nil {
logger.Error().Msgf("error matching index pattern: %v", err)
} else if matches {
result[index] = true
}
}
return result
6 changes: 5 additions & 1 deletion quesma/model/bucket_aggregations/date_histogram.go
@@ -94,7 +94,11 @@ func (query *DateHistogram) TranslateSqlResponseToJson(rows []model.QueryResultR
var response []model.JsonMap
for _, row := range rows {
docCount := row.LastColValue()
if util.ExtractInt64(docCount) < int64(query.minDocCount) {
docCountAsInt, err := util.ExtractInt64(docCount)
if err != nil {
logger.ErrorWithCtx(query.ctx).Msgf("error parsing doc_count: %v", docCount)
}
if docCountAsInt < int64(query.minDocCount) {
continue
}
originalKey := query.getKey(row)
9 changes: 6 additions & 3 deletions quesma/model/bucket_aggregations/geotile_grid.go
@@ -31,9 +31,12 @@ func (query GeoTileGrid) TranslateSqlResponseToJson(rows []model.QueryResultRow)
}
var response []model.JsonMap
for _, row := range rows {
zoom := int64(util.ExtractFloat64(row.Cols[0].Value))
x := int64(util.ExtractFloat64(row.Cols[1].Value))
y := int64(util.ExtractFloat64(row.Cols[2].Value))
zoomAsFloat, _ := util.ExtractFloat64(row.Cols[0].Value)
zoom := int64(zoomAsFloat)
xAsFloat, _ := util.ExtractFloat64(row.Cols[1].Value)
x := int64(xAsFloat)
yAsFloat, _ := util.ExtractFloat64(row.Cols[2].Value)
y := int64(yAsFloat)
key := strconv.FormatInt(zoom, 10) + "/" + strconv.FormatInt(x, 10) + "/" + strconv.FormatInt(y, 10)
response = append(response, model.JsonMap{
"key": key,
6 changes: 5 additions & 1 deletion quesma/model/bucket_aggregations/multi_terms.go
@@ -58,7 +58,11 @@ func (query MultiTerms) TranslateSqlResponseToJson(rows []model.QueryResultRow)
}
sumOtherDocCount := 0
if len(rows) > 0 {
sumOtherDocCount = int(util.ExtractInt64(query.parentCount(rows[0]))) - query.sumDocCounts(rows)
parentCount, err := util.ExtractInt64(query.parentCount(rows[0]))
if err != nil {
logger.Error().Msgf("error extracting parent count: %v", err)
}
sumOtherDocCount = int(parentCount) - query.sumDocCounts(rows)
}
return model.JsonMap{
"sum_other_doc_count": sumOtherDocCount,
5 changes: 3 additions & 2 deletions quesma/model/bucket_aggregations/terms.go
@@ -47,14 +47,15 @@ func (query Terms) TranslateSqlResponseToJson(rows []model.QueryResultRow) model
}

if !query.significant {
sumOtherDocCount := int(util.ExtractInt64(query.parentCount(rows[0]))) - query.sumDocCounts(rows)
parentCountAsInt, _ := util.ExtractInt64(query.parentCount(rows[0]))
sumOtherDocCount := int(parentCountAsInt) - query.sumDocCounts(rows)
return model.JsonMap{
"sum_other_doc_count": sumOtherDocCount,
"doc_count_error_upper_bound": 0,
"buckets": response,
}
} else {
parentDocCount := util.ExtractInt64(query.parentCount(rows[0]))
parentDocCount, _ := util.ExtractInt64(query.parentCount(rows[0]))
return model.JsonMap{
"buckets": response,
"doc_count": parentDocCount,
20 changes: 16 additions & 4 deletions quesma/queryparser/pancake_json_rendering.go
@@ -179,7 +179,11 @@ func (p *pancakeJSONRenderer) combinatorBucketToJSON(remainingLayers []*pancakeM
if err != nil {
return nil, err
}
return util.MergeMaps(p.ctx, aggJson, subAggr), nil
mergeResult, mergeErr := util.MergeMaps(aggJson, subAggr)
if mergeErr != nil {
logger.ErrorWithCtx(p.ctx).Msgf("error merging maps: %v", mergeErr)
}
return mergeResult, nil
case bucket_aggregations.CombinatorAggregationInterface:
var bucketArray []model.JsonMap
for _, subGroup := range queryType.CombinatorGroups() {
@@ -193,7 +197,11 @@
selectedRows := p.selectMetricRows(layer.nextBucketAggregation.InternalNameForCount(), selectedRowsWithoutPrefix)
aggJson := queryType.CombinatorTranslateSqlResponseToJson(subGroup, selectedRows)

bucketArray = append(bucketArray, util.MergeMaps(p.ctx, aggJson, subAggr))
mergeResult, mergeErr := util.MergeMaps(aggJson, subAggr)
if mergeErr != nil {
logger.ErrorWithCtx(p.ctx).Msgf("error merging maps: %v", mergeErr)
}
bucketArray = append(bucketArray, mergeResult)
bucketArray[len(bucketArray)-1]["key"] = subGroup.Key
}
var bucketsJson any
@@ -319,7 +327,9 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
if err != nil {
return nil, err
}
bucketArr[i] = util.MergeMaps(p.ctx, bucket, subAggr)
if bucketArr[i], err = util.MergeMaps(bucket, subAggr); err != nil {
logger.ErrorWithCtx(p.ctx).Msgf("error merging maps: %v", err)
}
}
} else {
// A bit harder case. Observation: len(bucketArr) > len(subAggrRows) and set(subAggrRows' keys) is a subset of set(bucketArr's keys)
@@ -360,7 +370,9 @@ func (p *pancakeJSONRenderer) layerToJSON(remainingLayers []*pancakeModelLayer,
if err != nil {
return nil, err
}
bucketArr[i] = util.MergeMaps(p.ctx, bucket, subAggr)
if bucketArr[i], err = util.MergeMaps(bucket, subAggr); err != nil {
logger.ErrorWithCtx(p.ctx).Msgf("error merging maps: %v", err)
}
}
}

12 changes: 10 additions & 2 deletions quesma/queryparser/pancake_pipelines.go
@@ -53,7 +53,11 @@ func (p pancakePipelinesProcessor) currentPipelineMetricAggregations(layer *panc
thisPipelineResults := p.calcSingleMetricPipeline(layer, pipeline, rows)

errorMsg := fmt.Sprintf("calculateThisLayerMetricPipelines, pipeline: %s", pipeline.internalName)
resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, thisPipelineResults, errorMsg)
var err error
resultPerPipeline, err = util.Merge(resultPerPipeline, thisPipelineResults, errorMsg)
if err != nil {
logger.ErrorWithCtx(p.ctx).Msgf("error merging results: %v", err)
}
}

return
@@ -72,7 +76,11 @@ func (p pancakePipelinesProcessor) calcSingleMetricPipeline(layer *pancakeModelL
childResults := p.calcSingleMetricPipeline(layer, pipelineChild, resultRows)

errorMsg := fmt.Sprintf("processSingleMetricPipeline, pipeline: %s, pipelineChild: %s", pipeline.internalName, pipelineChild.internalName)
resultPerPipeline = util.Merge(p.ctx, resultPerPipeline, childResults, errorMsg)
var err error
resultPerPipeline, err = util.Merge(resultPerPipeline, childResults, errorMsg)
if err != nil {
logger.ErrorWithCtx(p.ctx).Msgf("error merging results: %v", err)
}
}

return
4 changes: 3 additions & 1 deletion quesma/queryparser/query_translator.go
@@ -100,7 +100,9 @@ func (cw *ClickhouseQueryTranslator) MakeAggregationPartOfResponse(queries []*mo
return nil, err
}

aggregations = util.MergeMaps(cw.Ctx, aggregations, aggregation)
if aggregations, err = util.MergeMaps(aggregations, aggregation); err != nil {
logger.ErrorWithCtx(cw.Ctx).Msgf("failed to merge aggregations: %v", err)
}
}
}
return aggregations, nil
6 changes: 3 additions & 3 deletions quesma/table_resolver/rules.go
@@ -32,20 +32,20 @@ func (r *tableRegistryImpl) wildcardPatternSplitter(pattern string) (parsedPatte
}

for indexName := range r.conf.IndexConfig {
if util.IndexPatternMatches(pattern, indexName) {
if matches, _ := util.IndexPatternMatches(pattern, indexName); matches {
matchingSingleNames = append(matchingSingleNames, indexName)
}
}

// but maybe we should also check against the actual indexes ??
for indexName := range r.elasticIndexes {
if util.IndexPatternMatches(pattern, indexName) {
if matches, _ := util.IndexPatternMatches(pattern, indexName); matches {
matchingSingleNames = append(matchingSingleNames, indexName)
}
}
if r.conf.AutodiscoveryEnabled {
for tableName := range r.clickhouseIndexes {
if util.IndexPatternMatches(pattern, tableName) {
if matches, _ := util.IndexPatternMatches(pattern, tableName); matches {
matchingSingleNames = append(matchingSingleNames, tableName)
}
}
10 changes: 5 additions & 5 deletions quesma/util/elasticsearch.go → quesma/util/index_pattern.go
@@ -1,18 +1,18 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package util

import (
"quesma/logger"
"fmt"
"regexp"
"strings"
)

func IndexPatternMatches(indexNamePattern, indexName string) bool {
func IndexPatternMatches(indexNamePattern, indexName string) (bool, error) {
r, err := regexp.Compile("^" + strings.Replace(indexNamePattern, "*", ".*", -1) + "$")
if err != nil {
logger.Error().Msgf("invalid index name pattern [%s]: %s", indexNamePattern, err)
return false
return false, fmt.Errorf("invalid index name pattern [%s]: %s", indexNamePattern, err)
}
return r.MatchString(indexName)
return r.MatchString(indexName), nil
}
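For illustration, here is the new matcher exercised end to end. A self-contained sketch that inlines the same logic from the diff above; the pattern and index names are made up:

```go
package main

import (
	"fmt"
	"regexp"
	"strings"
)

// Same logic as the new util.IndexPatternMatches: each "*" in the
// pattern becomes ".*" in a regexp anchored at both ends.
func indexPatternMatches(indexNamePattern, indexName string) (bool, error) {
	r, err := regexp.Compile("^" + strings.Replace(indexNamePattern, "*", ".*", -1) + "$")
	if err != nil {
		return false, fmt.Errorf("invalid index name pattern [%s]: %s", indexNamePattern, err)
	}
	return r.MatchString(indexName), nil
}

func main() {
	for _, name := range []string{"logs-2024-11", "metrics-2024-11"} {
		matches, err := indexPatternMatches("logs-*", name)
		fmt.Println(name, matches, err) // logs-2024-11 true <nil>; metrics-2024-11 false <nil>
	}
}
```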
13 changes: 7 additions & 6 deletions quesma/util/map_utils.go
@@ -4,8 +4,8 @@ package util

import (
"cmp"
"context"
"quesma/logger"
"fmt"
"github.com/hashicorp/go-multierror"
"sort"
)

@@ -41,13 +41,14 @@ func MapKeysSortedByValue[K comparable, V cmp.Ordered](m map[K]V) []K {
return keys
}

// Caution: this function mutates the first map
func Merge[V any](ctx context.Context, m1, m2 map[string]V, errorContext string) map[string]V {
// Merge function mutates the first map - use with caution!
func Merge[V any](m1, m2 map[string]V, errorContext string) (map[string]V, error) {
var err *multierror.Error
for k, v := range m2 {
if _, exists := m1[k]; exists {
logger.ErrorWithCtx(ctx).Msgf("key %s already exists. overriding (%s)", k, errorContext)
err = multierror.Append(err, fmt.Errorf("key %s already exists. overriding (%s)", k, errorContext))
}
m1[k] = v
}
return m1
return m1, err.ErrorOrNil()
}
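And a quick look at what callers of the new `Merge` see. A self-contained sketch that inlines the same logic from the diff above: duplicate keys are still overridden, but each collision now surfaces in the returned error instead of being logged inside `util`:

```go
package main

import (
	"fmt"

	"github.com/hashicorp/go-multierror"
)

// Same shape as the new util.Merge: mutates m1 and collects one error
// per duplicate key instead of logging from inside the utils package.
func merge[V any](m1, m2 map[string]V, errorContext string) (map[string]V, error) {
	var err *multierror.Error
	for k, v := range m2 {
		if _, exists := m1[k]; exists {
			err = multierror.Append(err, fmt.Errorf("key %s already exists. overriding (%s)", k, errorContext))
		}
		m1[k] = v
	}
	return m1, err.ErrorOrNil()
}

func main() {
	merged, err := merge(map[string]int{"a": 1, "b": 2}, map[string]int{"b": 3}, "demo")
	fmt.Println(merged) // map[a:1 b:3]
	fmt.Println(err)    // 1 error occurred: key b already exists. overriding (demo)
}
```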