Skip to content

Commit

Permalink
Merge branch 'main' into pr-ab-e2e-ignore-fields
Browse files Browse the repository at this point in the history
  • Loading branch information
nablaone authored Jan 16, 2025
2 parents d3f3fb7 + e44f0af commit f1ae5af
Show file tree
Hide file tree
Showing 96 changed files with 709 additions and 335 deletions.
375 changes: 294 additions & 81 deletions NOTICE.MD

Large diffs are not rendered by default.

3 changes: 0 additions & 3 deletions quesma/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@ FROM golang:alpine AS builder


WORKDIR /quesma
## v2 is a seprate module, not package
COPY v2/go.mod v2/go.mod
COPY v2/go.sum v2/go.sum

COPY go.mod go.sum ./

Expand Down
2 changes: 1 addition & 1 deletion quesma/backend_connectors/clickhouse_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import (
"database/sql"
"github.com/ClickHouse/clickhouse-go/v2"

quesma_api "quesma_v2/core"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
)

type ClickHouseBackendConnector struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ import (
"fmt"
"github.com/QuesmaOrg/quesma/quesma/elasticsearch"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"net/http"
quesma_api "quesma_v2/core"
"time"
)

Expand Down
2 changes: 1 addition & 1 deletion quesma/backend_connectors/mysql_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package backend_connectors
import (
"context"
"database/sql"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
_ "github.com/go-sql-driver/mysql"
quesma_api "quesma_v2/core"
)

