Skip to content

Commit

Permalink
Table Resolver (#840)
Browse files Browse the repository at this point in the history
This PR introduces `Table Resolver`. Table resolver answers the question
"What will Quesma do for given index (or index pattern)?"

It has three parts:
- `TableResolver` interface and implementation
- set of rules
- UI 

Limitations: 
It doesn't perform any actions based on decision. Rules can be
incomplete or simply wrong. It will be finished in the next PR.
  • Loading branch information
nablaone authored Oct 10, 2024
1 parent f5782fe commit 7611eba
Show file tree
Hide file tree
Showing 30 changed files with 1,419 additions and 84 deletions.
39 changes: 39 additions & 0 deletions quesma/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -604,3 +604,42 @@ func (td *tableDiscovery) createTableQuery(database, table string) (ddl string)
}
return ddl
}

type EmptyTableDiscovery struct {
TableMap *TableMap
Err error
Autodiscovery bool
}

func NewEmptyTableDiscovery() *EmptyTableDiscovery {
return &EmptyTableDiscovery{
TableMap: NewTableMap(),
}
}

func (td *EmptyTableDiscovery) ReloadTableDefinitions() {
}

func (td *EmptyTableDiscovery) TableDefinitions() *TableMap {
return td.TableMap
}

func (td *EmptyTableDiscovery) TableDefinitionsFetchError() error {
return td.Err
}

func (td *EmptyTableDiscovery) LastAccessTime() time.Time {
return time.Now()
}

func (td *EmptyTableDiscovery) LastReloadTime() time.Time {
return time.Now()
}

func (td *EmptyTableDiscovery) ForceReloadCh() <-chan chan<- struct{} {
return make(chan chan<- struct{})
}

func (td *EmptyTableDiscovery) AutodiscoveryEnabled() bool {
return td.Autodiscovery
}
15 changes: 15 additions & 0 deletions quesma/elasticsearch/index_resolver.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,3 +80,18 @@ func (im *indexResolver) Resolve(indexPattern string) (Sources, bool, error) {

return sources, true, nil
}

type EmptyIndexResolver struct {
Indexes map[string]Sources
}

func NewEmptyIndexResolver() *EmptyIndexResolver {
return &EmptyIndexResolver{
Indexes: make(map[string]Sources),
}
}

func (r *EmptyIndexResolver) Resolve(indexPattern string) (Sources, bool, error) {
res, ok := r.Indexes[indexPattern]
return res, ok, nil
}
4 changes: 4 additions & 0 deletions quesma/ingest/common_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/schema"
"quesma/table_resolver"
"testing"
)

Expand Down Expand Up @@ -188,10 +189,13 @@ func TestIngestToCommonTable(t *testing.T) {
tableDisco := clickhouse.NewTableDiscovery(quesmaConfig, db, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, quesmaConfig, clickhouse.SchemaTypeAdapter{})

resolver := table_resolver.NewEmptyTableResolver()

ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig)
ingest.chDb = db
ingest.virtualTableStorage = virtualTableStorage
ingest.schemaRegistry = schemaRegistry
ingest.tableResolver = resolver

