Skip to content

Commit

Permalink
add validation on time input
Browse files Browse the repository at this point in the history
  • Loading branch information
splunkericl committed Sep 19, 2023
1 parent c9b523c commit eab5116
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 17 deletions.
18 changes: 17 additions & 1 deletion receiver/splunkhecreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand All @@ -25,6 +26,7 @@ import (
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/internal/splunk"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/splunkhecreceiver/internal/metadata"
)

Expand Down Expand Up @@ -265,7 +267,21 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques
}

resourceCustomizer := r.createResourceCustomizer(req)
ld, slLen, err := splunkHecRawToLogData(bodyReader, req.URL.Query(), resourceCustomizer, r.config)
query := req.URL.Query()
var timestamp pcommon.Timestamp
if query.Has(queryTime) {
t, err := strconv.ParseInt(query.Get(queryTime), 10, 64)
if t < 0 {
err = errors.New("time cannot be less than 0")
}
if err != nil {
r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, 0, err)
return
}
timestamp = pcommon.NewTimestampFromTime(time.Unix(t, 0))
}

ld, slLen, err := splunkHecRawToLogData(bodyReader, query, resourceCustomizer, r.config, timestamp)
if err != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, err)
return
Expand Down
38 changes: 38 additions & 0 deletions receiver/splunkhecreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,44 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
assert.Equal(t, responseErrGzipReader, body)
},
},
{
name: "raw_endpoint_bad_time_negative_number",
req: func() *http.Request {
msgBytes, err := json.Marshal(splunkMsg)
require.NoError(t, err)

req := httptest.NewRequest("POST", "http://localhost/service/collector/raw", bytes.NewReader(msgBytes))

q := req.URL.Query()
q.Add(queryTime, "-5")
req.URL.RawQuery = q.Encode()

return req
}(),
assertResponse: func(t *testing.T, status int, body string) {
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, responseInvalidDataFormat, body)
},
},
{
name: "raw_endpoint_bad_time_not_a_number",
req: func() *http.Request {
msgBytes, err := json.Marshal(splunkMsg)
require.NoError(t, err)

req := httptest.NewRequest("POST", "http://localhost/service/collector/raw", bytes.NewReader(msgBytes))

q := req.URL.Query()
q.Add(queryTime, "notANumber")
req.URL.RawQuery = q.Encode()

return req
}(),
assertResponse: func(t *testing.T, status int, body string) {
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, responseInvalidDataFormat, body)
},
},
}

for _, tt := range tests {
Expand Down
11 changes: 1 addition & 10 deletions receiver/splunkhecreceiver/splunk_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
"io"
"net/url"
"sort"
"strconv"
"time"

"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
Expand Down Expand Up @@ -79,17 +77,10 @@ func splunkHecToLogData(logger *zap.Logger, events []*splunk.Event, resourceCust
}

// splunkHecRawToLogData transforms raw splunk event into log
func splunkHecRawToLogData(bodyReader io.Reader, query url.Values, resourceCustomizer func(pcommon.Resource), config *Config) (plog.Logs, int, error) {
func splunkHecRawToLogData(bodyReader io.Reader, query url.Values, resourceCustomizer func(pcommon.Resource), config *Config, timestamp pcommon.Timestamp) (plog.Logs, int, error) {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()

var timestamp pcommon.Timestamp
if query.Has(queryTime) {
if t, err := strconv.ParseInt(query.Get(queryTime), 10, 64); err == nil {
timestamp = pcommon.NewTimestampFromTime(time.Unix(t, 0))
}
}

appendSplunkMetadata(rl, config.HecToOtelAttrs, query.Get(host), query.Get(source), query.Get(sourcetype), query.Get(index))
if resourceCustomizer != nil {
resourceCustomizer(rl.Resource())
Expand Down
16 changes: 10 additions & 6 deletions receiver/splunkhecreceiver/splunk_to_logdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,9 @@ func Test_SplunkHecToLogData(t *testing.T) {
}

func Test_SplunkHecRawToLogData(t *testing.T) {
const (
testTimestampVal = 1695146885
)
hecConfig := &Config{
HecToOtelAttrs: splunk.HecToOtelAttrs{
Source: "mysource",
Expand All @@ -356,6 +359,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) {
query map[string][]string
assertResource func(t *testing.T, got plog.Logs, slLen int)
config *Config
time pcommon.Timestamp
}{
{
name: "all_mapping",
Expand Down Expand Up @@ -397,9 +401,10 @@ func Test_SplunkHecRawToLogData(t *testing.T) {
} else {
assert.Fail(t, "index is not added to attributes")
}
assert.Equal(t, time.Unix(1695146885, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix())
assert.Equal(t, time.Unix(testTimestampVal, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix())
},
config: hecConfig,
time: pcommon.NewTimestampFromTime(time.Unix(testTimestampVal, 0)),
},
{
name: "some_mapping",
Expand Down Expand Up @@ -439,19 +444,18 @@ func Test_SplunkHecRawToLogData(t *testing.T) {
return reader
}(),
query: func() map[string][]string {
m := make(map[string][]string)
m[queryTime] = []string{"1695146885"}
return m
return map[string][]string{}
}(),
assertResource: func(t *testing.T, got plog.Logs, slLen int) {
assert.Equal(t, 1, got.LogRecordCount())
assert.Equal(t, time.Unix(1695146885, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix())
assert.Equal(t, time.Unix(testTimestampVal, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix())
},
config: func() *Config {
return &Config{
Splitting: SplittingStrategyNone,
}
}(),
time: pcommon.NewTimestampFromTime(time.Unix(testTimestampVal, 0)),
},
{
name: "line splitting",
Expand All @@ -473,7 +477,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, slLen, err := splunkHecRawToLogData(tt.sc, tt.query, func(resource pcommon.Resource) {}, tt.config)
result, slLen, err := splunkHecRawToLogData(tt.sc, tt.query, func(resource pcommon.Resource) {}, tt.config, tt.time)
require.NoError(t, err)
tt.assertResource(t, result, slLen)
})
Expand Down

0 comments on commit eab5116

Please sign in to comment.