diff --git a/quesma/clickhouse/table_discovery.go b/quesma/clickhouse/table_discovery.go index c674c1009..577708897 100644 --- a/quesma/clickhouse/table_discovery.go +++ b/quesma/clickhouse/table_discovery.go @@ -604,3 +604,42 @@ func (td *tableDiscovery) createTableQuery(database, table string) (ddl string) } return ddl } + +type EmptyTableDiscovery struct { + TableMap *TableMap + Err error + Autodiscovery bool +} + +func NewEmptyTableDiscovery() *EmptyTableDiscovery { + return &EmptyTableDiscovery{ + TableMap: NewTableMap(), + } +} + +func (td *EmptyTableDiscovery) ReloadTableDefinitions() { +} + +func (td *EmptyTableDiscovery) TableDefinitions() *TableMap { + return td.TableMap +} + +func (td *EmptyTableDiscovery) TableDefinitionsFetchError() error { + return td.Err +} + +func (td *EmptyTableDiscovery) LastAccessTime() time.Time { + return time.Now() +} + +func (td *EmptyTableDiscovery) LastReloadTime() time.Time { + return time.Now() +} + +func (td *EmptyTableDiscovery) ForceReloadCh() <-chan chan<- struct{} { + return make(chan chan<- struct{}) +} + +func (td *EmptyTableDiscovery) AutodiscoveryEnabled() bool { + return td.Autodiscovery +} diff --git a/quesma/elasticsearch/index_resolver.go b/quesma/elasticsearch/index_resolver.go index 4bae70ce7..8ae2166b0 100644 --- a/quesma/elasticsearch/index_resolver.go +++ b/quesma/elasticsearch/index_resolver.go @@ -80,3 +80,18 @@ func (im *indexResolver) Resolve(indexPattern string) (Sources, bool, error) { return sources, true, nil } + +type EmptyIndexResolver struct { + Indexes map[string]Sources +} + +func NewEmptyIndexResolver() *EmptyIndexResolver { + return &EmptyIndexResolver{ + Indexes: make(map[string]Sources), + } +} + +func (r *EmptyIndexResolver) Resolve(indexPattern string) (Sources, bool, error) { + res, ok := r.Indexes[indexPattern] + return res, ok, nil +} diff --git a/quesma/ingest/common_table_test.go b/quesma/ingest/common_table_test.go index 69027a7f8..71cbc597a 100644 --- a/quesma/ingest/common_table_test.go +++ b/quesma/ingest/common_table_test.go @@ -14,6 +14,7 @@ import ( "quesma/quesma/config" "quesma/quesma/types" "quesma/schema" + "quesma/table_resolver" "testing" ) @@ -188,10 +189,13 @@ func TestIngestToCommonTable(t *testing.T) { tableDisco := clickhouse.NewTableDiscovery(quesmaConfig, db, virtualTableStorage) schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, quesmaConfig, clickhouse.SchemaTypeAdapter{}) + resolver := table_resolver.NewEmptyTableResolver() + ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig) ingest.chDb = db ingest.virtualTableStorage = virtualTableStorage ingest.schemaRegistry = schemaRegistry + ingest.tableResolver = resolver if len(tt.alreadyExistingColumns) > 0 { diff --git a/quesma/ingest/ingest_validator_test.go b/quesma/ingest/ingest_validator_test.go index ccd2650fb..bc2c1545d 100644 --- a/quesma/ingest/ingest_validator_test.go +++ b/quesma/ingest/ingest_validator_test.go @@ -12,6 +12,7 @@ import ( "quesma/concurrent" "quesma/quesma/config" "quesma/quesma/types" + "quesma/table_resolver" "quesma/util" "strings" "testing" @@ -169,7 +170,7 @@ func TestIngestValidation(t *testing.T) { ip := newIngestProcessorEmpty() ip.chDb = db ip.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMap) - + ip.tableResolver = table_resolver.NewEmptyTableResolver() defer db.Close() mock.ExpectExec(EscapeBrackets(expectedInsertJsons[i])).WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0)) diff --git a/quesma/ingest/insert_test.go b/quesma/ingest/insert_test.go index 1b6585740..539258c49 100644 --- a/quesma/ingest/insert_test.go +++ b/quesma/ingest/insert_test.go @@ -14,6 +14,7 @@ import ( "quesma/quesma/config" "quesma/quesma/types" "quesma/schema" + "quesma/table_resolver" "quesma/util" "slices" "strconv" @@ -240,6 +241,8 @@ func TestProcessInsertQuery(t *testing.T) { t.Run("case insertTest["+strconv.Itoa(index1)+"], config["+strconv.Itoa(index2)+"], ingestProcessor["+strconv.Itoa(index3)+"]", func(t *testing.T) { db, mock := util.InitSqlMockWithPrettyPrint(t, true) ip.ip.chDb = db + resolver := table_resolver.NewEmptyTableResolver() + ip.ip.tableResolver = resolver defer db.Close() // info: result values aren't important, this '.WillReturnResult[...]' just needs to be there @@ -417,12 +420,15 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) { } schemaRegistry.Tables[schema.TableName(indexName)] = indexSchema + indexRegistry := table_resolver.NewEmptyTableResolver() schemaRegistry.FieldEncodings = make(map[schema.FieldEncodingKey]schema.EncodedFieldName) schemaRegistry.FieldEncodings[schema.FieldEncodingKey{TableName: indexName, FieldName: "schema_field"}] = "schema_field" + ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig) ingest.chDb = db ingest.virtualTableStorage = virtualTableStorage ingest.schemaRegistry = schemaRegistry + ingest.tableResolver = indexRegistry ctx := context.Background() formatter := clickhouse.DefaultColumnNameFormatter() diff --git a/quesma/ingest/processor.go b/quesma/ingest/processor.go index 5a54d297c..1ad74e7cc 100644 --- a/quesma/ingest/processor.go +++ b/quesma/ingest/processor.go @@ -23,6 +23,7 @@ import ( "quesma/quesma/types" "quesma/schema" "quesma/stats" + "quesma/table_resolver" "quesma/telemetry" "quesma/util" "slices" @@ -63,6 +64,7 @@ type ( ingestFieldStatistics IngestFieldStatistics ingestFieldStatisticsLock sync.Mutex virtualTableStorage persistence.JSONDatabase + tableResolver table_resolver.TableResolver } TableMap = concurrent.Map[string, *chLib.Table] SchemaMap = map[string]interface{} // TODO remove @@ -694,6 +696,9 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str jsonData []types.JSON, transformer jsonprocessor.IngestTransformer, tableFormatter TableColumNameFormatter) error { + decision := lm.tableResolver.Resolve(table_resolver.IngestPipeline, tableName) + table_resolver.TODO("processInsertQuery", decision) + indexConf, ok := lm.cfg.IndexConfig[tableName] if ok && indexConf.UseCommonTable { @@ -904,9 +909,9 @@ func (ip *IngestProcessor) Ping() error { return ip.chDb.Ping() } -func NewIngestProcessor(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase) *IngestProcessor { +func NewIngestProcessor(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver) *IngestProcessor { ctx, cancel := context.WithCancel(context.Background()) - return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage} + return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver} } func NewOnlySchemaFieldsCHConfig() *chLib.ChTableConfig { diff --git a/quesma/main.go b/quesma/main.go index d25389f1b..91f80a801 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -25,6 +25,7 @@ import ( "quesma/quesma/config" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "quesma/tracing" "syscall" @@ -92,6 +93,12 @@ func main() { connManager := connectors.NewConnectorManager(&cfg, connectionPool, phoneHomeAgent, tableDisco) lm := connManager.GetConnector() + elasticIndexResolver := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String()) + + // TODO index configuration for ingest and query is the same for now + tableResolver := table_resolver.NewTableResolver(cfg, tableDisco, elasticIndexResolver) + tableResolver.Start() + var ingestProcessor *ingest.IngestProcessor if cfg.EnableIngest { @@ -100,7 +107,7 @@ func main() { common_table.EnsureCommonTableExists(connectionPool) } - ingestProcessor = ingest.NewIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage) + ingestProcessor = ingest.NewIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage, tableResolver) } else { logger.Info().Msg("Ingest processor is disabled.") } @@ -109,12 +116,12 @@ func main() { logger.Info().Msgf("loaded config: %s", cfg.String()) - quesmaManagementConsole := ui.NewQuesmaManagementConsole(&cfg, lm, im, qmcLogChannel, phoneHomeAgent, schemaRegistry) //FIXME no ingest processor here just for now + quesmaManagementConsole := ui.NewQuesmaManagementConsole(&cfg, lm, im, qmcLogChannel, phoneHomeAgent, schemaRegistry, tableResolver) //FIXME no ingest processor here just for now abTestingController := sender.NewSenderCoordinator(&cfg) abTestingController.Start() - instance := constructQuesma(&cfg, tableDisco, lm, ingestProcessor, im, schemaRegistry, phoneHomeAgent, quesmaManagementConsole, qmcLogChannel, abTestingController.GetSender()) + instance := constructQuesma(&cfg, tableDisco, lm, ingestProcessor, im, schemaRegistry, phoneHomeAgent, quesmaManagementConsole, qmcLogChannel, abTestingController.GetSender(), tableResolver) instance.Start() <-doneCh @@ -127,15 +134,15 @@ func main() { phoneHomeAgent.Stop(ctx) lm.Stop() abTestingController.Stop() - + tableResolver.Stop() instance.Close(ctx) } -func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, im elasticsearch.IndexManagement, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender) *quesma.Quesma { +func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, im elasticsearch.IndexManagement, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender, indexRegistry table_resolver.TableResolver) *quesma.Quesma { if cfg.TransparentProxy { return quesma.NewQuesmaTcpProxy(phoneHomeAgent, cfg, quesmaManagementConsole, logChan, false) } else { - return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository) + return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository, indexRegistry) } } diff --git a/quesma/quesma/functionality/bulk/bulk.go b/quesma/quesma/functionality/bulk/bulk.go index 49752b47b..c86f18fcc 100644 --- a/quesma/quesma/functionality/bulk/bulk.go +++ b/quesma/quesma/functionality/bulk/bulk.go @@ -19,6 +19,7 @@ import ( "quesma/quesma/recovery" "quesma/quesma/types" "quesma/stats" + "quesma/table_resolver" "quesma/telemetry" "sort" "strings" @@ -68,7 +69,7 @@ type ( ) func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ingest.IngestProcessor, - cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (results []BulkItem, err error) { + cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent, tableResolver table_resolver.TableResolver) (results []BulkItem, err error) { defer recovery.LogPanic() bulkSize := len(bulk) / 2 // we divided payload by 2 so that we don't take into account the `action_and_meta_data` line, ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html @@ -76,7 +77,7 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing // The returned results should be in the same order as the input request, however splitting the bulk might change the order. // Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk. - results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, cfg) + results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, cfg, tableResolver) if err != nil { return []BulkItem{}, err } @@ -110,7 +111,7 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing return results, nil } -func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bulkSize int, cfg *config.QuesmaConfiguration) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) { +func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bulkSize int, cfg *config.QuesmaConfiguration, tableResolver table_resolver.TableResolver) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) { results := make([]BulkItem, bulkSize) clickhouseDocumentsToInsert := make(map[string][]BulkRequestEntry, bulkSize) @@ -121,6 +122,9 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul index := op.GetIndex() operation := op.GetOperation() + decision := tableResolver.Resolve(table_resolver.IngestPipeline, index) + table_resolver.TODO("splitBulk", decision) + entryWithResponse := BulkRequestEntry{ operation: operation, index: index, diff --git a/quesma/quesma/functionality/doc/doc.go b/quesma/quesma/functionality/doc/doc.go index f4d9900bf..84f14dfd8 100644 --- a/quesma/quesma/functionality/doc/doc.go +++ b/quesma/quesma/functionality/doc/doc.go @@ -8,16 +8,17 @@ import ( "quesma/quesma/config" "quesma/quesma/functionality/bulk" "quesma/quesma/types" + "quesma/table_resolver" "quesma/telemetry" ) -func Write(ctx context.Context, tableName *string, body types.JSON, ip *ingest.IngestProcessor, cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (bulk.BulkItem, error) { +func Write(ctx context.Context, tableName *string, body types.JSON, ip *ingest.IngestProcessor, cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent, registry table_resolver.TableResolver) (bulk.BulkItem, error) { // Translate single doc write to a bulk request, reusing exiting logic of bulk ingest results, err := bulk.Write(ctx, tableName, []types.JSON{ map[string]interface{}{"index": map[string]interface{}{"_index": *tableName}}, body, - }, ip, cfg, phoneHomeAgent) + }, ip, cfg, phoneHomeAgent, registry) if err != nil { return bulk.BulkItem{}, err diff --git a/quesma/quesma/functionality/terms_enum/terms_enum_test.go b/quesma/quesma/functionality/terms_enum/terms_enum_test.go index 202c08471..c707a921c 100644 --- a/quesma/quesma/functionality/terms_enum/terms_enum_test.go +++ b/quesma/quesma/functionality/terms_enum/terms_enum_test.go @@ -18,6 +18,7 @@ import ( "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "quesma/tracing" "quesma/util" @@ -80,8 +81,8 @@ func testHandleTermsEnumRequest(t *testing.T, requestBody []byte) { }, Created: true, } - - managementConsole := ui.NewQuesmaManagementConsole(&config.QuesmaConfiguration{}, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + tableResolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&config.QuesmaConfiguration{}, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, tableResolver) db, mock := util.InitSqlMockWithPrettyPrint(t, true) defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, concurrent.NewMapWith(testTableName, table)) diff --git a/quesma/quesma/matchers.go b/quesma/quesma/matchers.go index ce63e0861..4edac622e 100644 --- a/quesma/quesma/matchers.go +++ b/quesma/quesma/matchers.go @@ -9,6 +9,7 @@ import ( "quesma/quesma/mux" "quesma/quesma/types" "quesma/schema" + "quesma/table_resolver" "quesma/tracing" "strings" ) @@ -23,7 +24,7 @@ func matchedAgainstAsyncId() mux.RequestMatcher { }) } -func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration) mux.RequestMatcher { +func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration, tableResolver table_resolver.TableResolver) mux.RequestMatcher { return mux.RequestMatcherFunc(func(req *mux.Request) bool { idx := 0 for _, s := range strings.Split(req.Body, "\n") { @@ -32,7 +33,12 @@ func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration) mux.Reque continue } if idx%2 == 0 { - indexConfig, found := configuration.IndexConfig[extractIndexName(s)] + name := extractIndexName(s) + + decision := tableResolver.Resolve(table_resolver.IngestPipeline, name) + table_resolver.TODO("matchedAgainstBulkBody", decision) + + indexConfig, found := configuration.IndexConfig[name] if found && (indexConfig.IsClickhouseIngestEnabled() || indexConfig.IsIngestDisabled()) { return true } @@ -46,8 +52,13 @@ func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration) mux.Reque } // Query path only (looks at QueryTarget) -func matchedAgainstPattern(configuration *config.QuesmaConfiguration, sr schema.Registry) mux.RequestMatcher { +func matchedAgainstPattern(configuration *config.QuesmaConfiguration, sr schema.Registry, indexRegistry table_resolver.TableResolver) mux.RequestMatcher { return mux.RequestMatcherFunc(func(req *mux.Request) bool { + indexPattern := elasticsearch.NormalizePattern(req.Params["index"]) + + decision := indexRegistry.Resolve(table_resolver.QueryPipeline, indexPattern) + table_resolver.TODO("matchedAgainstPattern", decision) + patterns := strings.Split(req.Params["index"], ",") for i, pattern := range patterns { patterns[i] = elasticsearch.NormalizePattern(pattern) @@ -85,12 +96,19 @@ func matchedAgainstPattern(configuration *config.QuesmaConfiguration, sr schema. } // check whether exact index name is enabled -func matchedExact(cfg *config.QuesmaConfiguration, queryPath bool) mux.RequestMatcher { +func matchedExact(cfg *config.QuesmaConfiguration, queryPath bool, indexRegistry table_resolver.TableResolver, pipelineName string) mux.RequestMatcher { return mux.RequestMatcherFunc(func(req *mux.Request) bool { + + indexName := req.Params["index"] + + decision := indexRegistry.Resolve(pipelineName, indexName) + table_resolver.TODO("matchedExact", decision) + if elasticsearch.IsInternalIndex(req.Params["index"]) { logger.Debug().Msgf("index %s is an internal Elasticsearch index, skipping", req.Params["index"]) return false } + indexConfig, exists := cfg.IndexConfig[req.Params["index"]] if queryPath { return exists && indexConfig.IsClickhouseQueryEnabled() @@ -100,12 +118,12 @@ func matchedExact(cfg *config.QuesmaConfiguration, queryPath bool) mux.RequestMa }) } -func matchedExactQueryPath(cfg *config.QuesmaConfiguration) mux.RequestMatcher { - return matchedExact(cfg, true) +func matchedExactQueryPath(cfg *config.QuesmaConfiguration, indexRegistry table_resolver.TableResolver) mux.RequestMatcher { + return matchedExact(cfg, true, indexRegistry, table_resolver.QueryPipeline) } -func matchedExactIngestPath(cfg *config.QuesmaConfiguration) mux.RequestMatcher { - return matchedExact(cfg, false) +func matchedExactIngestPath(cfg *config.QuesmaConfiguration, indexRegistry table_resolver.TableResolver) mux.RequestMatcher { + return matchedExact(cfg, false, indexRegistry, table_resolver.IngestPipeline) } // Returns false if the body contains a Kibana internal search. diff --git a/quesma/quesma/quesma.go b/quesma/quesma/quesma.go index 15f9b8469..8315f1bfb 100644 --- a/quesma/quesma/quesma.go +++ b/quesma/quesma/quesma.go @@ -27,6 +27,7 @@ import ( "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "quesma/tracing" "quesma/util" @@ -99,8 +100,8 @@ func NewQuesmaTcpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, config *config.Q func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, logManager *clickhouse.LogManager, ingestProcessor *ingest.IngestProcessor, schemaLoader clickhouse.TableDiscovery, indexManager elasticsearch.IndexManagement, schemaRegistry schema.Registry, config *config.QuesmaConfiguration, - quesmaManagementConsole *ui.QuesmaManagementConsole, abResultsRepository ab_testing.Sender) *Quesma { - queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, schemaRegistry, abResultsRepository) + quesmaManagementConsole *ui.QuesmaManagementConsole, abResultsRepository ab_testing.Sender, resolver table_resolver.TableResolver) *Quesma { + queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, schemaRegistry, abResultsRepository, resolver) // not sure how we should configure our query translator ??? // is this a config option?? @@ -110,7 +111,7 @@ func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, logManager *clickhous // tests should not be run with optimization enabled by default queryRunner.EnableQueryOptimization(config) - router := configureRouter(config, schemaRegistry, logManager, ingestProcessor, quesmaManagementConsole, phoneHomeAgent, queryRunner) + router := configureRouter(config, schemaRegistry, logManager, ingestProcessor, quesmaManagementConsole, phoneHomeAgent, queryRunner, resolver) return &Quesma{ telemetryAgent: phoneHomeAgent, processor: newDualWriteProxy(schemaLoader, logManager, indexManager, schemaRegistry, config, router, quesmaManagementConsole, phoneHomeAgent, queryRunner), diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index 307a1e637..6a6ae31ce 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -25,6 +25,7 @@ import ( "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "quesma/tracing" "regexp" @@ -33,7 +34,7 @@ import ( "time" ) -func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, console *ui.QuesmaManagementConsole, phoneHomeAgent telemetry.PhoneHomeAgent, queryRunner *QueryRunner) *mux.PathRouter { +func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, console *ui.QuesmaManagementConsole, phoneHomeAgent telemetry.PhoneHomeAgent, queryRunner *QueryRunner, tableResolver table_resolver.TableResolver) *mux.PathRouter { // some syntactic sugar method := mux.IsHTTPMethod @@ -44,22 +45,22 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil }) - router.Register(routes.BulkPath, and(method("POST"), matchedAgainstBulkBody(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.BulkPath, and(method("POST"), matchedAgainstBulkBody(cfg, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { body, err := types.ExpectNDJSON(req.ParsedBody) if err != nil { return nil, err } - results, err := bulk.Write(ctx, nil, body, ip, cfg, phoneHomeAgent) + results, err := bulk.Write(ctx, nil, body, ip, cfg, phoneHomeAgent, tableResolver) return bulkInsertResult(ctx, results, err) }) - router.Register(routes.IndexRefreshPath, and(method("POST"), matchedExactQueryPath(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexRefreshPath, and(method("POST"), matchedExactQueryPath(cfg, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { return elasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, http.StatusOK), nil }) - router.Register(routes.IndexDocPath, and(method("POST"), matchedExactIngestPath(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexDocPath, and(method("POST"), matchedExactIngestPath(cfg, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { index := req.Params["index"] body, err := types.ExpectJSON(req.ParsedBody) @@ -70,7 +71,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl }, nil } - result, err := doc.Write(ctx, &index, body, ip, cfg, phoneHomeAgent) + result, err := doc.Write(ctx, &index, body, ip, cfg, phoneHomeAgent, tableResolver) if err != nil { return &mux.Result{ Body: string(queryparser.BadRequestParseError(err)), @@ -81,7 +82,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return indexDocResult(result) }) - router.Register(routes.IndexBulkPath, and(method("POST", "PUT"), matchedExactIngestPath(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexBulkPath, and(method("POST", "PUT"), matchedExactIngestPath(cfg, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { index := req.Params["index"] body, err := types.ExpectNDJSON(req.ParsedBody) @@ -89,7 +90,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return nil, err } - results, err := bulk.Write(ctx, &index, body, ip, cfg, phoneHomeAgent) + results, err := bulk.Write(ctx, &index, body, ip, cfg, phoneHomeAgent, tableResolver) return bulkInsertResult(ctx, results, err) }) @@ -101,7 +102,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return resolveIndexResult(sources) }) - router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { cnt, err := queryRunner.handleCount(ctx, req.Params["index"]) if err != nil { if errors.Is(quesma_errors.ErrIndexNotExists(), err) { @@ -140,7 +141,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil }) - router.Register(routes.IndexSearchPath, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexSearchPath, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { body, err := types.ExpectJSON(req.ParsedBody) if err != nil { @@ -162,7 +163,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl } return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil }) - router.Register(routes.IndexAsyncSearchPath, and(method("POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexAsyncSearchPath, and(method("POST"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { waitForResultsMs := 1000 // Defaults to 1 second as in docs if v, ok := req.Params["wait_for_completion_timeout"]; ok { if w, err := time.ParseDuration(v); err == nil { @@ -199,7 +200,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil }) - router.Register(routes.IndexMappingPath, and(method("PUT"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexMappingPath, and(method("PUT"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { index := req.Params["index"] body, err := types.ExpectJSON(req.ParsedBody) @@ -214,7 +215,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return putIndexResult(index) }) - router.Register(routes.IndexMappingPath, and(method("GET"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexMappingPath, and(method("GET"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { index := req.Params["index"] foundSchema, found := sr.FindSchema(schema.TableName(index)) @@ -245,7 +246,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil }) - router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { responseBody, err := field_capabilities.HandleFieldCaps(ctx, cfg, sr, req.Params["index"], lm) if err != nil { @@ -260,7 +261,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl } return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil }) - router.Register(routes.TermsEnumPath, and(method("POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.TermsEnumPath, and(method("POST"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { if strings.Contains(req.Params["index"], ",") { return nil, errors.New("multi index terms enum is not yet supported") } else { @@ -281,7 +282,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl } }) - router.Register(routes.EQLSearch, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.EQLSearch, and(method("GET", "POST"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { body, err := types.ExpectJSON(req.ParsedBody) if err != nil { return nil, err @@ -298,7 +299,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil }) - router.Register(routes.IndexPath, and(method("PUT"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexPath, and(method("PUT"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { index := req.Params["index"] body, err := types.ExpectJSON(req.ParsedBody) @@ -318,7 +319,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl return putIndexResult(index) }) - router.Register(routes.IndexPath, and(method("GET"), matchedAgainstPattern(cfg, sr)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + router.Register(routes.IndexPath, and(method("GET"), matchedAgainstPattern(cfg, sr, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { index := req.Params["index"] foundSchema, found := sr.FindSchema(schema.TableName(index)) diff --git a/quesma/quesma/router_test.go b/quesma/quesma/router_test.go index 8a4b14122..cee74888a 100644 --- a/quesma/quesma/router_test.go +++ b/quesma/quesma/router_test.go @@ -7,6 +7,7 @@ import ( "quesma/quesma/config" "quesma/quesma/mux" "quesma/schema" + "quesma/table_resolver" "testing" ) @@ -37,12 +38,15 @@ func Test_matchedAgainstConfig(t *testing.T) { want: false, }, } + + resolver := table_resolver.NewEmptyTableResolver() + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { req := &mux.Request{Params: map[string]string{"index": tt.index}, Body: tt.body} - assert.Equalf(t, tt.want, matchedExactQueryPath(&tt.config).Matches(req), "matchedExactQueryPath(%v), index: %s", tt.config, tt.index) + assert.Equalf(t, tt.want, matchedExactQueryPath(&tt.config, resolver).Matches(req), "matchedExactQueryPath(%v), index: %s", tt.config, tt.index) }) } } @@ -170,11 +174,14 @@ func Test_matchedAgainstPattern(t *testing.T) { want: true, }, } + + resolver := table_resolver.NewEmptyTableResolver() + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { req := &mux.Request{Params: map[string]string{"index": tt.pattern}, Body: tt.body} - assert.Equalf(t, tt.want, matchedAgainstPattern(&tt.configuration, tt.registry).Matches(req), "matchedAgainstPattern(%v)", tt.configuration) + assert.Equalf(t, tt.want, matchedAgainstPattern(&tt.configuration, tt.registry, resolver).Matches(req), "matchedAgainstPattern(%v)", tt.configuration) }) } } @@ -232,11 +239,15 @@ func Test_matchedAgainstBulkBody(t *testing.T) { want: false, }, } + + resolver := table_resolver.NewEmptyTableResolver() + for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { req := &mux.Request{Body: tt.body} - assert.Equalf(t, tt.want, matchedAgainstBulkBody(&tt.config).Matches(req), "matchedAgainstBulkBody(%+v)", tt.config) + + assert.Equalf(t, tt.want, matchedAgainstBulkBody(&tt.config, resolver).Matches(req), "matchedAgainstBulkBody(%+v)", tt.config) }) } } diff --git a/quesma/quesma/search.go b/quesma/quesma/search.go index 02affd051..0942ea2d1 100644 --- a/quesma/quesma/search.go +++ b/quesma/quesma/search.go @@ -23,6 +23,7 @@ import ( "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/tracing" "quesma/util" "slices" @@ -56,7 +57,9 @@ type QueryRunner struct { transformationPipeline TransformationPipeline schemaRegistry schema.Registry ABResultsSender ab_testing.Sender - maxParallelQueries int // if set to 0, we run queries in sequence, it's fine for testing purposes + tableResolver table_resolver.TableResolver + + maxParallelQueries int // if set to 0, we run queries in sequence, it's fine for testing purposes } func (q *QueryRunner) EnableQueryOptimization(cfg *config.QuesmaConfiguration) { @@ -68,7 +71,8 @@ func NewQueryRunner(lm *clickhouse.LogManager, im elasticsearch.IndexManagement, qmc *ui.QuesmaManagementConsole, schemaRegistry schema.Registry, - abResultsRepository ab_testing.Sender) *QueryRunner { + abResultsRepository ab_testing.Sender, + resolver table_resolver.TableResolver) *QueryRunner { ctx, cancel := context.WithCancel(context.Background()) @@ -81,8 +85,10 @@ func NewQueryRunner(lm *clickhouse.LogManager, &SchemaCheckPass{cfg: cfg.IndexConfig}, }, }, - schemaRegistry: schemaRegistry, - ABResultsSender: abResultsRepository, + schemaRegistry: schemaRegistry, + ABResultsSender: abResultsRepository, + tableResolver: resolver, + maxParallelQueries: maxParallelQueries, } } @@ -265,6 +271,10 @@ func (q *QueryRunner) executePlan(ctx context.Context, plan *model.ExecutionPlan } func (q *QueryRunner) handleSearchCommon(ctx context.Context, indexPattern string, body types.JSON, optAsync *AsyncQuery, queryLanguage QueryLanguage) ([]byte, error) { + + decision := q.tableResolver.Resolve(table_resolver.QueryPipeline, indexPattern) + table_resolver.TODO("handleSearchCommon", decision) + sources, sourcesElastic, sourcesClickhouse := ResolveSources(indexPattern, q.cfg, q.im, q.schemaRegistry) switch sources { diff --git a/quesma/quesma/search_common_table_test.go b/quesma/quesma/search_common_table_test.go index 28ce113d7..fc918a658 100644 --- a/quesma/quesma/search_common_table_test.go +++ b/quesma/quesma/search_common_table_test.go @@ -13,6 +13,7 @@ import ( "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "testing" ) @@ -258,7 +259,9 @@ func TestSearchCommonTable(t *testing.T) { indexManagement := NewFixedIndexManagement() lm := clickhouse.NewLogManagerWithConnection(db, tableMap) - managementConsole := ui.NewQuesmaManagementConsole(quesmaConfig, nil, indexManagement, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + + managementConsole := ui.NewQuesmaManagementConsole(quesmaConfig, nil, indexManagement, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) for i, query := range tt.WantedSql { @@ -269,7 +272,7 @@ func TestSearchCommonTable(t *testing.T) { mock.ExpectQuery(query).WillReturnRows(rows) } - queryRunner := NewQueryRunner(lm, quesmaConfig, indexManagement, managementConsole, &schemaRegistry, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, quesmaConfig, indexManagement, managementConsole, &schemaRegistry, ab_testing.NewEmptySender(), resolver) queryRunner.maxParallelQueries = 0 _, err = queryRunner.handleSearch(ctx, tt.IndexPattern, types.MustJSON(tt.QueryJson)) diff --git a/quesma/quesma/search_norace_test.go b/quesma/quesma/search_norace_test.go index bb053a8f5..05ed3c0ff 100644 --- a/quesma/quesma/search_norace_test.go +++ b/quesma/quesma/search_norace_test.go @@ -19,6 +19,7 @@ import ( "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "quesma/testdata" "quesma/tracing" @@ -41,7 +42,10 @@ func TestAllUnsupportedQueryTypesAreProperlyRecorded(t *testing.T) { lm := clickhouse.NewLogManagerWithConnection(db, table) logChan := logger.InitOnlyChannelLoggerForTests() - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent(), nil) + + resolver := table_resolver.NewEmptyTableResolver() + + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) go managementConsole.RunOnlyChannelProcessor() s := &schema.StaticRegistry{ Tables: map[schema.TableName]schema.Schema{ @@ -61,7 +65,7 @@ func TestAllUnsupportedQueryTypesAreProperlyRecorded(t *testing.T) { }, } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId()) _, _ = queryRunner.handleSearch(newCtx, tableName, types.MustJSON(tt.QueryRequestJson)) @@ -108,7 +112,9 @@ func TestDifferentUnsupportedQueries(t *testing.T) { lm := clickhouse.NewLogManagerWithConnection(db, table) logChan := logger.InitOnlyChannelLoggerForTests() - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent(), nil) + + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) go managementConsole.RunOnlyChannelProcessor() s := &schema.StaticRegistry{ Tables: map[schema.TableName]schema.Schema{ @@ -129,7 +135,7 @@ func TestDifferentUnsupportedQueries(t *testing.T) { }, } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) for _, testNr := range testNrs { newCtx := context.WithValue(ctx, tracing.RequestIdCtxKey, tracing.GetRequestId()) _, _ = queryRunner.handleSearch(newCtx, tableName, types.MustJSON(testdata.UnsupportedQueriesTests[testNr].QueryRequestJson)) diff --git a/quesma/quesma/search_opensearch_test.go b/quesma/quesma/search_opensearch_test.go index b5930f190..876c96c1c 100644 --- a/quesma/quesma/search_opensearch_test.go +++ b/quesma/quesma/search_opensearch_test.go @@ -15,6 +15,7 @@ import ( "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "quesma/testdata" "quesma/util" @@ -50,7 +51,8 @@ func TestSearchOpensearch(t *testing.T) { db, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false) defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, concurrent.NewMapWith(tableName, &table)) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) cw := queryparser.ClickhouseQueryTranslator{ClickhouseLM: lm, Table: &table, Ctx: context.Background(), Schema: s.Tables[tableName], Config: &DefaultConfig} body, parseErr := types.ParseJSON(tt.QueryJson) @@ -65,7 +67,7 @@ func TestSearchOpensearch(t *testing.T) { for _, wantedQuery := range tt.WantedQueries { mock.ExpectQuery(wantedQuery).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) _, err2 := queryRunner.handleSearch(ctx, tableName, types.MustJSON(tt.QueryJson)) assert.NoError(t, err2) @@ -193,7 +195,9 @@ func TestHighlighter(t *testing.T) { db, mock := util.InitSqlMockWithPrettyPrint(t, true) defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, concurrent.NewMapWith(tableName, &table)) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) mock.ExpectQuery("").WillReturnRows(sqlmock.NewRows([]string{"message$*%:;", "host.name", "@timestamp"}). // careful, it's not always in this order, order is nondeterministic AddRow("abcd", "abcd", "abcd"). @@ -202,7 +206,7 @@ func TestHighlighter(t *testing.T) { AddRow("text-to-highlight", "text-to-highlight", "text-to-highlight"). AddRow("text", "text", "text")) - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) response, err := queryRunner.handleSearch(ctx, tableName, types.MustJSON(query)) assert.NoError(t, err) if err != nil { diff --git a/quesma/quesma/search_test.go b/quesma/quesma/search_test.go index 567736615..71972b15c 100644 --- a/quesma/quesma/search_test.go +++ b/quesma/quesma/search_test.go @@ -20,6 +20,7 @@ import ( "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "quesma/testdata" "quesma/tracing" @@ -80,12 +81,13 @@ func TestAsyncSearchHandler(t *testing.T) { db, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false) defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, table) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) for _, query := range tt.WantedQuery { mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) _, err := queryRunner.handleAsyncSearch(ctx, tableName, types.MustJSON(tt.QueryJson), defaultAsyncSearchTimeout, true) assert.NoError(t, err) @@ -134,11 +136,12 @@ func TestAsyncSearchHandlerSpecialCharacters(t *testing.T) { db, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false) defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, concurrent.NewMapWith(tableName, &table)) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) mock.ExpectQuery(tt.ExpectedPancakeSQL).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) _, err := queryRunner.handleAsyncSearch(ctx, tableName, types.MustJSON(tt.QueryRequestJson), defaultAsyncSearchTimeout, true) assert.NoError(t, err) @@ -185,7 +188,8 @@ func TestSearchHandler(t *testing.T) { defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, table) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) if len(tt.WantedRegexes) > 0 { for _, wantedRegex := range tt.WantedRegexes { @@ -204,7 +208,7 @@ func TestSearchHandler(t *testing.T) { mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) var err error _, err = queryRunner.handleSearch(ctx, tableName, types.MustJSON(tt.QueryJson)) assert.NoError(t, err) @@ -255,7 +259,8 @@ func TestSearchHandlerRuntimeMappings(t *testing.T) { defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, table) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + indexRegistry := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, indexRegistry) if len(tt.WantedRegexes) > 0 { for _, wantedRegex := range tt.WantedRegexes { mock.ExpectQuery(testdata.EscapeWildcard(testdata.EscapeBrackets(wantedRegex))). @@ -266,7 +271,7 @@ func TestSearchHandlerRuntimeMappings(t *testing.T) { mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "message"})) } } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), indexRegistry) var err error _, err = queryRunner.handleSearch(ctx, tableName, types.MustJSON(tt.QueryJson)) assert.NoError(t, err) @@ -295,11 +300,12 @@ func TestSearchHandlerNoAttrsConfig(t *testing.T) { defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, table) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) for _, wantedRegex := range tt.WantedRegexes { mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) _, _ = queryRunner.handleSearch(ctx, tableName, types.MustJSON(tt.QueryJson)) if err := mock.ExpectationsWereMet(); err != nil { @@ -331,7 +337,8 @@ func TestAsyncSearchFilter(t *testing.T) { defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, table) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) if len(tt.WantedRegexes) > 0 { for _, wantedRegex := range tt.WantedRegexes { mock.ExpectQuery(testdata.EscapeBrackets(wantedRegex)).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) @@ -341,7 +348,7 @@ func TestAsyncSearchFilter(t *testing.T) { mock.ExpectQuery(query).WillReturnRows(sqlmock.NewRows([]string{"@timestamp", "host.name"})) } } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) _, _ = queryRunner.handleAsyncSearch(ctx, tableName, types.MustJSON(tt.QueryJson), defaultAsyncSearchTimeout, true) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) @@ -456,7 +463,8 @@ func TestHandlingDateTimeFields(t *testing.T) { defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, concurrent.NewMapWith(tableName, &table)) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) for _, fieldName := range []string{dateTimeTimestampField, dateTime64TimestampField, dateTime64OurTimestampField} { @@ -464,7 +472,7 @@ func TestHandlingDateTimeFields(t *testing.T) { WillReturnRows(sqlmock.NewRows([]string{"key", "doc_count"})) // .AddRow(1000, uint64(10)).AddRow(1001, uint64(20))) // here rows should be added if uint64 were supported - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) response, err := queryRunner.handleAsyncSearch(ctx, tableName, types.MustJSON(query(fieldName)), defaultAsyncSearchTimeout, true) assert.NoError(t, err) @@ -521,7 +529,8 @@ func TestNumericFacetsQueries(t *testing.T) { db, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false) defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, table) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) colNames := make([]string, 0, len(tt.NewResultRows[0].Cols)) for _, col := range tt.NewResultRows[0].Cols { @@ -537,7 +546,7 @@ func TestNumericFacetsQueries(t *testing.T) { } mock.ExpectQuery(tt.ExpectedSql).WillReturnRows(returnedBuckets) - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) var response []byte var err error if handlerName == "handleSearch" { @@ -612,7 +621,8 @@ func TestSearchTrackTotalCount(t *testing.T) { db, mock := util.InitSqlMockWithPrettySqlAndPrint(t, false) defer db.Close() lm := clickhouse.NewLogManagerWithConnection(db, table) - managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + managementConsole := ui.NewQuesmaManagementConsole(&DefaultConfig, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) for i, sql := range testcase.ExpectedSQLs { rows := sqlmock.NewRows([]string{testcase.ExpectedSQLResults[i][0].Cols[0].ColName}) @@ -622,7 +632,7 @@ func TestSearchTrackTotalCount(t *testing.T) { mock.ExpectQuery(sql).WillReturnRows(rows) } - queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender()) + queryRunner := NewQueryRunner(lm, &DefaultConfig, nil, managementConsole, s, ab_testing.NewEmptySender(), resolver) var response []byte var err error diff --git a/quesma/quesma/ui/asset/head.html b/quesma/quesma/ui/asset/head.html index a250568d6..ca9bfae76 100644 --- a/quesma/quesma/ui/asset/head.html +++ b/quesma/quesma/ui/asset/head.html @@ -492,6 +492,26 @@ background-color: #dddddd; } + #table_resolver textarea { + font-size: larger; + background-color: #eee; + width: 80em; + color: black; + padding: 10px; + border-radius: 10px; + border: 1px solid #ccc; + margin: 1em; + box-shadow: 0px 2px 4px rgba(0, 0, 0, 0.1); + } + + #table_resolver table { + border-collapse: collapse; + table-layout: fixed; + //width: 98%; + word-wrap: break-word; + font-size: small; + } + #quesma_all_logs table { border-collapse: collapse; table-layout: fixed; diff --git a/quesma/quesma/ui/console_routes.go b/quesma/quesma/ui/console_routes.go index f5cb1a1d2..b4eafca80 100644 --- a/quesma/quesma/ui/console_routes.go +++ b/quesma/quesma/ui/console_routes.go @@ -47,6 +47,18 @@ func (qmc *QuesmaManagementConsole) createRouting() *mux.Router { _, _ = writer.Write(buf) }) + router.HandleFunc("/table_resolver", func(writer http.ResponseWriter, req *http.Request) { + buf := qmc.generateTableResolver() + _, _ = writer.Write(buf) + }) + + router.HandleFunc("/table_resolver/ask", func(writer http.ResponseWriter, req *http.Request) { + prompt := req.PostFormValue("prompt") + + buf := qmc.generateTableResolverAnswer(prompt) + _, _ = writer.Write(buf) + }) + router.HandleFunc("/tables/reload", func(writer http.ResponseWriter, req *http.Request) { qmc.logManager.ReloadTables() buf := qmc.generateTables() diff --git a/quesma/quesma/ui/html_pages_test.go b/quesma/quesma/ui/html_pages_test.go index 1119c0a68..3eead705c 100644 --- a/quesma/quesma/ui/html_pages_test.go +++ b/quesma/quesma/ui/html_pages_test.go @@ -12,6 +12,7 @@ import ( "quesma/quesma/config" "quesma/quesma/types" "quesma/stats" + "quesma/table_resolver" "quesma/telemetry" "testing" ) @@ -21,7 +22,8 @@ func TestHtmlPages(t *testing.T) { xssBytes := []byte(xss) id := "b1c4a89e-4905-5e3c-b57f-dc92627d011e" logChan := make(chan logger.LogWithLevel, 5) - qmc := NewQuesmaManagementConsole(&config.QuesmaConfiguration{}, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + qmc := NewQuesmaManagementConsole(&config.QuesmaConfiguration{}, nil, nil, logChan, telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) qmc.PushPrimaryInfo(&QueryDebugPrimarySource{Id: id, QueryResp: xssBytes}) qmc.PushSecondaryInfo(&QueryDebugSecondarySource{Id: id, Path: xss, @@ -101,7 +103,8 @@ func TestHtmlSchemaPage(t *testing.T) { logManager := clickhouse.NewLogManager(tables, &cfg) - qmc := NewQuesmaManagementConsole(&cfg, logManager, nil, logChan, telemetry.NewPhoneHomeEmptyAgent(), nil) + resolver := table_resolver.NewEmptyTableResolver() + qmc := NewQuesmaManagementConsole(&cfg, logManager, nil, logChan, telemetry.NewPhoneHomeEmptyAgent(), nil, resolver) t.Run("schema got no XSS and no panic", func(t *testing.T) { response := string(qmc.generateTables()) diff --git a/quesma/quesma/ui/html_utils.go b/quesma/quesma/ui/html_utils.go index 729191197..a1d2d96ba 100644 --- a/quesma/quesma/ui/html_utils.go +++ b/quesma/quesma/ui/html_utils.go @@ -37,6 +37,12 @@ func generateTopNavigation(target string) []byte { buffer.Html(`>Live tail`) buffer.Html("Resolver`) + buffer.Html("\n") buffer.Html("\n\n") - if target != "tables" && target != "telemetry" { + if target != "tables" && target != "telemetry" && target != "table_resolver" { buffer.Html(`
` + "\n") buffer.Html(`
`) buffer.Html(fmt.Sprintf( diff --git a/quesma/quesma/ui/management_console.go b/quesma/quesma/ui/management_console.go index e54dec9b0..ac89e1bf8 100644 --- a/quesma/quesma/ui/management_console.go +++ b/quesma/quesma/ui/management_console.go @@ -8,6 +8,7 @@ import ( "quesma/elasticsearch" "quesma/quesma/types" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" "quesma/util" @@ -100,13 +101,14 @@ type ( phoneHomeAgent telemetry.PhoneHomeAgent schemasProvider SchemasProvider totalUnsupportedQueries int + tableResolver table_resolver.TableResolver } SchemasProvider interface { AllSchemas() map[schema.TableName]schema.Schema } ) -func NewQuesmaManagementConsole(cfg *config.QuesmaConfiguration, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, logChan <-chan logger.LogWithLevel, phoneHomeAgent telemetry.PhoneHomeAgent, schemasProvider SchemasProvider) *QuesmaManagementConsole { +func NewQuesmaManagementConsole(cfg *config.QuesmaConfiguration, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, logChan <-chan logger.LogWithLevel, phoneHomeAgent telemetry.PhoneHomeAgent, schemasProvider SchemasProvider, indexRegistry table_resolver.TableResolver) *QuesmaManagementConsole { return &QuesmaManagementConsole{ queryDebugPrimarySource: make(chan *QueryDebugPrimarySource, 10), queryDebugSecondarySource: make(chan *QueryDebugSecondarySource, 10), @@ -124,6 +126,7 @@ func NewQuesmaManagementConsole(cfg *config.QuesmaConfiguration, logManager *cli indexManagement: indexManager, phoneHomeAgent: phoneHomeAgent, schemasProvider: schemasProvider, + tableResolver: indexRegistry, } } diff --git a/quesma/quesma/ui/table_resolver.go b/quesma/quesma/ui/table_resolver.go new file mode 100644 index 000000000..b771fc1e6 --- /dev/null +++ b/quesma/quesma/ui/table_resolver.go @@ -0,0 +1,121 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package ui + +import ( + "quesma/quesma/ui/internal/builder" + "strings" +) + +func (qmc *QuesmaManagementConsole) generateTableResolver() []byte { + + buffer := newBufferWithHead() + buffer.Write(generateTopNavigation("table_resolver")) + + buffer.Html(`
`) + + buffer.Html("

Table Resolver

") + + buffer.Html("

Ask Quesma

") + + buffer.Html("Ask Quesma to resolve an pattern:") + buffer.Html(`
`) + buffer.Html(`") + + buffer.Html(`
`) + + buffer.Html(`
`) + + buffer.Html("

Recent decisions

") + + pipelines := qmc.tableResolver.Pipelines() + + buffer.Html(``) + buffer.Html(``) + buffer.Html(``) + for _, pipeline := range pipelines { + buffer.Html(``) + } + buffer.Html(``) + + decisions := qmc.tableResolver.RecentDecisions() + + for _, decision := range decisions { + buffer.Html(``) + buffer.Html(``) + + for _, pipeline := range pipelines { + buffer.Html(``) + } + + buffer.Html(``) + } + buffer.Html(`
Pattern`).Text(pipeline).Html(`
`).Text(decision.Pattern).Html(``) + if decision.Decisions[pipeline] != nil { + buffer.Text(decision.Decisions[pipeline].String()) + } else { + buffer.Text("n/a") + } + buffer.Html(`
`) + + buffer.Html(`") + + buffer.Html("\n
\n\n") + return buffer.Bytes() + +} + +func (qmc *QuesmaManagementConsole) generateTableResolverAnswer(prompt string) []byte { + var buffer builder.HtmlBuffer + + prompt = strings.TrimSpace(prompt) + + if prompt == "" { + return buffer.Bytes() + } + + patterns := strings.Split(prompt, " ") + + pipelines := qmc.tableResolver.Pipelines() + + buffer.Html("

Quesma's decision

") + + buffer.Html(``) + buffer.Html(``) + buffer.Html(``) + for _, pipeline := range pipelines { + buffer.Html(``) + } + buffer.Html(``) + + for _, pattern := range patterns { + + pattern = strings.TrimSpace(pattern) + + buffer.Html(``) + buffer.Html(``) + + for _, pipeline := range pipelines { + decision := qmc.tableResolver.Resolve(pipeline, pattern) + buffer.Html(``) + } + buffer.Html(``) + } + + buffer.Html(`
Pattern`).Text(pipeline).Html(`
`).Text(pattern).Html(``) + if decision != nil { + buffer.Text(decision.String()) + } else { + buffer.Text("n/a") + } + buffer.Html(`
`) + + return buffer.Bytes() +} diff --git a/quesma/table_resolver/empty.go b/quesma/table_resolver/empty.go new file mode 100644 index 000000000..2cca8e229 --- /dev/null +++ b/quesma/table_resolver/empty.go @@ -0,0 +1,33 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package table_resolver + +type EmptyTableResolver struct { + Decisions map[string]*Decision + RecentDecisionList []PatternDecisions + PipelinesList []string +} + +func NewEmptyTableResolver() *EmptyTableResolver { + return &EmptyTableResolver{ + Decisions: make(map[string]*Decision), + } +} + +func (r *EmptyTableResolver) Resolve(pipeline string, indexPattern string) *Decision { + return r.Decisions[indexPattern] +} + +func (r *EmptyTableResolver) RecentDecisions() []PatternDecisions { + return r.RecentDecisionList +} + +func (r *EmptyTableResolver) Pipelines() []string { + return r.PipelinesList +} + +func (r *EmptyTableResolver) Start() { +} + +func (r *EmptyTableResolver) Stop() { +} diff --git a/quesma/table_resolver/model.go b/quesma/table_resolver/model.go new file mode 100644 index 000000000..eb691906f --- /dev/null +++ b/quesma/table_resolver/model.go @@ -0,0 +1,116 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package table_resolver + +import ( + "fmt" + "quesma/logger" + "strings" +) + +type Decision struct { + // obvious fields + IsClosed bool + Err error + IsEmpty bool + + // which connector to use, and how + UseConnectors []ConnectorDecision + + // who made the decision and why + Message string + ResolverName string +} + +func (d *Decision) String() string { + + var lines []string + + if d.IsClosed { + lines = append(lines, "Returns a closed index message.") + } + + if d.IsEmpty { + lines = append(lines, "Returns an empty result.") + } + + if d.Err != nil { + lines = append(lines, fmt.Sprintf("Returns error: '%v'.", d.Err)) + } + + for _, connector := range d.UseConnectors { + lines = append(lines, connector.Message()) + } + + lines = append(lines, fmt.Sprintf("%s (%s).", d.Message, d.ResolverName)) + + return strings.Join(lines, " ") +} + +type ConnectorDecision interface { + Message() string +} + +type ConnectorDecisionElastic struct { + // TODO instance of elastic connector +} + +func (*ConnectorDecisionElastic) Message() string { + return "Pass to Elasticsearch." +} + +type ConnectorDecisionClickhouse struct { + // TODO instance of clickhouse connector + + ClickhouseTableName string + ClickhouseTables []string + IsCommonTable bool +} + +func (d *ConnectorDecisionClickhouse) Message() string { + var lines []string + + lines = append(lines, "Pass to clickhouse.") + if len(d.ClickhouseTableName) > 0 { + lines = append(lines, fmt.Sprintf("Table: '%s' .", d.ClickhouseTableName)) + } + if d.IsCommonTable { + lines = append(lines, "Common table.") + } + if len(d.ClickhouseTables) > 0 { + lines = append(lines, fmt.Sprintf("Indexes: %v.", d.ClickhouseTables)) + } + + return strings.Join(lines, " ") +} + +// PatternDecisions is a struct that holds the pattern and the decisions made for that pattern +type PatternDecisions struct { + Pattern string + Decisions map[string]*Decision +} + +type TableResolver interface { + Start() + Stop() + + Resolve(pipeline string, indexPattern string) *Decision + + Pipelines() []string + RecentDecisions() []PatternDecisions +} + +// TODO will be removed in the next PR, +// right now it is used to mark places where we must refactor the code +func TODO(place string, decision *Decision) { + var trace bool + if trace { + logger.Debug().Msgf("TODO: use table_resolver decision here %s : %v", place, decision.String()) + } +} + +// TODO hardcoded pipeline names +const ( + QueryPipeline = "Query" + IngestPipeline = "Ingest" +) diff --git a/quesma/table_resolver/rules.go b/quesma/table_resolver/rules.go new file mode 100644 index 000000000..f8a021b05 --- /dev/null +++ b/quesma/table_resolver/rules.go @@ -0,0 +1,279 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package table_resolver + +import ( + "fmt" + "quesma/common_table" + "quesma/elasticsearch" + "quesma/end_user_errors" + "quesma/quesma/config" + "quesma/util" +) + +// TODO these rules may be incorrect and incomplete +// They will be fixed int the next iteration. + +func patternIsNotAllowed(input parsedPattern) *Decision { + if !input.isPattern { + return nil + } + return &Decision{ + Message: "Pattern is not allowed.", + Err: fmt.Errorf("pattern is not allowed"), + } +} + +func makeIsDisabledInConfig(cfg map[string]config.IndexConfiguration, pipeline string) func(input parsedPattern) *Decision { + + return func(input parsedPattern) *Decision { + + if !input.isPattern { + idx, ok := cfg[input.source] + if ok { + if len(getTargets(idx, pipeline)) == 0 { + return &Decision{ + IsClosed: true, + Message: "Index is disabled in config.", + } + } + } + } + + return nil + } +} + +func resolveInternalElasticName(pattern parsedPattern) *Decision { + + if elasticsearch.IsInternalIndex(pattern.source) { + return &Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + Message: "It's kibana internals", + } + } + + return nil +} + +func makeElasticIsDefault(cfg map[string]config.IndexConfiguration) func(input parsedPattern) *Decision { + + return func(input parsedPattern) *Decision { + return &Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + Message: "Elastic is default.", + } + } +} + +func (r *tableRegistryImpl) singleIndex(indexConfig map[string]config.IndexConfiguration, pipeline string) func(input parsedPattern) *Decision { + + return func(input parsedPattern) *Decision { + + if input.isPattern { + return nil + } + + if cfg, ok := indexConfig[input.source]; ok { + if !cfg.UseCommonTable { + + targets := getTargets(cfg, pipeline) + + switch len(targets) { + + case 0: + return &Decision{ + Message: "Disabled in the config.", + IsClosed: true, + } + + case 1: + + decision := &Decision{ + Message: "Enabled in the config. ", + } + + var targetDecision ConnectorDecision + + // FIXME this + switch targets[0] { + + case "elasticsearch": + targetDecision = &ConnectorDecisionElastic{} + case "clickhouse": + targetDecision = &ConnectorDecisionClickhouse{ + ClickhouseTableName: input.source, + ClickhouseTables: []string{input.source}, + } + default: + return &Decision{ + Message: "Unsupported configuration", + Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("unsupported target: %s", targets[0])), + } + } + decision.UseConnectors = append(decision.UseConnectors, targetDecision) + + return decision + case 2: + + // check targets and decide + // TODO what about A/B testing ? + + return &Decision{ + Message: "Enabled in the config. Physical table will be used.", + + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: input.source, + ClickhouseTables: []string{input.source}, + }}, + } + + default: + return &Decision{ + Message: "Unsupported configuration", + Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("too many backend connector")), + } + } + } + } + + // TODO autodiscovery ? + + return nil + } +} + +func (r *tableRegistryImpl) makeCheckIfPatternMatchesAllConnectors() func(input parsedPattern) *Decision { + + return func(input parsedPattern) *Decision { + if input.isPattern { + + matchedElastic := []string{} + matchedClickhouse := []string{} + + for _, pattern := range input.parts { + + for indexName := range r.elasticIndexes { + if util.IndexPatternMatches(pattern, indexName) { + matchedElastic = append(matchedElastic, indexName) + } + } + + for tableName := range r.clickhouseIndexes { + if util.IndexPatternMatches(pattern, tableName) { + matchedClickhouse = append(matchedClickhouse, tableName) + } + } + } + + nElastic := len(matchedElastic) + nClickhouse := len(matchedClickhouse) + + switch { + + case nElastic > 0 && nClickhouse > 0: + return &Decision{ + Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("index pattern [%s] resolved to both elasticsearch indices: [%s] and clickhouse tables: [%s]", input.parts, matchedElastic, matchedClickhouse)), + Message: "Both Elastic and Clickhouse matched.", + } + + case nElastic > 0 && nClickhouse == 0: + + return &Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + Message: "Only Elastic matched.", + } + + case nElastic == 0 && nClickhouse > 0: + // it will be resolved by sth else later + return nil + + case nElastic == 0 && nClickhouse == 0: + + // TODO we should return emtpy result here + // or pass to another tableResolver + return &Decision{ + IsEmpty: true, + Message: "No indexes matched. Checked both connectors.", + } + } + } + + return nil + } + +} + +func (r *tableRegistryImpl) makeCommonTableResolver(cfg map[string]config.IndexConfiguration) func(input parsedPattern) *Decision { + + return func(input parsedPattern) *Decision { + + if input.isPattern { + + // TODO at this point we shouldn't have elastic indexes? + for _, pattern := range input.parts { + for indexName := range r.elasticIndexes { + if util.IndexPatternMatches(pattern, indexName) { + + // TODO what about config ? + // TODO ? + return &Decision{ + Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("index parsedPattern [%s] resolved to elasticsearch indices", input.parts)), + Message: "We're not supporting common tables for Elastic.", + } + } + } + } + + matchedIndexes := []string{} + + for _, pattern := range input.parts { + for indexName, index := range r.clickhouseIndexes { + + // TODO what about config ? + // what if index uses common table but is't + if util.IndexPatternMatches(pattern, indexName) && index.isVirtual { + matchedIndexes = append(matchedIndexes, indexName) + } + } + } + + if len(matchedIndexes) == 0 { + return &Decision{ + IsEmpty: true, + Message: "No indexes found.", + } + } + + // HERE + return &Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + IsCommonTable: true, + ClickhouseTableName: common_table.TableName, + ClickhouseTables: matchedIndexes, + }}, + Message: "Common table will be used. Querying multiple indexes.", + } + } + + if input.source == common_table.TableName { + return &Decision{ + Err: fmt.Errorf("common table is not allowed to be queried directly"), + Message: "It's internal table. Not allowed to be queried directly.", + } + } + + if idxConfig, ok := cfg[input.source]; ok && idxConfig.UseCommonTable { + return &Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: common_table.TableName, + ClickhouseTables: []string{input.source}, + IsCommonTable: true, + }}, + Message: "Common table will be used.", + } + } + + return nil + } +} diff --git a/quesma/table_resolver/table_resolver.go b/quesma/table_resolver/table_resolver.go new file mode 100644 index 000000000..c5a092e9e --- /dev/null +++ b/quesma/table_resolver/table_resolver.go @@ -0,0 +1,318 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package table_resolver + +import ( + "context" + "fmt" + "quesma/clickhouse" + "quesma/elasticsearch" + "quesma/logger" + "quesma/quesma/config" + "quesma/quesma/recovery" + "sort" + "strings" + "sync" + "time" +) + +type tableResolver interface { + resolve(indexPattern string) *Decision +} + +// parsedPattern stores the parsed index pattern +type parsedPattern struct { + // source + source string + + // parsed data + isPattern bool + parts []string +} + +type basicResolver struct { + name string + resolver func(pattern parsedPattern) *Decision +} + +type compoundResolver struct { + decisionLadder []basicResolver +} + +func (ir *compoundResolver) resolve(indexName string) *Decision { + patterns := strings.Split(indexName, ",") + + input := parsedPattern{ + source: indexName, + isPattern: len(patterns) > 1 || strings.Contains(indexName, "*"), + parts: patterns, + } + + for _, resolver := range ir.decisionLadder { + decision := resolver.resolver(input) + + if decision != nil { + decision.ResolverName = resolver.name + return decision + } + } + return &Decision{ + Message: "Could not resolve pattern. This is a bug.", + Err: fmt.Errorf("could not resolve index"), // TODO better error + } +} + +// HACK: we should have separate config for each pipeline +// now we have a single config for both, but with different fields +func getTargets(indexConf config.IndexConfiguration, pipeline string) []string { + switch pipeline { + case IngestPipeline: + return indexConf.IngestTarget + case QueryPipeline: + return indexConf.QueryTarget + default: + return []string{} + } +} + +// table represents a table or an index discovered in the connector (clickhouse or elastic or ...) +type table struct { + name string + isVirtual bool +} + +type pipelineResolver struct { + pipelineName string + + resolver tableResolver + recentDecisions map[string]*Decision +} + +type tableRegistryImpl struct { + m sync.Mutex + ctx context.Context + cancel context.CancelFunc + + tableDiscovery clickhouse.TableDiscovery + elasticIndexResolver elasticsearch.IndexResolver + + elasticIndexes map[string]table + clickhouseIndexes map[string]table + + pipelineResolvers map[string]*pipelineResolver +} + +func (r *tableRegistryImpl) Resolve(pipeline string, indexPattern string) *Decision { + r.m.Lock() + defer r.m.Unlock() + + res, exists := r.pipelineResolvers[pipeline] + if !exists { + // proper error handling + return nil + } + + if decision, ok := res.recentDecisions[indexPattern]; ok { + return decision + } + + decision := res.resolver.resolve(indexPattern) + res.recentDecisions[indexPattern] = decision + + logger.Debug().Msgf("Decision for pipeline '%s', pattern '%s': %s", pipeline, indexPattern, decision.String()) + + return decision +} + +func (r *tableRegistryImpl) updateIndexes() { + r.m.Lock() + defer r.m.Unlock() + + defer func() { + for _, res := range r.pipelineResolvers { + res.recentDecisions = make(map[string]*Decision) + } + }() + + logger.Info().Msgf("Index registry updating state.") + + // TODO how to interact with the table discovery ? + // right now we enforce the reload of the table definitions + // schema registry is doing the same + // we should inject list of tables into the resolver + r.tableDiscovery.ReloadTableDefinitions() + + tableMap := r.tableDiscovery.TableDefinitions() + clickhouseIndexes := make(map[string]table) + + tableMap.Range(func(name string, tableDef *clickhouse.Table) bool { + clickhouseIndexes[name] = table{ + name: name, + isVirtual: tableDef.VirtualTable, + } + return true + }) + + r.clickhouseIndexes = clickhouseIndexes + logger.Info().Msgf("Clickhouse tables updated: %v", clickhouseIndexes) + + elasticIndexes := make(map[string]table) + sources, ok, err := r.elasticIndexResolver.Resolve("*") + if err != nil { + logger.Error().Msgf("Could not resolve indexes from Elastic: %v", err) + return + } + if !ok { + logger.Error().Msg("Could not resolve indexes from Elastic") + return + } + + for _, index := range sources.Indices { + elasticIndexes[index.Name] = table{ + name: index.Name, + } + } + + logger.Info().Msgf("Elastic tables updated: %v", elasticIndexes) + r.elasticIndexes = elasticIndexes +} + +func (r *tableRegistryImpl) updateState() { + r.updateIndexes() +} + +func (r *tableRegistryImpl) Stop() { + r.cancel() + logger.Info().Msg("Table resolver stopped.") +} + +func (r *tableRegistryImpl) Start() { + go func() { + defer recovery.LogPanic() + logger.Info().Msg("Table resolve started.") + + for { + select { + case <-r.ctx.Done(): + return + case <-time.After(1 * time.Minute): + r.updateState() + } + } + }() +} + +func (r *tableRegistryImpl) RecentDecisions() []PatternDecisions { + r.m.Lock() + defer r.m.Unlock() + + var patternsMap = make(map[string]bool) + + for _, res := range r.pipelineResolvers { + for p := range res.recentDecisions { + patternsMap[p] = true + } + } + + var patterns []string + for p := range patternsMap { + patterns = append(patterns, p) + } + + sort.Strings(patterns) + + var res []PatternDecisions + for _, p := range patterns { + + pd := PatternDecisions{ + Pattern: p, + Decisions: make(map[string]*Decision), + } + for _, resolver := range r.pipelineResolvers { + if decision, ok := resolver.recentDecisions[p]; ok { + pd.Decisions[resolver.pipelineName] = decision + } + } + res = append(res, pd) + } + + return res +} + +func (r *tableRegistryImpl) Pipelines() []string { + + r.m.Lock() + defer r.m.Unlock() + + var res []string + + for name := range r.pipelineResolvers { + res = append(res, name) + } + sort.Strings(res) + + return res +} + +func NewTableResolver(quesmaConf config.QuesmaConfiguration, discovery clickhouse.TableDiscovery, elasticResolver elasticsearch.IndexResolver) TableResolver { + ctx, cancel := context.WithCancel(context.Background()) + + indexConf := quesmaConf.IndexConfig + + res := &tableRegistryImpl{ + ctx: ctx, + cancel: cancel, + + tableDiscovery: discovery, + elasticIndexResolver: elasticResolver, + pipelineResolvers: make(map[string]*pipelineResolver), + } + + // TODO Here we should read the config and create resolver for each pipeline defined. + // TODO We should use the pipeline name as a key in the map. + + ingestResolver := &pipelineResolver{ + pipelineName: IngestPipeline, + + resolver: &compoundResolver{ + decisionLadder: []basicResolver{ + {"patternIsNotAllowed", patternIsNotAllowed}, + {"kibanaInternal", resolveInternalElasticName}, + {"disabled", makeIsDisabledInConfig(indexConf, QueryPipeline)}, + + {"singleIndex", res.singleIndex(indexConf, IngestPipeline)}, + {"commonTable", res.makeCommonTableResolver(indexConf)}, + + {"elasticAsDefault", makeElasticIsDefault(indexConf)}, + }, + }, + recentDecisions: make(map[string]*Decision), + } + + res.pipelineResolvers[IngestPipeline] = ingestResolver + + queryResolver := &pipelineResolver{ + pipelineName: QueryPipeline, + + resolver: &compoundResolver{ + decisionLadder: []basicResolver{ + // checking if we can handle the parsedPattern + {"kibanaInternal", resolveInternalElasticName}, + {"searchAcrossConnectors", res.makeCheckIfPatternMatchesAllConnectors()}, + {"disabled", makeIsDisabledInConfig(indexConf, QueryPipeline)}, + + {"singleIndex", res.singleIndex(indexConf, QueryPipeline)}, + {"commonTable", res.makeCommonTableResolver(indexConf)}, + + // default action + {"elasticAsDefault", makeElasticIsDefault(indexConf)}, + }, + }, + recentDecisions: make(map[string]*Decision), + } + + res.pipelineResolvers[QueryPipeline] = queryResolver + // update the state ASAP + res.updateState() + return res +} diff --git a/quesma/table_resolver/table_resolver_test.go b/quesma/table_resolver/table_resolver_test.go new file mode 100644 index 000000000..66635ffd1 --- /dev/null +++ b/quesma/table_resolver/table_resolver_test.go @@ -0,0 +1,277 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package table_resolver + +import ( + "fmt" + "github.com/k0kubun/pp" + "github.com/stretchr/testify/assert" + "quesma/clickhouse" + "quesma/common_table" + "quesma/elasticsearch" + "quesma/end_user_errors" + "quesma/quesma/config" + "reflect" + "strings" + "testing" +) + +func TestTableResolver(t *testing.T) { + + indexConf := map[string]config.IndexConfiguration{ + "index1": { + QueryTarget: []string{"clickhouse"}, + IngestTarget: []string{"clickhouse"}, + }, + "index2": { + UseCommonTable: true, + QueryTarget: []string{"clickhouse"}, + IngestTarget: []string{"clickhouse"}, + }, + "index3": { + QueryTarget: []string{"elasticsearch"}, + IngestTarget: []string{"elasticsearch"}, + }, + "closed": { + QueryTarget: []string{}, + IngestTarget: []string{}, + }, + } + + cfg := config.QuesmaConfiguration{IndexConfig: indexConf} + + tests := []struct { + name string + pipeline string + pattern string + elasticIndexes []string + clickhouseIndexes []string + virtualTables []string + expected Decision + }{ + { + name: "elastic fallback", + pipeline: IngestPipeline, + pattern: "some-index", + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + }, + }, + { + name: "all", + pipeline: QueryPipeline, + pattern: "*", + expected: Decision{ + IsEmpty: true, + }, + }, + { + name: "empty *", + pipeline: QueryPipeline, + pattern: "*", + clickhouseIndexes: []string{"index1", "index2"}, + expected: Decision{ + IsEmpty: true, + }, + }, + { + name: "query all, indices in both connectors", + pipeline: QueryPipeline, + pattern: "*", + clickhouseIndexes: []string{"index1", "index2"}, + elasticIndexes: []string{"index3"}, + expected: Decision{ + Err: end_user_errors.ErrSearchCondition.New(fmt.Errorf("")), + }, + }, + { + name: "ingest with a pattern", + pipeline: IngestPipeline, + pattern: "*", + clickhouseIndexes: []string{"index1", "index2"}, + elasticIndexes: []string{"index3"}, + expected: Decision{ + Err: fmt.Errorf("pattern is not allowed"), + }, + }, + { + name: "query closed index", + pipeline: QueryPipeline, + pattern: "closed", + clickhouseIndexes: []string{"closed"}, + expected: Decision{ + IsClosed: true, + }, + }, + { + name: "ingest closed index", + pipeline: QueryPipeline, + pattern: "closed", + clickhouseIndexes: []string{"closed"}, + expected: Decision{ + IsClosed: true, + }, + }, + { + name: "ingest to index1", + pipeline: IngestPipeline, + pattern: "index1", + clickhouseIndexes: []string{"index1"}, + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: "index1", + ClickhouseTables: []string{"index1"}}, + }, + }, + }, + { + name: "query from index1", + pipeline: QueryPipeline, + pattern: "index1", + clickhouseIndexes: []string{"index1"}, + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: "index1", + ClickhouseTables: []string{"index1"}}, + }, + }, + }, + { + name: "ingest to index2", + pipeline: IngestPipeline, + pattern: "index2", + clickhouseIndexes: []string{"index2"}, + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: common_table.TableName, + ClickhouseTables: []string{"index2"}, + IsCommonTable: true, + }}, + }, + }, + { + name: "query from index2", + pipeline: QueryPipeline, + pattern: "index2", + clickhouseIndexes: []string{"index2"}, + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: common_table.TableName, + ClickhouseTables: []string{"index2"}, + IsCommonTable: true, + }}, + }, + }, + { + name: "ingest to index3", + pipeline: IngestPipeline, + pattern: "index3", + elasticIndexes: []string{"index3"}, + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + }, + }, + { + name: "query from index3", + pipeline: QueryPipeline, + pattern: "index3", + elasticIndexes: []string{"index3"}, + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + }, + }, + { + name: "query pattern", + pipeline: QueryPipeline, + pattern: "index*", + virtualTables: []string{"index2"}, + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionClickhouse{ + ClickhouseTableName: common_table.TableName, + ClickhouseTables: []string{"index2"}, + IsCommonTable: true, + }}, + }, + }, + { + name: "query kibana internals", + pipeline: QueryPipeline, + pattern: ".kibana", + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + }, + }, + { + name: "ingest kibana internals", + pipeline: IngestPipeline, + pattern: ".kibana", + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + }, + }, + { + name: "ingest not configured index", + pipeline: IngestPipeline, + pattern: "not-configured", + expected: Decision{ + UseConnectors: []ConnectorDecision{&ConnectorDecisionElastic{}}, + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + + tableDiscovery := clickhouse.NewEmptyTableDiscovery() + + for _, index := range tt.clickhouseIndexes { + tableDiscovery.TableMap.Store(index, &clickhouse.Table{ + Name: index, + }) + } + + for _, index := range tt.virtualTables { + tableDiscovery.TableMap.Store(index, &clickhouse.Table{ + Name: index, + VirtualTable: true, + }) + } + + elasticResolver := elasticsearch.NewEmptyIndexResolver() + + sources := elasticsearch.Sources{ + Indices: make([]elasticsearch.Index, 0), + } + + for _, index := range tt.elasticIndexes { + sources.Indices = append(sources.Indices, elasticsearch.Index{ + Name: index, + }) + } + elasticResolver.Indexes["*"] = sources + + resolver := NewTableResolver(cfg, tableDiscovery, elasticResolver) + + decision := resolver.Resolve(tt.pipeline, tt.pattern) + + assert.NotNil(t, decision) + if tt.expected.Err != nil { + if !strings.Contains(decision.Err.Error(), tt.expected.Err.Error()) { + t.Errorf("Error is not an instance of the expected error: got %v, expected %v", decision.Err, tt.expected.Err) + } + } else { + assert.Nil(t, decision.Err) + } + assert.Equal(t, tt.expected.IsClosed, decision.IsClosed, "expected %v, got %v", tt.expected.IsClosed, decision.IsClosed) + assert.Equal(t, tt.expected.IsEmpty, decision.IsEmpty, "expected %v, got %v", tt.expected.IsEmpty, decision.IsEmpty) + + if !reflect.DeepEqual(tt.expected.UseConnectors, decision.UseConnectors) { + pp.Println(tt.expected) + pp.Println(decision) + t.Errorf("UseConnectors didn't match, expected %v, got %v", tt.expected.UseConnectors, decision.UseConnectors) + } + + }) + } + +}