Skip to content

Commit

Permalink
[receiver/journald] adds byte array message parsing (#36005)
Browse files Browse the repository at this point in the history
#### Description

Adds message conversion from byte array support to journald receiver.
Introduces a config setting for enabling this behaviour.

#### Link to tracking issue

Fixes #35964 

#### Testing

Adds tests for decoding a message successfully to string

Tested on a real world example as well with the resulting binary

#### Documentation

Adds description of the setting the PR introduces
  • Loading branch information
fuero authored Oct 30, 2024
1 parent 7016433 commit 24e6ba0
Show file tree
Hide file tree
Showing 5 changed files with 63 additions and 17 deletions.
27 changes: 27 additions & 0 deletions .chloggen/36005-add-bytes-array-message-parsing.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: 'enhancement'

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: journaldreceiver

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: adds ability to parse journald's MESSAGE field as a string if desired

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [36005]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
23 changes: 12 additions & 11 deletions pkg/stanza/operator/input/journald/config_all.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,18 @@ func NewConfigWithID(operatorID string) *Config {
type Config struct {
helper.InputConfig `mapstructure:",squash"`

Directory *string `mapstructure:"directory,omitempty"`
Files []string `mapstructure:"files,omitempty"`
StartAt string `mapstructure:"start_at,omitempty"`
Units []string `mapstructure:"units,omitempty"`
Priority string `mapstructure:"priority,omitempty"`
Matches []MatchConfig `mapstructure:"matches,omitempty"`
Identifiers []string `mapstructure:"identifiers,omitempty"`
Grep string `mapstructure:"grep,omitempty"`
Dmesg bool `mapstructure:"dmesg,omitempty"`
All bool `mapstructure:"all,omitempty"`
Namespace string `mapstructure:"namespace,omitempty"`
Directory *string `mapstructure:"directory,omitempty"`
Files []string `mapstructure:"files,omitempty"`
StartAt string `mapstructure:"start_at,omitempty"`
Units []string `mapstructure:"units,omitempty"`
Priority string `mapstructure:"priority,omitempty"`
Matches []MatchConfig `mapstructure:"matches,omitempty"`
Identifiers []string `mapstructure:"identifiers,omitempty"`
Grep string `mapstructure:"grep,omitempty"`
Dmesg bool `mapstructure:"dmesg,omitempty"`
All bool `mapstructure:"all,omitempty"`
Namespace string `mapstructure:"namespace,omitempty"`
ConvertMessageBytes bool `mapstructure:"convert_message_bytes,omitempty"`
}

type MatchConfig map[string]string
3 changes: 2 additions & 1 deletion pkg/stanza/operator/input/journald/config_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ func (c Config) Build(set component.TelemetrySettings) (operator.Operator, error
return exec.CommandContext(ctx, "journalctl", journalArgs...) // #nosec - ...
// journalctl is an executable that is required for this operator to function
},
json: jsoniter.ConfigFastest,
convertMessageBytes: c.ConvertMessageBytes,
json: jsoniter.ConfigFastest,
}, nil
}

Expand Down
26 changes: 21 additions & 5 deletions pkg/stanza/operator/input/journald/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,12 @@ type Input struct {

newCmd func(ctx context.Context, cursor []byte) cmd

persister operator.Persister
json jsoniter.API
cancel context.CancelFunc
wg sync.WaitGroup
errChan chan error
persister operator.Persister
json jsoniter.API
convertMessageBytes bool
cancel context.CancelFunc
wg sync.WaitGroup
errChan chan error
}

type cmd interface {
Expand Down Expand Up @@ -227,6 +228,21 @@ func (operator *Input) parseJournalEntry(line []byte) (*entry.Entry, string, err
return nil, "", fmt.Errorf("parse timestamp: %w", err)
}

if operator.convertMessageBytes {
// Convert the message bytes to string if given as a byte array
msgArr, msgArrayOk := body["MESSAGE"].([]any)
if msgArrayOk {
var bytes []byte
for _, val := range msgArr {
floatVal, floatCheckOk := val.(float64)
if floatCheckOk {
bytes = append(bytes, byte(int(floatVal)))
}
}
body["MESSAGE"] = string(bytes)
}
}

delete(body, "__REALTIME_TIMESTAMP")

cursor, ok := body["__CURSOR"]
Expand Down
1 change: 1 addition & 0 deletions receiver/journaldreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ Journald receiver requires that:
| `storage` | none | The ID of a storage extension to be used to store cursors. Cursors allow the receiver to pick up where it left off in the case of a collector restart. If no storage extension is used, the receiver will manage cursors in memory only. |
| `all` | 'false' | If `true`, very long logs and logs with unprintable characters will also be included. |
| `namespace` | | Will query the given namespace. See man page [`systemd-journald.service(8)`](https://www.man7.org/linux/man-pages/man8/systemd-journald.service.8.html#JOURNAL_NAMESPACES) for details. |
| `convert_message_bytes` | 'false' | If `true` and if the `MESSAGE` field is read [as an array of bytes](https://github.com/systemd/systemd/blob/main/docs/JOURNAL_EXPORT_FORMATS.md#journal-json-format), the array will be converted to string. |
| `retry_on_failure.enabled` | `false` | If `true`, the receiver will pause reading a file and attempt to resend the current batch of logs if it encounters an error from downstream components. |
| `retry_on_failure.initial_interval` | `1 second` | Time to wait after the first failure before retrying. |
| `retry_on_failure.max_interval` | `30 seconds` | Upper bound on retry backoff interval. Once this value is reached the delay between consecutive retries will remain constant at the specified value. |
Expand Down

0 comments on commit 24e6ba0

Please sign in to comment.