diff --git a/quesma/frontend_connectors/router_v2.go b/quesma/frontend_connectors/router_v2.go index aa8500428..1f08a33d7 100644 --- a/quesma/frontend_connectors/router_v2.go +++ b/quesma/frontend_connectors/router_v2.go @@ -203,7 +203,7 @@ func (r *RouterV2) elasticFallback(decision *quesma_api.Decision, } } -func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, searchRouter *quesma_api.PathRouter, ingestRouter *quesma_api.PathRouter, logManager *clickhouse.LogManager) { +func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router quesma_api.Router, logManager *clickhouse.LogManager) { defer recovery.LogAndHandlePanic(ctx, func(err error) { w.WriteHeader(500) w.Write(queryparser.InternalQuesmaError("Unknown Quesma error")) @@ -223,31 +223,18 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http } quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body) - var handler quesma_api.HTTPFrontendHandler - var decision *quesma_api.Decision - searchHandlerPipe, searchDecision := searchRouter.Matches(quesmaRequest) - if searchDecision != nil { - decision = searchDecision - } - if searchHandlerPipe != nil { - handler = searchHandlerPipe.Handler - } - ingestHandlerPipe, ingestDecision := ingestRouter.Matches(quesmaRequest) - if searchDecision == nil { - decision = ingestDecision - } - if searchHandlerPipe == nil && ingestHandlerPipe != nil { - handler = ingestHandlerPipe.Handler - } + + handlersPipe, decision := router.Matches(quesmaRequest) + if decision != nil { w.Header().Set(QuesmaTableResolverHeader, decision.String()) } else { w.Header().Set(QuesmaTableResolverHeader, "n/a") } - if handler != nil { + if handlersPipe != nil { quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.QuesmaManagementConsole, func() (*quesma_api.Result, error) { - return handler(ctx, quesmaRequest) + return handlersPipe.Handler(ctx, quesmaRequest) }) zip := strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") diff --git a/quesma/quesma/dual_write_proxy_v2.go b/quesma/quesma/dual_write_proxy_v2.go index 7fe5b6c5c..e786f86b9 100644 --- a/quesma/quesma/dual_write_proxy_v2.go +++ b/quesma/quesma/dual_write_proxy_v2.go @@ -72,18 +72,15 @@ func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) { q.Close(ctx) } -func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, processor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 { - queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver) +func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 { + queryProcessor := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver) // not sure how we should configure our query translator ??? // is this a config option?? - queryRunner.DateMathRenderer = queryparser.DateMathExpressionFormatLiteral + queryProcessor.DateMathRenderer = queryparser.DateMathExpressionFormatLiteral // tests should not be run with optimization enabled by default - queryRunner.EnableQueryOptimization(config) - - ingestRouter := ConfigureIngestRouterV2(config, processor, agent, resolver) - searchRouter := ConfigureSearchRouterV2(config, registry, logManager, quesmaManagementConsole, queryRunner, resolver) + queryProcessor.EnableQueryOptimization(config) tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -92,20 +89,45 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli Transport: tr, Timeout: time.Minute, // should be more configurable, 30s is Kibana default timeout } - routerInstance := frontend_connectors.RouterV2{PhoneHomeAgent: agent, Config: config, QuesmaManagementConsole: quesmaManagementConsole, HttpClient: client, RequestPreprocessors: quesma_api.ProcessorChain{}} + + routerInstance := frontend_connectors.RouterV2{PhoneHomeAgent: agent, + Config: config, QuesmaManagementConsole: quesmaManagementConsole, + HttpClient: client, RequestPreprocessors: quesma_api.ProcessorChain{}} routerInstance. RegisterPreprocessor(quesma_api.NewTraceIdPreprocessor()) agent.FailedRequestsCollector(func() int64 { return routerInstance.FailedRequests.Load() }) - elasticHttpFrontentConnector := NewElasticHttpFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), - &routerInstance, searchRouter.(*quesma_api.PathRouter), ingestRouter.(*quesma_api.PathRouter), logManager, agent) + ingestRouter := ConfigureIngestRouterV2(config, ingestProcessor, agent, resolver) + searchRouter := ConfigureSearchRouterV2(config, registry, logManager, quesmaManagementConsole, queryProcessor, resolver) + + elasticHttpIngestFrontendConnector := NewElasticHttpIngestFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), + &routerInstance, logManager, agent) + elasticHttpIngestFrontendConnector.AddRouter(ingestRouter) + + elasticHttpQueryFrontendConnector := NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)), + &routerInstance, logManager, agent) + elasticHttpQueryFrontendConnector.AddRouter(searchRouter) + + quesmaBuilder := quesma_api.NewQuesma() + ingestPipeline := quesma_api.NewPipeline() + ingestPipeline.AddFrontendConnector(elasticHttpIngestFrontendConnector) + + queryPipeline := quesma_api.NewPipeline() + queryPipeline.AddFrontendConnector(elasticHttpQueryFrontendConnector) + quesmaBuilder.AddPipeline(ingestPipeline) + quesmaBuilder.AddPipeline(queryPipeline) + _, err := quesmaBuilder.Build() + if err != nil { + logger.Fatal().Msgf("Error building Quesma: %v", err) + } + var limitedHandler http.Handler if config.DisableAuth { - limitedHandler = newSimultaneousClientsLimiterV2(elasticHttpFrontentConnector, concurrentClientsLimitV2) + limitedHandler = newSimultaneousClientsLimiterV2(elasticHttpIngestFrontendConnector, concurrentClientsLimitV2) } else { - limitedHandler = newSimultaneousClientsLimiterV2(NewAuthMiddleware(elasticHttpFrontentConnector, config.Elasticsearch), concurrentClientsLimitV2) + limitedHandler = newSimultaneousClientsLimiterV2(NewAuthMiddleware(elasticHttpIngestFrontendConnector, config.Elasticsearch), concurrentClientsLimitV2) } return &dualWriteHttpProxyV2{ @@ -119,10 +141,10 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli logManager: logManager, publicPort: config.PublicTcpPort, asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor( - queryRunner.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory), - queryRunner.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory), + queryProcessor.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory), + queryProcessor.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory), ), - queryRunner: queryRunner, + queryRunner: queryProcessor, } } diff --git a/quesma/quesma/elastic_http_frontend_connector.go b/quesma/quesma/elastic_http_frontend_connector.go index c7184afc3..25ed80ec0 100644 --- a/quesma/quesma/elastic_http_frontend_connector.go +++ b/quesma/quesma/elastic_http_frontend_connector.go @@ -9,35 +9,34 @@ import ( "quesma/frontend_connectors" "quesma/quesma/recovery" "quesma/telemetry" - "quesma_v2/core" + quesma_api "quesma_v2/core" ) -type ElasticHttpFrontendConnector struct { +type ElasticHttpIngestFrontendConnector struct { *frontend_connectors.BasicHTTPFrontendConnector routerInstance *frontend_connectors.RouterV2 - searchRouter *quesma_api.PathRouter - ingestRouter *quesma_api.PathRouter logManager *clickhouse.LogManager agent telemetry.PhoneHomeAgent } -func NewElasticHttpFrontendConnector(endpoint string, +func NewElasticHttpIngestFrontendConnector(endpoint string, routerInstance *frontend_connectors.RouterV2, - searchRouter *quesma_api.PathRouter, - ingestRouter *quesma_api.PathRouter, logManager *clickhouse.LogManager, - agent telemetry.PhoneHomeAgent) *ElasticHttpFrontendConnector { - return &ElasticHttpFrontendConnector{ + agent telemetry.PhoneHomeAgent) *ElasticHttpIngestFrontendConnector { + + return &ElasticHttpIngestFrontendConnector{ BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint), routerInstance: routerInstance, - searchRouter: searchRouter, - ingestRouter: ingestRouter, logManager: logManager, agent: agent, } } -func (h *ElasticHttpFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) { +func serveHTTPHelper(w http.ResponseWriter, req *http.Request, + routerInstance *frontend_connectors.RouterV2, + pathRouter quesma_api.Router, + agent telemetry.PhoneHomeAgent, + logManager *clickhouse.LogManager) { defer recovery.LogPanic() reqBody, err := frontend_connectors.PeekBodyV2(req) if err != nil { @@ -46,7 +45,35 @@ func (h *ElasticHttpFrontendConnector) ServeHTTP(w http.ResponseWriter, req *htt } ua := req.Header.Get("User-Agent") - h.agent.UserAgentCounters().Add(ua, 1) + agent.UserAgentCounters().Add(ua, 1) + + routerInstance.Reroute(req.Context(), w, req, reqBody, pathRouter, logManager) +} + +func (h *ElasticHttpIngestFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) { + serveHTTPHelper(w, req, h.routerInstance, h.GetRouter(), h.agent, h.logManager) +} + +type ElasticHttpQueryFrontendConnector struct { + *frontend_connectors.BasicHTTPFrontendConnector + routerInstance *frontend_connectors.RouterV2 + logManager *clickhouse.LogManager + agent telemetry.PhoneHomeAgent +} + +func NewElasticHttpQueryFrontendConnector(endpoint string, + routerInstance *frontend_connectors.RouterV2, + logManager *clickhouse.LogManager, + agent telemetry.PhoneHomeAgent) *ElasticHttpIngestFrontendConnector { + + return &ElasticHttpIngestFrontendConnector{ + BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint), + routerInstance: routerInstance, + logManager: logManager, + agent: agent, + } +} - h.routerInstance.Reroute(req.Context(), w, req, reqBody, h.searchRouter, h.ingestRouter, h.logManager) +func (h *ElasticHttpQueryFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) { + serveHTTPHelper(w, req, h.routerInstance, h.GetRouter(), h.agent, h.logManager) } diff --git a/quesma/v2/core/dispatch.go b/quesma/v2/core/dispatch.go index 6f02699f5..584f56862 100644 --- a/quesma/v2/core/dispatch.go +++ b/quesma/v2/core/dispatch.go @@ -9,6 +9,7 @@ import ( type HTTPFrontendHandler func(ctx context.Context, req *Request) (*Result, error) type HandlersPipe struct { + Predicate RequestMatcher Handler HTTPFrontendHandler Processors []Processor } diff --git a/quesma/v2/core/mux.go b/quesma/v2/core/mux.go index 30f3f4c90..685a3253d 100644 --- a/quesma/v2/core/mux.go +++ b/quesma/v2/core/mux.go @@ -93,7 +93,7 @@ func (p *PathRouter) Clone() Cloner { } func (p *PathRouter) Register(pattern string, predicate RequestMatcher, handler HTTPFrontendHandler) { - mapping := mapping{pattern, urlpath.New(pattern), predicate, &HandlersPipe{Handler: handler}} + mapping := mapping{pattern, urlpath.New(pattern), predicate, &HandlersPipe{Handler: handler, Predicate: predicate}} p.mappings = append(p.mappings, mapping) } @@ -203,6 +203,14 @@ func (p *PathRouter) GetHandlers() map[string]HandlersPipe { } func (p *PathRouter) SetHandlers(handlers map[string]HandlersPipe) { for path, handler := range handlers { - p.mappings = append(p.mappings, mapping{pattern: path, compiledPath: urlpath.New(path), handler: &handler}) + if _, ok := handler.Predicate.(*predicateAlways); ok { // in order to pass processors we have to make this alignment (predicates aren't present in the old API + p.mappings = append(p.mappings, mapping{pattern: path, + compiledPath: urlpath.New(path), + handler: &HandlersPipe{Handler: handler.Handler, + Predicate: handler.Predicate, + Processors: handler.Processors}}) + } else { + p.Register(path, handler.Predicate, handler.Handler) + } } }