diff --git a/component/loki/source/windowsevent/bookmark.go b/component/loki/source/windowsevent/bookmark.go index 416f37132332..5f91a1367295 100644 --- a/component/loki/source/windowsevent/bookmark.go +++ b/component/loki/source/windowsevent/bookmark.go @@ -7,12 +7,8 @@ package windowsevent import ( "github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog" - "os" - "path/filepath" ) -const bookmarkKey = "bookmark" - type bookMark struct { handle win_eventlog.EvtHandle isNew bool @@ -20,26 +16,17 @@ type bookMark struct { } type db struct { - db *KVDB + file *BookmarkFile path string } -func newBookmarkDB(path string) (*db, error) { - - pdb, err := NewKVDB(path) +func newBookmarkDB(path string, legacyPath string) (*db, error) { + pdb, err := NewBookmarkFile(path, legacyPath) if err != nil { - // Let's try to recreate the file, it could be mangled. - err = os.Remove(path) - if err != nil { - return nil, err - } - pdb, err = NewKVDB(path) - if err != nil { - return nil, err - } + return nil, err } return &db{ - db: pdb, + file: pdb, path: path, }, nil } @@ -49,14 +36,7 @@ func newBookmarkDB(path string) (*db, error) { func (pdb *db) newBookMark() (*bookMark, error) { // 16kb buffer for rendering bookmark buf := make([]byte, 16<<10) - var bookmark string - pdb.transitionXML() - valBytes, _, err := pdb.db.Get("bookmark", bookmarkKey) - if err != nil { - return nil, err - } - bookmark = string(valBytes) - + bookmark := pdb.file.Get() // creates a new bookmark file if none exists. if bookmark == "" { bm, err := win_eventlog.CreateBookmark("") @@ -82,37 +62,11 @@ func (pdb *db) newBookMark() (*bookMark, error) { }, nil } -func (pdb *db) transitionXML() { - // See if we can convert the old bookmark to the new path. - parentPath := filepath.Dir(pdb.path) - bookmarkXML := filepath.Join(parentPath, "bookmark.xml") - _, err := os.Stat(bookmarkXML) - // Only continue if we can access the file. - if err != nil { - return - } - xmlBytes, err := os.ReadFile(bookmarkXML) - if err != nil { - // Try to remove the file so we dont do this again. - _ = os.Remove(bookmarkXML) - return - } - - bookmark := string(xmlBytes) - _ = pdb.db.Put("bookmark", bookmarkKey, []byte(bookmark)) - _ = os.Remove(bookmarkXML) - -} - // save Saves the bookmark at the current event position. func (pdb *db) save(b *bookMark, event win_eventlog.EvtHandle) error { newBookmark, err := win_eventlog.UpdateBookmark(b.handle, event, b.buf) if err != nil { return err } - return pdb.db.Put("bookmark", bookmarkKey, []byte(newBookmark)) -} - -func (pdb *db) close() error { - return pdb.db.Close() + return pdb.file.Put(newBookmark) } diff --git a/component/loki/source/windowsevent/component_test.go b/component/loki/source/windowsevent/component_test.go index 2190994772a0..79a665522fe9 100644 --- a/component/loki/source/windowsevent/component_test.go +++ b/component/loki/source/windowsevent/component_test.go @@ -4,9 +4,6 @@ package windowsevent import ( "context" - "go.etcd.io/bbolt" - "os" - "path/filepath" "strings" "sync/atomic" "testing" @@ -25,52 +22,6 @@ func TestEventLogger(t *testing.T) { createTest(t, "") } -func TestBookmarkStorage(t *testing.T) { - datapath := createTest(t, "") - dbPath := filepath.Join(datapath, "bookmark.db") - // Lets remove the existing file and ensure it recovers correctly. - _ = os.WriteFile(dbPath, nil, 0600) - createTest(t, datapath) - fbytes, err := os.ReadFile(dbPath) - require.NoError(t, err) - for i := 0; i < len(fbytes); i++ { - // Set every tenth byte to zero. - if i%10 != 0 { - continue - } - fbytes[i] = 0 - } - fbytes[28] = 0 - _ = os.WriteFile(dbPath, fbytes, 0600) - createTest(t, datapath) -} - -func TestBookmarkTransition(t *testing.T) { - dir := createTest(t, "") - bb, err := bbolt.Open(filepath.Join(dir, "bookmark.db"), os.ModeExclusive, nil) - require.NoError(t, err) - - var bookmarkString string - err = bb.View(func(tx *bbolt.Tx) error { - b := tx.Bucket([]byte("bookmark")) - v := b.Get([]byte(bookmarkKey)) - require.NotNil(t, v) - nv := make([]byte, len(v)) - copy(nv, v) - bookmarkString = string(nv) - return nil - }) - require.NoError(t, err) - require.NoError(t, bb.Close()) - - xmlPath := filepath.Join(dir, "bookmark.xml") - err = os.WriteFile(xmlPath, []byte(bookmarkString), 0744) - require.NoError(t, err) - createTest(t, dir) - _, err = os.Stat(xmlPath) - require.ErrorIs(t, err, os.ErrNotExist) -} - func createTest(t *testing.T, dataPath string) string { var loggerName = "agent_test" //Setup Windows Event log with the log source name and logging levels @@ -104,13 +55,16 @@ func createTest(t *testing.T, dataPath string) string { }) require.NoError(t, err) ctx := context.Background() - ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Second) + ctx, cancelFunc := context.WithTimeout(ctx, 60*time.Second) found := atomic.Bool{} go c.Run(ctx) tm := time.Now().Format(time.RFC3339Nano) err = wlog.Info(2, tm) require.NoError(t, err) + // This test is extremely flaky without these longer times. Since it has to work through all the noise + // that can be being added to the application event log. + // TODO: Create specific event log source so it does not have any noise. go func() { for { select { @@ -129,7 +83,7 @@ func createTest(t *testing.T, dataPath string) string { require.Eventually(t, func() bool { return found.Load() - }, 20*time.Second, 500*time.Millisecond) + }, 60*time.Second, 500*time.Millisecond) cancelFunc() diff --git a/component/loki/source/windowsevent/component_windows.go b/component/loki/source/windowsevent/component_windows.go index 38a27da26220..e3d03455e26e 100644 --- a/component/loki/source/windowsevent/component_windows.go +++ b/component/loki/source/windowsevent/component_windows.go @@ -106,12 +106,8 @@ func (c *Component) Update(args component.Arguments) error { if newArgs.BookmarkPath == "" { newArgs.BookmarkPath = filepath.Join(c.opts.DataPath, "bookmark.db") } - // If there is an existing item then the current one needs to be closed. - if c.target != nil && c.target.pdb != nil { - _ = c.target.pdb.close() - } - winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs)) + winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs), c.opts.DataPath) if err != nil { return err } diff --git a/component/loki/source/windowsevent/file.go b/component/loki/source/windowsevent/file.go new file mode 100644 index 000000000000..b986e054576b --- /dev/null +++ b/component/loki/source/windowsevent/file.go @@ -0,0 +1,139 @@ +//go:build windows + +package windowsevent + +import ( + "encoding/xml" + "fmt" + "os" + "path/filepath" + "regexp" + "strconv" + "strings" + "sync" +) + +// BookmarkFile represents reading and writing to a bookmark.xml file. +// These files are written sequentially in the format of bookmark..xml. +// Each individual bookmark file is immutable once written and is either read or deleted. +// The folder should ONLY contain bookmark files since all other files will be deleted. +type BookmarkFile struct { + mut sync.Mutex + index int + directory string + oldDefaultFile string + currentPath string +} + +var pathMatch, _ = regexp.Compile("bookmark.[0-9]+.xml") + +// NewBookmarkFile creates a wrapper around saving the bookmark file. +func NewBookmarkFile(directory string, oldpath string) (*BookmarkFile, error) { + _ = os.MkdirAll(filepath.Dir(directory), 0600) + index, currentPath := findMostRecentAndPurge(directory, oldpath) + return &BookmarkFile{ + directory: directory, + oldDefaultFile: oldpath, + index: index, + currentPath: currentPath, + }, nil +} + +// Put writes the value in to the newest file, and deletes the old one. +func (bf *BookmarkFile) Put(value string) error { + bf.mut.Lock() + defer bf.mut.Unlock() + + previousIndex := bf.index + bf.index++ + newFile := fmt.Sprintf("bookmark.%d.xml", bf.index) + newPath := filepath.Join(bf.directory, newFile) + err := os.WriteFile(newPath, []byte(value), 0600) + if err != nil { + return err + } + writtenVal, err := os.ReadFile(newPath) + if err != nil { + _ = os.Remove(newPath) + return err + } + // If for some reason the file was not written correctly then bail. + if string(writtenVal) != value { + _ = os.Remove(newPath) + return fmt.Errorf("unable to save data , contents written differ from value") + } + // Finally we can delete the old file. + oldFile := fmt.Sprintf("bookmark.%d.xml", previousIndex) + // We don't care if it errors. + _ = os.Remove(filepath.Join(bf.directory, oldFile)) + return nil +} + +// findMostRecentAndPurge will find the most recent file, including the legacy bookmark. +// If found will return the index and the path to the newest file. +func findMostRecentAndPurge(dir string, legacyPath string) (int, string) { + files, err := os.ReadDir(dir) + if err != nil { + return 1, "" + } + index := 0 + var path string + for _, f := range files { + if pathMatch.MatchString(f.Name()) { + stripped := strings.ReplaceAll(f.Name(), "bookmark.", "") + number := strings.ReplaceAll(stripped, ".xml", "") + foundNum, err := strconv.Atoi(number) + if err != nil { + continue + } + // Cant read the file. + content, err := os.ReadFile(filepath.Join(dir, f.Name())) + if err != nil { + continue + } + + // Need to ensure the file was properly saved previously. + if xml.Unmarshal(content, new(interface{})) != nil { + continue + } + if foundNum > index { + index = foundNum + path = f.Name() + } + } + } + // If we don't have a path then see if we can transition. + if path == "" && legacyPath != "" { + _, err = os.Stat(legacyPath) + if err == nil { + index = 1 + contents, _ := os.ReadFile(legacyPath) + // Try to write the file if we have some contents. + if len(contents) > 0 { + newFile := fmt.Sprintf("bookmark.%d.xml", 1) + newPath := filepath.Join(dir, newFile) + _ = os.WriteFile(newPath, contents, 0600) + _ = os.Remove(legacyPath) + } + } + } + if index == 0 { + index = 1 + } + + // Finally delete all files other than the found. + for _, f := range files { + if f.Name() != path { + _ = os.Remove(filepath.Join(dir, f.Name())) + } + } + return index, path +} + +// Get returns the value which is "" if nothing found. +// If the bucket does not exist then it will be created. +func (bf *BookmarkFile) Get() string { + val, _ := os.ReadFile(filepath.Join(bf.directory, bf.currentPath)) + // We don't want to propagate the path error up the stack if its does not exist. + return string(val) +} diff --git a/component/loki/source/windowsevent/file_test.go b/component/loki/source/windowsevent/file_test.go new file mode 100644 index 000000000000..07c8c80ef27c --- /dev/null +++ b/component/loki/source/windowsevent/file_test.go @@ -0,0 +1,95 @@ +//go:build windows + +package windowsevent + +import ( + "fmt" + "github.com/stretchr/testify/require" + "os" + "path/filepath" + "testing" +) + +func TestBookmarkFile(t *testing.T) { + dir := t.TempDir() + bf, err := NewBookmarkFile(dir, "") + require.NoError(t, err) + require.NotNil(t, bf) + require.True(t, bf.index == 1) + err = bf.Put("") + require.NoError(t, err) + require.True(t, bf.index == 2) +} + +func TestBookmarkFileWithLegacy(t *testing.T) { + dir := t.TempDir() + legacy := t.TempDir() + legacyPath := filepath.Join(legacy, "bookmark.xml") + err := os.WriteFile(legacyPath, []byte("-1"), 0600) + require.NoError(t, err) + bf, err := NewBookmarkFile(dir, legacyPath) + + require.NoError(t, err) + require.NotNil(t, bf) + require.True(t, bf.index == 1) + + // Legacy should no longer be there. + _, err = os.Stat(legacyPath) + require.ErrorIs(t, err, os.ErrNotExist) + + // bookmark.1.xml should have been created from the legacy item. + content, err := os.ReadFile(filepath.Join(dir, "bookmark.1.xml")) + require.NoError(t, err) + require.True(t, string(content) == "-1") + err = bf.Put("2") + require.NoError(t, err) + require.True(t, bf.index == 2) + + // Bookmark 1 should not exist + _, err = os.ReadFile(filepath.Join(dir, "bookmark.1.xml")) + require.ErrorIs(t, err, os.ErrNotExist) + + // Bookmark 2 should exist + content, err = os.ReadFile(filepath.Join(dir, "bookmark.2.xml")) + require.NoError(t, err) + require.True(t, string(content) == "2") +} + +func TestMultipleBookmarks(t *testing.T) { + dir := t.TempDir() + for i := 1; i <= 10; i++ { + writeBookmark(t, dir, i) + } + bf, err := NewBookmarkFile(dir, "") + require.NoError(t, err) + require.NotNil(t, bf) + require.True(t, bf.index == 10) + require.True(t, bf.currentPath == "bookmark.10.xml") + files, err := os.ReadDir(dir) + require.NoError(t, err) + require.Len(t, files, 1) +} + +func TestMultipleBookmarksWithInvalidXML(t *testing.T) { + dir := t.TempDir() + for i := 1; i <= 10; i++ { + writeBookmark(t, dir, i) + } + // Make 10 an invalid file. + err := os.WriteFile(filepath.Join(dir, "bookmark.10.xml"), []byte("bad"), 0600) + require.NoError(t, err) + bf, err := NewBookmarkFile(dir, "") + require.NoError(t, err) + require.NotNil(t, bf) + require.True(t, bf.index == 9) + require.True(t, bf.currentPath == "bookmark.9.xml") + files, err := os.ReadDir(dir) + require.NoError(t, err) + require.Len(t, files, 1) +} + +func writeBookmark(t *testing.T, dir string, index int) { + fileName := fmt.Sprintf("bookmark.%d.xml", index) + err := os.WriteFile(filepath.Join(dir, fileName), []byte(fmt.Sprintf("%d", index)), 0600) + require.NoError(t, err) +} diff --git a/component/loki/source/windowsevent/kvdb.go b/component/loki/source/windowsevent/kvdb.go deleted file mode 100644 index 2a21fca22087..000000000000 --- a/component/loki/source/windowsevent/kvdb.go +++ /dev/null @@ -1,57 +0,0 @@ -package windowsevent - -import ( - "os" - "path/filepath" - - "go.etcd.io/bbolt" -) - -type KVDB struct { - db *bbolt.DB -} - -// NewKVDB creates a wrapper around bbolt. -func NewKVDB(path string) (*KVDB, error) { - _ = os.MkdirAll(filepath.Dir(path), 0600) - bdb, err := bbolt.Open(path, 0600, nil) - if err != nil { - return nil, err - } - return &KVDB{db: bdb}, nil -} - -// Put writes the value in a specific bucket, creating it if it doesnt exist. -func (kv *KVDB) Put(bucket string, key string, value []byte) error { - return kv.db.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(bucket)) - if err != nil { - return err - } - return b.Put([]byte(key), value) - }) -} - -// Get returns the value which is nil if nothing found, true if found and any error. -// If the bucket does not exist then it will be created. -func (kv *KVDB) Get(bucket string, key string) ([]byte, bool, error) { - var nv []byte - err := kv.db.Update(func(tx *bbolt.Tx) error { - b, err := tx.CreateBucketIfNotExists([]byte(bucket)) - if err != nil { - return err - } - v := b.Get([]byte(key)) - if v != nil { - // Have to copy v since it is reused once update has ended. - nv = make([]byte, len(v)) - copy(nv, v) - } - return nil - }) - return nv, nv != nil, err -} - -func (kv *KVDB) Close() error { - return kv.db.Close() -} diff --git a/component/loki/source/windowsevent/target_windows.go b/component/loki/source/windowsevent/target_windows.go index 30e26d03c0ae..b252b821a6c8 100644 --- a/component/loki/source/windowsevent/target_windows.go +++ b/component/loki/source/windowsevent/target_windows.go @@ -52,6 +52,7 @@ func NewTarget( handler api.EntryHandler, relabel []*relabel.Config, cfg *scrapeconfig.WindowsEventsTargetConfig, + datadir string, ) (*Target, error) { sigEvent, err := windows.CreateEvent(nil, 0, 0, nil) if err != nil { @@ -59,7 +60,7 @@ func NewTarget( } defer windows.CloseHandle(sigEvent) - pdb, err := newBookmarkDB(cfg.BookmarkPath) + pdb, err := newBookmarkDB(datadir, cfg.BookmarkPath) if err != nil { return nil, fmt.Errorf("failed to create bookmark using path=%s: %w", cfg.BookmarkPath, err) } @@ -231,9 +232,6 @@ func (t *Target) Stop() error { close(t.done) t.wg.Wait() t.handler.Stop() - if err := t.pdb.close(); err != nil { - return err - } t.closed.Store(true) return t.err } diff --git a/docs/sources/flow/reference/components/loki.source.windowsevent.md b/docs/sources/flow/reference/components/loki.source.windowsevent.md index 6d60fa3dd613..7266ba5df002 100644 --- a/docs/sources/flow/reference/components/loki.source.windowsevent.md +++ b/docs/sources/flow/reference/components/loki.source.windowsevent.md @@ -31,18 +31,18 @@ log entries to the list of receivers passed in `forward_to`. `loki.source.windowsevent` supports the following arguments: -Name | Type | Description | Default | Required ------------- |----------------------|--------------------------------------------------------------------------------|----------------------------| -------- -`locale` | `number` | Locale ID for event rendering. 0 default is Windows Locale. | `0` | no -`eventlog_name` | `string` | Event log to read from. | | See below. -`xpath_query` | `string` | Event log to read from. | `"*"` | See below. -`bookmark_path` | `string` | Keeps position in event log. | `"DATA_PATH/bookmark.xml"` | no -`poll_interval` | `duration` | How often to poll the event log. | `"3s"` | no -`exclude_event_data` | `bool` | Exclude event data. | `false` | no -`exclude_user_data` | `bool` | Exclude user data. | `false` | no +Name | Type | Description | Default | Required +------------ |----------------------|-----------------------------------------------------------------------------|----------------------------| -------- +`locale` | `number` | Locale ID for event rendering. 0 default is Windows Locale. | `0` | no +`eventlog_name` | `string` | Event log to read from. | | See below. +`xpath_query` | `string` | Event log to read from. | `"*"` | See below. +`bookmark_path` | `string` | Keeps position in event log. Deprecated. | `"DATA_PATH/bookmark.xml"` | no +`poll_interval` | `duration` | How often to poll the event log. | `"3s"` | no +`exclude_event_data` | `bool` | Exclude event data. | `false` | no +`exclude_user_data` | `bool` | Exclude user data. | `false` | no `use_incoming_timestamp` | `bool` | When false, assigns the current timestamp to the log when it was processed. | `false` | no -`forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes -`labels` | `map(string)` | The labels to associate with incoming logs. | | no +`forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes +`labels` | `map(string)` | The labels to associate with incoming logs. | | no > **NOTE**: `eventlog_name` is required if `xpath_query` does not specify the event log. @@ -50,6 +50,8 @@ Name | Type | Description > When using the XML form you can specify `event_log` in the `xpath_query`. > If using short form, you must define `eventlog_name`. +> **NOTE**: `bookmark_path` is only used for transitioning old bookmark to newer bookmark. + ## Component health diff --git a/docs/sources/flow/release-notes.md b/docs/sources/flow/release-notes.md index 3ebb824874db..a5b439b2b2b9 100644 --- a/docs/sources/flow/release-notes.md +++ b/docs/sources/flow/release-notes.md @@ -28,11 +28,11 @@ Other release notes for the different Grafana Agent variants are contained on se [release-notes-operator]: {{< relref "../operator/release-notes.md" >}} {{% /admonition %}} -## Breaking change: Windows Event Log bookmark format and named changed +## Breaking change: Windows Event Log bookmark location and named changed Previously Windows Event Logs were stored by default in bookmark.xml file. The newest release changes that to a more resilient -storage named bookmark.db. When running for the first time if using the defaults the old bookmark.xml will be transitioned to -bookmark.db. If using a custom path or fails to transition then duplicate events may be sent. +mechanism where bookmarks are immutable and stored within the Flow data directory. The system will transition the old bookmark +to the new location and then delete the old bookmark file. ## v0.38