Skip to content

Commit

Permalink
Schema registry refactoring (#1187)
Browse files Browse the repository at this point in the history
This PR adds/changes:
- notifications triggered by table discovery or table change
- schema registry caching 
- simplified locking strategy
- cache invalidation on table discovery
  • Loading branch information
nablaone authored Jan 16, 2025
1 parent 4ffb5c9 commit e44f0af
Show file tree
Hide file tree
Showing 13 changed files with 211 additions and 42 deletions.
46 changes: 46 additions & 0 deletions quesma/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/QuesmaOrg/quesma/quesma/logger"
"github.com/QuesmaOrg/quesma/quesma/persistence"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
"github.com/QuesmaOrg/quesma/quesma/schema"
"github.com/QuesmaOrg/quesma/quesma/util"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/goccy/go-json"
"strings"
"sync"
"sync/atomic"
"time"
)
Expand All @@ -34,12 +36,15 @@ func (d DbKind) String() string {
// TableDiscovery is the contract for components that hold the known table
// definitions, can reload them, and can tell interested parties when the set
// of tables changes.
type TableDiscovery interface {
// ReloadTableDefinitions refreshes the held table definitions.
ReloadTableDefinitions()
// TableDefinitions returns the current table map.
TableDefinitions() *TableMap
// AddTable stores table under tableName in the table definitions.
AddTable(tableName string, table *Table)
// TableDefinitionsFetchError returns the error recorded for fetching table
// definitions, or nil when none is recorded.
TableDefinitionsFetchError() error

// LastAccessTime and LastReloadTime presumably report when the definitions
// were last read and last reloaded — implementations are not visible here.
LastAccessTime() time.Time
LastReloadTime() time.Time
// ForceReloadCh returns a receive-only channel whose elements are send-only
// struct{} channels. NOTE(review): looks like a request/acknowledge pair for
// forcing a reload — confirm against the consumer.
ForceReloadCh() <-chan chan<- struct{}
// AutodiscoveryEnabled reports whether table autodiscovery is turned on.
AutodiscoveryEnabled() bool

// RegisterTablesReloadListener registers ch as a recipient of ReloadMessage
// notifications about table changes.
RegisterTablesReloadListener(ch chan<- types.ReloadMessage)
}

type tableDiscovery struct {
Expand All @@ -51,6 +56,9 @@ type tableDiscovery struct {
forceReloadCh chan chan<- struct{}
ReloadTablesError error
virtualTableStorage persistence.JSONDatabase

reloadObserversMutex sync.Mutex
reloadObservers []chan<- types.ReloadMessage
}

type columnMetadata struct {
Expand Down Expand Up @@ -79,6 +87,10 @@ type TableDiscoveryTableProviderAdapter struct {
TableDiscovery
}

// RegisterTablesReloadListener forwards ch to the wrapped TableDiscovery, so a
// listener registered on the adapter is registered on the underlying discovery.
func (t TableDiscoveryTableProviderAdapter) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) {
	discovery := t.TableDiscovery
	discovery.RegisterTablesReloadListener(ch)
}

func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema.Table {

// here we filter out our internal columns
Expand Down Expand Up @@ -125,6 +137,31 @@ func NewTableDiscoveryWith(cfg *config.QuesmaConfiguration, dbConnPool quesma_ap
return result
}

// AddTable stores table under tableName in the currently loaded table
// definitions and then notifies every registered reload observer.
func (td *tableDiscovery) AddTable(tableName string, table *Table) {
	definitions := td.tableDefinitions.Load()
	definitions.Store(tableName, table)

	td.notifyObservers()
}

// RegisterTablesReloadListener appends ch to the list of observers that
// notifyObservers sends a ReloadMessage to. The list is guarded by
// reloadObserversMutex.
func (td *tableDiscovery) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) {
	td.reloadObserversMutex.Lock()
	td.reloadObservers = append(td.reloadObservers, ch)
	td.reloadObserversMutex.Unlock()
}

// notifyObservers sends a ReloadMessage stamped with the current time to every
// channel registered through RegisterTablesReloadListener.
//
// Each send runs in its own goroutine so that an observer which is not ready
// to receive cannot block the caller while reloadObserversMutex is held. The
// goroutine stays blocked until its observer receives the message.
func (td *tableDiscovery) notifyObservers() {
	td.reloadObserversMutex.Lock()
	defer td.reloadObserversMutex.Unlock()

	msg := types.ReloadMessage{Timestamp: time.Now()}
	for _, observer := range td.reloadObservers {
		// Hand the channel to the goroutine as an argument so every goroutine
		// sends on its own observer regardless of loop-variable semantics
		// (before Go 1.22 a closure would share a single loop variable).
		go func(ch chan<- types.ReloadMessage) {
			ch <- msg
		}(observer)
	}
}

// TableDefinitionsFetchError returns the value currently held in
// ReloadTablesError; it is nil when no reload error is recorded.
func (td *tableDiscovery) TableDefinitionsFetchError() error {
	lastErr := td.ReloadTablesError
	return lastErr
}
Expand Down Expand Up @@ -178,6 +215,8 @@ func (td *tableDiscovery) ReloadTableDefinitions() {

td.ReloadTablesError = nil
td.populateTableDefinitions(configuredTables, databaseName, td.cfg)

td.notifyObservers()
}

func (td *tableDiscovery) readVirtualTables(configuredTables map[string]discoveredTable) map[string]discoveredTable {
Expand Down Expand Up @@ -634,6 +673,9 @@ func NewEmptyTableDiscovery() *EmptyTableDiscovery {
}
}

// RegisterTablesReloadListener is a no-op: ch is not stored and nothing is
// ever sent on it by this implementation.
func (td *EmptyTableDiscovery) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) {
}

// ReloadTableDefinitions is a no-op; this implementation performs no reload.
func (td *EmptyTableDiscovery) ReloadTableDefinitions() {
}

Expand All @@ -660,3 +702,7 @@ func (td *EmptyTableDiscovery) ForceReloadCh() <-chan chan<- struct{} {
// AutodiscoveryEnabled reports the value of the Autodiscovery field.
func (td *EmptyTableDiscovery) AutodiscoveryEnabled() bool {
	enabled := td.Autodiscovery
	return enabled
}

// AddTable stores table under tableName in the TableMap held by this
// discovery. No observers are notified by this implementation.
func (td *EmptyTableDiscovery) AddTable(tableName string, table *Table) {
td.TableMap.Store(tableName, table)
}
2 changes: 2 additions & 0 deletions quesma/ingest/common_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,8 @@ func TestIngestToCommonTable(t *testing.T) {

tableDisco := clickhouse.NewTableDiscovery(quesmaConfig, db, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, quesmaConfig, clickhouse.SchemaTypeAdapter{})
schemaRegistry.Start()
defer schemaRegistry.Stop()

resolver := table_resolver.NewEmptyTableResolver()

Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -962,7 +962,7 @@ func (ip *IngestProcessor) AddTableIfDoesntExist(table *chLib.Table) bool {
logger.Error().Msgf("error storing virtual table: %v", err)
}
}
ip.tableDiscovery.TableDefinitions().Store(table.Name, table)
ip.tableDiscovery.AddTable(table.Name, table)
return true
}
wasntCreated := !t.Created
Expand Down
2 changes: 2 additions & 0 deletions quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func main() {
virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName)
tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})
schemaRegistry.Start()

