Skip to content

Commit

Permalink
Extract time query parameter in splunkhecreceiver (#27008)
Browse files Browse the repository at this point in the history
This change adds a new feature in splunk hec receiver allowing users to
specify time query parameter. This is to put parity between splunk hec
receiver and splunk HEC raw endpoint

Note: the validation response is slightly different than what splunk
returns. if invalid input is provided for time:
- splunk returns 400 response with this error message:
```
{
    "text": "Error in handling indexed fields",
    "code": 15,
    "invalid-event-number": 0
}
```
- however this doesn't make sense for splunk hec receiver as it is not
indexing anything. Instead, splunk hec receiver will return:
```
{"text":"Invalid data format","code":6}
```

Fixes #27006

---------

Co-authored-by: Antoine Toulme <[email protected]>
  • Loading branch information
splunkericl and atoulme authored Sep 25, 2023
1 parent 20d8727 commit 7db0833
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 3 deletions.
27 changes: 27 additions & 0 deletions .chloggen/splunkhecreceiver-add-time-param.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: splunkhecreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update splunk hec receiver to extract time query parameter if it is provided

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27006]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
17 changes: 16 additions & 1 deletion receiver/splunkhecreceiver/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"io"
"net"
"net/http"
"strconv"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -265,7 +266,21 @@ func (r *splunkReceiver) handleRawReq(resp http.ResponseWriter, req *http.Reques
}

resourceCustomizer := r.createResourceCustomizer(req)
ld, slLen, err := splunkHecRawToLogData(bodyReader, req.URL.Query(), resourceCustomizer, r.config)
query := req.URL.Query()
var timestamp pcommon.Timestamp
if query.Has(queryTime) {
t, err := strconv.ParseInt(query.Get(queryTime), 10, 64)
if t < 0 {
err = errors.New("time cannot be less than 0")
}
if err != nil {
r.failRequest(ctx, resp, http.StatusBadRequest, invalidFormatRespBody, 0, err)
return
}
timestamp = pcommon.NewTimestampFromTime(time.Unix(t, 0))
}

ld, slLen, err := splunkHecRawToLogData(bodyReader, query, resourceCustomizer, r.config, timestamp)
if err != nil {
r.failRequest(ctx, resp, http.StatusInternalServerError, errInternalServerError, slLen, err)
return
Expand Down
38 changes: 38 additions & 0 deletions receiver/splunkhecreceiver/receiver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -996,6 +996,44 @@ func Test_splunkhecReceiver_handleRawReq(t *testing.T) {
assert.Equal(t, responseErrGzipReader, body)
},
},
{
name: "raw_endpoint_bad_time_negative_number",
req: func() *http.Request {
msgBytes, err := json.Marshal(splunkMsg)
require.NoError(t, err)

req := httptest.NewRequest("POST", "http://localhost/service/collector/raw", bytes.NewReader(msgBytes))

q := req.URL.Query()
q.Add(queryTime, "-5")
req.URL.RawQuery = q.Encode()

return req
}(),
assertResponse: func(t *testing.T, status int, body string) {
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, responseInvalidDataFormat, body)
},
},
{
name: "raw_endpoint_bad_time_not_a_number",
req: func() *http.Request {
msgBytes, err := json.Marshal(splunkMsg)
require.NoError(t, err)

req := httptest.NewRequest("POST", "http://localhost/service/collector/raw", bytes.NewReader(msgBytes))

q := req.URL.Query()
q.Add(queryTime, "notANumber")
req.URL.RawQuery = q.Encode()

return req
}(),
assertResponse: func(t *testing.T, status int, body string) {
assert.Equal(t, http.StatusBadRequest, status)
assert.Equal(t, responseInvalidDataFormat, body)
},
},
}

