Skip to content

Commit

Permalink
Use JSON or NDJSON types instead of []byte (#175)
Browse files Browse the repository at this point in the history
This is part of the global change: Every `map[string]interface{}`,
`[]byte` and `string` that holds parsed JSON will be replaced with type
`JSON`.

This PR contains changes in the ingest part only. `search.go` is crowded
now.

Move types `JSON` and etc to the more generic `types` will done in next
PR.
  • Loading branch information
nablaone authored May 21, 2024
1 parent e0efff2 commit b439adc
Show file tree
Hide file tree
Showing 17 changed files with 416 additions and 200 deletions.
40 changes: 21 additions & 19 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"mitmproxy/quesma/jsonprocessor"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"mitmproxy/quesma/quesma/recovery"
"mitmproxy/quesma/telemetry"
"mitmproxy/quesma/util"
Expand Down Expand Up @@ -274,20 +275,15 @@ func (lm *LogManager) ProcessCreateTableQuery(ctx context.Context, query string,
return lm.sendCreateTableQuery(ctx, addOurFieldsToCreateTableQuery(query, config, table))
}

func buildCreateTableQueryNoOurFields(ctx context.Context, tableName, jsonData string, config *ChTableConfig) (string, error) {
m := make(SchemaMap)
err := json.Unmarshal([]byte(jsonData), &m)
if err != nil {
logger.ErrorWithCtx(ctx).Msgf("can't unmarshall, json: %s\nerr:%v", jsonData, err)
return "", err
}
func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData mux.JSON, config *ChTableConfig) (string, error) {

createTableCmd := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s"
(
%s
)
%s
COMMENT 'created by Quesma'`,
tableName, FieldsMapToCreateTableString("", m, 1, config)+Indexes(m),
tableName, FieldsMapToCreateTableString("", jsonData, 1, config)+Indexes(jsonData),
config.CreateTablePostFieldsString())
return createTableCmd, nil
}
Expand All @@ -306,7 +302,7 @@ func Indexes(m SchemaMap) string {
return result.String()
}

func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name, jsonData string, config *ChTableConfig) error {
func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name string, jsonData mux.JSON, config *ChTableConfig) error {
// TODO fix lm.AddTableIfDoesntExist(name, jsonData)

query, err := buildCreateTableQueryNoOurFields(ctx, name, jsonData, config)
Expand All @@ -321,7 +317,17 @@ func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name, json
return nil
}

func (lm *LogManager) BuildInsertJson(tableName, js string, config *ChTableConfig) (string, error) {
// TODO
// This method should be refactored to use mux.JSON instead of string
func (lm *LogManager) BuildInsertJson(tableName string, data mux.JSON, config *ChTableConfig) (string, error) {

jsonData, err := json.Marshal(data)

if err != nil {
return "", err
}
js := string(jsonData)

if !config.hasOthers && len(config.attributes) == 0 {
return js, nil
}
Expand Down Expand Up @@ -383,7 +389,7 @@ func (lm *LogManager) BuildInsertJson(tableName, js string, config *ChTableConfi
return fmt.Sprintf("{%s%s%s", nonSchemaStr, comma, schemaFieldsJson[1:]), nil
}

func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName, jsonData string) (*ChTableConfig, error) {
func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName string, jsonData mux.JSON) (*ChTableConfig, error) {
table := lm.FindTable(tableName)
var config *ChTableConfig
if table == nil {
Expand All @@ -406,15 +412,15 @@ func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName, jso
return config, nil
}

func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string, jsonData []string) error {
func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string, jsonData []mux.JSON) error {
if config, err := lm.GetOrCreateTableConfig(ctx, tableName, jsonData[0]); err != nil {
return err
} else {
return lm.Insert(ctx, tableName, jsonData, config)
}
}

func (lm *LogManager) Insert(ctx context.Context, tableName string, jsons []string, config *ChTableConfig) error {
func (lm *LogManager) Insert(ctx context.Context, tableName string, jsons []mux.JSON, config *ChTableConfig) error {
var jsonsReadyForInsertion []string
for _, jsonValue := range jsons {
preprocessedJson := preprocess(jsonValue, NestedSeparator)
Expand Down Expand Up @@ -612,10 +618,6 @@ func (c *ChTableConfig) GetAttributes() []Attribute {
return c.attributes
}

func preprocess(jsonStr string, nestedSeparator string) string {
var data map[string]interface{}
_ = json.Unmarshal([]byte(jsonStr), &data)

resultJSON, _ := json.Marshal(jsonprocessor.FlattenMap(data, nestedSeparator))
return string(resultJSON)
func preprocess(data mux.JSON, nestedSeparator string) mux.JSON {
return jsonprocessor.FlattenMap(data, nestedSeparator)
}
5 changes: 3 additions & 2 deletions quesma/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"mitmproxy/quesma/concurrent"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -45,7 +46,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {

f := func(t1, t2 TableMap) {
lm := NewLogManager(fieldsMap, config.QuesmaConfiguration{})
j, err := lm.BuildInsertJson("tableName", rowToInsert, hasOthersConfig)
j, err := lm.BuildInsertJson("tableName", mux.MustJSON(rowToInsert), hasOthersConfig)
assert.NoError(t, err)
m := make(SchemaMap)
err = json.Unmarshal([]byte(j), &m)
Expand Down Expand Up @@ -117,7 +118,7 @@ func TestAddTimestamp(t *testing.T) {
castUnsupportedAttrValueTypesToString: false,
preferCastingToOthers: false,
}
query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", `{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`, config)
query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", mux.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), config)
assert.NoError(t, err)
assert.True(t, strings.Contains(query, timestampFieldName))
}
Expand Down
10 changes: 6 additions & 4 deletions quesma/clickhouse/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"mitmproxy/quesma/concurrent"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -143,7 +144,8 @@ func TestAutomaticTableCreationAtInsert(t *testing.T) {
for index2, config := range configs {
for index3, lm := range logManagers(config) {
t.Run("case insertTest["+strconv.Itoa(index1)+"], config["+strconv.Itoa(index2)+"], logManager["+strconv.Itoa(index3)+"]", func(t *testing.T) {
query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, tt.insertJson, config)

query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, mux.MustJSON(tt.insertJson), config)
assert.NoError(t, err)
table, err := NewTable(query, config)
assert.NoError(t, err)
Expand Down Expand Up @@ -222,7 +224,7 @@ func TestProcessInsertQuery(t *testing.T) {
mock.ExpectExec(expectedInserts[2*index1+1]).WillReturnResult(sqlmock.NewResult(1, 1))
}

err = lm.lm.ProcessInsertQuery(ctx, tableName, []string{tt.insertJson})
err = lm.lm.ProcessInsertQuery(ctx, tableName, []mux.JSON{mux.MustJSON(tt.insertJson)})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down Expand Up @@ -254,7 +256,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {
mock.ExpectExec(`CREATE TABLE IF NOT EXISTS "` + tableName).WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(expectedInsertJsons[i]).WillReturnResult(sqlmock.NewResult(0, 0))

err = lm.ProcessInsertQuery(context.Background(), tableName, []string{fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt)})
err = lm.ProcessInsertQuery(context.Background(), tableName, []mux.JSON{mux.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down Expand Up @@ -286,7 +288,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {

bigIntAsInt, _ := strconv.ParseInt(bigInt, 10, 64)
fmt.Printf(`{"severity":"sev","int": %d}\n`, bigIntAsInt)
err = lm.ProcessInsertQuery(context.Background(), tableName, []string{fmt.Sprintf(`{"severity":"sev","int": %d}`, bigIntAsInt)})
err = lm.ProcessInsertQuery(context.Background(), tableName, []mux.JSON{mux.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %d}`, bigIntAsInt))})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down
88 changes: 49 additions & 39 deletions quesma/proxy/l4_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package proxy

import (
"context"
"encoding/json"
"fmt"
"io"
"log"
Expand All @@ -11,11 +10,11 @@ import (
"mitmproxy/quesma/logger"
"mitmproxy/quesma/network"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"mitmproxy/quesma/stats"
"mitmproxy/quesma/util"
"net"
"net/http"
"strings"
"sync/atomic"
"time"
)
Expand Down Expand Up @@ -59,24 +58,69 @@ func configureRouting() *http.ServeMux {
configuration := config.QuesmaConfiguration{IndexConfig: map[string]config.IndexConfiguration{"_all": {Name: "_all", Enabled: true}}}
router.HandleFunc("POST /{index}/_doc", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) {
index := r.PathValue("index")

parsedBody := mux.ParseRequestBody(string(body))
var jsonBody mux.JSON
switch b := parsedBody.(type) {
case mux.JSON:
jsonBody = b
default:
logger.Error().Msgf("Invalid JSON body: %v", parsedBody)
return
}

if !elasticsearch.IsInternalIndex(index) {
stats.GlobalStatistics.Process(configuration, index, string(body), clickhouse.NestedSeparator)
stats.GlobalStatistics.Process(configuration, index, jsonBody, clickhouse.NestedSeparator)
}
}))

router.HandleFunc("POST /{index}/_bulk", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) {
index := r.PathValue("index")

parsedBody := mux.ParseRequestBody(string(body))
var jsonBody mux.JSON
switch b := parsedBody.(type) {
case mux.JSON:
jsonBody = b
default:
logger.Error().Msgf("Invalid JSON body: %v", parsedBody)
return
}

if !elasticsearch.IsInternalIndex(index) {
stats.GlobalStatistics.Process(configuration, index, string(body), clickhouse.NestedSeparator)
stats.GlobalStatistics.Process(configuration, index, jsonBody, clickhouse.NestedSeparator)
}
}))

router.HandleFunc("POST /_bulk", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) {
forEachInBulk(string(body), func(index string, document string) {

parsedBody := mux.ParseRequestBody(string(body))
var ndjson mux.NDJSON
switch b := parsedBody.(type) {
case mux.NDJSON:
ndjson = b
default:
logger.Error().Msgf("Invalid JSON body: %v", parsedBody)
return
}

err := ndjson.BulkForEach(func(operation mux.BulkOperation, document mux.JSON) {

index := operation.GetIndex()
if index == "" {
logger.Error().Msg("No index in operation")
return
}

if !elasticsearch.IsInternalIndex(index) {
stats.GlobalStatistics.Process(configuration, index, document, clickhouse.NestedSeparator)
}
})

if err != nil {
logger.Error().Msgf("Error processing _bulk: %v", err)
}

}))
router.HandleFunc("GET /", func(writer http.ResponseWriter, r *http.Request) {
writer.WriteHeader(http.StatusOK)
Expand Down Expand Up @@ -195,37 +239,3 @@ func closeConnection(connection net.Conn) {
logger.Error().Msgf("Error closing connection: %v", err)
}
}

func forEachInBulk(body string, f func(index string, document string)) {
jsons := strings.Split(body, "\n")
for i := 0; i+1 < len(jsons); i += 2 {
action := jsons[i]
document := jsons[i+1]

var jsonData map[string]interface{}
err := json.Unmarshal([]byte(action), &jsonData)
if err != nil {
logger.Error().Msgf("Invalid action JSON in _bulk: %v %s", err, action)
continue
}
createObj, ok := jsonData[bulkCreate]
if ok {
createJson, ok := createObj.(map[string]interface{})
if !ok {
logger.Error().Msgf("Invalid create JSON in _bulk: %s", action)
continue
}
indexName, ok := createJson["_index"].(string)
if !ok {
if len(indexName) == 0 {
logger.Error().Msgf("Invalid create JSON in _bulk, no _index name: %s", action)
continue
}
}

f(indexName, document)
} else {
logger.Debug().Msgf("Unsupported actions in _bulk: %s", action)
}
}
}
2 changes: 1 addition & 1 deletion quesma/quesma/field_caps.go
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ func isInternalColumn(col *clickhouse.Column) bool {
return col.Name == clickhouse.AttributesKeyColumn || col.Name == clickhouse.AttributesValueColumn
}

func handleFieldCaps(ctx context.Context, index string, _ []byte, lm *clickhouse.LogManager) ([]byte, error) {
func handleFieldCaps(ctx context.Context, index string, lm *clickhouse.LogManager) ([]byte, error) {
indexes := lm.ResolveIndexes(ctx, index)
if len(indexes) == 0 {
if !elasticsearch.IsIndexPattern(index) {
Expand Down
3 changes: 1 addition & 2 deletions quesma/quesma/matchers_test.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package quesma

import (
"context"
"github.com/stretchr/testify/assert"
"mitmproxy/quesma/quesma/mux"
"testing"
Expand Down Expand Up @@ -163,7 +162,7 @@ func TestMatchAgainstKibanaAlerts(t *testing.T) {

req := &mux.Request{Body: test.body}

req.ParsedBody = mux.ParseRequestBody(context.TODO(), req)
req.ParsedBody = mux.ParseRequestBody(test.body)

actual := matchAgainstKibanaAlerts().Matches(req)
assert.Equal(t, test.expected, actual)
Expand Down
Loading

0 comments on commit b439adc

Please sign in to comment.