Skip to content

Commit

Permalink
Merge pull request #38 from VictoriaMetrics/issue-24
Browse files Browse the repository at this point in the history
fix issue when response contains ANSI escape sequences
#24
  • Loading branch information
hagen1778 authored Jul 5, 2024
2 parents 4ed20bd + 5050f40 commit c7ba5e9
Show file tree
Hide file tree
Showing 5 changed files with 129 additions and 33 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## tip

* BUGFIX: fix bug with parsing response when one of the field contains ANSI escape sequences. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/24).

## v0.2.3

* BUGFIX: fix bug with displaying response when one of the stream field is defined and lines are not collected. See [this issue](https://github.com/VictoriaMetrics/victorialogs-datasource/issues/34).
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ require (
github.com/unknwon/com v1.0.1 // indirect
github.com/unknwon/log v0.0.0-20150304194804-e617c87089d3 // indirect
github.com/urfave/cli v1.22.14 // indirect
github.com/valyala/fastjson v1.6.4 // indirect
github.com/valyala/fastrand v1.1.0 // indirect
github.com/valyala/histogram v1.2.0 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,6 +226,8 @@ github.com/unknwon/log v0.0.0-20150304194804-e617c87089d3/go.mod h1:1xEUf2abjfP9
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli v1.22.14 h1:ebbhrRiGK2i4naQJr+1Xj92HXZCrK7MsyTS/ob3HnAk=
github.com/urfave/cli v1.22.14/go.mod h1:X0eDS6pD6Exaclxm99NJ3FiCDRED7vIHpx2mDOHLvkA=
github.com/valyala/fastjson v1.6.4 h1:uAUNq9Z6ymTgGhcm0UynUAB6tlbakBrz6CQFax3BXVQ=
github.com/valyala/fastjson v1.6.4/go.mod h1:CLCAqky6SMuOcxStkYQvblddUtoRxhYMGLrsQns1aXY=
github.com/valyala/fastrand v1.1.0 h1:f+5HkLW4rsgzdNoleUOB69hyT9IlD2ZQh9GyDMfb5G8=
github.com/valyala/fastrand v1.1.0/go.mod h1:HWqCzkrkg6QXT8V2EXWvXCoow7vLwOFN002oeRzjapQ=
github.com/valyala/histogram v1.2.0 h1:wyYGAZZt3CpwUiIb9AU/Zbllg1llXyrtApRS815OLoQ=
Expand Down
77 changes: 45 additions & 32 deletions pkg/plugin/response.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package plugin

import (
"bufio"
"bytes"
"encoding/json"
"fmt"
"io"
Expand All @@ -9,6 +11,7 @@ import (
"github.com/VictoriaMetrics/metricsql"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/valyala/fastjson"

"github.com/VictoriaMetrics/victorialogs-datasource/pkg/utils"
)
Expand All @@ -25,10 +28,6 @@ const (
gLineField = "Line"
)

// Response contains fields from query response
// It represents victoria logs response
type Response map[string]string

// parseStreamResponse reads data from the reader and collects
// fields and frame with necessary information
func parseStreamResponse(reader io.Reader) backend.DataResponse {
Expand All @@ -44,42 +43,56 @@ func parseStreamResponse(reader io.Reader) backend.DataResponse {

labels := data.Labels{}

dec := json.NewDecoder(reader)
scanner := bufio.NewScanner(reader)

for dec.More() {
var r Response
err := dec.Decode(&r)
for scanner.Scan() {
value, err := fastjson.ParseBytes(scanner.Bytes())
if err != nil {
return newResponseError(fmt.Errorf("error decode response: %s", err), backend.StatusInternal)
}

for fieldName, value := range r {
switch fieldName {
case messageField:
lineField.Append(value)
case timeField:
getTime, err := utils.GetTime(value)
if err != nil {
return newResponseError(fmt.Errorf("error parse time from _time field: %s", err), backend.StatusInternal)
}
timeFd.Append(getTime)
case streamField:
expr, err := metricsql.Parse(value)
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
if mExpr, ok := expr.(*metricsql.MetricExpr); ok {
for _, filters := range mExpr.LabelFilterss {
for _, filter := range filters {
labels[filter.Label] = filter.Value
}
if value.Exists(messageField) {
message := value.GetStringBytes(messageField)
lineField.Append(string(message))
}
if value.Exists(timeField) {
t := value.GetStringBytes(timeField)
getTime, err := utils.GetTime(string(t))
if err != nil {
return newResponseError(fmt.Errorf("error parse time from _time field: %s", err), backend.StatusInternal)
}
timeFd.Append(getTime)
}
if value.Exists(streamField) {
stream := value.GetStringBytes(streamField)
expr, err := metricsql.Parse(string(stream))
if err != nil {
return newResponseError(err, backend.StatusInternal)
}
if mExpr, ok := expr.(*metricsql.MetricExpr); ok {
for _, filters := range mExpr.LabelFilterss {
for _, filter := range filters {
labels[filter.Label] = filter.Value
}
}
default:
labels[fieldName] = value
}
}

obj, err := value.Object()
if err != nil {
return newResponseError(fmt.Errorf("error get object from decoded response: %s", err), backend.StatusInternal)
}
obj.Visit(func(key []byte, v *fastjson.Value) {
if bytes.Equal(key, []byte(timeField)) ||
bytes.Equal(key, []byte(streamField)) ||
bytes.Equal(key, []byte(messageField)) {
return
}
fieldName := string(key)
value := string(v.GetStringBytes())
labels[fieldName] = value
})

d, err := labelsToJSON(labels)
if err != nil {
return newResponseError(err, backend.StatusInternal)
Expand Down Expand Up @@ -120,10 +133,10 @@ func parseStreamResponse(reader io.Reader) backend.DataResponse {
// labelsToJSON converts labels to json representation
// data.Labels when converted to JSON keep the fields sorted
func labelsToJSON(labels data.Labels) (json.RawMessage, error) {
bytes, err := json.Marshal(labels)
b, err := json.Marshal(labels)
if err != nil {
return nil, err
}

return bytes, nil
return b, nil
}
80 changes: 79 additions & 1 deletion pkg/plugin/response_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/valyala/fastjson"
)

func Test_parseStreamResponse(t *testing.T) {
Expand Down Expand Up @@ -44,7 +45,7 @@ func Test_parseStreamResponse(t *testing.T) {
name: "incorrect response",
response: "abcd",
want: func() backend.DataResponse {
return newResponseError(fmt.Errorf("error decode response: invalid character 'a' looking for beginning of value"), backend.StatusInternal)
return newResponseError(fmt.Errorf("error decode response: cannot parse JSON: cannot parse number: unexpected char: \"a\"; unparsed tail: \"abcd\""), backend.StatusInternal)
},
},
{
Expand Down Expand Up @@ -241,6 +242,83 @@ func Test_parseStreamResponse(t *testing.T) {
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)

return rsp
},
},
{
name: "response has ANSI chars",
response: `{"_time":"2024-06-26T13:15:15.000Z","_stream_id":"00000000000000009eaf29866f70976a098adc735393deb1","_stream":"{compose_project=\"app\",compose_service=\"gateway\"}","_msg":"\x1b[2m2024-06-26T13:15:15.004Z\x1b[0;39m \x1b[32mTRACE\x1b[0;39m \x1b[35m1\x1b[0;39m \x1b[2m---\x1b[0;39m \x1b[2m[ parallel-19]\x1b[0;39m \x1b[36mo.s.c.g.f.WeightCalculatorWebFilter \x1b[0;39m \x1b[2m:\x1b[0;39m Weights attr: {} ","compose_project":"app","compose_service":"gateway"}`,
want: func() backend.DataResponse {
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
labelsField.Name = gLabelsField

timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFd.Name = gTimeField

lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
lineField.Name = gLineField

timeFd.Append(time.Date(2024, 06, 26, 13, 15, 15, 0, time.UTC))

lineField.Append(`\x1b[2m2024-06-26T13:15:15.004Z\x1b[0;39m \x1b[32mTRACE\x1b[0;39m \x1b[35m1\x1b[0;39m \x1b[2m---\x1b[0;39m \x1b[2m[ parallel-19]\x1b[0;39m \x1b[36mo.s.c.g.f.WeightCalculatorWebFilter \x1b[0;39m \x1b[2m:\x1b[0;39m Weights attr: {} `)

labels := data.Labels{
"compose_project": "app",
"compose_service": "gateway",
"_stream_id": "00000000000000009eaf29866f70976a098adc735393deb1",
}

b, _ := labelsToJSON(labels)
labelsField.Append(b)

frame := data.NewFrame("", timeFd, lineField, labelsField)

rsp := backend.DataResponse{}
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)

return rsp
},
},
{
name: "response has unicode",
response: `{"_time":"2024-06-26T13:20:34.000Z","_stream":"{compose_project=\"app\",compose_service=\"gateway\"}","_msg":"\u001b[2m2024-06-26T13:20:34.608Z\u001b[0;39m \u001b[33m WARN\u001b[0;39m \u001b[35m1\u001b[0;39m \u001b[2m---\u001b[0;39m \u001b[2m[ main]\u001b[0;39m \u001b[36mjakarta.persistence.spi \u001b[0;39m \u001b[2m:\u001b[0;39m jakarta.persistence.spi::No valid providers found. ","compose_project":"app","compose_service":"gateway"}`,
want: func() backend.DataResponse {
labelsField := data.NewFieldFromFieldType(data.FieldTypeJSON, 0)
labelsField.Name = gLabelsField

timeFd := data.NewFieldFromFieldType(data.FieldTypeTime, 0)
timeFd.Name = gTimeField

lineField := data.NewFieldFromFieldType(data.FieldTypeString, 0)
lineField.Name = gLineField

timeFd.Append(time.Date(2024, 06, 26, 13, 20, 34, 0, time.UTC))

value, err := fastjson.Parse(`{"_msg":"\u001b[2m2024-06-26T13:20:34.608Z\u001b[0;39m \u001b[33m WARN\u001b[0;39m \u001b[35m1\u001b[0;39m \u001b[2m---\u001b[0;39m \u001b[2m[ main]\u001b[0;39m \u001b[36mjakarta.persistence.spi \u001b[0;39m \u001b[2m:\u001b[0;39m jakarta.persistence.spi::No valid providers found. "}`)
if err != nil {
t.Fatalf("error decode response: %s", err)
}

if value.Exists(messageField) {
message := value.GetStringBytes(messageField)
lineField.Append(string(message))
}

labels := data.Labels{
"compose_project": "app",
"compose_service": "gateway",
}

b, _ := labelsToJSON(labels)
labelsField.Append(b)

frame := data.NewFrame("", timeFd, lineField, labelsField)

rsp := backend.DataResponse{}
frame.Meta = &data.FrameMeta{}
rsp.Frames = append(rsp.Frames, frame)

return rsp
},
},
Expand Down

0 comments on commit c7ba5e9

Please sign in to comment.