From d61d294ef5c29dc09fe81e47eca5f9f0c0cfa9bf Mon Sep 17 00:00:00 2001 From: Jacek Migdal Date: Thu, 18 Jul 2024 13:50:08 +0200 Subject: [PATCH] Make reloading happen only if people access the Quesma (#537) Quesma's 1-minute schema refresh prevents taking advantage of DB idling. This PR fixes that by: - stopping the periodic refresh when no one uses the schema (no queries) - forcing a refresh after a long idle time Unfortunately, this logic is tricky to unit test. CC @avelanarius --- quesma/clickhouse/clickhouse.go | 16 ++++++++-- quesma/clickhouse/schema_loader.go | 51 ++++++++++++++++++++++++++---- 2 files changed, 59 insertions(+), 8 deletions(-) diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index 00bc933ca..7b4e928f6 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -88,6 +88,8 @@ func (lm *LogManager) Start() { lm.schemaLoader.ReloadTableDefinitions() logger.Info().Msgf("schemas loaded: %s", lm.schemaLoader.TableDefinitions().Keys()) + const reloadInterval = 1 * time.Minute + forceReloadCh := lm.schemaLoader.ForceReloadCh() go func() { recovery.LogPanic() @@ -96,8 +98,18 @@ func (lm *LogManager) Start() { case <-lm.ctx.Done(): logger.Debug().Msg("closing log manager") return - case <-time.After(1 * time.Minute): // TODO make it configurable - lm.schemaLoader.ReloadTableDefinitions() + case doneCh := <-forceReloadCh: + // this prevents flood of reloads, after a long pause + if time.Since(lm.schemaLoader.LastReloadTime()) > reloadInterval { + lm.schemaLoader.ReloadTableDefinitions() + } + doneCh <- struct{}{} + case <-time.After(reloadInterval): + // only reload if we actually use Quesma, make it double time to prevent edge case + // otherwise it prevent ClickHouse Cloud from idle pausing + if time.Since(lm.schemaLoader.LastAccessTime()) < reloadInterval*2 { + lm.schemaLoader.ReloadTableDefinitions() + } } } }() diff --git a/quesma/clickhouse/schema_loader.go b/quesma/clickhouse/schema_loader.go index 
29daeabb7..d72b473ed 100644 --- a/quesma/clickhouse/schema_loader.go +++ b/quesma/clickhouse/schema_loader.go @@ -13,29 +13,40 @@ import ( "quesma/util" "strings" "sync/atomic" + "time" ) type TableDiscovery interface { ReloadTableDefinitions() TableDefinitions() *TableMap TableDefinitionsFetchError() error + + LastAccessTime() time.Time + LastReloadTime() time.Time + ForceReloadCh() <-chan chan<- struct{} } type tableDiscovery struct { - cfg config.QuesmaConfiguration - SchemaManagement *SchemaManagement - tableDefinitions *atomic.Pointer[TableMap] - ReloadTablesError error + cfg config.QuesmaConfiguration + SchemaManagement *SchemaManagement + tableDefinitions *atomic.Pointer[TableMap] + tableDefinitionsAccessUnixSec atomic.Int64 + tableDefinitionsLastReloadUnixSec atomic.Int64 + forceReloadCh chan chan<- struct{} + ReloadTablesError error } func NewTableDiscovery(cfg config.QuesmaConfiguration, schemaManagement *SchemaManagement) TableDiscovery { var tableDefinitions = atomic.Pointer[TableMap]{} tableDefinitions.Store(NewTableMap()) - return &tableDiscovery{ + result := &tableDiscovery{ cfg: cfg, SchemaManagement: schemaManagement, tableDefinitions: &tableDefinitions, + forceReloadCh: make(chan chan<- struct{}), } + result.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix()) + return result } type TableDiscoveryTableProviderAdapter struct { @@ -62,11 +73,14 @@ func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema func newTableDiscoveryWith(cfg config.QuesmaConfiguration, schemaManagement *SchemaManagement, tables TableMap) TableDiscovery { var tableDefinitions = atomic.Pointer[TableMap]{} tableDefinitions.Store(&tables) - return &tableDiscovery{ + result := &tableDiscovery{ cfg: cfg, SchemaManagement: schemaManagement, tableDefinitions: &tableDefinitions, + forceReloadCh: make(chan chan<- struct{}), } + result.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix()) + return result } func (sl *tableDiscovery) 
TableDefinitionsFetchError() error { @@ -77,7 +91,22 @@ func (sl *tableDiscovery) TableAutodiscoveryEnabled() bool { return sl.cfg.IndexConfig == nil } +func (sl *tableDiscovery) LastAccessTime() time.Time { + timeMs := sl.tableDefinitionsAccessUnixSec.Load() + return time.Unix(timeMs, 0) +} + +func (sl *tableDiscovery) LastReloadTime() time.Time { + timeMs := sl.tableDefinitionsLastReloadUnixSec.Load() + return time.Unix(timeMs, 0) +} + +func (sl *tableDiscovery) ForceReloadCh() <-chan chan<- struct{} { + return sl.forceReloadCh +} + func (sl *tableDiscovery) ReloadTableDefinitions() { + sl.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix()) logger.Debug().Msg("reloading tables definitions") var configuredTables map[string]discoveredTable databaseName := "default" @@ -93,6 +122,7 @@ func (sl *tableDiscovery) ReloadTableDefinitions() { } sl.ReloadTablesError = err sl.tableDefinitions.Store(NewTableMap()) + sl.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix()) return } else { if sl.TableAutodiscoveryEnabled() { @@ -233,6 +263,15 @@ func (sl *tableDiscovery) populateTableDefinitions(configuredTables map[string]d } func (sl *tableDiscovery) TableDefinitions() *TableMap { + sl.tableDefinitionsAccessUnixSec.Store(time.Now().Unix()) + lastReloadUnixSec := sl.tableDefinitionsLastReloadUnixSec.Load() + lastReload := time.Unix(lastReloadUnixSec, 0) + if time.Since(lastReload) > 15*time.Minute { // maybe configure + logger.Info().Msg("Table definitions are stale for 15 minutes, forcing reload") + doneCh := make(chan struct{}, 1) + sl.forceReloadCh <- doneCh + <-doneCh + } return sl.tableDefinitions.Load() }