diff --git a/.gitignore b/.gitignore
index a5c9e9921..6e814cfd1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -21,4 +21,5 @@ docker/security/es.local
docker/security/certificate-bundle.zip
docker/security/clickhouse
quesma/config.yaml
+quesma/.installation_id
examples/kibana-sample-data/quesma/logs/*
diff --git a/docker/ci.yml b/docker/ci.yml
index 40e340f08..315d58894 100644
--- a/docker/ci.yml
+++ b/docker/ci.yml
@@ -9,6 +9,7 @@ services:
- QUESMA_port=8080
- QUESMA_logging_path=/var/quesma/logs
- QUESMA_clickhouse_url=clickhouse://clickhouse:9000
+ - QUESMA_connectors_my-CH-connector_type=clickhouse-os
- QUESMA_logging_fileLogging=true
depends_on:
clickhouse:
diff --git a/docker/quesma/config/ci-config.yaml b/docker/quesma/config/ci-config.yaml
index aa8da1337..ce2de7035 100644
--- a/docker/quesma/config/ci-config.yaml
+++ b/docker/quesma/config/ci-config.yaml
@@ -1,5 +1,5 @@
mode: "dual-write-query-clickhouse"
-licenseKey: "empty-license-key-that-is-just-ci"
+installationId: "just-Quesma-smoke-test-instance"
port: 8080
indexes:
kibana_sample_data_ecommerce:
diff --git a/quesma/buildinfo/build.go b/quesma/buildinfo/build.go
index e3f137bec..d8979f40b 100644
--- a/quesma/buildinfo/build.go
+++ b/quesma/buildinfo/build.go
@@ -2,11 +2,6 @@
// SPDX-License-Identifier: Elastic-2.0
package buildinfo
-const (
- DevelopmentLicenseKey = "cdd749a3-e777-11ee-bcf8-0242ac150004"
-)
-
var Version = "development"
var BuildHash = ""
var BuildDate = ""
-var LicenseKey = DevelopmentLicenseKey
diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go
index bfaf68bf0..e25db47e8 100644
--- a/quesma/clickhouse/clickhouse.go
+++ b/quesma/clickhouse/clickhouse.go
@@ -32,6 +32,7 @@ const (
)
type (
+ // LogManager should be renamed to Connector -> TODO !!!
LogManager struct {
ctx context.Context
cancel context.CancelFunc
@@ -262,6 +263,26 @@ func (lm *LogManager) sendCreateTableQuery(ctx context.Context, query string) er
return nil
}
+func (lm *LogManager) executeRawQuery(query string) (*sql.Rows, error) {
+ if res, err := lm.chDb.Query(query); err != nil {
+ return nil, fmt.Errorf("error in executeRawQuery: query: %s\nerr:%v", query, err)
+ } else {
+ return res, nil
+ }
+}
+
+func (lm *LogManager) CheckIfConnectedToHydrolix() error {
+ if rows, err := lm.executeRawQuery(`SELECT concat(database,'.', table) FROM system.tables WHERE engine = 'TurbineStorage';`); err != nil {
+ return fmt.Errorf("error executing HDX identifying query: %v", err)
+ } else {
+ defer rows.Close()
+ if rows.Next() {
+ return fmt.Errorf("detected Hydrolix-specific table engine, which is not allowed")
+ }
+ return nil
+ }
+}
+
func (lm *LogManager) ProcessCreateTableQuery(ctx context.Context, query string, config *ChTableConfig) error {
table, err := NewTable(query, config)
if err != nil {
diff --git a/quesma/config.yaml.template b/quesma/config.yaml.template
index 441b77e20..e7294565d 100644
--- a/quesma/config.yaml.template
+++ b/quesma/config.yaml.template
@@ -9,6 +9,9 @@ port: 8080 # public tcp port to listen for incoming traffic
elasticsearch:
url: "http://localhost:9200"
call: false
+connectors:
+ - name: "clickhouse-conn"
+ type: "clickhouse" # one of [clickhouse, clickhouse-os, hydrolix]
#clickhouse: # this config is going to be removed, but for now let's just comment out
# url: "clickhouse://localhost:9000"
ingestStatistics: true
diff --git a/quesma/connectors/clickhouse.go b/quesma/connectors/clickhouse.go
new file mode 100644
index 000000000..1520ea095
--- /dev/null
+++ b/quesma/connectors/clickhouse.go
@@ -0,0 +1,25 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package connectors
+
+import (
+ "quesma/clickhouse"
+)
+
+type ClickHouseConnector struct {
+ Connector *clickhouse.LogManager
+}
+
+const clickHouseConnectorTypeName = "clickhouse"
+
+func (c *ClickHouseConnector) LicensingCheck() (err error) {
+ return c.Connector.CheckIfConnectedToHydrolix()
+}
+
+func (c *ClickHouseConnector) Type() string {
+ return clickHouseConnectorTypeName
+}
+
+func (c *ClickHouseConnector) GetConnector() *clickhouse.LogManager {
+ return c.Connector
+}
diff --git a/quesma/connectors/clickhouse_os.go b/quesma/connectors/clickhouse_os.go
new file mode 100644
index 000000000..9bda29a3a
--- /dev/null
+++ b/quesma/connectors/clickhouse_os.go
@@ -0,0 +1,24 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package connectors
+
+import "quesma/clickhouse"
+
+type ClickHouseOSConnector struct {
+ Connector *clickhouse.LogManager
+}
+
+const clickHouseOSConnectorTypeName = "clickhouse-os"
+
+func (c *ClickHouseOSConnector) LicensingCheck() error {
+ // TODO: Check if you're connected to ClickHouse Cloud OR Hydrolix and fail if so
+ return c.Connector.CheckIfConnectedToHydrolix()
+}
+
+func (c *ClickHouseOSConnector) Type() string {
+ return clickHouseOSConnectorTypeName
+}
+
+func (c *ClickHouseOSConnector) GetConnector() *clickhouse.LogManager {
+ return c.Connector
+}
diff --git a/quesma/connectors/connector.go b/quesma/connectors/connector.go
new file mode 100644
index 000000000..cce532be7
--- /dev/null
+++ b/quesma/connectors/connector.go
@@ -0,0 +1,64 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package connectors
+
+import (
+ "database/sql"
+ "fmt"
+ "quesma/clickhouse"
+ "quesma/licensing"
+ "quesma/logger"
+ "quesma/quesma/config"
+ "quesma/telemetry"
+)
+
+type Connector interface {
+ LicensingCheck() error
+ Type() string
+ GetConnector() *clickhouse.LogManager // enforce contract for having connector instance ... maybe unnecessary
+}
+
+type ConnectorManager struct {
+ connectors []Connector
+}
+
+// GetConnector - TODO this is just bypassing the fact that we support only 1 connector at a time today :>
+func (c *ConnectorManager) GetConnector() *clickhouse.LogManager {
+ if len(c.connectors) == 0 {
+ panic("No connectors found")
+ }
+ conn := c.connectors[0]
+ if err := conn.LicensingCheck(); err != nil {
+ licensing.PanicWithLicenseViolation(fmt.Errorf("connector [%s] reported licensing issue: [%v]", conn.Type(), err))
+ }
+ return c.connectors[0].GetConnector()
+}
+
+func NewConnectorManager(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery) *ConnectorManager {
+ return &ConnectorManager{
+ connectors: registerConnectors(cfg, chDb, phoneHomeAgent, loader),
+ }
+}
+
+func registerConnectors(cfg config.QuesmaConfiguration, chDb *sql.DB, phoneHomeAgent telemetry.PhoneHomeAgent, loader clickhouse.TableDiscovery) (conns []Connector) {
+ for connName, conn := range cfg.Connectors {
+ logger.Info().Msgf("Registering connector named [%s] of type [%s]", connName, conn.ConnectorType)
+ switch conn.ConnectorType {
+ case clickHouseConnectorTypeName:
+ conns = append(conns, &ClickHouseConnector{
+ Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader),
+ })
+ case clickHouseOSConnectorTypeName:
+ conns = append(conns, &ClickHouseOSConnector{
+ Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader),
+ })
+ case hydrolixConnectorTypeName:
+ conns = append(conns, &HydrolixConnector{
+ Connector: clickhouse.NewEmptyLogManager(cfg, chDb, phoneHomeAgent, loader),
+ })
+ default:
+ logger.Error().Msgf("Unknown connector type [%s]", conn.ConnectorType)
+ }
+ }
+ return conns
+}
diff --git a/quesma/connectors/hydrolix.go b/quesma/connectors/hydrolix.go
new file mode 100644
index 000000000..bdbc50937
--- /dev/null
+++ b/quesma/connectors/hydrolix.go
@@ -0,0 +1,27 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package connectors
+
+import (
+ "quesma/clickhouse"
+ "quesma/logger"
+)
+
+type HydrolixConnector struct {
+ Connector *clickhouse.LogManager
+}
+
+const hydrolixConnectorTypeName = "hydrolix"
+
+func (h *HydrolixConnector) LicensingCheck() error {
+ logger.Debug().Msg("Runtime checks for Hydrolix connector is not required, as static configuration disables it.")
+ return nil
+}
+
+func (h *HydrolixConnector) Type() string {
+ return hydrolixConnectorTypeName
+}
+
+func (h *HydrolixConnector) GetConnector() *clickhouse.LogManager {
+ return h.Connector
+}
diff --git a/quesma/license/headers.go b/quesma/license/headers.go
deleted file mode 100644
index 7b207fd2d..000000000
--- a/quesma/license/headers.go
+++ /dev/null
@@ -1,6 +0,0 @@
-// Copyright Quesma, licensed under the Elastic License 2.0.
-// SPDX-License-Identifier: Elastic-2.0
-package license
-
-// Header Used to pass license key by phone home service
-const Header = "X-License-Key"
diff --git a/quesma/licensing/error.go b/quesma/licensing/error.go
new file mode 100644
index 000000000..ebf54e47c
--- /dev/null
+++ b/quesma/licensing/error.go
@@ -0,0 +1,14 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package licensing
+
+import "fmt"
+
+const (
+ errorMessage = `There's been license violation detected. Please contact us at:
+ support@quesma.com`
+)
+
+func PanicWithLicenseViolation(initialErr error) {
+ panic(fmt.Sprintf("Error thrown: %v\n%s", initialErr, errorMessage))
+}
diff --git a/quesma/licensing/license_manager.go b/quesma/licensing/license_manager.go
new file mode 100644
index 000000000..fdaf0dd21
--- /dev/null
+++ b/quesma/licensing/license_manager.go
@@ -0,0 +1,88 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package licensing
+
+import (
+ "bytes"
+ "encoding/json"
+ "fmt"
+ "io"
+ "net/http"
+ "time"
+)
+
+const (
+ obtainLicenseEndpoint = "https://quesma-licensing-service-gd46dsvxda-uc.a.run.app/api/license/obtain"
+ verifyLicenseEndpoint = "https://quesma-licensing-service-gd46dsvxda-uc.a.run.app/api/license/verify"
+)
+
+type InstallationIDPayload struct {
+ InstallationID string `json:"installation_id"`
+}
+
+type LicensePayload struct {
+ LicenseKey []byte `json:"license_key"`
+}
+
+// obtainLicenseKey presents an InstallationId to the license server and receives a LicenseKey in return
+func (l *LicenseModule) obtainLicenseKey() (err error) {
+ fmt.Printf("Obtaining license key for installation ID [%s]\n", l.InstallationID)
+ var payloadBytes []byte
+ if payloadBytes, err = json.Marshal(InstallationIDPayload{InstallationID: l.InstallationID}); err != nil {
+ return err
+ }
+ resp, err := http.Post(obtainLicenseEndpoint, "application/json", bytes.NewReader(payloadBytes))
+ if err != nil {
+ return err
+ }
+ defer resp.Body.Close()
+
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return err
+ }
+
+ var licenseResponse LicensePayload
+ if err = json.Unmarshal(body, &licenseResponse); err != nil {
+ return err
+ }
+ l.LicenseKey = licenseResponse.LicenseKey
+ fmt.Printf("License key obtained and set successfully, key=[%s]\n", l.LicenseKey)
+ return nil
+}
+
+// processLicense presents the license to the license server and receives an AllowList in return
+func (l *LicenseModule) processLicense() error {
+ if fetchedLicense, err := l.fetchLicense(); err != nil {
+ return fmt.Errorf("failed processing license by the license server: %v", err)
+ } else {
+ l.License = fetchedLicense
+ fmt.Printf("Allowlist loaded successfully\n%s\n", fetchedLicense.String())
+ }
+ if l.License.ExpirationDate.Before(time.Now()) {
+ return fmt.Errorf("license expired on %s", l.License.ExpirationDate)
+ }
+ return nil
+}
+
+func (l *LicenseModule) fetchLicense() (a *License, err error) {
+ var payloadBytes []byte
+ if payloadBytes, err = json.Marshal(LicensePayload{LicenseKey: l.LicenseKey}); err != nil {
+ return nil, err
+ }
+ resp, err := http.Post(verifyLicenseEndpoint, "application/json", bytes.NewReader(payloadBytes))
+ if err != nil {
+ return nil, err
+ }
+ defer resp.Body.Close()
+ body, err := io.ReadAll(resp.Body)
+ if err != nil {
+ return nil, err
+ }
+
+ if err = json.Unmarshal(body, &a); err != nil {
+ return nil, err
+ } else {
+ return a, nil
+ }
+}
diff --git a/quesma/licensing/model.go b/quesma/licensing/model.go
new file mode 100644
index 000000000..cf94035bd
--- /dev/null
+++ b/quesma/licensing/model.go
@@ -0,0 +1,23 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package licensing
+
+import (
+ "fmt"
+ "strings"
+ "time"
+)
+
+// License is an object returned by the license server based on the provided (and positively verified) license key
+type License struct {
+ InstallationID string `json:"installation_id"`
+ ClientID string `json:"client_id"`
+ Connectors []string `json:"connectors"`
+ Processors []string `json:"processors"`
+ ExpirationDate time.Time `json:"expiration_date"`
+}
+
+func (a *License) String() string {
+ return fmt.Sprintf("[Quesma License]\n\tInstallation ID: %s\n\tClient Name: %s\n\tConnectors: [%v]\n\tProcessors: [%v]\n\tExpires: %s\n",
+ a.InstallationID, a.ClientID, strings.Join(a.Connectors, ", "), strings.Join(a.Processors, ", "), a.ExpirationDate)
+}
diff --git a/quesma/licensing/readme.md b/quesma/licensing/readme.md
new file mode 100644
index 000000000..7af754cf2
--- /dev/null
+++ b/quesma/licensing/readme.md
@@ -0,0 +1,54 @@
+Quesma Licensing Module
+=======================
+
+The diagram below illustrates the interaction between the Quesma Licensing Module and the License Server.
+
+```mermaid
+sequenceDiagram
+ box Quesma Instance
+participant Q as Quesma
(main process)
+participant License as License Module
+end
+ box License Server
+participant Server as License Server
+end
+Q->>License: Boot up
+ alt License Key not found in configuration
+ License-->>License: Generate `Installation ID`
+ License-->>Server: Register `Installation ID`
+ Server-->>License: Obtain License Key
+ activate License
+ License -->> Q: (maybe panic)
+ deactivate License
+end
+ License->>Server: Present License Key
+ activate Server
+ Server-->>Server: Verify License Key
+ Server->>License: Return License
(granted permissions, expiration date)
+ deactivate Server
+ activate License
+ License -->> Q: (maybe panic)
+ deactivate License
+License->>License: Validate configuration
+ License -->> Q: (maybe panic)
+ activate License
+deactivate License
+License->>License: Trigger runtime checks
(Connectors/Processors)
+ activate License
+ License -->> Q: (maybe panic)
+ deactivate License
+License->>Q: Done veryfing.
+```
+
+## Key assumptions
+
+* Unless provided explicitly, License Module is going to generate unique `Installation ID` in the form of UUID.
+* Aforementioned `Installation ID` is going to be used to identify the Quesma installation, so ideally it has to persist between restarts. \
+ We are going to attempt writing it to a file, next to the configuration. This will cover local build use case.
+ If we fail writing it, we are going to regenerate it on each boot (this probably implies cloud deployment situation).
+* License Module is going to use `Installation ID` to **obtain the License Key** from the License Server (unless the License Key is **not** specified in the configuration)
+* Quesma License is going to be signed by us and will contain expiration date.
+* License Module is going to contact license server and ask for what it's eligible to based on the License Key.
+* Based on the that information License Module is going to validate the configuration.
+* License Module is going to trigger local checks (usage of allowed processors/connectors).
+ Those checks are going to be part of respective components.
diff --git a/quesma/licensing/runner.go b/quesma/licensing/runner.go
new file mode 100644
index 000000000..b036ca15f
--- /dev/null
+++ b/quesma/licensing/runner.go
@@ -0,0 +1,86 @@
+// Copyright Quesma, licensed under the Elastic License 2.0.
+// SPDX-License-Identifier: Elastic-2.0
+package licensing
+
+import (
+ "fmt"
+ "github.com/google/uuid"
+ "os"
+ "quesma/quesma/config"
+ "slices"
+)
+
+type LicenseModule struct {
+ InstallationID string
+ LicenseKey []byte
+ License *License
+ Config *config.QuesmaConfiguration
+}
+
+const (
+ installationIdFile = ".installation_id"
+)
+
+func Init(config *config.QuesmaConfiguration) *LicenseModule {
+ l := &LicenseModule{
+ Config: config,
+ LicenseKey: []byte(config.LicenseKey),
+ }
+ l.Run()
+ return l
+}
+
+func (l *LicenseModule) Run() {
+ if len(l.LicenseKey) > 0 {
+ fmt.Printf("License key [%s] already present, skipping license key obtainment.\n", l.LicenseKey)
+ } else {
+ l.setInstallationID()
+ if err := l.obtainLicenseKey(); err != nil {
+ PanicWithLicenseViolation(fmt.Errorf("failed to obtain license key: %v", err))
+ }
+ }
+ if err := l.processLicense(); err != nil {
+ PanicWithLicenseViolation(fmt.Errorf("failed to process license: %v", err))
+ }
+ if err := l.validateConfig(); err != nil {
+ PanicWithLicenseViolation(fmt.Errorf("failed to validate configuration: %v", err))
+ }
+}
+
+func (l *LicenseModule) validateConfig() error {
+ // Check if connectors are allowed
+ for _, conn := range l.Config.Connectors {
+ if !slices.Contains(l.License.Connectors, conn.ConnectorType) {
+ return fmt.Errorf("connector [%s] is not allowed within the current license", conn.ConnectorType)
+ }
+ }
+ return nil
+}
+
+func (l *LicenseModule) setInstallationID() {
+ if l.Config.InstallationId != "" {
+ fmt.Printf("Installation ID provided in the configuration [%s]\n", l.Config.InstallationId)
+ l.InstallationID = l.Config.InstallationId
+ return
+ }
+
+ if data, err := os.ReadFile(installationIdFile); err != nil {
+ fmt.Printf("Reading Installation ID failed [%v], generating new one\n", err)
+ generatedID := uuid.New().String()
+ fmt.Printf("Generated Installation ID of [%s]\n", generatedID)
+ l.tryStoringInstallationIdInFile(generatedID)
+ l.InstallationID = generatedID
+ } else {
+ installationID := string(data)
+ fmt.Printf("Installation ID of [%s] found\n", installationID)
+ l.InstallationID = installationID
+ }
+}
+
+func (l *LicenseModule) tryStoringInstallationIdInFile(installationID string) {
+ if err := os.WriteFile(installationIdFile, []byte(installationID), 0644); err != nil {
+ fmt.Printf("Failed to store Installation ID in file: %v\n", err)
+ } else {
+ fmt.Printf("Stored Installation ID in file [%s]\n", installationIdFile)
+ }
+}
diff --git a/quesma/logger/configuration.go b/quesma/logger/configuration.go
index 2d1bd4eef..6029366b4 100644
--- a/quesma/logger/configuration.go
+++ b/quesma/logger/configuration.go
@@ -12,5 +12,5 @@ type Configuration struct {
Path string
RemoteLogDrainUrl *url.URL
Level zerolog.Level
- LicenseKey string
+ ClientId string
}
diff --git a/quesma/logger/log_sender.go b/quesma/logger/log_sender.go
index bb5976040..eac3e4d13 100644
--- a/quesma/logger/log_sender.go
+++ b/quesma/logger/log_sender.go
@@ -8,7 +8,6 @@ import (
"fmt"
"net/http"
"net/url"
- "quesma/license"
"quesma/telemetry/headers"
"strconv"
"time"
@@ -16,7 +15,7 @@ import (
type LogSender struct {
Url *url.URL
- LicenseKey string
+ ClientId string
LogBuffer []byte
LastSendTime time.Time
Interval time.Duration
@@ -90,7 +89,7 @@ func (logSender *LogSender) sendLogs() error {
}
req.Header.Set("Content-Type", "text/plain")
req.Header.Set(telemetry_headers.XTelemetryRemoteLog, "true") // value is arbitrary, just have to be non-empty
- req.Header.Set(license.Header, logSender.LicenseKey)
+ req.Header.Set(telemetry_headers.ClientId, logSender.ClientId)
resp, err := logSender.httpClient.Do(req)
if err != nil {
return err
diff --git a/quesma/logger/logger.go b/quesma/logger/logger.go
index c9c132549..8c64e17ac 100644
--- a/quesma/logger/logger.go
+++ b/quesma/logger/logger.go
@@ -68,7 +68,7 @@ func InitLogger(cfg Configuration, sig chan os.Signal, doneCh chan struct{}, asy
logDrainUrl := *cfg.RemoteLogDrainUrl
logForwarder := LogForwarder{logSender: LogSender{
Url: &logDrainUrl,
- LicenseKey: cfg.LicenseKey,
+ ClientId: cfg.ClientId,
LogBuffer: make([]byte, 0, initialBufferSize),
LastSendTime: time.Now(),
Interval: time.Minute,
diff --git a/quesma/main.go b/quesma/main.go
index e61c17ff1..aeae7e05d 100644
--- a/quesma/main.go
+++ b/quesma/main.go
@@ -10,8 +10,10 @@ import (
"os/signal"
"quesma/buildinfo"
"quesma/clickhouse"
+ "quesma/connectors"
"quesma/elasticsearch"
"quesma/feature"
+ "quesma/licensing"
"quesma/logger"
"quesma/quesma"
"quesma/quesma/config"
@@ -33,8 +35,8 @@ const banner = `
func main() {
println(banner)
- fmt.Printf("Quesma build info: version=[%s], build hash=[%s], build date=[%s] license key=[%s]\n",
- buildinfo.Version, buildinfo.BuildHash, buildinfo.BuildDate, config.MaskLicenseKey(buildinfo.LicenseKey))
+ fmt.Printf("Quesma build info: version=[%s], build hash=[%s], build date=[%s]\n",
+ buildinfo.Version, buildinfo.BuildHash, buildinfo.BuildDate)
sig := make(chan os.Signal, 1)
signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
@@ -47,12 +49,13 @@ func main() {
var asyncQueryTraceLogger *tracing.AsyncTraceLogger
+ licenseMod := licensing.Init(&cfg)
qmcLogChannel := logger.InitLogger(logger.Configuration{
FileLogging: cfg.Logging.FileLogging,
Path: cfg.Logging.Path,
RemoteLogDrainUrl: cfg.Logging.RemoteLogDrainUrl.ToUrl(),
Level: cfg.Logging.Level,
- LicenseKey: cfg.LicenseKey,
+ ClientId: licenseMod.License.ClientID,
}, sig, doneCh, asyncQueryTraceLogger)
defer logger.StdLogFile.Close()
defer logger.ErrLogFile.Close()
@@ -65,14 +68,16 @@ func main() {
var connectionPool = clickhouse.InitDBConnectionPool(cfg)
- phoneHomeAgent := telemetry.NewPhoneHomeAgent(cfg, connectionPool)
+ phoneHomeAgent := telemetry.NewPhoneHomeAgent(cfg, connectionPool, licenseMod.License.ClientID)
phoneHomeAgent.Start()
schemaManagement := clickhouse.NewSchemaManagement(connectionPool)
schemaLoader := clickhouse.NewTableDiscovery(cfg, schemaManagement)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: schemaLoader}, cfg, clickhouse.SchemaTypeAdapter{})
- lm := clickhouse.NewEmptyLogManager(cfg, connectionPool, phoneHomeAgent, schemaLoader)
+ connManager := connectors.NewConnectorManager(cfg, connectionPool, phoneHomeAgent, schemaLoader)
+ lm := connManager.GetConnector()
+
im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String())
logger.Info().Msgf("loaded config: %s", cfg.String())
diff --git a/quesma/quesma/config/config.go b/quesma/quesma/config/config.go
index a3dfee442..9fc2c85fa 100644
--- a/quesma/quesma/config/config.go
+++ b/quesma/quesma/config/config.go
@@ -12,7 +12,6 @@ import (
"github.com/rs/zerolog"
"log"
"os"
- "quesma/buildinfo"
"quesma/elasticsearch/elasticsearch_field_types"
"quesma/index"
"quesma/network"
@@ -30,9 +29,14 @@ var (
)
type QuesmaConfiguration struct {
- Mode operationMode `koanf:"mode"`
- LicenseKey string `koanf:"licenseKey"`
- ClickHouse RelationalDbConfiguration `koanf:"clickhouse"`
+ // both clickhouse and hydrolix connections are going to be deprecated and everything is going to live under connector
+ Connectors map[string]RelationalDbConfiguration `koanf:"connectors"`
+ Mode operationMode `koanf:"mode"`
+ InstallationId string `koanf:"installationId"`
+ LicenseKey string `koanf:"licenseKey"`
+ //deprecated
+ ClickHouse RelationalDbConfiguration `koanf:"clickhouse"`
+ //deprecated
Hydrolix RelationalDbConfiguration `koanf:"hydrolix"`
Elasticsearch ElasticsearchConfiguration `koanf:"elasticsearch"`
IndexConfig map[string]IndexConfiguration `koanf:"indexes"`
@@ -50,11 +54,13 @@ type LoggingConfiguration struct {
}
type RelationalDbConfiguration struct {
- Url *Url `koanf:"url"`
- User string `koanf:"user"`
- Password string `koanf:"password"`
- Database string `koanf:"database"`
- AdminUrl *Url `koanf:"adminUrl"`
+ //ConnectorName string `koanf:"name"`
+ ConnectorType string `koanf:"type"`
+ Url *Url `koanf:"url"`
+ User string `koanf:"user"`
+ Password string `koanf:"password"`
+ Database string `koanf:"database"`
+ AdminUrl *Url `koanf:"adminUrl"`
}
func (c *RelationalDbConfiguration) IsEmpty() bool {
@@ -120,7 +126,6 @@ func Load() QuesmaConfiguration {
}
}
}
- config.configureLicenseKey()
return config
}
@@ -148,6 +153,14 @@ func (c *QuesmaConfiguration) Validate() error {
if c.PublicTcpPort == 0 { // unmarshalling defaults to 0 if not present
result = multierror.Append(result, fmt.Errorf("specifying Quesma TCP port for incoming traffic is required"))
}
+ if len(c.Connectors) != 1 {
+ result = multierror.Append(result, fmt.Errorf("at this moment Quesma supports **exactly** one connector"))
+ }
+ //for _, conn := range c.Connectors {
+ // if conn.Url == nil {
+ // result = multierror.Append(result, fmt.Errorf("connector %s requires setting the URL", conn.ConnectorType))
+ // }
+ //}
if c.ClickHouse.Url == nil && c.Hydrolix.Url == nil {
result = multierror.Append(result, fmt.Errorf("clickHouse or hydrolix URL is required"))
}
@@ -200,30 +213,6 @@ func (c *QuesmaConfiguration) validateIndexName(indexName string, result error)
return result
}
-func MaskLicenseKey(licenseKey string) string {
- if len(licenseKey) > 4 {
- return "****" + licenseKey[len(licenseKey)-4:]
- } else {
- return "****"
- }
-}
-
-func (c *QuesmaConfiguration) configureLicenseKey() {
- // This condition implies that we're dealing with customer-specific build,
- // which has license key injected at the build time via ldflags, see `docs/private-beta-releases.md`
- if buildinfo.LicenseKey != buildinfo.DevelopmentLicenseKey && buildinfo.LicenseKey != "" {
- // `buildinfo.LicenseKey` can be injected at the build time, don't get fooled by the IDE warning above
- fmt.Printf("Using license key from build: %s\n", MaskLicenseKey(buildinfo.LicenseKey))
- c.LicenseKey = buildinfo.LicenseKey
- return
- } else if c.LicenseKey != "" { // In case of **any other** setup, we fall back to what's been configured by user (==config or env vars)
- fmt.Printf("Using license key from configuration: %s\n", MaskLicenseKey(c.LicenseKey))
- return
- } else {
- log.Fatalf("missing license key. Quiting...")
- }
-}
-
func (c *QuesmaConfiguration) ReadsFromClickhouse() bool {
return c.Mode == DualWriteQueryClickhouse || c.Mode == DualWriteQueryClickhouseFallback ||
c.Mode == DualWriteQueryClickhouseVerify || c.Mode == ClickHouse
@@ -260,22 +249,37 @@ func (c *QuesmaConfiguration) String() string {
if c.Elasticsearch.Password != "" {
elasticsearchExtra += "\n Elasticsearch password: ***"
}
-
clickhouseUrl := ""
- if c.ClickHouse.Url != nil {
- clickhouseUrl = c.ClickHouse.Url.String()
- }
-
clickhouseExtra := ""
if c.ClickHouse.User != "" {
clickhouseExtra = fmt.Sprintf("\n ClickHouse user: %s", c.ClickHouse.User)
}
+ if c.ClickHouse.Url != nil {
+ clickhouseUrl = c.ClickHouse.Url.String()
+ }
if c.ClickHouse.Password != "" {
clickhouseExtra += "\n ClickHouse password: ***"
}
if c.ClickHouse.Database != "" {
clickhouseExtra += fmt.Sprintf("\n ClickHouse database: %s", c.ClickHouse.Database)
}
+ var connectorString strings.Builder
+ for connName, conn := range c.Connectors {
+ connectorString.WriteString(fmt.Sprintf("\n - [%s] connector", connName))
+ connectorString.WriteString(fmt.Sprintf("\n Type: %s", conn.ConnectorType))
+ if conn.Url != nil {
+ connectorString.WriteString(fmt.Sprintf("\n Url: %s", conn.Url.String()))
+ }
+ if conn.User != "" {
+ connectorString.WriteString(fmt.Sprintf("\n User: %s", conn.User))
+ }
+ if conn.Password != "" {
+ connectorString.WriteString("\n Password: ***")
+ }
+ if conn.Database != "" {
+ connectorString.WriteString(fmt.Sprintf("\n Database: %s", conn.Database))
+ }
+ }
quesmaInternalTelemetryUrl := "disabled"
if c.QuesmaInternalTelemetryUrl != nil {
quesmaInternalTelemetryUrl = c.QuesmaInternalTelemetryUrl.String()
@@ -284,7 +288,8 @@ func (c *QuesmaConfiguration) String() string {
Quesma Configuration:
Mode: %s
Elasticsearch URL: %s%s
- ClickHouse URL: %s%s
+ ClickhouseUrl: %s%s
+ Connectors: %s
Call Elasticsearch: %v
Indexes: %s
Logs Path: %s
@@ -297,6 +302,7 @@ Quesma Configuration:
elasticsearchExtra,
clickhouseUrl,
clickhouseExtra,
+ connectorString.String(),
c.Elasticsearch.Call,
indexConfigs,
c.Logging.Path,
diff --git a/quesma/quesma/config/test_config.yaml b/quesma/quesma/config/test_config.yaml
index 8ed1537b8..10984d57f 100644
--- a/quesma/quesma/config/test_config.yaml
+++ b/quesma/quesma/config/test_config.yaml
@@ -5,6 +5,9 @@ port: 8080 # public tcp port to listen for incoming traffic
elasticsearch:
url: "http://localhost:9200"
call: false
+connectors:
+ my-clickhouse-connector:
+ type: "clickhouse"
clickhouse:
url: "clickhouse://localhost:9000"
ingestStatistics: true
diff --git a/quesma/quesma/ui/live_tail.go b/quesma/quesma/ui/live_tail.go
index f5862d637..4e635ab7b 100644
--- a/quesma/quesma/ui/live_tail.go
+++ b/quesma/quesma/ui/live_tail.go
@@ -72,7 +72,7 @@ document.body.addEventListener('htmx:afterSwap', function(event) {
buffer.Write(qmc.generateUnsupportedQuerySidePanel())
// Don't get foiled by warning, this detects whether it's our development Quesma
- if buildinfo.LicenseKey == buildinfo.DevelopmentLicenseKey || buildinfo.LicenseKey == "" {
+ if buildinfo.Version == "development" {
buffer.Html(`Useful links
`)
buffer.Html(``)
buffer.Html(`- Kibana Log Explorer
`)
diff --git a/quesma/telemetry/headers/header.go b/quesma/telemetry/headers/header.go
index db6f37be5..63c609772 100644
--- a/quesma/telemetry/headers/header.go
+++ b/quesma/telemetry/headers/header.go
@@ -4,3 +4,4 @@ package telemetry_headers
// XTelemetryRemoteLog Used to inform telemetry endpoint that the payload contains logs
const XTelemetryRemoteLog = "X-Telemetry-Remote-Log"
+const ClientId = "X-Client-Id"
diff --git a/quesma/telemetry/phone_home.go b/quesma/telemetry/phone_home.go
index ab6d4855c..9d740d320 100644
--- a/quesma/telemetry/phone_home.go
+++ b/quesma/telemetry/phone_home.go
@@ -16,11 +16,13 @@ import (
"os"
"quesma/buildinfo"
"quesma/health"
- "quesma/license"
+ telemetry_headers "quesma/telemetry/headers"
+
"quesma/logger"
"quesma/quesma/config"
"quesma/quesma/recovery"
"quesma/stats/errorstats"
+
"runtime"
"strings"
"time"
@@ -129,6 +131,7 @@ type agent struct {
clickHouseDb *sql.DB
config config.QuesmaConfiguration
+ clientId string
instanceId string
statedAt time.Time
@@ -172,7 +175,7 @@ func hostname() string {
return name
}
-func NewPhoneHomeAgent(configuration config.QuesmaConfiguration, clickHouseDb *sql.DB) PhoneHomeAgent {
+func NewPhoneHomeAgent(configuration config.QuesmaConfiguration, clickHouseDb *sql.DB, clientId string) PhoneHomeAgent {
// TODO
// this is a question, maybe we should inherit context from the caller
@@ -187,6 +190,7 @@ func NewPhoneHomeAgent(configuration config.QuesmaConfiguration, clickHouseDb *s
instanceId: generateInstanceID(),
clickHouseDb: clickHouseDb,
config: configuration,
+ clientId: clientId,
clickHouseQueryTimes: newDurationMeasurement(ctx),
clickHouseInsertsTimes: newDurationMeasurement(ctx),
elasticReadTimes: newDurationMeasurement(ctx),
@@ -552,7 +556,7 @@ func (a *agent) phoneHomeRemoteEndpoint(ctx context.Context, body []byte) (err e
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("User-Agent", "quesma/"+buildinfo.Version)
- request.Header.Set(license.Header, a.config.LicenseKey)
+ request.Header.Set(telemetry_headers.ClientId, a.clientId)
resp, err := a.httpClient.Do(request)
if err != nil {
@@ -587,7 +591,7 @@ func (a *agent) phoneHomeLocalQuesma(ctx context.Context, body []byte) (err erro
}
request.Header.Set("Content-Type", "application/json")
request.Header.Set("User-Agent", "quesma/"+buildinfo.Version)
- request.Header.Set(license.Header, a.config.LicenseKey)
+ request.Header.Set(telemetry_headers.ClientId, a.clientId)
resp, err := a.httpClient.Do(request)
if err != nil {