Skip to content

Commit

Permalink
array support in CSV parser
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhichang committed Jan 6, 2021
1 parent b7e3d47 commit 090a1b0
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 41 deletions.
55 changes: 49 additions & 6 deletions parser/csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"encoding/csv"
"strconv"
"strings"
"time"

"github.com/housepower/clickhouse_sinker/model"
Expand All @@ -26,7 +27,7 @@ import (

var _ Parser = (*CsvParser)(nil)

// CsvParser implementation to parse input from a CSV format
// CsvParser is a Parser implementation that parses input in CSV format per RFC 4180.
type CsvParser struct {
title []string
delimiter string
Expand All @@ -36,7 +37,7 @@ type CsvParser struct {
// Parse extracts a list of comma-separated values from the data.
func (p *CsvParser) Parse(bs []byte) (metric model.Metric, err error) {
r := csv.NewReader(bytes.NewReader(bs))
r.Comma = ','
r.FieldsPerRecord = len(p.title)
if len(p.delimiter) > 0 {
r.Comma = rune(p.delimiter[0])
}
Expand Down Expand Up @@ -87,7 +88,7 @@ func (c *CsvMetric) GetFloat(key string, nullable bool) interface{} {
return n
}
}
return 0
return float64(0)
}

// GetInt returns int
Expand All @@ -99,12 +100,54 @@ func (c *CsvMetric) GetInt(key string, nullable bool) interface{} {
return n
}
}
return 0
return int64(0)
}

// GetArray parses a CSV-encoded array stored in the column named key.
//
// The column value must be wrapped in square brackets, e.g. `[1,2,3]`; the
// text between the brackets is read as a single CSV record. t selects the
// element type and therefore the concrete type of the result:
//
//	"int"    -> []int64
//	"float"  -> []float64
//	"string" -> []string
//
// An empty, unbracketed or malformed value yields an empty (non-nil) slice of
// the requested type, or nil when t is not one of the types above. Numeric
// elements are trimmed of surrounding whitespace before conversion; an element
// that still fails to parse becomes the zero value (best effort). A
// well-formed, non-empty array combined with an unsupported t panics.
func (c *CsvMetric) GetArray(key string, t string) interface{} {
	val := c.GetString(key, false).(string)
	// At least "[]" is required; anything shorter cannot carry both brackets.
	if len(val) < 2 || val[0] != '[' || val[len(val)-1] != ']' {
		return emptyCsvArray(t)
	}
	elems, err := csv.NewReader(strings.NewReader(val[1 : len(val)-1])).Read()
	if err != nil {
		// Covers io.EOF for "[]" as well as genuinely malformed CSV.
		return emptyCsvArray(t)
	}
	switch t {
	case "int":
		results := make([]int64, 0, len(elems))
		for _, e := range elems {
			// Best effort: an unparsable element is stored as 0.
			v, _ := strconv.ParseInt(strings.TrimSpace(e), 10, 64)
			results = append(results, v)
		}
		return results
	case "float":
		results := make([]float64, 0, len(elems))
		for _, e := range elems {
			// Best effort: an unparsable element is stored as 0.
			v, _ := strconv.ParseFloat(strings.TrimSpace(e), 64)
			results = append(results, v)
		}
		return results
	case "string":
		return elems
	default:
		panic("not supported array type " + t)
	}
}

// emptyCsvArray returns the empty slice matching array element type t, or nil
// when t is not a supported element type.
func emptyCsvArray(t string) interface{} {
	switch t {
	case "int":
		return []int64{}
	case "float":
		return []float64{}
	case "string":
		return []string{}
	default:
		return nil
	}
}

