Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP to show differences in bookmark file and kv store. #5614

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 7 additions & 53 deletions component/loki/source/windowsevent/bookmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,39 +7,26 @@ package windowsevent

import (
"github.com/grafana/loki/clients/pkg/promtail/targets/windows/win_eventlog"
"os"
"path/filepath"
)

const bookmarkKey = "bookmark"

type bookMark struct {
handle win_eventlog.EvtHandle
isNew bool
buf []byte
}

type db struct {
db *KVDB
file *BookmarkFile
path string
}

func newBookmarkDB(path string) (*db, error) {

pdb, err := NewKVDB(path)
func newBookmarkDB(path string, legacyPath string) (*db, error) {
pdb, err := NewBookmarkFile(path, legacyPath)
if err != nil {
// Let's try to recreate the file, it could be mangled.
err = os.Remove(path)
if err != nil {
return nil, err
}
pdb, err = NewKVDB(path)
if err != nil {
return nil, err
}
return nil, err
}
return &db{
db: pdb,
file: pdb,
path: path,
}, nil
}
Expand All @@ -49,14 +36,7 @@ func newBookmarkDB(path string) (*db, error) {
func (pdb *db) newBookMark() (*bookMark, error) {
// 16kb buffer for rendering bookmark
buf := make([]byte, 16<<10)
var bookmark string
pdb.transitionXML()
valBytes, _, err := pdb.db.Get("bookmark", bookmarkKey)
if err != nil {
return nil, err
}
bookmark = string(valBytes)

bookmark := pdb.file.Get()
// creates a new bookmark file if none exists.
if bookmark == "" {
bm, err := win_eventlog.CreateBookmark("")
Expand All @@ -82,37 +62,11 @@ func (pdb *db) newBookMark() (*bookMark, error) {
}, nil
}

func (pdb *db) transitionXML() {
// See if we can convert the old bookmark to the new path.
parentPath := filepath.Dir(pdb.path)
bookmarkXML := filepath.Join(parentPath, "bookmark.xml")
_, err := os.Stat(bookmarkXML)
// Only continue if we can access the file.
if err != nil {
return
}
xmlBytes, err := os.ReadFile(bookmarkXML)
if err != nil {
// Try to remove the file so we dont do this again.
_ = os.Remove(bookmarkXML)
return
}

bookmark := string(xmlBytes)
_ = pdb.db.Put("bookmark", bookmarkKey, []byte(bookmark))
_ = os.Remove(bookmarkXML)

}

// save Saves the bookmark at the current event position.
func (pdb *db) save(b *bookMark, event win_eventlog.EvtHandle) error {
newBookmark, err := win_eventlog.UpdateBookmark(b.handle, event, b.buf)
if err != nil {
return err
}
return pdb.db.Put("bookmark", bookmarkKey, []byte(newBookmark))
}

func (pdb *db) close() error {
return pdb.db.Close()
return pdb.file.Put(newBookmark)
}
56 changes: 5 additions & 51 deletions component/loki/source/windowsevent/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@ package windowsevent

import (
"context"
"go.etcd.io/bbolt"
"os"
"path/filepath"
"strings"
"sync/atomic"
"testing"
Expand All @@ -25,52 +22,6 @@ func TestEventLogger(t *testing.T) {
createTest(t, "")
}

func TestBookmarkStorage(t *testing.T) {
datapath := createTest(t, "")
dbPath := filepath.Join(datapath, "bookmark.db")
// Lets remove the existing file and ensure it recovers correctly.
_ = os.WriteFile(dbPath, nil, 0600)
createTest(t, datapath)
fbytes, err := os.ReadFile(dbPath)
require.NoError(t, err)
for i := 0; i < len(fbytes); i++ {
// Set every tenth byte to zero.
if i%10 != 0 {
continue
}
fbytes[i] = 0
}
fbytes[28] = 0
_ = os.WriteFile(dbPath, fbytes, 0600)
createTest(t, datapath)
}

func TestBookmarkTransition(t *testing.T) {
dir := createTest(t, "")
bb, err := bbolt.Open(filepath.Join(dir, "bookmark.db"), os.ModeExclusive, nil)
require.NoError(t, err)

var bookmarkString string
err = bb.View(func(tx *bbolt.Tx) error {
b := tx.Bucket([]byte("bookmark"))
v := b.Get([]byte(bookmarkKey))
require.NotNil(t, v)
nv := make([]byte, len(v))
copy(nv, v)
bookmarkString = string(nv)
return nil
})
require.NoError(t, err)
require.NoError(t, bb.Close())

xmlPath := filepath.Join(dir, "bookmark.xml")
err = os.WriteFile(xmlPath, []byte(bookmarkString), 0744)
require.NoError(t, err)
createTest(t, dir)
_, err = os.Stat(xmlPath)
require.ErrorIs(t, err, os.ErrNotExist)
}

func createTest(t *testing.T, dataPath string) string {
var loggerName = "agent_test"
//Setup Windows Event log with the log source name and logging levels
Expand Down Expand Up @@ -104,13 +55,16 @@ func createTest(t *testing.T, dataPath string) string {
})
require.NoError(t, err)
ctx := context.Background()
ctx, cancelFunc := context.WithTimeout(ctx, 5*time.Second)
ctx, cancelFunc := context.WithTimeout(ctx, 60*time.Second)
found := atomic.Bool{}
go c.Run(ctx)
tm := time.Now().Format(time.RFC3339Nano)
err = wlog.Info(2, tm)
require.NoError(t, err)

// This test is extremely flaky without these longer times. Since it has to work through all the noise
// that can be being added to the application event log.
// TODO: Create specific event log source so it does not have any noise.
go func() {
for {
select {
Expand All @@ -129,7 +83,7 @@ func createTest(t *testing.T, dataPath string) string {

require.Eventually(t, func() bool {
return found.Load()
}, 20*time.Second, 500*time.Millisecond)
}, 60*time.Second, 500*time.Millisecond)

cancelFunc()

Expand Down
6 changes: 1 addition & 5 deletions component/loki/source/windowsevent/component_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,12 +106,8 @@ func (c *Component) Update(args component.Arguments) error {
if newArgs.BookmarkPath == "" {
newArgs.BookmarkPath = filepath.Join(c.opts.DataPath, "bookmark.db")
}
// If there is an existing item then the current one needs to be closed.
if c.target != nil && c.target.pdb != nil {
_ = c.target.pdb.close()
}

winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs), c.opts.DataPath)
if err != nil {
return err
}
Expand Down
139 changes: 139 additions & 0 deletions component/loki/source/windowsevent/file.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
//go:build windows

package windowsevent

import (
"encoding/xml"
"fmt"
"os"
"path/filepath"
"regexp"
"strconv"
"strings"
"sync"
)

// BookmarkFile represents reading and writing to a bookmark.xml file.
// These files are written sequentially in the format of bookmark.<number>.xml.
// Each individual bookmark file is immutable once written and is either read or deleted.
// The folder should ONLY contain bookmark files since all other files will be deleted.
type BookmarkFile struct {
mut sync.Mutex
index int
directory string
oldDefaultFile string
currentPath string
}

var pathMatch, _ = regexp.Compile("bookmark.[0-9]+.xml")

// NewBookmarkFile creates a wrapper around saving the bookmark file.
func NewBookmarkFile(directory string, oldpath string) (*BookmarkFile, error) {
_ = os.MkdirAll(filepath.Dir(directory), 0600)
index, currentPath := findMostRecentAndPurge(directory, oldpath)
return &BookmarkFile{
directory: directory,
oldDefaultFile: oldpath,
index: index,
currentPath: currentPath,
}, nil
}

// Put writes the value in to the newest file, and deletes the old one.
func (bf *BookmarkFile) Put(value string) error {
bf.mut.Lock()
defer bf.mut.Unlock()

previousIndex := bf.index
bf.index++
newFile := fmt.Sprintf("bookmark.%d.xml", bf.index)
newPath := filepath.Join(bf.directory, newFile)
err := os.WriteFile(newPath, []byte(value), 0600)
if err != nil {
return err
}
writtenVal, err := os.ReadFile(newPath)
if err != nil {
_ = os.Remove(newPath)
return err
}
// If for some reason the file was not written correctly then bail.
if string(writtenVal) != value {
_ = os.Remove(newPath)
return fmt.Errorf("unable to save data , contents written differ from value")
}
// Finally we can delete the old file.
oldFile := fmt.Sprintf("bookmark.%d.xml", previousIndex)
// We don't care if it errors.
_ = os.Remove(filepath.Join(bf.directory, oldFile))
return nil
}

// findMostRecentAndPurge will find the most recent file, including the legacy bookmark.
// If found will return the index and the path to the newest file.
func findMostRecentAndPurge(dir string, legacyPath string) (int, string) {
files, err := os.ReadDir(dir)
if err != nil {
return 1, ""
}
index := 0
var path string
for _, f := range files {
if pathMatch.MatchString(f.Name()) {
stripped := strings.ReplaceAll(f.Name(), "bookmark.", "")
number := strings.ReplaceAll(stripped, ".xml", "")
foundNum, err := strconv.Atoi(number)
if err != nil {
continue
}
// Cant read the file.
content, err := os.ReadFile(filepath.Join(dir, f.Name()))
if err != nil {
continue
}

// Need to ensure the file was properly saved previously.
if xml.Unmarshal(content, new(interface{})) != nil {
continue
}
if foundNum > index {
index = foundNum
path = f.Name()
}
}
}
// If we don't have a path then see if we can transition.
if path == "" && legacyPath != "" {
_, err = os.Stat(legacyPath)
if err == nil {
index = 1
contents, _ := os.ReadFile(legacyPath)
// Try to write the file if we have some contents.
if len(contents) > 0 {
newFile := fmt.Sprintf("bookmark.%d.xml", 1)
newPath := filepath.Join(dir, newFile)
_ = os.WriteFile(newPath, contents, 0600)
_ = os.Remove(legacyPath)
}
}
}
if index == 0 {
index = 1
}

// Finally delete all files other than the found.
for _, f := range files {
if f.Name() != path {
_ = os.Remove(filepath.Join(dir, f.Name()))
}
}
return index, path
}

// Get returns the value which is "" if nothing found.
// If the bucket does not exist then it will be created.
func (bf *BookmarkFile) Get() string {
val, _ := os.ReadFile(filepath.Join(bf.directory, bf.currentPath))
// We don't want to propagate the path error up the stack if its does not exist.
return string(val)
}
Loading