Skip to content

Commit

Permalink
No ingest setup (#815)
Browse files Browse the repository at this point in the history
Changes:
- we can add a single query pipeline now
- ingest attempts will end up with an error
- the common table will not be created at startup

---------

Signed-off-by: Przemysław Hejman <[email protected]>
Co-authored-by: Przemysław Hejman <[email protected]>
Co-authored-by: Piotr Grabowski <[email protected]>
  • Loading branch information
3 people authored Sep 30, 2024
1 parent 11ea050 commit bdeaade
Show file tree
Hide file tree
Showing 9 changed files with 214 additions and 14 deletions.
1 change: 1 addition & 0 deletions quesma/end_user_errors/end_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,7 @@ var ErrExpectedNDJSON = errorType(1002, "Invalid request body. We're expecting N
var ErrSearchCondition = errorType(2001, "Not supported search condition.")
var ErrNoSuchTable = errorType(2002, "Missing table.")
var ErrNoSuchSchema = errorType(2003, "Missing schema.")
var ErrNoIngest = errorType(2004, "Ingest is not enabled.")

var ErrDatabaseTableNotFound = errorType(3001, "Table not found in database.")
var ErrDatabaseFieldNotFound = errorType(3002, "Field not found in database.")
Expand Down
17 changes: 11 additions & 6 deletions quesma/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,18 +85,23 @@ func main() {
phoneHomeAgent.Start()

virtualTableStorage := persistence.NewElasticJSONDatabase(cfg.Elasticsearch, common_table.VirtualTableElasticIndexName)

tableDisco := clickhouse.NewTableDiscovery(&cfg, connectionPool, virtualTableStorage)
schemaRegistry := schema.NewSchemaRegistry(clickhouse.TableDiscoveryTableProviderAdapter{TableDiscovery: tableDisco}, &cfg, clickhouse.SchemaTypeAdapter{})

connManager := connectors.NewConnectorManager(&cfg, connectionPool, phoneHomeAgent, tableDisco)
lm := connManager.GetConnector()

// Ensure common table exists. This table has to be created before the ingest processor starts
common_table.EnsureCommonTableExists(connectionPool)
var ingestProcessor *ingest.IngestProcessor

if cfg.EnableIngest {
// Ensure common table exists. This table has to be created before the ingest processor starts
common_table.EnsureCommonTableExists(connectionPool)

ingestProcessor = ingest.NewEmptyIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage)
} else {
logger.Info().Msg("Ingest processor is disabled.")
}

//create ingest processor, very lame but for the sake of refactor
ip := ingest.NewEmptyIngestProcessor(&cfg, connectionPool, phoneHomeAgent, tableDisco, schemaRegistry, virtualTableStorage)
im := elasticsearch.NewIndexManagement(cfg.Elasticsearch.Url.String())

logger.Info().Msgf("loaded config: %s", cfg.String())
Expand All @@ -106,7 +111,7 @@ func main() {
abTestingController := sender.NewSenderCoordinator(&cfg)
abTestingController.Start()

instance := constructQuesma(&cfg, tableDisco, lm, ip, im, schemaRegistry, phoneHomeAgent, quesmaManagementConsole, qmcLogChannel, abTestingController.GetSender())
instance := constructQuesma(&cfg, tableDisco, lm, ingestProcessor, im, schemaRegistry, phoneHomeAgent, quesmaManagementConsole, qmcLogChannel, abTestingController.GetSender())
instance.Start()

<-doneCh
Expand Down
2 changes: 2 additions & 0 deletions quesma/quesma/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ type QuesmaConfiguration struct {
QuesmaInternalTelemetryUrl *Url `koanf:"internalTelemetryUrl"`
DisableAuth bool `koanf:"disableAuth"`
AutodiscoveryEnabled bool

EnableIngest bool // this is computed from the configuration 2.0
}

type LoggingConfiguration struct {
Expand Down
54 changes: 50 additions & 4 deletions quesma/quesma/config/config_v2.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,6 @@ func (c *QuesmaNewConfiguration) validatePipelines() error {
return fmt.Errorf("single pipeline Quesma can only be used for querying, but the processor is not of query type")
}

// We have a correct query-only pipeline, but we haven't yet implemented the case of disabling ingest...
return fmt.Errorf("single pipeline Quesma with querying (ingest disabled) is not supported at this moment")
} else {
return fmt.Errorf(fmt.Sprintf("frontend connector named [%s] referred in pipeline [%s] not found in configuration", fcName, c.Pipelines[0].Name))
}
Expand Down Expand Up @@ -456,24 +454,68 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {
conf.DisableAuth = true
}
}

conf.Logging = c.Logging
conf.InstallationId = c.InstallationId
conf.LicenseKey = c.LicenseKey
conf.IngestStatistics = c.IngestStatistics

conf.AutodiscoveryEnabled = false
conf.Connectors = make(map[string]RelationalDbConfiguration)
relDBConn, connType, relationalDBErr := c.getRelationalDBConf()

isSinglePipeline, isDualPipeline := c.getPipelinesType()

if isSinglePipeline {
procType := c.getProcessorByName(c.Pipelines[0].Processors[0]).Type
processor := c.getProcessorByName(c.Pipelines[0].Processors[0])
procType := processor.Type
if procType == QuesmaV1ProcessorNoOp {
conf.TransparentProxy = true
} else if procType == QuesmaV1ProcessorQuery {

queryProcessor := processor

// this is a COPY-PASTE from the dual pipeline case, but we need to do it here as well
// TODO refactor this to a separate function

elasticBackendName := c.getElasticsearchBackendConnector().Name
var relationalDBBackendName string
if relationalDBBackend, _ := c.getRelationalDBBackendConnector(); relationalDBBackend != nil {
relationalDBBackendName = relationalDBBackend.Name
}

conf.IndexConfig = make(map[string]IndexConfiguration)
for indexName, indexConfig := range queryProcessor.Config.IndexConfig {
processedConfig := indexConfig
processedConfig.Name = indexName

if slices.Contains(indexConfig.Target, elasticBackendName) {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, ElasticsearchTarget)
}
if slices.Contains(indexConfig.Target, relationalDBBackendName) {
processedConfig.QueryTarget = append(processedConfig.QueryTarget, ClickhouseTarget)
}

if len(indexConfig.QueryTarget) == 2 && !(indexConfig.QueryTarget[0] == ClickhouseTarget && indexConfig.QueryTarget[1] == ElasticsearchTarget) {
errAcc = multierror.Append(errAcc, fmt.Errorf("index %s has invalid dual query target configuration - when you specify two targets, Elastic has to be the primary one and ClickHouse has to be the secondary one", indexName))
continue
}
if len(indexConfig.QueryTarget) == 2 {
// Turn on A/B testing
processedConfig.Optimizers = make(map[string]OptimizerConfiguration)
processedConfig.Optimizers["elastic_ab_testing"] = OptimizerConfiguration{
Disabled: false,
Properties: map[string]string{},
}
}

conf.IndexConfig[indexName] = processedConfig
}

} else {
errAcc = multierror.Append(errAcc, fmt.Errorf("unsupported processor %s in single pipeline", procType))
}
}

if isDualPipeline {
fc1 := c.getFrontendConnectorByName(c.Pipelines[0].FrontendConnectors[0])
var queryPipeline, ingestPipeline Pipeline
Expand Down Expand Up @@ -522,6 +564,10 @@ func (c *QuesmaNewConfiguration) TranslateToLegacyConfig() QuesmaConfiguration {

conf.IndexConfig[indexName] = processedConfig
}

conf.EnableIngest = true
conf.IngestStatistics = true

for indexName, indexConfig := range ingestProcessor.Config.IndexConfig {
processedConfig, found := conf.IndexConfig[indexName]
if !found {
Expand Down
24 changes: 24 additions & 0 deletions quesma/quesma/config/config_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,30 @@ func TestQuesmaAddingHydrolixTablesToExistingElasticsearch(t *testing.T) {

assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget)
assert.Equal(t, []string{"elasticsearch"}, logsIndexConf.IngestTarget)

}

func TestQuesmaHydrolixQueryOnly(t *testing.T) {
os.Setenv(configFileLocationEnvVar, "./test_configs/quesma_hydrolix_tables_query_only.yaml")
cfg := LoadV2Config()
if err := cfg.Validate(); err != nil {
t.Fatalf("error validating config: %v", err)
}
legacyConf := cfg.TranslateToLegacyConfig()
assert.False(t, legacyConf.TransparentProxy)
assert.Equal(t, 2, len(legacyConf.IndexConfig))

siemIndexConf, ok := legacyConf.IndexConfig["siem"]
assert.True(t, ok)
logsIndexConf, ok := legacyConf.IndexConfig["logs"]
assert.True(t, ok)

assert.Equal(t, []string{"clickhouse"}, siemIndexConf.QueryTarget)

assert.Equal(t, []string{"clickhouse"}, logsIndexConf.QueryTarget)

assert.Equal(t, false, legacyConf.EnableIngest)
assert.Equal(t, false, legacyConf.IngestStatistics)
}

func TestMatchName(t *testing.T) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
# Use case:
# * user has some indices in Elasticsearch
# * user has two tables in Hydrolix named `siem` and `logs`
#
# User wants to see those two Hydrolix tables as Elasticsearch indices in Kibana
# User wants to see all their Elasticsearch indices in Kibana as they were before
#
# ( ingest is not the case here, but the config has to be present due to impl. constraints )

logging:
level: info
frontendConnectors:
- name: elastic-ingest
type: elasticsearch-fe-ingest
config:
listenPort: 8080
- name: elastic-query
type: elasticsearch-fe-query
config:
listenPort: 8080
backendConnectors:
- name: my-minimal-elasticsearch
type: elasticsearch
config:
url: "http://elasticsearch:9200"
user: elastic
password: quesmaquesma
- name: my-hydrolix-instance
type: hydrolix
config:
url: "clickhouse://localhost:9000"
user: "u"
password: "p"
database: "d"
ingestStatistics: true
processors:
- name: my-query-processor
type: quesma-v1-processor-query
config:
indexes:
siem:
target: [my-hydrolix-instance]
logs:
target: [my-hydrolix-instance]
pipelines:
- name: my-elasticsearch-proxy-read
frontendConnectors: [ elastic-query ]
processors: [ my-query-processor ]
backendConnectors: [ my-minimal-elasticsearch, my-hydrolix-instance ]
24 changes: 23 additions & 1 deletion quesma/quesma/functionality/bulk/bulk.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net/http"
"quesma/clickhouse"
"quesma/elasticsearch"
"quesma/end_user_errors"
"quesma/ingest"
"quesma/jsonprocessor"
"quesma/logger"
Expand All @@ -19,6 +20,8 @@ import (
"quesma/quesma/types"
"quesma/stats"
"quesma/telemetry"
"sort"
"strings"
"sync"
)

Expand Down Expand Up @@ -78,12 +81,31 @@ func Write(ctx context.Context, defaultIndex *string, bulk types.NDJSON, ip *ing
return []BulkItem{}, err
}

// we fail if there are some documents to insert into Clickhouse but ingest processor is not available
if len(clickhouseDocumentsToInsert) > 0 && ip == nil {

indexes := make(map[string]struct{})
for index := range clickhouseDocumentsToInsert {
indexes[index] = struct{}{}
}

indexesAsList := make([]string, 0, len(indexes))
for index := range indexes {
indexesAsList = append(indexesAsList, index)
}
sort.Strings(indexesAsList)

return []BulkItem{}, end_user_errors.ErrNoIngest.New(fmt.Errorf("ingest processor is not available, but documents are targeted to Clickhouse indexes: %s", strings.Join(indexesAsList, ",")))
}

err = sendToElastic(elasticRequestBody, cfg, elasticBulkEntries)
if err != nil {
return []BulkItem{}, err
}

sendToClickhouse(ctx, clickhouseDocumentsToInsert, phoneHomeAgent, cfg, ip)
if ip != nil {
sendToClickhouse(ctx, clickhouseDocumentsToInsert, phoneHomeAgent, cfg, ip)
}

return results, nil
}
Expand Down
6 changes: 6 additions & 0 deletions quesma/quesma/functionality/doc/doc.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,15 @@ import (

func Write(ctx context.Context, tableName *string, body types.JSON, ip *ingest.IngestProcessor, cfg *config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (bulk.BulkItem, error) {
// Translate single doc write to a bulk request, reusing exiting logic of bulk ingest

results, err := bulk.Write(ctx, tableName, []types.JSON{
map[string]interface{}{"index": map[string]interface{}{"_index": *tableName}},
body,
}, ip, cfg, phoneHomeAgent)

if err != nil {
return bulk.BulkItem{}, err
}

return results[0], err
}
51 changes: 48 additions & 3 deletions quesma/quesma/router.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"net/http"
"quesma/clickhouse"
"quesma/elasticsearch"
"quesma/end_user_errors"
"quesma/ingest"
"quesma/logger"
"quesma/queryparser"
Expand All @@ -28,6 +29,7 @@ import (
"quesma/tracing"
"regexp"
"strings"
"sync"
"time"
)

Expand All @@ -50,7 +52,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
}

results, err := bulk.Write(ctx, nil, body, ip, cfg, phoneHomeAgent)
return bulkInsertResult(results, err)
return bulkInsertResult(ctx, results, err)
})

router.Register(routes.IndexRefreshPath, and(method("POST"), matchedExactQueryPath(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
Expand Down Expand Up @@ -88,7 +90,7 @@ func configureRouter(cfg *config.QuesmaConfiguration, sr schema.Registry, lm *cl
}

results, err := bulk.Write(ctx, &index, body, ip, cfg, phoneHomeAgent)
return bulkInsertResult(results, err)
return bulkInsertResult(ctx, results, err)
})

router.Register(routes.ResolveIndexPath, method("GET"), func(ctx context.Context, req *mux.Request) (*mux.Result, error) {
Expand Down Expand Up @@ -375,7 +377,50 @@ func elasticsearchQueryResult(body string, statusCode int) *mux.Result {
}, StatusCode: statusCode}
}

func bulkInsertResult(ops []bulk.BulkItem, err error) (*mux.Result, error) {
var ingestWarning sync.Once

func noIngestEnabledButThereIngestRequest() {
logger.Error().Msgf("Ingest is disabled by configuration, but the request is trying to ingest data. ")
}

func bulkInsertResult(ctx context.Context, ops []bulk.BulkItem, err error) (*mux.Result, error) {

if err != nil {
var msg string
var reason string
var httpCode int

var endUserError *end_user_errors.EndUserError
if errors.As(err, &endUserError) {
msg = string(queryparser.InternalQuesmaError(endUserError.EndUserErrorMessage()))
reason = endUserError.Reason()
httpCode = http.StatusInternalServerError

if endUserError.ErrorType().Number == end_user_errors.ErrNoIngest.Number {
// agents have no mercy, they will try again, and again
// we should log this error once
ingestWarning.Do(noIngestEnabledButThereIngestRequest)
}

} else {
msg = string(queryparser.BadRequestParseError(err))
reason = err.Error()
httpCode = http.StatusBadRequest
}

// ingest can be noisy, so we can enable debug logs here
var logEveryIngestError bool

if logEveryIngestError {
logger.ErrorWithCtxAndReason(ctx, reason).Msgf("Bulk insert error: %v", err)
}

return &mux.Result{
Body: msg,
StatusCode: httpCode,
}, nil
}

if err != nil {
return &mux.Result{
Body: string(queryparser.BadRequestParseError(err)),
Expand Down

0 comments on commit bdeaade

Please sign in to comment.