im := elasticsearch.NewIndexManagement(cfg.Elasticsearch)

Expand Down Expand Up @@ -146,6 +147,7 @@ func main() {

ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
schemaRegistry.Stop()
feature.NotSupportedLogger.Stop()
phoneHomeAgent.Stop(ctx)
lm.Stop()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) prepareTemporaryIngestProcess
virtualTableStorage := persistence.NewElasticJSONDatabase(esBackendConn.GetConfig(), common_table.VirtualTableElasticIndexName)
tableDisco := clickhouse.NewTableDiscovery(oldQuesmaConfig, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, oldQuesmaConfig, clickhouse.SchemaTypeAdapter{})
schemaRegistry.Start()

dummyTableResolver := table_resolver.NewDummyTableResolver()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func (p *ElasticsearchToClickHouseQueryProcessor) prepareTemporaryQueryProcessor
virtualTableStorage := persistence.NewElasticJSONDatabase(esBackendConn.GetConfig(), common_table.VirtualTableElasticIndexName)
tableDisco := clickhouse.NewTableDiscovery(oldQuesmaConfig, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, oldQuesmaConfig, clickhouse.SchemaTypeAdapter{})
schemaRegistry.Start()

logManager := clickhouse.NewEmptyLogManager(oldQuesmaConfig, connectionPool, phoneHomeAgent, tableDisco)
logManager.Start()
Expand Down
7 changes: 6 additions & 1 deletion quesma/quesma/router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -280,7 +280,12 @@ func TestConfigureRouter(t *testing.T) {
},
}
tr := TestTableResolver{}
testRouter := ConfigureRouter(cfg, schema.NewSchemaRegistry(fixedTableProvider{}, cfg, clickhouse.SchemaTypeAdapter{}), &clickhouse.LogManager{}, &ingest.IngestProcessor{}, &ui.QuesmaManagementConsole{}, telemetry.NewPhoneHomeAgent(cfg, nil, ""), &QueryRunner{}, tr, nil)

