Skip to content

Commit

Permalink
[receiver/journald] adds byte array message parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
fuero committed Oct 28, 2024
1 parent 6323659 commit 752b464
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 16 deletions.
21 changes: 11 additions & 10 deletions pkg/stanza/operator/input/journald/config_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,17 @@ func NewConfigWithID(operatorID string) *Config {
type Config struct {
helper.InputConfig `mapstructure:",squash"`

Directory *string `mapstructure:"directory,omitempty"`
Files []string `mapstructure:"files,omitempty"`
StartAt string `mapstructure:"start_at,omitempty"`
Units []string `mapstructure:"units,omitempty"`
Priority string `mapstructure:"priority,omitempty"`
Matches []MatchConfig `mapstructure:"matches,omitempty"`
Identifiers []string `mapstructure:"identifiers,omitempty"`
Grep string `mapstructure:"grep,omitempty"`
Dmesg bool `mapstructure:"dmesg,omitempty"`
All bool `mapstructure:"all,omitempty"`
Directory *string `mapstructure:"directory,omitempty"`
Files []string `mapstructure:"files,omitempty"`
StartAt string `mapstructure:"start_at,omitempty"`
Units []string `mapstructure:"units,omitempty"`
Priority string `mapstructure:"priority,omitempty"`
Matches []MatchConfig `mapstructure:"matches,omitempty"`
Identifiers []string `mapstructure:"identifiers,omitempty"`
Grep string `mapstructure:"grep,omitempty"`
Dmesg bool `mapstructure:"dmesg,omitempty"`
All bool `mapstructure:"all,omitempty"`
ConvertMessageBytes bool `mapstructure:"convert_message_bytes,omitempty"`
}

type MatchConfig map[string]string
3 changes: 2 additions & 1 deletion pkg/stanza/operator/input/journald/config_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
return exec.CommandContext(ctx, "journalctl", journalArgs...) // #nosec - ...
// journalctl is an executable that is required for this operator to function
},
json: jsoniter.ConfigFastest,
convert_message_bytes: c.ConvertMessageBytes,
json: jsoniter.ConfigFastest,
}, nil
}

Expand Down
29 changes: 24 additions & 5 deletions pkg/stanza/operator/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ type Input struct {

newCmd func(ctx context.Context, cursor []byte) cmd

persister operator.Persister
json jsoniter.API
cancel context.CancelFunc
wg sync.WaitGroup
errChan chan error
persister operator.Persister
json jsoniter.API
convert_message_bytes bool
cancel context.CancelFunc
wg sync.WaitGroup
errChan chan error
}

type cmd interface {
Expand Down Expand Up @@ -118,6 +119,9 @@ func (operator *Input) newJournalctl(ctx context.Context) (*journalctl, error) {
return nil, fmt.Errorf("failed to get journalctl state: %w", err)
}

operator.persister = persister

Check failure on line 122 in pkg/stanza/operator/input/journald/input.go

View workflow job for this annotation

GitHub Actions / govulncheck (pkg)

undefined: persister

Check failure on line 122 in pkg/stanza/operator/input/journald/input.go

View workflow job for this annotation

GitHub Actions / govulncheck (receiver-1)

undefined: persister

// Start journalctl
journal := operator.newCmd(ctx, cursor)
jctl := &journalctl{
cmd: journal,
Expand Down Expand Up @@ -227,6 +231,21 @@ func (operator *Input) parseJournalEntry(line []byte) (*entry.Entry, string, err
return nil, "", fmt.Errorf("parse timestamp: %w", err)
}

if operator.convert_message_bytes {
// Convert the message bytes to string if given as a byte array
msg_arr, ok := body["MESSAGE"].([]interface{})
if ok {
var bytes []byte
for _, val := range msg_arr {
float_val, ok := val.(float64)
if ok {
bytes = append(bytes, byte(int(float_val)))
}
}
body["MESSAGE"] = string(bytes[:])
}
}

delete(body, "__REALTIME_TIMESTAMP")

cursor, ok := body["__CURSOR"]
Expand Down
1 change: 1 addition & 0 deletions receiver/journaldreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ Journald receiver requires that:
| `dmesg` | 'false' | Show only kernel messages. This shows logs from current boot and adds the match `_TRANSPORT=kernel`. See [Multiple filtering options](#multiple-filtering-options) examples. |
| `storage` | none | The ID of a storage extension to be used to store cursors. Cursors allow the receiver to pick up where it left off in the case of a collector restart. If no storage extension is used, the receiver will manage cursors in memory only. |
| `all` | 'false' | If `true`, very long logs and logs with unprintable characters will also be included. |
| `convert_message_bytes` | 'false' | If `true` and if the `MESSAGE` field is read [as an array of bytes](https://github.com/systemd/systemd/blob/main/docs/JOURNAL_EXPORT_FORMATS.md#journal-json-format), the array will be converted to string. |
| `retry_on_failure.enabled` | `false` | If `true`, the receiver will pause reading a file and attempt to resend the current batch of logs if it encounters an error from downstream components. |
| `retry_on_failure.initial_interval` | `1 second` | Time to wait after the first failure before retrying. |
| `retry_on_failure.max_interval` | `30 seconds` | Upper bound on retry backoff interval. Once this value is reached the delay between consecutive retries will remain constant at the specified value. |
Expand Down

0 comments on commit 752b464

Please sign in to comment.