Skip to content

Commit

Permalink
Moving dependencies needed by routerV2 to v2 (#1066)
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski authored Dec 5, 2024
1 parent 0ba6514 commit 7e4022a
Show file tree
Hide file tree
Showing 42 changed files with 356 additions and 345 deletions.
2 changes: 1 addition & 1 deletion quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import (
"quesma/end_user_errors"
"quesma/logger"
"quesma/model"
"quesma/tracing"
tracing "quesma_v2/core/tracing"
"strconv"
"strings"
"sync/atomic"
Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/common_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"quesma/quesma/types"
"quesma/schema"
"quesma/table_resolver"
"quesma_v2/core/mux"
mux "quesma_v2/core"
"testing"
)

Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/ingest_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"quesma/quesma/types"
"quesma/table_resolver"
"quesma/util"
"quesma_v2/core/mux"
mux "quesma_v2/core"
"strings"
"testing"
)
Expand Down
2 changes: 1 addition & 1 deletion quesma/ingest/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ import (
"quesma/schema"
"quesma/table_resolver"
"quesma/util"
"quesma_v2/core/mux"
mux "quesma_v2/core"
"slices"
"strconv"
"strings"
Expand Down
8 changes: 4 additions & 4 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import (
"quesma/table_resolver"
"quesma/telemetry"
"quesma/util"
"quesma_v2/core/mux"
"quesma_v2/core"
"slices"
"sort"
"strings"
Expand Down Expand Up @@ -696,7 +696,7 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter) error {

decision := lm.tableResolver.Resolve(mux.IngestPipeline, tableName)
decision := lm.tableResolver.Resolve(quesma_api.IngestPipeline, tableName)

if decision.Err != nil {
return decision.Err
Expand All @@ -712,10 +712,10 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str

for _, connectorDecision := range decision.UseConnectors {

var clickhouseDecision *mux.ConnectorDecisionClickhouse
var clickhouseDecision *quesma_api.ConnectorDecisionClickhouse

var ok bool
if clickhouseDecision, ok = connectorDecision.(*mux.ConnectorDecisionClickhouse); !ok {
if clickhouseDecision, ok = connectorDecision.(*quesma_api.ConnectorDecisionClickhouse); !ok {
continue
}

Expand Down
5 changes: 3 additions & 2 deletions quesma/logger/logger.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,8 @@ import (
"net/http"
"os"
"quesma/stats/errorstats"
"quesma/tracing"
asyncQueryTracing "quesma/tracing"
tracing "quesma_v2/core/tracing"
"time"
)

Expand Down Expand Up @@ -41,7 +42,7 @@ const (
var logger zerolog.Logger

// InitLogger returns channel where log messages will be sent
func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asyncQueryTraceLogger *tracing.AsyncTraceLogger) <-chan LogWithLevel {
func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asyncQueryTraceLogger *asyncQueryTracing.AsyncTraceLogger) <-chan LogWithLevel {
zerolog.TimeFieldFormat = time.RFC3339Nano // without this we don't have milliseconds timestamp precision
var output io.Writer = zerolog.ConsoleWriter{Out: os.Stderr, TimeFormat: time.StampMilli}
if os.Getenv("GO_ENV") == "production" { // ConsoleWriter is slow, disable it in production
Expand Down
32 changes: 16 additions & 16 deletions quesma/quesma/dual_write_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,10 @@ import (
"quesma/schema"
"quesma/table_resolver"
"quesma/telemetry"
"quesma/tracing"
"quesma/util"
"quesma_v2/core/mux"
quesma_api "quesma_v2/core"
"quesma_v2/core/routes"
tracing "quesma_v2/core/tracing"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -68,7 +68,7 @@ func (c *simultaneousClientsLimiter) ServeHTTP(w http.ResponseWriter, r *http.Re

type dualWriteHttpProxy struct {
routingHttpServer *http.Server
elasticRouter *mux.PathRouter
elasticRouter *quesma_api.PathRouter
indexManagement elasticsearch.IndexManagement
logManager *clickhouse.LogManager
publicPort util.Port
Expand Down Expand Up @@ -101,9 +101,9 @@ func newDualWriteProxy(schemaLoader clickhouse.TableDiscovery, logManager *click
Transport: tr,
Timeout: time.Minute, // should be more configurable, 30s is Kibana default timeout
}
routerInstance := router{phoneHomeAgent: agent, config: config, quesmaManagementConsole: quesmaManagementConsole, httpClient: client, requestPreprocessors: processorChain{}}
routerInstance := router{phoneHomeAgent: agent, config: config, quesmaManagementConsole: quesmaManagementConsole, httpClient: client, requestPreprocessors: quesma_api.ProcessorChain{}}
routerInstance.
registerPreprocessor(NewTraceIdPreprocessor())
registerPreprocessor(quesma_api.NewTraceIdPreprocessor())

agent.FailedRequestsCollector(func() int64 {
return routerInstance.failedRequests.Load()
Expand Down Expand Up @@ -192,7 +192,7 @@ func responseFromElastic(ctx context.Context, elkResponse *http.Response, w http
elkResponse.Body.Close()
}

func responseFromQuesma(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *mux.Result, zip bool) {
func responseFromQuesma(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *quesma_api.Result, zip bool) {
id := ctx.Value(tracing.RequestIdCtxKey).(string)
logger.Debug().Str(logger.RID, id).Msg("responding from Quesma")

Expand All @@ -217,14 +217,14 @@ func responseFromQuesma(ctx context.Context, unzipped []byte, w http.ResponseWri

type router struct {
config *config.QuesmaConfiguration
requestPreprocessors processorChain
requestPreprocessors quesma_api.ProcessorChain
quesmaManagementConsole *ui.QuesmaManagementConsole
phoneHomeAgent telemetry.PhoneHomeAgent
httpClient *http.Client
failedRequests atomic.Int64
}

func (r *router) registerPreprocessor(preprocessor RequestPreprocessor) {
func (r *router) registerPreprocessor(preprocessor quesma_api.RequestPreprocessor) {
r.requestPreprocessors = append(r.requestPreprocessors, preprocessor)
}

Expand All @@ -233,7 +233,7 @@ func (r *router) errorResponse(ctx context.Context, err error, w http.ResponseWr

msg := "Internal Quesma Error.\nPlease contact support if the problem persists."
reason := "Failed request."
result := mux.ServerErrorResult()
result := quesma_api.ServerErrorResult()

// if error is an error with user-friendly message, we should use it
var endUserError *end_user_errors.EndUserError
Expand All @@ -243,7 +243,7 @@ func (r *router) errorResponse(ctx context.Context, err error, w http.ResponseWr

// we treat all `Q1xxx` errors as bad requests here
if endUserError.ErrorType().Number < 2000 {
result = mux.BadReqeustResult()
result = quesma_api.BadReqeustResult()
}
}

Expand Down Expand Up @@ -286,13 +286,13 @@ func (*router) closedIndexResponse(ctx context.Context, w http.ResponseWriter, p

}

func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router *mux.PathRouter, logManager *clickhouse.LogManager) {
func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router *quesma_api.PathRouter, logManager *clickhouse.LogManager) {
defer recovery.LogAndHandlePanic(ctx, func(err error) {
w.WriteHeader(500)
w.Write(queryparser.InternalQuesmaError("Unknown Quesma error"))
})

quesmaRequest, ctx, err := r.preprocessRequest(ctx, &mux.Request{
quesmaRequest, ctx, err := r.preprocessRequest(ctx, &quesma_api.Request{
Method: req.Method,
Path: strings.TrimSuffix(req.URL.Path, "/"),
Params: map[string]string{},
Expand All @@ -316,7 +316,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
}

if handler != nil {
quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) {
quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*quesma_api.Result, error) {
return handler(ctx, quesmaRequest)
})

Expand Down Expand Up @@ -367,7 +367,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
}

for _, connector := range decision.UseConnectors {
if _, ok := connector.(*mux.ConnectorDecisionElastic); ok {
if _, ok := connector.(*quesma_api.ConnectorDecisionElastic); ok {
// this is desired elastic call
sendToElastic = true
break
Expand Down Expand Up @@ -400,7 +400,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
}
}

func (r *router) preprocessRequest(ctx context.Context, quesmaRequest *mux.Request) (*mux.Request, context.Context, error) {
func (r *router) preprocessRequest(ctx context.Context, quesmaRequest *quesma_api.Request) (*quesma_api.Request, context.Context, error) {
var err error
var processedRequest = quesmaRequest
for _, preprocessor := range r.requestPreprocessors {
Expand Down Expand Up @@ -474,7 +474,7 @@ func isIngest(path string) bool {
return strings.HasSuffix(path, routes.BulkPath) // We may add more methods in future such as `_put` or `_create`
}

func recordRequestToClickhouse(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() (*mux.Result, error)) (*mux.Result, error) {
func recordRequestToClickhouse(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() (*quesma_api.Result, error)) (*quesma_api.Result, error) {
statName := ui.RequestStatisticKibana2Clickhouse
if isIngest(path) {
statName = ui.RequestStatisticIngest2Clickhouse
Expand Down
37 changes: 19 additions & 18 deletions quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,10 @@ import (
"quesma/schema"
"quesma/table_resolver"
"quesma/telemetry"
"quesma/tracing"
"quesma/util"
"quesma_v2/core/mux"
quesma_api "quesma_v2/core"
tracing "quesma_v2/core/tracing"

"quesma_v2/core/routes"
"strconv"
"strings"
Expand Down Expand Up @@ -102,9 +103,9 @@ func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *cli
Transport: tr,
Timeout: time.Minute, // should be more configurable, 30s is Kibana default timeout
}
routerInstance := routerV2{phoneHomeAgent: agent, config: config, quesmaManagementConsole: quesmaManagementConsole, httpClient: client, requestPreprocessors: processorChain{}}
routerInstance := routerV2{phoneHomeAgent: agent, config: config, quesmaManagementConsole: quesmaManagementConsole, httpClient: client, requestPreprocessors: quesma_api.ProcessorChain{}}
routerInstance.
registerPreprocessor(NewTraceIdPreprocessor())
registerPreprocessor(quesma_api.NewTraceIdPreprocessor())
agent.FailedRequestsCollector(func() int64 {
return routerInstance.failedRequests.Load()
})
Expand Down Expand Up @@ -181,7 +182,7 @@ func responseFromElasticV2(ctx context.Context, elkResponse *http.Response, w ht
elkResponse.Body.Close()
}

func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *mux.Result, zip bool) {
func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *quesma_api.Result, zip bool) {
id := ctx.Value(tracing.RequestIdCtxKey).(string)
logger.Debug().Str(logger.RID, id).Msg("responding from Quesma")

Expand All @@ -206,14 +207,14 @@ func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseW

type routerV2 struct {
config *config.QuesmaConfiguration
requestPreprocessors processorChain
requestPreprocessors quesma_api.ProcessorChain
quesmaManagementConsole *ui.QuesmaManagementConsole
phoneHomeAgent telemetry.PhoneHomeAgent
httpClient *http.Client
failedRequests atomic.Int64
}

func (r *routerV2) registerPreprocessor(preprocessor RequestPreprocessor) {
func (r *routerV2) registerPreprocessor(preprocessor quesma_api.RequestPreprocessor) {
r.requestPreprocessors = append(r.requestPreprocessors, preprocessor)
}

Expand All @@ -222,7 +223,7 @@ func (r *routerV2) errorResponseV2(ctx context.Context, err error, w http.Respon

msg := "Internal Quesma Error.\nPlease contact support if the problem persists."
reason := "Failed request."
result := mux.ServerErrorResult()
result := quesma_api.ServerErrorResult()

// if error is an error with user-friendly message, we should use it
var endUserError *end_user_errors.EndUserError
Expand All @@ -232,7 +233,7 @@ func (r *routerV2) errorResponseV2(ctx context.Context, err error, w http.Respon

// we treat all `Q1xxx` errors as bad requests here
if endUserError.ErrorType().Number < 2000 {
result = mux.BadReqeustResult()
result = quesma_api.BadReqeustResult()
}
}

Expand Down Expand Up @@ -275,7 +276,7 @@ func (*routerV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter,

}

func (r *routerV2) elasticFallback(decision *mux.Decision,
func (r *routerV2) elasticFallback(decision *quesma_api.Decision,
ctx context.Context, w http.ResponseWriter,
req *http.Request, reqBody []byte, logManager *clickhouse.LogManager) {

Expand Down Expand Up @@ -306,7 +307,7 @@ func (r *routerV2) elasticFallback(decision *mux.Decision,
}

for _, connector := range decision.UseConnectors {
if _, ok := connector.(*mux.ConnectorDecisionElastic); ok {
if _, ok := connector.(*quesma_api.ConnectorDecisionElastic); ok {
// this is desired elastic call
sendToElastic = true
break
Expand Down Expand Up @@ -338,13 +339,13 @@ func (r *routerV2) elasticFallback(decision *mux.Decision,
}
}

func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, searchRouter *mux.PathRouter, ingestRouter *mux.PathRouter, logManager *clickhouse.LogManager) {
func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, searchRouter *quesma_api.PathRouter, ingestRouter *quesma_api.PathRouter, logManager *clickhouse.LogManager) {
defer recovery.LogAndHandlePanic(ctx, func(err error) {
w.WriteHeader(500)
w.Write(queryparser.InternalQuesmaError("Unknown Quesma error"))
})

quesmaRequest, ctx, err := r.preprocessRequest(ctx, &mux.Request{
quesmaRequest, ctx, err := r.preprocessRequest(ctx, &quesma_api.Request{
Method: req.Method,
Path: strings.TrimSuffix(req.URL.Path, "/"),
Params: map[string]string{},
Expand All @@ -358,8 +359,8 @@ func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http
}

quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)
var handler mux.Handler
var decision *mux.Decision
var handler quesma_api.Handler
var decision *quesma_api.Decision
searchHandler, searchDecision := searchRouter.Matches(quesmaRequest)
if searchDecision != nil {
decision = searchDecision
Expand All @@ -381,7 +382,7 @@ func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http
}

if handler != nil {
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) {
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.quesmaManagementConsole, func() (*quesma_api.Result, error) {
return handler(ctx, quesmaRequest)
})

Expand All @@ -408,7 +409,7 @@ func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http
}
}

func (r *routerV2) preprocessRequest(ctx context.Context, quesmaRequest *mux.Request) (*mux.Request, context.Context, error) {
func (r *routerV2) preprocessRequest(ctx context.Context, quesmaRequest *quesma_api.Request) (*quesma_api.Request, context.Context, error) {
var err error
var processedRequest = quesmaRequest
for _, preprocessor := range r.requestPreprocessors {
Expand Down Expand Up @@ -482,7 +483,7 @@ func isIngestV2(path string) bool {
return strings.HasSuffix(path, routes.BulkPath) // We may add more methods in future such as `_put` or `_create`
}

func recordRequestToClickhouseV2(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() (*mux.Result, error)) (*mux.Result, error) {
func recordRequestToClickhouseV2(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() (*quesma_api.Result, error)) (*quesma_api.Result, error) {
statName := ui.RequestStatisticKibana2Clickhouse
if isIngestV2(path) {
statName = ui.RequestStatisticIngest2Clickhouse
Expand Down
10 changes: 5 additions & 5 deletions quesma/quesma/elastic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,22 +9,22 @@ import (
"quesma/frontend_connectors"
"quesma/quesma/recovery"
"quesma/telemetry"
"quesma_v2/core/mux"
"quesma_v2/core"
)

type ElasticHttpFrontendConnector struct {
*frontend_connectors.BasicHTTPFrontendConnector
routerInstance *routerV2
searchRouter *mux.PathRouter
ingestRouter *mux.PathRouter
searchRouter *quesma_api.PathRouter
ingestRouter *quesma_api.PathRouter
logManager *clickhouse.LogManager
agent telemetry.PhoneHomeAgent
}

func NewElasticHttpFrontendConnector(endpoint string,
routerInstance *routerV2,
searchRouter *mux.PathRouter,
ingestRouter *mux.PathRouter,
searchRouter *quesma_api.PathRouter,
ingestRouter *quesma_api.PathRouter,
logManager *clickhouse.LogManager,
agent telemetry.PhoneHomeAgent) *ElasticHttpFrontendConnector {
return &ElasticHttpFrontendConnector{
Expand Down
Loading

0 comments on commit 7e4022a

Please sign in to comment.