Skip to content

Commit

Permalink
Adding frontend, backend connectors, processors and some basic tests (#…
Browse files Browse the repository at this point in the history
…1049)

This PR:
- adds `go.mod`
- moves core api from `v2` to `v2/core`
- adds remaining components (frontend, backend connectors, processors)
- adds basic tests
  • Loading branch information
pdelewski authored Nov 28, 2024
1 parent f58d122 commit 8144a02
Show file tree
Hide file tree
Showing 27 changed files with 1,777 additions and 0 deletions.
77 changes: 77 additions & 0 deletions v2/backend_connectors/mysql_backend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package backend_connectors

import (
"context"
"database/sql"
_ "github.com/go-sql-driver/mysql"
quesma_api "quesma_v2/core"
)

type MySqlRows struct {
rows *sql.Rows
}

func (p *MySqlRows) Next() bool {
return p.rows.Next()
}

func (p *MySqlRows) Scan(dest ...interface{}) error {
return p.rows.Scan(dest...)
}

func (p *MySqlRows) Close() {
err := p.rows.Close()
if err != nil {
panic(err)
}
}

func (p *MySqlRows) Err() error {
return p.rows.Err()
}

type MySqlBackendConnector struct {
Endpoint string
connection *sql.DB
}

func (p *MySqlBackendConnector) GetId() quesma_api.BackendConnectorType {
return quesma_api.MySQLBackend
}

func (p *MySqlBackendConnector) Open() error {
conn, err := sql.Open("mysql", p.Endpoint)
if err != nil {
return err
}
err = conn.Ping()
if err != nil {
return err
}
p.connection = conn
return nil
}

func (p *MySqlBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close()
}

func (p *MySqlBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
rows, err := p.connection.QueryContext(context.Background(), query, args...)
if err != nil {
return nil, err
}
return &MySqlRows{rows: rows}, nil
}

func (p *MySqlBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.ExecContext(context.Background(), query)
return err
}
_, err := p.connection.ExecContext(context.Background(), query, args...)
return err
}
45 changes: 45 additions & 0 deletions v2/backend_connectors/postgres_backend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package backend_connectors

import (
"context"
"github.com/jackc/pgx/v4"
quesma_api "quesma_v2/core"
)

type PostgresBackendConnector struct {
Endpoint string
connection *pgx.Conn
}

func (p *PostgresBackendConnector) GetId() quesma_api.BackendConnectorType {
return quesma_api.PgSQLBackend
}

func (p *PostgresBackendConnector) Open() error {
conn, err := pgx.Connect(context.Background(), p.Endpoint)
if err != nil {
return err
}
p.connection = conn
return nil
}

func (p *PostgresBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close(context.Background())
}

func (p *PostgresBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
return p.connection.Query(context.Background(), query, args...)
}

func (p *PostgresBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.Exec(context.Background(), query)
return err
}
_, err := p.connection.Exec(context.Background(), query, args...)
return err
}
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
File renamed without changes.
147 changes: 147 additions & 0 deletions v2/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
package frontend_connectors

import (
"bytes"
"context"
"fmt"
"io"
"net/http"
quesma_api "quesma_v2/core"
"sync"
)

type HTTPRouter struct {
mux *http.ServeMux // Default HTTP multiplexer
handlers map[string]quesma_api.HandlersPipe // Map to store custom route handlers
mutex sync.RWMutex // Mutex for concurrent access to handlers
}

func NewHTTPRouter() *HTTPRouter {
return &HTTPRouter{
mux: http.NewServeMux(),
handlers: make(map[string]quesma_api.HandlersPipe),
}
}

// AddRoute adds a new route to the router
func (router *HTTPRouter) AddRoute(path string, handler quesma_api.HTTPFrontendHandler) {
router.mutex.Lock()
defer router.mutex.Unlock()
router.handlers[path] = quesma_api.HandlersPipe{Handler: handler}
fmt.Printf("Added route: %s\n", path)
}

func (router *HTTPRouter) Clone() quesma_api.Cloner {
newRouter := NewHTTPRouter()
router.mutex.Lock()
defer router.mutex.Unlock()
for path, handler := range router.handlers {
newRouter.handlers[path] = handler
}
return newRouter
}

func (router *HTTPRouter) GetHandlers() map[string]quesma_api.HandlersPipe {
router.mutex.RLock()
defer router.mutex.RUnlock()
callInfos := make(map[string]quesma_api.HandlersPipe)
for k, v := range router.handlers {
callInfos[k] = v
}
return callInfos
}

func (router *HTTPRouter) SetHandlers(handlers map[string]quesma_api.HandlersPipe) {
router.mutex.Lock()
defer router.mutex.Unlock()
for path, handler := range handlers {
router.handlers[path] = handler
}
}

func (router *HTTPRouter) Lock() {
router.mutex.Lock()
}

