Skip to content

Commit

Permalink
save bookmark every 10s
Browse files Browse the repository at this point in the history
  • Loading branch information
wildum committed Jan 29, 2025
1 parent 7cb72f6 commit 432eada
Show file tree
Hide file tree
Showing 5 changed files with 132 additions and 20 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ Main (unreleased)

- Bump snmp_exporter and embedded modules to 0.27.0. Add support for multi-module handling by comma separation and expose argument to increase SNMP polling concurrency for `prometheus.exporter.snmp`. (@v-zhuravlev)

- Reduce CPU usage of `loki.source.windowsevent` by up to 60% by updating the bookmark file every 10 seconds instead of after every event. (@wildum)

v1.6.1
-----------------

Expand Down
37 changes: 24 additions & 13 deletions internal/component/loki/source/windowsevent/bookmark.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,19 @@ import (
"io/fs"
"os"

"github.com/natefinch/atomic"
uberAtomic "go.uber.org/atomic"

"github.com/grafana/loki/v3/clients/pkg/promtail/targets/windows/win_eventlog"
"github.com/natefinch/atomic"
)

type bookMark struct {
handle win_eventlog.EvtHandle
isNew bool
path string
buf []byte

bookmarkStr *uberAtomic.String
}

// newBookMark creates a new windows event bookmark.
Expand All @@ -33,19 +36,21 @@ func newBookMark(path string) (*bookMark, error) {
_, err := os.Stat(path)
// creates a new bookmark file if none exists.
if errors.Is(err, fs.ErrNotExist) {
_, err := os.Create(path)
f, err := os.Create(path)
if err != nil {
return nil, err
}
defer f.Close()
bm, err := win_eventlog.CreateBookmark("")
if err != nil {
return nil, err
}
return &bookMark{
handle: bm,
path: path,
isNew: true,
buf: buf,
handle: bm,
path: path,
isNew: true,
buf: buf,
bookmarkStr: uberAtomic.NewString(""),
}, nil
}
if err != nil {
Expand Down Expand Up @@ -74,18 +79,24 @@ func newBookMark(path string) (*bookMark, error) {
}
}
return &bookMark{
handle: bm,
path: path,
isNew: fileString == "",
buf: buf,
handle: bm,
path: path,
isNew: fileString == "",
buf: buf,
bookmarkStr: uberAtomic.NewString(""),
}, nil
}

// save Saves the bookmark at the current event position.
func (b *bookMark) save(event win_eventlog.EvtHandle) error {
func (b *bookMark) update(event win_eventlog.EvtHandle) error {
newBookmark, err := win_eventlog.UpdateBookmark(b.handle, event, b.buf)
if err != nil {
return err
}
return atomic.WriteFile(b.path, bytes.NewReader([]byte(newBookmark)))
b.bookmarkStr.Store(newBookmark)
return nil
}

// save Saves the bookmark at the current event position.
func (b *bookMark) save() error {
return atomic.WriteFile(b.path, bytes.NewReader([]byte(b.bookmarkStr.Load())))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"os"
"path"
"sync"
"time"

"github.com/grafana/loki/v3/clients/pkg/promtail/api"
"github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig"
Expand Down Expand Up @@ -116,10 +117,13 @@ func (c *Component) Update(args component.Arguments) error {
return err
}

winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs))
// Same as the loki.source.file sync position period
bookmarkSyncPeriod := 10 * time.Second
winTarget, err := NewTarget(c.opts.Logger, c.handle, nil, convertConfig(newArgs), bookmarkSyncPeriod)
if err != nil {
return err
}

