From f9113b63b775d3e939e9cdf0447704dec0322e0c Mon Sep 17 00:00:00 2001 From: Felipe Dutra Tine e Silva <5888407+tkanos@users.noreply.github.com> Date: Wed, 28 Nov 2018 19:07:25 -0500 Subject: [PATCH] Add csv parser unix timestamp support (#5047) --- plugins/parsers/csv/README.md | 3 +- plugins/parsers/csv/parser.go | 53 +++++++++++++++++++++--------- plugins/parsers/csv/parser_test.go | 18 ++++++++++ 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/plugins/parsers/csv/README.md b/plugins/parsers/csv/README.md index e4cfbfc372842..f2cf34c691d7f 100644 --- a/plugins/parsers/csv/README.md +++ b/plugins/parsers/csv/README.md @@ -75,7 +75,8 @@ document. The `csv_timestamp_column` option specifies the column name containing the time value and `csv_timestamp_format` must be set to a Go "reference time" -which is defined to be the specific time: `Mon Jan 2 15:04:05 MST 2006`. +which is defined to be the specific time: `Mon Jan 2 15:04:05 MST 2006`, +it can also be `unix` (for epoch in ms format like 1257894000 ) Consult the Go [time][time parse] package for details and additional examples on how to set the time format. diff --git a/plugins/parsers/csv/parser.go b/plugins/parsers/csv/parser.go index f18068eb70075..e1bbdbbbb0cbb 100644 --- a/plugins/parsers/csv/parser.go +++ b/plugins/parsers/csv/parser.go @@ -207,21 +207,9 @@ outer: measurementName = fmt.Sprintf("%v", recordFields[p.MeasurementColumn]) } - metricTime := p.TimeFunc() - if p.TimestampColumn != "" { - if recordFields[p.TimestampColumn] == nil { - return nil, fmt.Errorf("timestamp column: %v could not be found", p.TimestampColumn) - } - tStr := fmt.Sprintf("%v", recordFields[p.TimestampColumn]) - if p.TimestampFormat == "" { - return nil, fmt.Errorf("timestamp format must be specified") - } - - var err error - metricTime, err = time.Parse(p.TimestampFormat, tStr) - if err != nil { - return nil, err - } + metricTime, err := parseTimestamp(p.TimeFunc, recordFields, p.TimestampColumn, p.TimestampFormat) + if err != nil { + return nil, err } m, err := metric.New(measurementName, tags, recordFields, metricTime) @@ -231,6 +219,41 @@ outer: return m, nil } +// ParseTimestamp return a timestamp, if there is no timestamp on the csv it will be the current timestamp, else it will try to parse the time according to the format +// if the format is "unix" it tries to parse assuming that on the csv it will find an epoch in ms. +func parseTimestamp(timeFunc func() time.Time, recordFields map[string]interface{}, timestampColumn, timestampFormat string) (metricTime time.Time, err error) { + metricTime = timeFunc() + + if timestampColumn != "" { + if recordFields[timestampColumn] == nil { + err = fmt.Errorf("timestamp column: %v could not be found", timestampColumn) + return + } + + tStr := fmt.Sprintf("%v", recordFields[timestampColumn]) + + switch timestampFormat { + case "": + err = fmt.Errorf("timestamp format must be specified") + return + case "unix": + var unixTime int64 + unixTime, err = strconv.ParseInt(tStr, 10, 64) + if err != nil { + return + } + metricTime = time.Unix(unixTime, 0) + default: + metricTime, err = time.Parse(timestampFormat, tStr) + if err != nil { + return + } + } + } + return +} + +// SetDefaultTags set the DefaultTags func (p *Parser) SetDefaultTags(tags map[string]string) { p.DefaultTags = tags } diff --git a/plugins/parsers/csv/parser_test.go b/plugins/parsers/csv/parser_test.go index eff6f953f5651..97da69cd2f5f8 100644 --- a/plugins/parsers/csv/parser_test.go +++ b/plugins/parsers/csv/parser_test.go @@ -88,6 +88,24 @@ func TestTimestampError(t *testing.T) { require.Equal(t, fmt.Errorf("timestamp format must be specified"), err) } +func TestTimestampUnixFormat(t *testing.T) { + p := Parser{ + HeaderRowCount: 1, + ColumnNames: []string{"first", "second", "third"}, + MeasurementColumn: "third", + TimestampColumn: "first", + TimestampFormat: "unix", + TimeFunc: DefaultTime, + } + testCSV := `line1,line2,line3 +1243094706,70,test_name +1257609906,80,test_name2` + metrics, err := p.Parse([]byte(testCSV)) + require.NoError(t, err) + require.Equal(t, metrics[0].Time().UnixNano(), int64(1243094706000000000)) + require.Equal(t, metrics[1].Time().UnixNano(), int64(1257609906000000000)) +} + func TestQuotedCharacter(t *testing.T) { p := Parser{ HeaderRowCount: 1,