Skip to content

Commit

Permalink
Adding quesma v2 core apis (#1046)
Browse files Browse the repository at this point in the history
This PR adds APIs created in `quesma_api` POC to official codebase.
According to our discussion I created separate v2 directory with new
module
  • Loading branch information
pdelewski authored Nov 28, 2024
1 parent 11cd50b commit f58d122
Show file tree
Hide file tree
Showing 7 changed files with 386 additions and 0 deletions.
47 changes: 47 additions & 0 deletions v2/backend_connectors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package quesma_api

import "context"

type BackendConnectorType int

const (
NoopBackend = iota
MySQLBackend
PgSQLBackend
)

func GetBackendConnectorNameFromType(connectorType BackendConnectorType) string {
switch connectorType {
case MySQLBackend:
return "mysql"
case PgSQLBackend:
return "pgsql"
default:
return "noop"
}
}

type NoopBackendConnector struct {
}

func (p *NoopBackendConnector) GetId() BackendConnectorType {
return NoopBackend
}

func (p *NoopBackendConnector) Open() error {
return nil
}

func (p *NoopBackendConnector) Close() error {
return nil
}

func (p *NoopBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (Rows, error) {
return nil, nil
}

func (p *NoopBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
return nil
}
43 changes: 43 additions & 0 deletions v2/dispatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package quesma_api

import (
"net/http"
)

type HTTPFrontendHandler func(request *http.Request) (map[string]interface{}, any, error)

type HandlersPipe struct {
Handler HTTPFrontendHandler
Processors []Processor
}

type Dispatcher struct {
}

func (d *Dispatcher) Dispatch(processors []Processor, metadata map[string]interface{}, message any) (map[string]interface{}, any) {
return d.dispatch(processors, metadata, message)
}

func (d *Dispatcher) dispatch(processors []Processor, metadata map[string]interface{}, message any) (map[string]any, any) {
// Process the response using the processor
var inputMessages []any
inputMessages = append(inputMessages, message)
if processors == nil {
return metadata, inputMessages[0]
}
var outMessage any
for _, processor := range processors {
metadata, outerMessage, _ := processor.Handle(metadata, inputMessages...)
outMessage = outerMessage
inputMessages = make([]any, 0)
innerProcessors := processor.GetProcessors()
for _, innerProc := range innerProcessors {
// TODO inner processor can have its own processors
metadata, outMessage, _ = innerProc.Handle(metadata, outerMessage)
inputMessages = append(inputMessages, outMessage)
}
}
return metadata, outMessage
}
25 changes: 25 additions & 0 deletions v2/metadata_api.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package quesma_api

import "strconv"

func MakeNewMetadata() map[string]any {
return make(map[string]any)
}

func SetCorrelationId(metadata map[string]any, correlationId int64) {
metadata["correlationId"] = strconv.FormatInt(correlationId, 10)
}

func GetCorrelationId(metadata map[string]any) string {
if correlationId, ok := metadata["correlationId"]; !ok {
panic("CorrelationId not found in metadata")
} else {
checkedCorrelationId, err := CheckedCast[string](correlationId)
if err != nil {
panic("CorrelationId is not string")
}
return checkedCorrelationId
}
}
90 changes: 90 additions & 0 deletions v2/quesma_apis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package quesma_api

import (
"context"
"net"
"net/http"
)

type Router interface {
Cloner
AddRoute(path string, handler HTTPFrontendHandler)
GetHandlers() map[string]HandlersPipe
SetHandlers(handlers map[string]HandlersPipe)
Multiplexer() *http.ServeMux
}

type FrontendConnector interface {
Listen() error // Start listening on the endpoint
GetEndpoint() string
Stop(ctx context.Context) error // Stop listening
}

type HTTPFrontendConnector interface {
FrontendConnector
AddRouter(router Router)
GetRouter() Router
}

type TCPFrontendConnector interface {
FrontendConnector
AddConnectionHandler(handler TCPConnectionHandler)
GetConnectionHandler() TCPConnectionHandler
}

type TCPConnectionHandler interface {
HandleConnection(conn net.Conn) error
SetHandlers(processor []Processor)
}

type CompoundProcessor interface {
AddProcessor(proc Processor)
GetProcessors() []Processor
}

type PipelineBuilder interface {
AddFrontendConnector(conn FrontendConnector)
GetFrontendConnectors() []FrontendConnector
AddBackendConnector(conn BackendConnector)
GetBackendConnectors() map[BackendConnectorType]BackendConnector
CompoundProcessor
Build() PipelineBuilder
Start()
}

type QuesmaBuilder interface {
AddPipeline(pipeline PipelineBuilder)
GetPipelines() []PipelineBuilder
Build() (QuesmaBuilder, error)
Start()
Stop(ctx context.Context)
}

type Processor interface {
CompoundProcessor
GetId() string
Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error)
SetBackendConnectors(conns map[BackendConnectorType]BackendConnector)
GetBackendConnector(connectorType BackendConnectorType) BackendConnector
GetSupportedBackendConnectors() []BackendConnectorType
}

type Rows interface {
Next() bool
Scan(dest ...interface{}) error
Close()
Err() error
}

type BackendConnector interface {
GetId() BackendConnectorType
Open() error
// Query executes a query that returns rows, typically a SELECT.
Query(ctx context.Context, query string, args ...interface{}) (Rows, error)

// Exec executes a command that doesn't return rows, typically an INSERT, UPDATE, or DELETE.
Exec(ctx context.Context, query string, args ...interface{}) error
Close() error
}
104 changes: 104 additions & 0 deletions v2/quesma_builder.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package quesma_api

import (
"context"
"fmt"
)

type Quesma struct {
pipelines []PipelineBuilder
}

func NewQuesma() *Quesma {
return &Quesma{
pipelines: make([]PipelineBuilder, 0),
}
}

func (quesma *Quesma) AddPipeline(pipeline PipelineBuilder) {
quesma.pipelines = append(quesma.pipelines, pipeline)
}

func (quesma *Quesma) GetPipelines() []PipelineBuilder {
return quesma.pipelines
}

func (quesma *Quesma) Start() {
for _, pipeline := range quesma.pipelines {
pipeline.Start()
}
}

func (quesma *Quesma) Stop(ctx context.Context) {
for _, pipeline := range quesma.pipelines {
for _, conn := range pipeline.GetFrontendConnectors() {
conn.Stop(ctx)
}
}
for _, pipeline := range quesma.pipelines {
for _, conn := range pipeline.GetBackendConnectors() {
conn.Close()
}
}
}

func (quesma *Quesma) Build() (QuesmaBuilder, error) {
endpoints := make(map[string]struct{})
handlers := make(map[string]HandlersPipe)

for _, pipeline := range quesma.pipelines {
for _, conn := range pipeline.GetFrontendConnectors() {
if httpConn, ok := conn.(HTTPFrontendConnector); ok {
endpoints[conn.GetEndpoint()] = struct{}{}
router := httpConn.GetRouter()
for path, handlerWrapper := range router.GetHandlers() {
handlerWrapper.Processors = append(handlerWrapper.Processors, pipeline.GetProcessors()...)
handlers[path] = handlerWrapper
}
}
}
}
if len(endpoints) == 1 {
for _, pipeline := range quesma.pipelines {
for _, conn := range pipeline.GetFrontendConnectors() {
if httpConn, ok := conn.(HTTPFrontendConnector); ok {
router := httpConn.GetRouter().Clone().(Router)
if len(endpoints) == 1 {
router.SetHandlers(handlers)
}
httpConn.AddRouter(router)

}
}
}
}

for _, pipeline := range quesma.pipelines {
backendConnectorTypesPerPipeline := make(map[BackendConnectorType]struct{})
for _, conn := range pipeline.GetFrontendConnectors() {
if tcpConn, ok := conn.(TCPFrontendConnector); ok {
if len(pipeline.GetProcessors()) > 0 {
tcpConn.GetConnectionHandler().SetHandlers(pipeline.GetProcessors())
}
}
}
backendConnectors := pipeline.GetBackendConnectors()
for _, backendConnector := range backendConnectors {
backendConnectorTypesPerPipeline[backendConnector.GetId()] = struct{}{}
}
for _, proc := range pipeline.GetProcessors() {
supportedBackendConnectorsByProc := proc.GetSupportedBackendConnectors()
for _, backendConnectorType := range supportedBackendConnectorsByProc {
if _, ok := backendConnectorTypesPerPipeline[backendConnectorType]; !ok {
return nil, fmt.Errorf("processor %v requires backend connector %v which is not provided", proc.GetId(), GetBackendConnectorNameFromType(backendConnectorType))
}
}
proc.SetBackendConnectors(backendConnectors)
}

}

return quesma, nil
}
53 changes: 53 additions & 0 deletions v2/quesma_pipeline.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package quesma_api

type Pipeline struct {
FrontendConnectors []FrontendConnector
Processors []Processor
BackendConnectors map[BackendConnectorType]BackendConnector
}

func NewPipeline() *Pipeline {
backendConnectors := make(map[BackendConnectorType]BackendConnector)
backendConnectors[NoopBackend] = &NoopBackendConnector{}
return &Pipeline{
FrontendConnectors: make([]FrontendConnector, 0),
Processors: make([]Processor, 0),
BackendConnectors: backendConnectors,
}
}

func (p *Pipeline) AddFrontendConnector(conn FrontendConnector) {
p.FrontendConnectors = append(p.FrontendConnectors, conn)
}

func (p *Pipeline) AddProcessor(proc Processor) {
p.Processors = append(p.Processors, proc)
}

func (p *Pipeline) AddBackendConnector(conn BackendConnector) {
p.BackendConnectors[conn.GetId()] = conn
}

func (p *Pipeline) Build() PipelineBuilder {
return p
}

func (p *Pipeline) Start() {
for _, conn := range p.FrontendConnectors {
go conn.Listen()
}
}

func (p *Pipeline) GetFrontendConnectors() []FrontendConnector {
return p.FrontendConnectors
}

func (p *Pipeline) GetProcessors() []Processor {
return p.Processors
}

func (p *Pipeline) GetBackendConnectors() map[BackendConnectorType]BackendConnector {
return p.BackendConnectors
}
24 changes: 24 additions & 0 deletions v2/quesma_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0
package quesma_api

import (
"fmt"
)

type Cloner interface {
Clone() Cloner
}

func CheckedCast[T any](value interface{}) (T, error) {
v, ok := value.(T)
if !ok {
var zero T
return zero, fmt.Errorf("cannot cast %v to %T", value, zero)
}
return v, nil
}

func SetInputType[T any]() any {
return nil
}

0 comments on commit f58d122

Please sign in to comment.