Skip to content

Commit

Permalink
PathRouter has to be moved up to v2 (#1065)
Browse files Browse the repository at this point in the history
In order to use it in v2 `quesma_api` I had to move it up to v2
  • Loading branch information
pdelewski authored Dec 5, 2024
1 parent 0923f81 commit 0ba6514
Show file tree
Hide file tree
Showing 33 changed files with 301 additions and 491 deletions.
2 changes: 1 addition & 1 deletion quesma/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ require (
github.com/stretchr/testify v1.10.0
github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a
github.com/tidwall/sjson v1.2.5
github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb
golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f
golang.org/x/oauth2 v0.24.0
quesma_v2 v0.0.0-00010101000000-000000000000
Expand All @@ -57,6 +56,7 @@ require (
github.com/tidwall/gjson v1.18.0 // indirect
github.com/tidwall/match v1.1.1 // indirect
github.com/tidwall/pretty v1.2.1 // indirect
github.com/ucarion/urlpath v0.0.0-20200424170820-7ccc79b76bbb // indirect
golang.org/x/crypto v0.28.0 // indirect
golang.org/x/text v0.19.0 // indirect
)
Expand Down
8 changes: 4 additions & 4 deletions quesma/ingest/common_table_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,13 @@ import (
"github.com/stretchr/testify/assert"
"quesma/clickhouse"
"quesma/common_table"
"quesma/frontend_connectors"
"quesma/jsonprocessor"
"quesma/persistence"
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/schema"
"quesma/table_resolver"
"quesma_v2/core/mux"
"testing"
)

Expand Down Expand Up @@ -192,9 +192,9 @@ func TestIngestToCommonTable(t *testing.T) {

resolver := table_resolver.NewEmptyTableResolver()

decision := &frontend_connectors.Decision{
UseConnectors: []frontend_connectors.ConnectorDecision{
&frontend_connectors.ConnectorDecisionClickhouse{
decision := &mux.Decision{
UseConnectors: []mux.ConnectorDecision{
&mux.ConnectorDecisionClickhouse{
ClickhouseTableName: common_table.TableName,
ClickhouseTables: []string{indexName},
IsCommonTable: true,
Expand Down
6 changes: 3 additions & 3 deletions quesma/ingest/ingest_validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,11 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"quesma/clickhouse"
"quesma/frontend_connectors"
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/table_resolver"
"quesma/util"
"quesma_v2/core/mux"
"strings"
"testing"
)
Expand Down Expand Up @@ -172,8 +172,8 @@ func TestIngestValidation(t *testing.T) {
ip.tableDiscovery = clickhouse.NewTableDiscoveryWith(&config.QuesmaConfiguration{}, nil, *tableMap)

resolver := table_resolver.NewEmptyTableResolver()
decision := &frontend_connectors.Decision{
UseConnectors: []frontend_connectors.ConnectorDecision{&frontend_connectors.ConnectorDecisionClickhouse{
decision := &mux.Decision{
UseConnectors: []mux.ConnectorDecision{&mux.ConnectorDecisionClickhouse{
ClickhouseTableName: "test_table",
}}}
resolver.Decisions["test_table"] = decision
Expand Down
10 changes: 5 additions & 5 deletions quesma/ingest/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
"github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/assert"
"quesma/clickhouse"
"quesma/frontend_connectors"
"quesma/jsonprocessor"
"quesma/persistence"
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/schema"
"quesma/table_resolver"
"quesma/util"
"quesma_v2/core/mux"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -240,8 +240,8 @@ func TestProcessInsertQuery(t *testing.T) {
db, mock := util.InitSqlMockWithPrettyPrint(t, true)
ip.ip.chDb = db
resolver := table_resolver.NewEmptyTableResolver()
decision := &frontend_connectors.Decision{
UseConnectors: []frontend_connectors.ConnectorDecision{&frontend_connectors.ConnectorDecisionClickhouse{
decision := &mux.Decision{
UseConnectors: []mux.ConnectorDecision{&mux.ConnectorDecisionClickhouse{
ClickhouseTableName: "test_table",
}}}
resolver.Decisions["test_table"] = decision
Expand Down Expand Up @@ -425,8 +425,8 @@ func TestCreateTableIfSomeFieldsExistsInSchemaAlready(t *testing.T) {
schemaRegistry.Tables[schema.TableName(indexName)] = indexSchema

resolver := table_resolver.NewEmptyTableResolver()
decision := &frontend_connectors.Decision{
UseConnectors: []frontend_connectors.ConnectorDecision{&frontend_connectors.ConnectorDecisionClickhouse{
decision := &mux.Decision{
UseConnectors: []mux.ConnectorDecision{&mux.ConnectorDecisionClickhouse{
ClickhouseTableName: "test_index",
}}}
resolver.Decisions["test_index"] = decision
Expand Down
8 changes: 4 additions & 4 deletions quesma/ingest/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"quesma/comment_metadata"
"quesma/common_table"
"quesma/end_user_errors"
"quesma/frontend_connectors"
"quesma/jsonprocessor"
"quesma/logger"
"quesma/model"
Expand All @@ -25,6 +24,7 @@ import (
"quesma/table_resolver"
"quesma/telemetry"
"quesma/util"
"quesma_v2/core/mux"
"slices"
"sort"
"strings"
Expand Down Expand Up @@ -696,7 +696,7 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str
jsonData []types.JSON, transformer jsonprocessor.IngestTransformer,
tableFormatter TableColumNameFormatter) error {

decision := lm.tableResolver.Resolve(frontend_connectors.IngestPipeline, tableName)
decision := lm.tableResolver.Resolve(mux.IngestPipeline, tableName)

if decision.Err != nil {
return decision.Err
Expand All @@ -712,10 +712,10 @@ func (lm *IngestProcessor) ProcessInsertQuery(ctx context.Context, tableName str

for _, connectorDecision := range decision.UseConnectors {

var clickhouseDecision *frontend_connectors.ConnectorDecisionClickhouse
var clickhouseDecision *mux.ConnectorDecisionClickhouse

var ok bool
if clickhouseDecision, ok = connectorDecision.(*frontend_connectors.ConnectorDecisionClickhouse); !ok {
if clickhouseDecision, ok = connectorDecision.(*mux.ConnectorDecisionClickhouse); !ok {
continue
}

Expand Down
8 changes: 3 additions & 5 deletions quesma/quesma/dual_write_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
"quesma/elasticsearch"
"quesma/end_user_errors"
"quesma/feature"
"quesma/frontend_connectors"
"quesma/frontend_connectors/mux"
"quesma/frontend_connectors/routes"
"quesma/ingest"
"quesma/logger"
"quesma/queryparser"
Expand All @@ -32,6 +29,8 @@ import (
"quesma/telemetry"
"quesma/tracing"
"quesma/util"
"quesma_v2/core/mux"
"quesma_v2/core/routes"
"strconv"
"strings"
"sync/atomic"
Expand All @@ -54,7 +53,6 @@ func newSimultaneousClientsLimiter(handler http.Handler, limit int64) *simultane
}

func (c *simultaneousClientsLimiter) ServeHTTP(w http.ResponseWriter, r *http.Request) {

current := c.counter.Load()
// this is hard limit, we should not allow to go over it
if current >= c.limit {
Expand Down Expand Up @@ -369,7 +367,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R
}

for _, connector := range decision.UseConnectors {
if _, ok := connector.(*frontend_connectors.ConnectorDecisionElastic); ok {
if _, ok := connector.(*mux.ConnectorDecisionElastic); ok {
// this is desired elastic call
sendToElastic = true
break
Expand Down
11 changes: 5 additions & 6 deletions quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,6 @@ import (
"quesma/elasticsearch"
"quesma/end_user_errors"
"quesma/feature"
"quesma/frontend_connectors"
"quesma/frontend_connectors/mux"
"quesma/frontend_connectors/routes"
"quesma/ingest"
"quesma/logger"
"quesma/queryparser"
Expand All @@ -32,6 +29,8 @@ import (
"quesma/telemetry"
"quesma/tracing"
"quesma/util"
"quesma_v2/core/mux"
"quesma_v2/core/routes"
"strconv"
"strings"
"sync/atomic"
Expand Down Expand Up @@ -276,7 +275,7 @@ func (*routerV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter,

}

func (r *routerV2) elasticFallback(decision *frontend_connectors.Decision,
func (r *routerV2) elasticFallback(decision *mux.Decision,
ctx context.Context, w http.ResponseWriter,
req *http.Request, reqBody []byte, logManager *clickhouse.LogManager) {

Expand Down Expand Up @@ -307,7 +306,7 @@ func (r *routerV2) elasticFallback(decision *frontend_connectors.Decision,
}

for _, connector := range decision.UseConnectors {
if _, ok := connector.(*frontend_connectors.ConnectorDecisionElastic); ok {
if _, ok := connector.(*mux.ConnectorDecisionElastic); ok {
// this is desired elastic call
sendToElastic = true
break
Expand Down Expand Up @@ -360,7 +359,7 @@ func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http

quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body)
var handler mux.Handler
var decision *frontend_connectors.Decision
var decision *mux.Decision
searchHandler, searchDecision := searchRouter.Matches(quesmaRequest)
if searchDecision != nil {
decision = searchDecision
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/elastic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@ import (
"net/http"
"quesma/clickhouse"
"quesma/frontend_connectors"
"quesma/frontend_connectors/mux"
"quesma/quesma/recovery"
"quesma/telemetry"
"quesma_v2/core/mux"
)

type ElasticHttpFrontendConnector struct {
Expand Down
8 changes: 4 additions & 4 deletions quesma/quesma/functionality/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"quesma/clickhouse"
"quesma/elasticsearch"
"quesma/end_user_errors"
"quesma/frontend_connectors"
"quesma/ingest"
"quesma/logger"
"quesma/queryparser"
Expand All @@ -21,6 +20,7 @@ import (
"quesma/stats"
"quesma/table_resolver"
"quesma/telemetry"
"quesma_v2/core/mux"
"sort"
"strings"
"sync"
Expand Down Expand Up @@ -139,7 +139,7 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul
}
}

decision := tableResolver.Resolve(frontend_connectors.IngestPipeline, index)
decision := tableResolver.Resolve(mux.IngestPipeline, index)

if decision.Err != nil {
return decision.Err
Expand Down Expand Up @@ -181,7 +181,7 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul

switch connector.(type) {

case *frontend_connectors.ConnectorDecisionElastic:
case *mux.ConnectorDecisionElastic:
// Bulk entry for Elastic - forward the request as-is
opBytes, err := rawOp.Bytes()
if err != nil {
Expand All @@ -199,7 +199,7 @@ func splitBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, bul

elasticBulkEntries = append(elasticBulkEntries, entryWithResponse)

case *frontend_connectors.ConnectorDecisionClickhouse:
case *mux.ConnectorDecisionClickhouse:

// Bulk entry for Clickhouse
if operation != "create" && operation != "index" {
Expand Down
15 changes: 7 additions & 8 deletions quesma/quesma/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,12 @@
package quesma

import (
"quesma/frontend_connectors"
"quesma/frontend_connectors/mux"
"quesma/logger"
"quesma/quesma/config"
"quesma/quesma/types"
"quesma/table_resolver"
"quesma/tracing"
"quesma_v2/core/mux"
"strings"
)

Expand All @@ -34,15 +33,15 @@ func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration, tableReso
if idx%2 == 0 {
name := extractIndexName(s)

decision := tableResolver.Resolve(frontend_connectors.IngestPipeline, name)
decision := tableResolver.Resolve(mux.IngestPipeline, name)

if decision.IsClosed {
return mux.MatchResult{Matched: true, Decision: decision}
}

// if have any enabled Clickhouse connector, then return true
for _, connector := range decision.UseConnectors {
if _, ok := connector.(*frontend_connectors.ConnectorDecisionClickhouse); ok {
if _, ok := connector.(*mux.ConnectorDecisionClickhouse); ok {
return mux.MatchResult{Matched: true, Decision: decision}
}
}
Expand All @@ -57,7 +56,7 @@ func matchedAgainstBulkBody(configuration *config.QuesmaConfiguration, tableReso

// Query path only (looks at QueryTarget)
func matchedAgainstPattern(indexRegistry table_resolver.TableResolver) mux.RequestMatcher {
return matchAgainstTableResolver(indexRegistry, frontend_connectors.QueryPipeline)
return matchAgainstTableResolver(indexRegistry, mux.QueryPipeline)
}

// check whether exact index name is enabled
Expand All @@ -71,7 +70,7 @@ func matchAgainstTableResolver(indexRegistry table_resolver.TableResolver, pipel
return mux.MatchResult{Matched: false, Decision: decision}
}
for _, connector := range decision.UseConnectors {
if _, ok := connector.(*frontend_connectors.ConnectorDecisionClickhouse); ok {
if _, ok := connector.(*mux.ConnectorDecisionClickhouse); ok {
return mux.MatchResult{Matched: true, Decision: decision}
}
}
Expand All @@ -80,11 +79,11 @@ func matchAgainstTableResolver(indexRegistry table_resolver.TableResolver, pipel
}

func matchedExactQueryPath(indexRegistry table_resolver.TableResolver) mux.RequestMatcher {
return matchAgainstTableResolver(indexRegistry, frontend_connectors.QueryPipeline)
return matchAgainstTableResolver(indexRegistry, mux.QueryPipeline)
}

func matchedExactIngestPath(indexRegistry table_resolver.TableResolver) mux.RequestMatcher {
return matchAgainstTableResolver(indexRegistry, frontend_connectors.IngestPipeline)
return matchAgainstTableResolver(indexRegistry, mux.IngestPipeline)
}

// Returns false if the body contains a Kibana internal search.
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/matchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ package quesma

import (
"github.com/stretchr/testify/assert"
"quesma/frontend_connectors/mux"
"quesma/quesma/types"
"quesma_v2/core/mux"
"testing"
)

Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/processors.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ package quesma

import (
"context"
"quesma/frontend_connectors/mux"
"quesma/tracing"
"quesma_v2/core/mux"
)

type (
Expand Down
Loading

0 comments on commit 0ba6514

Please sign in to comment.