if len(tt.alreadyExistingColumns) > 0 {

Expand Down
3 changes: 2 additions & 1 deletion quesma/ingest/ingest_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"quesma/concurrent"
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/table_resolver"
"quesma/util"
"strings"
"testing"
Expand Down Expand Up @@ -169,7 +170,7 @@ func TestIngestValidation(t *testing.T) {
ip := newIngestProcessorEmpty()
ip.chDb = db
ip.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMap)

ip.tableResolver = table_resolver.NewEmptyTableResolver()
defer db.Close()

mock.ExpectExec(EscapeBrackets(expectedInsertJsons[i])).WithoutArgs().WillReturnResult(sqlmock.NewResult(0, 0))
Expand Down
6 changes: 6 additions & 0 deletions quesma/ingest/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/schema"
"quesma/table_resolver"
"quesma/util"
"slices"
"strconv"
Expand Down Expand Up @@ -240,6 +241,8 @@ func TestProcessInsertQuery(t *testing.T) {
t.Run("case insertTest["+strconv.Itoa(index1)+"], config["+strconv.Itoa(index2)+"], ingestProcessor["+strconv.Itoa(index3)+"]", func(t *testing.T) {
db, mock := util.InitSqlMockWithPrettyPrint(t, true)
ip.ip.chDb = db
resolver := table_resolver.NewEmptyTableResolver()
ip.ip.tableResolver = resolver
defer db.Close()

// info: result values aren't important, this '.WillReturnResult[...]' just needs to be there
Expand Down Expand Up @@ -417,12 +420,15 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
}
schemaRegistry.Tables[schema.TableName(indexName)] = indexSchema

indexRegistry := table_resolver.NewEmptyTableResolver()
schemaRegistry.FieldEncodings = make(map[schema.FieldEncodingKey]schema.EncodedFieldName)
schemaRegistry.FieldEncodings[schema.FieldEncodingKey{TableName: indexName, FieldName: "schema_field"}] = "schema_field"

ingest := newIngestProcessorWithEmptyTableMap(tables, quesmaConfig)
ingest.chDb = db
ingest.virtualTableStorage = virtualTableStorage
ingest.schemaRegistry = schemaRegistry
ingest.tableResolver = indexRegistry

ctx := context.Background()
formatter := clickhouse.DefaultColumnNameFormatter()
Expand Down
9 changes: 7 additions & 2 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"quesma/quesma/types"
"quesma/schema"
"quesma/stats"
"quesma/table_resolver"
"quesma/telemetry"
"quesma/util"
"slices"
Expand Down Expand Up @@ -63,6 +64,7 @@ type (
ingestFieldStatistics IngestFieldStatistics
ingestFieldStatisticsLock sync.Mutex
virtualTableStorage persistence.JSONDatabase
tableResolver table_resolver.TableResolver
}
TableMap = concurrent.Map[string, *chLib.Table]
SchemaMap = map[string]interface{} // TODO remove
Expand Down Expand Up @@ -694,6 +696,9 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter) error {

decision := lm.tableResolver.Resolve(table_resolver.IngestPipeline, tableName)
table_resolver.TODO("processInsertQuery", decision)

indexConf, ok := lm.cfg.IndexConfig[tableName]
if ok && indexConf.UseCommonTable {

Expand Down Expand Up @@ -904,9 +909,9 @@ func (ip *IngestProcessor) Ping() error {
return ip.chDb.Ping()
}

func NewIngestProcessor(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase) *IngestProcessor {
func NewIngestProcessor(cfg *config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver) *IngestProcessor {
ctx, cancel := context.WithCancel(context.Background())
return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage}
return &IngestProcessor{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver}
}

func NewOnlySchemaFieldsCHConfig() *chLib.ChTableConfig {
Expand Down
19 changes: 13 additions & 6 deletions quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"quesma/quesma/config"
"quesma/quesma/ui"
"quesma/schema"
"quesma/table_resolver"
"quesma/telemetry"
"quesma/tracing"
"syscall"
Expand Down Expand Up @@ -92,6 +93,12 @@ func main() {
connManager := connectors.NewConnectorManager(&cfg, connectionPool, phoneHomeAgent, tableDisco)
lm := connManager.GetConnector()

elasticIndexResolver := elasticsearch.NewIndexResolver(cfg.Elasticsearch.Url.String())

// TODO index configuration for ingest and query is the same for now
tableResolver := table_resolver.NewTableResolver(cfg, tableDisco, elasticIndexResolver)
tableResolver.Start()

var ingestProcessor *ingest.IngestProcessor

if cfg.EnableIngest {
Expand All @@ -100,7 +107,7 @@ func main() {
common_table.EnsureCommonTableExists(connectionPool)
}

ingestProcessor = ingest.NewIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage)
ingestProcessor = ingest.NewIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage, tableResolver)
} else {
logger.Info().Msg("Ingest processor is disabled.")
}
Expand All @@ -109,12 +116,12 @@ func main() {

logger.Info().Msgf("loaded config: %s", cfg.String())

quesmaManagementConsole := ui.NewQuesmaManagementConsole(&cfg, lm, im, qmcLogChannel, phoneHomeAgent, schemaRegistry) //FIXME no ingest processor here just for now
quesmaManagementConsole := ui.NewQuesmaManagementConsole(&cfg, lm, im, qmcLogChannel, phoneHomeAgent, schemaRegistry, tableResolver) //FIXME no ingest processor here just for now

abTestingController := sender.NewSenderCoordinator(&cfg)
abTestingController.Start()

instance := constructQuesma(&cfg, tableDisco, lm, ingestProcessor, im, schemaRegistry, phoneHomeAgent, quesmaManagementConsole, qmcLogChannel, abTestingController.GetSender())
instance := constructQuesma(&cfg, tableDisco, lm, ingestProcessor, im, schemaRegistry, phoneHomeAgent, quesmaManagementConsole, qmcLogChannel, abTestingController.GetSender(), tableResolver)
instance.Start()

<-doneCh
Expand All @@ -127,15 +134,15 @@ func main() {
phoneHomeAgent.Stop(ctx)
lm.Stop()
abTestingController.Stop()

tableResolver.Stop()
instance.Close(ctx)

}

func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, im elasticsearch.IndexManagement, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender) *quesma.Quesma {
func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscovery, lm *clickhouse.LogManager, ip *ingest.IngestProcessor, im elasticsearch.IndexManagement, schemaRegistry schema.Registry, phoneHomeAgent telemetry.PhoneHomeAgent, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, abResultsrepository ab_testing.Sender, indexRegistry table_resolver.TableResolver) *quesma.Quesma {
if cfg.TransparentProxy {
return quesma.NewQuesmaTcpProxy(phoneHomeAgent, cfg, quesmaManagementConsole, logChan, false)
} else {
return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository)
return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository, indexRegistry)
}
}
10 changes: 7 additions & 3 deletions quesma/quesma/functionality/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"quesma/quesma/recovery"
"quesma/quesma/types"
"quesma/stats"
"quesma/table_resolver"
"quesma/telemetry"
"sort"
"strings"
Expand Down Expand Up @@ -68,15 +69,15 @@ type (
)

func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ingest.IngestProcessor,
cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (results []BulkItem, err error) {
cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent, tableResolver table_resolver.TableResolver) (results []BulkItem, err error) {
defer recovery.LogPanic()

bulkSize := len(bulk) / 2 // we divided payload by 2 so that we don't take into account the `action_and_meta_data` line, ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
maybeLogBatchSize(bulkSize)

// The returned results should be in the same order as the input request, however splitting the bulk might change the order.
// Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk.
results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, cfg)
results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, cfg, tableResolver)
if err != nil {
return []BulkItem{}, err
}
Expand Down Expand Up @@ -110,7 +111,7 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing
return results, nil
}

