From 090a1b02ab7019177c4c47247cac6e23ca65803c Mon Sep 17 00:00:00 2001 From: Zhichang Yu Date: Wed, 6 Jan 2021 15:08:19 +0800 Subject: [PATCH] array support in CSV parser --- parser/csv.go | 55 +++++++++-- parser/csv_test.go | 207 +++++++++++++++++++++++++++++++++++------- parser/parser_test.go | 3 + 3 files changed, 224 insertions(+), 41 deletions(-) diff --git a/parser/csv.go b/parser/csv.go index aebbc8ba..815b758c 100644 --- a/parser/csv.go +++ b/parser/csv.go @@ -18,6 +18,7 @@ import ( "bytes" "encoding/csv" "strconv" + "strings" "time" "github.com/housepower/clickhouse_sinker/model" @@ -26,7 +27,7 @@ import ( var _ Parser = (*CsvParser)(nil) -// CsvParser implementation to parse input from a CSV format +// CsvParser implementation to parse input from a CSV format per RFC 4180 type CsvParser struct { title []string delimiter string @@ -36,7 +37,7 @@ type CsvParser struct { // Parse extract a list of comma-separated values from the data func (p *CsvParser) Parse(bs []byte) (metric model.Metric, err error) { r := csv.NewReader(bytes.NewReader(bs)) - r.Comma = ',' + r.FieldsPerRecord = len(p.title) if len(p.delimiter) > 0 { r.Comma = rune(p.delimiter[0]) } @@ -87,7 +88,7 @@ func (c *CsvMetric) GetFloat(key string, nullable bool) interface{} { return n } } - return 0 + return float64(0) } // GetInt returns int @@ -99,12 +100,54 @@ func (c *CsvMetric) GetInt(key string, nullable bool) interface{} { return n } } - return 0 + return int64(0) } -// GetArray is Empty implemented for CsvMetric +// GetArray parse an CSV encoded array func (c *CsvMetric) GetArray(key string, t string) interface{} { - return []interface{}{} + var err error + var array []string + var r *csv.Reader + val := c.GetString(key, false).(string) + valLen := len(val) + if val == "" || val[0] != '[' || val[valLen-1] != ']' { + goto QUIT + } + r = csv.NewReader(strings.NewReader(val[1 : valLen-1])) + if array, err = r.Read(); err != nil { + goto QUIT + } + switch t { + case "int": + results := make([]int64, 0, len(array)) + for _, e := range array { + v, _ := strconv.ParseInt(e, 10, 64) + results = append(results, v) + } + return results + case "float": + results := make([]float64, 0, len(array)) + for _, e := range array { + v, _ := strconv.ParseFloat(e, 64) + results = append(results, v) + } + return results + case "string": + return array + default: + panic("not supported array type " + t) + } +QUIT: + switch t { + case "int": + return []int64{} + case "float": + return []float64{} + case "string": + return []string{} + default: + return nil + } } func (c *CsvMetric) GetDate(key string, nullable bool) interface{} { diff --git a/parser/csv_test.go b/parser/csv_test.go index 0a4eed00..ce25393b 100644 --- a/parser/csv_test.go +++ b/parser/csv_test.go @@ -21,39 +21,176 @@ import ( "github.com/stretchr/testify/require" ) -func TestParseCsv(t *testing.T) { - testCases := []struct { - msg string - values []string - }{ - { - `1,"DO,NOT,SPLIT",42`, - []string{"1", "DO,NOT,SPLIT", "42"}, - }, - - { - `2,Daniel,26`, - []string{"2", "Daniel", "26"}, - }, - - { - `2,Daniel,`, - []string{"2", "Daniel", ""}, - }, - - { - `2,,Daniel`, - []string{"2", "", "Daniel"}, - }, - } - - pp := NewParserPool("csv", nil, ",", []string{"2006-01-02", time.RFC3339, time.RFC3339}) - csvParser := pp.Get() - defer pp.Put(csvParser) - for _, c := range testCases { - metric, _ := csvParser.Parse([]byte(c.msg)) - csvMetric, ok := metric.(*CsvMetric) - require.Equal(t, ok, true) - require.Equal(t, c.values, csvMetric.values) - } +func TestCsvInt(t *testing.T) { + pp := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout) + parser := pp.Get() + defer pp.Put(parser) + metric, err := parser.Parse(csvSample) + require.Nil(t, err) + + var exp, act int64 + exp = 1536813227 + act = metric.GetInt("its", false).(int64) + require.Equal(t, exp, act) + + exp = 0 + act = metric.GetInt("its_not_exist", false).(int64) + require.Equal(t, exp, act) + + act = metric.GetInt("its_not_exist", true).(int64) + require.Equal(t, exp, act) +} + +func TestCsvFloat(t *testing.T) { + pp := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout) + parser := pp.Get() + defer pp.Put(parser) + metric, err := parser.Parse(csvSample) + require.Nil(t, err) + + var exp, act float64 + exp = 0.11 + act = metric.GetFloat("percent", false).(float64) + require.Equal(t, exp, act) + + exp = 0.0 + act = metric.GetFloat("percent_not_exist", false).(float64) + require.Equal(t, exp, act) + + act = metric.GetFloat("percent_not_exist", true).(float64) + require.Equal(t, exp, act) +} + +func TestCsvString(t *testing.T) { + pp := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout) + parser := pp.Get() + defer pp.Put(parser) + metric, err := parser.Parse(csvSample) + require.Nil(t, err) + + var exp, act string + exp = `escaped_"ws` + act = metric.GetString("channel", false).(string) + require.Equal(t, exp, act) + + exp = "" + act = metric.GetString("channel_not_exist", false).(string) + require.Equal(t, exp, act) + + act = metric.GetString("channel_not_exist", true).(string) + require.Equal(t, exp, act) +} + +func TestCsvDate(t *testing.T) { + pp := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout) + parser := pp.Get() + defer pp.Put(parser) + metric, err := parser.Parse(csvSample) + require.Nil(t, err) + + var exp, act time.Time + exp = time.Date(2019, 12, 16, 0, 0, 0, 0, time.UTC) + act = metric.GetDate("time1", false).(time.Time) + require.Equal(t, exp, act) + + exp = time.Time{} + act = metric.GetDate("time1_not_exist", false).(time.Time) + require.Equal(t, exp, act) + + act = metric.GetDate("time1_not_exist", true).(time.Time) + require.Equal(t, exp, act) +} + +func TestCsvDateTime(t *testing.T) { + pp := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout) + parser := pp.Get() + defer pp.Put(parser) + metric, err := parser.Parse(csvSample) + require.Nil(t, err) + + var exp, act time.Time + exp = time.Date(2019, 12, 16, 12, 10, 30, 0, time.UTC) + act = metric.GetDateTime("time2", false).(time.Time) + require.Equal(t, exp, act) + + exp = time.Time{} + act = metric.GetDateTime("time2_not_exist", false).(time.Time) + require.Equal(t, exp, act) + + act = metric.GetDateTime("time2_not_exist", true).(time.Time) + require.Equal(t, exp, act) +} + +func TestCsvDateTime64(t *testing.T) { + pp := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout) + parser := pp.Get() + defer pp.Put(parser) + metric, err := parser.Parse(csvSample) + require.Nil(t, err) + + var exp, act time.Time + exp = time.Date(2019, 12, 16, 12, 10, 30, 123000000, time.UTC) + act = metric.GetDateTime64("time3", false).(time.Time) + require.Equal(t, exp, act) + + exp = time.Time{} + act = metric.GetDateTime64("time3_not_exist", false).(time.Time) + require.Equal(t, exp, act) + + act = metric.GetDateTime64("time3_not_exist", true).(time.Time) + require.Equal(t, exp, act) +} + +func TestCsvElasticDateTime(t *testing.T) { + pp := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout) + parser := pp.Get() + defer pp.Put(parser) + metric, err := parser.Parse(csvSample) + require.Nil(t, err) + + var exp, act int64 + // {"date": "2019-12-16T12:10:30Z"} + // TZ=UTC date -d @1576498230 => Mon 16 Dec 2019 12:10:30 PM UTC + exp = 1576498230 + act = metric.GetElasticDateTime("time2", false).(int64) + require.Equal(t, exp, act) + + exp = -62135596800 + act = metric.GetElasticDateTime("time2_not_exist", false).(int64) + require.Equal(t, exp, act) + + act = metric.GetElasticDateTime("time2_not_exist", true).(int64) + require.Equal(t, exp, act) +} + +func TestCsvArray(t *testing.T) { + pp := NewParserPool("csv", csvSampleSchema, "", DefaultTSLayout) + parser := pp.Get() + defer pp.Put(parser) + metric, err := parser.Parse(csvSample) + require.Nil(t, err) + + actI := metric.GetArray("array_int", "int").([]int64) + expI := []int64{1, 2, 3} + require.Equal(t, expI, actI) + + actF := metric.GetArray("array_float", "float").([]float64) + expF := []float64{1.1, 2.2, 3.3} + require.Equal(t, expF, actF) + + actS := metric.GetArray("array_string", "string").([]string) + expS := []string{"aa", "bb", "cc"} + require.Equal(t, expS, actS) + + actIE := metric.GetArray("array_empty", "int").([]int64) + expIE := []int64{} + require.Equal(t, expIE, actIE) + + actFE := metric.GetArray("array_empty", "float").([]float64) + expFE := []float64{} + require.Equal(t, expFE, actFE) + + actSE := metric.GetArray("array_empty", "string").([]string) + expSE := []string{} + require.Equal(t, expSE, actSE) } diff --git a/parser/parser_test.go b/parser/parser_test.go index f5bea01c..e77248e5 100644 --- a/parser/parser_test.go +++ b/parser/parser_test.go @@ -44,6 +44,9 @@ var jsonSample = []byte(`{ "array_empty": [] }`) +var csvSampleSchema = []string{"its", "percent", "channel", "time1", "time2", "time3", "array_int", "array_float", "array_string", "array_empty"} +var csvSample = []byte(`1536813227,"0.11","escaped_""ws",2019-12-16,2019-12-16T12:10:30Z,2019-12-16T12:10:30.123Z,"[1,2,3]","[1.1,2.2,3.3]","[aa,bb,cc]","[]"`) + func BenchmarkUnmarshalljson(b *testing.B) { mp := map[string]interface{}{} for i := 0; i < b.N; i++ {