refactor(graphdb): split microbatcher and retrier concerns.
Zenithar committed Nov 29, 2024
1 parent 99e875f commit ee79469
Showing 5 changed files with 410 additions and 248 deletions.
179 changes: 66 additions & 113 deletions pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go
@@ -30,25 +30,23 @@ type JanusGraphEdgeWriter struct {
gremlin types.EdgeTraversal // Gremlin traversal generator function
drc *gremlingo.DriverRemoteConnection // Gremlin driver remote connection
traversalSource *gremlingo.GraphTraversalSource // Transacted graph traversal source
inserts []any // Object data to be inserted in the graph
mu sync.Mutex // Mutex protecting access to the inserts array
consumerChan chan batchItem // Channel consuming inserts for async writing
writingInFlight *sync.WaitGroup // Wait group tracking current unfinished writes
batchSize int // Batch size of graph DB inserts
qcounter int32 // Track items queued
wcounter int32 // Track items written
tags []string // Telemetry tags
writerTimeout time.Duration // Timeout for the writer
maxRetry int // Maximum number of retries for failed writes
mb *microBatcher // Micro batcher to batch writes
}

// NewJanusGraphAsyncEdgeWriter creates a new bulk edge writer instance.
func NewJanusGraphAsyncEdgeWriter(ctx context.Context, drc *gremlingo.DriverRemoteConnection,
e edge.Builder, opts ...WriterOption,
) (*JanusGraphEdgeWriter, error) {
options := &writerOptions{
WriterTimeout: defaultWriterTimeout,
MaxRetry: defaultMaxRetry,
WriterTimeout: defaultWriterTimeout,
MaxRetry: defaultMaxRetry,
WriterWorkerCount: defaultWriterWorkerCount,
}
for _, opt := range opts {
opt(options)
@@ -59,101 +57,91 @@ func NewJanusGraphAsyncEdgeWriter(ctx context.Context, drc *gremlingo.DriverRemo
builder: builder,
gremlin: e.Traversal(),
drc: drc,
inserts: make([]any, 0, e.BatchSize()),
traversalSource: gremlingo.Traversal_().WithRemote(drc),
batchSize: e.BatchSize(),
writingInFlight: &sync.WaitGroup{},
consumerChan: make(chan batchItem, e.BatchSize()*channelSizeBatchFactor),
tags: append(options.Tags, tag.Label(e.Label()), tag.Builder(builder)),
writerTimeout: options.WriterTimeout,
maxRetry: options.MaxRetry,
}

jw.startBackgroundWriter(ctx)

return &jw, nil
}

// startBackgroundWriter starts a background goroutine that consumes queued batches and writes them to the graph DB.
func (jgv *JanusGraphEdgeWriter) startBackgroundWriter(ctx context.Context) {
go func() {
for {
select {
case batch, ok := <-jgv.consumerChan:
// If the channel is closed, return.
if !ok {
log.Trace(ctx).Info("Closed background janusgraph worker on channel close")

return
}

// If the batch is empty, return.
if len(batch.data) == 0 {
log.Trace(ctx).Warn("Empty edge batch received in background janusgraph worker, skipping")

return
}

_ = statsd.Count(ctx, metric.BackgroundWriterCall, 1, jgv.tags, 1)
err := jgv.batchWrite(ctx, batch.data)
if err != nil {
var e *batchWriterError
if errors.As(err, &e) {
// If the error is retryable, retry the write operation with a smaller batch.
if e.retryable && batch.retryCount < jgv.maxRetry {
jgv.retrySplitAndRequeue(ctx, &batch, e)

continue
}

log.Trace(ctx).Errorf("Retry limit exceeded for write operation: %v", err)
}

log.Trace(ctx).Errorf("write data in background batch writer: %v", err)
}

_ = statsd.Decr(ctx, metric.QueueSize, jgv.tags, 1)
case <-ctx.Done():
log.Trace(ctx).Info("Closed background janusgraph worker on context cancel")

return
// Create a new micro batcher to batch the inserts with split and retry logic.
jw.mb = newMicroBatcher(log.Trace(ctx), e.BatchSize(), options.WriterWorkerCount, func(ctx context.Context, a []any) error {
// Try to write the batch to the graph DB.
if err := jw.batchWrite(ctx, a); err != nil {
var bwe *batchWriterError
if errors.As(err, &bwe) && bwe.retryable {
// If the write operation failed and is retryable, split the batch and retry.
return jw.splitAndRetry(ctx, 0, a)
}

return err
}
}()

return nil
})
jw.mb.Start(ctx)

return &jw, nil
}

// retrySplitAndRequeue will split the batch into smaller chunks and requeue them for writing.
func (jgv *JanusGraphEdgeWriter) retrySplitAndRequeue(ctx context.Context, batch *batchItem, e *batchWriterError) {
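// splitAndRetry splits the payload in half and retries each half synchronously, recursing until the write succeeds or maxRetry is reached.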
func (jgv *JanusGraphEdgeWriter) splitAndRetry(ctx context.Context, retryCount int, payload []any) error {
_ = statsd.Count(ctx, metric.RetryWriterCall, 1, jgv.tags, 1)

// If we have reached the maximum number of retries, return an error.
if retryCount >= jgv.maxRetry {
return fmt.Errorf("max retry count reached: %d", retryCount)
}

// Compute the new batch size.
newBatchSize := len(batch.data) / 2
batch.retryCount++
newBatchSize := len(payload) / 2

log.Trace(ctx).Warnf("Retrying write operation with smaller edge batch (n:%d -> %d, r:%d)", len(payload), newBatchSize, retryCount)

log.Trace(ctx).Warnf("Retrying write operation with smaller edge batch (n:%d -> %d, r:%d): %v", len(batch.data), newBatchSize, batch.retryCount, e.Unwrap())
var leftErr, rightErr error

// Split the batch into smaller chunks and requeue them.
if len(batch.data[:newBatchSize]) > 0 {
jgv.consumerChan <- batchItem{
data: batch.data[:newBatchSize],
retryCount: batch.retryCount,
// Split the batch into smaller chunks and retry them.
if len(payload[:newBatchSize]) > 0 {
if leftErr = jgv.batchWrite(ctx, payload[:newBatchSize]); leftErr != nil {
var bwe *batchWriterError
if errors.As(leftErr, &bwe) && bwe.retryable {
return jgv.splitAndRetry(ctx, retryCount+1, payload[:newBatchSize])
}
}
}
if len(batch.data[newBatchSize:]) > 0 {
jgv.consumerChan <- batchItem{
data: batch.data[newBatchSize:],
retryCount: batch.retryCount,

// Process the right side of the batch.
if len(payload[newBatchSize:]) > 0 {
if rightErr = jgv.batchWrite(ctx, payload[newBatchSize:]); rightErr != nil {
var bwe *batchWriterError
if errors.As(rightErr, &bwe) && bwe.retryable {
return jgv.splitAndRetry(ctx, retryCount+1, payload[newBatchSize:])
}
}
}

// Surface any errors from the two halves, combining them if both failed.
switch {
case leftErr != nil && rightErr != nil:
return fmt.Errorf("left: %w, right: %w", leftErr, rightErr)
case leftErr != nil:
return leftErr
case rightErr != nil:
return rightErr
}

return nil
}

// batchWrite will write a batch of entries into the graph DB and block until the write completes.
// Callers are responsible for doing an Add(1) to the writingInFlight wait group to ensure proper synchronization.
func (jgv *JanusGraphEdgeWriter) batchWrite(ctx context.Context, data []any) error {
span, ctx := span.SpanRunFromContext(ctx, span.JanusGraphBatchWrite)
span.SetTag(tag.LabelTag, jgv.builder)
var err error
defer func() { span.Finish(tracer.WithError(err)) }()

// Increment the writingInFlight wait group to track the number of writes in progress.
jgv.writingInFlight.Add(1)
defer jgv.writingInFlight.Done()

datalen := len(data)
@@ -185,8 +173,6 @@ func (jgv *JanusGraphEdgeWriter) batchWrite(ctx context.Context, data []any) err
}

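// Close terminates the writer and releases any underlying resources.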
func (jgv *JanusGraphEdgeWriter) Close(ctx context.Context) error {
close(jgv.consumerChan)

return nil
}

@@ -198,29 +184,17 @@ func (jgv *JanusGraphEdgeWriter) Flush(ctx context.Context) error {
var err error
defer func() { span.Finish(tracer.WithError(err)) }()

jgv.mu.Lock()
defer jgv.mu.Unlock()

if jgv.traversalSource == nil {
return errors.New("janusGraph traversalSource is not initialized")
}

if len(jgv.inserts) != 0 {
_ = statsd.Incr(ctx, metric.FlushWriterCall, jgv.tags, 1)

jgv.writingInFlight.Add(1)
err = jgv.batchWrite(ctx, jgv.inserts)
if err != nil {
log.Trace(ctx).Errorf("batch write %s: %+v", jgv.builder, err)
jgv.writingInFlight.Wait()

return err
}

log.Trace(ctx).Debugf("Done flushing %s writes. clearing the queue", jgv.builder)
jgv.inserts = nil
// Flush the micro batcher.
err = jgv.mb.Flush(ctx)
if err != nil {
return fmt.Errorf("micro batcher flush: %w", err)
}

// Wait for all writes to complete.
jgv.writingInFlight.Wait()

log.Trace(ctx).Debugf("Edge writer %d %s queued", jgv.qcounter, jgv.builder)
@@ -230,26 +204,5 @@ func (jgv *JanusGraphEdgeWriter) Flush(ctx context.Context) error {
}

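// Queue adds an object to the writer buffer to be batched and asynchronously written to the graph DB.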
func (jgv *JanusGraphEdgeWriter) Queue(ctx context.Context, v any) error {
jgv.mu.Lock()
defer jgv.mu.Unlock()

atomic.AddInt32(&jgv.qcounter, 1)
jgv.inserts = append(jgv.inserts, v)

if len(jgv.inserts) > jgv.batchSize {
copied := make([]any, len(jgv.inserts))
copy(copied, jgv.inserts)

jgv.writingInFlight.Add(1)
jgv.consumerChan <- batchItem{
data: copied,
retryCount: 0,
}
_ = statsd.Incr(ctx, metric.QueueSize, jgv.tags, 1)

// cleanup the ops array after we have copied it to the channel
jgv.inserts = nil
}

return nil
return jgv.mb.Enqueue(ctx, v)
}
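
The mb field used above refers to a microBatcher type introduced in one of the other files changed by this commit and not shown here. From the call sites in this file — newMicroBatcher(logger, batchSize, workerCount, writerFunc), Start(ctx), Enqueue(ctx, item) and Flush(ctx) — the shape of that type can be inferred. The following is a minimal, hypothetical sketch of a batcher with that interface, not the implementation shipped in the commit; the logger argument taken by the real constructor is omitted, and error handling inside the workers is deliberately simplified.

package graphdb

import (
    "context"
    "sync"
)

// itemWriter is the callback invoked with each completed batch.
type itemWriter func(ctx context.Context, batch []any) error

// microBatcher groups enqueued items into fixed-size batches and hands them
// to a pool of workers for writing. Sketch only: inferred from the call sites
// in janusgraph_edge_writer.go, not the implementation added by this commit.
type microBatcher struct {
    batchSize int
    writer    itemWriter
    workers   int

    mu      sync.Mutex
    pending []any

    batchCh  chan []any
    inFlight sync.WaitGroup // batches dispatched but not yet written
}

func newMicroBatcher(batchSize, workerCount int, writer itemWriter) *microBatcher {
    return &microBatcher{
        batchSize: batchSize,
        writer:    writer,
        workers:   workerCount,
        pending:   make([]any, 0, batchSize),
        batchCh:   make(chan []any, workerCount),
    }
}

// Start launches the worker pool; workers exit when the context is cancelled.
func (mb *microBatcher) Start(ctx context.Context) {
    for i := 0; i < mb.workers; i++ {
        go func() {
            for {
                select {
                case batch := <-mb.batchCh:
                    _ = mb.writer(ctx, batch) // error handling elided in this sketch
                    mb.inFlight.Done()
                case <-ctx.Done():
                    return
                }
            }
        }()
    }
}

// Enqueue buffers an item and dispatches a batch once batchSize is reached.
func (mb *microBatcher) Enqueue(_ context.Context, item any) error {
    mb.mu.Lock()
    defer mb.mu.Unlock()

    mb.pending = append(mb.pending, item)
    if len(mb.pending) >= mb.batchSize {
        mb.dispatchLocked()
    }

    return nil
}

// Flush dispatches any partial batch and blocks until all dispatched batches
// have been written.
func (mb *microBatcher) Flush(_ context.Context) error {
    mb.mu.Lock()
    if len(mb.pending) > 0 {
        mb.dispatchLocked()
    }
    mb.mu.Unlock()

    mb.inFlight.Wait()

    return nil
}

// dispatchLocked hands the pending buffer to the workers. Callers must hold mu.
func (mb *microBatcher) dispatchLocked() {
    mb.inFlight.Add(1)
    mb.batchCh <- mb.pending
    mb.pending = make([]any, 0, mb.batchSize)
}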