func (c *CsvMetric) GetDate(key string, nullable bool) interface{} {
Expand Down
207 changes: 172 additions & 35 deletions parser/csv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,39 +21,176 @@ import (
"github.com/stretchr/testify/require"
)

func TestParseCsv(t *testing.T) {
testCases := []struct {
msg string
values []string
}{
{
`1,"DO,NOT,SPLIT",42`,
[]string{"1", "DO,NOT,SPLIT", "42"},
},

{
`2,Daniel,26`,
[]string{"2", "Daniel", "26"},
},

{
`2,Daniel,`,
[]string{"2", "Daniel", ""},
},

{
`2,,Daniel`,
[]string{"2", "", "Daniel"},
},
}

pp := NewParserPool("csv", nil, ",", []string{"2006-01-02", time.RFC3339, time.RFC3339})
csvParser := pp.Get()
defer pp.Put(csvParser)
for _, c := range testCases {
metric, _ := csvParser.Parse([]byte(c.msg))
csvMetric, ok := metric.(*CsvMetric)
require.Equal(t, ok, true)
require.Equal(t, c.values, csvMetric.values)
}
// TestCsvInt checks GetInt against the shared CSV sample: the "its" column
// must yield its parsed value, and a column absent from the schema must yield
// int64(0) whether or not nullable is set.
func TestCsvInt(t *testing.T) {
	pool := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout)
	p := pool.Get()
	defer pool.Put(p)

	m, parseErr := p.Parse(csvSample)
	require.Nil(t, parseErr)

	cases := []struct {
		key      string
		nullable bool
		want     int64
	}{
		{"its", false, 1536813227},
		{"its_not_exist", false, 0},
		{"its_not_exist", true, 0},
	}
	for _, tc := range cases {
		got := m.GetInt(tc.key, tc.nullable).(int64)
		require.Equal(t, tc.want, got)
	}
}

// TestCsvFloat checks GetFloat against the shared CSV sample: the "percent"
// column must yield 0.11, and a column absent from the schema must yield
// float64(0) whether or not nullable is set.
func TestCsvFloat(t *testing.T) {
	pool := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout)
	p := pool.Get()
	defer pool.Put(p)

	m, parseErr := p.Parse(csvSample)
	require.Nil(t, parseErr)

	cases := []struct {
		key      string
		nullable bool
		want     float64
	}{
		{"percent", false, 0.11},
		{"percent_not_exist", false, 0.0},
		{"percent_not_exist", true, 0.0},
	}
	for _, tc := range cases {
		got := m.GetFloat(tc.key, tc.nullable).(float64)
		require.Equal(t, tc.want, got)
	}
}

// TestCsvString checks GetString against the shared CSV sample: the "channel"
// column must come back with its doubled quote unescaped, and a column absent
// from the schema must yield "" whether or not nullable is set.
func TestCsvString(t *testing.T) {
	pool := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout)
	p := pool.Get()
	defer pool.Put(p)

	m, parseErr := p.Parse(csvSample)
	require.Nil(t, parseErr)

	cases := []struct {
		key      string
		nullable bool
		want     string
	}{
		{"channel", false, `escaped_"ws`},
		{"channel_not_exist", false, ""},
		{"channel_not_exist", true, ""},
	}
	for _, tc := range cases {
		got := m.GetString(tc.key, tc.nullable).(string)
		require.Equal(t, tc.want, got)
	}
}

// TestCsvDate checks GetDate against the shared CSV sample: the "time1"
// column must parse to 2019-12-16 00:00:00 UTC, and a column absent from the
// schema must yield the zero time.Time whether or not nullable is set.
func TestCsvDate(t *testing.T) {
	pool := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout)
	p := pool.Get()
	defer pool.Put(p)

	m, parseErr := p.Parse(csvSample)
	require.Nil(t, parseErr)

	cases := []struct {
		key      string
		nullable bool
		want     time.Time
	}{
		{"time1", false, time.Date(2019, 12, 16, 0, 0, 0, 0, time.UTC)},
		{"time1_not_exist", false, time.Time{}},
		{"time1_not_exist", true, time.Time{}},
	}
	for _, tc := range cases {
		got := m.GetDate(tc.key, tc.nullable).(time.Time)
		require.Equal(t, tc.want, got)
	}
}

// TestCsvDateTime checks GetDateTime against the shared CSV sample: the
// "time2" column must parse to 2019-12-16 12:10:30 UTC, and a column absent
// from the schema must yield the zero time.Time whether or not nullable is set.
func TestCsvDateTime(t *testing.T) {
	pool := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout)
	p := pool.Get()
	defer pool.Put(p)

	m, parseErr := p.Parse(csvSample)
	require.Nil(t, parseErr)

	cases := []struct {
		key      string
		nullable bool
		want     time.Time
	}{
		{"time2", false, time.Date(2019, 12, 16, 12, 10, 30, 0, time.UTC)},
		{"time2_not_exist", false, time.Time{}},
		{"time2_not_exist", true, time.Time{}},
	}
	for _, tc := range cases {
		got := m.GetDateTime(tc.key, tc.nullable).(time.Time)
		require.Equal(t, tc.want, got)
	}
}

