Skip to content

Commit

Permalink
v2 frontend connector serveHTTP unification (#1115)
Browse files Browse the repository at this point in the history
This PR is a continuation of v2 production code ServeHTTP and POC
ServeHTTP unification. One thing that left is fallback handling

---------

Co-authored-by: Rafal Strzalinski <[email protected]>
  • Loading branch information
pdelewski and nablaone authored Dec 16, 2024
1 parent 0e2d4b8 commit 33bae75
Show file tree
Hide file tree
Showing 5 changed files with 149 additions and 77 deletions.
145 changes: 93 additions & 52 deletions quesma/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,14 @@ import (
"fmt"
"io"
"net/http"
"quesma/clickhouse"
"quesma/logger"
"quesma/quesma/config"
"quesma/quesma/recovery"
"quesma/quesma/types"
"quesma/schema"
quesma_api "quesma_v2/core"
"quesma_v2/core/diag"
"strings"
"sync"
)
Expand All @@ -22,11 +27,32 @@ type BasicHTTPFrontendConnector struct {
mutex sync.Mutex
responseMutator func(w http.ResponseWriter) http.ResponseWriter
endpoint string
routerInstance *RouterV2
logManager *clickhouse.LogManager
registry schema.Registry
config *config.QuesmaConfiguration

diagnostic diag.Diagnostic
}

func (h *BasicHTTPFrontendConnector) InjectDiagnostic(diagnostic diag.Diagnostic) {

h.diagnostic = diagnostic

// TODO this is a hack
if h.routerInstance != nil {
h.routerInstance.InjectDiagnostic(diagnostic)
}
}

func NewBasicHTTPFrontendConnector(endpoint string) *BasicHTTPFrontendConnector {
func NewBasicHTTPFrontendConnector(endpoint string, config *config.QuesmaConfiguration) *BasicHTTPFrontendConnector {

return &BasicHTTPFrontendConnector{
endpoint: endpoint,
endpoint: endpoint,
config: config,
routerInstance: NewRouterV2(config),
logManager: nil,
registry: nil,
responseMutator: func(w http.ResponseWriter) http.ResponseWriter {
return w
},
Expand All @@ -43,25 +69,40 @@ func (h *BasicHTTPFrontendConnector) GetRouter() quesma_api.Router {

func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
defer recovery.LogPanic()

ctx := req.Context()
requestPreprocessors := quesma_api.ProcessorChain{}
requestPreprocessors = append(requestPreprocessors, quesma_api.NewTraceIdPreprocessor())

reqBody, err := PeekBodyV2(req)
if err != nil {
http.Error(w, "Error reading request body", http.StatusInternalServerError)
return
}
ctx := req.Context()
requestPreprocessors := quesma_api.ProcessorChain{}
requestPreprocessors = append(requestPreprocessors, quesma_api.NewTraceIdPreprocessor())

ua := req.Header.Get("User-Agent")
if h.diagnostic.PhoneHomeAgent() != nil {
h.diagnostic.PhoneHomeAgent().UserAgentCounters().Add(ua, 1)
}

quesmaRequest, ctx, err := preprocessRequest(ctx, &quesma_api.Request{
Method: req.Method,
Path: strings.TrimSuffix(req.URL.Path, "/"),
Params: map[string]string{},
Headers: req.Header,
QueryParams: req.URL.Query(),
Body: string(reqBody),
Method: req.Method,
Path: strings.TrimSuffix(req.URL.Path, "/"),
Params: map[string]string{},
Headers: req.Header,
QueryParams: req.URL.Query(),
Body: string(reqBody),
OriginalRequest: req,
}, requestPreprocessors)

if err != nil {
logger.ErrorWithCtx(ctx).Msgf("Error preprocessing request: %v", err)
}

quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)

handlersPipe, decision := h.router.Matches(quesmaRequest)

if decision != nil {
w.Header().Set(QuesmaTableResolverHeader, decision.String())
} else {
Expand All @@ -70,53 +111,53 @@ func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.
dispatcher := &quesma_api.Dispatcher{}
w = h.responseMutator(w)

http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if handlersPipe != nil {
result, _ := handlersPipe.Handler(context.Background(), &quesma_api.Request{OriginalRequest: req})
var quesmaResponse *quesma_api.Result

if result != nil {
metadata, message := dispatcher.Dispatch(handlersPipe.Processors, result.Meta, result.GenericResult)
result = &quesma_api.Result{
Body: result.Body,
Meta: metadata,
StatusCode: result.StatusCode,
GenericResult: message,
}
quesmaResponse = result
if handlersPipe != nil {
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, h.diagnostic.DebugInfoCollector(), func() (*quesma_api.Result, error) {
var result *quesma_api.Result
result, err = handlersPipe.Handler(ctx, quesmaRequest)

if result == nil {
return result, err
}
zip := strings.Contains(req.Header.Get("Accept-Encoding"), "gzip")
_ = zip
if err == nil {
logger.Debug().Ctx(ctx).Msg("responding from quesma")
unzipped := []byte{}
if quesmaResponse != nil {
unzipped = quesmaResponse.GenericResult.([]byte)
}
if len(unzipped) == 0 {
logger.WarnWithCtx(ctx).Msgf("empty response from Clickhouse, method=%s", req.Method)
}
AddProductAndContentHeaders(req.Header, w.Header())
_, err := w.Write(unzipped)
if err != nil {
fmt.Printf("Error writing response: %s\n", err)
}

} else {
metadata, message := dispatcher.Dispatch(handlersPipe.Processors, result.Meta, result.GenericResult)

result = &quesma_api.Result{
Body: result.Body,
Meta: metadata,
StatusCode: result.StatusCode,
GenericResult: message,
}
return result, err
})

zip := strings.Contains(req.Header.Get("Accept-Encoding"), "gzip")
if err == nil {
logger.Debug().Ctx(ctx).Msg("responding from quesma")
unzipped := []byte{}
if quesmaResponse != nil {
unzipped = quesmaResponse.GenericResult.([]byte)
}
if len(unzipped) == 0 {
logger.WarnWithCtx(ctx).Msgf("empty response from Clickhouse, method=%s", req.Method)
}
AddProductAndContentHeaders(req.Header, w.Header())

responseFromQuesmaV2(ctx, unzipped, w, quesmaResponse, zip)

} else {
if h.router.GetFallbackHandler() != nil {
fmt.Printf("No handler found for path: %s\n", req.URL.Path)
handler := h.router.GetFallbackHandler()
result, _ := handler(context.Background(), &quesma_api.Request{OriginalRequest: req})
_, err := w.Write(result.GenericResult.([]byte))
if err != nil {
fmt.Printf("Error writing response: %s\n", err)
}
h.routerInstance.errorResponseV2(ctx, err, w)
}
} else {
if h.router.GetFallbackHandler() != nil {
fmt.Printf("No handler found for path: %s\n", req.URL.Path)
handler := h.router.GetFallbackHandler()
result, _ := handler(context.Background(), &quesma_api.Request{OriginalRequest: req})
_, err := w.Write(result.GenericResult.([]byte))
if err != nil {
fmt.Printf("Error writing response: %s\n", err)
}
}
}).ServeHTTP(w, req)
}
}

func (h *BasicHTTPFrontendConnector) Listen() error {
Expand Down
29 changes: 18 additions & 11 deletions quesma/frontend_connectors/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,9 @@ func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseW
logger.Debug().Str(logger.RID, id).Msg("responding from Quesma")

for key, value := range quesmaResponse.Meta {
w.Header().Set(key, value.(string))
if headerStringValue, ok := value.(string); ok {
w.Header().Set(key, headerStringValue)
}
}
if zip {
w.Header().Set("Content-Encoding", "gzip")
Expand Down Expand Up @@ -164,7 +166,7 @@ func (*RouterV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter,

}

func (r *RouterV2) elasticFallback(decision *quesma_api.Decision,
func (r *RouterV2) ElasticFallback(decision *quesma_api.Decision,
ctx context.Context, w http.ResponseWriter,
req *http.Request, reqBody []byte, logManager *clickhouse.LogManager, schemaRegistry schema.Registry) {

Expand Down Expand Up @@ -238,12 +240,13 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
})

quesmaRequest, ctx, err := preprocessRequest(ctx, &quesma_api.Request{
Method: req.Method,
Path: strings.TrimSuffix(req.URL.Path, "/"),
Params: map[string]string{},
Headers: req.Header,
QueryParams: req.URL.Query(),
Body: string(reqBody),
Method: req.Method,
Path: strings.TrimSuffix(req.URL.Path, "/"),
Params: map[string]string{},
Headers: req.Header,
QueryParams: req.URL.Query(),
Body: string(reqBody),
OriginalRequest: req,
}, r.RequestPreprocessors)

if err != nil {
Expand Down Expand Up @@ -298,7 +301,7 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
r.errorResponseV2(ctx, err, w)
}
} else {
r.elasticFallback(decision, ctx, w, req, reqBody, logManager, schemaRegistry)
r.ElasticFallback(decision, ctx, w, req, reqBody, logManager, schemaRegistry)
}
}

Expand Down Expand Up @@ -385,7 +388,9 @@ func recordRequestToClickhouseV2(path string, qmc diag.DebugInfoCollector, reque
}
now := time.Now()
response, err := requestFunc()
qmc.RecordRequest(statName, time.Since(now), err != nil)
if qmc != nil {
qmc.RecordRequest(statName, time.Since(now), err != nil)
}
return response, err
}

Expand All @@ -396,7 +401,9 @@ func recordRequestToElasticV2(path string, qmc diag.DebugInfoCollector, requestF
}
now := time.Now()
response := requestFunc()
qmc.RecordRequest(statName, time.Since(now), !isResponseOkV2(response.response))
if qmc != nil {
qmc.RecordRequest(statName, time.Since(now), !isResponseOkV2(response.response))
}
return response
}

Expand Down
24 changes: 21 additions & 3 deletions quesma/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"quesma/backend_connectors"
"quesma/frontend_connectors"
"quesma/processors"
"quesma/quesma/config"
quesma_api "quesma_v2/core"
"sync/atomic"
"syscall"
Expand Down Expand Up @@ -65,7 +66,16 @@ func ab_testing_scenario() quesma_api.QuesmaBuilder {
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())

ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888")
cfg := &config.QuesmaConfiguration{
DisableAuth: true,
Elasticsearch: config.ElasticsearchConfiguration{
Url: &config.Url{Host: "localhost:9200", Scheme: "http"},
User: "",
Password: "",
},
}

ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
ingestHTTPRouter := quesma_api.NewPathRouter()
ingestHTTPRouter.AddRoute("/_bulk", bulk)
ingestHTTPRouter.AddRoute("/_doc", doc)
Expand All @@ -83,7 +93,7 @@ func ab_testing_scenario() quesma_api.QuesmaBuilder {
ingestPipeline.AddProcessor(ingestProcessor)
ingestPipeline.AddProcessor(abIngestTestProcessor)

queryFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888")
queryFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)
queryHTTPRouter := quesma_api.NewPathRouter()
queryHTTPRouter.AddRoute("/_search", search)
queryFrontendConnector.AddRouter(queryHTTPRouter)
Expand All @@ -108,8 +118,16 @@ func ab_testing_scenario() quesma_api.QuesmaBuilder {
func fallbackScenario() quesma_api.QuesmaBuilder {
var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma()
quesmaBuilder.SetDependencies(quesma_api.EmptyDependencies())
cfg := &config.QuesmaConfiguration{
DisableAuth: true,
Elasticsearch: config.ElasticsearchConfiguration{
Url: &config.Url{Host: "localhost:9200", Scheme: "http"},
User: "",
Password: "",
},
}
ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888", cfg)

ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888")
ingestHTTPRouter := quesma_api.NewPathRouter()
var fallback quesma_api.HTTPFrontendHandler = fallback
ingestHTTPRouter.AddFallbackHandler(fallback)
Expand Down
6 changes: 3 additions & 3 deletions quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,12 +91,12 @@ func newDualWriteProxyV2(dependencies *quesma_api.Dependencies, schemaLoader cli
searchRouter := ConfigureSearchRouterV2(config, dependencies, registry, logManager, queryProcessor, resolver)

elasticHttpIngestFrontendConnector := NewElasticHttpIngestFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),

routerInstance, logManager, registry)
logManager, registry, config)
elasticHttpIngestFrontendConnector.AddRouter(ingestRouter)

elasticHttpQueryFrontendConnector := NewElasticHttpQueryFrontendConnector(":"+strconv.Itoa(int(config.PublicTcpPort)),
routerInstance, logManager, registry)
logManager, registry, config)

elasticHttpQueryFrontendConnector.AddRouter(searchRouter)

quesmaBuilder := quesma_api.NewQuesma()
Expand Down
22 changes: 14 additions & 8 deletions quesma/quesma/elastic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"net/http"
"quesma/clickhouse"
"quesma/frontend_connectors"
"quesma/quesma/config"
"quesma/quesma/recovery"
"quesma/schema"
quesma_api "quesma_v2/core"
Expand All @@ -18,24 +19,29 @@ type ElasticHttpIngestFrontendConnector struct {
routerInstance *frontend_connectors.RouterV2
logManager *clickhouse.LogManager
registry schema.Registry
Config *config.QuesmaConfiguration
diagnostic diag.Diagnostic
}

func NewElasticHttpIngestFrontendConnector(endpoint string,
routerInstance *frontend_connectors.RouterV2,
logManager *clickhouse.LogManager,
registry schema.Registry) *ElasticHttpIngestFrontendConnector {
registry schema.Registry,
config *config.QuesmaConfiguration) *ElasticHttpIngestFrontendConnector {

return &ElasticHttpIngestFrontendConnector{
BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint),
routerInstance: routerInstance,
BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint, config),
routerInstance: frontend_connectors.NewRouterV2(config),
logManager: logManager,
registry: registry,
}
}

func (h *ElasticHttpIngestFrontendConnector) InjectDiagnostic(diagnostic diag.Diagnostic) {
h.diagnostic = diagnostic

// TODO this is a hack
h.BasicHTTPFrontendConnector.InjectDiagnostic(diagnostic)
h.routerInstance.InjectDiagnostic(diagnostic)
}

func serveHTTPHelper(w http.ResponseWriter, req *http.Request,
Expand Down Expand Up @@ -70,13 +76,13 @@ type ElasticHttpQueryFrontendConnector struct {
}

func NewElasticHttpQueryFrontendConnector(endpoint string,
routerInstance *frontend_connectors.RouterV2,
logManager *clickhouse.LogManager,
registry schema.Registry) *ElasticHttpIngestFrontendConnector {
registry schema.Registry,
config *config.QuesmaConfiguration) *ElasticHttpIngestFrontendConnector {

return &ElasticHttpIngestFrontendConnector{
BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint),
routerInstance: routerInstance,
BasicHTTPFrontendConnector: frontend_connectors.NewBasicHTTPFrontendConnector(endpoint, config),
routerInstance: frontend_connectors.NewRouterV2(config),
logManager: logManager,
registry: registry,
}
Expand Down

0 comments on commit 33bae75

Please sign in to comment.