type MySqlRows struct {
Expand Down
2 changes: 1 addition & 1 deletion quesma/backend_connectors/postgres_backend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ package backend_connectors

import (
"context"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/jackc/pgx/v4"
quesma_api "quesma_v2/core"
)

type PostgresBackendConnector struct {
Expand Down
4 changes: 2 additions & 2 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (
"github.com/QuesmaOrg/quesma/quesma/quesma/recovery"
"github.com/QuesmaOrg/quesma/quesma/schema"
"github.com/QuesmaOrg/quesma/quesma/util"
quesma_api "quesma_v2/core"
"quesma_v2/core/diag"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/QuesmaOrg/quesma/quesma/v2/core/diag"
"slices"
"strings"
"sync/atomic"
Expand Down
2 changes: 1 addition & 1 deletion quesma/clickhouse/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ import (
"github.com/QuesmaOrg/quesma/quesma/buildinfo"
"github.com/QuesmaOrg/quesma/quesma/logger"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"net"
quesma_api "quesma_v2/core"
"strings"
"time"
)
Expand Down
4 changes: 2 additions & 2 deletions quesma/clickhouse/quesma_communicator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ import (
"github.com/QuesmaOrg/quesma/quesma/logger"
"github.com/QuesmaOrg/quesma/quesma/model"
"github.com/QuesmaOrg/quesma/quesma/quesma/recovery"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/QuesmaOrg/quesma/quesma/v2/core/tracing"
"math/rand"
quesma_api "quesma_v2/core"
tracing "quesma_v2/core/tracing"
"strconv"
"strings"
"sync/atomic"
Expand Down
48 changes: 47 additions & 1 deletion quesma/clickhouse/table_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,13 @@ import (
"github.com/QuesmaOrg/quesma/quesma/logger"
"github.com/QuesmaOrg/quesma/quesma/persistence"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/quesma/types"
"github.com/QuesmaOrg/quesma/quesma/schema"
"github.com/QuesmaOrg/quesma/quesma/util"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/goccy/go-json"
quesma_api "quesma_v2/core"
"strings"
"sync"
"sync/atomic"
"time"
)
Expand All @@ -34,12 +36,15 @@ func (d DbKind) String() string {
type TableDiscovery interface {
ReloadTableDefinitions()
TableDefinitions() *TableMap
AddTable(tableName string, table *Table)
TableDefinitionsFetchError() error

LastAccessTime() time.Time
LastReloadTime() time.Time
ForceReloadCh() <-chan chan<- struct{}
AutodiscoveryEnabled() bool

RegisterTablesReloadListener(ch chan<- types.ReloadMessage)
}

type tableDiscovery struct {
Expand All @@ -51,6 +56,9 @@ type tableDiscovery struct {
forceReloadCh chan chan<- struct{}
ReloadTablesError error
virtualTableStorage persistence.JSONDatabase

reloadObserversMutex sync.Mutex
reloadObservers []chan<- types.ReloadMessage
}

type columnMetadata struct {
Expand Down Expand Up @@ -79,6 +87,10 @@ type TableDiscoveryTableProviderAdapter struct {
TableDiscovery
}

func (t TableDiscoveryTableProviderAdapter) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) {
t.TableDiscovery.RegisterTablesReloadListener(ch)
}

func (t TableDiscoveryTableProviderAdapter) TableDefinitions() map[string]schema.Table {

// here we filter out our internal columns
Expand Down Expand Up @@ -125,6 +137,31 @@ func NewTableDiscoveryWith(cfg *config.QuesmaConfiguration, dbConnPool quesma_ap
return result
}

func (td *tableDiscovery) AddTable(tableName string, table *Table) {
td.tableDefinitions.Load().Store(tableName, table)
td.notifyObservers()
}

func (td *tableDiscovery) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) {
td.reloadObserversMutex.Lock()
defer td.reloadObserversMutex.Unlock()
td.reloadObservers = append(td.reloadObservers, ch)
}

func (td *tableDiscovery) notifyObservers() {

td.reloadObserversMutex.Lock()
defer td.reloadObserversMutex.Unlock()

msg := types.ReloadMessage{Timestamp: time.Now()}
for _, observer := range td.reloadObservers {
fmt.Println("Sending message to observer", observer)
go func() {
observer <- msg
}()
}
}

func (td *tableDiscovery) TableDefinitionsFetchError() error {
return td.ReloadTablesError
}
Expand Down Expand Up @@ -178,6 +215,8 @@ func (td *tableDiscovery) ReloadTableDefinitions() {

td.ReloadTablesError = nil
td.populateTableDefinitions(configuredTables, databaseName, td.cfg)

td.notifyObservers()
}

func (td *tableDiscovery) readVirtualTables(configuredTables map[string]discoveredTable) map[string]discoveredTable {
Expand Down Expand Up @@ -634,6 +673,9 @@ func NewEmptyTableDiscovery() *EmptyTableDiscovery {
}
}

func (td *EmptyTableDiscovery) RegisterTablesReloadListener(ch chan<- types.ReloadMessage) {
}

func (td *EmptyTableDiscovery) ReloadTableDefinitions() {
}

Expand All @@ -660,3 +702,7 @@ func (td *EmptyTableDiscovery) ForceReloadCh() <-chan chan<- struct{} {
func (td *EmptyTableDiscovery) AutodiscoveryEnabled() bool {
return td.Autodiscovery
}

func (td *EmptyTableDiscovery) AddTable(tableName string, table *Table) {
td.TableMap.Store(tableName, table)
}
2 changes: 1 addition & 1 deletion quesma/common_table/const.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ package common_table
import (
"context"
"github.com/QuesmaOrg/quesma/quesma/logger"
quesma_api "quesma_v2/core"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
)

const TableName = "quesma_common_table"
Expand Down
2 changes: 1 addition & 1 deletion quesma/connectors/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/QuesmaOrg/quesma/quesma/logger"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/telemetry"
quesma_api "quesma_v2/core"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
)

type Connector interface {
Expand Down
2 changes: 1 addition & 1 deletion quesma/eql/e2e/query_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,10 @@ package e2e
import (
"bytes"
"fmt"
"github.com/QuesmaOrg/quesma/quesma/jsonprocessor"
"github.com/goccy/go-json"
"io"
"net/http"
"quesma/jsonprocessor"
"sort"
"strings"
)
Expand Down
4 changes: 2 additions & 2 deletions quesma/frontend_connectors/basic_http_frontend_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,10 @@ import (
"github.com/QuesmaOrg/quesma/quesma/clickhouse"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/schema"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/QuesmaOrg/quesma/quesma/v2/core/diag"
"io"
"net/http"
quesma_api "quesma_v2/core"
"quesma_v2/core/diag"
"sync"
)

Expand Down
2 changes: 1 addition & 1 deletion quesma/frontend_connectors/basic_tcp_connection_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ package frontend_connectors

import (
"fmt"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"io"
"net"
quesma_api "quesma_v2/core"
)

type BasicTcpConnectionHandler struct {
Expand Down
2 changes: 1 addition & 1 deletion quesma/frontend_connectors/basic_tcp_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package frontend_connectors
import (
"context"
"fmt"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"net"
quesma_api "quesma_v2/core"
"sync/atomic"
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/QuesmaOrg/quesma/quesma/clickhouse"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
"github.com/QuesmaOrg/quesma/quesma/schema"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"net/http"
quesma_api "quesma_v2/core"
)

type ElasticHttpIngestFrontendConnector struct {
Expand Down
2 changes: 1 addition & 1 deletion quesma/frontend_connectors/elasticsearch_ingest.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ package frontend_connectors
import (
"context"
"github.com/QuesmaOrg/quesma/quesma/processors/es_to_ch_common"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"net/http"
quesma_api "quesma_v2/core"
)

type ElasticsearchIngestFrontendConnector struct {
Expand Down
2 changes: 1 addition & 1 deletion quesma/frontend_connectors/elasticsearch_query.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ import (
"github.com/QuesmaOrg/quesma/quesma/elasticsearch"
"github.com/QuesmaOrg/quesma/quesma/processors/es_to_ch_common"
"github.com/QuesmaOrg/quesma/quesma/quesma/config"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"net/http"
quesma_api "quesma_v2/core"
)

type ElasticsearchQueryFrontendConnector struct {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ package frontend_connectors

import (
"fmt"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"io"
"net"
quesma_api "quesma_v2/core"
)

func (p *PassThroughConnectionHandler) copyData(src io.Reader, dest io.Writer) {
Expand Down
8 changes: 4 additions & 4 deletions quesma/frontend_connectors/router_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,12 @@ import (
"github.com/QuesmaOrg/quesma/quesma/quesma/ui"
"github.com/QuesmaOrg/quesma/quesma/schema"
"github.com/QuesmaOrg/quesma/quesma/util"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/QuesmaOrg/quesma/quesma/v2/core/diag"
"github.com/QuesmaOrg/quesma/quesma/v2/core/routes"
"github.com/QuesmaOrg/quesma/quesma/v2/core/tracing"
"io"
"net/http"
quesma_api "quesma_v2/core"
"quesma_v2/core/diag"
"quesma_v2/core/routes"
"quesma_v2/core/tracing"
"strings"
"sync/atomic"
"time"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ package frontend_connectors

import (
"fmt"
quesma_api "github.com/QuesmaOrg/quesma/quesma/v2/core"
"github.com/jackc/pgx/v5/pgproto3"
"net"
quesma_api "quesma_v2/core"
)

type TcpPostgresConnectionHandler struct {
Expand Down
Loading

0 comments on commit f1ae5af

Please sign in to comment.