Skip to content

Commit

Permalink
Store state directly in quesma obj
Browse files Browse the repository at this point in the history
  • Loading branch information
pdelewski committed Dec 27, 2024
1 parent 2ef3e94 commit 63b83f8
Show file tree
Hide file tree
Showing 6 changed files with 35 additions and 41 deletions.
8 changes: 0 additions & 8 deletions quesma/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,11 +183,3 @@ func (h *BasicHTTPFrontendConnector) GetRouterInstance() *RouterV2 {
func (h *BasicHTTPFrontendConnector) AddMiddleware(middleware http.Handler) {
h.middlewares = append(h.middlewares, middleware)
}

func (h *BasicHTTPFrontendConnector) SetConnector(connector quesma_api.FrontendConnector) {
h.connector = connector
}

func (h *BasicHTTPFrontendConnector) Connector() quesma_api.FrontendConnector {
return h.connector
}
16 changes: 10 additions & 6 deletions quesma/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -142,22 +142,24 @@ func fallbackScenario() quesma_api.QuesmaBuilder {
func Test_fallbackScenario(t *testing.T) {
qBuilder := fallbackScenario()
q1, _ := qBuilder.Build()
q1.Start()
ctx := context.Background()
q1.Start(ctx)
stop := make(chan os.Signal, 1)
emitRequests(stop)
<-stop
q1.Stop(context.Background())
q1.Stop(ctx)
atomic.LoadInt32(&fallbackCalled)
assert.Equal(t, int32(4), fallbackCalled)
}

func Test_scenario1(t *testing.T) {
q1 := ab_testing_scenario()
q1.Start()
ctx := context.Background()
q1.Start(ctx)
stop := make(chan os.Signal, 1)
emitRequests(stop)
<-stop
q1.Stop(context.Background())
q1.Stop(ctx)
}

var middlewareCallCount int32 = 0
Expand Down Expand Up @@ -213,7 +215,8 @@ func Test_middleware(t *testing.T) {
{
quesmaBuilder := createMiddleWareScenario(true, cfg)
quesmaBuilder.Build()
quesmaBuilder.Start()
ctx := context.Background()
quesmaBuilder.Start(ctx)
stop := make(chan os.Signal, 1)
emitRequests(stop)
<-stop
Expand All @@ -225,7 +228,8 @@ func Test_middleware(t *testing.T) {
{
quesmaBuilder := createMiddleWareScenario(false, cfg)
quesmaBuilder.Build()
quesmaBuilder.Start()
ctx := context.Background()
quesmaBuilder.Start(ctx)
stop := make(chan os.Signal, 1)
emitRequests(stop)
<-stop
Expand Down
2 changes: 1 addition & 1 deletion quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,5 +141,5 @@ func (q *dualWriteHttpProxyV2) Ingest() {
q.logManager.Start()
q.indexManagement.Start()
go q.asyncQueriesEvictor.AsyncQueriesGC()
q.quesmaV2.Start()
q.quesmaV2.Start(context.Background())
}
8 changes: 2 additions & 6 deletions quesma/v2/core/quesma_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ type Router interface {
// They can be shared between multiple pipelines
type FrontendConnector interface {
InstanceNamer
// SetConnector sets the connector
SetConnector(listener FrontendConnector)
// Connector returns the connector
Connector() FrontendConnector
// Listen starts listening on the endpoint
Listen() error // Start listening on the endpoint
GetEndpoint() string
Expand Down Expand Up @@ -71,14 +67,14 @@ type PipelineBuilder interface {
GetBackendConnectors() map[BackendConnectorType]BackendConnector
CompoundProcessor
Build() PipelineBuilder
Start()
Start(ctx context.Context)
}

type QuesmaBuilder interface {
AddPipeline(pipeline PipelineBuilder)
GetPipelines() []PipelineBuilder
Build() (QuesmaBuilder, error)
Start()
Start(ctx context.Context)
Stop(ctx context.Context)
}

Expand Down
23 changes: 14 additions & 9 deletions quesma/v2/core/quesma_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,9 @@ import (
)

type Quesma struct {
pipelines []PipelineBuilder
dependencies Dependencies
pipelines []PipelineBuilder
dependencies Dependencies
activeFrontendConnectors []FrontendConnector
}

func NewQuesma(deps Dependencies) *Quesma {
Expand Down Expand Up @@ -39,10 +40,16 @@ func (quesma *Quesma) GetPipelines() []PipelineBuilder {
return quesma.pipelines
}

func (quesma *Quesma) Start() {
func (quesma *Quesma) Start(ctx context.Context) {
activeFrontendConnectors := make([]FrontendConnector, 0)
for _, fc := range quesma.activeFrontendConnectors {
activeFrontendConnectors = append(activeFrontendConnectors, fc)
}
for _, pipeline := range quesma.pipelines {
newCtx := context.WithValue(context.Background(), "activeFrontendConnectors", activeFrontendConnectors)
quesma.dependencies.Logger().Info().Msgf("Starting pipeline %v", pipeline)
pipeline.Start()
pipeline.Start(newCtx)
activeFrontendConnectors = make([]FrontendConnector, 0)
}
}

Expand Down Expand Up @@ -85,11 +92,9 @@ func (quesma *Quesma) buildInternal() (QuesmaBuilder, error) {
}
}
}
for pipelineIndex, _ := range quesma.pipelines {
for frontendConnectorIndex := range quesma.pipelines[pipelineIndex].GetFrontendConnectors() {
quesma.pipelines[pipelineIndex].GetFrontendConnectors()[frontendConnectorIndex].SetConnector(quesma.pipelines[0].GetFrontendConnectors()[0])
}
}
quesma.activeFrontendConnectors = make([]FrontendConnector, 0)
quesma.activeFrontendConnectors = append(quesma.activeFrontendConnectors, quesma.pipelines[0].GetFrontendConnectors()[0])

}

for _, pipeline := range quesma.pipelines {
Expand Down
19 changes: 8 additions & 11 deletions quesma/v2/core/quesma_pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@
// SPDX-License-Identifier: Elastic-2.0
package quesma_api

import "fmt"
import (
"context"
"fmt"
)

type Pipeline struct {
FrontendConnectors []FrontendConnector
Expand Down Expand Up @@ -61,18 +64,12 @@ func (p *Pipeline) Build() PipelineBuilder {
return p
}

func (p *Pipeline) Start() {
// TODO connectors for the same endpoint should be sharing the same listener
// This is a temporary solution to start all connectors
// some of them will fail to start
// because the port is already in use
// This works well from application perspective
// because we are copying routing table from all connectors
// however, bind error remains
for _, conn := range p.FrontendConnectors {
func (p *Pipeline) Start(ctx context.Context) {
activeFrontendConnectors := ctx.Value("activeFrontendConnectors").([]FrontendConnector)
for _, conn := range activeFrontendConnectors {
p.logger.Info().Msgf("Starting frontend connector %s", conn)
go func() {
err := conn.Connector().Listen()
err := conn.Listen()
if err != nil {
p.logger.Error().Err(err).Msgf("Failed to start frontend connector %s", conn)
}
Expand Down

0 comments on commit 63b83f8

Please sign in to comment.