diff --git a/quesma/ab_testing/sender/coordinator.go b/quesma/ab_testing/sender/coordinator.go index 8c6ca7225..31423179a 100644 --- a/quesma/ab_testing/sender/coordinator.go +++ b/quesma/ab_testing/sender/coordinator.go @@ -9,6 +9,7 @@ import ( "quesma/logger" "quesma/quesma/config" "quesma/quesma/recovery" + "strings" "time" ) @@ -26,11 +27,23 @@ func NewSenderCoordinator(cfg *config.QuesmaConfiguration) *SenderCoordinator { ctx, cancel := context.WithCancel(context.Background()) + var enabledForIndex []string + for indexName, indexConfig := range cfg.IndexConfig { + _, disabledAb := indexConfig.GetOptimizerConfiguration(config.ElasticABOptimizerName) + if !disabledAb { + enabledForIndex = append(enabledForIndex, indexName) + } + } + + if len(enabledForIndex) > 0 { + logger.Info().Msgf("A/B Testing is enabled for indexes: %s", strings.Join(enabledForIndex, ",")) + } + return &SenderCoordinator{ sender: newSender(ctx), ctx: ctx, cancelFunc: cancel, - enabled: true, // TODO this should be read from config + enabled: len(enabledForIndex) > 0, // add quesma health monitor service here } } diff --git a/quesma/clickhouse/connection.go b/quesma/clickhouse/connection.go index b205d6ce6..e489fc727 100644 --- a/quesma/clickhouse/connection.go +++ b/quesma/clickhouse/connection.go @@ -52,6 +52,9 @@ func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql } func InitDBConnectionPool(c *config.QuesmaConfiguration) *sql.DB { + if c.ClickHouse.Url == nil { + return nil + } db := initDBConnection(c, &tls.Config{}) diff --git a/quesma/connectors/connector.go b/quesma/connectors/connector.go index 06ef97b24..0f80aec1f 100644 --- a/quesma/connectors/connector.go +++ b/quesma/connectors/connector.go @@ -64,5 +64,13 @@ func registerConnectors(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHome logger.Error().Msgf("Unknown connector type [%s]", conn.ConnectorType) } } + + // Mock connector for transparent proxy, perhaps improve at some point + if len(cfg.Connectors) == 0 && cfg.TransparentProxy { + conns = append(conns, &ClickHouseOSConnector{ + Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader), + }) + } + return conns } diff --git a/quesma/main.go b/quesma/main.go index 90164bee2..3130cac00 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -131,7 +131,7 @@ func main() { func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, im elasticsearch.IndexManagement, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender) *quesma.Quesma { if cfg.TransparentProxy { - return quesma.NewQuesmaTcpProxy(phoneHomeAgent, cfg, quesmaManagementConsole, logChan, true) + return quesma.NewQuesmaTcpProxy(phoneHomeAgent, cfg, quesmaManagementConsole, logChan, false) } else { return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, logChan, abResultsrepository) } diff --git a/quesma/quesma/config/config.go b/quesma/quesma/config/config.go index a326af829..efaa0ebfe 100644 --- a/quesma/quesma/config/config.go +++ b/quesma/quesma/config/config.go @@ -150,9 +150,11 @@ func (c *QuesmaConfiguration) Validate() error { } connectorCount := len(c.Connectors) if connectorCount != 1 { - result = multierror.Append(result, fmt.Errorf("%d connectors configured - at this moment Quesma requires **exactly** one connector specified", connectorCount)) + if !(connectorCount == 0 && c.TransparentProxy) { // no connectors for transparent proxy is fine + result = multierror.Append(result, fmt.Errorf("%d connectors configured - at this moment Quesma requires **exactly** one connector specified", connectorCount)) + } } - if c.ClickHouse.Url == nil && c.Hydrolix.Url == nil { + if c.ClickHouse.Url == nil && c.Hydrolix.Url == nil && !c.TransparentProxy { result = multierror.Append(result, fmt.Errorf("clickHouse or hydrolix URL is required")) } if c.ClickHouse.IsNonEmpty() && c.Hydrolix.IsNonEmpty() { diff --git a/quesma/quesma/config/config_v2.go b/quesma/quesma/config/config_v2.go index 8ec032d1a..630a1eb5f 100644 --- a/quesma/quesma/config/config_v2.go +++ b/quesma/quesma/config/config_v2.go @@ -25,6 +25,8 @@ const ( ClickHouseOSBackendConnectorName = "clickhouse-os" ClickHouseBackendConnectorName = "clickhouse" HydrolixBackendConnectorName = "hydrolix" + + ElasticABOptimizerName = "elastic_ab_testing" ) type ProcessorType string @@ -522,7 +524,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { if len(indexConfig.QueryTarget) == 2 { // Turn on A/B testing processedConfig.Optimizers = make(map[string]OptimizerConfiguration) - processedConfig.Optimizers["elastic_ab_testing"] = OptimizerConfiguration{ + processedConfig.Optimizers[ElasticABOptimizerName] = OptimizerConfiguration{ Disabled: false, Properties: map[string]string{}, } @@ -584,7 +586,7 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { if len(indexConfig.QueryTarget) == 2 { // Turn on A/B testing processedConfig.Optimizers = make(map[string]OptimizerConfiguration) - processedConfig.Optimizers["elastic_ab_testing"] = OptimizerConfiguration{ + processedConfig.Optimizers[ElasticABOptimizerName] = OptimizerConfiguration{ Disabled: false, Properties: map[string]string{}, } @@ -630,25 +632,18 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration { END: - if relationalDBErr != nil && !conf.TransparentProxy { - errAcc = multierror.Append(errAcc, relationalDBErr) - } else if relationalDBErr != nil && conf.TransparentProxy { - relDBConn := RelationalDbConfiguration{ - ConnectorType: ClickHouseOSBackendConnectorName, - Url: &Url{ - Host: "localhost", - }, - } - conf.Connectors["mock-for-transparent-proxy"] = relDBConn - conf.ClickHouse = relDBConn - } else { - relDBConn.ConnectorType = connType - if connType == HydrolixBackendConnectorName { - conf.Connectors["injected-hydrolix-connector"] = *relDBConn - conf.Hydrolix = *relDBConn + if !conf.TransparentProxy { + if relationalDBErr != nil { + errAcc = multierror.Append(errAcc, relationalDBErr) } else { - conf.Connectors["injected-clickhouse-connector"] = *relDBConn - conf.ClickHouse = *relDBConn + relDBConn.ConnectorType = connType + if connType == HydrolixBackendConnectorName { + conf.Connectors["injected-hydrolix-connector"] = *relDBConn + conf.Hydrolix = *relDBConn + } else { + conf.Connectors["injected-clickhouse-connector"] = *relDBConn + conf.ClickHouse = *relDBConn + } } } diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index c86b001ed..ab9f382e0 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -875,17 +875,3 @@ func pushSecondaryInfo(qmc *ui.QuesmaManagementConsole, Id, AsyncId, Path string QueryTranslatedResults: QueryTranslatedResults, SecondaryTook: time.Since(startTime)}) } - -func pushAlternativeInfo(qmc *ui.QuesmaManagementConsole, Id, AsyncId, OpaqueId, Path string, IncomingQueryBody []byte, QueryBodyTranslated []types.TranslatedSQLQuery, QueryTranslatedResults []byte, startTime time.Time) { - qmc.PushSecondaryInfo(&ui.QueryDebugSecondarySource{ - Id: Id, - AsyncId: AsyncId, - OpaqueId: OpaqueId, - Path: Path, - IncomingQueryBody: IncomingQueryBody, - QueryBodyTranslated: QueryBodyTranslated, - QueryTranslatedResults: QueryTranslatedResults, - SecondaryTook: time.Since(startTime), - IsAlternativePlan: true}) - -} diff --git a/quesma/quesma/search_alternative.go b/quesma/quesma/search_alternative.go index 23dfb223a..f1a7354a8 100644 --- a/quesma/quesma/search_alternative.go +++ b/quesma/quesma/search_alternative.go @@ -5,7 +5,6 @@ package quesma import ( "bytes" "context" - "encoding/json" "fmt" "io" "net/http" @@ -13,7 +12,7 @@ import ( "quesma/clickhouse" "quesma/logger" "quesma/model" - "quesma/queryparser" + "quesma/quesma/config" "quesma/quesma/recovery" "quesma/quesma/types" "quesma/tracing" @@ -124,13 +123,8 @@ func (q *QueryRunner) maybeCreateAlternativeExecutionPlan(ctx context.Context, i resolvedTableName := indexes[0] - props, disabled := q.cfg.IndexConfig[resolvedTableName].GetOptimizerConfiguration(queryparser.PancakeOptimizerName) - if !disabled && props["mode"] == "alternative" { - return q.maybeCreatePancakeExecutionPlan(ctx, resolvedTableName, plan, queryTranslator, body, table, isAsync) - } - // TODO is should be enabled in a different way. it's not an optimizer - cfg, disabled := q.cfg.IndexConfig[resolvedTableName].GetOptimizerConfiguration("elastic_ab_testing") + cfg, disabled := q.cfg.IndexConfig[resolvedTableName].GetOptimizerConfiguration(config.ElasticABOptimizerName) if !disabled { return q.askElasticAsAnAlternative(ctx, resolvedTableName, plan, queryTranslator, body, table, isAsync, cfg) } @@ -220,96 +214,3 @@ func (q *QueryRunner) askElasticAsAnAlternative(ctx context.Context, resolvedTab return responseBody, nil } } - -func (q *QueryRunner) maybeCreatePancakeExecutionPlan(ctx context.Context, resolvedTableName string, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, body types.JSON, table *clickhouse.Table, isAsync bool) (*model.ExecutionPlan, executionPlanExecutor) { - - hasAggQuery := false - queriesWithoutAggr := make([]*model.Query, 0) - for _, query := range plan.Queries { - switch query.Type.AggregationType() { - case model.MetricsAggregation, model.BucketAggregation, model.PipelineMetricsAggregation, model.PipelineBucketAggregation: - hasAggQuery = true - default: - queriesWithoutAggr = append(queriesWithoutAggr, query) - } - } - - if !hasAggQuery { - return nil, nil - } - - if chQueryTranslator, ok := queryTranslator.(*queryparser.ClickhouseQueryTranslator); ok { - - // TODO FIXME check if the original plan has count query - addCount := false - - if pancakeQueries, err := chQueryTranslator.PancakeParseAggregationJson(body, addCount); err == nil { - logger.InfoWithCtx(ctx).Msgf("Running alternative pancake queries") - queries := append(queriesWithoutAggr, pancakeQueries...) - alternativePlan := &model.ExecutionPlan{ - IndexPattern: plan.IndexPattern, - QueryRowsTransformers: make([]model.QueryRowsTransformer, len(queries)), - Queries: queries, - StartTime: plan.StartTime, - Name: "pancake", - } - - return alternativePlan, func(ctx context.Context) ([]byte, error) { - - return q.executeAlternativePlan(ctx, plan, queryTranslator, table, body, false) - } - - } else { - // TODO: change to info - logger.ErrorWithCtx(ctx).Msgf("Error parsing pancake queries: %v", err) - } - } else { - logger.ErrorWithCtx(ctx).Msgf("Alternative plan is not supported for non-clickhouse query translators") - } - return nil, nil -} - -func (q *QueryRunner) executeAlternativePlan(ctx context.Context, plan *model.ExecutionPlan, queryTranslator IQueryTranslator, table *clickhouse.Table, body types.JSON, isAsync bool) (responseBody []byte, err error) { - - doneCh := make(chan AsyncSearchWithError, 1) - - err = q.transformQueries(ctx, plan) - if err != nil { - return responseBody, err - } - - if resp, err := q.checkProperties(ctx, plan, table, queryTranslator); err != nil { - return resp, err - } - - q.runExecutePlanAsync(ctx, plan, queryTranslator, table, doneCh, nil) - - response := <-doneCh - - if response.err == nil { - if isAsync { - asyncResponse := queryparser.SearchToAsyncSearchResponse(response.response, "__quesma_alternative_plan", false, 200) - responseBody, err = asyncResponse.Marshal() - if err != nil { - return nil, err - } - } else { - responseBody, err = response.response.Marshal() - if err != nil { - return nil, err - } - } - } else { - // TODO better error handling - m := make(map[string]interface{}) - m["error"] = fmt.Sprintf("%v", response.err.Error()) - responseBody, _ = json.MarshalIndent(m, "", " ") - } - - bodyAsBytes, _ := body.Bytes() - contextValues := tracing.ExtractValues(ctx) - pushAlternativeInfo(q.quesmaManagementConsole, contextValues.RequestId, "", contextValues.OpaqueId, contextValues.RequestPath, bodyAsBytes, response.translatedQueryBody, responseBody, plan.StartTime) - - return responseBody, response.err - -}