Skip to content

Commit

Permalink
Cleanup ElasticsearchIngestFrontendConnector (#1079)
Browse files Browse the repository at this point in the history
  • Loading branch information
mieciu authored Dec 9, 2024
1 parent e62def5 commit bd2002d
Show file tree
Hide file tree
Showing 3 changed files with 30 additions and 88 deletions.
24 changes: 21 additions & 3 deletions quesma/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"context"
"fmt"
"github.com/ucarion/urlpath"
"io"
"net/http"
quesma_api "quesma_v2/core"
Expand Down Expand Up @@ -92,12 +93,16 @@ type BasicHTTPFrontendConnector struct {
listener *http.Server
router quesma_api.Router

endpoint string
responseMutator func(w http.ResponseWriter) http.ResponseWriter
endpoint string
}

func NewBasicHTTPFrontendConnector(endpoint string) *BasicHTTPFrontendConnector {
return &BasicHTTPFrontendConnector{
endpoint: endpoint,
responseMutator: func(w http.ResponseWriter) http.ResponseWriter {
return w
},
}
}

Expand All @@ -110,9 +115,11 @@ func (h *BasicHTTPFrontendConnector) GetRouter() quesma_api.Router {
}

func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handlerWrapper, exists := h.router.GetHandlers()[req.URL.Path]
handlers := h.router.GetHandlers()
handlerWrapper := getMatchingHandler(req.URL.Path, handlers)
dispatcher := &quesma_api.Dispatcher{}
if !exists {
w = h.responseMutator(w)
if handlerWrapper == nil {
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if h.router.GetFallbackHandler() != nil {
fmt.Printf("No handler found for path: %s\n", req.URL.Path)
Expand All @@ -137,6 +144,17 @@ func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.
}).ServeHTTP(w, req)
}

func getMatchingHandler(requestPath string, handlers map[string]quesma_api.HandlersPipe) *quesma_api.HandlersPipe {
for path, handler := range handlers {
urlPath := urlpath.New(path)
_, matches := urlPath.Match(requestPath)
if matches {
return &handler
}
}
return nil
}

func (h *BasicHTTPFrontendConnector) Listen() error {
h.listener = &http.Server{}
h.listener.Addr = h.endpoint
Expand Down
90 changes: 7 additions & 83 deletions quesma/frontend_connectors/elasticsearch_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,13 @@
package frontend_connectors

import (
"context"
"fmt"
"github.com/ucarion/urlpath"
"net/http"
quesma_api "quesma_v2/core"
)

type ElasticsearchIngestFrontendConnector struct {
// BasicHTTPFrontendConnector // TODO: embedding resulted in ServeHTTP being called from BasicHTTPFrontendConnector instead of ElasticsearchIngestFrontendConnector
listener *http.Server
router quesma_api.Router

endpoint string
BasicHTTPFrontendConnector
}

const (
Expand All @@ -33,7 +27,10 @@ const (

func NewElasticsearchIngestFrontendConnector(endpoint string) *ElasticsearchIngestFrontendConnector {
fc := &ElasticsearchIngestFrontendConnector{
endpoint: endpoint,
BasicHTTPFrontendConnector: BasicHTTPFrontendConnector{
endpoint: endpoint,
responseMutator: setContentType,
},
}
router := NewHTTPRouter()
router.AddRoute(IndexBulkPath, bulk)
Expand All @@ -42,55 +39,19 @@ func NewElasticsearchIngestFrontendConnector(endpoint string) *ElasticsearchInge
return fc
}

func getMatchingHandler(requestPath string, handlers map[string]quesma_api.HandlersPipe) *quesma_api.HandlersPipe {
for path, handler := range handlers {
urlPath := urlpath.New(path)
_, matches := urlPath.Match(requestPath)
if matches {
return &handler
}
}
return nil
}

func (h *ElasticsearchIngestFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handlers := h.router.GetHandlers()
handlerWrapper := getMatchingHandler(req.URL.Path, handlers)
if handlerWrapper == nil {
h.router.Multiplexer().ServeHTTP(w, req)
return
}
dispatcher := &quesma_api.Dispatcher{}

// for the response out we are Elasticsearch-7 compliant
func setContentType(w http.ResponseWriter) http.ResponseWriter {
w.Header().Set("Content-Type", "application/json")
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
metadata, message, _ := handlerWrapper.Handler(req)
req.Header.Set("x-przemek", "blah")
_, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message)
_, err := w.Write(message.([]byte))
if err != nil {
fmt.Printf("Error writing response: %s\n", err)
}
}).ServeHTTP(w, req)
return w
}

func bulk(request *http.Request) (map[string]interface{}, any, error) {
//body, err := ReadRequestBody(request)
//if err != nil {
// return nil, nil, err
//}
metadata := quesma_api.MakeNewMetadata()
metadata[IngestAction] = BulkIndexAction
metadata[IngestTargetKey] = getIndexFromRequest(request)
return metadata, request, nil
}

func doc(request *http.Request) (map[string]interface{}, any, error) {
//body, err := ReadRequestBody(request)
//if err != nil {
// return nil, nil, err
//}
metadata := quesma_api.MakeNewMetadata()
metadata[IngestAction] = DocIndexAction
metadata[IngestTargetKey] = getIndexFromRequest(request)
Expand All @@ -102,40 +63,3 @@ func getIndexFromRequest(request *http.Request) string {
match, _ := expectedUrl.Match(request.URL.Path) // safe to call at this level
return match.Params["index"]
}

// Temporarily ported from BasicHTTPFrontendConnector until we figure out embedding issue

func (h *ElasticsearchIngestFrontendConnector) AddRouter(router quesma_api.Router) {
h.router = router
}

func (h *ElasticsearchIngestFrontendConnector) GetRouter() quesma_api.Router {
return h.router
}

func (h *ElasticsearchIngestFrontendConnector) Listen() error {
h.listener = &http.Server{}
h.listener.Addr = h.endpoint
h.listener.Handler = h
go func() {
err := h.listener.ListenAndServe()
_ = err
}()

return nil
}

func (h *ElasticsearchIngestFrontendConnector) Stop(ctx context.Context) error {
if h.listener == nil {
return nil
}
err := h.listener.Shutdown(ctx)
if err != nil {
return err
}
return h.listener.Close()
}

func (h *ElasticsearchIngestFrontendConnector) GetEndpoint() string {
return h.endpoint
}
Original file line number Diff line number Diff line change
Expand Up @@ -110,8 +110,8 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in
panic("ElasticsearchToClickHouseIngestProcessor: invalid message type")
}

if _, present := p.config.IndexConfig[indexNameFromIncomingReq]; !present {
// route to Elasticsearch
if _, present := p.config.IndexConfig[indexNameFromIncomingReq]; !present && metadata[IngestAction] == DocIndexAction {
// route to Elasticsearch, `bulk` request might be sent to ClickHouse depending on the request payload
resp := es.Send(messageAsHttpReq)
respBody, err := ReadResponseBody(resp)
if err != nil {
Expand Down

0 comments on commit bd2002d

Please sign in to comment.