func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bulkSize int, cfg *config.QuesmaConfiguration) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) {
func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bulkSize int, cfg *config.QuesmaConfiguration, tableResolver table_resolver.TableResolver) ([]BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) {
results := make([]BulkItem, bulkSize)

clickhouseDocumentsToInsert := make(map[string][]BulkRequestEntry, bulkSize)
Expand All @@ -121,6 +122,9 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
index := op.GetIndex()
operation := op.GetOperation()

decision := tableResolver.Resolve(table_resolver.IngestPipeline, index)
table_resolver.TODO("splitBulk", decision)

entryWithResponse := BulkRequestEntry{
operation: operation,
index: index,
Expand Down
5 changes: 3 additions & 2 deletions quesma/quesma/functionality/doc/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,16 +8,17 @@ import (
"quesma/quesma/config"
"quesma/quesma/functionality/bulk"
"quesma/quesma/types"
"quesma/table_resolver"
"quesma/telemetry"
)

func Write(ctx context.Context, tableName *string, body types.JSON, ip *ingest.IngestProcessor, cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (bulk.BulkItem, error) {
func Write(ctx context.Context, tableName *string, body types.JSON, ip *ingest.IngestProcessor, cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent, registry table_resolver.TableResolver) (bulk.BulkItem, error) {
// Translate single doc write to a bulk request, reusing exiting logic of bulk ingest

results, err := bulk.Write(ctx, tableName, []types.JSON{
map[string]interface{}{"index": map[string]interface{}{"_index": *tableName}},
body,
}, ip, cfg, phoneHomeAgent)
}, ip, cfg, phoneHomeAgent, registry)

if err != nil {
return bulk.BulkItem{}, err
Expand Down
5 changes: 3 additions & 2 deletions quesma/quesma/functionality/terms_enum/terms_enum_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"quesma/quesma/types"
"quesma/quesma/ui"
"quesma/schema"
"quesma/table_resolver"
"quesma/telemetry"
"quesma/tracing"
"quesma/util"
Expand Down Expand Up @@ -80,8 +81,8 @@ func testHandleTermsEnumRequest(t *testing.T, requestBody []byte) {
},
Created: true,
}

managementConsole := ui.NewQuesmaManagementConsole(&config.QuesmaConfiguration{}, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil)
tableResolver := table_resolver.NewEmptyTableResolver()
managementConsole := ui.NewQuesmaManagementConsole(&config.QuesmaConfiguration{}, nil, nil, make(<-chan logger.LogWithLevel, 50000), telemetry.NewPhoneHomeEmptyAgent(), nil, tableResolver)
db, mock := util.InitSqlMockWithPrettyPrint(t, true)
defer db.Close()
lm := clickhouse.NewLogManagerWithConnection(db, concurrent.NewMapWith(testTableName, table))
Expand Down
Loading

0 comments on commit 7611eba

Please sign in to comment.