From f58d12287a7b8f094c15fae3cf1135f0b19eec35 Mon Sep 17 00:00:00 2001 From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com> Date: Thu, 28 Nov 2024 11:52:10 +0100 Subject: [PATCH] Adding quesma v2 core apis (#1046) This PR adds APIs created in `quesma_api` POC to official codebase. According to our discussion I created separate v2 directory with new module --- v2/backend_connectors.go | 47 ++++++++++++++++++ v2/dispatch.go | 43 ++++++++++++++++ v2/metadata_api.go | 25 ++++++++++ v2/quesma_apis.go | 90 +++++++++++++++++++++++++++++++++ v2/quesma_builder.go | 104 +++++++++++++++++++++++++++++++++++++++ v2/quesma_pipeline.go | 53 ++++++++++++++++++++ v2/quesma_utils.go | 24 +++++++++ 7 files changed, 386 insertions(+) create mode 100644 v2/backend_connectors.go create mode 100644 v2/dispatch.go create mode 100644 v2/metadata_api.go create mode 100644 v2/quesma_apis.go create mode 100644 v2/quesma_builder.go create mode 100644 v2/quesma_pipeline.go create mode 100644 v2/quesma_utils.go diff --git a/v2/backend_connectors.go b/v2/backend_connectors.go new file mode 100644 index 000000000..c8061b79c --- /dev/null +++ b/v2/backend_connectors.go @@ -0,0 +1,47 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma_api + +import "context" + +type BackendConnectorType int + +const ( + NoopBackend = iota + MySQLBackend + PgSQLBackend +) + +func GetBackendConnectorNameFromType(connectorType BackendConnectorType) string { + switch connectorType { + case MySQLBackend: + return "mysql" + case PgSQLBackend: + return "pgsql" + default: + return "noop" + } +} + +type NoopBackendConnector struct { +} + +func (p *NoopBackendConnector) GetId() BackendConnectorType { + return NoopBackend +} + +func (p *NoopBackendConnector) Open() error { + return nil +} + +func (p *NoopBackendConnector) Close() error { + return nil +} + +func (p *NoopBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (Rows, error) { + return nil, nil +} + +func (p *NoopBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error { + return nil +} diff --git a/v2/dispatch.go b/v2/dispatch.go new file mode 100644 index 000000000..24aec544d --- /dev/null +++ b/v2/dispatch.go @@ -0,0 +1,43 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma_api + +import ( + "net/http" +) + +type HTTPFrontendHandler func(request *http.Request) (map[string]interface{}, any, error) + +type HandlersPipe struct { + Handler HTTPFrontendHandler + Processors []Processor +} + +type Dispatcher struct { +} + +func (d *Dispatcher) Dispatch(processors []Processor, metadata map[string]interface{}, message any) (map[string]interface{}, any) { + return d.dispatch(processors, metadata, message) +} + +func (d *Dispatcher) dispatch(processors []Processor, metadata map[string]interface{}, message any) (map[string]any, any) { + // Process the response using the processor + var inputMessages []any + inputMessages = append(inputMessages, message) + if processors == nil { + return metadata, inputMessages[0] + } + var outMessage any + for _, processor := range processors { + metadata, outerMessage, _ := processor.Handle(metadata, inputMessages...) + outMessage = outerMessage + inputMessages = make([]any, 0) + innerProcessors := processor.GetProcessors() + for _, innerProc := range innerProcessors { + // TODO inner processor can have its own processors + metadata, outMessage, _ = innerProc.Handle(metadata, outerMessage) + inputMessages = append(inputMessages, outMessage) + } + } + return metadata, outMessage +} diff --git a/v2/metadata_api.go b/v2/metadata_api.go new file mode 100644 index 000000000..fca080a41 --- /dev/null +++ b/v2/metadata_api.go @@ -0,0 +1,25 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma_api + +import "strconv" + +func MakeNewMetadata() map[string]any { + return make(map[string]any) +} + +func SetCorrelationId(metadata map[string]any, correlationId int64) { + metadata["correlationId"] = strconv.FormatInt(correlationId, 10) +} + +func GetCorrelationId(metadata map[string]any) string { + if correlationId, ok := metadata["correlationId"]; !ok { + panic("CorrelationId not found in metadata") + } else { + checkedCorrelationId, err := CheckedCast[string](correlationId) + if err != nil { + panic("CorrelationId is not string") + } + return checkedCorrelationId + } +} diff --git a/v2/quesma_apis.go b/v2/quesma_apis.go new file mode 100644 index 000000000..1d45089cb --- /dev/null +++ b/v2/quesma_apis.go @@ -0,0 +1,90 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma_api + +import ( + "context" + "net" + "net/http" +) + +type Router interface { + Cloner + AddRoute(path string, handler HTTPFrontendHandler) + GetHandlers() map[string]HandlersPipe + SetHandlers(handlers map[string]HandlersPipe) + Multiplexer() *http.ServeMux +} + +type FrontendConnector interface { + Listen() error // Start listening on the endpoint + GetEndpoint() string + Stop(ctx context.Context) error // Stop listening +} + +type HTTPFrontendConnector interface { + FrontendConnector + AddRouter(router Router) + GetRouter() Router +} + +type TCPFrontendConnector interface { + FrontendConnector + AddConnectionHandler(handler TCPConnectionHandler) + GetConnectionHandler() TCPConnectionHandler +} + +type TCPConnectionHandler interface { + HandleConnection(conn net.Conn) error + SetHandlers(processor []Processor) +} + +type CompoundProcessor interface { + AddProcessor(proc Processor) + GetProcessors() []Processor +} + +type PipelineBuilder interface { + AddFrontendConnector(conn FrontendConnector) + GetFrontendConnectors() []FrontendConnector + AddBackendConnector(conn BackendConnector) + GetBackendConnectors() map[BackendConnectorType]BackendConnector + CompoundProcessor + Build() PipelineBuilder + Start() +} + +type QuesmaBuilder interface { + AddPipeline(pipeline PipelineBuilder) + GetPipelines() []PipelineBuilder + Build() (QuesmaBuilder, error) + Start() + Stop(ctx context.Context) +} + +type Processor interface { + CompoundProcessor + GetId() string + Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) + SetBackendConnectors(conns map[BackendConnectorType]BackendConnector) + GetBackendConnector(connectorType BackendConnectorType) BackendConnector + GetSupportedBackendConnectors() []BackendConnectorType +} + +type Rows interface { + Next() bool + Scan(dest ...interface{}) error + Close() + Err() error +} + +type BackendConnector interface { + GetId() BackendConnectorType + Open() error + // Query executes a query that returns rows, typically a SELECT. + Query(ctx context.Context, query string, args ...interface{}) (Rows, error) + + // Exec executes a command that doesn't return rows, typically an INSERT, UPDATE, or DELETE. + Exec(ctx context.Context, query string, args ...interface{}) error + Close() error +} diff --git a/v2/quesma_builder.go b/v2/quesma_builder.go new file mode 100644 index 000000000..ef99a9dd0 --- /dev/null +++ b/v2/quesma_builder.go @@ -0,0 +1,104 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma_api + +import ( + "context" + "fmt" +) + +type Quesma struct { + pipelines []PipelineBuilder +} + +func NewQuesma() *Quesma { + return &Quesma{ + pipelines: make([]PipelineBuilder, 0), + } +} + +func (quesma *Quesma) AddPipeline(pipeline PipelineBuilder) { + quesma.pipelines = append(quesma.pipelines, pipeline) +} + +func (quesma *Quesma) GetPipelines() []PipelineBuilder { + return quesma.pipelines +} + +func (quesma *Quesma) Start() { + for _, pipeline := range quesma.pipelines { + pipeline.Start() + } +} + +func (quesma *Quesma) Stop(ctx context.Context) { + for _, pipeline := range quesma.pipelines { + for _, conn := range pipeline.GetFrontendConnectors() { + conn.Stop(ctx) + } + } + for _, pipeline := range quesma.pipelines { + for _, conn := range pipeline.GetBackendConnectors() { + conn.Close() + } + } +} + +func (quesma *Quesma) Build() (QuesmaBuilder, error) { + endpoints := make(map[string]struct{}) + handlers := make(map[string]HandlersPipe) + + for _, pipeline := range quesma.pipelines { + for _, conn := range pipeline.GetFrontendConnectors() { + if httpConn, ok := conn.(HTTPFrontendConnector); ok { + endpoints[conn.GetEndpoint()] = struct{}{} + router := httpConn.GetRouter() + for path, handlerWrapper := range router.GetHandlers() { + handlerWrapper.Processors = append(handlerWrapper.Processors, pipeline.GetProcessors()...) + handlers[path] = handlerWrapper + } + } + } + } + if len(endpoints) == 1 { + for _, pipeline := range quesma.pipelines { + for _, conn := range pipeline.GetFrontendConnectors() { + if httpConn, ok := conn.(HTTPFrontendConnector); ok { + router := httpConn.GetRouter().Clone().(Router) + if len(endpoints) == 1 { + router.SetHandlers(handlers) + } + httpConn.AddRouter(router) + + } + } + } + } + + for _, pipeline := range quesma.pipelines { + backendConnectorTypesPerPipeline := make(map[BackendConnectorType]struct{}) + for _, conn := range pipeline.GetFrontendConnectors() { + if tcpConn, ok := conn.(TCPFrontendConnector); ok { + if len(pipeline.GetProcessors()) > 0 { + tcpConn.GetConnectionHandler().SetHandlers(pipeline.GetProcessors()) + } + } + } + backendConnectors := pipeline.GetBackendConnectors() + for _, backendConnector := range backendConnectors { + backendConnectorTypesPerPipeline[backendConnector.GetId()] = struct{}{} + } + for _, proc := range pipeline.GetProcessors() { + supportedBackendConnectorsByProc := proc.GetSupportedBackendConnectors() + for _, backendConnectorType := range supportedBackendConnectorsByProc { + if _, ok := backendConnectorTypesPerPipeline[backendConnectorType]; !ok { + return nil, fmt.Errorf("processor %v requires backend connector %v which is not provided", proc.GetId(), GetBackendConnectorNameFromType(backendConnectorType)) + } + } + proc.SetBackendConnectors(backendConnectors) + } + + } + + return quesma, nil +} diff --git a/v2/quesma_pipeline.go b/v2/quesma_pipeline.go new file mode 100644 index 000000000..ce6adaddc --- /dev/null +++ b/v2/quesma_pipeline.go @@ -0,0 +1,53 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma_api + +type Pipeline struct { + FrontendConnectors []FrontendConnector + Processors []Processor + BackendConnectors map[BackendConnectorType]BackendConnector +} + +func NewPipeline() *Pipeline { + backendConnectors := make(map[BackendConnectorType]BackendConnector) + backendConnectors[NoopBackend] = &NoopBackendConnector{} + return &Pipeline{ + FrontendConnectors: make([]FrontendConnector, 0), + Processors: make([]Processor, 0), + BackendConnectors: backendConnectors, + } +} + +func (p *Pipeline) AddFrontendConnector(conn FrontendConnector) { + p.FrontendConnectors = append(p.FrontendConnectors, conn) +} + +func (p *Pipeline) AddProcessor(proc Processor) { + p.Processors = append(p.Processors, proc) +} + +func (p *Pipeline) AddBackendConnector(conn BackendConnector) { + p.BackendConnectors[conn.GetId()] = conn +} + +func (p *Pipeline) Build() PipelineBuilder { + return p +} + +func (p *Pipeline) Start() { + for _, conn := range p.FrontendConnectors { + go conn.Listen() + } +} + +func (p *Pipeline) GetFrontendConnectors() []FrontendConnector { + return p.FrontendConnectors +} + +func (p *Pipeline) GetProcessors() []Processor { + return p.Processors +} + +func (p *Pipeline) GetBackendConnectors() map[BackendConnectorType]BackendConnector { + return p.BackendConnectors +} diff --git a/v2/quesma_utils.go b/v2/quesma_utils.go new file mode 100644 index 000000000..3cbf3922b --- /dev/null +++ b/v2/quesma_utils.go @@ -0,0 +1,24 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma_api + +import ( + "fmt" +) + +type Cloner interface { + Clone() Cloner +} + +func CheckedCast[T any](value interface{}) (T, error) { + v, ok := value.(T) + if !ok { + var zero T + return zero, fmt.Errorf("cannot cast %v to %T", value, zero) + } + return v, nil +} + +func SetInputType[T any]() any { + return nil +}