diff --git a/app/config/sink.go b/app/config/sink.go
index ae289410c..e64d1ae1d 100644
--- a/app/config/sink.go
+++ b/app/config/sink.go
@@ -91,6 +91,11 @@ type StorageConfiguration struct {
 	// without waiting for the data got inserted into the buffer.
 	// Setting true can cause silent errors that you need to monitor separately.
 	AsyncInsertWait bool
+
+	// See https://clickhouse.com/docs/en/operations/settings/settings
+	// For example, you can set the `max_insert_threads` setting to control the number of threads
+	// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
+	QuerySettings map[string]string
 }
 
 func (c StorageConfiguration) Validate() error {
diff --git a/cmd/sink-worker/main.go b/cmd/sink-worker/main.go
index 6191446dd..a89c4a195 100644
--- a/cmd/sink-worker/main.go
+++ b/cmd/sink-worker/main.go
@@ -146,14 +146,19 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr
 		}
 	}
 
-	storage := sink.NewClickhouseStorage(
+	// Initialize storage
+	storage, err := sink.NewClickhouseStorage(
 		sink.ClickHouseStorageConfig{
 			ClickHouse:      clickhouseClient,
 			Database:        config.Aggregation.ClickHouse.Database,
 			AsyncInsert:     config.Sink.Storage.AsyncInsert,
 			AsyncInsertWait: config.Sink.Storage.AsyncInsertWait,
+			QuerySettings:   config.Sink.Storage.QuerySettings,
 		},
 	)
+	if err != nil {
+		return nil, fmt.Errorf("failed to initialize storage: %w", err)
+	}
 
 	// Initialize Kafka consumer
diff --git a/openmeter/sink/storage.go b/openmeter/sink/storage.go
index 28dbd056e..bd7109532 100644
--- a/openmeter/sink/storage.go
+++ b/openmeter/sink/storage.go
@@ -3,6 +3,7 @@ package sink
 import (
 	"context"
 	"fmt"
+	"strings"
 	"time"
 
 	"github.com/ClickHouse/clickhouse-go/v2"
@@ -21,12 +22,29 @@ type ClickHouseStorageConfig struct {
 	Database        string
 	AsyncInsert     bool
 	AsyncInsertWait bool
+	QuerySettings   map[string]string
 }
 
-func NewClickhouseStorage(config ClickHouseStorageConfig) *ClickHouseStorage {
+func (c ClickHouseStorageConfig) Validate() error {
+	if c.ClickHouse == nil {
+		return fmt.Errorf("clickhouse connection is required")
+	}
+
+	if c.Database == "" {
+		return fmt.Errorf("database is required")
+	}
+
+	return nil
+}
+
+func NewClickhouseStorage(config ClickHouseStorageConfig) (*ClickHouseStorage, error) {
+	if err := config.Validate(); err != nil {
+		return nil, err
+	}
+
 	return &ClickHouseStorage{
 		config: config,
-	}
+	}, nil
 }
 
 type ClickHouseStorage struct {
@@ -35,9 +53,10 @@ type ClickHouseStorage struct {
 
 func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []sinkmodels.SinkMessage) error {
 	query := InsertEventsQuery{
-		Clock:    realClock{},
-		Database: c.config.Database,
-		Messages: messages,
+		Clock:         realClock{},
+		Database:      c.config.Database,
+		Messages:      messages,
+		QuerySettings: c.config.QuerySettings,
 	}
 	sql, args, err := query.ToSQL()
 	if err != nil {
@@ -63,9 +82,10 @@ func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []sinkmode
 }
 
 type InsertEventsQuery struct {
-	Clock    Clock
-	Database string
-	Messages []sinkmodels.SinkMessage
+	Clock         Clock
+	Database      string
+	Messages      []sinkmodels.SinkMessage
+	QuerySettings map[string]string
 }
 
 func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) {
@@ -75,6 +95,16 @@ func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) {
 	query.InsertInto(tableName)
 	query.Cols("namespace", "validation_error", "id", "type", "source", "subject", "time", "data", "ingested_at", "stored_at")
 
+	// Add settings
+	var settings []string
+	for key, value := range q.QuerySettings {
+		settings = append(settings, fmt.Sprintf("%s = %s", key, value))
+	}
+
+	if len(settings) > 0 {
+		query.SQL(fmt.Sprintf("SETTINGS %s", strings.Join(settings, ", ")))
+	}
+
 	for _, message := range q.Messages {
 		var eventErr string
 		if message.Status.Error != nil {
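
Note: as a minimal, self-contained sketch (not part of the diff above), the snippet below mirrors how the new ToSQL logic turns the QuerySettings map into a SETTINGS clause. The helper name buildSettingsClause, the main function, and the setting names and values are illustrative assumptions; the only behavior taken from the diff is that each key/value pair is rendered verbatim as "key = value" and joined with commas.

package main

import (
	"fmt"
	"sort"
	"strings"
)

// buildSettingsClause mirrors the loop added to InsertEventsQuery.ToSQL:
// each entry of QuerySettings becomes "key = value" and the pairs are joined
// into a single SETTINGS clause. Values are interpolated verbatim.
func buildSettingsClause(querySettings map[string]string) string {
	var settings []string
	for key, value := range querySettings {
		settings = append(settings, fmt.Sprintf("%s = %s", key, value))
	}
	if len(settings) == 0 {
		return ""
	}

	// Sorted here only to make the example output deterministic; the
	// production code iterates the map in whatever order Go yields.
	sort.Strings(settings)

	return fmt.Sprintf("SETTINGS %s", strings.Join(settings, ", "))
}

func main() {
	// Hypothetical settings taken from the ClickHouse settings reference.
	clause := buildSettingsClause(map[string]string{
		"max_insert_threads":       "8",
		"parallel_view_processing": "1",
	})

	// Prints: SETTINGS max_insert_threads = 8, parallel_view_processing = 1
	// In the generated batch INSERT, this clause is injected after the column
	// list and before the VALUES rows, matching the query.SQL(...) call above.
	fmt.Println(clause)
}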