Skip to content

Commit

Permalink
feat(sink): query settings (#1708)
Browse files Browse the repository at this point in the history
  • Loading branch information
hekike authored Oct 19, 2024
1 parent 4c578ce commit 8ca1548
Show file tree
Hide file tree
Showing 3 changed files with 49 additions and 9 deletions.
5 changes: 5 additions & 0 deletions app/config/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,11 @@ type StorageConfiguration struct {
// without waiting for the data got inserted into the buffer.
// Setting true can cause silent errors that you need to monitor separately.
AsyncInsertWait bool

// See https://clickhouse.com/docs/en/operations/settings/settings
// For example, you can set the `max_insert_threads` setting to control the number of threads
// or the `parallel_view_processing` setting to enable pushing to attached views concurrently.
QuerySettings map[string]string
}

func (c StorageConfiguration) Validate() error {
Expand Down
7 changes: 6 additions & 1 deletion cmd/sink-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,19 @@ func initSink(config config.Configuration, logger *slog.Logger, metricMeter metr
}
}

storage := sink.NewClickhouseStorage(
// Initialize storage
storage, err := sink.NewClickhouseStorage(
sink.ClickHouseStorageConfig{
ClickHouse: clickhouseClient,
Database: config.Aggregation.ClickHouse.Database,
AsyncInsert: config.Sink.Storage.AsyncInsert,
AsyncInsertWait: config.Sink.Storage.AsyncInsertWait,
QuerySettings: config.Sink.Storage.QuerySettings,
},
)
if err != nil {
return nil, fmt.Errorf("failed to initialize storage: %w", err)
}

// Initialize Kafka consumer

Expand Down
46 changes: 38 additions & 8 deletions openmeter/sink/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package sink
import (
"context"
"fmt"
"strings"
"time"

"github.com/ClickHouse/clickhouse-go/v2"
Expand All @@ -21,12 +22,29 @@ type ClickHouseStorageConfig struct {
Database string
AsyncInsert bool
AsyncInsertWait bool
QuerySettings map[string]string
}

func NewClickhouseStorage(config ClickHouseStorageConfig) *ClickHouseStorage {
func (c ClickHouseStorageConfig) Validate() error {
if c.ClickHouse == nil {
return fmt.Errorf("clickhouse connection is required")
}

if c.Database == "" {
return fmt.Errorf("database is required")
}

return nil
}

func NewClickhouseStorage(config ClickHouseStorageConfig) (*ClickHouseStorage, error) {
if err := config.Validate(); err != nil {
return nil, err
}

return &ClickHouseStorage{
config: config,
}
}, nil
}

type ClickHouseStorage struct {
Expand All @@ -35,9 +53,10 @@ type ClickHouseStorage struct {

func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []sinkmodels.SinkMessage) error {
query := InsertEventsQuery{
Clock: realClock{},
Database: c.config.Database,
Messages: messages,
Clock: realClock{},
Database: c.config.Database,
Messages: messages,
QuerySettings: c.config.QuerySettings,
}
sql, args, err := query.ToSQL()
if err != nil {
Expand All @@ -63,9 +82,10 @@ func (c *ClickHouseStorage) BatchInsert(ctx context.Context, messages []sinkmode
}

type InsertEventsQuery struct {
Clock Clock
Database string
Messages []sinkmodels.SinkMessage
Clock Clock
Database string
Messages []sinkmodels.SinkMessage
QuerySettings map[string]string
}

func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) {
Expand All @@ -75,6 +95,16 @@ func (q InsertEventsQuery) ToSQL() (string, []interface{}, error) {
query.InsertInto(tableName)
query.Cols("namespace", "validation_error", "id", "type", "source", "subject", "time", "data", "ingested_at", "stored_at")

// Add settings
var settings []string
for key, value := range q.QuerySettings {
settings = append(settings, fmt.Sprintf("%s = %s", key, value))
}

if len(settings) > 0 {
query.SQL(fmt.Sprintf("SETTINGS %s", strings.Join(settings, ", ")))
}

for _, message := range q.Messages {
var eventErr string
if message.Status.Error != nil {
Expand Down

0 comments on commit 8ca1548

Please sign in to comment.