Skip to content

Commit

Permalink
A/B testing - E vs C and C vs E (#919)
Browse files Browse the repository at this point in the history
This PR contains:
- generalized code of A/B testing query results for both scenarios:
Elastic vs Clickhouse and Clickhouse vs Elastic
- kibana dashboard id is stored in a separate column
- we don't store requests and A/B responses if the responses match

Some code is copy-pasted. I'm not brave enough to generalize it now.

Tests will be added as part of the IT suite.

<img width="789" alt="Screenshot 2024-10-28 at 12 37 16"
src="https://github.com/user-attachments/assets/3be97a39-c245-4477-934c-40bf1a918947">

<img width="1995" alt="Screenshot 2024-10-28 at 12 36 49"
src="https://github.com/user-attachments/assets/77c487f1-c232-4216-9f9d-779925cf8c25">
  • Loading branch information
nablaone authored Oct 30, 2024
1 parent 8ccf821 commit 72cab08
Show file tree
Hide file tree
Showing 12 changed files with 478 additions and 259 deletions.
3 changes: 2 additions & 1 deletion quesma/ab_testing/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,12 +83,13 @@ func NewCollector(ctx context.Context, ingester ingest.Ingester, healthQueue cha
&diffTransformer{},
//&ppPrintFanout{},
//&mismatchedOnlyFilter{},
&redactOkResults{},
//&elasticSearchFanout{
// url: "http://localhost:8080",
// indexName: "ab_testing_logs",
//},
&internalIngestFanout{
indexName: "ab_testing_logs",
indexName: ab_testing.ABTestingTableName,
ingestProcessor: ingester,
},
},
Expand Down
23 changes: 16 additions & 7 deletions quesma/ab_testing/collector/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,22 +66,31 @@ func (t *diffTransformer) process(in EnrichedResults) (out EnrichedResults, drop
}

if len(mismatches) > 0 {
b, err := json.MarshalIndent(mismatches, "", " ")

if err != nil {
return in, false, fmt.Errorf("failed to marshal mismatches: %w", err)
}

in.Mismatch.Mismatches = string(b)
in.Mismatch.IsOK = false
in.Mismatch.Count = len(mismatches)
in.Mismatch.Message = mismatches.String()

topMismatchType, _ := t.mostCommonMismatchType(mismatches)
if topMismatchType != "" {
in.Mismatch.TopMismatchType = topMismatchType
}

// if there are too many mismatches, we only show the first 20
// this is to avoid overwhelming the user with too much information
const mismatchesSize = 20

if len(mismatches) > mismatchesSize {
mismatches = mismatches[:mismatchesSize]
}

b, err := json.MarshalIndent(mismatches, "", " ")

if err != nil {
return in, false, fmt.Errorf("failed to marshal mismatches: %w", err)
}
in.Mismatch.Mismatches = string(b)
in.Mismatch.Message = mismatches.String()

} else {
in.Mismatch.Mismatches = "[]"
in.Mismatch.IsOK = true
Expand Down
24 changes: 24 additions & 0 deletions quesma/ab_testing/collector/filters.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,27 @@ func (t *mismatchedOnlyFilter) process(in EnrichedResults) (out EnrichedResults,

// avoid unused struct error
var _ = &mismatchedOnlyFilter{}

// redactOkResults is a processing step that strips request and response
// payloads from results whose comparison found no mismatch.
type redactOkResults struct{}

// name returns the identifier of this processing step.
func (t *redactOkResults) name() string {
	const stepName = "redactOkResults"
	return stepName
}

// process replaces the request body and both A/B response bodies with a
// placeholder when the comparison reported no mismatch, and sets the mismatch
// message to "OK". Results with mismatches are passed through unchanged.
// It never drops a result and never returns an error.
func (t *redactOkResults) process(in EnrichedResults) (out EnrichedResults, drop bool, err error) {
	if !in.Mismatch.IsOK {
		// mismatching results are passed through unchanged
		return in, false, nil
	}

	// matching results: the request/response details are of no interest
	const placeholder = "***REDACTED***"
	in.Request.Body = placeholder
	in.A.Body = placeholder
	in.B.Body = placeholder
	in.Mismatch.Message = "OK"

	return in, false, nil
}

// avoid unused struct error (same guard as the one used for mismatchedOnlyFilter)
var _ = &redactOkResults{}
2 changes: 2 additions & 0 deletions quesma/ab_testing/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
// SPDX-License-Identifier: Elastic-2.0
package ab_testing

// ABTestingTableName is the name of the index/table that the A/B testing
// collector ingests its comparison results into.
const ABTestingTableName = "ab_testing_logs"

type Request struct {
Path string `json:"path"`
IndexName string `json:"index_name"`
Expand Down
2 changes: 1 addition & 1 deletion quesma/jsondiff/elastic_response_diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import "fmt"

// NewElasticResponseJSONDiff creates a JSONDiff instance that is tailored to compare Elasticsearch response JSONs.
func NewElasticResponseJSONDiff() (*JSONDiff, error) {
d, err := NewJSONDiff("^id$", ".*Quesma_key_.*", "^took$")
d, err := NewJSONDiff("^id$", ".*Quesma_key_.*", "^took$", ".*__quesma_total_count", ".*\\._id", "^_shards.*", ".*\\._score", ".*\\._source", ".*\\.__quesma_originalKey")

if err != nil {
return nil, fmt.Errorf("could not create JSONDiff: %v", err)
Expand Down
41 changes: 37 additions & 4 deletions quesma/jsondiff/jsondiff.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ func newType(code, message string) mismatchType {
var (
invalidType = newType("invalid_type", "Types are not equal")
invalidValue = newType("invalid_value", "Values are not equal")
invalidNumberValue = newType("invalid_number_value", "Numbers are not equal")
invalidDateValue = newType("invalid_date_value", "Dates are not equal")
invalidArrayLength = newType("invalid_array_length", "Array lengths are not equal")
invalidArrayLengthOffByOne = newType("invalid_array_length_off_by_one", "Array lengths are off by one.")
objectDifference = newType("object_difference", "Objects are different")
Expand Down Expand Up @@ -355,6 +357,8 @@ func (d *JSONDiff) asType(a any) string {
return fmt.Sprintf("%T", a)
}

// dateRx recognizes strings that start with a timestamp of the form
// YYYY-MM-DDTHH:MM:SS; anything following the seconds (fractional part,
// zone offset, "Z") is accepted.
//
// The pattern is anchored at the beginning on purpose: compare treats every
// matching string as a date and compares a fixed-length prefix taken from
// index 0, so a string that merely contains a timestamp somewhere in the
// middle must not be classified as a date.
var dateRx = regexp.MustCompile(`^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}`)

func (d *JSONDiff) compare(expected any, actual any) {

if d.isIgnoredPath() {
Expand Down Expand Up @@ -399,9 +403,9 @@ func (d *JSONDiff) compare(expected any, actual any) {
case float64:

// float operations are noisy, we need to compare them with desired precision

epsilon := 1e-9
relativeTolerance := 1e-9
// this is lousy, but it works for now
epsilon := 1e-3
relativeTolerance := 1e-3
aFloat := expected.(float64)
bFloat := actual.(float64)

Expand All @@ -411,8 +415,37 @@ func (d *JSONDiff) compare(expected any, actual any) {
relativeDiff := absDiff / math.Max(math.Abs(aFloat), math.Abs(bFloat))

if relativeDiff > relativeTolerance {
d.addMismatch(invalidValue, d.asValue(expected), d.asValue(actual))
d.addMismatch(invalidNumberValue, d.asValue(expected), d.asValue(actual))
}
}

default:
d.addMismatch(invalidType, d.asType(expected), d.asType(actual))
}

case string:

switch actualString := actual.(type) {
case string:

if dateRx.MatchString(aVal) && dateRx.MatchString(actualString) {

// TODO add better date comparison here
// parse both date and compare them with desired precision

// elastics returns date in formats
// "2024-10-24T00:00:00.000+02:00"
// "2024-10-24T00:00:00.000Z"

// quesma returns
// 2024-10-23T22:00:00.000
compareOnly := "2000-01-"

if aVal[:len(compareOnly)] != actualString[:len(compareOnly)] {
d.addMismatch(invalidDateValue, d.asValue(expected), d.asValue(actual))
}

return
}

default:
Expand Down
12 changes: 6 additions & 6 deletions quesma/jsondiff/jsondiff_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestJSONDiff(t *testing.T) {
name: "Test 2",
expected: `{"a": 1, "b": 2, "c": 3}`,
actual: `{"a": 1, "b": 3, "c": 3}`,
problems: []JSONMismatch{mismatch("b", invalidValue)},
problems: []JSONMismatch{mismatch("b", invalidNumberValue)},
},

{
Expand Down Expand Up @@ -67,7 +67,7 @@ func TestJSONDiff(t *testing.T) {
name: "array element difference",
expected: `{"a": [1, 2, 3], "b": 2, "c": 3}`,
actual: `{"a": [1, 2, 4], "b": 2, "c": 3}`,
problems: []JSONMismatch{mismatch("a.[2]", invalidValue)},
problems: []JSONMismatch{mismatch("a.[2]", invalidNumberValue)},
},

{
Expand All @@ -81,28 +81,28 @@ func TestJSONDiff(t *testing.T) {
name: "object difference",
expected: `{"a": {"b": 1}, "c": 3}`,
actual: `{"a": {"b": 2}, "c": 3}`,
problems: []JSONMismatch{mismatch("a.b", invalidValue)},
problems: []JSONMismatch{mismatch("a.b", invalidNumberValue)},
},

{
name: "deep path difference",
expected: `{"a": {"d": {"b": 1}}, "c": 3}`,
actual: `{"a": {"d": {"b": 2}}, "c": 3}`,
problems: []JSONMismatch{mismatch("a.d.b", invalidValue)},
problems: []JSONMismatch{mismatch("a.d.b", invalidNumberValue)},
},

{
name: "deep path difference",
expected: `{"a": {"d": {"b": 1}}, "c": 3, "_ignore": 1}`,
actual: `{"a": {"d": {"b": 2}}, "c": 3}`,
problems: []JSONMismatch{mismatch("a.d.b", invalidValue)},
problems: []JSONMismatch{mismatch("a.d.b", invalidNumberValue)},
},

{
name: "array sort difference ",
expected: `{"a": [1, 2, 3], "b": 2, "c": 3}`,
actual: `{"a": [1, 3, 2], "b": 2, "c": 3}`,
problems: []JSONMismatch{mismatch("a.[1]", invalidValue), mismatch("a.[2]", invalidValue)},
problems: []JSONMismatch{mismatch("a.[1]", invalidNumberValue), mismatch("a.[2]", invalidNumberValue)},
},

{
Expand Down
11 changes: 7 additions & 4 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,8 +578,9 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}
}

if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName))
if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) ||
(processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName))
continue
}

Expand Down Expand Up @@ -676,10 +677,12 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
}
}

if len(processedConfig.QueryTarget) == 2 && !(processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, ClickHouse has to be the primary one and Elastic has to be the secondary one", indexName))
if len(processedConfig.QueryTarget) == 2 && !((processedConfig.QueryTarget[0] == ClickhouseTarget && processedConfig.QueryTarget[1] == ElasticsearchTarget) ||
(processedConfig.QueryTarget[0] == ElasticsearchTarget && processedConfig.QueryTarget[1] == ClickhouseTarget)) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration", indexName))
continue
}

if len(processedConfig.QueryTarget) == 2 {
// Turn on A/B testing
processedConfig.Optimizers = make(map[string]OptimizerConfiguration)
Expand Down
33 changes: 12 additions & 21 deletions quesma/quesma/search.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ func (q *QueryRunner) runExecutePlanAsync(ctx context.Context, plan *model.Execu
}()
}

func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, optAsync *AsyncQuery, optComparePlansCh chan<- executionPlanResult) (responseBody []byte, err error) {
func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, optAsync *AsyncQuery, optComparePlansCh chan<- executionPlanResult, abTestingMainPlan bool) (responseBody []byte, err error) {
contextValues := tracing.ExtractValues(ctx)
id := contextValues.RequestId
path := contextValues.RequestPath
Expand All @@ -214,7 +214,7 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan
sendMainPlanResult := func(responseBody []byte, err error) {
if optComparePlansCh != nil {
optComparePlansCh <- executionPlanResult{
isMain: true,
isMain: abTestingMainPlan,
plan: plan,
err: err,
responseBody: responseBody,
Expand Down Expand Up @@ -300,31 +300,27 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
return nil, end_user_errors.ErrSearchCondition.New(fmt.Errorf("no connectors to use"))
}

var clickhouseDecision *table_resolver.ConnectorDecisionClickhouse
var elasticDecision *table_resolver.ConnectorDecisionElastic
var clickhouseConnector *table_resolver.ConnectorDecisionClickhouse

for _, connector := range decision.UseConnectors {
switch c := connector.(type) {

case *table_resolver.ConnectorDecisionClickhouse:
clickhouseDecision = c
clickhouseConnector = c

case *table_resolver.ConnectorDecisionElastic:
elasticDecision = c
// NOP

default:
return nil, fmt.Errorf("unknown connector type: %T", c)
}
}

// it's impossible here to don't have a clickhouse decision
if clickhouseDecision == nil {
if clickhouseConnector == nil {
return nil, fmt.Errorf("no clickhouse connector")
}

if elasticDecision != nil {
fmt.Println("elastic", elasticDecision)
}

var responseBody []byte

startTime := time.Now()
Expand All @@ -343,7 +339,7 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin

var table *clickhouse.Table // TODO we should use schema here only
var currentSchema schema.Schema
resolvedIndexes := clickhouseDecision.ClickhouseTables
resolvedIndexes := clickhouseConnector.ClickhouseTables

if len(resolvedIndexes) == 1 {
indexName := resolvedIndexes[0] // we got exactly one table here because of the check above
Expand Down Expand Up @@ -446,17 +442,12 @@ func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern strin
plan.StartTime = startTime
plan.Name = model.MainExecutionPlan

// Some flags may trigger alternative execution plans, this is primary for dev

alternativePlan, alternativePlanExecutor := q.maybeCreateAlternativeExecutionPlan(ctx, resolvedIndexes, plan, queryTranslator, body, table, optAsync != nil)

var optComparePlansCh chan<- executionPlanResult

if alternativePlan != nil {
optComparePlansCh = q.runAlternativePlanAndComparison(ctx, alternativePlan, alternativePlanExecutor, body)
if decision.EnableABTesting {
return q.executeABTesting(ctx, plan, queryTranslator, table, body, optAsync, decision, indexPattern)
}

return q.executePlan(ctx, plan, queryTranslator, table, body, optAsync, optComparePlansCh)
return q.executePlan(ctx, plan, queryTranslator, table, body, optAsync, nil, true)

}

func (q *QueryRunner) storeAsyncSearch(qmc *ui.QuesmaManagementConsole, id, asyncId string,
Expand Down
Loading

0 comments on commit 72cab08

Please sign in to comment.