Queue-based client for loki.write (#5434)
* replay from marker implemented

* some comments on tailing

* replay test

* passing segment in write to append

* simplify write to series locking

* queue client

* add sync to batches

* using queue client

* queue client closure

* revert replay changes

* simple queue client test

* send old batches in different routine

* stop draining faster if no batches left

* addressing concurrency comments

* single queue implementation WIP

* single sendqueue close semantics

* fixed queue client test

* refactor client name check + comment + routing configs

* interface fix

* fix linter

* linter fix

* linter again

* deleting test fiddling with internal representation

* simplifying naming procedures, and stoppables

* make queue client test more strict, and fix slice reset

* save stoppable clients

* queue client bench

* removing additional param from queue client New

* on contexts

* address pr comments

* better queue config capacity

* underscore

* fix test case with batching

* fix benchmark

* fixing benchmark

* add docs

* added changelog entry

* linter fix

* change default queue capacity

* Apply suggestions from code review

Co-authored-by: Clayton Cornell <[email protected]>

---------

Co-authored-by: Clayton Cornell <[email protected]>
thepalbi and clayton-cornell authored Oct 27, 2023
1 parent 08b3eee commit 0fbd5e3
Showing 15 changed files with 1,158 additions and 98 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
@@ -45,6 +45,8 @@ Main (unreleased)
- Update version of River to support raw strings in flow using a backtick. (@erikbaranowski)

- Added support for python profiling to `pyroscope.ebpf` component. (@korniltsev)

+ - Add queueing logs remote write client for `loki.write` when WAL is enabled. (@thepalbi)

### Bugfixes

43 changes: 35 additions & 8 deletions component/common/loki/client/batch.go
@@ -24,17 +24,18 @@ const (
// and entries in a single batch request. In case of multi-tenant Promtail, log
// streams for each tenant are stored in a dedicated batch.
type batch struct {
- streams map[string]*logproto.Stream
- bytes int
- createdAt time.Time
+ streams map[string]*logproto.Stream
+ // totalBytes holds the total amounts of bytes, across the log lines in this batch.
+ totalBytes int
+ createdAt time.Time

maxStreams int
}

func newBatch(maxStreams int, entries ...loki.Entry) *batch {
b := &batch{
streams: map[string]*logproto.Stream{},
- bytes: 0,
+ totalBytes: 0,
createdAt: time.Now(),
maxStreams: maxStreams,
}
@@ -50,7 +51,7 @@ func newBatch(maxStreams int, entries ...loki.Entry) *batch {

// add an entry to the batch
func (b *batch) add(entry loki.Entry) error {
- b.bytes += len(entry.Line)
+ b.totalBytes += len(entry.Line)

// Append the entry to an already existing stream (if any)
labels := labelsMapToString(entry.Labels, ReservedLabelTenantID)
@@ -71,6 +72,32 @@ func (b *batch) add(entry loki.Entry) error {
return nil
}

+ // add an entry to the batch
+ func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry) error {
+ b.totalBytes += len(entry.Line)
+
+ // Append the entry to an already existing stream (if any)
+ labels := labelsMapToString(lbs, ReservedLabelTenantID)
+ if stream, ok := b.streams[labels]; ok {
+ stream.Entries = append(stream.Entries, entry)
+ return nil
+ }
+
+ streams := len(b.streams)
+ if b.maxStreams > 0 && streams >= b.maxStreams {
+ return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels)
+ }
+
+ // Add the entry as a new stream
+ b.streams[labels] = &logproto.Stream{
+ Labels: labels,
+ Entries: []logproto.Entry{entry},
+ }
+
+ return nil
+ }

// labelsMapToString encodes an entry's label set as a string, ignoring the without label.
func labelsMapToString(ls model.LabelSet, without model.LabelName) string {
var b strings.Builder
totalSize := 2
@@ -105,13 +132,13 @@ func labelsMapToString(ls model.LabelSet, without model.LabelName) string {

// sizeBytes returns the current batch size in bytes
func (b *batch) sizeBytes() int {
- return b.bytes
+ return b.totalBytes
}

// sizeBytesAfter returns the size of the batch after the input entry
// will be added to the batch itself
- func (b *batch) sizeBytesAfter(entry loki.Entry) int {
- return b.bytes + len(entry.Line)
+ func (b *batch) sizeBytesAfter(line string) int {
+ return b.totalBytes + len(line)
}

// age of the batch since its creation
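The batch above tracks the running payload size so the client can flush before exceeding its configured batch size, and `sizeBytesAfter` now takes the raw line so WAL-sourced entries can reuse the same check. A minimal, self-contained sketch of that flush decision; the `simpleBatch` type and `maxBatchSize` constant are hypothetical stand-ins for the unexported `batch` type and the `BatchSize` config, not the actual implementation:

```go
package main

import "fmt"

// simpleBatch is a hypothetical stand-in for the batch type above: it groups
// log lines per label set and tracks the total payload size in bytes.
type simpleBatch struct {
	streams    map[string][]string // label string -> log lines
	totalBytes int
}

func newSimpleBatch() *simpleBatch {
	return &simpleBatch{streams: map[string][]string{}}
}

// sizeBytesAfter mirrors the signature change in this commit: it takes the raw
// line instead of a full entry, so WAL-sourced entries can reuse it.
func (b *simpleBatch) sizeBytesAfter(line string) int {
	return b.totalBytes + len(line)
}

func (b *simpleBatch) add(labels, line string) {
	b.streams[labels] = append(b.streams[labels], line)
	b.totalBytes += len(line)
}

func main() {
	const maxBatchSize = 1 << 20 // assumed 1 MiB maximum batch size

	b := newSimpleBatch()
	for _, line := range []string{"first log line", "second log line"} {
		// Flush and start a new batch if adding the line would exceed the limit.
		if b.sizeBytesAfter(line) > maxBatchSize {
			fmt.Printf("flushing batch of %d bytes\n", b.totalBytes)
			b = newSimpleBatch()
		}
		b.add(`{job="demo"}`, line)
	}
	fmt.Printf("pending batch holds %d bytes\n", b.totalBytes)
}
```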
13 changes: 2 additions & 11 deletions component/common/loki/client/client.go
@@ -4,7 +4,6 @@ import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
@@ -194,7 +193,7 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLin
cfg: cfg,
entries: make(chan loki.Entry),
metrics: metrics,
- name: asSha256(cfg),
+ name: GetClientName(cfg),

externalLabels: cfg.ExternalLabels.LabelSet,
ctx: ctx,
@@ -318,7 +317,7 @@ func (c *client) run() {

// If adding the entry to the batch will increase the size over the max
// size allowed, we do send the current batch and then create a new one
- if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
+ if batch.sizeBytesAfter(e.Line) > c.cfg.BatchSize {
c.sendBatch(tenantID, batch)

batches[tenantID] = newBatch(c.maxStreams, e)
@@ -355,14 +354,6 @@ func (c *client) Chan() chan<- loki.Entry {
return c.entries
}

- func asSha256(o interface{}) string {
- h := sha256.New()
- h.Write([]byte(fmt.Sprintf("%v", o)))
-
- temp := fmt.Sprintf("%x", h.Sum(nil))
- return temp[:6]
- }

func batchIsRateLimited(status int) bool {
return status == 429
}
20 changes: 5 additions & 15 deletions component/common/loki/client/client_writeto.go
@@ -18,16 +18,9 @@ import (
// clientWriteTo implements a wal.WriteTo that re-builds entries with the stored series, and the received entries. After,
// sends each to the provided Client channel.
type clientWriteTo struct {
- series map[chunks.HeadSeriesRef]model.LabelSet
- seriesLock sync.RWMutex
-
- // seriesSegment keeps track of the last segment in which the series pointed by each key in this map was seen. Keeping
- // this in a separate map avoids unnecessary contention.
- //
- // Even though it doesn't present a difference right now according to benchmarks, it will help when we introduce other
- // calls from the wal.Watcher to the wal.WriteTo like `UpdateSeriesSegment`.
- seriesSegment map[chunks.HeadSeriesRef]int
- seriesSegmentLock sync.RWMutex
+ series map[chunks.HeadSeriesRef]model.LabelSet
+ seriesSegment map[chunks.HeadSeriesRef]int
+ seriesLock sync.RWMutex

logger log.Logger
toClient chan<- loki.Entry
@@ -46,8 +39,6 @@ func newClientWriteTo(toClient chan<- loki.Entry, logger log.Logger) *clientWrit
func (c *clientWriteTo) StoreSeries(series []record.RefSeries, segment int) {
c.seriesLock.Lock()
defer c.seriesLock.Unlock()
- c.seriesSegmentLock.Lock()
- defer c.seriesSegmentLock.Unlock()
for _, seriesRec := range series {
c.seriesSegment[seriesRec.Ref] = segment
labels := util.MapToModelLabelSet(seriesRec.Labels.Map())
@@ -59,8 +50,6 @@ func (c *clientWriteTo) StoreSeries(series []record.RefSeries, segment int) {
func (c *clientWriteTo) SeriesReset(segmentNum int) {
c.seriesLock.Lock()
defer c.seriesLock.Unlock()
- c.seriesSegmentLock.Lock()
- defer c.seriesSegmentLock.Unlock()
for k, v := range c.seriesSegment {
if v <= segmentNum {
level.Debug(c.logger).Log("msg", fmt.Sprintf("reclaiming series under segment %d", segmentNum))
@@ -70,7 +59,7 @@ func (c *clientWriteTo) SeriesReset(segmentNum int) {
}
}

- func (c *clientWriteTo) AppendEntries(entries wal.RefEntries) error {
+ func (c *clientWriteTo) AppendEntries(entries wal.RefEntries, _ int) error {
var entry loki.Entry
c.seriesLock.RLock()
l, ok := c.series[entries.Ref]
@@ -82,6 +71,7 @@ func (c *clientWriteTo) AppendEntries(entries wal.RefEntries) error {
c.toClient <- entry
}
} else {
+ // TODO(thepalbi): Add metric here
level.Debug(c.logger).Log("msg", "series for entry not found")
}
return nil
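With this change a single `seriesLock` guards both the series and segment maps, `AppendEntries` receives the segment number, and `SeriesReset` reclaims every series last seen in a segment at or below the given one. A rough sketch of that segment-based reclamation pattern, using simplified hypothetical types rather than the real `record.RefSeries` and `wal.RefEntries`:

```go
package main

import (
	"fmt"
	"sync"
)

// seriesCache is a hypothetical, simplified version of the bookkeeping in
// clientWriteTo: it remembers the labels for each series ref and the last WAL
// segment in which that series was seen.
type seriesCache struct {
	mu            sync.RWMutex
	series        map[uint64]string // series ref -> label string
	seriesSegment map[uint64]int    // series ref -> last segment seen
}

func newSeriesCache() *seriesCache {
	return &seriesCache{
		series:        map[uint64]string{},
		seriesSegment: map[uint64]int{},
	}
}

// StoreSeries records the labels for a ref and the segment it was read from.
func (c *seriesCache) StoreSeries(ref uint64, labels string, segment int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.series[ref] = labels
	c.seriesSegment[ref] = segment
}

// SeriesReset drops every series whose last-seen segment is <= segmentNum,
// mirroring how the WriteTo frees memory once old segments are deleted.
func (c *seriesCache) SeriesReset(segmentNum int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	for ref, seg := range c.seriesSegment {
		if seg <= segmentNum {
			delete(c.series, ref)
			delete(c.seriesSegment, ref)
		}
	}
}

func main() {
	c := newSeriesCache()
	c.StoreSeries(1, `{job="a"}`, 0)
	c.StoreSeries(2, `{job="b"}`, 3)
	c.SeriesReset(2)           // segments 0..2 are reclaimed
	fmt.Println(len(c.series)) // prints 1: only series 2 survives
}
```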
6 changes: 3 additions & 3 deletions component/common/loki/client/client_writeto_test.go
@@ -68,7 +68,7 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing
Line: line,
},
},
- })
+ }, 0)
}

require.Eventually(t, func() bool {
@@ -125,7 +125,7 @@ func TestClientWriter_LogEntriesWithoutMatchingSeriesAreIgnored(t *testing.T) {
Line: line,
},
},
- })
+ }, 0)
}

time.Sleep(time.Second * 2)
@@ -237,7 +237,7 @@ func startWriter(segmentNum, seriesToReset int, target *clientWriteTo, lines int
Line: fmt.Sprintf("%d - %d - hellooo", segmentNum, i),
},
},
- })
+ }, 0)
// add some jitter between writes
randomSleepMax(time.Millisecond * 1)
}
17 changes: 17 additions & 0 deletions component/common/loki/client/config.go
@@ -46,6 +46,23 @@ type Config struct {
DropRateLimitedBatches bool `yaml:"drop_rate_limited_batches"`

StreamLagLabels flagext.StringSliceCSV `yaml:"stream_lag_labels" doc:"deprecated"`

+ // Queue controls configuration parameters specific to the queue client
+ Queue QueueConfig
}

+ // QueueConfig holds configurations for the queue-based remote-write client.
+ type QueueConfig struct {
+ // Capacity is the worst case size in bytes desired for the send queue. This value is used to calculate the size of
+ // the buffered channel used underneath. The worst case scenario assumed is that every batch buffered in full, hence
+ // the channel capacity would be calculated as: bufferChannelSize = Capacity / BatchSize.
+ //
+ // For example, assuming BatchSize
+ // is the 1 MiB default, and a capacity of 100 MiB, the underlying buffered channel would buffer up to 100 batches.
+ Capacity int
+
+ // DrainTimeout controls the maximum time that draining the send queue can take.
+ DrainTimeout time.Duration
+ }

// RegisterFlags with prefix registers flags where every name is prefixed by
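Per the `Capacity` comment above, the queue client derives the size of its buffered channel from the configured capacity and the batch size. A small illustrative sketch of that calculation; the constant names and the `[]byte` element type are assumptions for the example, not the actual implementation:

```go
package main

import "fmt"

func main() {
	const (
		queueCapacity = 100 << 20 // assumed Queue.Capacity of 100 MiB
		batchSize     = 1 << 20   // assumed BatchSize of 1 MiB
	)

	// Worst case: every queued batch is full, so the channel only needs
	// room for Capacity / BatchSize batches.
	bufferChannelSize := queueCapacity / batchSize

	// A buffered channel of batches; []byte stands in for the real batch type.
	sendQueue := make(chan []byte, bufferChannelSize)

	fmt.Println(cap(sendQueue)) // prints 100
}
```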
