TBS: refactor to encapsulate badger DB (#15112)
Introduce StorageManager to encapsulate badger DB access.

One minor difference: the gc mutex and the subscriber position file mutex are no longer global variables but are now per StorageManager. There should be no apm-server behavior change, only a possibly subtle difference in concurrent tests that create 2 storage managers; apm-server itself has only 1 global storage manager, created by x-pack main.
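
For orientation, the sketch below (not part of this commit) shows how the new pieces fit together, mirroring what getBadgerDB and getStorage in x-pack/apm-server/main.go now do. The package name, helper name, GC interval, and trace ID are illustrative only.

package tbsexample

import (
	"time"

	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
)

// runTBSStorage is a hypothetical helper mirroring how x-pack main wires up
// the single global StorageManager for tail-based sampling.
func runTBSStorage(storageDir string, stopping <-chan struct{}) error {
	// StorageManager owns the badger DB, the Storage wrapper and the ShardedReadWriter.
	sm, err := eventstorage.NewStorageManager(storageDir)
	if err != nil {
		return err
	}
	defer sm.Close() // closes the ShardedReadWriter and the underlying badger DB

	// Value log GC is now driven through the manager; the interval is illustrative.
	go sm.RunGCLoop(stopping, time.Minute)

	// Reads and writes go through a ManagedReadWriter, which delegates to the
	// ShardedReadWriter held by the manager.
	rw := sm.NewReadWriter()
	sampled, err := rw.IsTraceSampled("example-trace-id")
	if err != nil {
		return err // e.g. the trace ID has not been written yet
	}
	_ = sampled
	return nil
}

As the diff below shows, newTailSamplingProcessor passes the same StorageManager as DB and its NewReadWriter result as Storage in sampling.StorageConfig.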
carsonip authored Jan 3, 2025
1 parent e6370b3 commit 5a3dd8d
Showing 6 changed files with 234 additions and 135 deletions.
29 changes: 9 additions & 20 deletions x-pack/apm-server/main.go
@@ -11,7 +11,6 @@ import (
"os"
"sync"

"github.com/dgraph-io/badger/v2"
"github.com/gofrs/uuid/v5"
"golang.org/x/sync/errgroup"

@@ -43,10 +42,10 @@ var (

// badgerDB holds the badger database to use when tail-based sampling is configured.
badgerMu sync.Mutex
- badgerDB *badger.DB
+ badgerDB *eventstorage.StorageManager

storageMu sync.Mutex
- storage *eventstorage.ShardedReadWriter
+ storage *eventstorage.ManagedReadWriter

// samplerUUID is a UUID used to identify sampled trace ID documents
// published by this process.
@@ -122,7 +121,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
if err != nil {
return nil, fmt.Errorf("failed to get Badger database: %w", err)
}
- readWriters := getStorage(badgerDB)
+ readWriter := getStorage(badgerDB)

policies := make([]sampling.Policy, len(tailSamplingConfig.Policies))
for i, in := range tailSamplingConfig.Policies {
@@ -157,7 +156,7 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
},
StorageConfig: sampling.StorageConfig{
DB: badgerDB,
- Storage: readWriters,
+ Storage: readWriter,
StorageDir: storageDir,
StorageGCInterval: tailSamplingConfig.StorageGCInterval,
StorageLimit: tailSamplingConfig.StorageLimitParsed,
@@ -166,25 +165,24 @@ func newTailSamplingProcessor(args beater.ServerParams) (*sampling.Processor, er
})
}

- func getBadgerDB(storageDir string) (*badger.DB, error) {
+ func getBadgerDB(storageDir string) (*eventstorage.StorageManager, error) {
badgerMu.Lock()
defer badgerMu.Unlock()
if badgerDB == nil {
- db, err := eventstorage.OpenBadger(storageDir, -1)
+ sm, err := eventstorage.NewStorageManager(storageDir)
if err != nil {
return nil, err
}
- badgerDB = db
+ badgerDB = sm
}
return badgerDB, nil
}

- func getStorage(db *badger.DB) *eventstorage.ShardedReadWriter {
+ func getStorage(sm *eventstorage.StorageManager) *eventstorage.ManagedReadWriter {
storageMu.Lock()
defer storageMu.Unlock()
if storage == nil {
- eventCodec := eventstorage.ProtobufCodec{}
- storage = eventstorage.New(db, eventCodec).NewShardedReadWriter()
+ storage = sm.NewReadWriter()
}
return storage
}
@@ -261,16 +259,7 @@ func closeBadger() error {
return nil
}

- func closeStorage() {
- if storage != nil {
- storage.Close()
- }
- }
-
func cleanup() error {
- // Close the underlying storage, the storage will be flushed on processor stop.
- closeStorage()
-
return closeBadger()
}

5 changes: 2 additions & 3 deletions x-pack/apm-server/sampling/config.go
@@ -7,7 +7,6 @@ package sampling
import (
"time"

"github.com/dgraph-io/badger/v2"
"github.com/pkg/errors"

"github.com/elastic/apm-data/model/modelpb"
@@ -99,13 +98,13 @@ type StorageConfig struct {
// DB holds the badger database in which event storage will be maintained.
//
// DB will not be closed when the processor is closed.
- DB *badger.DB
+ DB *eventstorage.StorageManager

// Storage holds the read writers which provide sharded, locked access to storage.
//
// Storage lives outside processor lifecycle and will not be closed when processor
// is closed
- Storage *eventstorage.ShardedReadWriter
+ Storage *eventstorage.ManagedReadWriter

// StorageDir holds the directory in which event storage will be maintained.
StorageDir string
5 changes: 2 additions & 3 deletions x-pack/apm-server/sampling/config_test.go
@@ -7,7 +7,6 @@ package sampling_test
import (
"testing"

"github.com/dgraph-io/badger/v2"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

@@ -72,10 +71,10 @@ func TestNewProcessorConfigInvalid(t *testing.T) {
config.UUID = "server"

assertInvalidConfigError("invalid storage config: DB unspecified")
- config.DB = &badger.DB{}
+ config.DB = &eventstorage.StorageManager{}

assertInvalidConfigError("invalid storage config: Storage unspecified")
- config.Storage = &eventstorage.ShardedReadWriter{}
+ config.Storage = &eventstorage.ManagedReadWriter{}

assertInvalidConfigError("invalid storage config: StorageDir unspecified")
config.StorageDir = "tbs"
173 changes: 173 additions & 0 deletions x-pack/apm-server/sampling/eventstorage/storage_manager.go
@@ -0,0 +1,173 @@
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License 2.0;
// you may not use this file except in compliance with the Elastic License 2.0.

package eventstorage

import (
"os"
"path/filepath"
"sync"
"time"

"github.com/dgraph-io/badger/v2"

"github.com/elastic/apm-data/model/modelpb"
"github.com/elastic/apm-server/internal/logs"
"github.com/elastic/elastic-agent-libs/logp"
)

const (
// subscriberPositionFile holds the file name used for persisting
// the subscriber position across server restarts.
subscriberPositionFile = "subscriber_position.json"
)

// StorageManager encapsulates badger.DB.
// It provides file system access, simplifies synchronization, and enables underlying DB swaps.
// It assumes exclusive access to badger DB at storageDir.
type StorageManager struct {
storageDir string
logger *logp.Logger

db *badger.DB
storage *Storage
rw *ShardedReadWriter

// subscriberPosMu protects the subscriber file from concurrent RW.
subscriberPosMu sync.Mutex

// gcLoopCh acts as a mutex to ensure only 1 gc loop is running per StorageManager,
// as 2 separate RunGCLoop calls may be made by 2 TBS processors during a hot reload.
gcLoopCh chan struct{}
}

// NewStorageManager returns a new StorageManager with badger DB at storageDir.
func NewStorageManager(storageDir string) (*StorageManager, error) {
sm := &StorageManager{
storageDir: storageDir,
gcLoopCh: make(chan struct{}, 1),
logger: logp.NewLogger(logs.Sampling),
}
err := sm.reset()
if err != nil {
return nil, err
}
return sm, nil
}

// reset initializes db, storage, and rw.
func (s *StorageManager) reset() error {
db, err := OpenBadger(s.storageDir, -1)
if err != nil {
return err
}
s.db = db
s.storage = New(db, ProtobufCodec{})
s.rw = s.storage.NewShardedReadWriter()
return nil
}

// RunGCLoop runs a loop that calls badger DB RunValueLogGC every gcInterval.
// The loop stops when it receives from stopping.
func (s *StorageManager) RunGCLoop(stopping <-chan struct{}, gcInterval time.Duration) error {
select {
case <-stopping:
return nil
case s.gcLoopCh <- struct{}{}:
}
defer func() {
<-s.gcLoopCh
}()
// This goroutine is responsible for periodically garbage
// collecting the Badger value log, using the recommended
// discard ratio of 0.5.
ticker := time.NewTicker(gcInterval)
defer ticker.Stop()
for {
select {
case <-stopping:
return nil
case <-ticker.C:
const discardRatio = 0.5
var err error
for err == nil {
// Keep garbage collecting until there are no more rewrites,
// or garbage collection fails.
err = s.runValueLogGC(discardRatio)
}
if err != nil && err != badger.ErrNoRewrite {
return err
}
}
}
}

func (s *StorageManager) runValueLogGC(discardRatio float64) error {
return s.db.RunValueLogGC(discardRatio)
}

func (s *StorageManager) Close() error {
s.rw.Close()
return s.db.Close()
}

// Size returns the db size
func (s *StorageManager) Size() (lsm, vlog int64) {
return s.db.Size()
}

func (s *StorageManager) ReadSubscriberPosition() ([]byte, error) {
s.subscriberPosMu.Lock()
defer s.subscriberPosMu.Unlock()
return os.ReadFile(filepath.Join(s.storageDir, subscriberPositionFile))
}

func (s *StorageManager) WriteSubscriberPosition(data []byte) error {
s.subscriberPosMu.Lock()
defer s.subscriberPosMu.Unlock()
return os.WriteFile(filepath.Join(s.storageDir, subscriberPositionFile), data, 0644)
}

func (s *StorageManager) NewReadWriter() *ManagedReadWriter {
return &ManagedReadWriter{
sm: s,
}
}

// ManagedReadWriter is a read writer that is transparent to badger DB changes done by StorageManager.
// It is a wrapper of the ShardedReadWriter under StorageManager.
type ManagedReadWriter struct {
sm *StorageManager
}

func (s *ManagedReadWriter) ReadTraceEvents(traceID string, out *modelpb.Batch) error {
return s.sm.rw.ReadTraceEvents(traceID, out)
}

func (s *ManagedReadWriter) WriteTraceEvent(traceID, id string, event *modelpb.APMEvent, opts WriterOpts) error {
return s.sm.rw.WriteTraceEvent(traceID, id, event, opts)
}

func (s *ManagedReadWriter) WriteTraceSampled(traceID string, sampled bool, opts WriterOpts) error {
return s.sm.rw.WriteTraceSampled(traceID, sampled, opts)
}

func (s *ManagedReadWriter) IsTraceSampled(traceID string) (bool, error) {
return s.sm.rw.IsTraceSampled(traceID)
}

func (s *ManagedReadWriter) DeleteTraceEvent(traceID, id string) error {
return s.sm.rw.DeleteTraceEvent(traceID, id)
}

func (s *ManagedReadWriter) Flush() error {
return s.sm.rw.Flush()
}

// NewBypassReadWriter returns a ReadWriter directly reading and writing to the database,
// bypassing any wrapper e.g. ShardedReadWriter.
// This should be used for testing only, useful to check if data is actually persisted to the DB.
func (s *StorageManager) NewBypassReadWriter() *ReadWriter {
return s.storage.NewReadWriter()
}
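
For reference, a hedged sketch of how a test might combine NewReadWriter and NewBypassReadWriter to check that data is actually persisted. The test name and trace ID are hypothetical; WriterOpts is assumed to carry a TTL field, and the bypass ReadWriter is assumed to expose IsTraceSampled like the sharded wrapper (neither is shown in this diff). Assertions use testify, as in config_test.go above.

package eventstorage_test

import (
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/elastic/apm-server/x-pack/apm-server/sampling/eventstorage"
)

func TestStorageManagerPersists(t *testing.T) {
	sm, err := eventstorage.NewStorageManager(t.TempDir())
	require.NoError(t, err)
	defer sm.Close()

	// Write through the managed read writer, which delegates to the
	// ShardedReadWriter owned by the StorageManager.
	rw := sm.NewReadWriter()
	// WriterOpts fields are assumed (not part of this diff); the TTL is illustrative.
	opts := eventstorage.WriterOpts{TTL: time.Hour}
	require.NoError(t, rw.WriteTraceSampled("example-trace-id", true, opts))
	require.NoError(t, rw.Flush())

	// Verify persistence by reading directly from the DB, bypassing the
	// ShardedReadWriter wrapper.
	sampled, err := sm.NewBypassReadWriter().IsTraceSampled("example-trace-id")
	require.NoError(t, err)
	assert.True(t, sampled)
}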