diff --git a/CHANGELOG.md b/CHANGELOG.md index aa414524359d..ad62c5ea7ad7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -45,6 +45,8 @@ Main (unreleased) - Update version of River to support raw strings in flow using a backtick. (@erikbaranowski) - Added support for python profiling to `pyroscope.ebpf` component. (@korniltsev) + +- Add queue-based logs remote-write client for `loki.write`, used when the WAL is enabled. (@thepalbi) ### Bugfixes diff --git a/component/common/loki/client/batch.go b/component/common/loki/client/batch.go index 369a619d2556..8417ae019f53 100644 --- a/component/common/loki/client/batch.go +++ b/component/common/loki/client/batch.go @@ -24,9 +24,10 @@ const ( // and entries in a single batch request. In case of multi-tenant Promtail, log // streams for each tenant are stored in a dedicated batch. type batch struct { - streams map[string]*logproto.Stream - bytes int - createdAt time.Time + streams map[string]*logproto.Stream + // totalBytes holds the total amount of bytes across the log lines in this batch. + totalBytes int + createdAt time.Time maxStreams int } @@ -34,7 +35,7 @@ type batch struct { func newBatch(maxStreams int, entries ...loki.Entry) *batch { b := &batch{ streams: map[string]*logproto.Stream{}, - bytes: 0, + totalBytes: 0, createdAt: time.Now(), maxStreams: maxStreams, } @@ -50,7 +51,7 @@ func newBatch(maxStreams int, entries ...loki.Entry) *batch { // add an entry to the batch func (b *batch) add(entry loki.Entry) error { - b.bytes += len(entry.Line) + b.totalBytes += len(entry.Line) // Append the entry to an already existing stream (if any) labels := labelsMapToString(entry.Labels, ReservedLabelTenantID) @@ -71,6 +72,32 @@ func (b *batch) add(entry loki.Entry) error { return nil } +// addFromWAL adds an entry to the batch, built from the WAL-stored label set and entry. +func (b *batch) addFromWAL(lbs model.LabelSet, entry logproto.Entry) error { + b.totalBytes += len(entry.Line) + + // Append the entry to an already existing stream (if any) + labels := labelsMapToString(lbs, ReservedLabelTenantID) + if stream, ok := b.streams[labels]; ok { + stream.Entries = append(stream.Entries, entry) + return nil + } + + streams := len(b.streams) + if b.maxStreams > 0 && streams >= b.maxStreams { + return fmt.Errorf(errMaxStreamsLimitExceeded, streams, b.maxStreams, labels) + } + + // Add the entry as a new stream + b.streams[labels] = &logproto.Stream{ + Labels: labels, + Entries: []logproto.Entry{entry}, + } + + return nil +} + +// labelsMapToString encodes an entry's label set as a string, ignoring the `without` label.
func labelsMapToString(ls model.LabelSet, without model.LabelName) string { var b strings.Builder totalSize := 2 @@ -105,13 +132,13 @@ func labelsMapToString(ls model.LabelSet, without model.LabelName) string { // sizeBytes returns the current batch size in bytes func (b *batch) sizeBytes() int { - return b.bytes + return b.totalBytes } // sizeBytesAfter returns the size of the batch after the input entry // will be added to the batch itself -func (b *batch) sizeBytesAfter(entry loki.Entry) int { - return b.bytes + len(entry.Line) +func (b *batch) sizeBytesAfter(line string) int { + return b.totalBytes + len(line) } // age of the batch since its creation diff --git a/component/common/loki/client/client.go b/component/common/loki/client/client.go index 15bec06f4be9..36839dbcffa3 100644 --- a/component/common/loki/client/client.go +++ b/component/common/loki/client/client.go @@ -4,7 +4,6 @@ import ( "bufio" "bytes" "context" - "crypto/sha256" "errors" "fmt" "io" @@ -194,7 +193,7 @@ func newClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLin cfg: cfg, entries: make(chan loki.Entry), metrics: metrics, - name: asSha256(cfg), + name: GetClientName(cfg), externalLabels: cfg.ExternalLabels.LabelSet, ctx: ctx, @@ -318,7 +317,7 @@ func (c *client) run() { // If adding the entry to the batch will increase the size over the max // size allowed, we do send the current batch and then create a new one - if batch.sizeBytesAfter(e) > c.cfg.BatchSize { + if batch.sizeBytesAfter(e.Line) > c.cfg.BatchSize { c.sendBatch(tenantID, batch) batches[tenantID] = newBatch(c.maxStreams, e) @@ -355,14 +354,6 @@ func (c *client) Chan() chan<- loki.Entry { return c.entries } -func asSha256(o interface{}) string { - h := sha256.New() - h.Write([]byte(fmt.Sprintf("%v", o))) - - temp := fmt.Sprintf("%x", h.Sum(nil)) - return temp[:6] -} - func batchIsRateLimited(status int) bool { return status == 429 } diff --git a/component/common/loki/client/client_writeto.go b/component/common/loki/client/client_writeto.go index 33403882afa4..355b060286ef 100644 --- a/component/common/loki/client/client_writeto.go +++ b/component/common/loki/client/client_writeto.go @@ -18,16 +18,9 @@ import ( // clientWriteTo implements a wal.WriteTo that re-builds entries with the stored series, and the received entries. After, // sends each to the provided Client channel. type clientWriteTo struct { - series map[chunks.HeadSeriesRef]model.LabelSet - seriesLock sync.RWMutex - - // seriesSegment keeps track of the last segment in which the series pointed by each key in this map was seen. Keeping - // this in a separate map avoids unnecessary contention. - // - // Even though it doesn't present a difference right now according to benchmarks, it will help when we introduce other - // calls from the wal.Watcher to the wal.WriteTo like `UpdateSeriesSegment`. 
- seriesSegment map[chunks.HeadSeriesRef]int - seriesSegmentLock sync.RWMutex + series map[chunks.HeadSeriesRef]model.LabelSet + seriesSegment map[chunks.HeadSeriesRef]int + seriesLock sync.RWMutex logger log.Logger toClient chan<- loki.Entry @@ -46,8 +39,6 @@ func newClientWriteTo(toClient chan<- loki.Entry, logger log.Logger) *clientWrit func (c *clientWriteTo) StoreSeries(series []record.RefSeries, segment int) { c.seriesLock.Lock() defer c.seriesLock.Unlock() - c.seriesSegmentLock.Lock() - defer c.seriesSegmentLock.Unlock() for _, seriesRec := range series { c.seriesSegment[seriesRec.Ref] = segment labels := util.MapToModelLabelSet(seriesRec.Labels.Map()) @@ -59,8 +50,6 @@ func (c *clientWriteTo) StoreSeries(series []record.RefSeries, segment int) { func (c *clientWriteTo) SeriesReset(segmentNum int) { c.seriesLock.Lock() defer c.seriesLock.Unlock() - c.seriesSegmentLock.Lock() - defer c.seriesSegmentLock.Unlock() for k, v := range c.seriesSegment { if v <= segmentNum { level.Debug(c.logger).Log("msg", fmt.Sprintf("reclaiming series under segment %d", segmentNum)) @@ -70,7 +59,7 @@ func (c *clientWriteTo) SeriesReset(segmentNum int) { } } -func (c *clientWriteTo) AppendEntries(entries wal.RefEntries) error { +func (c *clientWriteTo) AppendEntries(entries wal.RefEntries, _ int) error { var entry loki.Entry c.seriesLock.RLock() l, ok := c.series[entries.Ref] @@ -82,6 +71,7 @@ func (c *clientWriteTo) AppendEntries(entries wal.RefEntries) error { c.toClient <- entry } } else { + // TODO(thepalbi): Add metric here level.Debug(c.logger).Log("msg", "series for entry not found") } return nil diff --git a/component/common/loki/client/client_writeto_test.go b/component/common/loki/client/client_writeto_test.go index f6c42b85bcc0..1ad7929b58ad 100644 --- a/component/common/loki/client/client_writeto_test.go +++ b/component/common/loki/client/client_writeto_test.go @@ -68,7 +68,7 @@ func TestClientWriter_LogEntriesAreReconstructedAndForwardedCorrectly(t *testing Line: line, }, }, - }) + }, 0) } require.Eventually(t, func() bool { @@ -125,7 +125,7 @@ func TestClientWriter_LogEntriesWithoutMatchingSeriesAreIgnored(t *testing.T) { Line: line, }, }, - }) + }, 0) } time.Sleep(time.Second * 2) @@ -237,7 +237,7 @@ func startWriter(segmentNum, seriesToReset int, target *clientWriteTo, lines int Line: fmt.Sprintf("%d - %d - hellooo", segmentNum, i), }, }, - }) + }, 0) // add some jitter between writes randomSleepMax(time.Millisecond * 1) } diff --git a/component/common/loki/client/config.go b/component/common/loki/client/config.go index 0673e8aab8df..df684d3c3ae7 100644 --- a/component/common/loki/client/config.go +++ b/component/common/loki/client/config.go @@ -46,6 +46,23 @@ type Config struct { DropRateLimitedBatches bool `yaml:"drop_rate_limited_batches"` StreamLagLabels flagext.StringSliceCSV `yaml:"stream_lag_labels" doc:"deprecated"` + + // Queue controls configuration parameters specific to the queue client + Queue QueueConfig +} + +// QueueConfig holds configurations for the queue-based remote-write client. +type QueueConfig struct { + // Capacity is the worst case size in bytes desired for the send queue. This value is used to calculate the size of + // the buffered channel used underneath. The worst case scenario assumed is that every batch buffered in full, hence + // the channel capacity would be calculated as: bufferChannelSize = Capacity / BatchSize. 
+ // + // For example, assuming BatchSize + // is the 1 MiB default, and a capacity of 100 MiB, the underlying buffered channel would buffer up to 100 batches. + Capacity int + + // DrainTimeout controls the maximum time that draining the send queue can take. + DrainTimeout time.Duration } // RegisterFlags with prefix registers flags where every name is prefixed by diff --git a/component/common/loki/client/manager.go b/component/common/loki/client/manager.go index 0c7ee5a531bb..e91792a31228 100644 --- a/component/common/loki/client/manager.go +++ b/component/common/loki/client/manager.go @@ -1,6 +1,7 @@ package client import ( + "crypto/sha256" "fmt" "strings" "sync" @@ -37,6 +38,11 @@ type Stoppable interface { Stop() } +type StoppableClient interface { + Stop() + StopNow() +} + // Manager manages remote write client instantiation, and connects the related components to orchestrate the flow of loki.Entry // from the scrape targets, to the remote write clients themselves. // @@ -48,6 +54,9 @@ type Manager struct { clients []Client walWatchers []Stoppable + // stoppableClients is kept separate from clients for avoiding having to couple queueClient to the Client interface + stoppableClients []StoppableClient + entries chan loki.Entry once sync.Once @@ -67,51 +76,59 @@ func NewManager(metrics *Metrics, logger log.Logger, limits limit.Config, reg pr clientsCheck := make(map[string]struct{}) clients := make([]Client, 0, len(clientCfgs)) watchers := make([]Stoppable, 0, len(clientCfgs)) + stoppableClients := make([]StoppableClient, 0, len(clientCfgs)) for _, cfg := range clientCfgs { - client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger) - if err != nil { - return nil, err - } - // Don't allow duplicate clients, we have client specific metrics that need at least one unique label value (name). - if _, ok := clientsCheck[client.Name()]; ok { + clientName := GetClientName(cfg) + if _, ok := clientsCheck[clientName]; ok { return nil, fmt.Errorf("duplicate client configs are not allowed, found duplicate for name: %s", cfg.Name) } - clientsCheck[client.Name()] = fake - clients = append(clients, client) + clientsCheck[clientName] = fake if walCfg.Enabled { - // Create and launch wal watcher for this client - // add some context information for the logger the watcher uses - wlog := log.With(logger, "client", client.Name()) + wlog := log.With(logger, "client", clientName) + + queue, err := NewQueue(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger) + if err != nil { + return nil, fmt.Errorf("error starting queue client: %w", err) + } + stoppableClients = append(stoppableClients, queue) - writeTo := newClientWriteTo(client.Chan(), wlog) // subscribe watcher's wal.WriteTo to writer events. This will make the writer trigger the cleanup of the wal.WriteTo // series cache whenever a segment is deleted. 
- notifier.SubscribeCleanup(writeTo) + notifier.SubscribeCleanup(queue) - watcher := wal.NewWatcher(walCfg.Dir, client.Name(), watcherMetrics, writeTo, wlog, walCfg.WatchConfig) + watcher := wal.NewWatcher(walCfg.Dir, clientName, watcherMetrics, queue, wlog, walCfg.WatchConfig) // subscribe watcher to wal write events notifier.SubscribeWrite(watcher) - level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", client.Name()) + level.Debug(logger).Log("msg", "starting WAL watcher for client", "client", clientName) watcher.Start() watchers = append(watchers, watcher) + } else { + client, err := New(metrics, cfg, limits.MaxStreams, limits.MaxLineSize.Val(), limits.MaxLineSizeTruncate, logger) + if err != nil { + return nil, fmt.Errorf("error starting client: %w", err) + } + + clients = append(clients, client) + stoppableClients = append(stoppableClients, client) } } manager := &Manager{ - clients: clients, - walWatchers: watchers, - entries: make(chan loki.Entry), + clients: clients, + stoppableClients: stoppableClients, + walWatchers: watchers, + entries: make(chan loki.Entry), } if walCfg.Enabled { - manager.name = "wal" + manager.name = buildManagerName("wal", clientCfgs...) manager.startWithConsume() } else { - manager.name = "multi" + manager.name = buildManagerName("multi", clientCfgs...) manager.startWithForward() } return manager, nil @@ -148,22 +165,13 @@ func (m *Manager) startWithForward() { } func (m *Manager) StopNow() { - for _, c := range m.clients { + for _, c := range m.stoppableClients { c.StopNow() } } func (m *Manager) Name() string { - var sb strings.Builder - sb.WriteString(m.name) - sb.WriteString(":") - for i, c := range m.clients { - sb.WriteString(c.Name()) - if i != len(m.clients)-1 { - sb.WriteString(",") - } - } - return sb.String() + return m.name } func (m *Manager) Chan() chan<- loki.Entry { @@ -179,7 +187,38 @@ func (m *Manager) Stop() { walWatcher.Stop() } // close clients - for _, c := range m.clients { + for _, c := range m.stoppableClients { c.Stop() } } + +// GetClientName computes the specific name for each client config. The name is either the configured Name setting in Config, +// or a hash of the config as a whole, which allows us to detect repeated configs. +func GetClientName(cfg Config) string { + if cfg.Name != "" { + return cfg.Name + } + return asSha256(cfg) +} + +func asSha256(o interface{}) string { + h := sha256.New() + h.Write([]byte(fmt.Sprintf("%v", o))) + + temp := fmt.Sprintf("%x", h.Sum(nil)) + return temp[:6] +} + +// buildManagerName assembles the Manager's name from a given prefix and the names of all client configs.
+func buildManagerName(prefix string, cfgs ...Config) string { + var sb strings.Builder + sb.WriteString(prefix) + sb.WriteString(":") + for i, c := range cfgs { + sb.WriteString(GetClientName(c)) + if i != len(cfgs)-1 { + sb.WriteString(",") + } + } + return sb.String() +} diff --git a/component/common/loki/client/manager_test.go b/component/common/loki/client/manager_test.go index dc3764992640..97ba4c1b46cb 100644 --- a/component/common/loki/client/manager_test.go +++ b/component/common/loki/client/manager_test.go @@ -17,7 +17,6 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/agent/component/common/loki" - "github.com/grafana/agent/component/common/loki/client/fake" "github.com/grafana/agent/component/common/loki/limit" "github.com/grafana/agent/component/common/loki/utils" "github.com/grafana/agent/component/common/loki/wal" @@ -99,6 +98,10 @@ func newServerAndClientConfig(t *testing.T) (Config, chan utils.RemoteWriteReque BackoffConfig: backoff.Config{ MaxRetries: 0, }, + Queue: QueueConfig{ + Capacity: 10, // buffered channel of size 10 + DrainTimeout: time.Second * 10, + }, } return testClientConfig, receivedReqsChan, closerFunc(func() { server.Close() @@ -127,10 +130,10 @@ func TestManager_WALEnabled(t *testing.T) { require.NoError(t, err) require.Equal(t, "wal:test-client", manager.Name()) - receivedRequest := utils.NewSyncSlice[utils.RemoteWriteRequest]() + receivedRequests := utils.NewSyncSlice[utils.RemoteWriteRequest]() go func() { for req := range rwReceivedReqs { - receivedRequest.Append(req) + receivedRequests.Append(req) } }() @@ -155,13 +158,13 @@ func TestManager_WALEnabled(t *testing.T) { } require.Eventually(t, func() bool { - return receivedRequest.Length() == totalLines + return receivedRequests.Length() == totalLines }, 5*time.Second, time.Second, "timed out waiting for requests to be received") var seenEntries = map[string]struct{}{} // assert over rw client received entries - defer receivedRequest.DoneIterate() - for _, req := range receivedRequest.StartIterate() { + defer receivedRequests.DoneIterate() + for _, req := range receivedRequests.StartIterate() { require.Len(t, req.Request.Streams, 1, "expected 1 stream requests to be received") require.Len(t, req.Request.Streams[0].Entries, 1, "expected 1 entry in the only stream received per request") require.Equal(t, `{wal_enabled="true"}`, req.Request.Streams[0].Labels) @@ -304,23 +307,3 @@ func TestManager_WALDisabled_MultipleConfigs(t *testing.T) { } require.Len(t, seenEntries, expectedTotalLines) } - -func TestManager_StopClients(t *testing.T) { - var stopped int - - stopping := func() { - stopped++ - } - fc := fake.NewClient(stopping) - clients := []Client{fc, fc, fc, fc} - m := &Manager{ - clients: clients, - entries: make(chan loki.Entry), - } - m.startWithForward() - m.Stop() - - if stopped != len(clients) { - t.Fatal("missing stop call") - } -} diff --git a/component/common/loki/client/queue_client.go b/component/common/loki/client/queue_client.go new file mode 100644 index 000000000000..3d869dda3ce6 --- /dev/null +++ b/component/common/loki/client/queue_client.go @@ -0,0 +1,579 @@ +package client + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "net/http" + "strconv" + "sync" + "time" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + agentWal "github.com/grafana/agent/component/common/loki/wal" + "github.com/grafana/dskit/backoff" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + 
"github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/record" + + "github.com/grafana/loki/pkg/ingester/wal" + "github.com/grafana/loki/pkg/logproto" + lokiutil "github.com/grafana/loki/pkg/util" +) + +// StoppableWriteTo is a mixing of the WAL's WriteTo interface, that is Stoppable as well. +type StoppableWriteTo interface { + agentWal.WriteTo + Stoppable + StopNow() +} + +// queuedBatch is a batch specific to a tenant, that is considered ready to be sent. +type queuedBatch struct { + TenantID string + Batch *batch +} + +// queue wraps a buffered channel and a routine that reads from it, sending batches of entries. +type queue struct { + client *queueClient + q chan queuedBatch + quit chan struct{} + wg sync.WaitGroup + logger log.Logger +} + +func newQueue(client *queueClient, size int, logger log.Logger) *queue { + q := queue{ + client: client, + q: make(chan queuedBatch, size), + quit: make(chan struct{}), + logger: logger, + } + + q.wg.Add(1) + go q.run() + + return &q +} + +// enqueue is a blocking operation to add to the send queue a batch ready to be sent. +func (q *queue) enqueue(qb queuedBatch) { + q.q <- qb +} + +// enqueueWithCancel tries to enqueue a batch, giving up if the supplied context times deadlines +// times out. If the batch is successfully enqueued, it returns true. +func (q *queue) enqueueWithCancel(ctx context.Context, qb queuedBatch) bool { + select { + case <-ctx.Done(): + return false + case q.q <- qb: + } + return true +} + +func (q *queue) run() { + defer q.wg.Done() + + for { + select { + case <-q.quit: + return + case qb := <-q.q: + // Since inside the actual send operation a context with time out is used, we should exceed that timeout + // instead of cancelling this send operations, since that batch has been taken out of the queue. + q.client.sendBatch(context.Background(), qb.TenantID, qb.Batch) + } + } +} + +// closeAndDrain stops gracefully the queue. The process first stops the main routine that reads batches to be sent, +// to instead drain the queue and send those batches from this thread, exiting if the supplied context deadline +// is exceeded. Also, if the underlying buffered channel is fully drain, this will exit promptly. +func (q *queue) closeAndDrain(ctx context.Context) { + // defer main channel closing + defer close(q.q) + + // first stop main routine, and wait for it to signal + close(q.quit) + q.wg.Wait() + + // keep reading messages from sendQueue until all have been consumed, or timeout is exceeded + for { + select { + case qb := <-q.q: + // drain uses the same timeout, so if a timeout was applied to the parent context, it can cancel the underlying + // send operation preemptively. + q.client.sendBatch(ctx, qb.TenantID, qb.Batch) + case <-ctx.Done(): + level.Warn(q.logger).Log("msg", "timeout exceeded while draining send queue") + return + default: + level.Debug(q.logger).Log("msg", "drain queue exited because there were no batches left to send") + return + // if default clause is taken, it means there's nothing left in the send queue + } + } +} + +// closeNow closes the queue, without draining batches that might be buffered to be sent. +func (q *queue) closeNow() { + close(q.quit) + q.wg.Wait() + close(q.q) +} + +// queueClient is a WAL-specific remote write client implementation. This client attests to the wal.WriteTo interface, +// which allows it to be injected in the wal.Watcher as a destination where to write read series and entries. 
As the watcher +// reads from the WAL, batches are created and dispatched onto a send queue when ready to be sent. +type queueClient struct { + metrics *Metrics + logger log.Logger + cfg Config + client *http.Client + + batches map[string]*batch + batchesMtx sync.Mutex + sendQueue *queue + drainTimeout time.Duration + + wg sync.WaitGroup + + externalLabels model.LabelSet + + // series cache + series map[chunks.HeadSeriesRef]model.LabelSet + seriesSegment map[chunks.HeadSeriesRef]int + seriesLock sync.RWMutex + + // ctx is used in any upstream calls from the `client`. + ctx context.Context + cancel context.CancelFunc + maxStreams int + maxLineSize int + maxLineSizeTruncate bool + quit chan struct{} +} + +// NewQueue creates a new queueClient. +func NewQueue(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (StoppableWriteTo, error) { + if cfg.StreamLagLabels.String() != "" { + return nil, fmt.Errorf("client config stream_lag_labels is deprecated and the associated metric has been removed, stream_lag_labels: %+v", cfg.StreamLagLabels.String()) + } + return newQueueClient(metrics, cfg, maxStreams, maxLineSize, maxLineSizeTruncate, logger) +} + +func newQueueClient(metrics *Metrics, cfg Config, maxStreams, maxLineSize int, maxLineSizeTruncate bool, logger log.Logger) (*queueClient, error) { + if cfg.URL.URL == nil { + return nil, errors.New("client needs target URL") + } + + ctx, cancel := context.WithCancel(context.Background()) + + c := &queueClient{ + logger: log.With(logger, "component", "client", "host", cfg.URL.Host), + cfg: cfg, + metrics: metrics, + drainTimeout: cfg.Queue.DrainTimeout, + quit: make(chan struct{}), + + batches: make(map[string]*batch), + + series: make(map[chunks.HeadSeriesRef]model.LabelSet), + seriesSegment: make(map[chunks.HeadSeriesRef]int), + + externalLabels: cfg.ExternalLabels.LabelSet, + ctx: ctx, + cancel: cancel, + maxStreams: maxStreams, + maxLineSize: maxLineSize, + maxLineSizeTruncate: maxLineSizeTruncate, + } + + // The buffered channel size is calculated using the configured capacity, which is the worst case number of bytes + // the send queue can consume. + var queueBufferSize = cfg.Queue.Capacity / cfg.BatchSize + c.sendQueue = newQueue(c, queueBufferSize, logger) + + err := cfg.Client.Validate() + if err != nil { + return nil, err + } + + c.client, err = config.NewClientFromConfig(cfg.Client, "GrafanaAgent", config.WithHTTP2Disabled()) + if err != nil { + return nil, err + } + + c.client.Timeout = cfg.Timeout + + // Initialize counters to 0 so the metrics are exported before the first + // occurrence of incrementing to avoid missing metrics. + for _, counter := range c.metrics.countersWithHost { + counter.WithLabelValues(c.cfg.URL.Host).Add(0) + } + + c.wg.Add(1) + go c.runSendOldBatches() + return c, nil +} + +func (c *queueClient) initBatchMetrics(tenantID string) { + // Initialize counters to 0 so the metrics are exported before the first + // occurrence of incrementing to avoid missing metrics. 
+ for _, counter := range c.metrics.countersWithHostTenantReason { + for _, reason := range Reasons { + counter.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(0) + } + } + + for _, counter := range c.metrics.countersWithHostTenant { + counter.WithLabelValues(c.cfg.URL.Host, tenantID).Add(0) + } +} + +func (c *queueClient) SeriesReset(segmentNum int) { + c.seriesLock.Lock() + defer c.seriesLock.Unlock() + for k, v := range c.seriesSegment { + if v <= segmentNum { + level.Debug(c.logger).Log("msg", fmt.Sprintf("reclaiming series under segment %d", segmentNum)) + delete(c.seriesSegment, k) + delete(c.series, k) + } + } +} + +func (c *queueClient) StoreSeries(series []record.RefSeries, segment int) { + c.seriesLock.Lock() + defer c.seriesLock.Unlock() + for _, seriesRec := range series { + c.seriesSegment[seriesRec.Ref] = segment + labels := lokiutil.MapToModelLabelSet(seriesRec.Labels.Map()) + c.series[seriesRec.Ref] = labels + } +} + +func (c *queueClient) AppendEntries(entries wal.RefEntries, _ int) error { + c.seriesLock.RLock() + l, ok := c.series[entries.Ref] + c.seriesLock.RUnlock() + if ok { + for _, e := range entries.Entries { + c.appendSingleEntry(l, e) + } + } else { + // TODO(thepalbi): Add metric here + level.Debug(c.logger).Log("msg", "series for entry not found") + } + return nil +} + +func (c *queueClient) appendSingleEntry(lbs model.LabelSet, e logproto.Entry) { + lbs, tenantID := c.processLabels(lbs) + + // Either drop or mutate the log entry because its length is greater than maxLineSize. maxLineSize == 0 means disabled. + if c.maxLineSize != 0 && len(e.Line) > c.maxLineSize { + if !c.maxLineSizeTruncate { + c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc() + c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line))) + return + } + + c.metrics.mutatedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Inc() + c.metrics.mutatedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonLineTooLong).Add(float64(len(e.Line) - c.maxLineSize)) + e.Line = e.Line[:c.maxLineSize] + } + + // TODO: can I make this locking more fine grained? + c.batchesMtx.Lock() + + batch, ok := c.batches[tenantID] + + // If the batch doesn't exist yet, we create a new one with the entry + if !ok { + nb := newBatch(c.maxStreams) + // since the batch is new, adding a new entry, and hence a new stream, won't fail since there aren't any stream + // registered in the batch. 
+ _ = nb.addFromWAL(lbs, e) + + c.batches[tenantID] = nb + c.batchesMtx.Unlock() + + c.initBatchMetrics(tenantID) + return + } + + // If adding the entry to the batch will increase the size over the max + // size allowed, we do send the current batch and then create a new one + if batch.sizeBytesAfter(e.Line) > c.cfg.BatchSize { + c.sendQueue.enqueue(queuedBatch{ + TenantID: tenantID, + Batch: batch, + }) + + nb := newBatch(c.maxStreams) + _ = nb.addFromWAL(lbs, e) + c.batches[tenantID] = nb + c.batchesMtx.Unlock() + + return + } + + // The max size of the batch isn't reached, so we can add the entry + err := batch.addFromWAL(lbs, e) + c.batchesMtx.Unlock() + + if err != nil { + level.Error(c.logger).Log("msg", "batch add err", "tenant", tenantID, "error", err) + reason := ReasonGeneric + if err.Error() == errMaxStreamsLimitExceeded { + reason = ReasonStreamLimited + } + c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Add(float64(len(e.Line))) + c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, reason).Inc() + } +} + +func (c *queueClient) runSendOldBatches() { + // Given the client handles multiple batches (1 per tenant) and each batch + // can be created at a different point in time, we look for batches whose + // max wait time has been reached ten times per BatchWait period, so that the + // maximum delay we have sending batches is 10% of the max waiting time. + // We apply a cap of 10ms to the ticker, to avoid too frequent checks in + // case the BatchWait is very low. + minWaitCheckFrequency := 10 * time.Millisecond + maxWaitCheckFrequency := c.cfg.BatchWait / 10 + if maxWaitCheckFrequency < minWaitCheckFrequency { + maxWaitCheckFrequency = minWaitCheckFrequency + } + + maxWaitCheck := time.NewTicker(maxWaitCheckFrequency) + + // TODO(thepalbi): maybe this should be moved out + defer func() { + maxWaitCheck.Stop() + c.wg.Done() + }() + + var batchesToFlush []queuedBatch + + for { + select { + case <-c.quit: + return + + case <-maxWaitCheck.C: + c.batchesMtx.Lock() + // Send all batches whose max wait time has been reached + for tenantID, b := range c.batches { + if b.age() < c.cfg.BatchWait { + continue + } + + // add to batches to flush, so we can enqueue them later and release the batches lock + // as early as possible + batchesToFlush = append(batchesToFlush, queuedBatch{ + TenantID: tenantID, + Batch: b, + }) + + // delete the batch, assuming that since it exceeded the max wait time, it + // hasn't been written to for some time + delete(c.batches, tenantID) + } + + c.batchesMtx.Unlock() + + // enqueue batches that were marked as too old + for _, qb := range batchesToFlush { + c.sendQueue.enqueue(qb) + } + + batchesToFlush = batchesToFlush[:0] // reset the slice, reusing the underlying array + } + } +} + +// enqueuePendingBatches goes over the pending batches and enqueues them in the send queue. If the context's +// deadline is exceeded in any enqueue operation, this routine exits.
+func (c *queueClient) enqueuePendingBatches(ctx context.Context) { + c.batchesMtx.Lock() + defer c.batchesMtx.Unlock() + + for tenantID, batch := range c.batches { + if !c.sendQueue.enqueueWithCancel(ctx, queuedBatch{ + TenantID: tenantID, + Batch: batch, + }) { + // if enqueue times out due to the context timing out, cancel all + return + } + } +} + +func (c *queueClient) sendBatch(ctx context.Context, tenantID string, batch *batch) { + buf, entriesCount, err := batch.encode() + if err != nil { + level.Error(c.logger).Log("msg", "error encoding batch", "error", err) + return + } + bufBytes := float64(len(buf)) + c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) + + backoff := backoff.New(c.ctx, c.cfg.BackoffConfig) + var status int + for { + start := time.Now() + // send uses `timeout` internally, so `context.Background` is good enough. + status, err = c.send(ctx, tenantID, buf) + + c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) + + // Immediately drop rate limited batches to avoid HOL blocking for other tenants not experiencing throttling + if c.cfg.DropRateLimitedBatches && batchIsRateLimited(status) { + level.Warn(c.logger).Log("msg", "dropping batch due to rate limiting applied at ingester") + c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(bufBytes) + c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, ReasonRateLimited).Add(float64(entriesCount)) + return + } + + if err == nil { + c.metrics.sentBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) + c.metrics.sentEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) + + return + } + + // Only retry 429s, 500s and connection-level errors. + if status > 0 && !batchIsRateLimited(status) && status/100 != 5 { + break + } + + level.Warn(c.logger).Log("msg", "error sending batch, will retry", "status", status, "tenant", tenantID, "error", err) + c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host, tenantID).Inc() + backoff.Wait() + + // Make sure it sends at least once before checking for retry. 
+ if !backoff.Ongoing() { + break + } + } + + if err != nil { + level.Error(c.logger).Log("msg", "final error sending batch", "status", status, "tenant", tenantID, "error", err) + // If the reason for the last retry error was rate limiting, count the drops as such, even if the previous errors + // were for a different reason + dropReason := ReasonGeneric + if batchIsRateLimited(status) { + dropReason = ReasonRateLimited + } + c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(bufBytes) + c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host, tenantID, dropReason).Add(float64(entriesCount)) + } +} + +func (c *queueClient) send(ctx context.Context, tenantID string, buf []byte) (int, error) { + ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) + defer cancel() + req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf)) + if err != nil { + return -1, err + } + req = req.WithContext(ctx) + req.Header.Set("Content-Type", contentType) + req.Header.Set("User-Agent", UserAgent) + + // If the tenant ID is not empty promtail is running in multi-tenant mode, so + // we should send it to Loki + if tenantID != "" { + req.Header.Set("X-Scope-OrgID", tenantID) + } + + // Add custom headers on request + if len(c.cfg.Headers) > 0 { + for k, v := range c.cfg.Headers { + if req.Header.Get(k) == "" { + req.Header.Add(k, v) + } else { + level.Warn(c.logger).Log("msg", "custom header key already exists, skipping", "key", k) + } + } + } + + resp, err := c.client.Do(req) + if err != nil { + return -1, err + } + defer lokiutil.LogError("closing response body", resp.Body.Close) + + if resp.StatusCode/100 != 2 { + scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen)) + line := "" + if scanner.Scan() { + line = scanner.Text() + } + err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line) + } + return resp.StatusCode, err +} + +func (c *queueClient) getTenantID(labels model.LabelSet) string { + // Check if it has been overridden while processing the pipeline stages + if value, ok := labels[ReservedLabelTenantID]; ok { + return string(value) + } + + // Check if has been specified in the config + if c.cfg.TenantID != "" { + return c.cfg.TenantID + } + + // Defaults to an empty string, which means the X-Scope-OrgID header + // will not be sent + return "" +} + +// Stop the client, enqueueing pending batches and draining the send queue accordingly. Both closing operations are +// limited by a deadline, controlled by a configured drain timeout, which is global to the Stop call. +func (c *queueClient) Stop() { + // first close main queue routine + close(c.quit) + c.wg.Wait() + + // fire timeout timer + ctx, cancel := context.WithTimeout(context.Background(), c.drainTimeout) + defer cancel() + + // enqueue batches that might be pending in the batches map + c.enqueuePendingBatches(ctx) + + // drain sendQueue with timeout in context + c.sendQueue.closeAndDrain(ctx) + + // stop request after drain times out or exits + c.cancel() +} + +// StopNow stops the client without retries or draining the send queue +func (c *queueClient) StopNow() { + // cancel will stop retrying http requests. 
+ c.cancel() + close(c.quit) + c.sendQueue.closeNow() + c.wg.Wait() +} + +func (c *queueClient) processLabels(lbs model.LabelSet) (model.LabelSet, string) { + if len(c.externalLabels) > 0 { + lbs = c.externalLabels.Merge(lbs) + } + tenantID := c.getTenantID(lbs) + return lbs, tenantID +} diff --git a/component/common/loki/client/queue_client_test.go b/component/common/loki/client/queue_client_test.go new file mode 100644 index 000000000000..e64f9afab45a --- /dev/null +++ b/component/common/loki/client/queue_client_test.go @@ -0,0 +1,390 @@ +package client + +import ( + "fmt" + "os" + "testing" + "time" + + "github.com/alecthomas/units" + "github.com/go-kit/log" + "github.com/grafana/agent/component/common/loki" + "github.com/grafana/agent/component/common/loki/utils" + "github.com/grafana/dskit/backoff" + "github.com/grafana/dskit/flagext" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/config" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/labels" + "github.com/prometheus/prometheus/tsdb/chunks" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/stretchr/testify/require" + "go.uber.org/atomic" + + "github.com/grafana/loki/pkg/ingester/wal" + "github.com/grafana/loki/pkg/logproto" + lokiflag "github.com/grafana/loki/pkg/util/flagext" +) + +type testCase struct { + // numLines is the total number of lines sent through the client in the benchmark. + numLines int + + // numSeries is the different number of series to use in entries. Series are dynamically generated for each entry, but + // would be numSeries in total, and evenly distributed. + numSeries int + + // configs + batchSize int + batchWait time.Duration + queueConfig QueueConfig + + // expects + expectedRWReqsCount int64 +} + +func TestQueueClient(t *testing.T) { + for name, tc := range map[string]testCase{ + "small test": { + numLines: 3, + numSeries: 1, + batchSize: 10, + batchWait: time.Millisecond * 50, + queueConfig: QueueConfig{ + Capacity: 100, + DrainTimeout: time.Second, + }, + }, + "many lines and series, immediate delivery": { + numLines: 1000, + numSeries: 10, + batchSize: 10, + batchWait: time.Millisecond * 50, + queueConfig: QueueConfig{ + Capacity: 100, + DrainTimeout: time.Second, + }, + }, + "many lines and series, delivery because of batch age": { + numLines: 100, + numSeries: 10, + batchSize: int(1 * units.MiB), // make batch size big enough so that all batches should be delivered because of batch age + batchWait: time.Millisecond * 50, + queueConfig: QueueConfig{ + Capacity: int(100 * units.MiB), // keep buffered channel size on 100 + DrainTimeout: 10 * time.Second, + }, + expectedRWReqsCount: 1, // expect all entries to be sent in a single batch (100 * < 10B per line) < 1MiB + }, + } { + t.Run(name, func(t *testing.T) { + reg := prometheus.NewRegistry() + + // Create a buffer channel where we do enqueue received requests + receivedReqsChan := make(chan utils.RemoteWriteRequest, 10) + // count the number for remote-write requests received (which should correlated with the number of sent batches), + // and the total number of entries. 
+ var receivedRWsCount atomic.Int64 + var receivedEntriesCount atomic.Int64 + + receivedReqs := utils.NewSyncSlice[utils.RemoteWriteRequest]() + go func() { + for req := range receivedReqsChan { + receivedReqs.Append(req) + receivedRWsCount.Add(1) + for _, s := range req.Request.Streams { + receivedEntriesCount.Add(int64(len(s.Entries))) + } + } + }() + + // Start a local HTTP server + server := utils.NewRemoteWriteServer(receivedReqsChan, 200) + require.NotNil(t, server) + defer server.Close() + + // Get the URL at which the local test server is listening to + serverURL := flagext.URLValue{} + err := serverURL.Set(server.URL) + require.NoError(t, err) + + // Instance the client + cfg := Config{ + URL: serverURL, + BatchWait: tc.batchWait, + BatchSize: tc.batchSize, + Client: config.HTTPClientConfig{}, + BackoffConfig: backoff.Config{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: 1}, + ExternalLabels: lokiflag.LabelSet{}, + Timeout: 1 * time.Second, + TenantID: "", + Queue: tc.queueConfig, + } + + logger := log.NewLogfmtLogger(os.Stdout) + + m := NewMetrics(reg) + qc, err := NewQueue(m, cfg, 0, 0, false, logger) + require.NoError(t, err) + + //labels := model.LabelSet{"app": "test"} + lines := make([]string, 0, tc.numLines) + for i := 0; i < tc.numLines; i++ { + lines = append(lines, fmt.Sprintf("hola %d", i)) + } + + // Send all the input log entries + for i, l := range lines { + mod := i % tc.numSeries + qc.StoreSeries([]record.RefSeries{ + { + Labels: labels.Labels{{ + Name: "app", + Value: fmt.Sprintf("test-%d", mod), + }}, + Ref: chunks.HeadSeriesRef(mod), + }, + }, 0) + + _ = qc.AppendEntries(wal.RefEntries{ + Ref: chunks.HeadSeriesRef(mod), + Entries: []logproto.Entry{{ + Timestamp: time.Now(), + Line: l, + }}, + }, 0) + } + + require.Eventually(t, func() bool { + return receivedEntriesCount.Load() == int64(len(lines)) + }, time.Second*10, time.Second, "timed out waiting for entries to arrive") + + if tc.expectedRWReqsCount != 0 { + require.Equal(t, tc.expectedRWReqsCount, receivedRWsCount.Load(), "number for remote write request not expected") + } + + // Stop the client: it waits until the current batch is sent + qc.Stop() + close(receivedReqsChan) + }) + } +} + +func BenchmarkClientImplementations(b *testing.B) { + for name, bc := range map[string]testCase{ + "100 entries, single series, no batching": { + numLines: 100, + numSeries: 1, + batchSize: 10, + batchWait: time.Millisecond * 50, + queueConfig: QueueConfig{ + Capacity: 1000, // buffer size 100 + DrainTimeout: time.Second, + }, + }, + "100k entries, 100 series, default batching": { + numLines: 100_000, + numSeries: 100, + batchSize: int(1 * units.MiB), + batchWait: time.Second, + queueConfig: QueueConfig{ + Capacity: int(10 * units.MiB), // buffer size 100 + DrainTimeout: 5 * time.Second, + }, + }, + } { + b.Run(name, func(b *testing.B) { + b.Run("implementation=queue", func(b *testing.B) { + runQueueClientBenchCase(b, bc) + }) + + b.Run("implementation=regular", func(b *testing.B) { + runRegularClientBenchCase(b, bc) + }) + }) + } +} + +func runQueueClientBenchCase(b *testing.B, bc testCase) { + reg := prometheus.NewRegistry() + + // Create a buffer channel where we do enqueue received requests + receivedReqsChan := make(chan utils.RemoteWriteRequest, 10) + // count the number for remote-write requests received (which should correlated with the number of sent batches), + // and the total number of entries. 
+ var receivedEntriesCount atomic.Int64 + reset := func() { + receivedEntriesCount.Store(0) + } + + go func() { + for req := range receivedReqsChan { + for _, s := range req.Request.Streams { + receivedEntriesCount.Add(int64(len(s.Entries))) + } + } + }() + + // Start a local HTTP server + server := utils.NewRemoteWriteServer(receivedReqsChan, 200) + require.NotNil(b, server) + defer server.Close() + + // Get the URL at which the local test server is listening to + serverURL := flagext.URLValue{} + err := serverURL.Set(server.URL) + require.NoError(b, err) + + // Instance the client + cfg := Config{ + URL: serverURL, + BatchWait: time.Millisecond * 50, + BatchSize: 10, + Client: config.HTTPClientConfig{}, + BackoffConfig: backoff.Config{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: 1}, + ExternalLabels: lokiflag.LabelSet{}, + Timeout: 1 * time.Second, + TenantID: "", + Queue: QueueConfig{ + Capacity: 1000, // queue size of 100 + DrainTimeout: time.Second * 10, + }, + } + + logger := log.NewLogfmtLogger(os.Stdout) + + m := NewMetrics(reg) + qc, err := NewQueue(m, cfg, 0, 0, false, logger) + require.NoError(b, err) + + //labels := model.LabelSet{"app": "test"} + var lines []string + for i := 0; i < bc.numLines; i++ { + lines = append(lines, fmt.Sprintf("hola %d", i)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Send all the input log entries + for j, l := range lines { + seriesId := j % bc.numSeries + qc.StoreSeries([]record.RefSeries{ + { + Labels: labels.Labels{{ + Name: "app", + // take j module bc.numSeries to evenly distribute those numSeries across all sent entries + Value: fmt.Sprintf("series-%d", seriesId), + }}, + Ref: chunks.HeadSeriesRef(seriesId), + }, + }, 0) + + _ = qc.AppendEntries(wal.RefEntries{ + Ref: chunks.HeadSeriesRef(seriesId), + Entries: []logproto.Entry{{ + Timestamp: time.Now(), + Line: l, + }}, + }, 0) + } + + require.Eventually(b, func() bool { + return receivedEntriesCount.Load() == int64(len(lines)) + }, time.Second*10, time.Second, "timed out waiting for entries to arrive") + + // reset counters + reset() + } + + // Stop the client: it waits until the current batch is sent + qc.Stop() + close(receivedReqsChan) +} + +func runRegularClientBenchCase(b *testing.B, bc testCase) { + reg := prometheus.NewRegistry() + + // Create a buffer channel where we do enqueue received requests + receivedReqsChan := make(chan utils.RemoteWriteRequest, 10) + // count the number for remote-write requests received (which should correlated with the number of sent batches), + // and the total number of entries. 
+ var receivedEntriesCount atomic.Int64 + reset := func() { + receivedEntriesCount.Store(0) + } + + go func() { + for req := range receivedReqsChan { + for _, s := range req.Request.Streams { + receivedEntriesCount.Add(int64(len(s.Entries))) + } + } + }() + + // Start a local HTTP server + server := utils.NewRemoteWriteServer(receivedReqsChan, 200) + require.NotNil(b, server) + defer server.Close() + + // Get the URL at which the local test server is listening to + serverURL := flagext.URLValue{} + err := serverURL.Set(server.URL) + require.NoError(b, err) + + // Instance the client + cfg := Config{ + URL: serverURL, + BatchWait: time.Millisecond * 50, + BatchSize: 10, + Client: config.HTTPClientConfig{}, + BackoffConfig: backoff.Config{MinBackoff: 5 * time.Second, MaxBackoff: 10 * time.Second, MaxRetries: 1}, + ExternalLabels: lokiflag.LabelSet{}, + Timeout: 1 * time.Second, + TenantID: "", + Queue: QueueConfig{ + Capacity: 1000, // queue size of 100 + DrainTimeout: time.Second * 10, + }, + } + + logger := log.NewLogfmtLogger(os.Stdout) + + m := NewMetrics(reg) + qc, err := New(m, cfg, 0, 0, false, logger) + require.NoError(b, err) + + //labels := model.LabelSet{"app": "test"} + var lines []string + for i := 0; i < bc.numLines; i++ { + lines = append(lines, fmt.Sprintf("hola %d", i)) + } + + b.ResetTimer() + for i := 0; i < b.N; i++ { + // Send all the input log entries + for j, l := range lines { + seriesId := j % bc.numSeries + qc.Chan() <- loki.Entry{ + Labels: model.LabelSet{ + // take j module bc.numSeries to evenly distribute those numSeries across all sent entries + "app": model.LabelValue(fmt.Sprintf("series-%d", seriesId)), + }, + Entry: logproto.Entry{ + Timestamp: time.Now(), + Line: l, + }, + } + } + + require.Eventually(b, func() bool { + return receivedEntriesCount.Load() == int64(len(lines)) + }, time.Second*10, time.Second, "timed out waiting for entries to arrive") + + // reset counters + reset() + } + + // Stop the client: it waits until the current batch is sent + qc.Stop() + close(receivedReqsChan) +} diff --git a/component/common/loki/utils/sync.go b/component/common/loki/utils/sync.go index 92da9af85663..ee24ad05cbd2 100644 --- a/component/common/loki/utils/sync.go +++ b/component/common/loki/utils/sync.go @@ -26,6 +26,14 @@ func (ss *SyncSlice[T]) Length() int { return len(ss.list) } +// Reset resets the slice to have zero elements. If used during benchmarks, this will probably +// make new appends more efficient since the underlying array has more room. +func (ss *SyncSlice[T]) Reset() { + ss.lock.Lock() + defer ss.lock.Unlock() + ss.list = ss.list[:0] +} + // StartIterate returns the internal slice, after read-locking the internal lock. Once the iteration is finished, // DoneIterate should be called to release the lock. func (ss *SyncSlice[T]) StartIterate() []T { diff --git a/component/common/loki/wal/watcher.go b/component/common/loki/wal/watcher.go index a39137671295..ac2416b4ec67 100644 --- a/component/common/loki/wal/watcher.go +++ b/component/common/loki/wal/watcher.go @@ -58,7 +58,7 @@ type WriteTo interface { // found in. 
StoreSeries(series []record.RefSeries, segmentNum int) - AppendEntries(entries wal.RefEntries) error + AppendEntries(entries wal.RefEntries, segmentNum int) error } type Watcher struct { @@ -264,7 +264,7 @@ func (w *Watcher) decodeAndDispatch(b []byte, segmentNum int) (bool, error) { readData = true for _, entries := range rec.RefEntries { - if err := w.actions.AppendEntries(entries); err != nil && firstErr == nil { + if err := w.actions.AppendEntries(entries, segmentNum); err != nil && firstErr == nil { firstErr = err } } diff --git a/component/common/loki/wal/watcher_test.go b/component/common/loki/wal/watcher_test.go index 1c008b19badd..1e59966a1375 100644 --- a/component/common/loki/wal/watcher_test.go +++ b/component/common/loki/wal/watcher_test.go @@ -38,7 +38,7 @@ func (t *testWriteTo) SeriesReset(segmentNum int) { t.ReceivedSeriesReset = append(t.ReceivedSeriesReset, segmentNum) } -func (t *testWriteTo) AppendEntries(entries wal.RefEntries) error { +func (t *testWriteTo) AppendEntries(entries wal.RefEntries, _ int) error { var entry loki.Entry if l, ok := t.series[uint64(entries.Ref)]; ok { entry.Labels = l diff --git a/component/loki/write/types.go b/component/loki/write/types.go index 15266de4d890..2959f1b681a1 100644 --- a/component/loki/write/types.go +++ b/component/loki/write/types.go @@ -29,6 +29,7 @@ type EndpointOptions struct { TenantID string `river:"tenant_id,attr,optional"` RetryOnHTTP429 bool `river:"retry_on_http_429,attr,optional"` HTTPClientConfig *types.HTTPClientConfig `river:",squash"` + QueueConfig QueueConfig `river:"queue_config,block,optional"` } // GetDefaultEndpointOptions defines the default settings for sending logs to a @@ -70,6 +71,21 @@ func (r *EndpointOptions) Validate() error { return nil } +// QueueConfig controls how the queue logs remote write client is configured. Note that this client is only used when the +// loki.write component has WAL support enabled. +type QueueConfig struct { + Capacity units.Base2Bytes `river:"capacity,attr,optional"` + DrainTimeout time.Duration `river:"drain_timeout,attr,optional"` +} + +// SetToDefault implements river.Defaulter. +func (q *QueueConfig) SetToDefault() { + *q = QueueConfig{ + Capacity: 10 * units.MiB, // considering the default BatchSize of 1MiB, this gives us a default buffered channel of size 10 + DrainTimeout: time.Minute, + } +} + func (args Arguments) convertClientConfigs() []client.Config { var res []client.Config for _, cfg := range args.Endpoints { @@ -90,6 +106,10 @@ func (args Arguments) convertClientConfigs() []client.Config { Timeout: cfg.RemoteTimeout, TenantID: cfg.TenantID, DropRateLimitedBatches: !cfg.RetryOnHTTP429, + Queue: client.QueueConfig{ + Capacity: int(cfg.QueueConfig.Capacity), + DrainTimeout: cfg.QueueConfig.DrainTimeout, + }, } res = append(res, cc) } diff --git a/docs/sources/flow/reference/components/loki.write.md b/docs/sources/flow/reference/components/loki.write.md index fbdfadff99e0..7246a66f6897 100644 --- a/docs/sources/flow/reference/components/loki.write.md +++ b/docs/sources/flow/reference/components/loki.write.md @@ -49,6 +49,7 @@ endpoint > authorization | [authorization][] | Configure generic authorization t endpoint > oauth2 | [oauth2][] | Configure OAuth2 for authenticating to the endpoint. | no endpoint > oauth2 > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. | no endpoint > tls_config | [tls_config][] | Configure TLS settings for connecting to the endpoint. 
| no +endpoint > queue_config | [queue_config][] | When WAL is enabled, configures the queue client. | no The `>` symbol indicates deeper levels of nesting. For example, `endpoint > basic_auth` refers to a `basic_auth` block defined inside an @@ -60,6 +61,7 @@ basic_auth` refers to a `basic_auth` block defined inside an [authorization]: #authorization-block [oauth2]: #oauth2-block [tls_config]: #tls_config-block +[queue_config]: #queue_config-block ### endpoint block @@ -128,6 +130,18 @@ enabled, the retry mechanism will be governed by the backoff configuration speci {{< docs/shared lookup="flow/reference/components/tls-config-block.md" source="agent" version="" >}} +### queue_config block (experimental) + +When the WAL is enabled (see the [wal block](#wal-block-experimental)), the optional `queue_config` block configures how the +underlying client queues batches of logs before sending them to Loki. A configuration sketch combining both blocks is shown below. + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +| --------------- | ---------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ------- | -------- | +| `capacity` | `string` | Controls the size of the underlying send queue buffer. This setting should be considered a worst-case estimate of memory consumption, reached when all enqueued batches are full. | `10MiB` | no | +| `drain_timeout` | `duration` | Configures the maximum time the client can take to drain the send queue upon shutdown. During that time, it enqueues any pending batches and drains the send queue, sending each one. | `"1m"` | no | + ### wal block (experimental) The optional `wal` block configures the Write-Ahead Log (WAL) used in the Loki remote-write client. To enable the WAL,
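As an illustration of the two blocks documented above, here is a minimal sketch of a `loki.write` configuration that enables the WAL and tunes the queue client. The endpoint URL is a placeholder, and the sketch assumes the `wal` block is switched on through its `enabled` attribute as described in this section:

```river
loki.write "default" {
  endpoint {
    url = "http://loki:3100/loki/api/v1/push"

    // Worst-case memory the send queue may hold; with the default 1MiB batch_size
    // this buffers roughly 20 full batches before enqueueing blocks.
    queue_config {
      capacity      = "20MiB"
      drain_timeout = "30s"
    }
  }

  // The queue client is only used when the WAL is enabled.
  wal {
    enabled = true
  }
}
```

With the default `1MiB` batch size, a `capacity` of `20MiB` lets the send queue buffer roughly 20 full batches; on shutdown, the client has up to `drain_timeout` to enqueue pending batches and drain the send queue before giving up.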