Skip to content

Commit

Permalink
Adding very naive (dumb) ingest using v2 api (#1055)
Browse files Browse the repository at this point in the history
Very rough implementation of the ingest endpoints with `v2` api, but I
think we should merge this and continue working on such "sketchpad". It
basically shows the way.

---------

Signed-off-by: Przemysław Hejman <[email protected]>
  • Loading branch information
mieciu authored Dec 6, 2024
1 parent 38c6b7f commit 885ea15
Show file tree
Hide file tree
Showing 12 changed files with 2,068 additions and 7 deletions.
98 changes: 98 additions & 0 deletions quesma/backend_connectors/clickhouse_backend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package backend_connectors

import (
"context"
"database/sql"
"github.com/ClickHouse/clickhouse-go/v2"

quesma_api "quesma_v2/core"
)

type ClickHouseBackendConnector struct {
Endpoint string
connection *sql.DB
}

type ClickHouseRows struct {
rows *sql.Rows
}

func (p *ClickHouseRows) Next() bool {
return p.rows.Next()
}

func (p *ClickHouseRows) Scan(dest ...interface{}) error {
return p.rows.Scan(dest...)
}

func (p *ClickHouseRows) Close() {
err := p.rows.Close()
if err != nil {
panic(err)
}
}

func (p *ClickHouseRows) Err() error {
return p.rows.Err()
}

func (p *ClickHouseBackendConnector) GetId() quesma_api.BackendConnectorType {
return quesma_api.ClickHouseSQLBackend
}

func (p *ClickHouseBackendConnector) Open() error {
conn, err := initDBConnection()
if err != nil {
return err
}
p.connection = conn
return nil
}

func (p *ClickHouseBackendConnector) Close() error {
if p.connection == nil {
return nil
}
return p.connection.Close()
}

func (p *ClickHouseBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
rows, err := p.connection.QueryContext(ctx, query, args...)
if err != nil {
return nil, err
}
return &ClickHouseRows{rows: rows}, nil
}

func (p *ClickHouseBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
if len(args) == 0 {
_, err := p.connection.ExecContext(ctx, query)
return err
}
_, err := p.connection.ExecContext(ctx, query, args...)
return err
}

// func initDBConnection(c *config.QuesmaConfiguration, tlsConfig *tls.Config) *sql.DB {
func initDBConnection() (*sql.DB, error) {
options := clickhouse.Options{Addr: []string{"localhost:9000"}}
info := struct {
Name string
Version string
}{
Name: "quesma",
Version: "NEW ODD VERSION", //buildinfo.Version,
}
options.ClientInfo.Products = append(options.ClientInfo.Products, info)
return clickhouse.OpenDB(&options), nil

}

func NewClickHouseBackendConnector(endpoint string) *ClickHouseBackendConnector {
return &ClickHouseBackendConnector{
Endpoint: endpoint,
}
}
102 changes: 102 additions & 0 deletions quesma/backend_connectors/elasticsearch_backend_connector.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
// Copyright Quesma, licensed under the Elastic License 2.0.
// SPDX-License-Identifier: Elastic-2.0

package backend_connectors

import (
"bytes"
"context"
"crypto/tls"
"fmt"
"net/http"
"quesma/elasticsearch"
"quesma/quesma/config"
quesma_api "quesma_v2/core"
"time"
)

const esRequestTimeout = 5 * time.Second

type Rows struct {
Hits []map[string]interface{}
}

// ElasticsearchBackendConnector is just a test impl -
// TODO: THIS IS A TRUE QUESTION MARK WHETHER IT IS GOING TO STAY LIKE THIS
type ElasticsearchBackendConnector struct {
client *http.Client
config config.ElasticsearchConfiguration
}

func NewElasticsearchBackendConnector(cfg config.ElasticsearchConfiguration) *ElasticsearchBackendConnector {
conn := &ElasticsearchBackendConnector{
config: cfg,
client: &http.Client{
Transport: &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
},
Timeout: esRequestTimeout,
},
}
return conn
}

func (e *ElasticsearchBackendConnector) RequestWithHeaders(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Response, error) {
return e.doRequest(ctx, method, endpoint, body, headers)
}

func (e *ElasticsearchBackendConnector) doRequest(ctx context.Context, method, endpoint string, body []byte, headers http.Header) (*http.Response, error) {
req, err := http.NewRequestWithContext(ctx, method, fmt.Sprintf("%s/%s", e.config.Url, endpoint), bytes.NewBuffer(body))
if err != nil {
return nil, err
}
if req.Header.Get("Content-Type") == "" {
req.Header.Set("Content-Type", "application/json")
}
req = elasticsearch.AddBasicAuthIfNeeded(req, e.config.User, e.config.Password)
for key, values := range headers {
for _, value := range values {
req.Header.Set(key, value)
}
}
return e.client.Do(req)
}

// HttpBackendConnector is a base interface for sending http requests, for now
type HttpBackendConnector interface {
Send(r *http.Request) *http.Response
}

func (e *ElasticsearchBackendConnector) Send(r *http.Request) *http.Response {
r.Host = e.config.Url.Host
r.URL.Host = e.config.Url.Host
r.URL.Scheme = e.config.Url.Scheme
r.RequestURI = "" // this is important for the request to be sent correctly to a different host
maybeAuthdReq := elasticsearch.AddBasicAuthIfNeeded(r, e.config.User, e.config.Password)
if resp, err := e.client.Do(maybeAuthdReq); err != nil {
fmt.Printf("Error: %v\n", err)
panic(err)
} else {
return resp
}
}

func (e *ElasticsearchBackendConnector) GetId() quesma_api.BackendConnectorType {
return quesma_api.ElasticsearchBackend
}

func (e *ElasticsearchBackendConnector) Open() error {
return nil
}

func (e *ElasticsearchBackendConnector) Query(ctx context.Context, query string, args ...interface{}) (quesma_api.Rows, error) {
panic("not implemented")
}

func (e *ElasticsearchBackendConnector) Exec(ctx context.Context, query string, args ...interface{}) error {
panic("not implemented")
}

func (e *ElasticsearchBackendConnector) Close() error {
return nil
}
Loading

0 comments on commit 885ea15

Please sign in to comment.