schemaRegistry := schema.NewSchemaRegistry(fixedTableProvider{}, cfg, clickhouse.SchemaTypeAdapter{})
schemaRegistry.Start()
defer schemaRegistry.Stop()

testRouter := ConfigureRouter(cfg, schemaRegistry, &clickhouse.LogManager{}, &ingest.IngestProcessor{}, &ui.QuesmaManagementConsole{}, telemetry.NewPhoneHomeAgent(cfg, nil, ""), &QueryRunner{}, tr, nil)

tests := []struct {
path string
Expand Down
16 changes: 14 additions & 2 deletions quesma/quesma/schema_transformer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/QuesmaOrg/quesma/quesma/clickhouse"
"github.com/QuesmaOrg/quesma/quesma/model"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
"github.com/QuesmaOrg/quesma/quesma/schema"
"github.com/stretchr/testify/assert"
"strconv"
Expand All @@ -19,8 +20,8 @@ type fixedTableProvider struct {
// TableDefinitions returns the fixed set of tables this provider was built
// with, exactly as stored in the tables field.
func (f fixedTableProvider) TableDefinitions() map[string]schema.Table {
	tables := f.tables
	return tables
}

func (f fixedTableProvider) AutodiscoveryEnabled() bool { return false }
// AutodiscoveryEnabled always reports false for the fixed provider.
func (f fixedTableProvider) AutodiscoveryEnabled() bool { return false }
// RegisterTablesReloadListener is a no-op: the channel argument is discarded.
func (f fixedTableProvider) RegisterTablesReloadListener(chan<- types.ReloadMessage) {}

func Test_ipRangeTransform(t *testing.T) {
const isIPAddressInRangePrimitive = "isIPAddressInRange"
Expand Down Expand Up @@ -86,6 +87,8 @@ func Test_ipRangeTransform(t *testing.T) {
TableName: "kibana_sample_data_logs_nested", FieldName: "nested.clientip"}: "nested_clientip",
}
s := schema.NewSchemaRegistry(tableProvider, &cfg, clickhouse.SchemaTypeAdapter{})
s.Start()
defer s.Stop()
transform := NewSchemaCheckPass(&cfg, tableDiscovery, defaultSearchAfterStrategy)
s.UpdateFieldEncodings(fieldEncodings)

Expand Down Expand Up @@ -704,6 +707,8 @@ func TestApplyPhysicalFromExpression(t *testing.T) {
td.Store(tableDefinition.Name, &tableDefinition)

s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{})
s.Start()
defer s.Stop()
transform := NewSchemaCheckPass(&config.QuesmaConfiguration{IndexConfig: indexConfig}, nil, defaultSearchAfterStrategy)

tests := []struct {
Expand Down Expand Up @@ -964,6 +969,8 @@ func TestFullTextFields(t *testing.T) {
}

s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{})
s.Start()
defer s.Stop()
transform := NewSchemaCheckPass(&config.QuesmaConfiguration{IndexConfig: indexConfig}, nil, defaultSearchAfterStrategy)

indexSchema, ok := s.FindSchema("test")
Expand Down Expand Up @@ -1071,6 +1078,9 @@ func Test_applyMatchOperator(t *testing.T) {
}

s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{})
s.Start()
defer s.Stop()

transform := NewSchemaCheckPass(&cfg, nil, defaultSearchAfterStrategy)

indexSchema, ok := s.FindSchema("test")
Expand Down Expand Up @@ -1171,6 +1181,8 @@ func Test_checkAggOverUnsupportedType(t *testing.T) {
}

s := schema.NewSchemaRegistry(tableDiscovery, &cfg, clickhouse.SchemaTypeAdapter{})
s.Start()
defer s.Stop()
transform := NewSchemaCheckPass(&cfg, nil, defaultSearchAfterStrategy)

indexSchema, ok := s.FindSchema("test")
Expand Down
10 changes: 10 additions & 0 deletions quesma/quesma/types/messages.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package types

import "time"

// ReloadMessage is the payload delivered on channels registered as table
// reload listeners.
type ReloadMessage struct {
// Timestamp is the time recorded by the sender when the message was created.
Timestamp time.Time
}
Loading

0 comments on commit e44f0af

Please sign in to comment.