Merge branch 'main' into feature/ingestionretry
fatsheep9146 authored Oct 5, 2023
2 parents 26de91e + 90b801d commit 16ff03a
Showing 11 changed files with 77 additions and 53 deletions.
27 changes: 27 additions & 0 deletions .chloggen/drosiek-syslog-receiver-attributes.yaml
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: syslogparser

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: return correct structure from syslog parser

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [27414]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
18 changes: 1 addition & 17 deletions pkg/stanza/fileconsumer/file.go
@@ -143,29 +143,13 @@ func (m *Manager) consume(ctx context.Context, paths []string) {
defer wg.Done()
r.ReadToEnd(ctx)
// Delete a file if deleteAfterRead is enabled and we reached the end of the file
-			if m.deleteAfterRead && r.EOF {
+			if m.deleteAfterRead && r.AtEOF() {
r.Delete()
}
}(r)
}
wg.Wait()

-	// Save off any files that were not fully read
-	if m.deleteAfterRead {
-		unfinished := make([]*reader.Reader, 0, len(readers))
-		for _, r := range readers {
-			if !r.EOF {
-				unfinished = append(unfinished, r)
-			}
-		}
-		readers = unfinished
-
-		// If all files were read and deleted then no need to do bookkeeping on readers
-		if len(readers) == 0 {
-			return
-		}
-	}

// Any new files that appear should be consumed entirely
m.readerFactory.FromBeginning = true

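Taken together, this hunk moves file deletion inline: each goroutine deletes its file as soon as it confirms EOF through the new AtEOF() accessor, so the post-Wait pass that saved off partially-read readers is no longer needed. A minimal runnable sketch of the surviving control flow — mockReader and the hard-coded file names are illustrative stand-ins, not the real fileconsumer types:

package main

import (
	"fmt"
	"sync"
)

// mockReader stands in for fileconsumer's reader.Reader.
type mockReader struct {
	name string
	eof  bool
}

func (r *mockReader) ReadToEnd()  { r.eof = true } // pretend the file was fully consumed
func (r *mockReader) AtEOF() bool { return r.eof }
func (r *mockReader) Delete()     { fmt.Println("deleted", r.name) }

func main() {
	deleteAfterRead := true
	readers := []*mockReader{{name: "a.log"}, {name: "b.log"}}

	var wg sync.WaitGroup
	for _, r := range readers {
		wg.Add(1)
		go func(r *mockReader) {
			defer wg.Done()
			r.ReadToEnd()
			// Deletion happens per reader, once EOF is confirmed...
			if deleteAfterRead && r.AtEOF() {
				r.Delete()
			}
		}(r)
	}
	wg.Wait() // ...so no follow-up filtering of unfinished readers is required
}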
10 changes: 1 addition & 9 deletions pkg/stanza/fileconsumer/file_test.go
@@ -4,7 +4,6 @@
package fileconsumer

import (
"bytes"
"context"
"fmt"
"os"
@@ -1384,7 +1383,7 @@ func TestDeleteAfterRead(t *testing.T) {
cfg.DeleteAfterRead = true
emitCalls := make(chan *emitParams, totalLines)
operator, _ := buildTestManager(t, cfg, withEmitChan(emitCalls))

+	operator.persister = testutil.NewMockPersister("test")
operator.poll(context.Background())
actualTokens = append(actualTokens, waitForNTokens(t, emitCalls, totalLines)...)

@@ -1518,7 +1517,6 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {
bytesPerLine := 100
shortFileLine := tokenWithLength(bytesPerLine - 1)
longFileLines := 100000
-	longFileSize := longFileLines * bytesPerLine
longFileFirstLine := "first line of long file\n"

require.NoError(t, featuregate.GlobalRegistry().Set(allowFileDeletion.ID(), true))
@@ -1580,12 +1578,6 @@ func TestDeleteAfterRead_SkipPartials(t *testing.T) {

// long file was partially consumed and should NOT have been deleted
require.FileExists(t, longFile.Name())

-	// Verify that only long file is remembered and that (0 < offset < fileSize)
-	require.Equal(t, 1, len(operator.knownFiles))
-	reader := operator.knownFiles[0]
-	require.True(t, bytes.HasPrefix(reader.Fingerprint.FirstBytes, []byte(longFileFirstLine)))
-	require.Less(t, reader.Offset, int64(longFileSize))
}

func TestHeaderPersistance(t *testing.T) {
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/internal/reader/factory.go
@@ -60,7 +60,7 @@ func (f *Factory) build(file *os.File, m *Metadata, lineSplitFunc bufio.SplitFunc)
Metadata: m,
file: file,
FileName: file.Name(),
-		SugaredLogger: f.SugaredLogger.With("path", file.Name()),
+		logger:        f.SugaredLogger.With("path", file.Name()),
decoder: decode.New(f.Encoding),
lineSplitFunc: lineSplitFunc,
}
32 changes: 18 additions & 14 deletions pkg/stanza/fileconsumer/internal/reader/reader.go
@@ -38,17 +38,17 @@ type Metadata struct {

// Reader manages a single file
type Reader struct {
-	*zap.SugaredLogger
*Config
*Metadata
FileName string
-	EOF bool
+	logger *zap.SugaredLogger
file *os.File
lineSplitFunc bufio.SplitFunc
splitFunc bufio.SplitFunc
decoder *decode.Decoder
headerReader *header.Reader
processFunc emit.Callback
+	eof bool
}

// offsetToEnd sets the starting offset
@@ -71,7 +71,7 @@ func (r *Reader) NewFingerprintFromFile() (*fingerprint.Fingerprint, error) {
// ReadToEnd will read until the end of the file
func (r *Reader) ReadToEnd(ctx context.Context) {
if _, err := r.file.Seek(r.Offset, 0); err != nil {
r.Errorw("Failed to seek", zap.Error(err))
r.logger.Errorw("Failed to seek", zap.Error(err))
return
}

@@ -87,18 +87,18 @@ func (r *Reader) ReadToEnd(ctx context.Context) {

ok := s.Scan()
if !ok {
-			r.EOF = true
+			r.eof = true
if err := s.Error(); err != nil {
// If Scan returned an error then we are not guaranteed to be at the end of the file
-				r.EOF = false
-				r.Errorw("Failed during scan", zap.Error(err))
+				r.eof = false
+				r.logger.Errorw("Failed during scan", zap.Error(err))
}
break
}

token, err := r.decoder.Decode(s.Bytes())
if err != nil {
r.Errorw("decode: %w", zap.Error(err))
r.logger.Errorw("decode: %w", zap.Error(err))
} else if err := r.processFunc(ctx, token, r.FileAttributes); err != nil {
if errors.Is(err, header.ErrEndOfHeader) {
r.finalizeHeader()
@@ -110,12 +110,12 @@ func (r *Reader) ReadToEnd(ctx context.Context) {
r.splitFunc = r.lineSplitFunc
r.processFunc = r.Emit
if _, err = r.file.Seek(r.Offset, 0); err != nil {
r.Errorw("Failed to seek post-header", zap.Error(err))
r.logger.Errorw("Failed to seek post-header", zap.Error(err))
return
}
s = scanner.New(r, r.MaxLogSize, scanner.DefaultBufferSize, r.Offset, r.splitFunc)
} else {
r.Errorw("process: %w", zap.Error(err))
r.logger.Errorw("process: %w", zap.Error(err))
}
}

@@ -125,7 +125,7 @@ func (r *Reader) ReadToEnd(ctx context.Context) {

func (r *Reader) finalizeHeader() {
if err := r.headerReader.Stop(); err != nil {
r.Errorw("Failed to stop header pipeline during finalization", zap.Error(err))
r.logger.Errorw("Failed to stop header pipeline during finalization", zap.Error(err))
}
r.headerReader = nil
r.HeaderFinalized = true
@@ -138,22 +138,22 @@ func (r *Reader) Delete() {
}
r.Close()
if err := os.Remove(r.FileName); err != nil {
-		r.Errorf("could not delete %s", r.FileName)
+		r.logger.Errorf("could not delete %s", r.FileName)
}
}

// Close will close the file
func (r *Reader) Close() {
if r.file != nil {
if err := r.file.Close(); err != nil {
-			r.Debugw("Problem closing reader", zap.Error(err))
+			r.logger.Debugw("Problem closing reader", zap.Error(err))
}
r.file = nil
}

if r.headerReader != nil {
if err := r.headerReader.Stop(); err != nil {
r.Errorw("Failed to stop header pipeline", zap.Error(err))
r.logger.Errorw("Failed to stop header pipeline", zap.Error(err))
}
}
}
@@ -201,8 +201,12 @@ func (r *Reader) ValidateFingerprint() bool {
}
refreshedFingerprint, err := fingerprint.New(r.file, r.FingerprintSize)
if err != nil {
r.Debugw("Failed to create fingerprint", zap.Error(err))
r.logger.Debugw("Failed to create fingerprint", zap.Error(err))
return false
}
return refreshedFingerprint.StartsWith(r.Fingerprint)
}

+func (r *Reader) AtEOF() bool {
+	return r.eof
+}
2 changes: 1 addition & 1 deletion pkg/stanza/fileconsumer/reader_test.go
@@ -186,7 +186,7 @@ func TestTokenizationTooLongWithLineStartPattern(t *testing.T) {
require.NoError(t, err)

r.ReadToEnd(context.Background())
-	require.True(t, r.EOF)
+	require.True(t, r.AtEOF())

for _, expected := range expected {
require.Equal(t, expected, readToken(t, emitChan))
4 changes: 2 additions & 2 deletions pkg/stanza/operator/input/syslog/syslog_test.go
@@ -49,8 +49,8 @@
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]map[string]string{
"SecureAuth@27389": {
"structured_data": map[string]interface{}{
"SecureAuth@27389": map[string]interface{}{
"PEN": "27389",
"Realm": "SecureAuth0",
"UserHostAddress": "192.168.2.132",
12 changes: 6 additions & 6 deletions pkg/stanza/operator/parser/syslog/data.go
@@ -159,8 +159,8 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]map[string]string{
"SecureAuth@27389": {
"structured_data": map[string]interface{}{
"SecureAuth@27389": map[string]interface{}{
"PEN": "27389",
"Realm": "SecureAuth0",
"UserHostAddress": "192.168.2.132",
@@ -197,8 +197,8 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]map[string]string{
"SecureAuth@27389": {
"structured_data": map[string]interface{}{
"SecureAuth@27389": map[string]interface{}{
"PEN": "27389",
"Realm": "SecureAuth0",
"UserHostAddress": "192.168.2.132",
@@ -235,8 +235,8 @@ func CreateCases(basicConfig func() *Config) ([]Case, error) {
"msg_id": "ID52020",
"priority": 86,
"proc_id": "23108",
"structured_data": map[string]map[string]string{
"SecureAuth@27389": {
"structured_data": map[string]interface{}{
"SecureAuth@27389": map[string]interface{}{
"PEN": "27389",
"Realm": "SecureAuth0",
"UserHostAddress": "192.168.2.132",
18 changes: 17 additions & 1 deletion pkg/stanza/operator/parser/syslog/syslog.go
@@ -243,7 +243,7 @@ func (s *Parser) toSafeMap(message map[string]interface{}) (map[string]interface{}, error) {
delete(message, key)
continue
}
-			message[key] = *v
+			message[key] = convertMap(*v)
default:
return nil, fmt.Errorf("key %s has unknown field of type %T", key, v)
}
@@ -252,6 +252,22 @@
return message, nil
}

+// convertMap converts map[string]map[string]string to map[string]interface{}
+// which is expected by stanza converter
+func convertMap(data map[string]map[string]string) map[string]interface{} {
+	ret := map[string]interface{}{}
+	for key, value := range data {
+		ret[key] = map[string]interface{}{}
+		r := ret[key].(map[string]interface{})
+
+		for k, v := range value {
+			r[k] = v
+		}
+	}
+
+	return ret
+}

func toBytes(value interface{}) ([]byte, error) {
switch v := value.(type) {
case string:
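To make the structured_data fix concrete, here is a self-contained sketch of what convertMap produces. The function body is restructured slightly from the hunk above (an inner map variable instead of a type assertion) but is behaviorally identical, and the sample values are borrowed from the updated test cases:

package main

import "fmt"

// convertMap deep-converts the map[string]map[string]string that the RFC 5424
// parser emits for structured_data into the map[string]interface{} shape the
// stanza converter expects.
func convertMap(data map[string]map[string]string) map[string]interface{} {
	ret := map[string]interface{}{}
	for key, value := range data {
		inner := map[string]interface{}{}
		for k, v := range value {
			inner[k] = v
		}
		ret[key] = inner
	}
	return ret
}

func main() {
	sd := map[string]map[string]string{
		"SecureAuth@27389": {"PEN": "27389", "Realm": "SecureAuth0"},
	}
	fmt.Printf("%#v\n", convertMap(sd))
	// map[string]interface {}{"SecureAuth@27389":map[string]interface {}{"PEN":"27389", "Realm":"SecureAuth0"}}
}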
3 changes: 2 additions & 1 deletion testbed/tests/scenarios.go
@@ -300,7 +300,8 @@ func ScenarioTestTraceNoBackend10kSPS(

tc.Sleep(tc.Duration)

-	rss, _, _ := tc.AgentMemoryInfo()
+	rss, _, err := tc.AgentMemoryInfo()
+	require.NoError(t, err)
assert.Less(t, configuration.ExpectedMinFinalRAM, rss)
}

2 changes: 1 addition & 1 deletion testbed/tests/trace_test.go
@@ -192,7 +192,7 @@ func TestTraceNoBackend10kSPS(t *testing.T) {
t,
testbed.NewOTLPTraceDataSender(testbed.DefaultHost, testbed.GetAvailablePort(t)),
testbed.NewOTLPDataReceiver(testbed.GetAvailablePort(t)),
-		testbed.ResourceSpec{ExpectedMaxCPU: 60, ExpectedMaxRAM: testConf.ExpectedMaxRAM},
+		testbed.ResourceSpec{ExpectedMaxCPU: 80, ExpectedMaxRAM: testConf.ExpectedMaxRAM},
performanceResultsSummary,
testConf,
)
