Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(pipeline): support reading pretty printed json #345

Merged
merged 8 commits into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions pkg/iterator/pipe.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"errors"

"github.com/reubenmiller/go-c8y-cli/v2/pkg/jsonUtilities"
"github.com/reubenmiller/go-c8y-cli/v2/pkg/stream"
"github.com/tidwall/gjson"
)

Expand Down Expand Up @@ -69,6 +70,7 @@ type PipeIterator struct {
filter Filter
opts *PipeOptions
reader *bufio.Reader
stream *stream.InputStreamer
}

// IsBound return true if the iterator is bound
Expand All @@ -80,11 +82,14 @@ func (i *PipeIterator) IsBound() bool {
func (i *PipeIterator) GetNext() (line []byte, input interface{}, err error) {
i.mu.Lock()
defer i.mu.Unlock()
line, err = i.reader.ReadBytes('\n')
line = bytes.TrimSpace(line)

if err != nil {
return line, line, err
line, err = i.stream.Read()
if len(line) > 0 && errors.Is(err, io.EOF) {
// Don't return io.EOF if the line includes a value
// TODO: Ideally this should be changed so the reader of the
// iterator handles io.EOF correctly (e.g. if there is still a value process it, then
// react to the io.EOF)
err = nil
}

if i.filter != nil {
Expand Down Expand Up @@ -175,6 +180,9 @@ func NewPipeIterator(in io.Reader, filter ...Filter) (Iterator, error) {

return &PipeIterator{
reader: reader,
stream: &stream.InputStreamer{
Buffer: reader,
},
filter: pipelineFilter,
}, nil
}
Expand Down Expand Up @@ -210,6 +218,9 @@ func NewJSONPipeIterator(in io.Reader, pipeOpts *PipeOptions, filter ...Filter)

return &PipeIterator{
reader: reader,
stream: &stream.InputStreamer{
Buffer: reader,
},
filter: pipelineFilter,
opts: pipeOpts,
}, nil
Expand Down
157 changes: 157 additions & 0 deletions pkg/stream/stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,157 @@
package stream

import (
"bufio"
"bytes"
"encoding/json"
"io"
"unicode"
)

// InputStreamer an input streamer which breaks down a buffer into input values
type InputStreamer struct {
IgnoreEmptyLines bool
Buffer *bufio.Reader
}

func (r *InputStreamer) isJSONObject() (bool, error) {
c, _, err := r.Buffer.ReadRune()
if err == io.EOF {
return false, err
}
r.Buffer.UnreadRune()
return c == '{', err
}

func (r *InputStreamer) consumeWhitespace() error {
for {
c, _, err := r.Buffer.ReadRune()
if err == io.EOF {
return err
}
if !unicode.IsSpace(c) {
r.Buffer.UnreadRune()
break
}
}
return nil
}
func (r *InputStreamer) consumeWhitespaceOnLine() error {
for {
c, _, err := r.Buffer.ReadRune()
if err == io.EOF {
return err
}
if !(c == ' ' || c == '\t') {
r.Buffer.UnreadRune()
break
}
}
return nil
}

// ReadJSONObject read the next JSON object
func (r *InputStreamer) ReadJSONObject() ([]byte, error) {
out := bytes.Buffer{}
brackets := 0
var err error

if err := r.consumeWhitespace(); err != nil {
return nil, err
}

// Simple json object parser
var prev rune
quote := 0
for {
c, _, rErr := r.Buffer.ReadRune()
if rErr == io.EOF {
err = io.EOF
break
}
switch c {
case '"':
if prev != '\\' {
quote = (quote + 1) % 2
}
case '{':
if quote == 0 {
brackets++
}
case '}':
if quote == 0 {
brackets--
}
}
out.WriteRune(c)
if brackets == 0 {
break
}
prev = c
}

// Consume a trailing newline if present
r.consumeIf('\n')
return out.Bytes(), err
}

// ReadLine reads the next chunk of text until the next newline char
// if not put it back on the buffer
func (r *InputStreamer) consumeIf(c rune) error {
v, _, err := r.Buffer.ReadRune()
if err == nil {
if v != c {
return r.Buffer.UnreadRune()
}
}
return nil
}

// ReadLine reads the next chunk of text until the next newline char
func (r *InputStreamer) ReadLine() ([]byte, error) {
return r.Buffer.ReadBytes('\n')
}

// Read reads the next delimited value (either text or JSON object)
func (r *InputStreamer) Read() (output []byte, err error) {
if r.IgnoreEmptyLines {
if err := r.consumeWhitespace(); err != nil {
return output, err
}

} else {
if err := r.consumeWhitespaceOnLine(); err != nil {
return output, err
}
}

var isJSON bool
isJSON, err = r.isJSONObject()
if err != nil {
return output, err
}

if isJSON {
output, err = r.ReadJSONObject()
} else {
output, err = r.ReadLine()
if err != nil {
return output, err
}
output = r.formatLine(output)
}
return output, err
}

func (r *InputStreamer) formatLine(b []byte) []byte {
// Prase json strings (e.g. quoted strings)
// as it improves compatibility with jq output when not using the -r option,
// e.g. `echo '{"key":"1234"}' | jq '.key' | c8y util show`
var strValue string
if err := json.Unmarshal(b, &strValue); err == nil {
return []byte(strValue)
}

b = bytes.TrimSpace(b)
return b
}
150 changes: 150 additions & 0 deletions pkg/stream/stream_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
package stream

import (
"bufio"
"io"
"strings"
"testing"

"github.com/stretchr/testify/assert"
)

func newTestStreamer(v string, ignoreEmptyLines bool) *InputStreamer {
return &InputStreamer{
Buffer: bufio.NewReader(strings.NewReader(v)),
IgnoreEmptyLines: ignoreEmptyLines,
}
}

func Test_ReadSimpleWithoutEmptyLines(t *testing.T) {
input := strings.TrimSpace(`
1

2
3
`)
s := newTestStreamer(input, true)

var obj []byte
var err error

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "1", string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "2", string(obj))

obj, err = s.Read()
assert.ErrorIs(t, err, io.EOF)
assert.Equal(t, "3", string(obj))
}

func Test_Read_Simple(t *testing.T) {
input := strings.TrimSpace(`
1

"2"
1.23
3
`)
s := newTestStreamer(input, false)

var obj []byte
var err error

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "1", string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "", string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "2", string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "1.23", string(obj))

obj, err = s.Read()
assert.ErrorIs(t, err, io.EOF)
assert.Equal(t, "3", string(obj))
}

func Test_Read_Complex_JSON(t *testing.T) {
input := strings.TrimSpace(`
{"name":"1 {literal \" value}"}
{"name":{"1":{"2":{"3":{"4":{"5":"value"}}}}}}{"name":"3"}
`)
s := newTestStreamer(input, false)

var obj []byte
var err error

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, `{"name":"1 {literal \" value}"}`, string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, `{"name":{"1":{"2":{"3":{"4":{"5":"value"}}}}}}`, string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, `{"name":"3"}`, string(obj))
}

func Test_ReadPartialJSON(t *testing.T) {
input := strings.TrimSpace(`
{"name":"1"
`)
s := newTestStreamer(input, false)

var obj []byte
var err error

obj, err = s.Read()
assert.ErrorIs(t, err, io.EOF)
assert.Equal(t, `{"name":"1"`, string(obj))
}

func Test_Read_Mixed(t *testing.T) {
input := strings.TrimLeft(`
{"name":"1"}{"name":"2"}

3
4
`, "\n\t ")
s := newTestStreamer(input, false)

var obj []byte
var err error

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, `{"name":"1"}`, string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, `{"name":"2"}`, string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "", string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "3", string(obj))

obj, err = s.Read()
assert.Nil(t, err)
assert.Equal(t, "4", string(obj))

obj, err = s.Read()
assert.ErrorIs(t, err, io.EOF)
assert.Equal(t, "", string(obj))
}
7 changes: 7 additions & 0 deletions tests/manual/pipeline/mixed_pipeline.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{"name": "device01"}{"name": "device02"}
{"name": "device03"}
device04
"device05"
{
"name": "device06"
}
26 changes: 26 additions & 0 deletions tests/manual/pipeline/piping_pretty_json.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
# yaml-language-server: $schema=https://raw.githubusercontent.com/reubenmiller/commander/feat/handle-nested-files/schema.json
#
# Piping pretty printed json
#
config:
env:
C8Y_SETTINGS_DEFAULTS_CACHE: true
C8Y_SETTINGS_CACHE_METHODS: GET PUT POST
C8Y_SETTINGS_DEFAULTS_CACHETTL: 100h
C8Y_SETTINGS_DEFAULTS_DRYFORMAT: json

tests:
It supports reading mixed piped input:
command: |
cat manual/pipeline/mixed_pipeline.txt |
c8y devices create --dry |
c8y util show --select body -o json -c
exit-code: 0
stdout:
exactly: |
{"body":{"c8y_IsDevice":{},"name":"device01"}}
{"body":{"c8y_IsDevice":{},"name":"device02"}}
{"body":{"c8y_IsDevice":{},"name":"device03"}}
{"body":{"c8y_IsDevice":{},"name":"device04"}}
{"body":{"c8y_IsDevice":{},"name":"device05"}}
{"body":{"c8y_IsDevice":{},"name":"device06"}}
Loading