From 24cb3f45bcbdf50349e0db428423f048ba9cca50 Mon Sep 17 00:00:00 2001
From: Przemyslaw Delewski <102958445+pdelewski@users.noreply.github.com>
Date: Mon, 2 Dec 2024 11:32:48 +0100
Subject: [PATCH] Quesma v2 new frontend (#1057)

This PR is about a few things:
- moving frontend connectors, backend connectors, and processors back to the `quesma` module
- refactoring to split ingest and search routing
- moving some code related to the HTTP frontend and routing, and doing some cleanup
---
 .../mysql_backend_connector.go | 0
 .../postgres_backend_connector.go | 0
 .../basic_http_frontend_connector.go | 36 +-
 .../basic_tcp_connection_handler.go | 0
 .../basic_tcp_connector.go | 0
 .../passthrough_tcp_connection_handler.go | 0
 .../tcp_postgres_connection_handler.go | 5 +-
 quesma/go.mod | 15 +-
 quesma/go.sum | 142 ++++-
 quesma/main.go | 5 +-
 quesma/main_test.go | 134 +++-
 .../{v2 => }/processors/ab_test_processor.go | 0
 quesma/{v2 => }/processors/base_processor.go | 0
 .../processors/basic_tcp_processor.go | 0
 .../processors/mysql_ingest_processor.go | 0
 .../processors/mysql_query_processor.go | 0
 .../processors/postgres_ingest_processor.go | 0
 .../processors/postgres_query_processor.go | 0
 .../processors/postgres_to_mysql_processor.go | 0
 quesma/quesma/dual_write_proxy.go | 408 ++++++++++++-
 quesma/quesma/dual_write_proxy_v2.go | 570 ++++++++++++++++++
 quesma/quesma/quesma.go | 455 +-------------
 quesma/quesma/router_v2.go | 423 +++++++++++++++
 quesma/{v2 => }/test_utils.go | 2 +-
 quesma/v2/core/quesma_apis.go | 2 +
 quesma/v2/v2_test.go | 98 ---
 quesma/{v2 => }/v2_test_objects.go | 15 +-
 27 files changed, 1768 insertions(+), 542 deletions(-)
 rename quesma/{v2 => }/backend_connectors/mysql_backend_connector.go (100%)
 rename quesma/{v2 => }/backend_connectors/postgres_backend_connector.go (100%)
 rename quesma/{v2 => }/frontend_connectors/basic_http_frontend_connector.go (73%)
 rename quesma/{v2 => }/frontend_connectors/basic_tcp_connection_handler.go (100%)
 rename quesma/{v2 => }/frontend_connectors/basic_tcp_connector.go (100%)
 rename quesma/{v2 => }/frontend_connectors/passthrough_tcp_connection_handler.go (100%)
 rename quesma/{v2 => }/frontend_connectors/tcp_postgres_connection_handler.go (93%)
 rename quesma/{v2 => }/processors/ab_test_processor.go (100%)
 rename quesma/{v2 => }/processors/base_processor.go (100%)
 rename quesma/{v2 => }/processors/basic_tcp_processor.go (100%)
 rename quesma/{v2 => }/processors/mysql_ingest_processor.go (100%)
 rename quesma/{v2 => }/processors/mysql_query_processor.go (100%)
 rename quesma/{v2 => }/processors/postgres_ingest_processor.go (100%)
 rename quesma/{v2 => }/processors/postgres_query_processor.go (100%)
 rename quesma/{v2 => }/processors/postgres_to_mysql_processor.go (100%)
 create mode 100644 quesma/quesma/dual_write_proxy_v2.go
 create mode 100644 quesma/quesma/router_v2.go
 rename quesma/{v2 => }/test_utils.go (97%)
 delete mode 100644 quesma/v2/v2_test.go
 rename quesma/{v2 => }/v2_test_objects.go (96%)

diff --git a/quesma/v2/backend_connectors/mysql_backend_connector.go b/quesma/backend_connectors/mysql_backend_connector.go
similarity index 100%
rename from quesma/v2/backend_connectors/mysql_backend_connector.go
rename to quesma/backend_connectors/mysql_backend_connector.go
diff --git a/quesma/v2/backend_connectors/postgres_backend_connector.go b/quesma/backend_connectors/postgres_backend_connector.go
similarity index 100%
rename from quesma/v2/backend_connectors/postgres_backend_connector.go
rename to quesma/backend_connectors/postgres_backend_connector.go
diff --git
a/quesma/v2/frontend_connectors/basic_http_frontend_connector.go b/quesma/frontend_connectors/basic_http_frontend_connector.go similarity index 73% rename from quesma/v2/frontend_connectors/basic_http_frontend_connector.go rename to quesma/frontend_connectors/basic_http_frontend_connector.go index 6434e9f94..16df18f72 100644 --- a/quesma/v2/frontend_connectors/basic_http_frontend_connector.go +++ b/quesma/frontend_connectors/basic_http_frontend_connector.go @@ -14,9 +14,10 @@ import ( ) type HTTPRouter struct { - mux *http.ServeMux // Default HTTP multiplexer - handlers map[string]quesma_api.HandlersPipe // Map to store custom route handlers - mutex sync.RWMutex // Mutex for concurrent access to handlers + mux *http.ServeMux // Default HTTP multiplexer + handlers map[string]quesma_api.HandlersPipe // Map to store custom route handlers + fallbackHandler quesma_api.HTTPFrontendHandler + mutex sync.RWMutex // Mutex for concurrent access to handlers } func NewHTTPRouter() *HTTPRouter { @@ -34,6 +35,18 @@ func (router *HTTPRouter) AddRoute(path string, handler quesma_api.HTTPFrontendH fmt.Printf("Added route: %s\n", path) } +func (router *HTTPRouter) AddFallbackHandler(handler quesma_api.HTTPFrontendHandler) { + router.mutex.Lock() + defer router.mutex.Unlock() + router.fallbackHandler = handler +} + +func (router *HTTPRouter) GetFallbackHandler() quesma_api.HTTPFrontendHandler { + router.mutex.RLock() + defer router.mutex.RUnlock() + return router.fallbackHandler +} + func (router *HTTPRouter) Clone() quesma_api.Cloner { newRouter := NewHTTPRouter() router.mutex.Lock() @@ -41,6 +54,7 @@ func (router *HTTPRouter) Clone() quesma_api.Cloner { for path, handler := range router.handlers { newRouter.handlers[path] = handler } + newRouter.fallbackHandler = router.fallbackHandler return newRouter } @@ -97,15 +111,25 @@ func (h *BasicHTTPFrontendConnector) GetRouter() quesma_api.Router { func (h *BasicHTTPFrontendConnector) ServeHTTP(w http.ResponseWriter, req *http.Request) { handlerWrapper, exists := h.router.GetHandlers()[req.URL.Path] + dispatcher := &quesma_api.Dispatcher{} if !exists { - h.router.Multiplexer().ServeHTTP(w, req) + http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if h.router.GetFallbackHandler() != nil { + fmt.Printf("No handler found for path: %s\n", req.URL.Path) + handler := h.router.GetFallbackHandler() + _, message, _ := handler(req) + _, err := w.Write(message.([]byte)) + if err != nil { + fmt.Printf("Error writing response: %s\n", err) + } + } + }).ServeHTTP(w, req) return } - dispatcher := &quesma_api.Dispatcher{} http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { metadata, message, _ := handlerWrapper.Handler(req) - metadata, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message) + _, message = dispatcher.Dispatch(handlerWrapper.Processors, metadata, message) _, err := w.Write(message.([]byte)) if err != nil { fmt.Printf("Error writing response: %s\n", err) diff --git a/quesma/v2/frontend_connectors/basic_tcp_connection_handler.go b/quesma/frontend_connectors/basic_tcp_connection_handler.go similarity index 100% rename from quesma/v2/frontend_connectors/basic_tcp_connection_handler.go rename to quesma/frontend_connectors/basic_tcp_connection_handler.go diff --git a/quesma/v2/frontend_connectors/basic_tcp_connector.go b/quesma/frontend_connectors/basic_tcp_connector.go similarity index 100% rename from quesma/v2/frontend_connectors/basic_tcp_connector.go rename to quesma/frontend_connectors/basic_tcp_connector.go diff --git 
a/quesma/v2/frontend_connectors/passthrough_tcp_connection_handler.go b/quesma/frontend_connectors/passthrough_tcp_connection_handler.go similarity index 100% rename from quesma/v2/frontend_connectors/passthrough_tcp_connection_handler.go rename to quesma/frontend_connectors/passthrough_tcp_connection_handler.go diff --git a/quesma/v2/frontend_connectors/tcp_postgres_connection_handler.go b/quesma/frontend_connectors/tcp_postgres_connection_handler.go similarity index 93% rename from quesma/v2/frontend_connectors/tcp_postgres_connection_handler.go rename to quesma/frontend_connectors/tcp_postgres_connection_handler.go index 99afac692..8cf968017 100644 --- a/quesma/v2/frontend_connectors/tcp_postgres_connection_handler.go +++ b/quesma/frontend_connectors/tcp_postgres_connection_handler.go @@ -32,9 +32,12 @@ func (p *TcpPostgresConnectionHandler) HandleConnection(conn net.Conn) error { } var resp any = msg metadata := make(map[string]interface{}) - metadata, resp = dispatcher.Dispatch(p.processors, metadata, resp) + _, resp = dispatcher.Dispatch(p.processors, metadata, resp) if resp != nil { _, err = conn.Write(resp.([]byte)) + if err != nil { + return fmt.Errorf("error sending response: %w", err) + } } } } diff --git a/quesma/go.mod b/quesma/go.mod index 8f21a8f40..cea128ff4 100644 --- a/quesma/go.mod +++ b/quesma/go.mod @@ -10,12 +10,16 @@ require ( github.com/DataDog/go-sqllexer v0.0.17 github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df github.com/coreos/go-semver v0.3.1 + github.com/go-sql-driver/mysql v1.8.1 github.com/goccy/go-json v0.10.3 + github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 github.com/gorilla/mux v1.8.1 github.com/gorilla/securecookie v1.1.2 github.com/gorilla/sessions v1.4.0 github.com/hashicorp/go-multierror v1.1.1 + github.com/jackc/pgx/v4 v4.18.3 + github.com/jackc/pgx/v5 v5.7.1 github.com/k0kubun/pp v3.0.1+incompatible github.com/knadh/koanf/parsers/json v0.1.0 github.com/knadh/koanf/parsers/yaml v0.1.0 @@ -34,9 +38,16 @@ require ( ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/go-viper/mapstructure/v2 v2.2.1 // indirect github.com/hashicorp/errwrap v1.0.0 // indirect - github.com/jackc/pgx/v5 v5.7.1 // indirect + github.com/jackc/chunkreader/v2 v2.0.1 // indirect + github.com/jackc/pgconn v1.14.3 // indirect + github.com/jackc/pgio v1.0.0 // indirect + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgproto3/v2 v2.3.3 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/pgtype v1.14.0 // indirect github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/kr/text v0.2.0 // indirect @@ -46,6 +57,8 @@ require ( github.com/tidwall/gjson v1.18.0 // indirect github.com/tidwall/match v1.1.1 // indirect github.com/tidwall/pretty v1.2.1 // indirect + golang.org/x/crypto v0.28.0 // indirect + golang.org/x/text v0.19.0 // indirect ) require ( diff --git a/quesma/go.sum b/quesma/go.sum index 6bb29df6e..b22f860bc 100644 --- a/quesma/go.sum +++ b/quesma/go.sum @@ -1,3 +1,6 @@ +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/ClickHouse/ch-go v0.61.5 h1:zwR8QbYI0tsMiEcze/uIMK+Tz1D3XZXLdNrlaOpeEI4= github.com/ClickHouse/ch-go v0.61.5/go.mod 
h1:s1LJW/F/LcFs5HJnuogFMta50kKDO0lf9zzfrbl0RQg= github.com/ClickHouse/clickhouse-go/v2 v2.30.0 h1:AG4D/hW39qa58+JHQIFOSnxyL46H6h2lrmGGk17dhFo= @@ -6,15 +9,21 @@ github.com/DATA-DOG/go-sqlmock v1.5.2 h1:OcvFkGmslmlZibjAjaHm3L//6LiuBgolP7Oputl github.com/DATA-DOG/go-sqlmock v1.5.2/go.mod h1:88MAG/4G7SMwSE3CeA0ZKzrT5CiOU3OJ+JlNzwDqpNU= github.com/DataDog/go-sqllexer v0.0.17 h1:u47fJAVg/+5DA74ZW3w0Qu+3qXHd3GtnA8ZBYixdPrM= github.com/DataDog/go-sqllexer v0.0.17/go.mod h1:KwkYhpFEVIq+BfobkTC1vfqm4gTi65skV/DpDBXtexc= +github.com/Masterminds/semver/v3 v3.1.1/go.mod h1:VPu/7SZ7ePZ3QOrcuXROw5FAcLl4a0cBrbBpGY/8hQs= github.com/andybalholm/brotli v1.1.1 h1:PR2pgnyFznKEugtsUo0xLdDop5SKXd5Qf5ysW+7XdTA= github.com/andybalholm/brotli v1.1.1/go.mod h1:05ib4cKhjx3OQYUY22hTVd34Bc8upXjOLL2rKwwZBoA= github.com/antlr4-go/antlr/v4 v4.13.1 h1:SqQKkuVZ+zWkMMNkjy5FZe5mr5WURWnlpmOuzYWrPrQ= github.com/antlr4-go/antlr/v4 v4.13.1/go.mod h1:GKmUxMtwp6ZgGwZSva4eWPC5mS6vUAmOABFgjdkM7Nw= github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df h1:GSoSVRLoBaFpOOds6QyY1L8AX7uoY+Ln3BHc22W40X0= github.com/barkimedes/go-deepcopy v0.0.0-20220514131651-17c30cfc62df/go.mod h1:hiVxq5OP2bUGBRNS3Z/bt/reCLFNbdcST6gISi1fiOM= +github.com/cockroachdb/apd v1.1.0 h1:3LFP3629v+1aKXU5Q37mxmRxX/pIu1nijXydLShEq5I= +github.com/cockroachdb/apd v1.1.0/go.mod h1:8Sl8LxpKi29FqWXR16WEFZRNSz3SoPzUzeMeY4+DwBQ= github.com/coreos/go-semver v0.3.1 h1:yi21YpKnrx1gt5R+la8n5WgS0kCrsPp33dmEyHReZr4= github.com/coreos/go-semver v0.3.1/go.mod h1:irMmmIw/7yzSRPWryHsK7EYSg09caPQL03VsM8rvUec= +github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= +github.com/coreos/go-systemd v0.0.0-20190719114852-fd7a80b32e1f/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc= +github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -28,13 +37,20 @@ github.com/go-faster/city v1.0.1 h1:4WAxSZ3V2Ws4QRDrscLEDcibJY8uf41H6AhXDrNDcGw= github.com/go-faster/city v1.0.1/go.mod h1:jKcUJId49qdW3L1qKHH/3wPeUstCVpVSXTM6vO3VcTw= github.com/go-faster/errors v0.7.1 h1:MkJTnDoEdi9pDabt1dpWf7AA8/BaSYZqibYyhZ20AYg= github.com/go-faster/errors v0.7.1/go.mod h1:5ySTjWFiphBs07IKuiL69nxdfd5+fzh1u7FPGZP2quo= +github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY= +github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-ole/go-ole v1.2.6 h1:/Fpf6oFPoeFik9ty7siob0G6Ke8QvQEuVcuChpwXzpY= github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-sql-driver/mysql v1.8.1 h1:LedoTUt/eveggdHS9qUFC1EFSa8bU2+1pZjSRpvNJ1Y= +github.com/go-sql-driver/mysql v1.8.1/go.mod h1:wEBSXgmK//2ZFJyE+qWnIsVGmvmEKlqwuVSjsCm7DZg= +github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-viper/mapstructure/v2 v2.2.1 h1:ZAaOCxANMuZx5RCeg0mBdEZk7DZasvvZIxtHqx8aGss= github.com/go-viper/mapstructure/v2 v2.2.1/go.mod h1:oJDH3BJKyqBA2TXFhDsKDGDTlndYOZ6rGS0BRZIxGhM= github.com/goccy/go-json v0.10.3 h1:KZ5WoDbxAIgm2HNbYckL0se1fHD6rz5j4ywS6ebzDqA= github.com/goccy/go-json v0.10.3/go.mod 
h1:oq7eo15ShAhp70Anwd5lgX2pLfOS3QCiwU/PULtXL6M= github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofrs/uuid v4.0.0+incompatible h1:1SD/1F5pU8p29ybwgQSwpQk+mwdRrXCYuPhW6m+TnJw= +github.com/gofrs/uuid v4.0.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q= github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= @@ -45,6 +61,7 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= +github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/uuid v1.6.0 h1:NIvaJDMOsjHA8n1jAhLSgzrAzy1Hgr+hNrb57e+94F0= github.com/google/uuid v1.6.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/mux v1.8.1 h1:TuBL49tXwgrFYWhqrNgrUNEY92u81SPhu7sTdzQEiWY= @@ -57,12 +74,55 @@ github.com/hashicorp/errwrap v1.0.0 h1:hLrqtEDnRye3+sgx6z4qVLNuviH3MR5aQ0ykNJa/U github.com/hashicorp/errwrap v1.0.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo= github.com/hashicorp/go-multierror v1.1.1/go.mod h1:iw975J/qwKPdAO1clOe2L8331t/9/fmwbPZ6JB6eMoM= +github.com/jackc/chunkreader v1.0.0/go.mod h1:RT6O25fNZIuasFJRyZ4R/Y2BbhasbmZXF9QQ7T3kePo= +github.com/jackc/chunkreader/v2 v2.0.0/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/chunkreader/v2 v2.0.1 h1:i+RDz65UE+mmpjTfyz0MoVTnzeYxroil2G82ki7MGG8= +github.com/jackc/chunkreader/v2 v2.0.1/go.mod h1:odVSm741yZoC3dpHEUXIqA9tQRhFrgOHwnPIn9lDKlk= +github.com/jackc/pgconn v0.0.0-20190420214824-7e0022ef6ba3/go.mod h1:jkELnwuX+w9qN5YIfX0fl88Ehu4XC3keFuOJJk9pcnA= +github.com/jackc/pgconn v0.0.0-20190824142844-760dd75542eb/go.mod h1:lLjNuW/+OfW9/pnVKPazfWOgNfH2aPem8YQ7ilXGvJE= +github.com/jackc/pgconn v0.0.0-20190831204454-2fabfa3c18b7/go.mod h1:ZJKsE/KZfsUgOEh9hBm+xYTstcNHg7UPMVJqRfQxq4s= +github.com/jackc/pgconn v1.8.0/go.mod h1:1C2Pb36bGIP9QHGBYCjnyhqu7Rv3sGshaQUvmfGIB/o= +github.com/jackc/pgconn v1.9.0/go.mod h1:YctiPyvzfU11JFxoXokUOOKQXQmDMoJL9vJzHH8/2JY= +github.com/jackc/pgconn v1.9.1-0.20210724152538-d89c8390a530/go.mod h1:4z2w8XhRbP1hYxkpTuBjTS3ne3J48K83+u0zoyvg2pI= +github.com/jackc/pgconn v1.14.3 h1:bVoTr12EGANZz66nZPkMInAV/KHD2TxH9npjXXgiB3w= +github.com/jackc/pgconn v1.14.3/go.mod h1:RZbme4uasqzybK2RK5c65VsHxoyaml09lx3tXOcO/VM= +github.com/jackc/pgio v1.0.0 h1:g12B9UwVnzGhueNavwioyEEpAmqMe1E/BN9ES+8ovkE= +github.com/jackc/pgio v1.0.0/go.mod h1:oP+2QK2wFfUWgr+gxjoBH9KGBb31Eio69xUb0w5bYf8= +github.com/jackc/pgmock v0.0.0-20190831213851-13a1b77aafa2/go.mod h1:fGZlG77KXmcq05nJLRkk0+p82V8B8Dw8KN2/V9c/OAE= +github.com/jackc/pgmock v0.0.0-20201204152224-4fe30f7445fd/go.mod h1:hrBW0Enj2AZTNpt/7Y5rr2xe/9Mn757Wtb2xeBzPv2c= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65 h1:DadwsjnMwFjfWc9y5Wi/+Zz7xoE5ALHsRQlOctkOiHc= +github.com/jackc/pgmock v0.0.0-20210724152146-4ad1a8207f65/go.mod h1:5R2h2EEX+qri8jOWMbJCtaPWkrrNc7OHwsp2TCqp7ak= github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= github.com/jackc/pgpassfile 
v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgproto3 v1.1.0/go.mod h1:eR5FA3leWg7p9aeAqi37XOTgTIbkABlvcPB3E5rlc78= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190420180111-c116219b62db/go.mod h1:bhq50y+xrl9n5mRYyCBFKkpRVTLYJVWeCc+mEAI3yXA= +github.com/jackc/pgproto3/v2 v2.0.0-alpha1.0.20190609003834-432c2951c711/go.mod h1:uH0AWtUmuShn0bcesswc4aBTWGvw0cAxIJp+6OB//Wg= +github.com/jackc/pgproto3/v2 v2.0.0-rc3/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.0-rc3.0.20190831210041-4c03ce451f29/go.mod h1:ryONWYqW6dqSg1Lw6vXNMXoBJhpzvWKnT95C46ckYeM= +github.com/jackc/pgproto3/v2 v2.0.6/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.1.1/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgproto3/v2 v2.3.3 h1:1HLSx5H+tXR9pW3in3zaztoEwQYRC9SQaYUHjTSUOag= +github.com/jackc/pgproto3/v2 v2.3.3/go.mod h1:WfJCnwN3HIg9Ish/j3sgWXnAfK8A9Y0bwXYU5xKaEdA= +github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b/go.mod h1:vsD4gTJCa9TptPL8sPkXrLZ+hDuNrZCnj29CQpr4X1E= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgtype v0.0.0-20190421001408-4ed0de4755e0/go.mod h1:hdSHsc1V01CGwFsrv11mJRHWJ6aifDLfdV3aVjFF0zg= +github.com/jackc/pgtype v0.0.0-20190824184912-ab885b375b90/go.mod h1:KcahbBH1nCMSo2DXpzsoWOAfFkdEtEJpPbVLq8eE+mc= +github.com/jackc/pgtype v0.0.0-20190828014616-a8802b16cc59/go.mod h1:MWlu30kVJrUS8lot6TQqcg7mtthZ9T0EoIBFiJcmcyw= +github.com/jackc/pgtype v1.8.1-0.20210724151600-32e20a603178/go.mod h1:C516IlIV9NKqfsMCXTdChteoXmwgUceqaLfjg2e3NlM= +github.com/jackc/pgtype v1.14.0 h1:y+xUdabmyMkJLyApYuPj38mW+aAIqCe5uuBB51rH3Vw= +github.com/jackc/pgtype v1.14.0/go.mod h1:LUMuVrfsFfdKGLw+AFFVv6KtHOFMwRgDDzBt76IqCA4= +github.com/jackc/pgx/v4 v4.0.0-20190420224344-cc3461e65d96/go.mod h1:mdxmSJJuR08CZQyj1PVQBHy9XOp5p8/SHH6a0psbY9Y= +github.com/jackc/pgx/v4 v4.0.0-20190421002000-1b8f0016e912/go.mod h1:no/Y67Jkk/9WuGR0JG/JseM9irFbnEPbuWV2EELPNuM= +github.com/jackc/pgx/v4 v4.0.0-pre1.0.20190824185557-6972a5742186/go.mod h1:X+GQnOEnf1dqHGpw7JmHqHc1NxDoalibchSk9/RWuDc= +github.com/jackc/pgx/v4 v4.12.1-0.20210724153913-640aa07df17c/go.mod h1:1QD0+tgSXP7iUjYm9C1NxKhny7lq6ee99u/z+IHFcgs= +github.com/jackc/pgx/v4 v4.18.3 h1:dE2/TrEsGX3RBprb3qryqSV9Y60iZN1C6i8IrmW9/BA= +github.com/jackc/pgx/v4 v4.18.3/go.mod h1:Ey4Oru5tH5sB6tV7hDmfWFahwF15Eb7DNXlRKx2CkVw= github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle v0.0.0-20190413234325-e4ced69a3a2b/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v0.0.0-20190608224051-11cab39313c9/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= +github.com/jackc/puddle v1.1.3/go.mod h1:m4B5Dj62Y0fbyuIc15OsIqK0+JU8nkqQjsgx7dvjSWk= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88 h1:uC1QfSlInpQF+M0ao65imhwqKnz3Q2z/d8PWZRMQvDM= github.com/k0kubun/colorstring v0.0.0-20150214042306-9440f1994b88/go.mod h1:3w7q1U84EfirKl04SVQ/s7nPm1ZPhiXd34z40TNz36k= github.com/k0kubun/pp v3.0.1+incompatible h1:3tqvf7QgUnZ5tXO6pNAZlrvHgl6DvifjDrd9g2S9Z40= @@ -83,19 +143,32 @@ github.com/knadh/koanf/providers/file v1.1.2 h1:aCC36YGOgV5lTtAFz2qkgtWdeQsgfxUk 
github.com/knadh/koanf/providers/file v1.1.2/go.mod h1:/faSBcv2mxPVjFrXck95qeoyoZ5myJ6uxN8OOVNJJCI= github.com/knadh/koanf/v2 v2.1.2 h1:I2rtLRqXRy1p01m/utEtpZSSA6dcJbgGVuE27kW2PzQ= github.com/knadh/koanf/v2 v2.1.2/go.mod h1:Gphfaen0q1Fc1HTgJgSTC4oRX9R2R5ErYMZJy8fLJBo= +github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/konsorten/go-windows-terminal-sequences v1.0.2/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.1.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.2.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= +github.com/lib/pq v1.10.2 h1:AqzbZs4ZoCBp+GtejcpCpcxM3zlSMx29dXbUSeVtJb8= +github.com/lib/pq v1.10.2/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 h1:6E+4a0GO5zZEnZ81pIr0yLvtUWk2if982qA3F3QD6H4= github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/markbates/goth v1.80.0 h1:NnvatczZDzOs1hn9Ug+dVYf2Viwwkp/ZDX5K+GLjan8= github.com/markbates/goth v1.80.0/go.mod h1:4/GYHo+W6NWisrMPZnq0Yr2Q70UntNLn7KXEFhrIdAY= +github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-colorable v0.1.6/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= +github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.7/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= @@ -110,6 +183,7 @@ github.com/paulmach/orb v0.11.1/go.mod h1:5mULz1xQfs3bmQm63QEJA6lNGujuRafwA5S/En github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= github.com/pierrec/lz4/v4 v4.1.21 h1:yOVMLb6qSIDP67pl/5F7RepeKYu/VmTyEXvuMI5d9mQ= github.com/pierrec/lz4/v4 v4.1.21/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= @@ -117,11 
+191,16 @@ github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRI github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c h1:ncq/mPwQF4JjgDlrVEn3C11VoGHZN7m8qihwgMEtzYw= github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= +github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.13.1 h1:KvO1DLK/DRN07sQ1LQKScxyZJuNnedQ5/wKSR38lUII= github.com/rogpeppe/go-internal v1.13.1/go.mod h1:uMEvuHeurkdAXX61udpOXGD/AzZDWNMNyH2VO9fmH0o= +github.com/rs/xid v1.2.1/go.mod h1:+uKXf+4Djp6Md1KODXJxgGQPKngRmWyn10oCKFzNHOQ= github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= +github.com/rs/zerolog v1.13.0/go.mod h1:YbFCdg8HfsridGWAh22vktObvhZbQsZXe4/zB0OKkWU= +github.com/rs/zerolog v1.15.0/go.mod h1:xYTKnLHcpfU2225ny5qZjxnj9NvkumZYjJHlAThCjNc= github.com/rs/zerolog v1.33.0 h1:1cU2KZkvPxNyfgEmhHAz/1A9Bz+llsdYzklWFzgp0r8= github.com/rs/zerolog v1.33.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss= +github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/segmentio/asm v1.2.0 h1:9BQrFxC+YOHJlTlHGkTrFWf59nbL3XnCoFLTwDCI7ys= github.com/segmentio/asm v1.2.0/go.mod h1:BqMnlJP91P8d+4ibuonYZw9mfnzI9HfxselHZr5aAcs= github.com/shirou/gopsutil/v3 v3.24.5 h1:i0t8kL+kQTvpAYToeuiVk3TgDeKOFioZO3Ztz/iZ9pI= @@ -130,10 +209,21 @@ github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFt github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= +github.com/shopspring/decimal v0.0.0-20180709203117-cd690d0c9e24/go.mod h1:M+9NzErvs504Cn4c5DxATwIqPbtswREoFCre64PpcG4= +github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/shopspring/decimal v1.4.0 h1:bxl37RwXBklmTi0C79JfXCEBD1cqqHt0bbgBAGFp81k= github.com/shopspring/decimal v1.4.0/go.mod h1:gawqmDU56v4yIKSwfBSFip1HdCCXN8/+DMd9qYNcwME= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= +github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/stretchr/testify v1.10.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= github.com/tailscale/hujson v0.0.0-20221223112325-20486734a56a 
h1:SJy1Pu0eH1C29XwJucQo73FrleVK6t4kYz4NVhp34Yw= @@ -165,25 +255,49 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= +github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.mongodb.org/mongo-driver v1.11.4/go.mod h1:PTSz5yu21bkT/wXpkS7WR5f0ddqw5quethTUn9WM+2g= go.opentelemetry.io/otel v1.28.0 h1:/SqNcYk+idO0CxKEUOtKQClMK/MimZihKYMruSMViUo= go.opentelemetry.io/otel v1.28.0/go.mod h1:q68ijF8Fc8CnMHKyzqL6akLO46ePnjkgfIMIjUIX9z4= go.opentelemetry.io/otel/trace v1.28.0 h1:GhQ9cUuQGmNDd5BTCP2dAvv75RdMxEfTmYejp+lkx9g= go.opentelemetry.io/otel/trace v1.28.0/go.mod h1:jPyXzNPg6da9+38HEwElrQiHlVMTnVfM3/yv2OlIHaI= +go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= +go.uber.org/atomic v1.5.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/atomic v1.6.0/go.mod h1:sABNBOSYdrvTF6hTgEIbc7YasKWGhgEQZyfxyTvoXHQ= +go.uber.org/multierr v1.1.0/go.mod h1:wR5kodmAFQ0UK8QlbwjlSNy0Z68gJhDJUG5sjR94q/0= +go.uber.org/multierr v1.3.0/go.mod h1:VgVr7evmIr6uPjLBxg28wmKNXyqE9akIJ5XnfpiKl+4= +go.uber.org/multierr v1.5.0/go.mod h1:FeouvMocqHpRaaGuG9EjoKcStLC43Zu/fmqdUMPcKYU= +go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= +go.uber.org/zap v1.9.1/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= +go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190411191339-88737f569e3a/go.mod h1:WFFai1msRO1wXaEeE5yQxYXgSfI8pQAWXbQop6sCtWE= +golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= +golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201203163018-be400aefbc4c/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= +golang.org/x/crypto v0.0.0-20210616213533-5ff15b29337e/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.28.0 h1:GBDwsMXVQi34v5CCYUm2jkJvu4cbtru2U4TN2PSyQnw= golang.org/x/crypto v0.28.0/go.mod h1:rmgy+3RHxRZMyY0jjAJShp2zgEdOqj2AO7U0pYmeQ7U= golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f h1:XdNn9LlyWAhLVp6P/i8QYBW+hlyhrhei9uErw2B5GJo= golang.org/x/exp v0.0.0-20241108190413-2d47ceb2692f/go.mod h1:D5SMRVC3C2/4+F/DB1wZsLRnSNimn2Sp/NPsCrsv8ak= +golang.org/x/lint v0.0.0-20190930215403-16217165b5de/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= 
+golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg= golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20190813141303-74dc4d7220e7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/oauth2 v0.24.0 h1:KTBBxWqUa0ykRPLtV69rRto9TLXcqYkeswu48x/gvNE= golang.org/x/oauth2 v0.24.0/go.mod h1:XYTD2NtWslqkgxebSiOHnXEap4TF09sJSc7H1sXbhtI= @@ -191,9 +305,17 @@ golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190403152447-81d4e9dc473e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -206,17 +328,29 @@ golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= 
+golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= -golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= +golang.org/x/text v0.19.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190425163242-31fd60d6bfdc/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/tools v0.0.0-20190621195816-6e04913cbbac/go.mod h1:/rFqwRUd4F7ZHNgwSSTFct+R/Kf4OFW1sUzUTQQTgfc= +golang.org/x/tools v0.0.0-20190823170909-c4a336ef6a2f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029041327-9cc4af7d6b2c/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191029190741-b9c20aec41a5/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20200103221440-774c71fcf114/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28= golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE= golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA= +golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= @@ -227,6 +361,10 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c/go.mod h1:JHkPIbrfpd72SG/EVd6muEfDQjcINNoR0C8j2r3qZ4Q= +gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/inconshreveable/log15.v2 v2.0.0-20180818164646-67afb5ed74ec/go.mod h1:aPpfJ7XW+gOuirDoZ8gHhLh3kZ1B08FtV2bbmy7Jv3s= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod 
h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.1-2019.2.3/go.mod h1:a3bituU0lyd329TUQxRnasdCoJDkEUEAqEt0JzvZhAg= diff --git a/quesma/main.go b/quesma/main.go index 5dd003982..5fc7d871f 100644 --- a/quesma/main.go +++ b/quesma/main.go @@ -16,6 +16,7 @@ import ( "quesma/connectors" "quesma/elasticsearch" "quesma/feature" + "quesma/frontend_connectors" "quesma/ingest" "quesma/licensing" "quesma/logger" @@ -29,7 +30,6 @@ import ( "quesma/telemetry" "quesma/tracing" "quesma_v2/core" - "quesma_v2/frontend_connectors" "runtime" "syscall" "time" @@ -160,6 +160,7 @@ func constructQuesma(cfg *config.QuesmaConfiguration, sl clickhouse.TableDiscove if cfg.TransparentProxy { return quesma.NewQuesmaTcpProxy(phoneHomeAgent, cfg, quesmaManagementConsole, logChan, false) } else { - return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository, indexRegistry) + const quesma_v2 = false + return quesma.NewHttpProxy(phoneHomeAgent, lm, ip, sl, im, schemaRegistry, cfg, quesmaManagementConsole, abResultsrepository, indexRegistry, quesma_v2) } } diff --git a/quesma/main_test.go b/quesma/main_test.go index 143bbc26b..caeb553d3 100644 --- a/quesma/main_test.go +++ b/quesma/main_test.go @@ -3,9 +3,139 @@ package main -import "testing" +import ( + "context" + "github.com/stretchr/testify/assert" + "net/http" + "os" + "os/signal" + "quesma/backend_connectors" + "quesma/frontend_connectors" + "quesma/processors" + quesma_api "quesma_v2/core" + "sync/atomic" + "syscall" + "testing" + "time" +) // just to make sure that the buildIngestOnlyQuesma is used -func TestMain(m *testing.M) { +func Test_Main(m *testing.T) { _ = buildIngestOnlyQuesma() } + +func emitRequests(stop chan os.Signal) { + go func() { + time.Sleep(1 * time.Second) + requestBody := []byte(`{"query": {"match_all": {}}}`) + sendRequest("http://localhost:8888/_bulk", requestBody) + sendRequest("http://localhost:8888/_doc", requestBody) + sendRequest("http://localhost:8888/_search", requestBody) + sendRequest("http://localhost:8888/_search", requestBody) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + close(stop) + }() +} + +func Test_backendConnectorValidation(t *testing.T) { + var tcpProcessor quesma_api.Processor = processors.NewPostgresToMySqlProcessor() + var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + postgressPipeline.AddProcessor(tcpProcessor) + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() + const endpoint = "root:password@tcp(127.0.0.1:3306)/test" + var mySqlBackendConnector quesma_api.BackendConnector = &backend_connectors.MySqlBackendConnector{ + Endpoint: endpoint, + } + postgressPipeline.AddBackendConnector(mySqlBackendConnector) + quesmaBuilder.AddPipeline(postgressPipeline) + _, err := quesmaBuilder.Build() + assert.NoError(t, err) +} + +var fallbackCalled int32 = 0 + +func fallback(request *http.Request) (map[string]interface{}, any, error) { + metadata := quesma_api.MakeNewMetadata() + atomic.AddInt32(&fallbackCalled, 1) + resp := []byte("unknown\n") + return metadata, resp, nil +} + +func ab_testing_scenario() quesma_api.QuesmaBuilder { + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() + + ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888") + ingestHTTPRouter := 
frontend_connectors.NewHTTPRouter() + ingestHTTPRouter.AddRoute("/_bulk", bulk) + ingestHTTPRouter.AddRoute("/_doc", doc) + ingestFrontendConnector.AddRouter(ingestHTTPRouter) + var ingestPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + ingestPipeline.AddFrontendConnector(ingestFrontendConnector) + var abIngestTestProcessor quesma_api.Processor = processors.NewABTestProcessor("ABIngestTestProcessor", false) + + var ingestProcessor quesma_api.Processor = NewIngestProcessor() + var innerIngestProcessor1 quesma_api.Processor = NewInnerIngestProcessor1() + ingestProcessor.AddProcessor(innerIngestProcessor1) + var innerIngestProcessor2 quesma_api.Processor = NewInnerIngestProcessor2() + ingestProcessor.AddProcessor(innerIngestProcessor2) + + ingestPipeline.AddProcessor(ingestProcessor) + ingestPipeline.AddProcessor(abIngestTestProcessor) + + queryFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888") + queryHTTPRouter := frontend_connectors.NewHTTPRouter() + queryHTTPRouter.AddRoute("/_search", search) + queryFrontendConnector.AddRouter(queryHTTPRouter) + var queryPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + queryPipeline.AddFrontendConnector(queryFrontendConnector) + var queryProcessor quesma_api.Processor = NewQueryProcessor() + var innerQueryProcessor1 quesma_api.Processor = NewInnerQueryProcessor1() + queryProcessor.AddProcessor(innerQueryProcessor1) + var innerQueryProcessor2 quesma_api.Processor = NewInnerQueryProcessor2() + queryProcessor.AddProcessor(innerQueryProcessor2) + var abQueryTestProcessor quesma_api.Processor = processors.NewABTestProcessor("ABQueryTestProcessor", true) + + queryPipeline.AddProcessor(queryProcessor) + queryPipeline.AddProcessor(abQueryTestProcessor) + quesmaBuilder.AddPipeline(ingestPipeline) + quesmaBuilder.AddPipeline(queryPipeline) + + quesma, _ := quesmaBuilder.Build() + return quesma +} + +func fallbackScenario() quesma_api.QuesmaBuilder { + var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() + + ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888") + ingestHTTPRouter := frontend_connectors.NewHTTPRouter() + var fallback quesma_api.HTTPFrontendHandler = fallback + ingestHTTPRouter.AddFallbackHandler(fallback) + ingestFrontendConnector.AddRouter(ingestHTTPRouter) + var ingestPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() + ingestPipeline.AddFrontendConnector(ingestFrontendConnector) + quesmaBuilder.AddPipeline(ingestPipeline) + quesma, _ := quesmaBuilder.Build() + quesma.Start() + return quesma +} + +func Test_fallbackScenario(t *testing.T) { + q1 := fallbackScenario() + q1.Start() + stop := make(chan os.Signal, 1) + emitRequests(stop) + <-stop + q1.Stop(context.Background()) + atomic.LoadInt32(&fallbackCalled) + assert.Equal(t, int32(4), fallbackCalled) +} + +func Test_scenario1(t *testing.T) { + q1 := ab_testing_scenario() + q1.Start() + stop := make(chan os.Signal, 1) + emitRequests(stop) + <-stop + q1.Stop(context.Background()) +} diff --git a/quesma/v2/processors/ab_test_processor.go b/quesma/processors/ab_test_processor.go similarity index 100% rename from quesma/v2/processors/ab_test_processor.go rename to quesma/processors/ab_test_processor.go diff --git a/quesma/v2/processors/base_processor.go b/quesma/processors/base_processor.go similarity index 100% rename from quesma/v2/processors/base_processor.go rename to quesma/processors/base_processor.go diff --git a/quesma/v2/processors/basic_tcp_processor.go 
b/quesma/processors/basic_tcp_processor.go similarity index 100% rename from quesma/v2/processors/basic_tcp_processor.go rename to quesma/processors/basic_tcp_processor.go diff --git a/quesma/v2/processors/mysql_ingest_processor.go b/quesma/processors/mysql_ingest_processor.go similarity index 100% rename from quesma/v2/processors/mysql_ingest_processor.go rename to quesma/processors/mysql_ingest_processor.go diff --git a/quesma/v2/processors/mysql_query_processor.go b/quesma/processors/mysql_query_processor.go similarity index 100% rename from quesma/v2/processors/mysql_query_processor.go rename to quesma/processors/mysql_query_processor.go diff --git a/quesma/v2/processors/postgres_ingest_processor.go b/quesma/processors/postgres_ingest_processor.go similarity index 100% rename from quesma/v2/processors/postgres_ingest_processor.go rename to quesma/processors/postgres_ingest_processor.go diff --git a/quesma/v2/processors/postgres_query_processor.go b/quesma/processors/postgres_query_processor.go similarity index 100% rename from quesma/v2/processors/postgres_query_processor.go rename to quesma/processors/postgres_query_processor.go diff --git a/quesma/v2/processors/postgres_to_mysql_processor.go b/quesma/processors/postgres_to_mysql_processor.go similarity index 100% rename from quesma/v2/processors/postgres_to_mysql_processor.go rename to quesma/processors/postgres_to_mysql_processor.go diff --git a/quesma/quesma/dual_write_proxy.go b/quesma/quesma/dual_write_proxy.go index 34b96208c..5a53a89d3 100644 --- a/quesma/quesma/dual_write_proxy.go +++ b/quesma/quesma/dual_write_proxy.go @@ -3,22 +3,36 @@ package quesma import ( + "bytes" "context" "crypto/tls" "errors" + "fmt" + "io" "net/http" + "quesma/ab_testing" "quesma/clickhouse" "quesma/elasticsearch" + "quesma/end_user_errors" + "quesma/feature" + "quesma/ingest" "quesma/logger" + "quesma/queryparser" "quesma/quesma/async_search_storage" "quesma/quesma/config" + "quesma/quesma/gzip" "quesma/quesma/mux" "quesma/quesma/recovery" + "quesma/quesma/routes" + "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" + "quesma/table_resolver" "quesma/telemetry" + "quesma/tracing" "quesma/util" "strconv" + "strings" "sync/atomic" "time" ) @@ -69,7 +83,17 @@ func (q *dualWriteHttpProxy) Stop(ctx context.Context) { q.Close(ctx) } -func newDualWriteProxy(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, pathRouter *mux.PathRouter, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, queryRunner *QueryRunner) *dualWriteHttpProxy { +func newDualWriteProxy(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, processor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxy { + queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver) + // not sure how we should configure our query translator ??? + // is this a config option?? 
+ + queryRunner.DateMathRenderer = queryparser.DateMathExpressionFormatLiteral + + // tests should not be run with optimization enabled by default + queryRunner.EnableQueryOptimization(config) + + pathRouter := ConfigureRouter(config, registry, logManager, processor, quesmaManagementConsole, agent, queryRunner, resolver) tr := &http.Transport{ TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, @@ -152,3 +176,385 @@ func (q *dualWriteHttpProxy) Ingest() { logger.Info().Msgf("Accepting HTTP at :%d", q.publicPort) }() } + +func responseFromElastic(ctx context.Context, elkResponse *http.Response, w http.ResponseWriter) { + id := ctx.Value(tracing.RequestIdCtxKey).(string) + logger.Debug().Str(logger.RID, id).Msg("responding from Elasticsearch") + + copyHeaders(w, elkResponse) + w.Header().Set(quesmaSourceHeader, quesmaSourceElastic) + // io.Copy calls WriteHeader implicitly + w.WriteHeader(elkResponse.StatusCode) + if _, err := io.Copy(w, elkResponse.Body); err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error copying response body: %v", err) + http.Error(w, "Error copying response body", http.StatusInternalServerError) + return + } + elkResponse.Body.Close() +} + +func responseFromQuesma(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *mux.Result, zip bool) { + id := ctx.Value(tracing.RequestIdCtxKey).(string) + logger.Debug().Str(logger.RID, id).Msg("responding from Quesma") + + for key, value := range quesmaResponse.Meta { + w.Header().Set(key, value) + } + if zip { + w.Header().Set("Content-Encoding", "gzip") + } + w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) + w.WriteHeader(quesmaResponse.StatusCode) + if zip { + zipped, err := gzip.Zip(unzipped) + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error zipping: %v", err) + } + _, _ = io.Copy(w, bytes.NewBuffer(zipped)) + } else { + _, _ = io.Copy(w, bytes.NewBuffer(unzipped)) + } +} + +type router struct { + config *config.QuesmaConfiguration + requestPreprocessors processorChain + quesmaManagementConsole *ui.QuesmaManagementConsole + phoneHomeAgent telemetry.PhoneHomeAgent + httpClient *http.Client + failedRequests atomic.Int64 +} + +func (r *router) registerPreprocessor(preprocessor RequestPreprocessor) { + r.requestPreprocessors = append(r.requestPreprocessors, preprocessor) +} + +func (r *router) errorResponse(ctx context.Context, err error, w http.ResponseWriter) { + r.failedRequests.Add(1) + + msg := "Internal Quesma Error.\nPlease contact support if the problem persists." + reason := "Failed request." + result := mux.ServerErrorResult() + + // if error is an error with user-friendly message, we should use it + var endUserError *end_user_errors.EndUserError + if errors.As(err, &endUserError) { + msg = endUserError.EndUserErrorMessage() + reason = endUserError.Reason() + + // we treat all `Q1xxx` errors as bad requests here + if endUserError.ErrorType().Number < 2000 { + result = mux.BadReqeustResult() + } + } + + logger.ErrorWithCtxAndReason(ctx, reason).Msgf("quesma request failed: %v", err) + + requestId := "n/a" + if contextRid, ok := ctx.Value(tracing.RequestIdCtxKey).(string); ok { + requestId = contextRid + } + + // We should not send our error message to the client. There can be sensitive information in it. 
+ // We will send ID of failed request instead + responseFromQuesma(ctx, []byte(fmt.Sprintf("%s\nRequest ID: %s\n", msg, requestId)), w, result, false) +} + +func (*router) closedIndexResponse(ctx context.Context, w http.ResponseWriter, pattern string) { + // TODO we should return a proper status code here (400?) + w.WriteHeader(http.StatusOK) + + response := make(types.JSON) + + response["error"] = queryparser.Error{ + RootCause: []queryparser.RootCause{ + { + Type: "index_closed_exception", + Reason: fmt.Sprintf("pattern %s is not routed to any connector", pattern), + }, + }, + Type: "index_closed_exception", + Reason: fmt.Sprintf("pattern %s is not routed to any connector", pattern), + } + + b, err := response.Bytes() + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error marshalling response: %v", err) + return + } + + w.Write(b) + +} + +func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router *mux.PathRouter, logManager *clickhouse.LogManager) { + defer recovery.LogAndHandlePanic(ctx, func(err error) { + w.WriteHeader(500) + w.Write(queryparser.InternalQuesmaError("Unknown Quesma error")) + }) + + quesmaRequest, ctx, err := r.preprocessRequest(ctx, &mux.Request{ + Method: req.Method, + Path: strings.TrimSuffix(req.URL.Path, "/"), + Params: map[string]string{}, + Headers: req.Header, + QueryParams: req.URL.Query(), + Body: string(reqBody), + }) + + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error preprocessing request: %v", err) + } + + quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body) + + handler, decision := router.Matches(quesmaRequest) + + if decision != nil { + w.Header().Set(quesmaTableResolverHeader, decision.String()) + } else { + w.Header().Set(quesmaTableResolverHeader, "n/a") + } + + if handler != nil { + quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) { + return handler(ctx, quesmaRequest) + }) + + zip := strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") + + if err == nil { + logger.Debug().Ctx(ctx).Msg("responding from quesma") + unzipped := []byte{} + if quesmaResponse != nil { + unzipped = []byte(quesmaResponse.Body) + } + if len(unzipped) == 0 { + logger.WarnWithCtx(ctx).Msgf("empty response from Clickhouse, method=%s", req.Method) + } + addProductAndContentHeaders(req.Header, w.Header()) + + responseFromQuesma(ctx, unzipped, w, quesmaResponse, zip) + + } else { + r.errorResponse(ctx, err, w) + } + } else { + + var sendToElastic bool + + if decision != nil { + + if decision.Err != nil { + w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) + addProductAndContentHeaders(req.Header, w.Header()) + r.errorResponse(ctx, decision.Err, w) + return + } + + if decision.IsClosed { + w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) + addProductAndContentHeaders(req.Header, w.Header()) + r.closedIndexResponse(ctx, w, decision.IndexPattern) + return + } + + if decision.IsEmpty { + w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) + addProductAndContentHeaders(req.Header, w.Header()) + w.WriteHeader(http.StatusNoContent) + w.Write(queryparser.EmptySearchResponse(ctx)) + return + } + + for _, connector := range decision.UseConnectors { + if _, ok := connector.(*table_resolver.ConnectorDecisionElastic); ok { + // this is desired elastic call + sendToElastic = true + break + } + } + + } else { + // this is fallback case + // in case we don't support sth, we should send it to Elastic + sendToElastic = 
true + } + + if sendToElastic { + feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(opaqueIdHeaderKey), logManager.ResolveIndexPattern) + + rawResponse := <-r.sendHttpRequestToElastic(ctx, req, reqBody, true) + response := rawResponse.response + if response != nil { + responseFromElastic(ctx, response, w) + } else { + w.Header().Set(quesmaSourceHeader, quesmaSourceElastic) + w.WriteHeader(500) + if rawResponse.error != nil { + _, _ = w.Write([]byte(rawResponse.error.Error())) + } + } + } else { + r.errorResponse(ctx, end_user_errors.ErrNoConnector.New(fmt.Errorf("no connector found")), w) + } + } +} + +func (r *router) preprocessRequest(ctx context.Context, quesmaRequest *mux.Request) (*mux.Request, context.Context, error) { + var err error + var processedRequest = quesmaRequest + for _, preprocessor := range r.requestPreprocessors { + ctx, processedRequest, err = preprocessor.PreprocessRequest(ctx, processedRequest) + if err != nil { + return nil, nil, err + } + } + return processedRequest, ctx, nil +} + +type elasticResult struct { + response *http.Response + error error + took time.Duration +} + +func (r *router) sendHttpRequestToElastic(ctx context.Context, req *http.Request, + reqBody []byte, isManagement bool) chan elasticResult { + elkResponseChan := make(chan elasticResult) + + // If Quesma is exposing unauthenticated API but underlying Elasticsearch requires authentication, we should add the + if r.config.DisableAuth && req.Header.Get("Authorization") == "" && r.config.Elasticsearch.User != "" { + logger.DebugWithCtx(ctx).Msgf("path=%s routed to Elasticsearch, need add auth header to the request", req.URL) + req.SetBasicAuth(r.config.Elasticsearch.User, r.config.Elasticsearch.Password) + } + + if req.Header.Get("Authorization") != "" { + var userName string + if user, err := util.ExtractUsernameFromBasicAuthHeader(req.Header.Get("Authorization")); err == nil { + userName = user + } else { + logger.Warn().Msgf("Failed to extract username from auth header: %v", err) + } + logger.DebugWithCtx(ctx).Msgf("[AUTH] [%s] routed to Elasticsearch, called by user [%s]", req.URL, userName) + } + + go func() { + elkResponseChan <- recordRequestToElastic(req.URL.Path, r.quesmaManagementConsole, func() elasticResult { + + isWrite := elasticsearch.IsWriteRequest(req) + + var span telemetry.Span + if isManagement { + if isWrite { + span = r.phoneHomeAgent.ElasticBypassedWriteRequestsDuration().Begin() + } else { + span = r.phoneHomeAgent.ElasticBypassedReadRequestsDuration().Begin() + } + } else { + if isWrite { + span = r.phoneHomeAgent.ElasticWriteRequestsDuration().Begin() + } else { + span = r.phoneHomeAgent.ElasticReadRequestsDuration().Begin() + } + } + + resp, err := r.sendHttpRequest(ctx, r.config.Elasticsearch.Url.String(), req, reqBody) + took := span.End(err) + return elasticResult{resp, err, took} + }) + }() + return elkResponseChan +} + +func isResponseOk(resp *http.Response) bool { + return resp != nil && resp.StatusCode >= 200 && resp.StatusCode < 500 +} + +func isIngest(path string) bool { + return strings.HasSuffix(path, routes.BulkPath) // We may add more methods in future such as `_put` or `_create` +} + +func recordRequestToClickhouse(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() (*mux.Result, error)) (*mux.Result, error) { + statName := ui.RequestStatisticKibana2Clickhouse + if isIngest(path) { + statName = ui.RequestStatisticIngest2Clickhouse + } + now := time.Now() + response, err := requestFunc() + qmc.RecordRequest(statName, 
time.Since(now), err != nil) + return response, err +} + +func recordRequestToElastic(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() elasticResult) elasticResult { + statName := ui.RequestStatisticKibana2Elasticsearch + if isIngest(path) { + statName = ui.RequestStatisticIngest2Elasticsearch + } + now := time.Now() + response := requestFunc() + qmc.RecordRequest(statName, time.Since(now), !isResponseOk(response.response)) + return response +} + +func peekBody(r *http.Request) ([]byte, error) { + reqBody, err := io.ReadAll(r.Body) + if err != nil { + logger.ErrorWithCtxAndReason(r.Context(), "incomplete request"). + Msgf("Error reading request body: %v", err) + return nil, err + } + + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "": + // No compression, leaving reqBody as-is + case "gzip": + reqBody, err = gzip.UnZip(reqBody) + if err != nil { + logger.ErrorWithCtxAndReason(r.Context(), "invalid gzip body"). + Msgf("Error decompressing gzip body: %v", err) + return nil, err + } + default: + logger.ErrorWithCtxAndReason(r.Context(), "unsupported Content-Encoding type"). + Msgf("Unsupported Content-Encoding type: %s", contentEncoding) + return nil, errors.New("unsupported Content-Encoding type") + } + r.Header.Del("Content-Encoding") // In the transparent proxy case we will send an uncompressed body, so the header should be removed + + r.Body = io.NopCloser(bytes.NewBuffer(reqBody)) + return reqBody, nil +} + +func copyHeaders(w http.ResponseWriter, elkResponse *http.Response) { + for key, values := range elkResponse.Header { + for _, value := range values { + if key != httpHeaderContentLength { + if w.Header().Get(key) == "" { + w.Header().Add(key, value) + } + } + } + } +} + +func (r *router) sendHttpRequest(ctx context.Context, address string, originalReq *http.Request, originalReqBody []byte) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, originalReq.Method, address+originalReq.URL.String(), bytes.NewBuffer(originalReqBody)) + + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error creating request: %v", err) + return nil, err + } + + req.Header = originalReq.Header + + resp, err := r.httpClient.Do(req) + if err != nil { + logger.ErrorWithCtxAndReason(ctx, "No network connection"). + Msgf("Error sending request: %v", err) + return nil, err + } + + return resp, nil +} diff --git a/quesma/quesma/dual_write_proxy_v2.go b/quesma/quesma/dual_write_proxy_v2.go new file mode 100644 index 000000000..91ae1146a --- /dev/null +++ b/quesma/quesma/dual_write_proxy_v2.go @@ -0,0 +1,570 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. 
+// SPDX-License-Identifier: Elastic-2.0 +package quesma + +import ( + "bytes" + "context" + "crypto/tls" + "errors" + "fmt" + "io" + "net/http" + "quesma/ab_testing" + "quesma/clickhouse" + "quesma/elasticsearch" + "quesma/end_user_errors" + "quesma/feature" + "quesma/ingest" + "quesma/logger" + "quesma/queryparser" + "quesma/quesma/async_search_storage" + "quesma/quesma/config" + "quesma/quesma/gzip" + "quesma/quesma/mux" + "quesma/quesma/recovery" + "quesma/quesma/routes" + "quesma/quesma/types" + "quesma/quesma/ui" + "quesma/schema" + "quesma/table_resolver" + "quesma/telemetry" + "quesma/tracing" + "quesma/util" + "strconv" + "strings" + "sync/atomic" + "time" +) + +const concurrentClientsLimitV2 = 100 // FIXME this should be configurable + +type simultaneousClientsLimiterV2 struct { + counter atomic.Int64 + handler http.Handler + limit int64 +} + +func newSimultaneousClientsLimiterV2(handler http.Handler, limit int64) *simultaneousClientsLimiterV2 { + return &simultaneousClientsLimiterV2{ + handler: handler, + limit: limit, + } +} + +func (c *simultaneousClientsLimiterV2) ServeHTTP(w http.ResponseWriter, r *http.Request) { + + current := c.counter.Load() + // this is hard limit, we should not allow to go over it + if current >= c.limit { + logger.ErrorWithCtx(r.Context()).Msgf("Too many requests. current: %d, limit: %d", current, c.limit) + http.Error(w, "Too many requests", http.StatusTooManyRequests) + return + } + + c.counter.Add(1) + defer c.counter.Add(-1) + c.handler.ServeHTTP(w, r) +} + +type dualWriteHttpProxyV2 struct { + routingHttpServer *http.Server + indexManagement elasticsearch.IndexManagement + logManager *clickhouse.LogManager + publicPort util.Port + asyncQueriesEvictor *async_search_storage.AsyncQueriesEvictor + queryRunner *QueryRunner + schemaRegistry schema.Registry + schemaLoader clickhouse.TableDiscovery +} + +func (q *dualWriteHttpProxyV2) Stop(ctx context.Context) { + q.Close(ctx) +} + +func newDualWriteProxyV2(schemaLoader clickhouse.TableDiscovery, logManager *clickhouse.LogManager, indexManager elasticsearch.IndexManagement, registry schema.Registry, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, agent telemetry.PhoneHomeAgent, processor *ingest.IngestProcessor, resolver table_resolver.TableResolver, abResultsRepository ab_testing.Sender) *dualWriteHttpProxyV2 { + queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, registry, abResultsRepository, resolver) + // not sure how we should configure our query translator ??? + // is this a config option?? + + queryRunner.DateMathRenderer = queryparser.DateMathExpressionFormatLiteral + + // tests should not be run with optimization enabled by default + queryRunner.EnableQueryOptimization(config) + + ingestRouter := ConfigureIngestRouterV2(config, processor, agent, resolver) + searchRouter := ConfigureSearchRouterV2(config, registry, logManager, quesmaManagementConsole, queryRunner, resolver) + + tr := &http.Transport{ + TLSClientConfig: &tls.Config{InsecureSkipVerify: true}, + } + client := &http.Client{ + Transport: tr, + Timeout: time.Minute, // should be more configurable, 30s is Kibana default timeout + } + routerInstance := routerV2{phoneHomeAgent: agent, config: config, quesmaManagementConsole: quesmaManagementConsole, httpClient: client, requestPreprocessors: processorChain{}} + routerInstance. 
+ registerPreprocessor(NewTraceIdPreprocessor()) + + agent.FailedRequestsCollector(func() int64 { + return routerInstance.failedRequests.Load() + }) + + handler := http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + defer recovery.LogPanic() + reqBody, err := peekBodyV2(req) + if err != nil { + http.Error(w, "Error reading request body", http.StatusInternalServerError) + return + } + + ua := req.Header.Get("User-Agent") + agent.UserAgentCounters().Add(ua, 1) + + routerInstance.reroute(req.Context(), w, req, reqBody, searchRouter, ingestRouter, logManager) + }) + var limitedHandler http.Handler + if config.DisableAuth { + limitedHandler = newSimultaneousClientsLimiterV2(handler, concurrentClientsLimitV2) + } else { + limitedHandler = newSimultaneousClientsLimiterV2(NewAuthMiddleware(handler, config.Elasticsearch), concurrentClientsLimitV2) + } + + return &dualWriteHttpProxyV2{ + schemaRegistry: registry, + schemaLoader: schemaLoader, + routingHttpServer: &http.Server{ + Addr: ":" + strconv.Itoa(int(config.PublicTcpPort)), + Handler: limitedHandler, + }, + indexManagement: indexManager, + logManager: logManager, + publicPort: config.PublicTcpPort, + asyncQueriesEvictor: async_search_storage.NewAsyncQueriesEvictor( + queryRunner.AsyncRequestStorage.(async_search_storage.AsyncSearchStorageInMemory), + queryRunner.AsyncQueriesContexts.(async_search_storage.AsyncQueryContextStorageInMemory), + ), + queryRunner: queryRunner, + } +} + +func (q *dualWriteHttpProxyV2) Close(ctx context.Context) { + if q.logManager != nil { + defer q.logManager.Close() + } + if q.queryRunner != nil { + q.queryRunner.Close() + } + if q.asyncQueriesEvictor != nil { + q.asyncQueriesEvictor.Close() + } + if err := q.routingHttpServer.Shutdown(ctx); err != nil { + logger.Fatal().Msgf("Error during server shutdown: %v", err) + } +} + +func (q *dualWriteHttpProxyV2) Ingest() { + q.schemaLoader.ReloadTableDefinitions() + q.logManager.Start() + q.indexManagement.Start() + go q.asyncQueriesEvictor.AsyncQueriesGC() + go func() { + if err := q.routingHttpServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) { + logger.Fatal().Msgf("Error starting http server: %v", err) + } + logger.Info().Msgf("Accepting HTTP at :%d", q.publicPort) + }() +} + +func responseFromElasticV2(ctx context.Context, elkResponse *http.Response, w http.ResponseWriter) { + id := ctx.Value(tracing.RequestIdCtxKey).(string) + logger.Debug().Str(logger.RID, id).Msg("responding from Elasticsearch") + + copyHeadersV2(w, elkResponse) + w.Header().Set(quesmaSourceHeader, quesmaSourceElastic) + // io.Copy calls WriteHeader implicitly + w.WriteHeader(elkResponse.StatusCode) + if _, err := io.Copy(w, elkResponse.Body); err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error copying response body: %v", err) + http.Error(w, "Error copying response body", http.StatusInternalServerError) + return + } + elkResponse.Body.Close() +} + +func responseFromQuesmaV2(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *mux.Result, zip bool) { + id := ctx.Value(tracing.RequestIdCtxKey).(string) + logger.Debug().Str(logger.RID, id).Msg("responding from Quesma") + + for key, value := range quesmaResponse.Meta { + w.Header().Set(key, value) + } + if zip { + w.Header().Set("Content-Encoding", "gzip") + } + w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) + w.WriteHeader(quesmaResponse.StatusCode) + if zip { + zipped, err := gzip.Zip(unzipped) + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error zipping: %v", err) 
+ } + _, _ = io.Copy(w, bytes.NewBuffer(zipped)) + } else { + _, _ = io.Copy(w, bytes.NewBuffer(unzipped)) + } +} + +type routerV2 struct { + config *config.QuesmaConfiguration + requestPreprocessors processorChain + quesmaManagementConsole *ui.QuesmaManagementConsole + phoneHomeAgent telemetry.PhoneHomeAgent + httpClient *http.Client + failedRequests atomic.Int64 +} + +func (r *routerV2) registerPreprocessor(preprocessor RequestPreprocessor) { + r.requestPreprocessors = append(r.requestPreprocessors, preprocessor) +} + +func (r *routerV2) errorResponseV2(ctx context.Context, err error, w http.ResponseWriter) { + r.failedRequests.Add(1) + + msg := "Internal Quesma Error.\nPlease contact support if the problem persists." + reason := "Failed request." + result := mux.ServerErrorResult() + + // if error is an error with user-friendly message, we should use it + var endUserError *end_user_errors.EndUserError + if errors.As(err, &endUserError) { + msg = endUserError.EndUserErrorMessage() + reason = endUserError.Reason() + + // we treat all `Q1xxx` errors as bad requests here + if endUserError.ErrorType().Number < 2000 { + result = mux.BadReqeustResult() + } + } + + logger.ErrorWithCtxAndReason(ctx, reason).Msgf("quesma request failed: %v", err) + + requestId := "n/a" + if contextRid, ok := ctx.Value(tracing.RequestIdCtxKey).(string); ok { + requestId = contextRid + } + + // We should not send our error message to the client. There can be sensitive information in it. + // We will send ID of failed request instead + responseFromQuesmaV2(ctx, []byte(fmt.Sprintf("%s\nRequest ID: %s\n", msg, requestId)), w, result, false) +} + +func (*routerV2) closedIndexResponse(ctx context.Context, w http.ResponseWriter, pattern string) { + // TODO we should return a proper status code here (400?) 
+ w.WriteHeader(http.StatusOK) + + response := make(types.JSON) + + response["error"] = queryparser.Error{ + RootCause: []queryparser.RootCause{ + { + Type: "index_closed_exception", + Reason: fmt.Sprintf("pattern %s is not routed to any connector", pattern), + }, + }, + Type: "index_closed_exception", + Reason: fmt.Sprintf("pattern %s is not routed to any connector", pattern), + } + + b, err := response.Bytes() + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error marshalling response: %v", err) + return + } + + w.Write(b) + +} + +func (r *routerV2) reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, pathRouter *mux.PathRouter, ingestRouter *mux.PathRouter, logManager *clickhouse.LogManager) { + defer recovery.LogAndHandlePanic(ctx, func(err error) { + w.WriteHeader(500) + w.Write(queryparser.InternalQuesmaError("Unknown Quesma error")) + }) + + quesmaRequest, ctx, err := r.preprocessRequest(ctx, &mux.Request{ + Method: req.Method, + Path: strings.TrimSuffix(req.URL.Path, "/"), + Params: map[string]string{}, + Headers: req.Header, + QueryParams: req.URL.Query(), + Body: string(reqBody), + }) + + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error preprocessing request: %v", err) + } + + quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body) + var handler mux.Handler + var decision *table_resolver.Decision + + searchHandler, searchDecision := pathRouter.Matches(quesmaRequest) + ingestHandler, ingestDecision := ingestRouter.Matches(quesmaRequest) + + if searchHandler == nil { + handler = ingestHandler + decision = ingestDecision + } else { + handler = searchHandler + decision = searchDecision + } + + if decision != nil { + w.Header().Set(quesmaTableResolverHeader, decision.String()) + } else { + w.Header().Set(quesmaTableResolverHeader, "n/a") + } + + if handler != nil { + quesmaResponse, err := recordRequestToClickhouseV2(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) { + return handler(ctx, quesmaRequest) + }) + + zip := strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") + + if err == nil { + logger.Debug().Ctx(ctx).Msg("responding from quesma") + unzipped := []byte{} + if quesmaResponse != nil { + unzipped = []byte(quesmaResponse.Body) + } + if len(unzipped) == 0 { + logger.WarnWithCtx(ctx).Msgf("empty response from Clickhouse, method=%s", req.Method) + } + addProductAndContentHeaders(req.Header, w.Header()) + + responseFromQuesma(ctx, unzipped, w, quesmaResponse, zip) + + } else { + r.errorResponseV2(ctx, err, w) + } + } else { + + var sendToElastic bool + + if decision != nil { + + if decision.Err != nil { + w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) + addProductAndContentHeaders(req.Header, w.Header()) + r.errorResponseV2(ctx, decision.Err, w) + return + } + + if decision.IsClosed { + w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) + addProductAndContentHeaders(req.Header, w.Header()) + r.closedIndexResponse(ctx, w, decision.IndexPattern) + return + } + + if decision.IsEmpty { + w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) + addProductAndContentHeaders(req.Header, w.Header()) + w.WriteHeader(http.StatusNoContent) + w.Write(queryparser.EmptySearchResponse(ctx)) + return + } + + for _, connector := range decision.UseConnectors { + if _, ok := connector.(*table_resolver.ConnectorDecisionElastic); ok { + // this is desired elastic call + sendToElastic = true + break + } + } + + } else { + // this is fallback case + // in case we don't support sth, we should send 
it to Elastic + sendToElastic = true + } + + if sendToElastic { + feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(opaqueIdHeaderKey), logManager.ResolveIndexPattern) + + rawResponse := <-r.sendHttpRequestToElastic(ctx, req, reqBody, true) + response := rawResponse.response + if response != nil { + responseFromElasticV2(ctx, response, w) + } else { + w.Header().Set(quesmaSourceHeader, quesmaSourceElastic) + w.WriteHeader(500) + if rawResponse.error != nil { + _, _ = w.Write([]byte(rawResponse.error.Error())) + } + } + } else { + r.errorResponseV2(ctx, end_user_errors.ErrNoConnector.New(fmt.Errorf("no connector found")), w) + } + } +} + +func (r *routerV2) preprocessRequest(ctx context.Context, quesmaRequest *mux.Request) (*mux.Request, context.Context, error) { + var err error + var processedRequest = quesmaRequest + for _, preprocessor := range r.requestPreprocessors { + ctx, processedRequest, err = preprocessor.PreprocessRequest(ctx, processedRequest) + if err != nil { + return nil, nil, err + } + } + return processedRequest, ctx, nil +} + +type elasticResultV2 struct { + response *http.Response + error error + took time.Duration +} + +func (r *routerV2) sendHttpRequestToElastic(ctx context.Context, req *http.Request, + reqBody []byte, isManagement bool) chan elasticResultV2 { + elkResponseChan := make(chan elasticResultV2) + + // If Quesma is exposing unauthenticated API but underlying Elasticsearch requires authentication, we should add the + if r.config.DisableAuth && req.Header.Get("Authorization") == "" && r.config.Elasticsearch.User != "" { + logger.DebugWithCtx(ctx).Msgf("path=%s routed to Elasticsearch, need add auth header to the request", req.URL) + req.SetBasicAuth(r.config.Elasticsearch.User, r.config.Elasticsearch.Password) + } + + if req.Header.Get("Authorization") != "" { + var userName string + if user, err := util.ExtractUsernameFromBasicAuthHeader(req.Header.Get("Authorization")); err == nil { + userName = user + } else { + logger.Warn().Msgf("Failed to extract username from auth header: %v", err) + } + logger.DebugWithCtx(ctx).Msgf("[AUTH] [%s] routed to Elasticsearch, called by user [%s]", req.URL, userName) + } + + go func() { + elkResponseChan <- recordRequestToElasticV2(req.URL.Path, r.quesmaManagementConsole, func() elasticResultV2 { + + isWrite := elasticsearch.IsWriteRequest(req) + + var span telemetry.Span + if isManagement { + if isWrite { + span = r.phoneHomeAgent.ElasticBypassedWriteRequestsDuration().Begin() + } else { + span = r.phoneHomeAgent.ElasticBypassedReadRequestsDuration().Begin() + } + } else { + if isWrite { + span = r.phoneHomeAgent.ElasticWriteRequestsDuration().Begin() + } else { + span = r.phoneHomeAgent.ElasticReadRequestsDuration().Begin() + } + } + + resp, err := r.sendHttpRequest(ctx, r.config.Elasticsearch.Url.String(), req, reqBody) + took := span.End(err) + return elasticResultV2{resp, err, took} + }) + }() + return elkResponseChan +} + +func isResponseOkV2(resp *http.Response) bool { + return resp != nil && resp.StatusCode >= 200 && resp.StatusCode < 500 +} + +func isIngestV2(path string) bool { + return strings.HasSuffix(path, routes.BulkPath) // We may add more methods in future such as `_put` or `_create` +} + +func recordRequestToClickhouseV2(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() (*mux.Result, error)) (*mux.Result, error) { + statName := ui.RequestStatisticKibana2Clickhouse + if isIngestV2(path) { + statName = ui.RequestStatisticIngest2Clickhouse + } + now := time.Now() + 
response, err := requestFunc() + qmc.RecordRequest(statName, time.Since(now), err != nil) + return response, err +} + +func recordRequestToElasticV2(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() elasticResultV2) elasticResultV2 { + statName := ui.RequestStatisticKibana2Elasticsearch + if isIngest(path) { + statName = ui.RequestStatisticIngest2Elasticsearch + } + now := time.Now() + response := requestFunc() + qmc.RecordRequest(statName, time.Since(now), !isResponseOkV2(response.response)) + return response +} + +func peekBodyV2(r *http.Request) ([]byte, error) { + reqBody, err := io.ReadAll(r.Body) + if err != nil { + logger.ErrorWithCtxAndReason(r.Context(), "incomplete request"). + Msgf("Error reading request body: %v", err) + return nil, err + } + + contentEncoding := r.Header.Get("Content-Encoding") + switch contentEncoding { + case "": + // No compression, leaving reqBody as-is + case "gzip": + reqBody, err = gzip.UnZip(reqBody) + if err != nil { + logger.ErrorWithCtxAndReason(r.Context(), "invalid gzip body"). + Msgf("Error decompressing gzip body: %v", err) + return nil, err + } + default: + logger.ErrorWithCtxAndReason(r.Context(), "unsupported Content-Encoding type"). + Msgf("Unsupported Content-Encoding type: %s", contentEncoding) + return nil, errors.New("unsupported Content-Encoding type") + } + r.Header.Del("Content-Encoding") // In the transparent proxy case we will send an uncompressed body, so the header should be removed + + r.Body = io.NopCloser(bytes.NewBuffer(reqBody)) + return reqBody, nil +} + +func copyHeadersV2(w http.ResponseWriter, elkResponse *http.Response) { + for key, values := range elkResponse.Header { + for _, value := range values { + if key != httpHeaderContentLength { + if w.Header().Get(key) == "" { + w.Header().Add(key, value) + } + } + } + } +} + +func (r *routerV2) sendHttpRequest(ctx context.Context, address string, originalReq *http.Request, originalReqBody []byte) (*http.Response, error) { + req, err := http.NewRequestWithContext(ctx, originalReq.Method, address+originalReq.URL.String(), bytes.NewBuffer(originalReqBody)) + + if err != nil { + logger.ErrorWithCtx(ctx).Msgf("Error creating request: %v", err) + return nil, err + } + + req.Header = originalReq.Header + + resp, err := r.httpClient.Do(req) + if err != nil { + logger.ErrorWithCtxAndReason(ctx, "No network connection"). 
+ Msgf("Error sending request: %v", err) + return nil, err + } + + return resp, nil +} diff --git a/quesma/quesma/quesma.go b/quesma/quesma/quesma.go index bd828cf3e..58789893c 100644 --- a/quesma/quesma/quesma.go +++ b/quesma/quesma/quesma.go @@ -3,36 +3,20 @@ package quesma import ( - "bytes" "context" - "errors" - "fmt" - "io" - "net/http" "quesma/ab_testing" "quesma/clickhouse" "quesma/elasticsearch" - "quesma/end_user_errors" - "quesma/feature" "quesma/ingest" "quesma/logger" "quesma/proxy" - "quesma/queryparser" "quesma/quesma/config" - "quesma/quesma/gzip" - "quesma/quesma/mux" "quesma/quesma/recovery" - "quesma/quesma/routes" - "quesma/quesma/types" "quesma/quesma/ui" "quesma/schema" "quesma/table_resolver" "quesma/telemetry" - "quesma/tracing" "quesma/util" - "strings" - "sync/atomic" - "time" ) type ( @@ -49,43 +33,16 @@ type ( } ) -func responseFromElastic(ctx context.Context, elkResponse *http.Response, w http.ResponseWriter) { - id := ctx.Value(tracing.RequestIdCtxKey).(string) - logger.Debug().Str(logger.RID, id).Msg("responding from Elasticsearch") - - copyHeaders(w, elkResponse) - w.Header().Set(quesmaSourceHeader, quesmaSourceElastic) - // io.Copy calls WriteHeader implicitly - w.WriteHeader(elkResponse.StatusCode) - if _, err := io.Copy(w, elkResponse.Body); err != nil { - logger.ErrorWithCtx(ctx).Msgf("Error copying response body: %v", err) - http.Error(w, "Error copying response body", http.StatusInternalServerError) - return - } - elkResponse.Body.Close() +func (q *Quesma) Close(ctx context.Context) { + q.processor.Stop(ctx) } -func responseFromQuesma(ctx context.Context, unzipped []byte, w http.ResponseWriter, quesmaResponse *mux.Result, zip bool) { - id := ctx.Value(tracing.RequestIdCtxKey).(string) - logger.Debug().Str(logger.RID, id).Msg("responding from Quesma") +func (q *Quesma) Start() { + defer recovery.LogPanic() + logger.Info().Msgf("starting quesma, transparent proxy mode: %t", q.config.TransparentProxy) - for key, value := range quesmaResponse.Meta { - w.Header().Set(key, value) - } - if zip { - w.Header().Set("Content-Encoding", "gzip") - } - w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) - w.WriteHeader(quesmaResponse.StatusCode) - if zip { - zipped, err := gzip.Zip(unzipped) - if err != nil { - logger.ErrorWithCtx(ctx).Msgf("Error zipping: %v", err) - } - _, _ = io.Copy(w, bytes.NewBuffer(zipped)) - } else { - _, _ = io.Copy(w, bytes.NewBuffer(unzipped)) - } + go q.processor.Ingest() + go q.quesmaManagementConsole.Run() } func NewQuesmaTcpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, config *config.QuesmaConfiguration, quesmaManagementConsole *ui.QuesmaManagementConsole, logChan <-chan logger.LogWithLevel, inspect bool) *Quesma { @@ -97,380 +54,34 @@ func NewQuesmaTcpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, config *config.Q } } -func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, logManager *clickhouse.LogManager, ingestProcessor *ingest.IngestProcessor, schemaLoader clickhouse.TableDiscovery, - indexManager elasticsearch.IndexManagement, schemaRegistry schema.Registry, config *config.QuesmaConfiguration, - quesmaManagementConsole *ui.QuesmaManagementConsole, abResultsRepository ab_testing.Sender, resolver table_resolver.TableResolver) *Quesma { - queryRunner := NewQueryRunner(logManager, config, indexManager, quesmaManagementConsole, schemaRegistry, abResultsRepository, resolver) - - // not sure how we should configure our query translator ??? - // is this a config option?? 
- - queryRunner.DateMathRenderer = queryparser.DateMathExpressionFormatLiteral - - // tests should not be run with optimization enabled by default - queryRunner.EnableQueryOptimization(config) - - router := ConfigureRouter(config, schemaRegistry, logManager, ingestProcessor, quesmaManagementConsole, phoneHomeAgent, queryRunner, resolver) - return &Quesma{ - telemetryAgent: phoneHomeAgent, - processor: newDualWriteProxy(schemaLoader, logManager, indexManager, schemaRegistry, config, router, quesmaManagementConsole, phoneHomeAgent, queryRunner), - publicTcpPort: config.PublicTcpPort, - quesmaManagementConsole: quesmaManagementConsole, - config: config, - } -} - -type router struct { - config *config.QuesmaConfiguration - requestPreprocessors processorChain - quesmaManagementConsole *ui.QuesmaManagementConsole - phoneHomeAgent telemetry.PhoneHomeAgent - httpClient *http.Client - failedRequests atomic.Int64 -} - -func (r *router) registerPreprocessor(preprocessor RequestPreprocessor) { - r.requestPreprocessors = append(r.requestPreprocessors, preprocessor) -} - -func (r *router) errorResponse(ctx context.Context, err error, w http.ResponseWriter) { - r.failedRequests.Add(1) - - msg := "Internal Quesma Error.\nPlease contact support if the problem persists." - reason := "Failed request." - result := mux.ServerErrorResult() - - // if error is an error with user-friendly message, we should use it - var endUserError *end_user_errors.EndUserError - if errors.As(err, &endUserError) { - msg = endUserError.EndUserErrorMessage() - reason = endUserError.Reason() - - // we treat all `Q1xxx` errors as bad requests here - if endUserError.ErrorType().Number < 2000 { - result = mux.BadReqeustResult() - } - } - - logger.ErrorWithCtxAndReason(ctx, reason).Msgf("quesma request failed: %v", err) - - requestId := "n/a" - if contextRid, ok := ctx.Value(tracing.RequestIdCtxKey).(string); ok { - requestId = contextRid - } - - // We should not send our error message to the client. There can be sensitive information in it. - // We will send ID of failed request instead - responseFromQuesma(ctx, []byte(fmt.Sprintf("%s\nRequest ID: %s\n", msg, requestId)), w, result, false) -} - -func (*router) closedIndexResponse(ctx context.Context, w http.ResponseWriter, pattern string) { - // TODO we should return a proper status code here (400?) 
- w.WriteHeader(http.StatusOK) - - response := make(types.JSON) - - response["error"] = queryparser.Error{ - RootCause: []queryparser.RootCause{ - { - Type: "index_closed_exception", - Reason: fmt.Sprintf("pattern %s is not routed to any connector", pattern), - }, - }, - Type: "index_closed_exception", - Reason: fmt.Sprintf("pattern %s is not routed to any connector", pattern), - } - - b, err := response.Bytes() - if err != nil { - logger.ErrorWithCtx(ctx).Msgf("Error marshalling response: %v", err) - return - } - - w.Write(b) - -} - -func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.Request, reqBody []byte, router *mux.PathRouter, logManager *clickhouse.LogManager) { - defer recovery.LogAndHandlePanic(ctx, func(err error) { - w.WriteHeader(500) - w.Write(queryparser.InternalQuesmaError("Unknown Quesma error")) - }) - - quesmaRequest, ctx, err := r.preprocessRequest(ctx, &mux.Request{ - Method: req.Method, - Path: strings.TrimSuffix(req.URL.Path, "/"), - Params: map[string]string{}, - Headers: req.Header, - QueryParams: req.URL.Query(), - Body: string(reqBody), - }) - - if err != nil { - logger.ErrorWithCtx(ctx).Msgf("Error preprocessing request: %v", err) - } - - quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body) - - handler, decision := router.Matches(quesmaRequest) - - if decision != nil { - w.Header().Set(quesmaTableResolverHeader, decision.String()) - } else { - w.Header().Set(quesmaTableResolverHeader, "n/a") - } - - if handler != nil { - quesmaResponse, err := recordRequestToClickhouse(req.URL.Path, r.quesmaManagementConsole, func() (*mux.Result, error) { - return handler(ctx, quesmaRequest) - }) - - zip := strings.Contains(req.Header.Get("Accept-Encoding"), "gzip") - - if err == nil { - logger.Debug().Ctx(ctx).Msg("responding from quesma") - unzipped := []byte{} - if quesmaResponse != nil { - unzipped = []byte(quesmaResponse.Body) - } - if len(unzipped) == 0 { - logger.WarnWithCtx(ctx).Msgf("empty response from Clickhouse, method=%s", req.Method) - } - addProductAndContentHeaders(req.Header, w.Header()) - - responseFromQuesma(ctx, unzipped, w, quesmaResponse, zip) - - } else { - r.errorResponse(ctx, err, w) +func NewHttpProxy(phoneHomeAgent telemetry.PhoneHomeAgent, + logManager *clickhouse.LogManager, ingestProcessor *ingest.IngestProcessor, + schemaLoader clickhouse.TableDiscovery, + indexManager elasticsearch.IndexManagement, + schemaRegistry schema.Registry, config *config.QuesmaConfiguration, + quesmaManagementConsole *ui.QuesmaManagementConsole, + abResultsRepository ab_testing.Sender, resolver table_resolver.TableResolver, + v2 bool) *Quesma { + + if v2 { + return &Quesma{ + telemetryAgent: phoneHomeAgent, + processor: newDualWriteProxyV2(schemaLoader, logManager, indexManager, + schemaRegistry, config, quesmaManagementConsole, phoneHomeAgent, + ingestProcessor, resolver, abResultsRepository), + publicTcpPort: config.PublicTcpPort, + quesmaManagementConsole: quesmaManagementConsole, + config: config, } } else { - - var sendToElastic bool - - if decision != nil { - - if decision.Err != nil { - w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) - addProductAndContentHeaders(req.Header, w.Header()) - r.errorResponse(ctx, decision.Err, w) - return - } - - if decision.IsClosed { - w.Header().Set(quesmaSourceHeader, quesmaSourceClickhouse) - addProductAndContentHeaders(req.Header, w.Header()) - r.closedIndexResponse(ctx, w, decision.IndexPattern) - return - } - - if decision.IsEmpty { - w.Header().Set(quesmaSourceHeader, 
quesmaSourceClickhouse) - addProductAndContentHeaders(req.Header, w.Header()) - w.WriteHeader(http.StatusNoContent) - w.Write(queryparser.EmptySearchResponse(ctx)) - return - } - - for _, connector := range decision.UseConnectors { - if _, ok := connector.(*table_resolver.ConnectorDecisionElastic); ok { - // this is desired elastic call - sendToElastic = true - break - } - } - - } else { - // this is fallback case - // in case we don't support sth, we should send it to Elastic - sendToElastic = true + return &Quesma{ + telemetryAgent: phoneHomeAgent, + processor: newDualWriteProxy(schemaLoader, logManager, indexManager, + schemaRegistry, config, quesmaManagementConsole, phoneHomeAgent, + ingestProcessor, resolver, abResultsRepository), + publicTcpPort: config.PublicTcpPort, + quesmaManagementConsole: quesmaManagementConsole, + config: config, } - - if sendToElastic { - feature.AnalyzeUnsupportedCalls(ctx, req.Method, req.URL.Path, req.Header.Get(opaqueIdHeaderKey), logManager.ResolveIndexPattern) - - rawResponse := <-r.sendHttpRequestToElastic(ctx, req, reqBody, true) - response := rawResponse.response - if response != nil { - responseFromElastic(ctx, response, w) - } else { - w.Header().Set(quesmaSourceHeader, quesmaSourceElastic) - w.WriteHeader(500) - if rawResponse.error != nil { - _, _ = w.Write([]byte(rawResponse.error.Error())) - } - } - } else { - r.errorResponse(ctx, end_user_errors.ErrNoConnector.New(fmt.Errorf("no connector found")), w) - } - } -} - -func (r *router) preprocessRequest(ctx context.Context, quesmaRequest *mux.Request) (*mux.Request, context.Context, error) { - var err error - var processedRequest = quesmaRequest - for _, preprocessor := range r.requestPreprocessors { - ctx, processedRequest, err = preprocessor.PreprocessRequest(ctx, processedRequest) - if err != nil { - return nil, nil, err - } - } - return processedRequest, ctx, nil -} - -type elasticResult struct { - response *http.Response - error error - took time.Duration -} - -func (r *router) sendHttpRequestToElastic(ctx context.Context, req *http.Request, - reqBody []byte, isManagement bool) chan elasticResult { - elkResponseChan := make(chan elasticResult) - - // If Quesma is exposing unauthenticated API but underlying Elasticsearch requires authentication, we should add the - if r.config.DisableAuth && req.Header.Get("Authorization") == "" && r.config.Elasticsearch.User != "" { - logger.DebugWithCtx(ctx).Msgf("path=%s routed to Elasticsearch, need add auth header to the request", req.URL) - req.SetBasicAuth(r.config.Elasticsearch.User, r.config.Elasticsearch.Password) } - - if req.Header.Get("Authorization") != "" { - var userName string - if user, err := util.ExtractUsernameFromBasicAuthHeader(req.Header.Get("Authorization")); err == nil { - userName = user - } else { - logger.Warn().Msgf("Failed to extract username from auth header: %v", err) - } - logger.DebugWithCtx(ctx).Msgf("[AUTH] [%s] routed to Elasticsearch, called by user [%s]", req.URL, userName) - } - - go func() { - elkResponseChan <- recordRequestToElastic(req.URL.Path, r.quesmaManagementConsole, func() elasticResult { - - isWrite := elasticsearch.IsWriteRequest(req) - - var span telemetry.Span - if isManagement { - if isWrite { - span = r.phoneHomeAgent.ElasticBypassedWriteRequestsDuration().Begin() - } else { - span = r.phoneHomeAgent.ElasticBypassedReadRequestsDuration().Begin() - } - } else { - if isWrite { - span = r.phoneHomeAgent.ElasticWriteRequestsDuration().Begin() - } else { - span = 
r.phoneHomeAgent.ElasticReadRequestsDuration().Begin() - } - } - - resp, err := r.sendHttpRequest(ctx, r.config.Elasticsearch.Url.String(), req, reqBody) - took := span.End(err) - return elasticResult{resp, err, took} - }) - }() - return elkResponseChan -} - -func isResponseOk(resp *http.Response) bool { - return resp != nil && resp.StatusCode >= 200 && resp.StatusCode < 500 -} - -func isIngest(path string) bool { - return strings.HasSuffix(path, routes.BulkPath) // We may add more methods in future such as `_put` or `_create` -} - -func recordRequestToClickhouse(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() (*mux.Result, error)) (*mux.Result, error) { - statName := ui.RequestStatisticKibana2Clickhouse - if isIngest(path) { - statName = ui.RequestStatisticIngest2Clickhouse - } - now := time.Now() - response, err := requestFunc() - qmc.RecordRequest(statName, time.Since(now), err != nil) - return response, err -} - -func recordRequestToElastic(path string, qmc *ui.QuesmaManagementConsole, requestFunc func() elasticResult) elasticResult { - statName := ui.RequestStatisticKibana2Elasticsearch - if isIngest(path) { - statName = ui.RequestStatisticIngest2Elasticsearch - } - now := time.Now() - response := requestFunc() - qmc.RecordRequest(statName, time.Since(now), !isResponseOk(response.response)) - return response -} - -func peekBody(r *http.Request) ([]byte, error) { - reqBody, err := io.ReadAll(r.Body) - if err != nil { - logger.ErrorWithCtxAndReason(r.Context(), "incomplete request"). - Msgf("Error reading request body: %v", err) - return nil, err - } - - contentEncoding := r.Header.Get("Content-Encoding") - switch contentEncoding { - case "": - // No compression, leaving reqBody as-is - case "gzip": - reqBody, err = gzip.UnZip(reqBody) - if err != nil { - logger.ErrorWithCtxAndReason(r.Context(), "invalid gzip body"). - Msgf("Error decompressing gzip body: %v", err) - return nil, err - } - default: - logger.ErrorWithCtxAndReason(r.Context(), "unsupported Content-Encoding type"). - Msgf("Unsupported Content-Encoding type: %s", contentEncoding) - return nil, errors.New("unsupported Content-Encoding type") - } - r.Header.Del("Content-Encoding") // In the transparent proxy case we will send an uncompressed body, so the header should be removed - - r.Body = io.NopCloser(bytes.NewBuffer(reqBody)) - return reqBody, nil -} - -func copyHeaders(w http.ResponseWriter, elkResponse *http.Response) { - for key, values := range elkResponse.Header { - for _, value := range values { - if key != httpHeaderContentLength { - if w.Header().Get(key) == "" { - w.Header().Add(key, value) - } - } - } - } -} - -func (q *Quesma) Close(ctx context.Context) { - q.processor.Stop(ctx) -} - -func (q *Quesma) Start() { - defer recovery.LogPanic() - logger.Info().Msgf("starting quesma, transparent proxy mode: %t", q.config.TransparentProxy) - - go q.processor.Ingest() - go q.quesmaManagementConsole.Run() -} - -func (r *router) sendHttpRequest(ctx context.Context, address string, originalReq *http.Request, originalReqBody []byte) (*http.Response, error) { - req, err := http.NewRequestWithContext(ctx, originalReq.Method, address+originalReq.URL.String(), bytes.NewBuffer(originalReqBody)) - - if err != nil { - logger.ErrorWithCtx(ctx).Msgf("Error creating request: %v", err) - return nil, err - } - - req.Header = originalReq.Header - - resp, err := r.httpClient.Do(req) - if err != nil { - logger.ErrorWithCtxAndReason(ctx, "No network connection"). 
- Msgf("Error sending request: %v", err) - return nil, err - } - - return resp, nil } diff --git a/quesma/quesma/router_v2.go b/quesma/quesma/router_v2.go new file mode 100644 index 000000000..e4666d43b --- /dev/null +++ b/quesma/quesma/router_v2.go @@ -0,0 +1,423 @@ +// Copyright Quesma, licensed under the Elastic License 2.0. +// SPDX-License-Identifier: Elastic-2.0 +package quesma + +import ( + "context" + "errors" + "github.com/goccy/go-json" + "net/http" + "quesma/clickhouse" + "quesma/elasticsearch" + "quesma/ingest" + "quesma/logger" + "quesma/queryparser" + "quesma/quesma/config" + "quesma/quesma/errors" + "quesma/quesma/functionality/bulk" + "quesma/quesma/functionality/doc" + "quesma/quesma/functionality/field_capabilities" + "quesma/quesma/functionality/resolve" + "quesma/quesma/functionality/terms_enum" + "quesma/quesma/mux" + "quesma/quesma/routes" + "quesma/quesma/types" + "quesma/quesma/ui" + "quesma/schema" + "quesma/table_resolver" + "quesma/telemetry" + "quesma/tracing" + "strings" + "time" +) + +func ConfigureIngestRouterV2(cfg *config.QuesmaConfiguration, ip *ingest.IngestProcessor, phoneHomeAgent telemetry.PhoneHomeAgent, tableResolver table_resolver.TableResolver) *mux.PathRouter { + // some syntactic sugar + method := mux.IsHTTPMethod + and := mux.And + + router := mux.NewPathRouter() + + // These are the endpoints that are not supported by Quesma + // These will redirect to the elastic cluster. + for _, path := range elasticsearch.InternalPaths { + router.Register(path, mux.Never(), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { return nil, nil }) + } + router.Register(routes.BulkPath, and(method("POST", "PUT"), matchedAgainstBulkBody(cfg, tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + + body, err := types.ExpectNDJSON(req.ParsedBody) + if err != nil { + return nil, err + } + + results, err := bulk.Write(ctx, nil, body, ip, cfg, phoneHomeAgent, tableResolver) + return bulkInsertResult(ctx, results, err) + }) + router.Register(routes.IndexDocPath, and(method("POST"), matchedExactIngestPath(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + index := req.Params["index"] + + body, err := types.ExpectJSON(req.ParsedBody) + if err != nil { + return &mux.Result{ + Body: string(queryparser.BadRequestParseError(err)), + StatusCode: http.StatusBadRequest, + }, nil + } + + result, err := doc.Write(ctx, &index, body, ip, cfg, phoneHomeAgent, tableResolver) + if err != nil { + return &mux.Result{ + Body: string(queryparser.BadRequestParseError(err)), + StatusCode: http.StatusBadRequest, + }, nil + } + + return indexDocResult(result) + }) + + router.Register(routes.IndexBulkPath, and(method("POST", "PUT"), matchedExactIngestPath(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + index := req.Params["index"] + + body, err := types.ExpectNDJSON(req.ParsedBody) + if err != nil { + return nil, err + } + + results, err := bulk.Write(ctx, &index, body, ip, cfg, phoneHomeAgent, tableResolver) + return bulkInsertResult(ctx, results, err) + }) + return router +} + +func ConfigureSearchRouterV2(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *clickhouse.LogManager, console *ui.QuesmaManagementConsole, queryRunner *QueryRunner, tableResolver table_resolver.TableResolver) *mux.PathRouter { + + // some syntactic sugar + method := mux.IsHTTPMethod + and := mux.And + + router := mux.NewPathRouter() + + // These are the endpoints that are not supported by 
Quesma + // These will redirect to the elastic cluster. + for _, path := range elasticsearch.InternalPaths { + router.Register(path, mux.Never(), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { return nil, nil }) + } + + // These are the endpoints that are supported by Quesma + + // Warning: + // The first handler that matches the path will be considered to use. + // If the predicate returns false it will be redirected to the elastic cluster. + // If the predicate returns true, the handler will be used. + // + // So, if you add multiple handlers with the same path, the first one will be used, the rest will be redirected to the elastic cluster. + // This is current limitation of the router. + + router.Register(routes.ClusterHealthPath, method("GET"), func(_ context.Context, req *mux.Request) (*mux.Result, error) { + return elasticsearchQueryResult(`{"cluster_name": "quesma"}`, http.StatusOK), nil + }) + + router.Register(routes.IndexRefreshPath, and(method("POST"), matchedExactQueryPath(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + return elasticsearchInsertResult(`{"_shards":{"total":1,"successful":1,"failed":0}}`, http.StatusOK), nil + }) + + router.Register(routes.ResolveIndexPath, method("GET"), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + sources, err := resolve.HandleResolve(req.Params["index"], sr, cfg) + if err != nil { + return nil, err + } + return resolveIndexResult(sources) + }) + + router.Register(routes.IndexCountPath, and(method("GET"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + cnt, err := queryRunner.handleCount(ctx, req.Params["index"]) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } else { + return nil, err + } + } + + if cnt == -1 { + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } else { + return elasticsearchCountResult(cnt, http.StatusOK) + } + }) + + // TODO: This endpoint is currently disabled (mux.Never()) as it's pretty much used only by internal Kibana requests, + // it's error-prone to detect them in matchAgainstKibanaInternal() and Quesma can't handle well the cases of wildcard + // matching many indices either way. 
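+	// Re-enabling it would roughly amount to dropping mux.Never() from the predicate chain while keeping
+	// the same handler (the function literal registered below), e.g.:
+	//
+	//   router.Register(routes.GlobalSearchPath, and(method("GET", "POST"), matchAgainstKibanaInternal()), handler)
+	//
+	// As long as mux.Never() is part of the predicate, the route is registered but never matches, so these
+	// requests fall through to Elasticsearch like any other unhandled path.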
+ router.Register(routes.GlobalSearchPath, and(mux.Never(), method("GET", "POST"), matchAgainstKibanaInternal()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + + body, err := types.ExpectJSON(req.ParsedBody) + if err != nil { + return nil, err + } + + // TODO we should pass JSON here instead of []byte + responseBody, err := queryRunner.handleSearch(ctx, "*", body) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } else { + return nil, err + } + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + }) + + router.Register(routes.IndexSearchPath, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + + body, err := types.ExpectJSON(req.ParsedBody) + if err != nil { + return nil, err + } + + responseBody, err := queryRunner.handleSearch(ctx, req.Params["index"], body) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } else if errors.Is(err, quesma_errors.ErrCouldNotParseRequest()) { + return &mux.Result{ + Body: string(queryparser.BadRequestParseError(err)), + StatusCode: http.StatusBadRequest, + }, nil + } else { + return nil, err + } + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + }) + router.Register(routes.IndexAsyncSearchPath, and(method("POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + waitForResultsMs := 1000 // Defaults to 1 second as in docs + if v, ok := req.Params["wait_for_completion_timeout"]; ok { + if w, err := time.ParseDuration(v); err == nil { + waitForResultsMs = int(w.Milliseconds()) + } else { + logger.Warn().Msgf("Can't parse wait_for_completion_timeout value: %s", v) + } + } + keepOnCompletion := false + if v, ok := req.Params["keep_on_completion"]; ok { + if v == "true" { + keepOnCompletion = true + } + } + + body, err := types.ExpectJSON(req.ParsedBody) + if err != nil { + return nil, err + } + + responseBody, err := queryRunner.handleAsyncSearch(ctx, req.Params["index"], body, waitForResultsMs, keepOnCompletion) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } else if errors.Is(err, quesma_errors.ErrCouldNotParseRequest()) { + return &mux.Result{ + Body: string(queryparser.BadRequestParseError(err)), + StatusCode: http.StatusBadRequest, + }, nil + } else { + return nil, err + } + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + }) + + router.Register(routes.IndexMappingPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + + switch req.Method { + + case "GET": + index := req.Params["index"] + + foundSchema, found := sr.FindSchema(schema.TableName(index)) + if !found { + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } + + hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema) + mappings := elasticsearch.GenerateMappings(hierarchicalSchema) + + return getIndexMappingResult(index, mappings) + + case "PUT": + index := req.Params["index"] + + body, err := types.ExpectJSON(req.ParsedBody) + if err != nil { + return nil, err + } + + columns := elasticsearch.ParseMappings("", body) + sr.UpdateDynamicConfiguration(schema.TableName(index), 
schema.Table{Columns: columns}) + return putIndexResult(index) + } + + return nil, errors.New("unsupported method") + + }) + + router.Register(routes.AsyncSearchStatusPath, and(method("GET"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + responseBody, err := queryRunner.handleAsyncSearchStatus(ctx, req.Params["id"]) + if err != nil { + return nil, err + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + }) + + router.Register(routes.AsyncSearchIdPath, and(method("GET", "DELETE"), matchedAgainstAsyncId()), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + + switch req.Method { + + case "GET": + ctx = context.WithValue(ctx, tracing.AsyncIdCtxKey, req.Params["id"]) + responseBody, err := queryRunner.handlePartialAsyncSearch(ctx, req.Params["id"]) + if err != nil { + return nil, err + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + + case "DELETE": + responseBody, err := queryRunner.deleteAsyncSearch(req.Params["id"]) + if err != nil { + return nil, err + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + } + + return nil, errors.New("unsupported method") + }) + + router.Register(routes.FieldCapsPath, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + + responseBody, err := field_capabilities.HandleFieldCaps(ctx, cfg, sr, req.Params["index"], lm) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + if req.QueryParams.Get("allow_no_indices") == "true" || req.QueryParams.Get("ignore_unavailable") == "true" { + return elasticsearchQueryResult(string(field_capabilities.EmptyFieldCapsResponse()), http.StatusOK), nil + } + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } else { + return nil, err + } + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + }) + router.Register(routes.TermsEnumPath, and(method("POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + if strings.Contains(req.Params["index"], ",") { + return nil, errors.New("multi index terms enum is not yet supported") + } else { + + var body types.JSON + switch b := req.ParsedBody.(type) { + case types.JSON: + body = b + default: + return nil, errors.New("invalid request body, expecting JSON") + } + + if responseBody, err := terms_enum.HandleTermsEnum(ctx, req.Params["index"], body, lm, sr, console); err != nil { + return nil, err + } else { + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + } + } + }) + + router.Register(routes.EQLSearch, and(method("GET", "POST"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + body, err := types.ExpectJSON(req.ParsedBody) + if err != nil { + return nil, err + } + + responseBody, err := queryRunner.handleEQLSearch(ctx, req.Params["index"], body) + if err != nil { + if errors.Is(quesma_errors.ErrIndexNotExists(), err) { + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } else { + return nil, err + } + } + return elasticsearchQueryResult(string(responseBody), http.StatusOK), nil + }) + + router.Register(routes.IndexPath, and(method("GET", "PUT"), matchedAgainstPattern(tableResolver)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + + switch req.Method { + + case "GET": + index := req.Params["index"] + + foundSchema, found := 
sr.FindSchema(schema.TableName(index)) + if !found { + return &mux.Result{StatusCode: http.StatusNotFound}, nil + } + + hierarchicalSchema := schema.SchemaToHierarchicalSchema(&foundSchema) + mappings := elasticsearch.GenerateMappings(hierarchicalSchema) + + return getIndexResult(index, mappings) + + case "PUT": + + index := req.Params["index"] + if req.Body == "" { + logger.Warn().Msgf("empty body in PUT /%s request, Quesma is not doing anything", index) + return putIndexResult(index) + } + + body, err := types.ExpectJSON(req.ParsedBody) + if err != nil { + return nil, err + } + + mappings, ok := body["mappings"] + if !ok { + logger.Warn().Msgf("no mappings found in PUT /%s request, ignoring that request. Full content: %s", index, req.Body) + return putIndexResult(index) + } + columns := elasticsearch.ParseMappings("", mappings.(map[string]interface{})) + + sr.UpdateDynamicConfiguration(schema.TableName(index), schema.Table{Columns: columns}) + + return putIndexResult(index) + } + + return nil, errors.New("unsupported method") + }) + + router.Register(routes.QuesmaTableResolverPath, method("GET"), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { + indexPattern := req.Params["index"] + + decisions := make(map[string]*table_resolver.Decision) + humanReadable := make(map[string]string) + for _, pipeline := range tableResolver.Pipelines() { + decision := tableResolver.Resolve(pipeline, indexPattern) + decisions[pipeline] = decision + humanReadable[pipeline] = decision.String() + } + + resp := struct { + IndexPattern string `json:"index_pattern"` + Decisions map[string]*table_resolver.Decision `json:"decisions"` + HumanReadable map[string]string `json:"human_readable"` + }{ + IndexPattern: indexPattern, + Decisions: decisions, + HumanReadable: humanReadable, + } + + body, err := json.MarshalIndent(resp, "", " ") + if err != nil { + return nil, err + } + + return &mux.Result{Body: string(body), StatusCode: http.StatusOK}, nil + }) + + return router +} diff --git a/quesma/v2/test_utils.go b/quesma/test_utils.go similarity index 97% rename from quesma/v2/test_utils.go rename to quesma/test_utils.go index 85f622a80..990ed1e80 100644 --- a/quesma/v2/test_utils.go +++ b/quesma/test_utils.go @@ -1,7 +1,7 @@ // Copyright Quesma, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 -package v2 +package main import ( "bytes" diff --git a/quesma/v2/core/quesma_apis.go b/quesma/v2/core/quesma_apis.go index 1d45089cb..89a8366c8 100644 --- a/quesma/v2/core/quesma_apis.go +++ b/quesma/v2/core/quesma_apis.go @@ -11,6 +11,8 @@ import ( type Router interface { Cloner AddRoute(path string, handler HTTPFrontendHandler) + AddFallbackHandler(handler HTTPFrontendHandler) + GetFallbackHandler() HTTPFrontendHandler GetHandlers() map[string]HandlersPipe SetHandlers(handlers map[string]HandlersPipe) Multiplexer() *http.ServeMux diff --git a/quesma/v2/v2_test.go b/quesma/v2/v2_test.go deleted file mode 100644 index d13bb62ba..000000000 --- a/quesma/v2/v2_test.go +++ /dev/null @@ -1,98 +0,0 @@ -// Copyright Quesma, licensed under the Elastic License 2.0. 
-// SPDX-License-Identifier: Elastic-2.0 - -package v2 - -import ( - "context" - "github.com/stretchr/testify/assert" - "os" - "os/signal" - "quesma_v2/backend_connectors" - quesma_api "quesma_v2/core" - "quesma_v2/frontend_connectors" - "quesma_v2/processors" - "syscall" - "testing" - "time" -) - -func emitRequests(stop chan os.Signal) { - go func() { - time.Sleep(1 * time.Second) - requestBody := []byte(`{"query": {"match_all": {}}}`) - sendRequest("http://localhost:8888/_bulk", requestBody) - sendRequest("http://localhost:8888/_doc", requestBody) - sendRequest("http://localhost:8888/_search", requestBody) - sendRequest("http://localhost:8888/_search", requestBody) - signal.Notify(stop, os.Interrupt, syscall.SIGTERM) - close(stop) - }() -} - -func Test_backendConnectorValidation(t *testing.T) { - var tcpProcessor quesma_api.Processor = processors.NewPostgresToMySqlProcessor() - var postgressPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() - postgressPipeline.AddProcessor(tcpProcessor) - var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() - const endpoint = "root:password@tcp(127.0.0.1:3306)/test" - var mySqlBackendConnector quesma_api.BackendConnector = &backend_connectors.MySqlBackendConnector{ - Endpoint: endpoint, - } - postgressPipeline.AddBackendConnector(mySqlBackendConnector) - quesmaBuilder.AddPipeline(postgressPipeline) - _, err := quesmaBuilder.Build() - assert.NoError(t, err) -} - -func ab_testing_scenario() quesma_api.QuesmaBuilder { - var quesmaBuilder quesma_api.QuesmaBuilder = quesma_api.NewQuesma() - - ingestFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888") - ingestHTTPRouter := frontend_connectors.NewHTTPRouter() - ingestHTTPRouter.AddRoute("/_bulk", bulk) - ingestHTTPRouter.AddRoute("/_doc", doc) - ingestFrontendConnector.AddRouter(ingestHTTPRouter) - var ingestPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() - ingestPipeline.AddFrontendConnector(ingestFrontendConnector) - var abIngestTestProcessor quesma_api.Processor = processors.NewABTestProcessor("ABIngestTestProcessor", false) - - var ingestProcessor quesma_api.Processor = NewIngestProcessor() - var innerIngestProcessor1 quesma_api.Processor = NewInnerIngestProcessor1() - ingestProcessor.AddProcessor(innerIngestProcessor1) - var innerIngestProcessor2 quesma_api.Processor = NewInnerIngestProcessor2() - ingestProcessor.AddProcessor(innerIngestProcessor2) - - ingestPipeline.AddProcessor(ingestProcessor) - ingestPipeline.AddProcessor(abIngestTestProcessor) - - queryFrontendConnector := frontend_connectors.NewBasicHTTPFrontendConnector(":8888") - queryHTTPRouter := frontend_connectors.NewHTTPRouter() - queryHTTPRouter.AddRoute("/_search", search) - queryFrontendConnector.AddRouter(queryHTTPRouter) - var queryPipeline quesma_api.PipelineBuilder = quesma_api.NewPipeline() - queryPipeline.AddFrontendConnector(queryFrontendConnector) - var queryProcessor quesma_api.Processor = NewQueryProcessor() - var innerQueryProcessor1 quesma_api.Processor = NewInnerQueryProcessor1() - queryProcessor.AddProcessor(innerQueryProcessor1) - var innerQueryProcessor2 quesma_api.Processor = NewInnerQueryProcessor2() - queryProcessor.AddProcessor(innerQueryProcessor2) - var abQueryTestProcessor quesma_api.Processor = processors.NewABTestProcessor("ABQueryTestProcessor", true) - - queryPipeline.AddProcessor(queryProcessor) - queryPipeline.AddProcessor(abQueryTestProcessor) - quesmaBuilder.AddPipeline(ingestPipeline) - quesmaBuilder.AddPipeline(queryPipeline) - - quesma, _ 
:= quesmaBuilder.Build() - return quesma -} - -func Test_scenario1(t *testing.T) { - q1 := ab_testing_scenario() - q1.Start() - stop := make(chan os.Signal, 1) - emitRequests(stop) - <-stop - q1.Stop(context.Background()) -} diff --git a/quesma/v2/v2_test_objects.go b/quesma/v2_test_objects.go similarity index 96% rename from quesma/v2/v2_test_objects.go rename to quesma/v2_test_objects.go index acc9592a3..46eb669ac 100644 --- a/quesma/v2/v2_test_objects.go +++ b/quesma/v2_test_objects.go @@ -1,13 +1,13 @@ // Copyright Quesma, licensed under the Elastic License 2.0. // SPDX-License-Identifier: Elastic-2.0 -package v2 +package main import ( "net/http" + "quesma/frontend_connectors" + "quesma/processors" quesma_api "quesma_v2/core" - "quesma_v2/frontend_connectors" - "quesma_v2/processors" "strconv" "sync/atomic" ) @@ -130,7 +130,8 @@ func (p *IngestProcessor) GetId() string { func (p *IngestProcessor) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { var data []byte for _, m := range message { - data, err := quesma_api.CheckedCast[[]byte](m) + var err error + data, err = quesma_api.CheckedCast[[]byte](m) if err != nil { panic("IngestProcessor: invalid message type") } @@ -231,7 +232,8 @@ func (p *InnerIngestProcessor2) GetId() string { func (p *InnerIngestProcessor2) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { var data []byte for _, m := range message { - data, err := quesma_api.CheckedCast[[]byte](m) + var err error + data, err = quesma_api.CheckedCast[[]byte](m) if err != nil { panic("InnerIngestProcessor2: invalid message type") } @@ -260,7 +262,8 @@ func (p *InnerIngestProcessor1) GetId() string { func (p *InnerIngestProcessor1) Handle(metadata map[string]interface{}, message ...any) (map[string]interface{}, any, error) { var data []byte for _, m := range message { - data, err := quesma_api.CheckedCast[[]byte](m) + var err error + data, err = quesma_api.CheckedCast[[]byte](m) if err != nil { panic("InnerIngestProcessor1: invalid message type") }