Skip to content

Commit

Permalink
Move JSON types to separate package (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
nablaone authored May 22, 2024
1 parent 383308a commit 9846584
Show file tree
Hide file tree
Showing 22 changed files with 335 additions and 274 deletions.
16 changes: 8 additions & 8 deletions quesma/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import (
"mitmproxy/quesma/jsonprocessor"
"mitmproxy/quesma/logger"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"mitmproxy/quesma/quesma/recovery"
"mitmproxy/quesma/quesma/types"
"mitmproxy/quesma/telemetry"
"mitmproxy/quesma/util"
"regexp"
Expand Down Expand Up @@ -275,7 +275,7 @@ func (lm *LogManager) ProcessCreateTableQuery(ctx context.Context, query string,
return lm.sendCreateTableQuery(ctx, addOurFieldsToCreateTableQuery(query, config, table))
}

func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData mux.JSON, config *ChTableConfig) (string, error) {
func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData types.JSON, config *ChTableConfig) (string, error) {

createTableCmd := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s"
(
Expand All @@ -302,7 +302,7 @@ func Indexes(m SchemaMap) string {
return result.String()
}

func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name string, jsonData mux.JSON, config *ChTableConfig) error {
func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name string, jsonData types.JSON, config *ChTableConfig) error {
// TODO fix lm.AddTableIfDoesntExist(name, jsonData)

query, err := buildCreateTableQueryNoOurFields(ctx, name, jsonData, config)
Expand All @@ -319,7 +319,7 @@ func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name strin

// TODO
// This method should be refactored to use mux.JSON instead of string
func (lm *LogManager) BuildInsertJson(tableName string, data mux.JSON, config *ChTableConfig) (string, error) {
func (lm *LogManager) BuildInsertJson(tableName string, data types.JSON, config *ChTableConfig) (string, error) {

jsonData, err := json.Marshal(data)

Expand Down Expand Up @@ -389,7 +389,7 @@ func (lm *LogManager) BuildInsertJson(tableName string, data mux.JSON, config *C
return fmt.Sprintf("{%s%s%s", nonSchemaStr, comma, schemaFieldsJson[1:]), nil
}

func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName string, jsonData mux.JSON) (*ChTableConfig, error) {
func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName string, jsonData types.JSON) (*ChTableConfig, error) {
table := lm.FindTable(tableName)
var config *ChTableConfig
if table == nil {
Expand All @@ -412,15 +412,15 @@ func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName stri
return config, nil
}

func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string, jsonData []mux.JSON) error {
func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string, jsonData []types.JSON) error {
if config, err := lm.GetOrCreateTableConfig(ctx, tableName, jsonData[0]); err != nil {
return err
} else {
return lm.Insert(ctx, tableName, jsonData, config)
}
}

func (lm *LogManager) Insert(ctx context.Context, tableName string, jsons []mux.JSON, config *ChTableConfig) error {
func (lm *LogManager) Insert(ctx context.Context, tableName string, jsons []types.JSON, config *ChTableConfig) error {
var jsonsReadyForInsertion []string
for _, jsonValue := range jsons {
preprocessedJson := preprocess(jsonValue, NestedSeparator)
Expand Down Expand Up @@ -618,6 +618,6 @@ func (c *ChTableConfig) GetAttributes() []Attribute {
return c.attributes
}

func preprocess(data mux.JSON, nestedSeparator string) mux.JSON {
func preprocess(data types.JSON, nestedSeparator string) types.JSON {
return jsonprocessor.FlattenMap(data, nestedSeparator)
}
6 changes: 3 additions & 3 deletions quesma/clickhouse/clickhouse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"encoding/json"
"mitmproxy/quesma/concurrent"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"mitmproxy/quesma/quesma/types"
"strings"
"sync/atomic"
"testing"
Expand Down Expand Up @@ -46,7 +46,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) {

f := func(t1, t2 TableMap) {
lm := NewLogManager(fieldsMap, config.QuesmaConfiguration{})
j, err := lm.BuildInsertJson("tableName", mux.MustJSON(rowToInsert), hasOthersConfig)
j, err := lm.BuildInsertJson("tableName", types.MustJSON(rowToInsert), hasOthersConfig)
assert.NoError(t, err)
m := make(SchemaMap)
err = json.Unmarshal([]byte(j), &m)
Expand Down Expand Up @@ -118,7 +118,7 @@ func TestAddTimestamp(t *testing.T) {
castUnsupportedAttrValueTypesToString: false,
preferCastingToOthers: false,
}
query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", mux.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), config)
query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", types.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), config)
assert.NoError(t, err)
assert.True(t, strings.Contains(query, timestampFieldName))
}
Expand Down
10 changes: 5 additions & 5 deletions quesma/clickhouse/insert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/stretchr/testify/assert"
"mitmproxy/quesma/concurrent"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"mitmproxy/quesma/quesma/types"
"slices"
"strconv"
"strings"
Expand Down Expand Up @@ -145,7 +145,7 @@ func TestAutomaticTableCreationAtInsert(t *testing.T) {
for index3, lm := range logManagers(config) {
t.Run("case insertTest["+strconv.Itoa(index1)+"], config["+strconv.Itoa(index2)+"], logManager["+strconv.Itoa(index3)+"]", func(t *testing.T) {

query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, mux.MustJSON(tt.insertJson), config)
query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, types.MustJSON(tt.insertJson), config)
assert.NoError(t, err)
table, err := NewTable(query, config)
assert.NoError(t, err)
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestProcessInsertQuery(t *testing.T) {
mock.ExpectExec(expectedInserts[2*index1+1]).WillReturnResult(sqlmock.NewResult(1, 1))
}

err = lm.lm.ProcessInsertQuery(ctx, tableName, []mux.JSON{mux.MustJSON(tt.insertJson)})
err = lm.lm.ProcessInsertQuery(ctx, tableName, []types.JSON{types.MustJSON(tt.insertJson)})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down Expand Up @@ -256,7 +256,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {
mock.ExpectExec(`CREATE TABLE IF NOT EXISTS "` + tableName).WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(expectedInsertJsons[i]).WillReturnResult(sqlmock.NewResult(0, 0))

err = lm.ProcessInsertQuery(context.Background(), tableName, []mux.JSON{mux.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))})
err = lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down Expand Up @@ -288,7 +288,7 @@ func TestInsertVeryBigIntegers(t *testing.T) {

bigIntAsInt, _ := strconv.ParseInt(bigInt, 10, 64)
fmt.Printf(`{"severity":"sev","int": %d}\n`, bigIntAsInt)
err = lm.ProcessInsertQuery(context.Background(), tableName, []mux.JSON{mux.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %d}`, bigIntAsInt))})
err = lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %d}`, bigIntAsInt))})
assert.NoError(t, err)
if err := mock.ExpectationsWereMet(); err != nil {
t.Fatal("there were unfulfilled expections:", err)
Expand Down
22 changes: 11 additions & 11 deletions quesma/proxy/l4_proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"mitmproxy/quesma/logger"
"mitmproxy/quesma/network"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"mitmproxy/quesma/quesma/types"
"mitmproxy/quesma/stats"
"mitmproxy/quesma/util"
"net"
Expand Down Expand Up @@ -59,10 +59,10 @@ func configureRouting() *http.ServeMux {
router.HandleFunc("POST /{index}/_doc", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) {
index := r.PathValue("index")

parsedBody := mux.ParseRequestBody(string(body))
var jsonBody mux.JSON
parsedBody := types.ParseRequestBody(string(body))
var jsonBody types.JSON
switch b := parsedBody.(type) {
case mux.JSON:
case types.JSON:
jsonBody = b
default:
logger.Error().Msgf("Invalid JSON body: %v", parsedBody)
Expand All @@ -77,10 +77,10 @@ func configureRouting() *http.ServeMux {
router.HandleFunc("POST /{index}/_bulk", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) {
index := r.PathValue("index")

parsedBody := mux.ParseRequestBody(string(body))
var jsonBody mux.JSON
parsedBody := types.ParseRequestBody(string(body))
var jsonBody types.JSON
switch b := parsedBody.(type) {
case mux.JSON:
case types.JSON:
jsonBody = b
default:
logger.Error().Msgf("Invalid JSON body: %v", parsedBody)
Expand All @@ -94,17 +94,17 @@ func configureRouting() *http.ServeMux {

router.HandleFunc("POST /_bulk", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) {

parsedBody := mux.ParseRequestBody(string(body))
var ndjson mux.NDJSON
parsedBody := types.ParseRequestBody(string(body))
var ndjson types.NDJSON
switch b := parsedBody.(type) {
case mux.NDJSON:
case types.NDJSON:
ndjson = b
default:
logger.Error().Msgf("Invalid JSON body: %v", parsedBody)
return
}

err := ndjson.BulkForEach(func(operation mux.BulkOperation, document mux.JSON) {
err := ndjson.BulkForEach(func(operation types.BulkOperation, document types.JSON) {

index := operation.GetIndex()
if index == "" {
Expand Down
7 changes: 4 additions & 3 deletions quesma/quesma/matchers.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"mitmproxy/quesma/logger"
"mitmproxy/quesma/quesma/config"
"mitmproxy/quesma/quesma/mux"
"mitmproxy/quesma/quesma/types"
"strings"
)

Expand Down Expand Up @@ -79,12 +80,12 @@ func matchedAgainstPattern(configuration config.QuesmaConfiguration) mux.Request
func matchAgainstKibanaAlerts() mux.RequestMatcher {
return mux.RequestMatcherFunc(func(req *mux.Request) bool {

var query mux.JSON
var query types.JSON

switch req.ParsedBody.(type) {

case mux.JSON:
query = req.ParsedBody.(mux.JSON)
case types.JSON:
query = req.ParsedBody.(types.JSON)

default:
return true
Expand Down
3 changes: 2 additions & 1 deletion quesma/quesma/matchers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package quesma
import (
"github.com/stretchr/testify/assert"
"mitmproxy/quesma/quesma/mux"
"mitmproxy/quesma/quesma/types"
"testing"
)

Expand Down Expand Up @@ -162,7 +163,7 @@ func TestMatchAgainstKibanaAlerts(t *testing.T) {

req := &mux.Request{Body: test.body}

req.ParsedBody = mux.ParseRequestBody(test.body)
req.ParsedBody = types.ParseRequestBody(test.body)

actual := matchAgainstKibanaAlerts().Matches(req)
assert.Equal(t, test.expected, actual)
Expand Down
Loading

0 comments on commit 9846584

Please sign in to comment.