// TestCsvDateTime64 checks GetDateTime64 against the shared CSV sample: the
// "time3" column must parse to 2019-12-16 12:10:30.123 UTC, and a column
// absent from the schema must yield the zero time.Time whether or not nullable
// is set.
func TestCsvDateTime64(t *testing.T) {
	pool := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout)
	p := pool.Get()
	defer pool.Put(p)

	m, parseErr := p.Parse(csvSample)
	require.Nil(t, parseErr)

	cases := []struct {
		key      string
		nullable bool
		want     time.Time
	}{
		{"time3", false, time.Date(2019, 12, 16, 12, 10, 30, 123000000, time.UTC)},
		{"time3_not_exist", false, time.Time{}},
		{"time3_not_exist", true, time.Time{}},
	}
	for _, tc := range cases {
		got := m.GetDateTime64(tc.key, tc.nullable).(time.Time)
		require.Equal(t, tc.want, got)
	}
}

// TestCsvElasticDateTime checks GetElasticDateTime against the shared CSV
// sample: the "time2" column must yield 1576498230, and a column absent from
// the schema must yield -62135596800 whether or not nullable is set.
func TestCsvElasticDateTime(t *testing.T) {
	pool := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout)
	p := pool.Get()
	defer pool.Put(p)

	m, parseErr := p.Parse(csvSample)
	require.Nil(t, parseErr)

	cases := []struct {
		key      string
		nullable bool
		want     int64
	}{
		// {"date": "2019-12-16T12:10:30Z"}
		// TZ=UTC date -d @1576498230 => Mon 16 Dec 2019 12:10:30 PM UTC
		{"time2", false, 1576498230},
		{"time2_not_exist", false, -62135596800},
		{"time2_not_exist", true, -62135596800},
	}
	for _, tc := range cases {
		got := m.GetElasticDateTime(tc.key, tc.nullable).(int64)
		require.Equal(t, tc.want, got)
	}
}

// TestCsvArray checks GetArray against the shared CSV sample for each
// supported element type ("int", "float", "string"), first on populated
// bracketed columns and then on the "array_empty" column, which must yield an
// empty slice of the requested type.
func TestCsvArray(t *testing.T) {
	pool := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout)
	p := pool.Get()
	defer pool.Put(p)

	m, parseErr := p.Parse(csvSample)
	require.Nil(t, parseErr)

	// Populated arrays.
	gotInts := m.GetArray("array_int", "int").([]int64)
	require.Equal(t, []int64{1, 2, 3}, gotInts)

	gotFloats := m.GetArray("array_float", "float").([]float64)
	require.Equal(t, []float64{1.1, 2.2, 3.3}, gotFloats)

	gotStrings := m.GetArray("array_string", "string").([]string)
	require.Equal(t, []string{"aa", "bb", "cc"}, gotStrings)

	// The same empty column read as each element type.
	gotEmptyInts := m.GetArray("array_empty", "int").([]int64)
	require.Equal(t, []int64{}, gotEmptyInts)

	gotEmptyFloats := m.GetArray("array_empty", "float").([]float64)
	require.Equal(t, []float64{}, gotEmptyFloats)

	gotEmptyStrings := m.GetArray("array_empty", "string").([]string)
	require.Equal(t, []string{}, gotEmptyStrings)
}
3 changes: 3 additions & 0 deletions parser/parser_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,9 @@ var jsonSample = []byte(`{
"array_empty": []
}`)

// csvSampleSchema lists, in order, the column names for the fields of
// csvSample; the CSV tests pass it to NewParserPool as the parser's schema.
var csvSampleSchema = []string{"its", "percent", "channel", "time1", "time2", "time3", "array_int", "array_float", "array_string", "array_empty"}

// csvSample is a single CSV record with one field per csvSampleSchema column.
// It exercises quoted fields, a doubled-quote escape (""), three timestamp
// formats, and bracketed arrays that are quoted because they contain commas.
var csvSample = []byte(`1536813227,"0.11","escaped_""ws",2019-12-16,2019-12-16T12:10:30Z,2019-12-16T12:10:30.123Z,"[1,2,3]","[1.1,2.2,3.3]","[aa,bb,cc]","[]"`)

func BenchmarkUnmarshalljson(b *testing.B) {
mp := map[string]interface{}{}
for i := 0; i < b.N; i++ {
Expand Down

0 comments on commit 090a1b0

Please sign in to comment.