From fefacc53e45a1e2b698838d727a3903a774b242f Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Przemys=C5=82aw=20Hejman?=
Date: Mon, 9 Dec 2024 16:09:05 +0100
Subject: [PATCH] Ingest processor v2 - cleanup passing objects (#1084)

---
 .../elasticsearch_backend_connector.go        |  4 ++
 quesma/ingest/processor2.go                   | 15 ++++-
 ...icsearch_to_clickhouse_ingest_processor.go | 49 +++++++----------
 quesma/processors/es_to_ch_ingest/handlers.go | 55 ++++++-------------
 4 files changed, 53 insertions(+), 70 deletions(-)

diff --git a/quesma/backend_connectors/elasticsearch_backend_connector.go b/quesma/backend_connectors/elasticsearch_backend_connector.go
index 1b491df6b..c2f8c5cda 100644
--- a/quesma/backend_connectors/elasticsearch_backend_connector.go
+++ b/quesma/backend_connectors/elasticsearch_backend_connector.go
@@ -41,6 +41,10 @@ func NewElasticsearchBackendConnector(cfg config.ElasticsearchConfiguration) *El
 	return conn
 }
 
+func (e *ElasticsearchBackendConnector) GetConfig() config.ElasticsearchConfiguration {
+	return e.config
+}
+
 func (e *ElasticsearchBackendConnector) RequestWithHeaders(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Response, error) {
 	return e.doRequest(ctx, method, endpoint, body, headers)
 }
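The new `GetConfig()` accessor above lets downstream components reuse the connector's Elasticsearch configuration instead of re-deriving it (it replaces the hard-coded `http://localhost:9200` removed further down in this patch). A minimal sketch of the accessor pattern, with simplified stand-in types rather than quesma's actual ones:

```go
package main

import "fmt"

// Simplified stand-ins for config.ElasticsearchConfiguration and the backend
// connector; the field layout here is illustrative, not quesma's definition.
type ElasticsearchConfiguration struct {
	Url string
}

type ElasticsearchBackendConnector struct {
	config ElasticsearchConfiguration
}

// GetConfig exposes the connector's own configuration so callers can reuse
// it (e.g. to build other Elasticsearch-backed components) instead of
// re-parsing a hard-coded URL.
func (e *ElasticsearchBackendConnector) GetConfig() ElasticsearchConfiguration {
	return e.config
}

func main() {
	conn := &ElasticsearchBackendConnector{config: ElasticsearchConfiguration{Url: "http://localhost:9200"}}
	fmt.Println(conn.GetConfig().Url) // http://localhost:9200
}
```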
diff --git a/quesma/ingest/processor2.go b/quesma/ingest/processor2.go
index fdf1d0e8b..ffa589893 100644
--- a/quesma/ingest/processor2.go
+++ b/quesma/ingest/processor2.go
@@ -8,6 +8,8 @@ import (
 	"fmt"
 	"github.com/ClickHouse/clickhouse-go/v2"
 	"github.com/goccy/go-json"
+	"net/http"
+	"quesma/backend_connectors"
 	chLib "quesma/clickhouse"
 	"quesma/comment_metadata"
 	"quesma/common_table"
@@ -39,6 +41,7 @@ type (
 		ctx            context.Context
 		cancel         context.CancelFunc
 		chDb           quesma_api.BackendConnector
+		es             backend_connectors.ElasticsearchBackendConnector
 		tableDiscovery chLib.TableDiscovery
 		cfg            *config.QuesmaConfiguration
 		phoneHomeAgent telemetry.PhoneHomeAgent
@@ -104,6 +107,14 @@ func (ip *IngestProcessor2) Close() {
 //	return count, nil
 //}
 
+func (ip *IngestProcessor2) SendToElasticsearch(req *http.Request) *http.Response {
+	return ip.es.Send(req)
+}
+
+func (ip *IngestProcessor2) RequestToElasticsearch(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Response, error) {
+	return ip.es.RequestWithHeaders(ctx, method, endpoint, body, headers)
+}
+
 func (ip *IngestProcessor2) createTableObjectAndAttributes(ctx context.Context, query string, config *chLib.ChTableConfig, name string, tableDefinitionChangeOnly bool) (string, error) {
 	table, err := chLib.NewTable(query, config)
 	if err != nil {
@@ -656,9 +667,9 @@ func (ip *IngestProcessor2) Ping() error {
 	return ip.chDb.Open()
 }
 
-func NewIngestProcessor2(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver) *IngestProcessor2 {
+func NewIngestProcessor2(cfg *config.QuesmaConfiguration, chDb quesma_api.BackendConnector, phoneHomeAgent telemetry.PhoneHomeAgent, loader chLib.TableDiscovery, schemaRegistry schema.Registry, virtualTableStorage persistence.JSONDatabase, tableResolver table_resolver.TableResolver, esBackendConn backend_connectors.ElasticsearchBackendConnector) *IngestProcessor2 {
 	ctx, cancel := context.WithCancel(context.Background())
-	return &IngestProcessor2{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent,
-		schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver}
+	return &IngestProcessor2{ctx: ctx, cancel: cancel, chDb: chDb, tableDiscovery: loader, cfg: cfg, phoneHomeAgent: phoneHomeAgent, schemaRegistry: schemaRegistry, virtualTableStorage: virtualTableStorage, tableResolver: tableResolver, es: esBackendConn}
 }
 
 // validateIngest validates the document against the table schema
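`IngestProcessor2` now owns the Elasticsearch connector and exposes thin delegation methods, so handlers go through the processor instead of receiving a separate connector. A rough sketch of that shape, assuming simplified stand-in types; it returns the built request rather than executing it so the example runs offline:

```go
package main

import (
	"bytes"
	"context"
	"fmt"
	"net/http"
)

// esConnector is a simplified stand-in for backend_connectors.ElasticsearchBackendConnector.
type esConnector struct{ baseURL string }

func (e esConnector) RequestWithHeaders(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Request, error) {
	req, err := http.NewRequestWithContext(ctx, method, e.baseURL+endpoint, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}
	req.Header = headers
	return req, nil // the real connector would execute the request and return *http.Response
}

// ingestProcessor mirrors IngestProcessor2: it owns the connector and exposes
// a thin delegation method, so handlers never touch the raw connector.
type ingestProcessor struct{ es esConnector }

func (ip *ingestProcessor) RequestToElasticsearch(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Request, error) {
	return ip.es.RequestWithHeaders(ctx, method, endpoint, body, headers)
}

func main() {
	ip := &ingestProcessor{es: esConnector{baseURL: "http://localhost:9200"}}
	req, _ := ip.RequestToElasticsearch(context.Background(), "POST", "/_bulk", []byte("{}\n"), http.Header{"Content-Type": {"application/x-ndjson"}})
	fmt.Println(req.Method, req.URL) // POST http://localhost:9200/_bulk
}
```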
diff --git a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go
index 9651cce1d..2be7c7889 100644
--- a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go
+++ b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go
@@ -10,7 +10,6 @@ import (
 	"github.com/rs/zerolog/log"
 	"io"
 	"net/http"
-	"net/url"
 	"quesma/backend_connectors"
 	"quesma/clickhouse"
 	"quesma/common_table"
@@ -47,9 +46,18 @@ func NewElasticsearchToClickHouseIngestProcessor(conf config.QuesmaProcessorConf
 func (p *ElasticsearchToClickHouseIngestProcessor) Init() error {
 	chBackendConnector := p.GetBackendConnector(quesma_api.ClickHouseSQLBackend)
 	if chBackendConnector == nil {
-		return fmt.Errorf("ClickHouse backend connector not found")
+		return fmt.Errorf("backend connector for ClickHouse not found")
+	}
+	esBackendConnector := p.GetBackendConnector(quesma_api.ElasticsearchBackend)
+	if esBackendConnector == nil {
+		return fmt.Errorf("backend connector for Elasticsearch not found")
+	}
+	esBackendConnectorCasted, ok := esBackendConnector.(*backend_connectors.ElasticsearchBackendConnector) // OKAY JUST FOR NOW
+	if !ok {
+		return fmt.Errorf("failed to cast Elasticsearch backend connector")
 	}
-	p.legacyIngestProcessor = p.prepareTemporaryIngestProcessor(chBackendConnector)
+
+	p.legacyIngestProcessor = p.prepareTemporaryIngestProcessor(chBackendConnector, *esBackendConnectorCasted)
 	return nil
 }
 
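`Init()` now resolves both backend connectors up front and fails fast with an error, replacing the panic-later approach that is removed from `Handle()` below. A small sketch of the checked type assertion pattern, using hypothetical stand-in types:

```go
package main

import (
	"errors"
	"fmt"
)

// BackendConnector is a stand-in for quesma_api.BackendConnector.
type BackendConnector interface {
	InstanceName() string
}

type elasticsearchConnector struct{}

func (*elasticsearchConnector) InstanceName() string { return "elasticsearch" }

// resolveElasticsearch mirrors the pattern Init() now uses: resolve the
// connector once, check its concrete type, and return an error instead of
// panicking later inside the request path.
func resolveElasticsearch(c BackendConnector) (*elasticsearchConnector, error) {
	if c == nil {
		return nil, errors.New("backend connector for Elasticsearch not found")
	}
	casted, ok := c.(*elasticsearchConnector)
	if !ok {
		return nil, fmt.Errorf("failed to cast Elasticsearch backend connector (got %T)", c)
	}
	return casted, nil
}

func main() {
	conn, err := resolveElasticsearch(&elasticsearchConnector{})
	fmt.Println(conn != nil, err) // true <nil>
}
```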
") - } - for _, m := range message { messageAsHttpReq, err := quesma_api.CheckedCast[*http.Request](m) if err != nil { @@ -112,7 +101,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in if _, present := p.config.IndexConfig[indexNameFromIncomingReq]; !present && metadata[IngestAction] == DocIndexAction { // route to Elasticsearch, `bulk` request might be sent to ClickHouse depending on the request payload - resp := es.Send(messageAsHttpReq) + resp := p.legacyIngestProcessor.SendToElasticsearch(messageAsHttpReq) respBody, err := ReadResponseBody(resp) if err != nil { println(err) @@ -131,7 +120,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in if err != nil { println(err) } - result, err := handleDocIndex(payloadJson, indexNameFromIncomingReq, p.legacyIngestProcessor, p.config) + result, err := p.handleDocIndex(payloadJson, indexNameFromIncomingReq) if err != nil { println(err) } @@ -143,7 +132,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in if err != nil { println(err) } - results, err := handleBulkIndex(payloadNDJson, indexNameFromIncomingReq, p.legacyIngestProcessor, es, p.config) + results, err := p.handleBulkIndex(payloadNDJson, indexNameFromIncomingReq) if err != nil { println(err) } diff --git a/quesma/processors/es_to_ch_ingest/handlers.go b/quesma/processors/es_to_ch_ingest/handlers.go index b9038221f..6100e9952 100644 --- a/quesma/processors/es_to_ch_ingest/handlers.go +++ b/quesma/processors/es_to_ch_ingest/handlers.go @@ -9,8 +9,6 @@ import ( "fmt" "io" "net/http" - "quesma/backend_connectors" - "quesma/ingest" "quesma/logger" "quesma/queryparser" "quesma/quesma/config" @@ -20,21 +18,21 @@ import ( ) // handleDocIndex assembles the payload into bulk format to reusing existing logic of bulk ingest -func handleDocIndex(payload types.JSON, targetTableName string, temporaryIngestProcessor *ingest.IngestProcessor2, indexConfig config.QuesmaProcessorConfig) (bulkmodel.BulkItem, error) { +func (p *ElasticsearchToClickHouseIngestProcessor) handleDocIndex(payload types.JSON, targetTableName string) (bulkmodel.BulkItem, error) { newPayload := []types.JSON{ map[string]interface{}{"index": map[string]interface{}{"_index": targetTableName}}, payload, } - if results, err := Write(context.Background(), &targetTableName, newPayload, temporaryIngestProcessor, nil, indexConfig); err != nil { + if results, err := p.Write(context.Background(), &targetTableName, newPayload); err != nil { return bulkmodel.BulkItem{}, err } else { return results[0], nil } } -func handleBulkIndex(payload types.NDJSON, targetTableName string, temporaryIngestProcessor *ingest.IngestProcessor2, es *backend_connectors.ElasticsearchBackendConnector, cfg config.QuesmaProcessorConfig) ([]bulkmodel.BulkItem, error) { - results, err := Write(context.Background(), &targetTableName, payload, temporaryIngestProcessor, es, cfg) +func (p *ElasticsearchToClickHouseIngestProcessor) handleBulkIndex(payload types.NDJSON, targetTableName string) ([]bulkmodel.BulkItem, error) { + results, err := p.Write(context.Background(), &targetTableName, payload) if err != nil { fmt.Printf("failed writing: %v", err) return []bulkmodel.BulkItem{}, err @@ -42,55 +40,35 @@ func handleBulkIndex(payload types.NDJSON, targetTableName string, temporaryInge return results, nil } -func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ingest.IngestProcessor2, es *backend_connectors.ElasticsearchBackendConnector, conf 
diff --git a/quesma/processors/es_to_ch_ingest/handlers.go b/quesma/processors/es_to_ch_ingest/handlers.go
index b9038221f..6100e9952 100644
--- a/quesma/processors/es_to_ch_ingest/handlers.go
+++ b/quesma/processors/es_to_ch_ingest/handlers.go
@@ -9,8 +9,6 @@ import (
 	"fmt"
 	"io"
 	"net/http"
-	"quesma/backend_connectors"
-	"quesma/ingest"
 	"quesma/logger"
 	"quesma/queryparser"
 	"quesma/quesma/config"
@@ -20,21 +18,21 @@ import (
 )
 
 // handleDocIndex assembles the payload into bulk format to reuse the existing logic of bulk ingest
-func handleDocIndex(payload types.JSON, targetTableName string, temporaryIngestProcessor *ingest.IngestProcessor2, indexConfig config.QuesmaProcessorConfig) (bulkmodel.BulkItem, error) {
+func (p *ElasticsearchToClickHouseIngestProcessor) handleDocIndex(payload types.JSON, targetTableName string) (bulkmodel.BulkItem, error) {
 	newPayload := []types.JSON{
 		map[string]interface{}{"index": map[string]interface{}{"_index": targetTableName}},
 		payload,
 	}
 
-	if results, err := Write(context.Background(), &targetTableName, newPayload, temporaryIngestProcessor, nil, indexConfig); err != nil {
+	if results, err := p.Write(context.Background(), &targetTableName, newPayload); err != nil {
 		return bulkmodel.BulkItem{}, err
 	} else {
 		return results[0], nil
 	}
 }
 
-func handleBulkIndex(payload types.NDJSON, targetTableName string, temporaryIngestProcessor *ingest.IngestProcessor2, es *backend_connectors.ElasticsearchBackendConnector, cfg config.QuesmaProcessorConfig) ([]bulkmodel.BulkItem, error) {
-	results, err := Write(context.Background(), &targetTableName, payload, temporaryIngestProcessor, es, cfg)
+func (p *ElasticsearchToClickHouseIngestProcessor) handleBulkIndex(payload types.NDJSON, targetTableName string) ([]bulkmodel.BulkItem, error) {
+	results, err := p.Write(context.Background(), &targetTableName, payload)
 	if err != nil {
 		fmt.Printf("failed writing: %v", err)
 		return []bulkmodel.BulkItem{}, err
@@ -42,55 +40,35 @@ func handleBulkIndex(payload types.NDJSON, targetTableName string, temporaryInge
 	return results, nil
 }
 
-func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ingest.IngestProcessor2, es *backend_connectors.ElasticsearchBackendConnector, conf config.QuesmaProcessorConfig) (results []bulkmodel.BulkItem, err error) {
+func (p *ElasticsearchToClickHouseIngestProcessor) Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON) (results []bulkmodel.BulkItem, err error) {
 	defer recovery.LogPanic()
 
 	bulkSize := len(bulk) / 2 // we divide the payload by 2 so that we don't take into account the `action_and_meta_data` line, ref: https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html
 
 	// The returned results should be in the same order as the input request, however splitting the bulk might change the order.
 	// Therefore, each BulkRequestEntry has a corresponding pointer to the result entry, allowing us to freely split and reshuffle the bulk.
-	results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, conf)
+	results, clickhouseDocumentsToInsert, elasticRequestBody, elasticBulkEntries, err := splitBulk(ctx, defaultIndex, bulk, bulkSize, p.config)
 	if err != nil {
 		return []bulkmodel.BulkItem{}, err
 	}
 
-	// we fail if there are some documents to insert into Clickhouse but ingest processor is not available
-	//if len(clickhouseDocumentsToInsert) > 0 && ip == nil {
-	//
-	//	indexes := make(map[string]struct{})
-	//	for index := range clickhouseDocumentsToInsert {
-	//		indexes[index] = struct{}{}
-	//	}
-	//
-	//	indexesAsList := make([]string, 0, len(indexes))
-	//	for index := range indexes {
-	//		indexesAsList = append(indexesAsList, index)
-	//	}
-	//	sort.Strings(indexesAsList)
-	//
-	//	return []BulkItem{}, end_user_errors.ErrNoIngest.New(fmt.Errorf("ingest processor is not available, but documents are targeted to Clickhouse indexes: %s", strings.Join(indexesAsList, ",")))
-	//}
-
-	// No place for that here
-	err = sendToElastic(elasticRequestBody, elasticBulkEntries, es)
+	err = p.sendToElastic(elasticRequestBody, elasticBulkEntries)
 	if err != nil {
 		return []bulkmodel.BulkItem{}, err
 	}
 
-	//if ip != nil {
-	fmt.Printf("woudl send to clickhouse: [%v]", clickhouseDocumentsToInsert)
-	sendToClickhouse(ctx, clickhouseDocumentsToInsert, ip)
-	//}
+	fmt.Printf("would send to clickhouse: [%v]\n", clickhouseDocumentsToInsert)
+	p.sendToClickhouse(ctx, clickhouseDocumentsToInsert)
 
 	return results, nil
 }
 
-func sendToElastic(elasticRequestBody []byte, elasticBulkEntries []BulkRequestEntry, es *backend_connectors.ElasticsearchBackendConnector) error {
+func (p *ElasticsearchToClickHouseIngestProcessor) sendToElastic(elasticRequestBody []byte, elasticBulkEntries []BulkRequestEntry) error {
 	if len(elasticRequestBody) == 0 {
 		return nil
 	}
 
-	response, err := es.RequestWithHeaders(context.Background(), "POST", "/_bulk", elasticRequestBody, http.Header{"Content-Type": {"application/x-ndjson"}})
+	response, err := p.legacyIngestProcessor.RequestToElasticsearch(context.Background(), "POST", "/_bulk", elasticRequestBody, http.Header{"Content-Type": {"application/x-ndjson"}})
 	if err != nil {
 		return err
 	}
@@ -118,7 +96,7 @@ func sendToElastic(elasticRequestBody []byte, elasticBulkEntries []BulkRequestEn
 	return nil
 }
 
-func sendToClickhouse(ctx context.Context, clickhouseDocumentsToInsert map[string][]BulkRequestEntry, ip *ingest.IngestProcessor2) {
+func (p *ElasticsearchToClickHouseIngestProcessor) sendToClickhouse(ctx context.Context, clickhouseDocumentsToInsert map[string][]BulkRequestEntry) {
 	for indexName, documents := range clickhouseDocumentsToInsert {
 		//phoneHomeAgent.IngestCounters().Add(indexName, int64(len(documents)))
 
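`Write()` sizes the bulk as `len(bulk)/2` because NDJSON bulk payloads interleave an `action_and_meta_data` line with each document line; the Elasticsearch-bound entries are then replayed as a single `POST /_bulk`. A sketch of that pairing assumption, using a hypothetical helper rather than quesma's actual `splitBulk`:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// pairBulkLines pairs each `action_and_meta_data` line with the document
// line that follows it, which is why Write() sizes the bulk as len(bulk)/2.
// Hypothetical helper for illustration only.
func pairBulkLines(lines []json.RawMessage) [][2]json.RawMessage {
	pairs := make([][2]json.RawMessage, 0, len(lines)/2)
	for i := 0; i+1 < len(lines); i += 2 {
		pairs = append(pairs, [2]json.RawMessage{lines[i], lines[i+1]})
	}
	return pairs
}

func main() {
	lines := []json.RawMessage{
		json.RawMessage(`{"index":{"_index":"logs"}}`),
		json.RawMessage(`{"message":"hello"}`),
	}
	fmt.Println(len(pairBulkLines(lines))) // 1 action/document pair
}
```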
sendToClickhouse(ctx context.Context, clickhouseDocumentsToInsert map[strin //} // if the index is mapped to specified database table in the configuration, use that table // TODO: Index name override ignored for now - //if len(cfg.IndexConfig[indexName].Override) > 0 { - // indexName = cfg.IndexConfig[indexName].Override - //} + + if len(p.config.IndexConfig[indexName].Override) > 0 { + indexName = p.config.IndexConfig[indexName].Override + } inserts := make([]types.JSON, len(documents)) for i, document := range documents { inserts[i] = document.document } - err := ip.Ingest(ctx, indexName, inserts) + err := p.legacyIngestProcessor.Ingest(ctx, indexName, inserts) for _, document := range documents { bulkSingleResponse := bulkmodel.BulkSingleResponse{