Skip to content

Commit

Permalink
Make reloading happen only if people access the Quesma (#537)
Browse files Browse the repository at this point in the history
Quesma 1 minute schema refresh prevents taking advantage of DB idling.
This PR fixed that by:
- stops periodic refresh if there is no-one uses schema (no queries)
- force refresh after long idle time

Unfortunately this logic is tricky to unit test.

CC @avelanarius
  • Loading branch information
jakozaur authored Jul 18, 2024
1 parent b2f3ea1 commit d61d294
Show file tree
Hide file tree
Showing 2 changed files with 59 additions and 8 deletions.
16 changes: 14 additions & 2 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ func (lm *LogManager) Start() {
lm.schemaLoader.ReloadTableDefinitions()

logger.Info().Msgf("schemas loaded: %s", lm.schemaLoader.TableDefinitions().Keys())
const reloadInterval = 1 * time.Minute
forceReloadCh := lm.schemaLoader.ForceReloadCh()

go func() {
recovery.LogPanic()
Expand All @@ -96,8 +98,18 @@ func (lm *LogManager) Start() {
case <-lm.ctx.Done():
logger.Debug().Msg("closing log manager")
return
case <-time.After(1 * time.Minute): // TODO make it configurable
lm.schemaLoader.ReloadTableDefinitions()
case doneCh := <-forceReloadCh:
// this prevents flood of reloads, after a long pause
if time.Since(lm.schemaLoader.LastReloadTime()) > reloadInterval {
lm.schemaLoader.ReloadTableDefinitions()
}
doneCh <- struct{}{}
case <-time.After(reloadInterval):
// only reload if we actually use Quesma, make it double time to prevent edge case
// otherwise it prevent ClickHouse Cloud from idle pausing
if time.Since(lm.schemaLoader.LastAccessTime()) < reloadInterval*2 {
lm.schemaLoader.ReloadTableDefinitions()
}
}
}
}()
Expand Down
51 changes: 45 additions & 6 deletions quesma/clickhouse/schema_loader.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,29 +13,40 @@ import (
"quesma/util"
"strings"
"sync/atomic"
"time"
)

type TableDiscovery interface {
ReloadTableDefinitions()
TableDefinitions() *TableMap
TableDefinitionsFetchError() error

LastAccessTime() time.Time
LastReloadTime() time.Time
ForceReloadCh() <-chan chan<- struct{}
}

type tableDiscovery struct {
cfg config.QuesmaConfiguration
SchemaManagement *SchemaManagement
tableDefinitions *atomic.Pointer[TableMap]
ReloadTablesError error
cfg config.QuesmaConfiguration
SchemaManagement *SchemaManagement
tableDefinitions *atomic.Pointer[TableMap]
tableDefinitionsAccessUnixSec atomic.Int64
tableDefinitionsLastReloadUnixSec atomic.Int64
forceReloadCh chan chan<- struct{}
ReloadTablesError error
}

func NewTableDiscovery(cfg config.QuesmaConfiguration, schemaManagement *SchemaManagement) TableDiscovery {
var tableDefinitions = atomic.Pointer[TableMap]{}
tableDefinitions.Store(NewTableMap())
return &tableDiscovery{
result := &tableDiscovery{
cfg: cfg,
SchemaManagement: schemaManagement,
tableDefinitions: &tableDefinitions,
forceReloadCh: make(chan chan<- struct{}),
}
result.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix())
return result
}

type TableDiscoveryTableProviderAdapter struct {
Expand All @@ -62,11 +73,14 @@ func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema
func newTableDiscoveryWith(cfg config.QuesmaConfiguration, schemaManagement *SchemaManagement, tables TableMap) TableDiscovery {
var tableDefinitions = atomic.Pointer[TableMap]{}
tableDefinitions.Store(&tables)
return &tableDiscovery{
result := &tableDiscovery{
cfg: cfg,
SchemaManagement: schemaManagement,
tableDefinitions: &tableDefinitions,
forceReloadCh: make(chan chan<- struct{}),
}
result.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix())
return result
}

func (sl *tableDiscovery) TableDefinitionsFetchError() error {
Expand All @@ -77,7 +91,22 @@ func (sl *tableDiscovery) TableAutodiscoveryEnabled() bool {
return sl.cfg.IndexConfig == nil
}

func (sl *tableDiscovery) LastAccessTime() time.Time {
timeMs := sl.tableDefinitionsAccessUnixSec.Load()
return time.Unix(timeMs, 0)
}

func (sl *tableDiscovery) LastReloadTime() time.Time {
timeMs := sl.tableDefinitionsLastReloadUnixSec.Load()
return time.Unix(timeMs, 0)
}

func (sl *tableDiscovery) ForceReloadCh() <-chan chan<- struct{} {
return sl.forceReloadCh
}

func (sl *tableDiscovery) ReloadTableDefinitions() {
sl.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix())
logger.Debug().Msg("reloading tables definitions")
var configuredTables map[string]discoveredTable
databaseName := "default"
Expand All @@ -93,6 +122,7 @@ func (sl *tableDiscovery) ReloadTableDefinitions() {
}
sl.ReloadTablesError = err
sl.tableDefinitions.Store(NewTableMap())
sl.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix())
return
} else {
if sl.TableAutodiscoveryEnabled() {
Expand Down Expand Up @@ -233,6 +263,15 @@ func (sl *tableDiscovery) populateTableDefinitions(configuredTables map[string]d
}

func (sl *tableDiscovery) TableDefinitions() *TableMap {
sl.tableDefinitionsAccessUnixSec.Store(time.Now().Unix())
lastReloadUnixSec := sl.tableDefinitionsLastReloadUnixSec.Load()
lastReload := time.Unix(lastReloadUnixSec, 0)
if time.Since(lastReload) > 15*time.Minute { // maybe configure
logger.Info().Msg("Table definitions are stale for 15 minutes, forcing reload")
doneCh := make(chan struct{}, 1)
sl.forceReloadCh <- doneCh
<-doneCh
}
return sl.tableDefinitions.Load()
}

Expand Down

0 comments on commit d61d294

Please sign in to comment.