From dac1c3e4f8b004efcfec19253c0964067fe530e8 Mon Sep 17 00:00:00 2001 From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com> Date: Mon, 23 Dec 2024 11:17:47 +0100 Subject: [PATCH] Utilizing quesma v2 lifecycle api (#1134) This PR: - for the first time utilize quesma v2 lifecycle APIs (start/stop) without using ad hoc http server. The same thing we have to do for TCP frontend connector. Finally, we should have one `quesmaV2` object at `constructQuesma` level. - fixes some issues with updating mapping handlers --- quesma/quesma/dual_write_proxy_v2.go | 30 ++++++++-------------------- quesma/v2/core/mux.go | 22 +++++++++++++++----- 2 files changed, 25 insertions(+), 27 deletions(-) diff --git a/quesma/quesma/dual_write_proxy_v2.go b/quesma/quesma/dual_write_proxy_v2.go index 32594025a..ffcce21a2 100644 --- a/quesma/quesma/dual_write_proxy_v2.go +++ b/quesma/quesma/dual_write_proxy_v2.go @@ -4,7 +4,6 @@ package quesma import ( "context" - "errors" "net/http" "quesma/ab_testing" "quesma/clickhouse" @@ -50,7 +49,7 @@ func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http. } type dualWriteHttpProxyV2 struct { - routingHttpServer *http.Server + quesmaV2 quesma_api.QuesmaBuilder indexManagement elasticsearch.IndexManagement logManager *clickhouse.LogManager publicPort util.Port @@ -91,33 +90,27 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic queryPipeline := quesma_api.NewPipeline() queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector) - quesmaBuilder.AddPipeline(ingestPipeline) quesmaBuilder.AddPipeline(queryPipeline) + quesmaBuilder.AddPipeline(ingestPipeline) - _, err := quesmaBuilder.Build() + quesmaV2, err := quesmaBuilder.Build() if err != nil { logger.Fatal().Msgf("Error building Quesma: %v", err) } - var limitedHandler http.Handler if config.DisableAuth { elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) - limitedHandler = elasticHttpIngestFrontendConnector } else { elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpQueryFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch)) elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2)) elasticHttpIngestFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch)) - limitedHandler = elasticHttpIngestFrontendConnector } return &dualWriteHttpProxyV2{ - schemaRegistry: registry, - schemaLoader: schemaLoader, - routingHttpServer: &http.Server{ - Addr: ":" + strconv.Itoa(int(config.PublicTcpPort)), - Handler: limitedHandler, - }, + schemaRegistry: registry, + schemaLoader: schemaLoader, + quesmaV2: quesmaV2, indexManagement: indexManager, logManager: logManager, publicPort: config.PublicTcpPort, @@ -139,9 +132,7 @@ func (q *dualWriteHttpProxyV2) Close(ctx context.Context) { if q.asyncQueriesEvictor != nil { q.asyncQueriesEvictor.Close() } - if err := q.routingHttpServer.Shutdown(ctx); err != nil { - logger.Fatal().Msgf("Error during server shutdown: %v", err) - } + q.quesmaV2.Stop(ctx) } func (q *dualWriteHttpProxyV2) Ingest() { @@ -149,10 +140,5 @@ func (q *dualWriteHttpProxyV2) Ingest() { q.logManager.Start() q.indexManagement.Start() go q.asyncQueriesEvictor.AsyncQueriesGC() - go func() { - if err := q.routingHttpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { - logger.Fatal().Msgf("Error starting http server: %v", err) - } - logger.Info().Msgf("Accepting HTTP at :%d", q.publicPort) - }() + q.quesmaV2.Start() } diff --git a/quesma/v2/core/mux.go b/quesma/v2/core/mux.go index 6e929a087..02b49deec 100644 --- a/quesma/v2/core/mux.go +++ b/quesma/v2/core/mux.go @@ -6,6 +6,7 @@ import ( "github.com/ucarion/urlpath" "net/http" "net/url" + "sort" "strings" ) @@ -205,14 +206,20 @@ func (p *PathRouter) GetHandlers() map[string]HandlersPipe { func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) { newHandlers := make(map[string]HandlersPipe, 0) for path, handler := range handlers { - for index := range p.mappings { + var index int + var found bool + for index = range p.mappings { if p.mappings[index].pattern == path { - p.mappings[index].handler.Processors = handler.Processors - p.mappings[index].handler.Predicate = handler.Predicate - } else { - newHandlers[path] = handler + found = true + break } } + if found { + p.mappings[index].handler.Processors = handler.Processors + p.mappings[index].handler.Predicate = handler.Predicate + } else { + newHandlers[path] = handler + } } for path, handler := range newHandlers { p.mappings = append(p.mappings, mapping{pattern: path, @@ -222,4 +229,9 @@ func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) { Predicate: handler.Predicate, Processors: handler.Processors}}) } + // mappings needs to be sorted as literal paths should be matched first + // for instance /_search should be matched before /:index + sort.Slice(p.mappings, func(i, j int) bool { + return p.mappings[i].pattern > p.mappings[j].pattern + }) }