diff --git a/quesma/clickhouse/clickhouse.go b/quesma/clickhouse/clickhouse.go index 977528542..5682e9567 100644 --- a/quesma/clickhouse/clickhouse.go +++ b/quesma/clickhouse/clickhouse.go @@ -12,8 +12,8 @@ import ( "mitmproxy/quesma/jsonprocessor" "mitmproxy/quesma/logger" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" "mitmproxy/quesma/quesma/recovery" + "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/telemetry" "mitmproxy/quesma/util" "regexp" @@ -275,7 +275,7 @@ func (lm *LogManager) ProcessCreateTableQuery(ctx context.Context, query string, return lm.sendCreateTableQuery(ctx, addOurFieldsToCreateTableQuery(query, config, table)) } -func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData mux.JSON, config *ChTableConfig) (string, error) { +func buildCreateTableQueryNoOurFields(ctx context.Context, tableName string, jsonData types.JSON, config *ChTableConfig) (string, error) { createTableCmd := fmt.Sprintf(`CREATE TABLE IF NOT EXISTS "%s" ( @@ -302,7 +302,7 @@ func Indexes(m SchemaMap) string { return result.String() } -func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name string, jsonData mux.JSON, config *ChTableConfig) error { +func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name string, jsonData types.JSON, config *ChTableConfig) error { // TODO fix lm.AddTableIfDoesntExist(name, jsonData) query, err := buildCreateTableQueryNoOurFields(ctx, name, jsonData, config) @@ -319,7 +319,7 @@ func (lm *LogManager) CreateTableFromInsertQuery(ctx context.Context, name strin // TODO // This method should be refactored to use mux.JSON instead of string -func (lm *LogManager) BuildInsertJson(tableName string, data mux.JSON, config *ChTableConfig) (string, error) { +func (lm *LogManager) BuildInsertJson(tableName string, data types.JSON, config *ChTableConfig) (string, error) { jsonData, err := json.Marshal(data) @@ -389,7 +389,7 @@ func (lm *LogManager) BuildInsertJson(tableName string, data mux.JSON, config *C return fmt.Sprintf("{%s%s%s", nonSchemaStr, comma, schemaFieldsJson[1:]), nil } -func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName string, jsonData mux.JSON) (*ChTableConfig, error) { +func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName string, jsonData types.JSON) (*ChTableConfig, error) { table := lm.FindTable(tableName) var config *ChTableConfig if table == nil { @@ -412,7 +412,7 @@ func (lm *LogManager) GetOrCreateTableConfig(ctx context.Context, tableName stri return config, nil } -func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string, jsonData []mux.JSON) error { +func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string, jsonData []types.JSON) error { if config, err := lm.GetOrCreateTableConfig(ctx, tableName, jsonData[0]); err != nil { return err } else { @@ -420,7 +420,7 @@ func (lm *LogManager) ProcessInsertQuery(ctx context.Context, tableName string, } } -func (lm *LogManager) Insert(ctx context.Context, tableName string, jsons []mux.JSON, config *ChTableConfig) error { +func (lm *LogManager) Insert(ctx context.Context, tableName string, jsons []types.JSON, config *ChTableConfig) error { var jsonsReadyForInsertion []string for _, jsonValue := range jsons { preprocessedJson := preprocess(jsonValue, NestedSeparator) @@ -618,6 +618,6 @@ func (c *ChTableConfig) GetAttributes() []Attribute { return c.attributes } -func preprocess(data mux.JSON, nestedSeparator string) mux.JSON { +func preprocess(data types.JSON, nestedSeparator string) types.JSON { return jsonprocessor.FlattenMap(data, nestedSeparator) } diff --git a/quesma/clickhouse/clickhouse_test.go b/quesma/clickhouse/clickhouse_test.go index 6b63dd05e..c3f59c7aa 100644 --- a/quesma/clickhouse/clickhouse_test.go +++ b/quesma/clickhouse/clickhouse_test.go @@ -5,7 +5,7 @@ import ( "encoding/json" "mitmproxy/quesma/concurrent" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "strings" "sync/atomic" "testing" @@ -46,7 +46,7 @@ func TestInsertNonSchemaFieldsToOthers_1(t *testing.T) { f := func(t1, t2 TableMap) { lm := NewLogManager(fieldsMap, config.QuesmaConfiguration{}) - j, err := lm.BuildInsertJson("tableName", mux.MustJSON(rowToInsert), hasOthersConfig) + j, err := lm.BuildInsertJson("tableName", types.MustJSON(rowToInsert), hasOthersConfig) assert.NoError(t, err) m := make(SchemaMap) err = json.Unmarshal([]byte(j), &m) @@ -118,7 +118,7 @@ func TestAddTimestamp(t *testing.T) { castUnsupportedAttrValueTypesToString: false, preferCastingToOthers: false, } - query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", mux.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), config) + query, err := buildCreateTableQueryNoOurFields(context.Background(), "tableName", types.MustJSON(`{"host.name":"hermes","message":"User password reset requested","service.name":"queue","severity":"info","source":"azure"}`), config) assert.NoError(t, err) assert.True(t, strings.Contains(query, timestampFieldName)) } diff --git a/quesma/clickhouse/insert_test.go b/quesma/clickhouse/insert_test.go index b0cb4cf59..ec4196a6d 100644 --- a/quesma/clickhouse/insert_test.go +++ b/quesma/clickhouse/insert_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/assert" "mitmproxy/quesma/concurrent" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "slices" "strconv" "strings" @@ -145,7 +145,7 @@ func TestAutomaticTableCreationAtInsert(t *testing.T) { for index3, lm := range logManagers(config) { t.Run("case insertTest["+strconv.Itoa(index1)+"], config["+strconv.Itoa(index2)+"], logManager["+strconv.Itoa(index3)+"]", func(t *testing.T) { - query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, mux.MustJSON(tt.insertJson), config) + query, err := buildCreateTableQueryNoOurFields(context.Background(), tableName, types.MustJSON(tt.insertJson), config) assert.NoError(t, err) table, err := NewTable(query, config) assert.NoError(t, err) @@ -224,7 +224,7 @@ func TestProcessInsertQuery(t *testing.T) { mock.ExpectExec(expectedInserts[2*index1+1]).WillReturnResult(sqlmock.NewResult(1, 1)) } - err = lm.lm.ProcessInsertQuery(ctx, tableName, []mux.JSON{mux.MustJSON(tt.insertJson)}) + err = lm.lm.ProcessInsertQuery(ctx, tableName, []types.JSON{types.MustJSON(tt.insertJson)}) assert.NoError(t, err) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) @@ -256,7 +256,7 @@ func TestInsertVeryBigIntegers(t *testing.T) { mock.ExpectExec(`CREATE TABLE IF NOT EXISTS "` + tableName).WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec(expectedInsertJsons[i]).WillReturnResult(sqlmock.NewResult(0, 0)) - err = lm.ProcessInsertQuery(context.Background(), tableName, []mux.JSON{mux.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))}) + err = lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %s}`, bigInt))}) assert.NoError(t, err) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) @@ -288,7 +288,7 @@ func TestInsertVeryBigIntegers(t *testing.T) { bigIntAsInt, _ := strconv.ParseInt(bigInt, 10, 64) fmt.Printf(`{"severity":"sev","int": %d}\n`, bigIntAsInt) - err = lm.ProcessInsertQuery(context.Background(), tableName, []mux.JSON{mux.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %d}`, bigIntAsInt))}) + err = lm.ProcessInsertQuery(context.Background(), tableName, []types.JSON{types.MustJSON(fmt.Sprintf(`{"severity":"sev","int": %d}`, bigIntAsInt))}) assert.NoError(t, err) if err := mock.ExpectationsWereMet(); err != nil { t.Fatal("there were unfulfilled expections:", err) diff --git a/quesma/proxy/l4_proxy.go b/quesma/proxy/l4_proxy.go index bcfc650c5..e7bc25cdd 100644 --- a/quesma/proxy/l4_proxy.go +++ b/quesma/proxy/l4_proxy.go @@ -10,7 +10,7 @@ import ( "mitmproxy/quesma/logger" "mitmproxy/quesma/network" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/stats" "mitmproxy/quesma/util" "net" @@ -59,10 +59,10 @@ func configureRouting() *http.ServeMux { router.HandleFunc("POST /{index}/_doc", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) { index := r.PathValue("index") - parsedBody := mux.ParseRequestBody(string(body)) - var jsonBody mux.JSON + parsedBody := types.ParseRequestBody(string(body)) + var jsonBody types.JSON switch b := parsedBody.(type) { - case mux.JSON: + case types.JSON: jsonBody = b default: logger.Error().Msgf("Invalid JSON body: %v", parsedBody) @@ -77,10 +77,10 @@ func configureRouting() *http.ServeMux { router.HandleFunc("POST /{index}/_bulk", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) { index := r.PathValue("index") - parsedBody := mux.ParseRequestBody(string(body)) - var jsonBody mux.JSON + parsedBody := types.ParseRequestBody(string(body)) + var jsonBody types.JSON switch b := parsedBody.(type) { - case mux.JSON: + case types.JSON: jsonBody = b default: logger.Error().Msgf("Invalid JSON body: %v", parsedBody) @@ -94,17 +94,17 @@ func configureRouting() *http.ServeMux { router.HandleFunc("POST /_bulk", util.BodyHandler(func(body []byte, writer http.ResponseWriter, r *http.Request) { - parsedBody := mux.ParseRequestBody(string(body)) - var ndjson mux.NDJSON + parsedBody := types.ParseRequestBody(string(body)) + var ndjson types.NDJSON switch b := parsedBody.(type) { - case mux.NDJSON: + case types.NDJSON: ndjson = b default: logger.Error().Msgf("Invalid JSON body: %v", parsedBody) return } - err := ndjson.BulkForEach(func(operation mux.BulkOperation, document mux.JSON) { + err := ndjson.BulkForEach(func(operation types.BulkOperation, document types.JSON) { index := operation.GetIndex() if index == "" { diff --git a/quesma/quesma/matchers.go b/quesma/quesma/matchers.go index f67366959..c71c3286d 100644 --- a/quesma/quesma/matchers.go +++ b/quesma/quesma/matchers.go @@ -5,6 +5,7 @@ import ( "mitmproxy/quesma/logger" "mitmproxy/quesma/quesma/config" "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "strings" ) @@ -79,12 +80,12 @@ func matchedAgainstPattern(configuration config.QuesmaConfiguration) mux.Request func matchAgainstKibanaAlerts() mux.RequestMatcher { return mux.RequestMatcherFunc(func(req *mux.Request) bool { - var query mux.JSON + var query types.JSON switch req.ParsedBody.(type) { - case mux.JSON: - query = req.ParsedBody.(mux.JSON) + case types.JSON: + query = req.ParsedBody.(types.JSON) default: return true diff --git a/quesma/quesma/matchers_test.go b/quesma/quesma/matchers_test.go index 8d4182a1b..51627eb0e 100644 --- a/quesma/quesma/matchers_test.go +++ b/quesma/quesma/matchers_test.go @@ -3,6 +3,7 @@ package quesma import ( "github.com/stretchr/testify/assert" "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "testing" ) @@ -162,7 +163,7 @@ func TestMatchAgainstKibanaAlerts(t *testing.T) { req := &mux.Request{Body: test.body} - req.ParsedBody = mux.ParseRequestBody(test.body) + req.ParsedBody = types.ParseRequestBody(test.body) actual := matchAgainstKibanaAlerts().Matches(req) assert.Equal(t, test.expected, actual) diff --git a/quesma/quesma/mux/body.go b/quesma/quesma/mux/body.go deleted file mode 100644 index 6770febca..000000000 --- a/quesma/quesma/mux/body.go +++ /dev/null @@ -1,190 +0,0 @@ -package mux - -import ( - "encoding/json" - "fmt" - "strings" -) - -// -// These types are generic. -// TODO move them to a separate package `types` -// - -type JSON map[string]interface{} - -func ParseJSON(body string) (JSON, error) { - - var res JSON - err := json.Unmarshal([]byte(body), &res) - - return res, err -} - -func MustJSON(s string) JSON { - - res, err := ParseJSON(s) - if err != nil { - panic(fmt.Sprintf("Failed to parse JSON: %v", err)) - } - - return res -} - -func (j JSON) Bytes() ([]byte, error) { - return json.Marshal(j) -} - -func (j JSON) Remarshal(v interface{}) error { - b, err := json.Marshal(j) - if err != nil { - return err - } - return json.Unmarshal(b, v) -} - -func (j JSON) ShortString() string { - - var asString string - asBytes, err := json.Marshal(j) - - if err != nil { - asString = fmt.Sprintf("Error marshalling JSON: %v, json: %v", err, j) - } else { - asString = string(asBytes) - } - - if len(asString) < 70 { - return asString - } - return asString[:70] - -} - -type NDJSON []JSON - -func ParseNDJSON(body string) (NDJSON, error) { - var ndjson NDJSON - - var err error - var errors []error - for x, line := range strings.Split(body, "\n") { - - if line == "" { - continue - } - - parsedLine := make(JSON) - - err = json.Unmarshal([]byte(line), &parsedLine) - if err != nil { - errors = append(errors, fmt.Errorf("error while parsing line %d: %s: %s", x, line, err)) - break - } - - ndjson = append(ndjson, parsedLine) - } - - if len(errors) > 0 { - err = fmt.Errorf("errors while parsing NDJSON: %v", errors) - } - - return ndjson, err -} - -type DocumentTarget struct { - Index *string `json:"_index"` - Id *string `json:"_id"` // document's target id in Elasticsearch, we ignore it when writing to Clickhouse. -} - -type BulkOperation map[string]DocumentTarget - -func (op BulkOperation) GetIndex() string { - for _, target := range op { // this map contains only 1 element though - if target.Index != nil { - return *target.Index - } - } - - return "" -} - -func (op BulkOperation) GetOperation() string { - for operation := range op { - return operation - } - return "" -} - -func (n NDJSON) BulkForEach(f func(operation BulkOperation, doc JSON)) error { - - for i := 0; i+1 < len(n); i += 2 { - operation := n[i] // {"create":{"_index":"kibana_sample_data_flights", "_id": 1}} - document := n[i+1] // {"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main" } - - var operationParsed BulkOperation // operationName (create, index, update, delete) -> DocumentTarget - - err := operation.Remarshal(&operationParsed) - if err != nil { - return err - } - - f(operationParsed, document) - } - - return nil - -} - -// There we can add methods to iterate over NDJSON - -type Unknown struct { - Body string - JSONParseError error - NDJSONParseError error -} - -func (u *Unknown) String() string { - - return fmt.Sprintf("Unknown{Body: %s, JSONParseError: %v, NDJSONParseError: %v}", u.Body, u.JSONParseError, u.NDJSONParseError) - -} - -type RequestBody interface { - isParsedRequestBody() // this is a marker method -} - -func (j JSON) isParsedRequestBody() {} -func (n NDJSON) isParsedRequestBody() {} -func (u *Unknown) isParsedRequestBody() {} - -func ParseRequestBody(body string) RequestBody { - - unknow := &Unknown{} - unknow.Body = body - - // json - if len(body) > 1 && body[0] == '{' { - parsedBody, err := ParseJSON(body) - if err != nil { - unknow.JSONParseError = err - } else { - return parsedBody - } - } - - // ndjson - if len(body) > 1 && body[0] == '{' { - - parsedBody, err := ParseNDJSON(body) - if err != nil { - unknow.NDJSONParseError = err - } else { - return parsedBody - } - - // if nothing else, it's unknown - } - - return unknow -} diff --git a/quesma/quesma/mux/mux.go b/quesma/quesma/mux/mux.go index 598ddfe4a..1061f3fe1 100644 --- a/quesma/quesma/mux/mux.go +++ b/quesma/quesma/mux/mux.go @@ -4,6 +4,7 @@ import ( "context" "github.com/ucarion/urlpath" "mitmproxy/quesma/logger" + "mitmproxy/quesma/quesma/types" "net/http" "net/url" "strings" @@ -34,7 +35,7 @@ type ( QueryParams url.Values Body string - ParsedBody RequestBody + ParsedBody types.RequestBody } Handler func(ctx context.Context, req *Request) (*Result, error) diff --git a/quesma/quesma/quesma.go b/quesma/quesma/quesma.go index ee9a78145..ebdc76881 100644 --- a/quesma/quesma/quesma.go +++ b/quesma/quesma/quesma.go @@ -17,6 +17,7 @@ import ( "mitmproxy/quesma/quesma/mux" "mitmproxy/quesma/quesma/recovery" "mitmproxy/quesma/quesma/routes" + "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/quesma/ui" "mitmproxy/quesma/telemetry" "mitmproxy/quesma/tracing" @@ -151,7 +152,7 @@ func (r *router) reroute(ctx context.Context, w http.ResponseWriter, req *http.R Body: string(reqBody), } - quesmaRequest.ParsedBody = mux.ParseRequestBody(quesmaRequest.Body) + quesmaRequest.ParsedBody = types.ParseRequestBody(quesmaRequest.Body) handler, found := router.Matches(quesmaRequest) diff --git a/quesma/quesma/router.go b/quesma/quesma/router.go index 2b4478dc2..bf75d5c0b 100644 --- a/quesma/quesma/router.go +++ b/quesma/quesma/router.go @@ -13,6 +13,7 @@ import ( "mitmproxy/quesma/quesma/mux" "mitmproxy/quesma/quesma/routes" "mitmproxy/quesma/quesma/termsenum" + "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/quesma/ui" "mitmproxy/quesma/telemetry" "mitmproxy/quesma/tracing" @@ -40,11 +41,11 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, router.Register(routes.BulkPath, and(method("POST"), matchedAgainstBulkBody(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { - var ndjson mux.NDJSON + var ndjson types.NDJSON switch b := req.ParsedBody.(type) { - case mux.NDJSON: + case types.NDJSON: ndjson = b default: @@ -61,9 +62,9 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, router.Register(routes.IndexDocPath, and(method("POST"), matchedExact(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { - var body mux.JSON + var body types.JSON switch b := req.ParsedBody.(type) { - case mux.JSON: + case types.JSON: body = b default: return nil, fmt.Errorf("invalid request body, expecting JSON . Got: %T", req.ParsedBody) @@ -76,9 +77,9 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, router.Register(routes.IndexBulkPath, and(method("POST", "PUT"), matchedExact(cfg)), func(ctx context.Context, req *mux.Request) (*mux.Result, error) { index := req.Params["index"] - var body mux.NDJSON + var body types.NDJSON switch b := req.ParsedBody.(type) { - case mux.NDJSON: + case types.NDJSON: body = b default: return nil, fmt.Errorf("invalid request body, expecting NDJSON. Got: %T", req.ParsedBody) @@ -271,9 +272,9 @@ func configureRouter(cfg config.QuesmaConfiguration, lm *clickhouse.LogManager, return nil, errors.New("multi index terms enum is not yet supported") } else { - var body mux.JSON + var body types.JSON switch b := req.ParsedBody.(type) { - case mux.JSON: + case types.JSON: body = b default: return nil, errors.New("invalid request body, expecting JSON") diff --git a/quesma/quesma/termsenum/terms_enum.go b/quesma/quesma/termsenum/terms_enum.go index cd81fd155..d1c70677d 100644 --- a/quesma/quesma/termsenum/terms_enum.go +++ b/quesma/quesma/termsenum/terms_enum.go @@ -8,13 +8,13 @@ import ( "mitmproxy/quesma/logger" "mitmproxy/quesma/model" "mitmproxy/quesma/queryparser" - "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/quesma/ui" "mitmproxy/quesma/tracing" "time" ) -func HandleTermsEnum(ctx context.Context, index string, body mux.JSON, lm *clickhouse.LogManager, +func HandleTermsEnum(ctx context.Context, index string, body types.JSON, lm *clickhouse.LogManager, qmc *ui.QuesmaManagementConsole) ([]byte, error) { if resolvedTableName := lm.ResolveTableName(ctx, index); resolvedTableName == "" { errorMsg := fmt.Sprintf("terms enum failed - could not resolve table name for index: %s", index) @@ -25,7 +25,7 @@ func HandleTermsEnum(ctx context.Context, index string, body mux.JSON, lm *click } } -func handleTermsEnumRequest(ctx context.Context, body mux.JSON, qt *queryparser.ClickhouseQueryTranslator, qmc *ui.QuesmaManagementConsole) (result []byte, err error) { +func handleTermsEnumRequest(ctx context.Context, body types.JSON, qt *queryparser.ClickhouseQueryTranslator, qmc *ui.QuesmaManagementConsole) (result []byte, err error) { request := NewRequest() startTime := time.Now() diff --git a/quesma/quesma/termsenum/terms_enum_test.go b/quesma/quesma/termsenum/terms_enum_test.go index c5e967d10..e992c270e 100644 --- a/quesma/quesma/termsenum/terms_enum_test.go +++ b/quesma/quesma/termsenum/terms_enum_test.go @@ -11,7 +11,7 @@ import ( "mitmproxy/quesma/model" "mitmproxy/quesma/queryparser" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/quesma/ui" "mitmproxy/quesma/telemetry" "mitmproxy/quesma/tracing" @@ -89,7 +89,7 @@ func TestHandleTermsEnumRequest(t *testing.T) { mock.ExpectQuery(fmt.Sprintf("%s|%s", regexp.QuoteMeta(expectedQuery1), regexp.QuoteMeta(expectedQuery2))). WillReturnRows(sqlmock.NewRows([]string{"client_name"}).AddRow("client_a").AddRow("client_b")) - resp, err := handleTermsEnumRequest(ctx, mux.MustJSON(string(rawRequestBody)), qt, managementConsole) + resp, err := handleTermsEnumRequest(ctx, types.MustJSON(string(rawRequestBody)), qt, managementConsole) assert.NoError(t, err) var responseModel model.TermsEnumResponse diff --git a/quesma/quesma/types/json.go b/quesma/quesma/types/json.go new file mode 100644 index 000000000..6d4573b5c --- /dev/null +++ b/quesma/quesma/types/json.go @@ -0,0 +1,57 @@ +package types + +import ( + "encoding/json" + "fmt" +) + +type JSON map[string]interface{} + +func ParseJSON(body string) (JSON, error) { + + var res JSON + err := json.Unmarshal([]byte(body), &res) + + return res, err +} + +// Parses JSON and panics if it fails. This is useful for tests only. +func MustJSON(s string) JSON { + + res, err := ParseJSON(s) + if err != nil { + panic(fmt.Sprintf("Failed to parse JSON: %v", err)) + } + + return res +} + +func (j JSON) Bytes() ([]byte, error) { + return json.Marshal(j) +} + +func (j JSON) Remarshal(v interface{}) error { + b, err := json.Marshal(j) + if err != nil { + return err + } + return json.Unmarshal(b, v) +} + +func (j JSON) ShortString() string { + + var asString string + asBytes, err := json.Marshal(j) + + if err != nil { + asString = fmt.Sprintf("Error marshalling JSON: %v, json: %v", err, j) + } else { + asString = string(asBytes) + } + + if len(asString) < 70 { + return asString + } + return asString[:70] + +} diff --git a/quesma/quesma/mux/body_test.go b/quesma/quesma/types/json_test.go similarity index 58% rename from quesma/quesma/mux/body_test.go rename to quesma/quesma/types/json_test.go index 79736e11d..eecbd1762 100644 --- a/quesma/quesma/mux/body_test.go +++ b/quesma/quesma/types/json_test.go @@ -1,4 +1,4 @@ -package mux +package types import ( "encoding/json" @@ -36,23 +36,3 @@ func TestReMarshalJSON(t *testing.T) { assert.Equal(t, "value2", destData.Key2) } - -func TestParseNDJSON(t *testing.T) { - - ndjson := `{"create":{"_index":"device_logs"}} -{"client_id": "123"} -{"create":{"_index":"device_logs"}} -{"client_id": "234"}` - - // when - responseBody := ParseRequestBody(ndjson) - - switch b := responseBody.(type) { - case NDJSON: - ndjsonData := b - assert.Equal(t, 4, len(ndjsonData)) - default: - t.Fatal("Invalid response body. Should be NDJSON") - } - -} diff --git a/quesma/quesma/types/ndjson.go b/quesma/quesma/types/ndjson.go new file mode 100644 index 000000000..7cda85fa2 --- /dev/null +++ b/quesma/quesma/types/ndjson.go @@ -0,0 +1,82 @@ +package types + +import ( + "encoding/json" + "fmt" + "strings" +) + +type NDJSON []JSON + +func ParseNDJSON(body string) (NDJSON, error) { + var ndjson NDJSON + + var err error + var errors []error + for x, line := range strings.Split(body, "\n") { + + if line == "" { + continue + } + + parsedLine := make(JSON) + + err = json.Unmarshal([]byte(line), &parsedLine) + if err != nil { + errors = append(errors, fmt.Errorf("error while parsing line %d: %s: %s", x, line, err)) + break + } + + ndjson = append(ndjson, parsedLine) + } + + if len(errors) > 0 { + err = fmt.Errorf("errors while parsing NDJSON: %v", errors) + } + + return ndjson, err +} + +type DocumentTarget struct { + Index *string `json:"_index"` + Id *string `json:"_id"` // document's target id in Elasticsearch, we ignore it when writing to Clickhouse. +} + +type BulkOperation map[string]DocumentTarget + +func (op BulkOperation) GetIndex() string { + for _, target := range op { // this map contains only 1 element though + if target.Index != nil { + return *target.Index + } + } + + return "" +} + +func (op BulkOperation) GetOperation() string { + for operation := range op { + return operation + } + return "" +} + +func (n NDJSON) BulkForEach(f func(operation BulkOperation, doc JSON)) error { + + for i := 0; i+1 < len(n); i += 2 { + operation := n[i] // {"create":{"_index":"kibana_sample_data_flights", "_id": 1}} + document := n[i+1] // {"FlightNum":"9HY9SWR","DestCountry":"AU","OriginWeather":"Sunny","OriginCityName":"Frankfurt am Main" } + + var operationParsed BulkOperation // operationName (create, index, update, delete) -> DocumentTarget + + err := operation.Remarshal(&operationParsed) + if err != nil { + return err + } + + f(operationParsed, document) + } + + return nil + +} diff --git a/quesma/quesma/types/ndjson_test.go b/quesma/quesma/types/ndjson_test.go new file mode 100644 index 000000000..3711d8171 --- /dev/null +++ b/quesma/quesma/types/ndjson_test.go @@ -0,0 +1,26 @@ +package types + +import ( + "github.com/stretchr/testify/assert" + "testing" +) + +func TestParseNDJSON(t *testing.T) { + + ndjson := `{"create":{"_index":"device_logs"}} +{"client_id": "123"} +{"create":{"_index":"device_logs"}} +{"client_id": "234"}` + + // when + responseBody := ParseRequestBody(ndjson) + + switch b := responseBody.(type) { + case NDJSON: + ndjsonData := b + assert.Equal(t, 4, len(ndjsonData)) + default: + t.Fatal("Invalid response body. Should be NDJSON") + } + +} diff --git a/quesma/quesma/types/request_body.go b/quesma/quesma/types/request_body.go new file mode 100644 index 000000000..07ad7e49a --- /dev/null +++ b/quesma/quesma/types/request_body.go @@ -0,0 +1,58 @@ +package types + +import ( + "fmt" +) + +// There we can add methods to iterate over NDJSON + +type Unknown struct { + Body string + JSONParseError error + NDJSONParseError error +} + +func (u *Unknown) String() string { + + return fmt.Sprintf("Unknown{Body: %s, JSONParseError: %v, NDJSONParseError: %v}", u.Body, u.JSONParseError, u.NDJSONParseError) + +} + +type RequestBody interface { + isParsedRequestBody() // this is a marker method +} + +func (j JSON) isParsedRequestBody() {} +func (n NDJSON) isParsedRequestBody() {} +func (u *Unknown) isParsedRequestBody() {} + +func ParseRequestBody(body string) RequestBody { + + unknow := &Unknown{} + unknow.Body = body + + // json + if len(body) > 1 && body[0] == '{' { + parsedBody, err := ParseJSON(body) + if err != nil { + unknow.JSONParseError = err + } else { + return parsedBody + } + } + + // ndjson + if len(body) > 1 && body[0] == '{' { + + parsedBody, err := ParseNDJSON(body) + if err != nil { + unknow.NDJSONParseError = err + } else { + return parsedBody + } + + // if nothing else, it's unknown + } + + return unknow +} diff --git a/quesma/quesma/types/request_body_test.go b/quesma/quesma/types/request_body_test.go new file mode 100644 index 000000000..12a4e58ec --- /dev/null +++ b/quesma/quesma/types/request_body_test.go @@ -0,0 +1,43 @@ +package types + +import "testing" + +func TestParseRequestBody(t *testing.T) { + + ndjson := `{"create":{"_index":"device_logs"}} +{"client_id": "123"} +{"create":{"_index":"device_logs"}} +{"client_id": "234"}` + + // when + responseBody := ParseRequestBody(ndjson) + + switch b := responseBody.(type) { + case NDJSON: + ndjsonData := b + if len(ndjsonData) != 4 { + t.Fatalf("Expected 4, got %v", len(ndjsonData)) + } + default: + t.Fatal("Invalid response body. Should be NDJSON") + } +} + +func TestParseRequestBody2(t *testing.T) { + json := `{"client": "123"}` + + // when + responseBody := ParseRequestBody(json) + + switch b := responseBody.(type) { + case JSON: + jsonData := b + if jsonData["client"] != "123" { + + t.Fatalf("Expected 123, got %v", jsonData["client"]) + } + default: + t.Fatal("Invalid response body. Should be JSON") + } + +} diff --git a/quesma/quesma/ui/html_pages_test.go b/quesma/quesma/ui/html_pages_test.go index 2f31e85a0..54d8877c0 100644 --- a/quesma/quesma/ui/html_pages_test.go +++ b/quesma/quesma/ui/html_pages_test.go @@ -7,7 +7,7 @@ import ( "mitmproxy/quesma/clickhouse" "mitmproxy/quesma/concurrent" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/stats" "mitmproxy/quesma/telemetry" "mitmproxy/quesma/tracing" @@ -56,7 +56,7 @@ func TestHtmlPages(t *testing.T) { t.Run("statistics got no XSS", func(t *testing.T) { cfg := config.QuesmaConfiguration{} - stats.GlobalStatistics.Process(cfg, xss, mux.MustJSON("{}"), clickhouse.NestedSeparator) + stats.GlobalStatistics.Process(cfg, xss, types.MustJSON("{}"), clickhouse.NestedSeparator) response := string(qmc.generateStatistics()) assert.NotContains(t, response, xss) }) diff --git a/quesma/quesma/write.go b/quesma/quesma/write.go index 5b703c65a..b3ca790a1 100644 --- a/quesma/quesma/write.go +++ b/quesma/quesma/write.go @@ -6,8 +6,8 @@ import ( "mitmproxy/quesma/clickhouse" "mitmproxy/quesma/logger" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" "mitmproxy/quesma/quesma/recovery" + "mitmproxy/quesma/quesma/types" "mitmproxy/quesma/stats" "mitmproxy/quesma/stats/errorstats" "mitmproxy/quesma/telemetry" @@ -21,7 +21,7 @@ type ( } ) -func dualWriteBulk(ctx context.Context, defaultIndex *string, bulk mux.NDJSON, lm *clickhouse.LogManager, +func dualWriteBulk(ctx context.Context, defaultIndex *string, bulk types.NDJSON, lm *clickhouse.LogManager, cfg config.QuesmaConfiguration, phoneHomeAgent telemetry.PhoneHomeAgent) (results []WriteResult) { if config.TrafficAnalysis.Load() { logger.Info().Msg("analysing traffic, not writing to Clickhouse") @@ -29,9 +29,9 @@ func dualWriteBulk(ctx context.Context, defaultIndex *string, bulk mux.NDJSON, l } defer recovery.LogPanic() - indicesWithDocumentsToInsert := make(map[string][]mux.JSON, len(bulk)) + indicesWithDocumentsToInsert := make(map[string][]types.JSON, len(bulk)) - err := bulk.BulkForEach(func(op mux.BulkOperation, document mux.JSON) { + err := bulk.BulkForEach(func(op types.BulkOperation, document types.JSON) { index := op.GetIndex() operation := op.GetOperation() @@ -87,7 +87,7 @@ func dualWriteBulk(ctx context.Context, defaultIndex *string, bulk mux.NDJSON, l for indexName, documents := range indicesWithDocumentsToInsert { phoneHomeAgent.IngestCounters().Add(indexName, int64(len(documents))) - withConfiguration(ctx, cfg, indexName, make(mux.JSON), func() error { + withConfiguration(ctx, cfg, indexName, make(types.JSON), func() error { for _, document := range documents { stats.GlobalStatistics.Process(cfg, indexName, document, clickhouse.NestedSeparator) } @@ -97,7 +97,7 @@ func dualWriteBulk(ctx context.Context, defaultIndex *string, bulk mux.NDJSON, l return results } -func dualWrite(ctx context.Context, tableName string, body mux.JSON, lm *clickhouse.LogManager, cfg config.QuesmaConfiguration) { +func dualWrite(ctx context.Context, tableName string, body types.JSON, lm *clickhouse.LogManager, cfg config.QuesmaConfiguration) { stats.GlobalStatistics.Process(cfg, tableName, body, clickhouse.NestedSeparator) if config.TrafficAnalysis.Load() { logger.Info().Msgf("analysing traffic, not writing to Clickhouse %s", tableName) @@ -110,13 +110,13 @@ func dualWrite(ctx context.Context, tableName string, body mux.JSON, lm *clickho } withConfiguration(ctx, cfg, tableName, body, func() error { - return lm.ProcessInsertQuery(ctx, tableName, mux.NDJSON{body}) + return lm.ProcessInsertQuery(ctx, tableName, types.NDJSON{body}) }) } var insertCounter = atomic.Int32{} -func withConfiguration(ctx context.Context, cfg config.QuesmaConfiguration, indexName string, body mux.JSON, action func() error) { +func withConfiguration(ctx context.Context, cfg config.QuesmaConfiguration, indexName string, body types.JSON, action func() error) { if len(cfg.IndexConfig) == 0 { logger.InfoWithCtx(ctx).Msgf("%s --> clickhouse, body(shortened): %s", indexName, body.ShortString()) err := action() diff --git a/quesma/stats/index_statistics.go b/quesma/stats/index_statistics.go index 284973fa2..286bf9812 100644 --- a/quesma/stats/index_statistics.go +++ b/quesma/stats/index_statistics.go @@ -4,7 +4,7 @@ import ( "fmt" "mitmproxy/quesma/jsonprocessor" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "sort" "strconv" "strings" @@ -67,7 +67,7 @@ func New() *Statistics { return &statistics } -func (s *Statistics) Process(cfg config.QuesmaConfiguration, index string, jsonData mux.JSON, nestedSeparator string) { +func (s *Statistics) Process(cfg config.QuesmaConfiguration, index string, jsonData types.JSON, nestedSeparator string) { // TODO reading cfg.IngestStatistics is not thread safe if !cfg.IngestStatistics { return diff --git a/quesma/stats/index_statistics_test.go b/quesma/stats/index_statistics_test.go index 649279270..aeeb46636 100644 --- a/quesma/stats/index_statistics_test.go +++ b/quesma/stats/index_statistics_test.go @@ -4,7 +4,7 @@ import ( "encoding/json" "github.com/stretchr/testify/assert" "mitmproxy/quesma/quesma/config" - "mitmproxy/quesma/quesma/mux" + "mitmproxy/quesma/quesma/types" "testing" ) @@ -43,10 +43,10 @@ func TestStatistics_process(t *testing.T) { cfg := config.QuesmaConfiguration{ IngestStatistics: true, } - stats.Process(cfg, "index1", mux.MustJSON(string(marshal1)), "::") - stats.Process(cfg, "index1", mux.MustJSON(string(marshal2)), "::") - stats.Process(cfg, "index1", mux.MustJSON(string(marshal3)), "::") - stats.Process(cfg, "index1", mux.MustJSON(string(marshal3)), "::") + stats.Process(cfg, "index1", types.MustJSON(string(marshal1)), "::") + stats.Process(cfg, "index1", types.MustJSON(string(marshal2)), "::") + stats.Process(cfg, "index1", types.MustJSON(string(marshal3)), "::") + stats.Process(cfg, "index1", types.MustJSON(string(marshal3)), "::") indexStats := (*stats)["index1"]