diff --git a/quesma/backend_connectors/clickhouse_backend_connector.go b/quesma/backend_connectors/clickhouse_backend_connector.go new file mode 100644 index 000000000..dbedb8316 --- /dev/null +++ b/quesma/backend_connectors/clickhouse_backend_connector.go @@ -0,0 +1,98 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package backend_connectors + +import ( + "context" + "database/sql" + "github.com/ClickHouse/clickhouse-go/v2" + + quesma_api "quesma_v2/core" +) + +type ClickHouseBackendConnector struct { + Endpoint string + connection *sql.DB +} + +type ClickHouseRows struct { + rows *sql.Rows +} + +func (p *ClickHouseRows) Next() bool { + return p.rows.Next() +} + +func (p *ClickHouseRows) Scan(dest ...interface{}) error { + return p.rows.Scan(dest...) +} + +func (p *ClickHouseRows) Close() { + err := p.rows.Close() + if err != nil { + panic(err) + } +} + +func (p *ClickHouseRows) Err() error { + return p.rows.Err() +} + +func (p *ClickHouseBackendConnector) GetId() quesma_api.BackendConnectorType { + return quesma_api.ClickHouseSQLBackend +} + +func (p *ClickHouseBackendConnector) Open() error { + conn, err := initDBConnection() + if err != nil { + return err + } + p.connection = conn + return nil +} + +func (p *ClickHouseBackendConnector) Close() error { + if p.connection == nil { + return nil + } + return p.connection.Close() +} + +func (p *ClickHouseBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { + rows, err := p.connection.QueryContext(ctx, query, args...) + if err != nil { + return nil, err + } + return &ClickHouseRows{rows: rows}, nil +} + +func (p *ClickHouseBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { + if len(args) == 0 { + _, err := p.connection.ExecContext(ctx, query) + return err + } + _, err := p.connection.ExecContext(ctx, query, args...) + return err +} + +// func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB { +func initDBConnection() (*sql.DB, error) { + options := clickhouse.Options{Addr: []string{"localhost:9000"}} + info := struct { + Name string + Version string + }{ + Name: "quesma", + Version: "NEW ODD VERSION", //buildinfo.Version, + } + options.ClientInfo.Products = append(options.ClientInfo.Products, info) + return clickhouse.OpenDB(&options), nil + +} + +func NewClickHouseBackendConnector(endpoint string) *ClickHouseBackendConnector { + return &ClickHouseBackendConnector{ + Endpoint: endpoint, + } +} diff --git a/quesma/backend_connectors/elasticsearch_backend_connector.go b/quesma/backend_connectors/elasticsearch_backend_connector.go new file mode 100644 index 000000000..1b491df6b --- /dev/null +++ b/quesma/backend_connectors/elasticsearch_backend_connector.go @@ -0,0 +1,102 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. 
+// SPDX-License-Identifier: Elastic-2.0 + +package backend_connectors + +import ( + "bytes" + "context" + "crypto/tls" + "fmt" + "net/http" + "quesma/elasticsearch" + "quesma/quesma/config" + quesma_api "quesma_v2/core" + "time" +) + +const esRequestTimeout = 5 * time.Second + +type Rows struct { + Hits []map[string]interface{} +} + +// ElasticsearchBackendConnector is just a test impl - +// TODO: THIS IS A TRUE QUESTION MARK WHETHER IT IS GOING TO STAY LIKE THIS +type ElasticsearchBackendConnector struct { + client *http.Client + config config.ElasticsearchConfiguration +} + +func NewElasticsearchBackendConnector(cfg config.ElasticsearchConfiguration) *ElasticsearchBackendConnector { + conn := &ElasticsearchBackendConnector{ + config: cfg, + client: &http.Client{ + Transport: &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + }, + Timeout: esRequestTimeout, + }, + } + return conn +} + +func (e *ElasticsearchBackendConnector) RequestWithHeaders(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Response, error) { + return e.doRequest(ctx, method, endpoint, body, headers) +} + +func (e *ElasticsearchBackendConnector) doRequest(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s/%s", e.config.Url, endpoint), bytes.NewBuffer(body)) + if err != nil { + return nil, err + } + if req.Header.Get("Content-Type") == "" { + req.Header.Set("Content-Type", "application/json") + } + req = elasticsearch.AddBasicAuthIfNeeded(req, e.config.User, e.config.Password) + for key, values := range headers { + for _, value := range values { + req.Header.Set(key, value) + } + } + return e.client.Do(req) +} + +// HttpBackendConnector is a base interface for sending http requests, for now +type HttpBackendConnector interface { + Send(r *http.Request) *http.Response +} + +func (e *ElasticsearchBackendConnector) Send(r *http.Request) *http.Response { + r.Host = e.config.Url.Host + r.URL.Host = e.config.Url.Host + r.URL.Scheme = e.config.Url.Scheme + r.RequestURI = "" // this is important for the request to be sent correctly to a different host + maybeAuthdReq := elasticsearch.AddBasicAuthIfNeeded(r, e.config.User, e.config.Password) + if resp, err := e.client.Do(maybeAuthdReq); err != nil { + fmt.Printf("Error: %v\n", err) + panic(err) + } else { + return resp + } +} + +func (e *ElasticsearchBackendConnector) GetId() quesma_api.BackendConnectorType { + return quesma_api.ElasticsearchBackend +} + +func (e *ElasticsearchBackendConnector) Open() error { + return nil +} + +func (e *ElasticsearchBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) { + panic("not implemented") +} + +func (e *ElasticsearchBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { + panic("not implemented") +} + +func (e *ElasticsearchBackendConnector) Close() error { + return nil +} diff --git a/quesma/clickhouse/table_discovery2.go b/quesma/clickhouse/table_discovery2.go new file mode 100644 index 000000000..b20ad2f36 --- /dev/null +++ b/quesma/clickhouse/table_discovery2.go @@ -0,0 +1,496 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. 
+// SPDX-License-Identifier: Elastic-2.0 +package clickhouse + +import ( + "context" + "errors" + "fmt" + "github.com/goccy/go-json" + "quesma/common_table" + "quesma/end_user_errors" + "quesma/logger" + "quesma/persistence" + "quesma/quesma/config" + "quesma/util" + quesma_api "quesma_v2/core" + "strings" + "sync/atomic" + "time" +) + +//type TableDiscovery interface { +// ReloadTableDefinitions() +// TableDefinitions() *TableMap +// TableDefinitionsFetchError() error +// +// LastAccessTime() time.Time +// LastReloadTime() time.Time +// ForceReloadCh() <-chan chan<- struct{} +// AutodiscoveryEnabled() bool +//} + +type tableDiscovery2 struct { + cfg *config.QuesmaConfiguration + dbConnPool quesma_api.BackendConnector + tableDefinitions *atomic.Pointer[TableMap] + tableDefinitionsAccessUnixSec atomic.Int64 + tableDefinitionsLastReloadUnixSec atomic.Int64 + forceReloadCh chan chan<- struct{} + ReloadTablesError error + virtualTableStorage persistence.JSONDatabase +} + +func NewTableDiscovery2(cfg *config.QuesmaConfiguration, dbConn quesma_api.BackendConnector, virtualTablesDB persistence.JSONDatabase) TableDiscovery { + var tableDefinitions = atomic.Pointer[TableMap]{} + tableDefinitions.Store(NewTableMap()) + result := &tableDiscovery2{ + cfg: cfg, + dbConnPool: dbConn, + tableDefinitions: &tableDefinitions, + forceReloadCh: make(chan chan<- struct{}), + virtualTableStorage: virtualTablesDB, + } + result.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix()) + return result +} + +//type TableDiscoveryTableProviderAdapter struct { +// TableDiscovery +//} + +//func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema.Table { +// tableMap := t.TableDiscovery.TableDefinitions() +// tables := make(map[string]schema.Table) +// tableMap.Range(func(tableName string, value *Table) bool { +// table := schema.Table{Columns: make(map[string]schema.Column)} +// for _, column := range value.Cols { +// table.Columns[column.Name] = schema.Column{ +// Name: column.Name, +// Type: column.Type.String(), +// Comment: column.Comment, +// } +// } +// table.DatabaseName = value.DatabaseName +// tables[tableName] = table +// return true +// }) +// return tables +//} + +func NewTableDiscovery2With(cfg *config.QuesmaConfiguration, dbConnPool quesma_api.BackendConnector, tables TableMap) TableDiscovery { + var tableDefinitions = atomic.Pointer[TableMap]{} + tableDefinitions.Store(&tables) + result := &tableDiscovery2{ + cfg: cfg, + dbConnPool: dbConnPool, + tableDefinitions: &tableDefinitions, + forceReloadCh: make(chan chan<- struct{}), + } + result.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix()) + return result +} + +func (td *tableDiscovery2) TableDefinitionsFetchError() error { + return td.ReloadTablesError +} + +func (td *tableDiscovery2) AutodiscoveryEnabled() bool { + return td.cfg.IndexAutodiscoveryEnabled() +} + +func (td *tableDiscovery2) LastAccessTime() time.Time { + timeMs := td.tableDefinitionsAccessUnixSec.Load() + return time.Unix(timeMs, 0) +} + +func (td *tableDiscovery2) LastReloadTime() time.Time { + timeMs := td.tableDefinitionsLastReloadUnixSec.Load() + return time.Unix(timeMs, 0) +} + +func (td *tableDiscovery2) ForceReloadCh() <-chan chan<- struct{} { + return td.forceReloadCh +} + +func (td *tableDiscovery2) ReloadTableDefinitions() { + td.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix()) + logger.Debug().Msg("reloading tables definitions") + var configuredTables map[string]discoveredTable + databaseName := "default" + if 
td.cfg.ClickHouse.Database != "" { + databaseName = td.cfg.ClickHouse.Database + } + // TODO here we should read table definition from the elastic as well. + if tables, err := td.readTables(databaseName); err != nil { + var endUserError *end_user_errors.EndUserError + if errors.As(err, &endUserError) { + logger.ErrorWithCtxAndReason(context.Background(), endUserError.Reason()).Msgf("could not describe tables: %v", err) + } else { + logger.Error().Msgf("could not describe tables: %v", err) + } + td.ReloadTablesError = err + td.tableDefinitions.Store(NewTableMap()) + td.tableDefinitionsLastReloadUnixSec.Store(time.Now().Unix()) + return + } else { + if td.AutodiscoveryEnabled() { + configuredTables = td.autoConfigureTables(tables, databaseName) + } else { + configuredTables = td.configureTables(tables, databaseName) + } + } + configuredTables = td.readVirtualTables(configuredTables) + + td.ReloadTablesError = nil + td.populateTableDefinitions(configuredTables, databaseName, td.cfg) +} + +func (td *tableDiscovery2) readVirtualTables(configuredTables map[string]discoveredTable) map[string]discoveredTable { + quesmaCommonTable, ok := configuredTables[common_table.TableName] + if !ok { + logger.Warn().Msg("common table not found") + return configuredTables + } + + virtualTables, err := td.virtualTableStorage.List() + if err != nil { + logger.Error().Msgf("could not list virtual tables: %v", err) + return configuredTables + } + + for _, virtualTable := range virtualTables { + data, ok, err := td.virtualTableStorage.Get(virtualTable) + if err != nil { + logger.Error().Msgf("could not read virtual table %s: %v", virtualTable, err) + continue + } + if !ok { + logger.Warn().Msgf("virtual table %s not found", virtualTable) + continue + } + + var readVirtualTable common_table.VirtualTable + err = json.Unmarshal([]byte(data), &readVirtualTable) + if err != nil { + logger.Error().Msgf("could not unmarshal virtual table %s: %v", virtualTable, err) + } + + if readVirtualTable.Version != common_table.VirtualTableStructVersion { + // migration is not supported yet + // we simply skip the table + logger.Warn().Msgf("skipping virtual table %s, version mismatch, actual '%s', expecting '%s'", virtualTable, readVirtualTable.Version, common_table.VirtualTableStructVersion) + continue + } + + discoTable := discoveredTable{ + name: virtualTable, + columnTypes: make(map[string]columnMetadata), + } + + for _, col := range readVirtualTable.Columns { + + // here we construct virtual table columns based on common table columns + commonTableColumn, ok := quesmaCommonTable.columnTypes[col.Name] + + if ok { + discoTable.columnTypes[col.Name] = columnMetadata{colType: commonTableColumn.colType, comment: commonTableColumn.comment} + } else { + logger.Warn().Msgf("column %s not found in common table but exists in virtual table %s", col.Name, virtualTable) + } + } + + discoTable.comment = "Virtual table. 
Version: " + readVirtualTable.StoredAt + discoTable.createTableQuery = "n/a" + discoTable.config = config.IndexConfiguration{} + discoTable.virtualTable = true + + configuredTables[virtualTable] = discoTable + } + return configuredTables +} + +// configureTables confronts the tables discovered in the database with the configuration provided by the user, returning final list of tables managed by Quesma +func (td *tableDiscovery2) configureTables(tables map[string]map[string]columnMetadata, databaseName string) (configuredTables map[string]discoveredTable) { + configuredTables = make(map[string]discoveredTable) + var explicitlyDisabledTables, notConfiguredTables []string + for table, columns := range tables { + + // single logs table is our internal table, user shouldn't configure it at all + // and we should always include it in the list of tables managed by Quesma + isCommonTable := table == common_table.TableName + + if indexConfig, found := td.cfg.IndexConfig[table]; found || isCommonTable { + + if isCommonTable { + indexConfig = config.IndexConfiguration{} + } + + if !isCommonTable && !indexConfig.IsClickhouseQueryEnabled() && !indexConfig.IsClickhouseIngestEnabled() { + explicitlyDisabledTables = append(explicitlyDisabledTables, table) + } else { + comment := td.tableComment(databaseName, table) + createTableQuery := td.createTableQuery(databaseName, table) + // we assume here that @timestamp field is always present in the table, or it's explicitly configured + configuredTables[table] = discoveredTable{table, databaseName, columns, indexConfig, comment, createTableQuery, "", false} + } + } else { + notConfiguredTables = append(notConfiguredTables, table) + } + } + logger.Info().Msgf( + "Table discovery results: configured=[%s], found but not configured=[%s], explicitly disabled=[%s]", + strings.Join(util.MapKeys(configuredTables), ","), + strings.Join(notConfiguredTables, ","), + strings.Join(explicitlyDisabledTables, ","), + ) + return +} + +// autoConfigureTables takes the list of discovered tables and automatically configures them, returning the final list of tables managed by Quesma +func (td *tableDiscovery2) autoConfigureTables(tables map[string]map[string]columnMetadata, databaseName string) (configuredTables map[string]discoveredTable) { + configuredTables = make(map[string]discoveredTable) + var autoDiscoResults strings.Builder + logger.Info().Msg("Index configuration empty, running table auto-discovery") + for table, columns := range tables { + comment := td.tableComment(databaseName, table) + createTableQuery := td.createTableQuery(databaseName, table) + var maybeTimestampField string + if td.cfg.Hydrolix.IsNonEmpty() { + maybeTimestampField = td.tableTimestampField(databaseName, table, Hydrolix) + } else { + maybeTimestampField = td.tableTimestampField(databaseName, table, ClickHouse) + } + const isVirtualTable = false + configuredTables[table] = discoveredTable{table, databaseName, columns, config.IndexConfiguration{}, comment, createTableQuery, maybeTimestampField, isVirtualTable} + + } + for tableName, table := range configuredTables { + autoDiscoResults.WriteString(fmt.Sprintf("{table: %s, timestampField: %s}, ", tableName, table.timestampFieldName)) + } + logger.Info().Msgf("Table auto-discovery results -> %d tables found: [%s]", len(configuredTables), strings.TrimSuffix(autoDiscoResults.String(), ", ")) + return +} + +func (td *tableDiscovery2) populateTableDefinitions(configuredTables map[string]discoveredTable, databaseName string, cfg *config.QuesmaConfiguration) { 
+ tableMap := NewTableMap() + for tableName, resTable := range configuredTables { + var columnsMap = make(map[string]*Column) + partiallyResolved := false + for col, columnMeta := range resTable.columnTypes { + if resTable.config.SchemaOverrides != nil { + if schemaOverride, found := resTable.config.SchemaOverrides.Fields[config.FieldName(col)]; found && schemaOverride.Ignored { + logger.Warn().Msgf("table %s, column %s is ignored", tableName, col) + continue + } + } + if col != AttributesValuesColumn && col != AttributesMetadataColumn { + column := resolveColumn(col, columnMeta.colType) + if column != nil { + column.Comment = columnMeta.comment + columnsMap[col] = column + } else { + logger.Warn().Msgf("column '%s.%s' type: '%s' not resolved. table will be skipped", tableName, col, columnMeta.colType) + partiallyResolved = true + } + } + } + + var timestampFieldName *string + if resTable.timestampFieldName != "" { + timestampFieldName = &resTable.timestampFieldName + } + + if !partiallyResolved { + table := Table{ + Created: true, + Name: tableName, + Comment: resTable.comment, + DatabaseName: databaseName, + Cols: columnsMap, + Config: &ChTableConfig{ + Attributes: []Attribute{}, + CastUnsupportedAttrValueTypesToString: true, + PreferCastingToOthers: true, + }, + CreateTableQuery: resTable.createTableQuery, + DiscoveredTimestampFieldName: timestampFieldName, + VirtualTable: resTable.virtualTable, + } + if containsAttributes(resTable.columnTypes) { + table.Config.Attributes = []Attribute{NewDefaultStringAttribute()} + } + + table.ApplyIndexConfig(cfg) + tableMap.Store(tableName, &table) + + logger.Debug().Msgf("schema for table [%s] loaded", tableName) + } else { + logger.Warn().Msgf("table %s not fully resolved, skipping", tableName) + } + } + + existing := td.tableDefinitions.Load() + existing.Range(func(key string, table *Table) bool { + if table.VirtualTable { + return true + } + if !tableMap.Has(key) { + + logger.Info().Msgf("table %s is no longer found in the database, ignoring", key) + } + return true + }) + discoveredTables := make([]string, 0) + tableMap.Range(func(key string, _ *Table) bool { + if !existing.Has(key) { + discoveredTables = append(discoveredTables, key) + } + return true + }) + if len(discoveredTables) > 0 { + logger.Info().Msgf("discovered new tables: %s", discoveredTables) + } + td.tableDefinitions.Store(tableMap) +} + +func (td *tableDiscovery2) TableDefinitions() *TableMap { + td.tableDefinitionsAccessUnixSec.Store(time.Now().Unix()) + lastReloadUnixSec := td.tableDefinitionsLastReloadUnixSec.Load() + lastReload := time.Unix(lastReloadUnixSec, 0) + if time.Since(lastReload) > 15*time.Minute { // maybe configure + logger.Info().Msg("Table definitions are stale for 15 minutes, forcing reload") + doneCh := make(chan struct{}, 1) + td.forceReloadCh <- doneCh + <-doneCh + } + return td.tableDefinitions.Load() +} + +func (td *tableDiscovery2) readTables(database string) (map[string]map[string]columnMetadata, error) { + + logger.Debug().Msgf("describing tables: %s", database) + + if td.dbConnPool == nil { + return map[string]map[string]columnMetadata{}, fmt.Errorf("database connection pool is nil, cannot describe tables") + } + + rows, err := td.dbConnPool.Query(context.Background(), "SELECT table, name, type, comment FROM system.columns WHERE database = ?", database) + if err != nil { + err = end_user_errors.GuessClickhouseErrorType(err).InternalDetails("reading list of columns from system.columns") + return map[string]map[string]columnMetadata{}, err + } + defer 
rows.Close()
+	columnsPerTable := make(map[string]map[string]columnMetadata)
+	for rows.Next() {
+		var table, colName, colType, comment string
+		if err := rows.Scan(&table, &colName, &colType, &comment); err != nil {
+			return map[string]map[string]columnMetadata{}, err
+		}
+		if _, ok := columnsPerTable[table]; !ok {
+			columnsPerTable[table] = make(map[string]columnMetadata)
+		}
+		columnsPerTable[table][colName] = columnMetadata{colType: colType, comment: comment}
+	}
+	if err := rows.Err(); err != nil {
+		return map[string]map[string]columnMetadata{}, err
+	}
+
+	return columnsPerTable, nil
+}
+
+func (td *tableDiscovery2) tableTimestampField(database, table string, dbKind DbKind) (primaryKey string) {
+	switch dbKind {
+	case Hydrolix:
+		return td.getTimestampFieldForHydrolix(database, table)
+	case ClickHouse:
+		return td.getTimestampFieldForClickHouse(database, table)
+	}
+	return
+}
+
+func (td *tableDiscovery2) getTimestampFieldForHydrolix(database, table string) (timestampField string) {
+	// In Hydrolix, there's always only one column in a table set as a primary timestamp
+	// Ref: https://docs.hydrolix.io/docs/transforms-and-write-schema#primary-timestamp
+	rows, err := td.dbConnPool.Query(context.Background(), "SELECT primary_key FROM system.tables WHERE database = ? and table = ?", database, table)
+	if err != nil {
+		logger.Debug().Msgf("failed fetching primary key for table %s: %v", table, err)
+		return
+	}
+	defer rows.Close()
+	// rows.Next() must be called before Scan; a missing row is not an error here
+	if rows.Next() {
+		if err2 := rows.Scan(&timestampField); err2 != nil {
+			logger.Debug().Msgf("failed fetching primary key for table %s: %v", table, err2)
+		}
+	}
+	return timestampField
+}
+
+func (td *tableDiscovery2) getTimestampFieldForClickHouse(database, table string) (timestampField string) {
+	// In ClickHouse, there's no concept of a primary timestamp field, primary keys are often composite,
+	// hence we have to use the following heuristic to determine the timestamp field (also just picking the first column if there are multiple)
+	rows, err := td.dbConnPool.Query(context.Background(), "SELECT name FROM system.columns WHERE database = ? AND table = ? AND is_in_primary_key = 1 AND type iLIKE 'DateTime%'", database, table)
+	if err != nil {
+		logger.Debug().Msgf("failed fetching primary key for table %s: %v", table, err)
+		return
+	}
+	defer rows.Close()
+	if rows.Next() {
+		if err2 := rows.Scan(&timestampField); err2 != nil {
+			logger.Debug().Msgf("failed fetching primary key for table %s: %v", table, err2)
+		}
+	}
+	return timestampField
+}
+
+func (td *tableDiscovery2) tableComment(database, table string) (comment string) {
+	rows, err := td.dbConnPool.Query(context.Background(), "SELECT comment FROM system.tables WHERE database = ? and table = ?", database, table)
+	if err != nil {
+		logger.Error().Msgf("could not get table comment: %v", err)
+		return
+	}
+	defer rows.Close()
+	if rows.Next() {
+		if err2 := rows.Scan(&comment); err2 != nil {
+			logger.Error().Msgf("could not get table comment: %v", err2)
+		}
+	}
+	return comment
+}
+
+func (td *tableDiscovery2) createTableQuery(database, table string) (ddl string) {
+	rows, err := td.dbConnPool.Query(context.Background(), "SELECT create_table_query FROM system.tables WHERE database = ? and table = ? 
", database, table) + if err != nil { + logger.Error().Msgf("could not get table comment: %v", err) + } + if err2 := rows.Scan(&ddl); err2 != nil { + logger.Error().Msgf("could not get table comment: %v", err2) + } + return ddl +} + +//type EmptyTableDiscovery struct { +// TableMap *TableMap +// Err error +// Autodiscovery bool +//} +// +//func NewEmptyTableDiscovery() *EmptyTableDiscovery { +// return &EmptyTableDiscovery{ +// TableMap: NewTableMap(), +// } +//} +// +//func (td *EmptyTableDiscovery) ReloadTableDefinitions() { +//} +// +//func (td *EmptyTableDiscovery) TableDefinitions() *TableMap { +// return td.TableMap +//} +// +//func (td *EmptyTableDiscovery) TableDefinitionsFetchError() error { +// return td.Err +//} +// +//func (td *EmptyTableDiscovery) LastAccessTime() time.Time { +// return time.Now() +//} +// +//func (td *EmptyTableDiscovery) LastReloadTime() time.Time { +// return time.Now() +//} +// +//func (td *EmptyTableDiscovery) ForceReloadCh() <-chan chan<- struct{} { +// return make(chan chan<- struct{}) +//} +// +//func (td *EmptyTableDiscovery) AutodiscoveryEnabled() bool { +// return td.Autodiscovery +//} diff --git a/quesma/frontend_connectors/elasticsearch_ingest.go b/quesma/frontend_connectors/elasticsearch_ingest.go new file mode 100644 index 000000000..66bcd6d50 --- /dev/null +++ b/quesma/frontend_connectors/elasticsearch_ingest.go @@ -0,0 +1,101 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package frontend_connectors + +import ( + "fmt" + "github.com/ucarion/urlpath" + "net/http" + quesma_api "quesma_v2/core" +) + +type ElasticsearchIngestFrontendConnector struct { + BasicHTTPFrontendConnector +} + +const ( + IndexDocPath = "/:index/_doc" + IndexBulkPath = "/:index/_bulk" + + // IngestAction and below are metadata items passed to processor. 
+ IngestAction = "ingest_action" + DocIndexAction = "_doc" + BulkIndexAction = "_bulk" + IngestTargetKey = "ingest_target" + // TODO: this actually should not be a dependency on processor +) + +func NewElasticsearchIngestFrontendConnector(endpoint string) *ElasticsearchIngestFrontendConnector { + fc := &ElasticsearchIngestFrontendConnector{ + BasicHTTPFrontendConnector: BasicHTTPFrontendConnector{ + endpoint: endpoint, + }, + } + router := NewHTTPRouter() + router.AddRoute(IndexBulkPath, bulk) + router.AddRoute(IndexDocPath, doc) + fc.AddRouter(router) + return fc +} + +func getMatchingHandler(requestPath string, handlers map[string]quesma_api.HandlersPipe) *quesma_api.HandlersPipe { + for path, handler := range handlers { + urlPath := urlpath.New(path) + _, matches := urlPath.Match(requestPath) + if matches { + return &handler + } + } + return nil +} + +func (h *ElasticsearchIngestFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) { + handlers := h.router.GetHandlers() + handlerWrapper := getMatchingHandler(req.URL.Path, handlers) + if handlerWrapper == nil { + h.router.Multiplexer().ServeHTTP(w, req) + return + } + dispatcher := &quesma_api.Dispatcher{} + + // for the response out we are Elasticsearch-7 compliant + w.Header().Set("Content-Type", "application/json") + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + metadata, message, _ := handlerWrapper.Handler(req) + req.Header.Set("x-przemek", "blah") + _, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message) + _, err := w.Write(message.([]byte)) + if err != nil { + fmt.Printf("Error writing response: %s\n", err) + } + }).ServeHTTP(w, req) +} + +func bulk(request *http.Request) (map[string]interface{}, any, error) { + //body, err := ReadRequestBody(request) + //if err != nil { + // return nil, nil, err + //} + metadata := quesma_api.MakeNewMetadata() + metadata[IngestAction] = BulkIndexAction + metadata[IngestTargetKey] = getIndexFromRequest(request) + return metadata, request, nil +} + +func doc(request *http.Request) (map[string]interface{}, any, error) { + //body, err := ReadRequestBody(request) + //if err != nil { + // return nil, nil, err + //} + metadata := quesma_api.MakeNewMetadata() + metadata[IngestAction] = DocIndexAction + metadata[IngestTargetKey] = getIndexFromRequest(request) + return metadata, request, nil +} + +func getIndexFromRequest(request *http.Request) string { + expectedUrl := urlpath.New("/:index/*") + match, _ := expectedUrl.Match(request.URL.Path) // safe to call at this level + return match.Params["index"] +} diff --git a/quesma/go.mod b/quesma/go.mod index 706e6a0ce..b7fc51a5b 100644 --- a/quesma/go.mod +++ b/quesma/go.mod @@ -10,16 +10,12 @@ require ( github.com/DataDog/go-sqllexer v0.0.17 github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df github.com/coreos/go-semver v0.3.1 - github.com/go-sql-driver/mysql v1.8.1 github.com/goccy/go-json v0.10.3 - github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 github.com/gorilla/securecookie v1.1.2 github.com/gorilla/sessions v1.4.0 github.com/hashicorp/go-multierror v1.1.1 - github.com/jackc/pgx/v4 v4.18.3 - github.com/jackc/pgx/v5 v5.7.1 github.com/k0kubun/pp v3.0.1+incompatible github.com/knadh/koanf/parsers/json v0.1.0 github.com/knadh/koanf/parsers/yaml v0.1.0 @@ -38,7 +34,9 @@ require ( require ( filippo.io/edwards25519 v1.1.0 // indirect + github.com/go-sql-driver/mysql v1.8.1 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect + 
github.com/google/go-cmp v0.6.0 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect github.com/jackc/chunkreader/v2 v2.0.1 // indirect github.com/jackc/pgconn v1.14.3 // indirect @@ -47,6 +45,8 @@ require ( github.com/jackc/pgproto3/v2 v2.3.3 // indirect github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect github.com/jackc/pgtype v1.14.0 // indirect + github.com/jackc/pgx/v4 v4.18.3 // indirect + github.com/jackc/pgx/v5 v5.7.1 // indirect github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/kr/text v0.2.0 // indirect diff --git a/quesma/ingest/processor2.go b/quesma/ingest/processor2.go new file mode 100644 index 000000000..fdf1d0e8b --- /dev/null +++ b/quesma/ingest/processor2.go @@ -0,0 +1,689 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package ingest + +import ( + "context" + "errors" + "fmt" + "github.com/ClickHouse/clickhouse-go/v2" + "github.com/goccy/go-json" + chLib "quesma/clickhouse" + "quesma/comment_metadata" + "quesma/common_table" + "quesma/end_user_errors" + "quesma/jsonprocessor" + "quesma/logger" + "quesma/model" + "quesma/persistence" + "quesma/quesma/config" + "quesma/quesma/recovery" + "quesma/quesma/types" + "quesma/schema" + "quesma/stats" + "quesma/table_resolver" + "quesma/telemetry" + "quesma/util" + quesma_api "quesma_v2/core" + "slices" + "sort" + "strings" + "sync" + "sync/atomic" + "time" +) + +type ( + //IngestProcessor2 is essentially an Ingest Processor we know and like but `chDb` is `quesma_api.BackendConnector` not `*sql.DB` + IngestProcessor2 struct { + ctx context.Context + cancel context.CancelFunc + chDb quesma_api.BackendConnector + tableDiscovery chLib.TableDiscovery + cfg *config.QuesmaConfiguration + phoneHomeAgent telemetry.PhoneHomeAgent + schemaRegistry schema.Registry + ingestCounter int64 + ingestFieldStatistics IngestFieldStatistics + ingestFieldStatisticsLock sync.Mutex + virtualTableStorage persistence.JSONDatabase + tableResolver table_resolver.TableResolver + } +) + +func (ip *IngestProcessor2) Start() { + if err := ip.Ping(); err != nil { + endUserError := end_user_errors.GuessClickhouseErrorType(err) + logger.ErrorWithCtxAndReason(ip.ctx, endUserError.Reason()).Msgf("could not connect to clickhouse. 
error: %v", endUserError) + } + + ip.tableDiscovery.ReloadTableDefinitions() + + logger.Info().Msgf("schemas loaded: %s", ip.tableDiscovery.TableDefinitions().Keys()) + const reloadInterval = 1 * time.Minute + forceReloadCh := ip.tableDiscovery.ForceReloadCh() + + go func() { + recovery.LogPanic() + for { + select { + case <-ip.ctx.Done(): + logger.Debug().Msg("closing log manager") + return + case doneCh := <-forceReloadCh: + // this prevents flood of reloads, after a long pause + if time.Since(ip.tableDiscovery.LastReloadTime()) > reloadInterval { + ip.tableDiscovery.ReloadTableDefinitions() + } + doneCh <- struct{}{} + case <-time.After(reloadInterval): + // only reload if we actually use Quesma, make it double time to prevent edge case + // otherwise it prevent ClickHouse Cloud from idle pausing + if time.Since(ip.tableDiscovery.LastAccessTime()) < reloadInterval*2 { + ip.tableDiscovery.ReloadTableDefinitions() + } + } + } + }() +} + +func (ip *IngestProcessor2) Stop() { + ip.cancel() +} + +func (ip *IngestProcessor2) Close() { + _ = ip.chDb.Close() +} + +//func (ip *IngestProcessor2) Count(ctx context.Context, table string) (int64, error) { +// var count int64 +// err := ip.chDb.QueryRowContext(ctx, "SELECT count(*) FROM ?", table).Scan(&count) +// if err != nil { +// return 0, fmt.Errorf("clickhouse: query row failed: %v", err) +// } +// return count, nil +//} + +func (ip *IngestProcessor2) createTableObjectAndAttributes(ctx context.Context, query string, config *chLib.ChTableConfig, name string, tableDefinitionChangeOnly bool) (string, error) { + table, err := chLib.NewTable(query, config) + if err != nil { + return "", err + } + + // This is a HACK. + // CreateQueryParser assumes that the table name is in the form of "database.table" + // in this case we don't have a database name, so we need to add it + if tableDefinitionChangeOnly { + table.Name = name + table.DatabaseName = "" + table.Comment = "Definition only. This is not a real table." + table.VirtualTable = true + } + + // if exists only then createTable + noSuchTable := ip.AddTableIfDoesntExist(table) + if !noSuchTable { + return "", fmt.Errorf("table %s already exists", table.Name) + } + + return addOurFieldsToCreateTableQuery(query, config, table), nil +} + +// This function generates ALTER TABLE commands for adding new columns +// to the table based on the attributesMap and the table name +// AttributesMap contains the attributes that are not part of the schema +// Function has side effects, it modifies the table.Cols map +// and removes the attributes that were promoted to columns +func (ip *IngestProcessor2) generateNewColumns( + attrsMap map[string][]interface{}, + table *chLib.Table, + alteredAttributesIndexes []int, + encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) []string { + var alterCmd []string + attrKeys := getAttributesByArrayName(chLib.DeprecatedAttributesKeyColumn, attrsMap) + attrTypes := getAttributesByArrayName(chLib.DeprecatedAttributesValueType, attrsMap) + var deleteIndexes []int + + reverseMap := reverseFieldEncoding(encodings, table.Name) + + // HACK Alert: + // We must avoid altering the table.Cols map and reading at the same time. + // This should be protected by a lock or a copy of the table should be used. 
+ // + newColumns := make(map[string]*chLib.Column) + for k, v := range table.Cols { + newColumns[k] = v + } + + for i := range alteredAttributesIndexes { + + columnType := "" + modifiers := "" + // Array and Map are not Nullable + if strings.Contains(attrTypes[i], "Array") || strings.Contains(attrTypes[i], "Map") { + columnType = attrTypes[i] + } else { + modifiers = "Nullable" + columnType = fmt.Sprintf("Nullable(%s)", attrTypes[i]) + } + + propertyName := attrKeys[i] + field, ok := reverseMap[schema.EncodedFieldName(attrKeys[i])] + if ok { + propertyName = field.FieldName + } + + metadata := comment_metadata.NewCommentMetadata() + metadata.Values[comment_metadata.ElasticFieldName] = propertyName + comment := metadata.Marshall() + + alterTable := fmt.Sprintf("ALTER TABLE \"%s\" ADD COLUMN IF NOT EXISTS \"%s\" %s", table.Name, attrKeys[i], columnType) + newColumns[attrKeys[i]] = &chLib.Column{Name: attrKeys[i], Type: chLib.NewBaseType(attrTypes[i]), Modifiers: modifiers, Comment: comment} + alterCmd = append(alterCmd, alterTable) + + alterColumn := fmt.Sprintf("ALTER TABLE \"%s\" COMMENT COLUMN \"%s\" '%s'", table.Name, attrKeys[i], comment) + alterCmd = append(alterCmd, alterColumn) + + deleteIndexes = append(deleteIndexes, i) + } + + table.Cols = newColumns + + if table.VirtualTable { + err := ip.storeVirtualTable(table) + if err != nil { + logger.Error().Msgf("error storing virtual table: %v", err) + } + } + + for i := len(deleteIndexes) - 1; i >= 0; i-- { + attrsMap[chLib.DeprecatedAttributesKeyColumn] = append(attrsMap[chLib.DeprecatedAttributesKeyColumn][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesKeyColumn][deleteIndexes[i]+1:]...) + attrsMap[chLib.DeprecatedAttributesValueType] = append(attrsMap[chLib.DeprecatedAttributesValueType][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueType][deleteIndexes[i]+1:]...) + attrsMap[chLib.DeprecatedAttributesValueColumn] = append(attrsMap[chLib.DeprecatedAttributesValueColumn][:deleteIndexes[i]], attrsMap[chLib.DeprecatedAttributesValueColumn][deleteIndexes[i]+1:]...) 
+ } + return alterCmd +} + +// This function implements heuristic for deciding if we should add new columns +func (ip *IngestProcessor2) shouldAlterColumns(table *chLib.Table, attrsMap map[string][]interface{}) (bool, []int) { + attrKeys := getAttributesByArrayName(chLib.DeprecatedAttributesKeyColumn, attrsMap) + alterColumnIndexes := make([]int, 0) + + // this is special case for common table storage + // we do always add columns for common table storage + if table.Name == common_table.TableName { + if len(table.Cols) > alterColumnUpperLimit { + logger.Warn().Msgf("Common table has more than %d columns (alwaysAddColumnLimit)", alterColumnUpperLimit) + } + } + + if len(table.Cols) < alwaysAddColumnLimit || table.Name == common_table.TableName { + // We promote all non-schema fields to columns + // therefore we need to add all attrKeys indexes to alterColumnIndexes + for i := 0; i < len(attrKeys); i++ { + alterColumnIndexes = append(alterColumnIndexes, i) + } + return true, alterColumnIndexes + } + + if len(table.Cols) > alterColumnUpperLimit { + return false, nil + } + ip.ingestFieldStatisticsLock.Lock() + if ip.ingestFieldStatistics == nil { + ip.ingestFieldStatistics = make(IngestFieldStatistics) + } + ip.ingestFieldStatisticsLock.Unlock() + for i := 0; i < len(attrKeys); i++ { + ip.ingestFieldStatisticsLock.Lock() + ip.ingestFieldStatistics[IngestFieldBucketKey{indexName: table.Name, field: attrKeys[i]}]++ + counter := atomic.LoadInt64(&ip.ingestCounter) + fieldCounter := ip.ingestFieldStatistics[IngestFieldBucketKey{indexName: table.Name, field: attrKeys[i]}] + // reset statistics every alwaysAddColumnLimit + // for now alwaysAddColumnLimit is used in two contexts + // for defining column limit and for resetting statistics + if counter >= alwaysAddColumnLimit { + atomic.StoreInt64(&ip.ingestCounter, 0) + ip.ingestFieldStatistics = make(IngestFieldStatistics) + } + ip.ingestFieldStatisticsLock.Unlock() + // if field is present more or equal fieldFrequency + // during each alwaysAddColumnLimit iteration + // promote it to column + if fieldCounter >= fieldFrequency { + alterColumnIndexes = append(alterColumnIndexes, i) + } + } + if len(alterColumnIndexes) > 0 { + return true, alterColumnIndexes + } + return false, nil +} + +func (ip *IngestProcessor2) GenerateIngestContent(table *chLib.Table, + data types.JSON, + inValidJson types.JSON, + config *chLib.ChTableConfig, + encodings map[schema.FieldEncodingKey]schema.EncodedFieldName) ([]string, types.JSON, []NonSchemaField, error) { + + if len(config.Attributes) == 0 { + return nil, data, nil, nil + } + + mDiff := DifferenceMap(data, table) // TODO change to DifferenceMap(m, t) + + if len(mDiff) == 0 && len(inValidJson) == 0 { // no need to modify, just insert 'js' + return nil, data, nil, nil + } + + // check attributes precondition + if len(config.Attributes) <= 0 { + return nil, nil, nil, fmt.Errorf("no attributes config, but received non-schema fields: %s", mDiff) + } + attrsMap, _ := BuildAttrsMap(mDiff, config) + + // generateNewColumns is called on original attributes map + // before adding invalid fields to it + // otherwise it would contain invalid fields e.g. 
with wrong types + // we only want to add fields that are not part of the schema e.g we don't + // have columns for them + var alterCmd []string + atomic.AddInt64(&ip.ingestCounter, 1) + if ok, alteredAttributesIndexes := ip.shouldAlterColumns(table, attrsMap); ok { + alterCmd = ip.generateNewColumns(attrsMap, table, alteredAttributesIndexes, encodings) + } + // If there are some invalid fields, we need to add them to the attributes map + // to not lose them and be able to store them later by + // generating correct update query + // addInvalidJsonFieldsToAttributes returns a new map with invalid fields added + // this map is then used to generate non-schema fields string + attrsMapWithInvalidFields := addInvalidJsonFieldsToAttributes(attrsMap, inValidJson) + nonSchemaFields, err := generateNonSchemaFields(attrsMapWithInvalidFields) + + if err != nil { + return nil, nil, nil, err + } + + onlySchemaFields := RemoveNonSchemaFields(data, table) + + return alterCmd, onlySchemaFields, nonSchemaFields, nil +} + +func (ip *IngestProcessor2) processInsertQuery(ctx context.Context, + tableName string, + jsonData []types.JSON, transformer jsonprocessor.IngestTransformer, + tableFormatter TableColumNameFormatter, tableDefinitionChangeOnly bool) ([]string, error) { + // this is pre ingest transformer + // here we transform the data before it's structure evaluation and insertion + // + preIngestTransformer := &jsonprocessor.RewriteArrayOfObject{} + var processed []types.JSON + for _, jsonValue := range jsonData { + result, err := preIngestTransformer.Transform(jsonValue) + if err != nil { + return nil, fmt.Errorf("error while rewriting json: %v", err) + } + processed = append(processed, result) + } + jsonData = processed + + // we are doing two passes, e.g. calling transformFieldName twice + // first time we populate encodings map + // second time we do field encoding + // This is because we need to know all field encodings before we start + // transforming fields, otherwise we would mutate json and populating encodings + // which would introduce side effects + // This can be done in one pass, but it would be more complex + // and requires some rewrite of json flattening + encodings := populateFieldEncodings(jsonData, tableName) + + if ip.schemaRegistry != nil { + ip.schemaRegistry.UpdateFieldEncodings(encodings) + } + // Do field encoding here, once for all jsons + // This is in-place operation + for _, jsonValue := range jsonData { + transformFieldName(jsonValue, func(field string) string { + return util.FieldToColumnEncoder(field) + }) + } + + var transformedJsons []types.JSON + for _, jsonValue := range jsonData { + transformedJson, err := transformer.Transform(jsonValue) + if err != nil { + return nil, fmt.Errorf("error while transforming json: %v", err) + } + transformedJsons = append(transformedJsons, transformedJson) + } + + table := ip.FindTable(tableName) + var tableConfig *chLib.ChTableConfig + var createTableCmd string + if table == nil { + tableConfig = NewOnlySchemaFieldsCHConfig() + columnsFromJson := JsonToColumns(transformedJsons[0], tableConfig) + + fieldOrigins := make(map[schema.FieldName]schema.FieldSource) + + for _, column := range columnsFromJson { + fieldOrigins[schema.FieldName(column.ClickHouseColumnName)] = schema.FieldSourceIngest + } + + ip.schemaRegistry.UpdateFieldsOrigins(schema.TableName(tableName), fieldOrigins) + + // This comes externally from (configuration) + // So we need to convert that separately + columnsFromSchema := 
SchemaToColumns(findSchemaPointer(ip.schemaRegistry, tableName), tableFormatter, tableName, ip.schemaRegistry.GetFieldEncodings())
+		columnsAsString := columnsWithIndexes(columnsToString(columnsFromJson, columnsFromSchema, ip.schemaRegistry.GetFieldEncodings(), tableName), Indexes(transformedJsons[0]))
+		createTableCmd = createTableQuery(tableName, columnsAsString, tableConfig)
+		var err error
+		createTableCmd, err = ip.createTableObjectAndAttributes(ctx, createTableCmd, tableConfig, tableName, tableDefinitionChangeOnly)
+		if err != nil {
+			logger.ErrorWithCtx(ctx).Msgf("error createTableObjectAndAttributes, can't create table: %v", err)
+			return nil, err
+		}
+		// Set pointer to table after creating it
+		table = ip.FindTable(tableName)
+	} else if !table.Created {
+		createTableCmd = table.CreateTableString()
+	}
+	if table == nil {
+		return nil, fmt.Errorf("table %s not found", tableName)
+	}
+	tableConfig = table.Config
+	var jsonsReadyForInsertion []string
+	var alterCmd []string
+	var validatedJsons []types.JSON
+	var invalidJsons []types.JSON
+	validatedJsons, invalidJsons, err := ip.preprocessJsons(ctx, table.Name, transformedJsons)
+	if err != nil {
+		return nil, fmt.Errorf("error preprocessJsons: %v", err)
+	}
+	for i, preprocessedJson := range validatedJsons {
+		alter, onlySchemaFields, nonSchemaFields, err := ip.GenerateIngestContent(table, preprocessedJson,
+			invalidJsons[i], tableConfig, encodings)
+		if err != nil {
+			return nil, fmt.Errorf("error GenerateIngestContent, tablename: '%s' : %v", table.Name, err)
+		}
+		insertJson, err := generateInsertJson(nonSchemaFields, onlySchemaFields)
+		if err != nil {
+			return nil, fmt.Errorf("error generateInsertJson, tablename: '%s' json: '%s': %v", table.Name, PrettyJson(insertJson), err)
+		}
+		alterCmd = append(alterCmd, alter...)
+		jsonsReadyForInsertion = append(jsonsReadyForInsertion, insertJson)
+	}
+
+	insertValues := strings.Join(jsonsReadyForInsertion, ", ")
+	insert := fmt.Sprintf("INSERT INTO \"%s\" FORMAT JSONEachRow %s", table.Name, insertValues)
+
+	return generateSqlStatements(createTableCmd, alterCmd, insert), nil
+}
+
+func (lm *IngestProcessor2) Ingest(ctx context.Context, tableName string, jsonData []types.JSON) error {
+	nameFormatter := DefaultColumnNameFormatter()
+	transformer := jsonprocessor.IngestTransformerFor(tableName, lm.cfg) // here? 
+ return lm.ProcessInsertQuery(ctx, tableName, jsonData, transformer, nameFormatter) +} + +func (lm *IngestProcessor2) ProcessInsertQuery(ctx context.Context, tableName string, + jsonData []types.JSON, transformer jsonprocessor.IngestTransformer, + tableFormatter TableColumNameFormatter) error { + + decision := lm.tableResolver.Resolve(quesma_api.IngestPipeline, tableName) + + if decision.Err != nil { + return decision.Err + } + + if decision.IsEmpty { // TODO + return fmt.Errorf("table %s not found", tableName) + } + + if decision.IsClosed { // TODO + return fmt.Errorf("table %s is closed", tableName) + } + + for _, connectorDecision := range decision.UseConnectors { + + var clickhouseDecision *quesma_api.ConnectorDecisionClickhouse + + var ok bool + if clickhouseDecision, ok = connectorDecision.(*quesma_api.ConnectorDecisionClickhouse); !ok { + continue + } + + if clickhouseDecision.IsCommonTable { // TODO: TABLE_RESOLVER DECIDES WHETHER WE'RE DEALING WITH COMMON TABLE + // we have clone the data, because we want to process it twice + var clonedJsonData []types.JSON + for _, jsonValue := range jsonData { + clonedJsonData = append(clonedJsonData, jsonValue.Clone()) + } + + err := lm.processInsertQueryInternal(ctx, tableName, clonedJsonData, transformer, tableFormatter, true) + if err != nil { + // we ignore an error here, because we want to process the data and don't lose it + logger.ErrorWithCtx(ctx).Msgf("error processing insert query - virtual table schema update: %v", err) + } + + pipeline := jsonprocessor.IngestTransformerPipeline{} + pipeline = append(pipeline, &common_table.IngestAddIndexNameTransformer{IndexName: tableName}) + pipeline = append(pipeline, transformer) + + err = lm.processInsertQueryInternal(ctx, common_table.TableName, jsonData, pipeline, tableFormatter, false) + if err != nil { + return fmt.Errorf("error processing insert query to a common table: %w", err) + } + + } else { + err := lm.processInsertQueryInternal(ctx, clickhouseDecision.ClickhouseTableName, jsonData, transformer, tableFormatter, false) + if err != nil { + return fmt.Errorf("error processing insert query: %w", err) + } + } + + } + return nil +} + +func (ip *IngestProcessor2) processInsertQueryInternal(ctx context.Context, tableName string, + jsonData []types.JSON, transformer jsonprocessor.IngestTransformer, + tableFormatter TableColumNameFormatter, isVirtualTable bool) error { + statements, err := ip.processInsertQuery(ctx, tableName, jsonData, transformer, tableFormatter, isVirtualTable) + if err != nil { + return err + } + + var logVirtualTableDDL bool // maybe this should be a part of the config or sth + + if isVirtualTable && logVirtualTableDDL { + for _, statement := range statements { + if strings.HasPrefix(statement, "ALTER") || strings.HasPrefix(statement, "CREATE") { + logger.InfoWithCtx(ctx).Msgf("VIRTUAL DDL EXECUTION: %s", statement) + } + } + } + + if isVirtualTable { + return nil + } + + // We expect to have date format set to `best_effort` + ctx = clickhouse.Context(ctx, clickhouse.WithSettings(clickhouse.Settings{ + "date_time_input_format": "best_effort", + })) + + return ip.executeStatements(ctx, statements) +} + +// This function executes query with context +// and creates span for it +func (ip *IngestProcessor2) execute(ctx context.Context, query string) error { + //span := ip.phoneHomeAgent.ClickHouseInsertDuration().Begin() + + // We log every DDL query + if ip.cfg.Logging.EnableSQLTracing { + if strings.HasPrefix(query, "ALTER") || strings.HasPrefix(query, "CREATE") { + 
logger.InfoWithCtx(ctx).Msgf("DDL query execution: %s", query) + } + } + + err := ip.chDb.Exec(ctx, query) + //span.End(err) + return err +} + +func (ip *IngestProcessor2) executeStatements(ctx context.Context, queries []string) error { + for _, q := range queries { + + err := ip.execute(ctx, q) + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("error executing query: %v", err) + return err + } + } + return nil +} + +func (ip *IngestProcessor2) preprocessJsons(ctx context.Context, + tableName string, jsons []types.JSON) ([]types.JSON, []types.JSON, error) { + var validatedJsons []types.JSON + var invalidJsons []types.JSON + for _, jsonValue := range jsons { + // Validate the input JSON + // against the schema + inValidJson, err := ip.validateIngest(tableName, jsonValue) + if err != nil { + return nil, nil, fmt.Errorf("error validation: %v", err) + } + invalidJsons = append(invalidJsons, inValidJson) + stats.GlobalStatistics.UpdateNonSchemaValues(ip.cfg, tableName, + inValidJson, NestedSeparator) + // Remove invalid fields from the input JSON + jsonValue = subtractInputJson(jsonValue, inValidJson) + validatedJsons = append(validatedJsons, jsonValue) + } + return validatedJsons, invalidJsons, nil +} + +func (ip *IngestProcessor2) FindTable(tableName string) (result *chLib.Table) { + tableNamePattern := util.TableNamePatternRegexp(tableName) + ip.tableDiscovery.TableDefinitions(). + Range(func(name string, table *chLib.Table) bool { + if tableNamePattern.MatchString(name) { + result = table + return false + } + return true + }) + + return result +} + +func (ip *IngestProcessor2) storeVirtualTable(table *chLib.Table) error { + + now := time.Now() + + table.Comment = "Virtual table. Version: " + now.Format(time.RFC3339) + + var columnsToStore []string + for _, col := range table.Cols { + // We don't want to store attributes columns in the virtual table + if col.Name == chLib.AttributesValuesColumn || col.Name == chLib.AttributesMetadataColumn { + continue + } + columnsToStore = append(columnsToStore, col.Name) + } + + // We always want to store timestamp in the virtual table + // if it's not already there + if !slices.Contains(columnsToStore, model.TimestampFieldName) { + columnsToStore = append(columnsToStore, model.TimestampFieldName) + } + + sort.Strings(columnsToStore) + + var columns []common_table.VirtualTableColumn + + for _, col := range columnsToStore { + columns = append(columns, common_table.VirtualTableColumn{ + Name: col, + }) + } + + vTable := &common_table.VirtualTable{ + Version: common_table.VirtualTableStructVersion, + StoredAt: now.Format(time.RFC3339), + Columns: columns, + } + + data, err := json.MarshalIndent(vTable, "", " ") + if err != nil { + return err + } + + return ip.virtualTableStorage.Put(table.Name, string(data)) +} + +// Returns if schema wasn't created (so it needs to be, and will be in a moment) +func (ip *IngestProcessor2) AddTableIfDoesntExist(table *chLib.Table) bool { + t := ip.FindTable(table.Name) + if t == nil { + table.Created = true + + table.ApplyIndexConfig(ip.cfg) + + if table.VirtualTable { + err := ip.storeVirtualTable(table) + if err != nil { + logger.Error().Msgf("error storing virtual table: %v", err) + } + } + ip.tableDiscovery.TableDefinitions().Store(table.Name, table) + return true + } + wasntCreated := !t.Created + t.Created = true + return wasntCreated +} + +func (ip *IngestProcessor2) Ping() error { + return ip.chDb.Open() +} + +func NewIngestProcessor2(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent 
telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver) *IngestProcessor2 { + ctx, cancel := context.WithCancel(context.Background()) + return &IngestProcessor2{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver} +} + +// validateIngest validates the document against the table schema +// and returns the fields that are not valid e.g. have wrong types +// according to the schema +func (ip *IngestProcessor2) validateIngest(tableName string, document types.JSON) (types.JSON, error) { + clickhouseTable := ip.FindTable(tableName) + + if clickhouseTable == nil { + logger.Error().Msgf("Table %s not found", tableName) + return nil, errors.New("table not found:" + tableName) + } + deletedFields := make(types.JSON) + for columnName, column := range clickhouseTable.Cols { + if column == nil { + continue + } + if value, ok := document[columnName]; ok { + if value == nil { + continue + } + for k, v := range validateValueAgainstType(columnName, value, column) { + deletedFields[k] = v + } + } + } + return deletedFields, nil +} diff --git a/quesma/main.go b/quesma/main.go index 5fc7d871f..7115eb890 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -10,6 +10,7 @@ import ( "os/signal" "quesma/ab_testing" "quesma/ab_testing/sender" + "quesma/backend_connectors" "quesma/buildinfo" "quesma/clickhouse" "quesma/common_table" @@ -21,6 +22,7 @@ import ( "quesma/licensing" "quesma/logger" "quesma/persistence" + "quesma/processors/es_to_ch_ingest" "quesma/quesma" "quesma/quesma/async_search_storage" "quesma/quesma/config" @@ -49,12 +51,57 @@ const EnableConcurrencyProfiling = false // buildIngestOnlyQuesma is for now a helper function to help establishing the way of v2 module api import func buildIngestOnlyQuesma() quesma_api.QuesmaBuilder { var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() - _ = frontend_connectors.NewBasicHTTPFrontendConnector(":8080") - _ = frontend_connectors.NewHTTPRouter() - quesmaInstance, _ := quesmaBuilder.Build() + ingestFrontendConnector := frontend_connectors.NewElasticsearchIngestFrontendConnector(":8080") + + var ingestPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + ingestPipeline.AddFrontendConnector(ingestFrontendConnector) + + ingestProcessor := es_to_ch_ingest.NewElasticsearchToClickHouseIngestProcessor( + config.QuesmaProcessorConfig{ + UseCommonTable: false, + IndexConfig: map[string]config.IndexConfiguration{ + "test_index": { + Name: "test_index", + }, + "test_index_2": { + Name: "test_index_2", + }, + "*": { + IngestTarget: []string{config.ElasticsearchTarget}, + }, + }, + }, + ) + ingestPipeline.AddProcessor(ingestProcessor) + quesmaBuilder.AddPipeline(ingestPipeline) + + clickHouseBackendConnector := backend_connectors.NewClickHouseBackendConnector("clickhouse://localhost:9000") + elasticsearchBackendConnector := backend_connectors.NewElasticsearchBackendConnector( + config.ElasticsearchConfiguration{ + Url: &config.Url{Host: "localhost:9200", Scheme: "https"}, + User: "elastic", + Password: "quesmaquesma", + }) + ingestPipeline.AddBackendConnector(clickHouseBackendConnector) + ingestPipeline.AddBackendConnector(elasticsearchBackendConnector) + + quesmaInstance, err := quesmaBuilder.Build() + if err != nil { + log.Fatalf("error building quesma instance: %v", err) + } 
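+	// The assembled pipeline: Elasticsearch-compatible ingest frontend
+	// -> ES-to-ClickHouse ingest processor -> ClickHouse and Elasticsearch backends.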
return quesmaInstance } +/* Example of how to use the v2 module api in main function +func main2() { + q1 := buildIngestOnlyQuesma() + q1.Start() + stop := make(chan os.Signal, 1) + <-stop + q1.Stop(context.Background()) +} +*/ + func main() { if EnableConcurrencyProfiling { runtime.SetBlockProfileRate(1) diff --git a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go new file mode 100644 index 000000000..480c21e24 --- /dev/null +++ b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go @@ -0,0 +1,164 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 + +package es_to_ch_ingest + +import ( + "bytes" + "fmt" + "github.com/goccy/go-json" + "github.com/rs/zerolog/log" + "io" + "net/http" + "net/url" + "quesma/backend_connectors" + "quesma/clickhouse" + "quesma/common_table" + "quesma/frontend_connectors" + "quesma/ingest" + "quesma/persistence" + "quesma/processors" + "quesma/quesma/config" + "quesma/quesma/types" + "quesma/schema" + "quesma_v2/core" +) + +const ( + IngestAction = "ingest_action" + DocIndexAction = "_doc" + BulkIndexAction = "_bulk" + IngestTargetKey = "ingest_target" +) + +type ElasticsearchToClickHouseIngestProcessor struct { + processors.BaseProcessor + config config.QuesmaProcessorConfig +} + +func NewElasticsearchToClickHouseIngestProcessor(conf config.QuesmaProcessorConfig) *ElasticsearchToClickHouseIngestProcessor { + return &ElasticsearchToClickHouseIngestProcessor{ + BaseProcessor: processors.NewBaseProcessor(), + config: conf, + } +} + +func (p *ElasticsearchToClickHouseIngestProcessor) GetId() string { + return "elasticsearch_to_clickhouse_ingest" +} + +// prepareTemporaryIngestProcessor creates a temporary ingest processor which is a new version of the ingest processor, +// which uses `quesma_api.BackendConnector` instead of `*sql.DB` for the database connection. 
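+// It wires the connector into table discovery, the schema registry and the
+// virtual-table storage, then starts the processor. A rough usage sketch (the
+// endpoint and index name are illustrative):
+//
+//	conn := backend_connectors.NewClickHouseBackendConnector("clickhouse://localhost:9000")
+//	ip := p.prepareTemporaryIngestProcessor(conn)
+//	err := ip.Ingest(context.Background(), "test_index", docs)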
+func (p *ElasticsearchToClickHouseIngestProcessor) prepareTemporaryIngestProcessor(connector quesma_api.BackendConnector) *ingest.IngestProcessor2 {
+	u, _ := url.Parse("http://localhost:9200") // hardcoded for this experiment; parsing a constant URL cannot fail
+
+	elasticsearchConfig := config.ElasticsearchConfiguration{
+		Url: (*config.Url)(u),
+	}
+	oldQuesmaConfig := &config.QuesmaConfiguration{
+		IndexConfig: p.config.IndexConfig,
+	}
+
+	virtualTableStorage := persistence.NewElasticJSONDatabase(elasticsearchConfig, common_table.VirtualTableElasticIndexName)
+	tableDisco := clickhouse.NewTableDiscovery2(oldQuesmaConfig, connector, virtualTableStorage)
+	schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, oldQuesmaConfig, clickhouse.SchemaTypeAdapter{})
+
+	v2TableResolver := NewNextGenTableResolver()
+
+	ip := ingest.NewIngestProcessor2(oldQuesmaConfig, connector, nil, tableDisco, schemaRegistry, virtualTableStorage, v2TableResolver)
+	ip.Start()
+	return ip
+}
+
+func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) {
+	var data []byte
+	var chBackend, esBackend quesma_api.BackendConnector
+	indexNameFromIncomingReq, ok := metadata[IngestTargetKey].(string)
+	if !ok || indexNameFromIncomingReq == "" {
+		return metadata, data, fmt.Errorf("missing index name under metadata[%q]", IngestTargetKey)
+	}
+
+	if chBackend = p.GetBackendConnector(quesma_api.ClickHouseSQLBackend); chBackend == nil {
+		return metadata, data, fmt.Errorf("ClickHouse backend connector not found")
+	}
+
+	tempIngestProcessor := p.prepareTemporaryIngestProcessor(chBackend)
+
+	esBackend = p.GetBackendConnector(quesma_api.ElasticsearchBackend)
+	if esBackend == nil {
+		return metadata, data, fmt.Errorf("Elasticsearch backend connector not found")
+	}
+	es, ok := esBackend.(*backend_connectors.ElasticsearchBackendConnector) // temporary: the concrete type is needed for raw HTTP forwarding
+	if !ok {
+		return metadata, data, fmt.Errorf("expected an Elasticsearch backend connector, got %T", esBackend)
+	}
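+
+	// The frontend connector is expected to populate the metadata roughly as follows
+	// (a sketch of the contract as consumed below, not a formal specification):
+	//   metadata[IngestTargetKey] - target index name parsed from the request URL
+	//   metadata[IngestAction]    - DocIndexAction ("_doc") or BulkIndexAction ("_bulk")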
") + } + + for _, m := range message { + messageAsHttpReq, err := quesma_api.CheckedCast[*http.Request](m) + if err != nil { + panic("ElasticsearchToClickHouseIngestProcessor: invalid message type") + } + + if _, present := p.config.IndexConfig[indexNameFromIncomingReq]; !present { + // route to Elasticsearch + resp := es.Send(messageAsHttpReq) + respBody, err := ReadResponseBody(resp) + if err != nil { + println(err) + } + return metadata, respBody, nil + } + + bodyAsBytes, err := frontend_connectors.ReadRequestBody(messageAsHttpReq) + if err != nil { + panic("ElasticsearchToClickHouseIngestProcessor: invalid message type") + } + + switch metadata[IngestAction] { + case DocIndexAction: + payloadJson, err := types.ExpectJSON(types.ParseRequestBody(string(bodyAsBytes))) + if err != nil { + println(err) + } + result, err := handleDocIndex(payloadJson, indexNameFromIncomingReq, tempIngestProcessor, p.config) + if err != nil { + println(err) + } + if respBody, err := json.Marshal(result.Index); err == nil { + return metadata, respBody, nil + } + case BulkIndexAction: + payloadNDJson, err := types.ExpectNDJSON(types.ParseRequestBody(string(bodyAsBytes))) + if err != nil { + println(err) + } + results, err := handleBulkIndex(payloadNDJson, indexNameFromIncomingReq, tempIngestProcessor, es, p.config) + if err != nil { + println(err) + } + if respBody, err := json.Marshal(results); err == nil { + return metadata, respBody, nil + } + println("BulkIndexAction") + default: + log.Info().Msg("Rethink you whole life and start over again") + } + + } + return metadata, data, nil +} + +func (p *ElasticsearchToClickHouseIngestProcessor) GetSupportedBackendConnectors() []quesma_api.BackendConnectorType { + return []quesma_api.BackendConnectorType{quesma_api.ClickHouseSQLBackend, quesma_api.ElasticsearchBackend} +} + +func ReadResponseBody(resp *http.Response) ([]byte, error) { + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + resp.Body = io.NopCloser(bytes.NewBuffer(respBody)) + return respBody, nil +} diff --git a/quesma/processors/es_to_ch_ingest/handlers.go b/quesma/processors/es_to_ch_ingest/handlers.go new file mode 100644 index 000000000..b9038221f --- /dev/null +++ b/quesma/processors/es_to_ch_ingest/handlers.go @@ -0,0 +1,307 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. 
diff --git a/quesma/processors/es_to_ch_ingest/handlers.go b/quesma/processors/es_to_ch_ingest/handlers.go
new file mode 100644
index 000000000..b9038221f
--- /dev/null
+++ b/quesma/processors/es_to_ch_ingest/handlers.go
@@ -0,0 +1,307 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+
+package es_to_ch_ingest
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"quesma/backend_connectors"
+	"quesma/ingest"
+	"quesma/logger"
+	"quesma/queryparser"
+	"quesma/quesma/config"
+	bulkmodel "quesma/quesma/functionality/bulk"
+	"quesma/quesma/recovery"
+	"quesma/quesma/types"
+)
+
+// handleDocIndex wraps a single-document payload in bulk format, so that the existing bulk ingest logic can be reused
+func handleDocIndex(payload types.JSON, targetTableName string, temporaryIngestProcessor *ingest.IngestProcessor2, indexConfig config.QuesmaProcessorConfig) (bulkmodel.BulkItem, error) {
+	newPayload := []types.JSON{
+		map[string]interface{}{"index": map[string]interface{}{"_index": targetTableName}},
+		payload,
+	}
+
+	if results, err := Write(context.Background(), &targetTableName, newPayload, temporaryIngestProcessor, nil, indexConfig); err != nil {
+		return bulkmodel.BulkItem{}, err
+	} else {
+		return results[0], nil
+	}
+}
+
+func handleBulkIndex(payload types.NDJSON, targetTableName string, temporaryIngestProcessor *ingest.IngestProcessor2, es *backend_connectors.ElasticsearchBackendConnector, cfg config.QuesmaProcessorConfig) ([]bulkmodel.BulkItem, error) {
+	results, err := Write(context.Background(), &targetTableName, payload, temporaryIngestProcessor, es, cfg)
+	if err != nil {
+		logger.Error().Msgf("failed writing bulk: %v", err)
+		return []bulkmodel.BulkItem{}, err
+	}
+	return results, nil
+}
+
+func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ingest.IngestProcessor2, es *backend_connectors.ElasticsearchBackendConnector, conf config.QuesmaProcessorConfig) (results []bulkmodel.BulkItem, err error) {
+	defer recovery.LogPanic()
+
+	bulkSize := len(bulk) / 2 // divide by 2 so that the `action_and_meta_data` lines are not counted, ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
+
+	// The returned results must be in the same order as the input request, but splitting the bulk may change the order.
+	// Therefore, each BulkRequestEntry holds a pointer to its result entry, allowing us to freely split and reshuffle the bulk.
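+	// For example, a 2-entry bulk where entry 0 targets Elasticsearch and entry 1 targets
+	// ClickHouse is processed as two independent sub-batches, yet each sub-batch writes its
+	// outcome through the response pointers into results[0] and results[1] respectively.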
+	results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, conf)
+	if err != nil {
+		return []bulkmodel.BulkItem{}, err
+	}
+
+	// we fail if there are some documents to insert into Clickhouse but ingest processor is not available
+	//if len(clickhouseDocumentsToInsert) > 0 && ip == nil {
+	//
+	//	indexes := make(map[string]struct{})
+	//	for index := range clickhouseDocumentsToInsert {
+	//		indexes[index] = struct{}{}
+	//	}
+	//
+	//	indexesAsList := make([]string, 0, len(indexes))
+	//	for index := range indexes {
+	//		indexesAsList = append(indexesAsList, index)
+	//	}
+	//	sort.Strings(indexesAsList)
+	//
+	//	return []BulkItem{}, end_user_errors.ErrNoIngest.New(fmt.Errorf("ingest processor is not available, but documents are targeted to Clickhouse indexes: %s", strings.Join(indexesAsList, ",")))
+	//}
+
+	// TODO: forwarding the Elastic part of the bulk from here is temporary and should find a better home
+	err = sendToElastic(elasticRequestBody, elasticBulkEntries, es)
+	if err != nil {
+		return []bulkmodel.BulkItem{}, err
+	}
+
+	//if ip != nil {
+	fmt.Printf("sending to clickhouse: [%v]\n", clickhouseDocumentsToInsert)
+	sendToClickhouse(ctx, clickhouseDocumentsToInsert, ip)
+	//}
+
+	return results, nil
+}
+
+func sendToElastic(elasticRequestBody []byte, elasticBulkEntries []BulkRequestEntry, es *backend_connectors.ElasticsearchBackendConnector) error {
+	if len(elasticRequestBody) == 0 {
+		return nil
+	}
+
+	response, err := es.RequestWithHeaders(context.Background(), "POST", "/_bulk", elasticRequestBody, http.Header{"Content-Type": {"application/x-ndjson"}})
+	if err != nil {
+		return err
+	}
+
+	if response.StatusCode != http.StatusOK {
+		responseBody, _ := io.ReadAll(response.Body)
+		return fmt.Errorf("error sending bulk request (%v): %s", response.StatusCode, responseBody)
+	}
+
+	responseBody, err := io.ReadAll(response.Body)
+	if err != nil {
+		return err
+	}
+
+	var elasticBulkResponse bulkmodel.BulkResponse
+	err = json.Unmarshal(responseBody, &elasticBulkResponse)
+	if err != nil {
+		return err
+	}
+
+	// Copy Elastic's response entries to our response (pointers into the results array)
+	for i, entry := range elasticBulkResponse.Items {
+		*elasticBulkEntries[i].response = entry
+	}
+	return nil
+}
+
+func sendToClickhouse(ctx context.Context, clickhouseDocumentsToInsert map[string][]BulkRequestEntry, ip *ingest.IngestProcessor2) {
+	for indexName, documents := range clickhouseDocumentsToInsert {
+		//phoneHomeAgent.IngestCounters().Add(indexName, int64(len(documents)))
+
+		//for _, document := range documents {
+		//	stats.GlobalStatistics.Process(cfg, indexName, document.document, clickhouse.NestedSeparator)
+		//}
+		// if the index is mapped to a specific database table in the configuration, use that table
+		// TODO: Index name override ignored for now
+		//if len(cfg.IndexConfig[indexName].Override) > 0 {
+		//	indexName = cfg.IndexConfig[indexName].Override
+		//}
+
+		inserts := make([]types.JSON, len(documents))
+		for i, document := range documents {
+			inserts[i] = document.document
+		}
+
+		err := ip.Ingest(ctx, indexName, inserts)
+
+		for _, document := range documents {
+			bulkSingleResponse := bulkmodel.BulkSingleResponse{
+				ID:          "fakeId",
+				Index:       document.index,
+				PrimaryTerm: 1,
+				SeqNo:       0,
+				Shards: bulkmodel.BulkShardsResponse{
+					Failed:     0,
+					Successful: 1,
+					Total:      1,
+				},
+				Version: 0,
+				Result:  "created",
+				Status:  201,
+				Type:    "_doc",
+			}
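+			// Note: the ID, sequence number and version above are synthetic - ClickHouse has no
+			// Elasticsearch-style document versioning, so we fabricate a minimal "created" reply.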
+			if err != nil {
+				bulkSingleResponse.Result = ""
+				bulkSingleResponse.Status = 400
+				bulkSingleResponse.Shards = bulkmodel.BulkShardsResponse{
+					Failed:     1,
+					Successful: 0,
+					Total:      1,
+				}
+				bulkSingleResponse.Error = queryparser.Error{
+					RootCause: []queryparser.RootCause{
+						{
+							Type:   "quesma_error",
+							Reason: err.Error(),
+						},
+					},
+					Type:   "quesma_error",
+					Reason: err.Error(),
+				}
+			}
+
+			// Fill out the response pointer (a pointer into the results array we will return for the bulk)
+			switch document.operation {
+			case "create":
+				document.response.Create = bulkSingleResponse
+
+			case "index":
+				document.response.Index = bulkSingleResponse
+
+			default:
+				logger.Error().Msgf("unsupported bulk operation type: %s. Document: %v", document.operation, document.document)
+			}
+		}
+	}
+}
+
+func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bulkSize int, processorConfig config.QuesmaProcessorConfig) ([]bulkmodel.BulkItem, map[string][]BulkRequestEntry, []byte, []BulkRequestEntry, error) {
+	results := make([]bulkmodel.BulkItem, bulkSize)
+
+	clickhouseDocumentsToInsert := make(map[string][]BulkRequestEntry, bulkSize)
+	var elasticRequestBody []byte
+	var elasticBulkEntries []BulkRequestEntry
+
+	err := bulk.BulkForEach(func(entryNumber int, op types.BulkOperation, rawOp types.JSON, document types.JSON) error {
+		index := op.GetIndex()
+		operation := op.GetOperation()
+
+		if index == "" {
+			if defaultIndex != nil {
+				index = *defaultIndex
+			} else {
+				// Elastic also fails the entire bulk in such a case
+				logger.ErrorWithCtxAndReason(ctx, "no index name in _bulk").Msgf("no index name in _bulk")
+				return fmt.Errorf("no index name in _bulk. Operation: %v, Document: %v", rawOp, document)
+			}
+		}
+
+		// built after the default-index fallback above, so the response carries the resolved index name
+		entryWithResponse := BulkRequestEntry{
+			operation: operation,
+			index:     index,
+			document:  document,
+			response:  &results[entryNumber],
+		}
+
+		// the old (v1) way was to ask the TableResolver:
+		//decision := tableResolver.Resolve(table_resolver.IngestPipeline, index)
+		ingestTarget := findIngestTarget(index, processorConfig)
+
+		// TODO: think if we need that
+		//if decision.IsClosed || len(decision.UseConnectors) == 0 {
+		//	bulkSingleResponse := BulkSingleResponse{
+		//		Shards: BulkShardsResponse{
+		//			Failed:     1,
+		//			Successful: 0,
+		//			Total:      1,
+		//		},
+		//		Status: 403,
+		//		Type:   "_doc",
+		//		Error: queryparser.Error{
+		//			RootCause: []queryparser.RootCause{
+		//				{
+		//					Type:   "index_closed_exception",
+		//					Reason: fmt.Sprintf("index %s is not routed to any connector", index),
+		//				},
+		//			},
+		//			Type:   "index_closed_exception",
+		//			Reason: fmt.Sprintf("index %s is not routed to any connector", index),
+		//		},
+		//	}
+		//	switch operation {
+		//	case "create":
+		//		entryWithResponse.response.Create = bulkSingleResponse
+		//
+		//	case "index":
+		//		entryWithResponse.response.Index = bulkSingleResponse
+		//
+		//	default:
+		//		return fmt.Errorf("unsupported bulk operation type: %s. Document: %v", operation, document)
+		//	}
+		//}
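+		// An NDJSON bulk entry is a pair of lines (illustrative):
+		//
+		//	{"index": {"_index": "test_index"}}
+		//	{"message": "hello"}
+		//
+		// rawOp is the first line and document the second; for Elasticsearch targets both are
+		// forwarded verbatim below, for ClickHouse targets only the document is stored.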
+		switch ingestTarget {
+
+		case config.ElasticsearchTarget:
+			// Bulk entry for Elastic - forward the request as-is
+			opBytes, err := rawOp.Bytes()
+			if err != nil {
+				return err
+			}
+			elasticRequestBody = append(elasticRequestBody, opBytes...)
+			elasticRequestBody = append(elasticRequestBody, '\n')
+
+			documentBytes, err := document.Bytes()
+			if err != nil {
+				return err
+			}
+			elasticRequestBody = append(elasticRequestBody, documentBytes...)
+			elasticRequestBody = append(elasticRequestBody, '\n')
+
+			elasticBulkEntries = append(elasticBulkEntries, entryWithResponse)
+
+		case config.ClickhouseTarget:
+			// Bulk entry for Clickhouse
+			if operation != "create" && operation != "index" {
+				// Elastic also fails the entire bulk in such a case
+				logger.ErrorWithCtxAndReason(ctx, "unsupported bulk operation type").Msgf("unsupported bulk operation type: %s", operation)
+				return fmt.Errorf("unsupported bulk operation type: %s. Operation: %v, Document: %v", operation, rawOp, document)
+			}
+
+			clickhouseDocumentsToInsert[index] = append(clickhouseDocumentsToInsert[index], entryWithResponse)
+
+		default:
+			return fmt.Errorf("unknown ingest target %q for index %q", ingestTarget, index)
+		}
+
+		return nil
+	})
+
+	return results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err
+}
+
+func findIngestTarget(index string, processorConfig config.QuesmaProcessorConfig) string {
+	// indexing IngestTarget[0] is unsafe in general, but config validation should have caught an empty target list
+	defaultTarget := processorConfig.IndexConfig["*"].IngestTarget[0]
+	_, found := processorConfig.IndexConfig[index]
+	if !found {
+		return defaultTarget
+	} else {
+		// per the legacy config syntax, an index that is explicitly listed is a ClickHouse target
+		return config.ClickhouseTarget
+	}
+}
diff --git a/quesma/processors/es_to_ch_ingest/model.go b/quesma/processors/es_to_ch_ingest/model.go
new file mode 100644
index 000000000..a3cf3d2a4
--- /dev/null
+++ b/quesma/processors/es_to_ch_ingest/model.go
@@ -0,0 +1,20 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+
+package es_to_ch_ingest
+
+import (
+	"quesma/quesma/functionality/bulk"
+	"quesma/quesma/types"
+)
+
+type (
+	// BulkRequestEntry is redeclared here because the original uses private fields,
+	// and the whole point of this experiment is not to mess too much with the v1 code
+	BulkRequestEntry struct {
+		operation string
+		index     string
+		document  types.JSON
+		response  *bulk.BulkItem
+	}
+)
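+
+/* Example (illustrative): given the bulk pair
+
+     {"create": {"_index": "test_index"}}
+     {"message": "hello"}
+
+   splitBulk produces BulkRequestEntry{operation: "create", index: "test_index",
+   document: {"message": "hello"}, response: &results[0]}; whoever fills *response
+   later (the Elastic forwarding or the ClickHouse path) updates the final bulk
+   reply in place.
+*/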
diff --git a/quesma/processors/es_to_ch_ingest/table_resolver_nextgen.go b/quesma/processors/es_to_ch_ingest/table_resolver_nextgen.go
new file mode 100644
index 000000000..ccfe52eba
--- /dev/null
+++ b/quesma/processors/es_to_ch_ingest/table_resolver_nextgen.go
@@ -0,0 +1,31 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+
+package es_to_ch_ingest
+
+import (
+	"quesma/table_resolver"
+	quesma_api "quesma_v2/core"
+)
+
+func NewNextGenTableResolver() table_resolver.TableResolver {
+	return &NextGenTableResolver{}
+}
+
+type NextGenTableResolver struct{}
+
+func (n *NextGenTableResolver) Start() {}
+func (n *NextGenTableResolver) Stop()  {}
+func (n *NextGenTableResolver) Resolve(_ string, tableName string) *quesma_api.Decision {
+	decision := &quesma_api.Decision{
+		UseConnectors: []quesma_api.ConnectorDecision{&quesma_api.ConnectorDecisionClickhouse{
+			ClickhouseTableName: tableName,
+		}}}
+	return decision
+}
+func (n *NextGenTableResolver) Pipelines() []string {
+	return []string{}
+}
+func (n *NextGenTableResolver) RecentDecisions() []quesma_api.PatternDecisions {
+	return []quesma_api.PatternDecisions{}
+}
diff --git a/quesma/v2/core/backend_connectors.go b/quesma/v2/core/backend_connectors.go
index c8061b79c..fe6ac24e6 100644
--- a/quesma/v2/core/backend_connectors.go
+++ b/quesma/v2/core/backend_connectors.go
@@ -10,6 +10,8 @@ const (
 	NoopBackend = iota
 	MySQLBackend
 	PgSQLBackend
+	ClickHouseSQLBackend
+	ElasticsearchBackend
 )
 
 func GetBackendConnectorNameFromType(connectorType BackendConnectorType) string {
@@ -18,6 +20,10 @@ func GetBackendConnectorNameFromType(connectorType BackendConnectorType) string
 		return "mysql"
 	case PgSQLBackend:
 		return "pgsql"
+	case ClickHouseSQLBackend:
+		return "clickhouse"
+	case ElasticsearchBackend:
+		return "elasticsearch"
 	default:
 		return "noop"
 	}