From bd2002d04952923ac0636ddd081a1fdaa61c1dcc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Przemys=C5=82aw=20Hejman?= Date: Mon, 9 Dec 2024 12:16:57 +0100 Subject: [PATCH] Cleanup `ElasticsearchIngestFrontendConnector` (#1079) The biggest difference here is probably [this](https://github.com/QuesmaOrg/quesma/pull/1079/files#diff-1aecc230932429578180e7b3119100c164df77de5d98e7f2ab3f30518d71729dR118-R119), so the `getMatchingHandler` func. --- .../basic_http_frontend_connector.go | 24 ++++- .../elasticsearch_ingest.go | 90 ++----------------- ...icsearch_to_clickhouse_ingest_processor.go | 4 +- 3 files changed, 30 insertions(+), 88 deletions(-) diff --git a/quesma/frontend_connectors/basic_http_frontend_connector.go b/quesma/frontend_connectors/basic_http_frontend_connector.go index 16df18f72..0cbc75c48 100644 --- a/quesma/frontend_connectors/basic_http_frontend_connector.go +++ b/quesma/frontend_connectors/basic_http_frontend_connector.go @@ -7,6 +7,7 @@ import ( "bytes" "context" "fmt" + "github.com/ucarion/urlpath" "io" "net/http" quesma_api "quesma_v2/core" @@ -92,12 +93,16 @@ type BasicHTTPFrontendConnector struct { listener *http.Server router quesma_api.Router - endpoint string + responseMutator func(w http.ResponseWriter) http.ResponseWriter + endpoint string } func NewBasicHTTPFrontendConnector(endpoint string) *BasicHTTPFrontendConnector { return &BasicHTTPFrontendConnector{ endpoint: endpoint, + responseMutator: func(w http.ResponseWriter) http.ResponseWriter { + return w + }, } } @@ -110,9 +115,11 @@ func (h *BasicHTTPFrontendConnector) GetRouter() quesma_api.Router { } func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) { - handlerWrapper, exists := h.router.GetHandlers()[req.URL.Path] + handlers := h.router.GetHandlers() + handlerWrapper := getMatchingHandler(req.URL.Path, handlers) dispatcher := &quesma_api.Dispatcher{} - if !exists { + w = h.responseMutator(w) + if handlerWrapper == nil { http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { if h.router.GetFallbackHandler() != nil { fmt.Printf("No handler found for path: %s\n", req.URL.Path) @@ -137,6 +144,17 @@ func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http. }).ServeHTTP(w, req) } +func getMatchingHandler(requestPath string, handlers map[string]quesma_api.HandlersPipe) *quesma_api.HandlersPipe { + for path, handler := range handlers { + urlPath := urlpath.New(path) + _, matches := urlPath.Match(requestPath) + if matches { + return &handler + } + } + return nil +} + func (h *BasicHTTPFrontendConnector) Listen() error { h.listener = &http.Server{} h.listener.Addr = h.endpoint diff --git a/quesma/frontend_connectors/elasticsearch_ingest.go b/quesma/frontend_connectors/elasticsearch_ingest.go index 581e1bd02..9a503379b 100644 --- a/quesma/frontend_connectors/elasticsearch_ingest.go +++ b/quesma/frontend_connectors/elasticsearch_ingest.go @@ -4,19 +4,13 @@ package frontend_connectors import ( - "context" - "fmt" "github.com/ucarion/urlpath" "net/http" quesma_api "quesma_v2/core" ) type ElasticsearchIngestFrontendConnector struct { - // BasicHTTPFrontendConnector // TODO: embedding resulted in ServeHTTP being called from BasicHTTPFrontendConnector instead of ElasticsearchIngestFrontendConnector - listener *http.Server - router quesma_api.Router - - endpoint string + BasicHTTPFrontendConnector } const ( @@ -33,7 +27,10 @@ const ( func NewElasticsearchIngestFrontendConnector(endpoint string) *ElasticsearchIngestFrontendConnector { fc := &ElasticsearchIngestFrontendConnector{ - endpoint: endpoint, + BasicHTTPFrontendConnector: BasicHTTPFrontendConnector{ + endpoint: endpoint, + responseMutator: setContentType, + }, } router := NewHTTPRouter() router.AddRoute(IndexBulkPath, bulk) @@ -42,44 +39,12 @@ func NewElasticsearchIngestFrontendConnector(endpoint string) *ElasticsearchInge return fc } -func getMatchingHandler(requestPath string, handlers map[string]quesma_api.HandlersPipe) *quesma_api.HandlersPipe { - for path, handler := range handlers { - urlPath := urlpath.New(path) - _, matches := urlPath.Match(requestPath) - if matches { - return &handler - } - } - return nil -} - -func (h *ElasticsearchIngestFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) { - handlers := h.router.GetHandlers() - handlerWrapper := getMatchingHandler(req.URL.Path, handlers) - if handlerWrapper == nil { - h.router.Multiplexer().ServeHTTP(w, req) - return - } - dispatcher := &quesma_api.Dispatcher{} - - // for the response out we are Elasticsearch-7 compliant +func setContentType(w http.ResponseWriter) http.ResponseWriter { w.Header().Set("Content-Type", "application/json") - http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - metadata, message, _ := handlerWrapper.Handler(req) - req.Header.Set("x-przemek", "blah") - _, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message) - _, err := w.Write(message.([]byte)) - if err != nil { - fmt.Printf("Error writing response: %s\n", err) - } - }).ServeHTTP(w, req) + return w } func bulk(request *http.Request) (map[string]interface{}, any, error) { - //body, err := ReadRequestBody(request) - //if err != nil { - // return nil, nil, err - //} metadata := quesma_api.MakeNewMetadata() metadata[IngestAction] = BulkIndexAction metadata[IngestTargetKey] = getIndexFromRequest(request) @@ -87,10 +52,6 @@ func bulk(request *http.Request) (map[string]interface{}, any, error) { } func doc(request *http.Request) (map[string]interface{}, any, error) { - //body, err := ReadRequestBody(request) - //if err != nil { - // return nil, nil, err - //} metadata := quesma_api.MakeNewMetadata() metadata[IngestAction] = DocIndexAction metadata[IngestTargetKey] = getIndexFromRequest(request) @@ -102,40 +63,3 @@ func getIndexFromRequest(request *http.Request) string { match, _ := expectedUrl.Match(request.URL.Path) // safe to call at this level return match.Params["index"] } - -// Temporarily ported from BasicHTTPFrontendConnector until we figure out embedding issue - -func (h *ElasticsearchIngestFrontendConnector) AddRouter(router quesma_api.Router) { - h.router = router -} - -func (h *ElasticsearchIngestFrontendConnector) GetRouter() quesma_api.Router { - return h.router -} - -func (h *ElasticsearchIngestFrontendConnector) Listen() error { - h.listener = &http.Server{} - h.listener.Addr = h.endpoint - h.listener.Handler = h - go func() { - err := h.listener.ListenAndServe() - _ = err - }() - - return nil -} - -func (h *ElasticsearchIngestFrontendConnector) Stop(ctx context.Context) error { - if h.listener == nil { - return nil - } - err := h.listener.Shutdown(ctx) - if err != nil { - return err - } - return h.listener.Close() -} - -func (h *ElasticsearchIngestFrontendConnector) GetEndpoint() string { - return h.endpoint -} diff --git a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go index eb7b6f981..9651cce1d 100644 --- a/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go +++ b/quesma/processors/es_to_ch_ingest/elasticsearch_to_clickhouse_ingest_processor.go @@ -110,8 +110,8 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in panic("ElasticsearchToClickHouseIngestProcessor: invalid message type") } - if _, present := p.config.IndexConfig[indexNameFromIncomingReq]; !present { - // route to Elasticsearch + if _, present := p.config.IndexConfig[indexNameFromIncomingReq]; !present && metadata[IngestAction] == DocIndexAction { + // route to Elasticsearch, `bulk` request might be sent to ClickHouse depending on the request payload resp := es.Send(messageAsHttpReq) respBody, err := ReadResponseBody(resp) if err != nil {