// Stop the original target.
if c.target != nil {
err := c.target.Stop()
Expand Down
39 changes: 33 additions & 6 deletions internal/component/loki/source/windowsevent/target.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ func NewTarget(
handler api.EntryHandler,
relabel []*relabel.Config,
cfg *scrapeconfig.WindowsEventsTargetConfig,
bookmarkSyncPeriod time.Duration,
) (*Target, error) {
sigEvent, err := windows.CreateEvent(nil, 0, 0, nil)
if err != nil {
Expand Down Expand Up @@ -91,6 +92,7 @@ func NewTarget(
t.cfg.PollInterval = 3 * time.Second
}
go t.loop()
go t.updateBookmark(bookmarkSyncPeriod)
return t, nil
}

Expand Down Expand Up @@ -120,15 +122,13 @@ func (t *Target) loop() {
}
t.err = nil
// we have received events to handle.
for i, entry := range t.renderEntries(events) {
for _, entry := range t.renderEntries(events) {
t.handler.Chan() <- entry
if err := t.bm.save(handles[i]); err != nil {
t.err = err
level.Error(util_log.Logger).Log("msg", "error saving bookmark", "err", err)
}
}
if len(handles) != 0 {
t.bm.update(handles[len(handles)-1])
}
win_eventlog.Close(handles)

}
// no more messages we wait for next poll timer tick.
select {
Expand All @@ -139,6 +139,32 @@ func (t *Target) loop() {
}
}

func (t *Target) updateBookmark(bookmarkSyncPeriod time.Duration) {
t.wg.Add(1)

bookmarkTick := time.NewTicker(bookmarkSyncPeriod)
defer func() {
bookmarkTick.Stop()
t.wg.Done()
}()

for {
select {
case <-bookmarkTick.C:
t.saveBookmarkPosition()
case <-t.done:
return
}
}
}

func (t *Target) saveBookmarkPosition() {
if err := t.bm.save(); err != nil {
t.err = err
level.Error(util_log.Logger).Log("msg", "error saving bookmark", "err", err)
}
}

// renderEntries renders Loki entries from windows event logs
func (t *Target) renderEntries(events []win_eventlog.Event) []api.Entry {
res := make([]api.Entry, 0, len(events))
Expand Down Expand Up @@ -226,5 +252,6 @@ func (t *Target) Stop() error {
close(t.done)
t.wg.Wait()
t.handler.Stop()
t.saveBookmarkPosition()
return t.err
}
68 changes: 68 additions & 0 deletions internal/component/loki/source/windowsevent/target_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package windowsevent

import (
"os"
"path"
"testing"
"time"

"github.com/go-kit/kit/log"
"github.com/grafana/alloy/internal/component/common/loki/utils"
"github.com/grafana/loki/v3/clients/pkg/promtail/api"
"github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"golang.org/x/sys/windows/svc/eventlog"
)

func TestBookmarkUpdate(t *testing.T) {
defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("go.opencensus.io/stats/view.(*worker).start"))

var loggerName = "alloy_test"
_ = eventlog.InstallAsEventCreate(loggerName, eventlog.Info|eventlog.Warning|eventlog.Error)
wlog, err := eventlog.Open(loggerName)
require.NoError(t, err)

dirPath := "bookmarktest"
filePath := path.Join(dirPath, "bookmark.xml")
require.NoError(t, os.MkdirAll(path.Dir(filePath), 700))
defer func() {
require.NoError(t, os.RemoveAll(dirPath))
}()

scrapeConfig := &scrapeconfig.WindowsEventsTargetConfig{
Locale: 0,
EventlogName: "Application",
Query: "*",
UseIncomingTimestamp: false,
BookmarkPath: filePath,
PollInterval: 10 * time.Millisecond,
ExcludeEventData: false,
ExcludeEventMessage: false,
ExcludeUserData: false,
Labels: utils.ToLabelSet(map[string]string{"job": "windows"}),
}
handle := &handler{handler: make(chan api.Entry)}
winTarget, err := NewTarget(log.NewLogfmtLogger(os.Stderr), handle, nil, scrapeConfig, 1000*time.Millisecond)
require.NoError(t, err)

tm := time.Now().Format(time.RFC3339Nano)
err = wlog.Info(2, tm)
require.NoError(t, err)

select {
case e := <-handle.handler:
require.Equal(t, model.LabelValue("windows"), e.Labels["job"])
case <-time.After(3 * time.Second):
require.FailNow(t, "failed waiting for event")
}
winTarget.Stop()

require.NoError(t, wlog.Close())

content, err := os.ReadFile(filePath)
require.NoError(t, err)
// check that only the start because the RecordId changes
require.Contains(t, string(content), "<BookmarkList>\r\n <Bookmark Channel='Application' RecordId=")
}

0 comments on commit 432eada

Please sign in to comment.