Skip to content

Commit

Permalink
some comments
Browse files Browse the repository at this point in the history
  • Loading branch information
thepalbi committed Dec 6, 2023
1 parent dd3f4d8 commit 7d2e197
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 13 deletions.
2 changes: 2 additions & 0 deletions component/common/loki/client/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,8 @@ func (m *Manager) Stop() {

// StopWithDrain will stop the manager, its Write-Ahead Log watchers, and clients accordingly. If drain is enabled,
// the Watchers will attempt to drain the WAL completely.
// The shutdown procedure first stops the Watchers, allowing them to flush as much data into the clients as possible. Then
// the clients are shut down accordingly.
func (m *Manager) StopWithDrain(drain bool) {
// first stop the receiving channel
m.once.Do(func() { close(m.entries) })
Expand Down
14 changes: 7 additions & 7 deletions component/common/loki/wal/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,14 @@ package wal
import (
"errors"
"fmt"
"github.com/grafana/agent/component/common/loki/wal/internal"
"io"
"math"
"os"
"strconv"
"time"

"github.com/go-kit/log"
"github.com/grafana/agent/component/common/loki/wal/internal"
"github.com/grafana/agent/pkg/flow/logging/level"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wlog"
Expand Down Expand Up @@ -197,7 +197,7 @@ func (w *Watcher) run() error {
// If tail is false, we know the segment we are "watching" over is closed (no further write will occur to it). Then, the
// segment is read fully, any errors are logged as Warnings, and no error is returned.
func (w *Watcher) watch(segmentNum int, tail bool) error {
level.Debug(w.logger).Log("msg", "Processing segment", "currentSegment", segmentNum, "tail", tail)
level.Debug(w.logger).Log("msg", "Watching WAL segment", "currentSegment", segmentNum, "tail", tail)

segment, err := wlog.OpenReadSegment(wlog.SegmentName(w.walDir, segmentNum))
if err != nil {
Expand Down Expand Up @@ -237,8 +237,8 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
}

// Check if new segments exists, or we are draining the WAL, which means that either:
// - This is the last segment, and we can consume it fully
// - There's some other segment, and we can consume this segment fully as well
// - This is the last segment, and we can consume it fully because we are draining the WAL
// - There's a segment after the current one, and we can consume this segment fully as well
if last <= segmentNum && !w.state.IsDraining() {
continue
}
Expand All @@ -248,13 +248,13 @@ func (w *Watcher) watch(segmentNum int, tail bool) error {
}

// We now that there's either a new segment (last > segmentNum), or we are draining the WAL. Either case, read
// the remaining from the segmentNum segment and return from `watch` to read the next one
// the remaining data from the segmentNum and return from `watch` to read the next one.
_, err = w.readSegment(reader, segmentNum)
if debug {
level.Warn(w.logger).Log("msg", "Error reading segment inside segmentTicker", "segment", segmentNum, "read", reader.Offset(), "err", err)
}

// io.EOF error are non-fatal since we consuming the segment till the end
// io.EOF error are non-fatal since we are consuming the segment till the end
if errors.Unwrap(err) != io.EOF {
return err
}
Expand Down Expand Up @@ -350,7 +350,7 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) {
}

// Stop stops the Watcher, draining the WAL until the end of the last segment if drain is true. Since the writer of the WAL
// is expected to have stopped before the Watcher, the last segment will be drained completely before the end of Stop.
// is expected to have stopped before the Watcher, no further writes are expected, so segments can be safely consumed.
//
// Note if drain is enabled, the caller routine of Stop will block executing the drain procedure.
func (w *Watcher) Stop(drain bool) {
Expand Down
16 changes: 10 additions & 6 deletions component/common/loki/wal/watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package wal

import (
"fmt"
"go.uber.org/atomic"
"os"
"strings"
"testing"
Expand All @@ -14,6 +13,7 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"
"go.uber.org/atomic"

"github.com/grafana/agent/component/common/loki"
"github.com/grafana/agent/component/common/loki/utils"
Expand Down Expand Up @@ -586,12 +586,16 @@ func (s *slowWriteTo) StoreSeries(series []record.RefSeries, segmentNum int) {
}

func (s *slowWriteTo) AppendEntries(entries wal.RefEntries, segmentNum int) error {
var allLines strings.Builder
for _, e := range entries.Entries {
allLines.WriteString(e.Line)
allLines.WriteString("/")
// only log on development debug flag
if debug {
var allLines strings.Builder
for _, e := range entries.Entries {
allLines.WriteString(e.Line)
allLines.WriteString("/")
}
s.t.Logf("AppendEntries called from segment %d - %s", segmentNum, allLines.String())
}
s.t.Logf("AppendEntries called from segment %d - %s", segmentNum, allLines.String())

s.entriesReceived.Add(uint64(len(entries.Entries)))
time.Sleep(s.sleepAfterAppendEntries)
return nil
Expand Down

0 comments on commit 7d2e197

Please sign in to comment.