diff --git a/operator/builtin/input/journald/journald.go b/operator/builtin/input/journald/journald.go index 6a69a0ce..0293015d 100644 --- a/operator/builtin/input/journald/journald.go +++ b/operator/builtin/input/journald/journald.go @@ -222,11 +222,17 @@ func (operator *JournaldInput) parseJournalEntry(line []byte) (*entry.Entry, str return nil, "", errors.New("journald field for cursor is not a string") } - entry, err := operator.NewEntry(body) + entry, err := operator.NewEntry(body["MESSAGE"]) if err != nil { return nil, "", fmt.Errorf("failed to create entry: %s", err) } + delete(body, "MESSAGE") + + for k, v := range body { + entry.AddAttribute(k, v.(string)) + } + entry.Timestamp = time.Unix(0, timestampInt*1000) // in microseconds return entry, cursorString, nil } diff --git a/operator/builtin/input/journald/journald_test.go b/operator/builtin/input/journald/journald_test.go index d9197ceb..ec35128f 100644 --- a/operator/builtin/input/journald/journald_test.go +++ b/operator/builtin/input/journald/journald_test.go @@ -72,14 +72,14 @@ func TestInputJournald(t *testing.T) { require.NoError(t, err) defer op.Stop() - expected := map[string]interface{}{ + expectedBody := "run-docker-netns-4f76d707d45f.mount: Succeeded." + expectedAttributes := map[string]string{ "_BOOT_ID": "c4fa36de06824d21835c05ff80c54468", "_CAP_EFFECTIVE": "0", "_TRANSPORT": "journal", "_UID": "1000", "_EXE": "/usr/lib/systemd/systemd", "_AUDIT_LOGINUID": "1000", - "MESSAGE": "run-docker-netns-4f76d707d45f.mount: Succeeded.", "_PID": "13894", "_CMDLINE": "/lib/systemd/systemd --user", "_MACHINE_ID": "d777d00e7caf45fbadedceba3975520d", @@ -111,7 +111,8 @@ func TestInputJournald(t *testing.T) { select { case e := <-received: - require.Equal(t, expected, e.Body) + require.Equal(t, expectedAttributes, e.Attributes) + require.Equal(t, expectedBody, e.Body) case <-time.After(time.Second): require.FailNow(t, "Timed out waiting for entry to be read") }