Skip to content

Commit

Permalink
Dependency injection improvements (#1118)
Browse files Browse the repository at this point in the history
This PR:
- simplifies the way we inject dependencies. There is only one interface
to inject `Dependencies`
-  dependency injection is based on the component tree
- a component tree is built using the dedicated interface
`ChildComponentProvider`
  • Loading branch information
nablaone authored Dec 17, 2024
1 parent 19b16c9 commit 224d57d
Show file tree
Hide file tree
Showing 13 changed files with 234 additions and 129 deletions.
24 changes: 17 additions & 7 deletions quesma/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@ type BasicHTTPFrontendConnector struct {
registry schema.Registry
config *config.QuesmaConfiguration

diagnostic diag.Diagnostic
phoneHomeClient diag.PhoneHomeClient
debugInfoCollector diag.DebugInfoCollector
}

func (h *BasicHTTPFrontendConnector) InjectDiagnostic(diagnostic diag.Diagnostic) {
func (h *BasicHTTPFrontendConnector) GetChildComponents() []interface{} {
components := make([]interface{}, 0)

h.diagnostic = diagnostic
if h.router != nil {
components = append(components, h.router)
}

// TODO this is a hack
if h.routerInstance != nil {
h.routerInstance.InjectDiagnostic(diagnostic)
components = append(components, h.routerInstance)
}
return components
}

func (h *BasicHTTPFrontendConnector) SetDependencies(deps quesma_api.Dependencies) {
h.phoneHomeClient = deps.PhoneHomeAgent()
h.debugInfoCollector = deps.DebugInfoCollector()
}

func NewBasicHTTPFrontendConnector(endpoint string, config *config.QuesmaConfiguration) *BasicHTTPFrontendConnector {
Expand Down Expand Up @@ -70,9 +79,10 @@ func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.
}

ua := req.Header.Get("User-Agent")
if h.diagnostic.PhoneHomeAgent() != nil {
h.diagnostic.PhoneHomeAgent().UserAgentCounters().Add(ua, 1)
if h.phoneHomeClient != nil {
h.phoneHomeClient.UserAgentCounters().Add(ua, 1)
}

h.routerInstance.Reroute(req.Context(), w, req, reqBody, h.router, h.logManager, h.registry)
}

Expand Down
14 changes: 8 additions & 6 deletions quesma/frontend_connectors/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,11 +81,13 @@ type RouterV2 struct {
HttpClient *http.Client
FailedRequests atomic.Int64

diagnostic diag.Diagnostic
debugInfoCollector diag.DebugInfoCollector
phoneHomeAgent diag.PhoneHomeClient
}

func (r *RouterV2) InjectDiagnostic(s diag.Diagnostic) {
r.diagnostic = s
func (r *RouterV2) SetDependencies(deps quesma_api.Dependencies) {
r.debugInfoCollector = deps.DebugInfoCollector()
r.phoneHomeAgent = deps.PhoneHomeAgent()
}
func NewRouterV2(config *config.QuesmaConfiguration) *RouterV2 {
tr := &http.Transport{
Expand Down Expand Up @@ -267,7 +269,7 @@ func (r *RouterV2) Reroute(ctx context.Context, w http.ResponseWriter, req *http
}
dispatcher := &quesma_api.Dispatcher{}
if handlersPipe != nil {
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.diagnostic.DebugInfoCollector(), func() (*quesma_api.Result, error) {
quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.debugInfoCollector, func() (*quesma_api.Result, error) {
var result *quesma_api.Result
result, err = handlersPipe.Handler(ctx, quesmaRequest, w)

Expand Down Expand Up @@ -357,11 +359,11 @@ func (r *RouterV2) sendHttpRequestToElastic(ctx context.Context, req *http.Reque
}

go func() {
elkResponseChan <- recordRequestToElasticV2(req.URL.Path, r.diagnostic.DebugInfoCollector(), func() elasticResultV2 {
elkResponseChan <- recordRequestToElasticV2(req.URL.Path, r.debugInfoCollector, func() elasticResultV2 {

isWrite := elasticsearch.IsWriteRequest(req)

phoneHome := r.diagnostic.PhoneHomeAgent()
phoneHome := r.phoneHomeAgent

var span diag.Span
if isManagement {
Expand Down
6 changes: 3 additions & 3 deletions quesma/quesma/dual_write_proxy_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) {
q.Close(ctx)
}

func newDualWriteProxyV2(dependencies *quesma_api.Dependencies, schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {
func newDualWriteProxyV2(dependencies quesma_api.Dependencies, schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, ingestProcessor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 {

queryProcessor := NewQueryRunner(logManager, config, indexManager, dependencies.Diagnostic.DebugInfoCollector(), registry, abResultsRepository, resolver, schemaLoader)
queryProcessor := NewQueryRunner(logManager, config, indexManager, dependencies.DebugInfoCollector(), registry, abResultsRepository, resolver, schemaLoader)

// not sure how we should configure our query translator ???
// is this a config option??
Expand All @@ -82,7 +82,7 @@ func newDualWriteProxyV2(dependencies *quesma_api.Dependencies, schemaLoader cli

routerInstance := frontend_connectors.NewRouterV2(config)

dependencies.Diagnostic.PhoneHomeAgent().FailedRequestsCollector(func() int64 {
dependencies.PhoneHomeAgent().FailedRequestsCollector(func() int64 {

return routerInstance.FailedRequests.Load()
})
Expand Down
38 changes: 27 additions & 11 deletions quesma/quesma/elastic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,10 @@ import (

type ElasticHttpIngestFrontendConnector struct {
*frontend_connectors.BasicHTTPFrontendConnector
Config *config.QuesmaConfiguration
diagnostic diag.Diagnostic

Config *config.QuesmaConfiguration

phoneHomeClient diag.PhoneHomeClient
}

func NewElasticHttpIngestFrontendConnector(endpoint string,
Expand All @@ -39,15 +41,23 @@ func NewElasticHttpIngestFrontendConnector(endpoint string,
return fc
}

func (h *ElasticHttpIngestFrontendConnector) InjectDiagnostic(diagnostic diag.Diagnostic) {
h.diagnostic = diagnostic
// TODO this is a hack
h.BasicHTTPFrontendConnector.InjectDiagnostic(diagnostic)
func (h *ElasticHttpIngestFrontendConnector) GetChildComponents() []interface{} {
components := make([]interface{}, 0)
if h.BasicHTTPFrontendConnector != nil {
components = append(components, h.BasicHTTPFrontendConnector)
}

return components
}

func (h *ElasticHttpIngestFrontendConnector) SetDependencies(deps quesma_api.Dependencies) {
h.phoneHomeClient = deps.PhoneHomeAgent()
}

type ElasticHttpQueryFrontendConnector struct {
*frontend_connectors.BasicHTTPFrontendConnector
diagnostic diag.Diagnostic

phoneHomeClient diag.PhoneHomeClient
}

func NewElasticHttpQueryFrontendConnector(endpoint string,
Expand All @@ -67,8 +77,14 @@ func NewElasticHttpQueryFrontendConnector(endpoint string,
return fc
}

func (h *ElasticHttpQueryFrontendConnector) InjectDiagnostic(diagnostic diag.Diagnostic) {
h.diagnostic = diagnostic
// TODO this is a hack
h.BasicHTTPFrontendConnector.InjectDiagnostic(diagnostic)
func (h *ElasticHttpQueryFrontendConnector) GetChildComponents() []interface{} {
components := make([]interface{}, 0)
if h.BasicHTTPFrontendConnector != nil {
components = append(components, h.BasicHTTPFrontendConnector)
}
return components
}

func (h *ElasticHttpQueryFrontendConnector) SetDependencies(deps quesma_api.Dependencies) {
h.phoneHomeClient = deps.PhoneHomeAgent()
}
6 changes: 2 additions & 4 deletions quesma/quesma/quesma.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"quesma/telemetry"
"quesma/util"
quesma_v2 "quesma_v2/core"
"quesma_v2/core/diag"
)

type (
Expand Down Expand Up @@ -65,10 +64,9 @@ func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent,
abResultsRepository ab_testing.Sender, resolver table_resolver.TableResolver,
v2 bool) *Quesma {

statistics := diag.NewStatistics(phoneHomeAgent, quesmaManagementConsole)

dependencies := quesma_v2.NewDependencies()
dependencies.Diagnostic = statistics
dependencies.SetPhoneHomeAgent(phoneHomeAgent)
dependencies.SetDebugInfoCollector(quesmaManagementConsole)

if v2 {
return &Quesma{
Expand Down
12 changes: 6 additions & 6 deletions quesma/quesma/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ import (
"time"
)

func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, dependencies *quesma_api.Dependencies, ip *ingest.IngestProcessor, tableResolver table_resolver.TableResolver) quesma_api.Router {
func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, dependencies quesma_api.Dependencies, ip *ingest.IngestProcessor, tableResolver table_resolver.TableResolver) quesma_api.Router {
// some syntactic sugar
method := quesma_api.IsHTTPMethod
and := quesma_api.And
Expand Down Expand Up @@ -90,7 +90,7 @@ func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, dependencies *ques
return nil, err
}

results, err := bulk.Write(ctx, nil, body, ip, cfg, dependencies.Diagnostic.PhoneHomeAgent(), tableResolver)
results, err := bulk.Write(ctx, nil, body, ip, cfg, dependencies.PhoneHomeAgent(), tableResolver)
return bulkInsertResult(ctx, results, err)
})
router.Register(routes.IndexDocPath, and(method("POST"), matchedExactIngestPath(tableResolver)), func(ctx context.Context, req *quesma_api.Request, _ http.ResponseWriter) (*quesma_api.Result, error) {
Expand All @@ -105,7 +105,7 @@ func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, dependencies *ques
}, nil
}

result, err := doc.Write(ctx, &index, body, ip, cfg, dependencies.Diagnostic.PhoneHomeAgent(), tableResolver)
result, err := doc.Write(ctx, &index, body, ip, cfg, dependencies.PhoneHomeAgent(), tableResolver)
if err != nil {
return &quesma_api.Result{
Body: string(queryparser.BadRequestParseError(err)),
Expand All @@ -125,13 +125,13 @@ func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, dependencies *ques
return nil, err
}

results, err := bulk.Write(ctx, &index, body, ip, cfg, dependencies.Diagnostic.PhoneHomeAgent(), tableResolver)
results, err := bulk.Write(ctx, &index, body, ip, cfg, dependencies.PhoneHomeAgent(), tableResolver)
return bulkInsertResult(ctx, results, err)
})
return router
}

func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies *quesma_api.Dependencies, sr schema.Registry, lm *clickhouse.LogManager, queryRunner *QueryRunner, tableResolver table_resolver.TableResolver) quesma_api.Router {
func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies quesma_api.Dependencies, sr schema.Registry, lm *clickhouse.LogManager, queryRunner *QueryRunner, tableResolver table_resolver.TableResolver) quesma_api.Router {

// some syntactic sugar
method := quesma_api.IsHTTPMethod
Expand Down Expand Up @@ -366,7 +366,7 @@ func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, dependencies *ques
return nil, errors.New("invalid request body, expecting JSON")
}

if responseBody, err := terms_enum.HandleTermsEnum(ctx, req.Params["index"], body, lm, sr, dependencies.Diagnostic.DebugInfoCollector()); err != nil {
if responseBody, err := terms_enum.HandleTermsEnum(ctx, req.Params["index"], body, lm, sr, dependencies.DebugInfoCollector()); err != nil {
return nil, err
} else {
return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil
Expand Down
Loading

0 comments on commit 224d57d

Please sign in to comment.