for _, tt := range tests {
Expand Down
6 changes: 5 additions & 1 deletion receiver/splunkhecreceiver/splunk_to_logdata.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ const (
source = "source"
sourcetype = "sourcetype"
host = "host"
queryTime = "time"
)

var (
Expand Down Expand Up @@ -76,9 +77,10 @@ func splunkHecToLogData(logger *zap.Logger, events []*splunk.Event, resourceCust
}

// splunkHecRawToLogData transforms raw splunk event into log
func splunkHecRawToLogData(bodyReader io.Reader, query url.Values, resourceCustomizer func(pcommon.Resource), config *Config) (plog.Logs, int, error) {
func splunkHecRawToLogData(bodyReader io.Reader, query url.Values, resourceCustomizer func(pcommon.Resource), config *Config, timestamp pcommon.Timestamp) (plog.Logs, int, error) {
ld := plog.NewLogs()
rl := ld.ResourceLogs().AppendEmpty()

appendSplunkMetadata(rl, config.HecToOtelAttrs, query.Get(host), query.Get(source), query.Get(sourcetype), query.Get(index))
if resourceCustomizer != nil {
resourceCustomizer(rl.Resource())
Expand All @@ -91,12 +93,14 @@ func splunkHecRawToLogData(bodyReader io.Reader, query url.Values, resourceCusto
}
logRecord := sl.LogRecords().AppendEmpty()
logRecord.Body().SetStr(string(b))
logRecord.SetTimestamp(timestamp)
} else {
sc := bufio.NewScanner(bodyReader)
for sc.Scan() {
logRecord := sl.LogRecords().AppendEmpty()
logLine := sc.Text()
logRecord.Body().SetStr(logLine)
logRecord.SetTimestamp(timestamp)
}
}

Expand Down
13 changes: 12 additions & 1 deletion receiver/splunkhecreceiver/splunk_to_logdata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"bytes"
"io"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -341,6 +342,9 @@ func Test_SplunkHecToLogData(t *testing.T) {
}

func Test_SplunkHecRawToLogData(t *testing.T) {
const (
testTimestampVal = 1695146885
)
hecConfig := &Config{
HecToOtelAttrs: splunk.HecToOtelAttrs{
Source: "mysource",
Expand All @@ -355,6 +359,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) {
query map[string][]string
assertResource func(t *testing.T, got plog.Logs, slLen int)
config *Config
time pcommon.Timestamp
}{
{
name: "all_mapping",
Expand All @@ -369,6 +374,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) {
m[sourcetype] = k
m[source] = k
m[index] = k
m[queryTime] = []string{"1695146885"}
return m
}(),
assertResource: func(t *testing.T, got plog.Logs, slLen int) {
Expand All @@ -395,8 +401,10 @@ func Test_SplunkHecRawToLogData(t *testing.T) {
} else {
assert.Fail(t, "index is not added to attributes")
}
assert.Equal(t, time.Unix(testTimestampVal, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix())
},
config: hecConfig,
time: pcommon.NewTimestampFromTime(time.Unix(testTimestampVal, 0)),
},
{
name: "some_mapping",
Expand Down Expand Up @@ -425,6 +433,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) {
} else {
assert.Fail(t, "sourcetype is not added to attributes")
}
assert.Equal(t, time.Unix(0, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix())
},
config: hecConfig,
},
Expand All @@ -439,12 +448,14 @@ func Test_SplunkHecRawToLogData(t *testing.T) {
}(),
assertResource: func(t *testing.T, got plog.Logs, slLen int) {
assert.Equal(t, 1, got.LogRecordCount())
assert.Equal(t, time.Unix(testTimestampVal, 0).Unix(), got.ResourceLogs().At(0).ScopeLogs().At(0).LogRecords().At(0).Timestamp().AsTime().Unix())
},
config: func() *Config {
return &Config{
Splitting: SplittingStrategyNone,
}
}(),
time: pcommon.NewTimestampFromTime(time.Unix(testTimestampVal, 0)),
},
{
name: "line splitting",
Expand All @@ -466,7 +477,7 @@ func Test_SplunkHecRawToLogData(t *testing.T) {

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, slLen, err := splunkHecRawToLogData(tt.sc, tt.query, func(resource pcommon.Resource) {}, tt.config)
result, slLen, err := splunkHecRawToLogData(tt.sc, tt.query, func(resource pcommon.Resource) {}, tt.config, tt.time)
require.NoError(t, err)
tt.assertResource(t, result, slLen)
})
Expand Down

0 comments on commit 7db0833

Please sign in to comment.