func (router *HTTPRouter) Unlock() {
router.mutex.Unlock()
}

func (router *HTTPRouter) Multiplexer() *http.ServeMux {
return router.mux
}

type BasicHTTPFrontendConnector struct {
listener *http.Server
router quesma_api.Router

endpoint string
}

func NewBasicHTTPFrontendConnector(endpoint string) *BasicHTTPFrontendConnector {
return &BasicHTTPFrontendConnector{
endpoint: endpoint,
}
}

func (h *BasicHTTPFrontendConnector) AddRouter(router quesma_api.Router) {
h.router = router
}

func (h *BasicHTTPFrontendConnector) GetRouter() quesma_api.Router {
return h.router
}

func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) {
handlerWrapper, exists := h.router.GetHandlers()[req.URL.Path]
if !exists {
h.router.Multiplexer().ServeHTTP(w, req)
return
}
dispatcher := &quesma_api.Dispatcher{}
http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
metadata, message, _ := handlerWrapper.Handler(req)

metadata, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message)
_, err := w.Write(message.([]byte))
if err != nil {
fmt.Printf("Error writing response: %s\n", err)
}
}).ServeHTTP(w, req)
}

func (h *BasicHTTPFrontendConnector) Listen() error {
h.listener = &http.Server{}
h.listener.Addr = h.endpoint
h.listener.Handler = h
go func() {
err := h.listener.ListenAndServe()
_ = err
}()

return nil
}

func (h *BasicHTTPFrontendConnector) Stop(ctx context.Context) error {
if h.listener == nil {
return nil
}
err := h.listener.Shutdown(ctx)
if err != nil {
return err
}
return h.listener.Close()
}

func (h *BasicHTTPFrontendConnector) GetEndpoint() string {
return h.endpoint
}

func ReadRequestBody(request *http.Request) ([]byte, error) {
reqBody, err := io.ReadAll(request.Body)
if err != nil {
return nil, err
}
request.Body = io.NopCloser(bytes.NewBuffer(reqBody))
return reqBody, nil
}
44 changes: 44 additions & 0 deletions v2/frontend_connectors/basic_tcp_connection_handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package frontend_connectors

import (
"fmt"
"io"
"net"
quesma_api "quesma_v2/core"
)

type BasicTcpConnectionHandler struct {
processors []quesma_api.Processor
}

func (h *BasicTcpConnectionHandler) SetHandlers(processors []quesma_api.Processor) {
h.processors = processors
}

func (h *BasicTcpConnectionHandler) HandleConnection(conn net.Conn) error {
fmt.Println("Handling connection")
defer conn.Close()

// Example: Read data from the connection
buffer := make([]byte, 1024)
for {
n, err := conn.Read(buffer)
if err != nil {
if err == io.EOF {
fmt.Println("Connection closed")
} else {
fmt.Println("Error reading from connection:", err)
}
return err
}
for _, processor := range h.processors {
if processor != nil {
processor.Handle(nil, buffer[:n])
}
}
fmt.Printf("Received data: %s\n", string(buffer[:n]))

// Echo the data back (for demonstration purposes)
conn.Write(buffer[:n])
}
}
72 changes: 72 additions & 0 deletions v2/frontend_connectors/basic_tcp_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
package frontend_connectors

import (
"context"
"fmt"
"net"
quesma_api "quesma_v2/core"
"sync/atomic"
)

type TCPListener struct {
listener net.Listener
Endpoint string
handler quesma_api.TCPConnectionHandler
isShutdown atomic.Bool
}

func NewTCPConnector(endpoint string) *TCPListener {
return &TCPListener{
Endpoint: endpoint,
}
}

func (t *TCPListener) Listen() error {
ln, err := net.Listen("tcp", t.Endpoint)
if err != nil {
return fmt.Errorf("failed to start TCP listener: %v", err)
}
t.listener = ln

// Start listening for incoming connections in a goroutine
go func() {
for {
conn, err := ln.Accept()
if err != nil {
if t.isShutdown.Load() {
return
}
fmt.Println("Failed to accept connection:", err)
continue
}
// Handle each connection in a separate goroutine to allow concurrent handling
go func() {
err := t.GetConnectionHandler().HandleConnection(conn)
if err != nil {
fmt.Println("Error handling connection:", err)
}
}()
}
}()
return nil
}

func (t *TCPListener) GetEndpoint() string {
return t.Endpoint
}

func (t *TCPListener) AddConnectionHandler(handler quesma_api.TCPConnectionHandler) {
t.handler = handler
}

func (t *TCPListener) GetConnectionHandler() quesma_api.TCPConnectionHandler {
return t.handler
}

func (t *TCPListener) Stop(ctx context.Context) error {
t.isShutdown.Store(true)
if t.listener != nil {
return t.listener.Close()
}
return nil
}
Loading

0 comments on commit 8144a02

Please sign in to comment.