Skip to content

Commit

Permalink
Initialize processors (#1074)
Browse files Browse the repository at this point in the history
Two things:
* Temporarily remove embedding of `BasicHTTPFrontendConnector` which
caused issues
* Adds `Init()` function to processor where one can perform actions
**after** processor has been fully set up - has all the connectors, etc.
  • Loading branch information
mieciu authored Dec 6, 2024
1 parent bbf9f3b commit 95583aa
Show file tree
Hide file tree
Showing 6 changed files with 74 additions and 18 deletions.
48 changes: 44 additions & 4 deletions quesma/frontend_connectors/elasticsearch_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@
package frontend_connectors

import (
"context"
"fmt"
"github.com/ucarion/urlpath"
"net/http"
quesma_api "quesma_v2/core"
)

type ElasticsearchIngestFrontendConnector struct {
BasicHTTPFrontendConnector
// BasicHTTPFrontendConnector // TODO: embedding resulted in ServeHTTP being called from BasicHTTPFrontendConnector instead of ElasticsearchIngestFrontendConnector
listener *http.Server
router quesma_api.Router

endpoint string
}

const (
Expand All @@ -28,9 +33,7 @@ const (

func NewElasticsearchIngestFrontendConnector(endpoint string) *ElasticsearchIngestFrontendConnector {
fc := &ElasticsearchIngestFrontendConnector{
BasicHTTPFrontendConnector: BasicHTTPFrontendConnector{
endpoint: endpoint,
},
endpoint: endpoint,
}
router := NewHTTPRouter()
router.AddRoute(IndexBulkPath, bulk)
Expand Down Expand Up @@ -99,3 +102,40 @@ func getIndexFromRequest(request *http.Request) string {
match, _ := expectedUrl.Match(request.URL.Path) // safe to call at this level
return match.Params["index"]
}

// Temporarily ported from BasicHTTPFrontendConnector until we figure out embedding issue

func (h *ElasticsearchIngestFrontendConnector) AddRouter(router quesma_api.Router) {
h.router = router
}

func (h *ElasticsearchIngestFrontendConnector) GetRouter() quesma_api.Router {
return h.router
}

func (h *ElasticsearchIngestFrontendConnector) Listen() error {
h.listener = &http.Server{}
h.listener.Addr = h.endpoint
h.listener.Handler = h
go func() {
err := h.listener.ListenAndServe()
_ = err
}()

return nil
}

func (h *ElasticsearchIngestFrontendConnector) Stop(ctx context.Context) error {
if h.listener == nil {
return nil
}
err := h.listener.Shutdown(ctx)
if err != nil {
return err
}
return h.listener.Close()
}

func (h *ElasticsearchIngestFrontendConnector) GetEndpoint() string {
return h.endpoint
}
17 changes: 8 additions & 9 deletions quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,15 +92,14 @@ func buildIngestOnlyQuesma() quesma_api.QuesmaBuilder {
return quesmaInstance
}

/* Example of how to use the v2 module api in main function
func main2() {
q1 := buildIngestOnlyQuesma()
q1.Start()
stop := make(chan os.Signal, 1)
<-stop
q1.Stop(context.Background())
}
*/
// Example of how to use the v2 module api in main function
//func main() {
// q1 := buildIngestOnlyQuesma()
// q1.Start()
// stop := make(chan os.Signal, 1)
// <-stop
// q1.Stop(context.Background())
//}

func main() {
if EnableConcurrencyProfiling {
Expand Down
4 changes: 4 additions & 0 deletions quesma/processors/base_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,10 @@ func (p *BaseProcessor) AddProcessor(proc quesma_api.Processor) {
p.InnerProcessors = append(p.InnerProcessors, proc)
}

func (p *BaseProcessor) Init() error {
return nil
}

func (p *BaseProcessor) GetProcessors() []quesma_api.Processor {
return p.InnerProcessors
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ const (

type ElasticsearchToClickHouseIngestProcessor struct {
processors.BaseProcessor
config config.QuesmaProcessorConfig
config config.QuesmaProcessorConfig
legacyIngestProcessor *ingest.IngestProcessor2
}

func NewElasticsearchToClickHouseIngestProcessor(conf config.QuesmaProcessorConfig) *ElasticsearchToClickHouseIngestProcessor {
Expand All @@ -43,6 +44,16 @@ func NewElasticsearchToClickHouseIngestProcessor(conf config.QuesmaProcessorConf
}
}

func (p *ElasticsearchToClickHouseIngestProcessor) Init() error {
chBackendConnector := p.GetBackendConnector(quesma_api.ClickHouseSQLBackend)
if chBackendConnector == nil {
return fmt.Errorf("ClickHouse backend connector not found")
}
p.legacyIngestProcessor = p.prepareTemporaryIngestProcessor(chBackendConnector)

return nil
}

func (p *ElasticsearchToClickHouseIngestProcessor) GetId() string {
return "elasticsearch_to_clickhouse_ingest"
}
Expand Down Expand Up @@ -83,8 +94,6 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in
return metadata, data, nil
}

tempIngestProcessor := p.prepareTemporaryIngestProcessor(chBackend)

esBackend = p.GetBackendConnector(quesma_api.ElasticsearchBackend)
if esBackend == nil {
fmt.Println("Backend connector not found")
Expand Down Expand Up @@ -122,7 +131,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in
if err != nil {
println(err)
}
result, err := handleDocIndex(payloadJson, indexNameFromIncomingReq, tempIngestProcessor, p.config)
result, err := handleDocIndex(payloadJson, indexNameFromIncomingReq, p.legacyIngestProcessor, p.config)
if err != nil {
println(err)
}
Expand All @@ -134,7 +143,7 @@ func (p *ElasticsearchToClickHouseIngestProcessor) Handle(metadata map[string]in
if err != nil {
println(err)
}
results, err := handleBulkIndex(payloadNDJson, indexNameFromIncomingReq, tempIngestProcessor, es, p.config)
results, err := handleBulkIndex(payloadNDJson, indexNameFromIncomingReq, p.legacyIngestProcessor, es, p.config)
if err != nil {
println(err)
}
Expand Down
1 change: 1 addition & 0 deletions quesma/v2/core/quesma_apis.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ type Processor interface {
SetBackendConnectors(conns map[BackendConnectorType]BackendConnector)
GetBackendConnector(connectorType BackendConnectorType) BackendConnector
GetSupportedBackendConnectors() []BackendConnectorType
Init() error
}

type Rows interface {
Expand Down
3 changes: 3 additions & 0 deletions quesma/v2/core/quesma_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ func (quesma *Quesma) Build() (QuesmaBuilder, error) {
}
}
proc.SetBackendConnectors(backendConnectors)
if err := proc.Init(); err != nil {
return nil, fmt.Errorf("processor %v failed to initialize: %v", proc.GetId(), err)
}
}

}
Expand Down

0 comments on commit 95583aa

Please sign in to comment.