Skip to content

Commit

Permalink
Utilizing quesma v2 lifecycle api (#1134)
Browse files Browse the repository at this point in the history
This PR:
- for the first time utilize quesma v2 lifecycle APIs (start/stop)
without using ad hoc http server.
The same thing we have to do for TCP frontend connector. Finally, we
should have one `quesmaV2` object at
   `constructQuesma` level.
- fixes some issues with updating mapping handlers
  • Loading branch information
pdelewski authored Dec 23, 2024
1 parent b05ce0f commit dac1c3e
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 27 deletions.
30 changes: 8 additions & 22 deletions quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package quesma

import (
"context"
"errors"
"net/http"
"quesma/ab_testing"
"quesma/clickhouse"
Expand Down Expand Up @@ -50,7 +49,7 @@ func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http.
}

type dualWriteHttpProxyV2 struct {
routingHttpServer *http.Server
quesmaV2 quesma_api.QuesmaBuilder
indexManagement elasticsearch.IndexManagement
logManager *clickhouse.LogManager
publicPort util.Port
Expand Down Expand Up @@ -91,33 +90,27 @@ func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clic

queryPipeline := quesma_api.NewPipeline()
queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector)
quesmaBuilder.AddPipeline(ingestPipeline)
quesmaBuilder.AddPipeline(queryPipeline)
quesmaBuilder.AddPipeline(ingestPipeline)

_, err := quesmaBuilder.Build()
quesmaV2, err := quesmaBuilder.Build()
if err != nil {
logger.Fatal().Msgf("Error building Quesma: %v", err)
}
var limitedHandler http.Handler
if config.DisableAuth {
elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2))
elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2))
limitedHandler = elasticHttpIngestFrontendConnector
} else {
elasticHttpQueryFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2))
elasticHttpQueryFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch))
elasticHttpIngestFrontendConnector.AddMiddleware(newSimultaneousClientsLimiterV2(concurrentClientsLimitV2))
elasticHttpIngestFrontendConnector.AddMiddleware(NewAuthMiddlewareV2(config.Elasticsearch))
limitedHandler = elasticHttpIngestFrontendConnector
}

return &dualWriteHttpProxyV2{
schemaRegistry: registry,
schemaLoader: schemaLoader,
routingHttpServer: &http.Server{
Addr: ":" + strconv.Itoa(int(config.PublicTcpPort)),
Handler: limitedHandler,
},
schemaRegistry: registry,
schemaLoader: schemaLoader,
quesmaV2: quesmaV2,
indexManagement: indexManager,
logManager: logManager,
publicPort: config.PublicTcpPort,
Expand All @@ -139,20 +132,13 @@ func (q *dualWriteHttpProxyV2) Close(ctx context.Context) {
if q.asyncQueriesEvictor != nil {
q.asyncQueriesEvictor.Close()
}
if err := q.routingHttpServer.Shutdown(ctx); err != nil {
logger.Fatal().Msgf("Error during server shutdown: %v", err)
}
q.quesmaV2.Stop(ctx)
}

func (q *dualWriteHttpProxyV2) Ingest() {
q.schemaLoader.ReloadTableDefinitions()
q.logManager.Start()
q.indexManagement.Start()
go q.asyncQueriesEvictor.AsyncQueriesGC()
go func() {
if err := q.routingHttpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
logger.Fatal().Msgf("Error starting http server: %v", err)
}
logger.Info().Msgf("Accepting HTTP at :%d", q.publicPort)
}()
q.quesmaV2.Start()
}
22 changes: 17 additions & 5 deletions quesma/v2/core/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"github.com/ucarion/urlpath"
"net/http"
"net/url"
"sort"
"strings"
)

Expand Down Expand Up @@ -205,14 +206,20 @@ func (p *PathRouter) GetHandlers() map[string]HandlersPipe {
func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) {
newHandlers := make(map[string]HandlersPipe, 0)
for path, handler := range handlers {
for index := range p.mappings {
var index int
var found bool
for index = range p.mappings {
if p.mappings[index].pattern == path {
p.mappings[index].handler.Processors = handler.Processors
p.mappings[index].handler.Predicate = handler.Predicate
} else {
newHandlers[path] = handler
found = true
break
}
}
if found {
p.mappings[index].handler.Processors = handler.Processors
p.mappings[index].handler.Predicate = handler.Predicate
} else {
newHandlers[path] = handler
}
}
for path, handler := range newHandlers {
p.mappings = append(p.mappings, mapping{pattern: path,
Expand All @@ -222,4 +229,9 @@ func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) {
Predicate: handler.Predicate,
Processors: handler.Processors}})
}
// mappings needs to be sorted as literal paths should be matched first
// for instance /_search should be matched before /:index
sort.Slice(p.mappings, func(i, j int) bool {
return p.mappings[i].pattern > p.mappings[j].pattern
})
}

0 comments on commit dac1c3e

Please sign in to comment.