Skip to content

Commit

Permalink
feat(graphdb): replicate resiliency patterns on edge writer.
Browse files Browse the repository at this point in the history
  • Loading branch information
Zenithar committed Nov 28, 2024
1 parent 95753d3 commit 3cfe99b
Show file tree
Hide file tree
Showing 4 changed files with 106 additions and 38 deletions.
22 changes: 22 additions & 0 deletions pkg/kubehound/storage/graphdb/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package graphdb

import "fmt"

// errBatchWriter is an error type that wraps an error and indicates whether the
// error is retryable.
//
// The edge writer's batchWrite returns it as *errBatchWriter when a write
// times out, and the background writer detects it with errors.As to decide
// whether the failed batch should be split and queued again.
type errBatchWriter struct {

Check failure on line 7 in pkg/kubehound/storage/graphdb/errors.go

View workflow job for this annotation

GitHub Actions / linter

the error type name `errBatchWriter` should conform to the `xxxError` format (errname)
err error // Underlying cause; may be nil, which Error() handles explicitly.
retryable bool // True when the failed write may be attempted again.
}

// Error implements the error interface. The message always reports the
// retryable flag and, when a cause is wrapped, appends the cause's own message.
func (e errBatchWriter) Error() string {
	// Handle the wrapped-cause case first so the bare message is the fallthrough.
	if cause := e.err; cause != nil {
		return fmt.Sprintf("batch writer error (retriable:%v): %v", e.retryable, cause.Error())
	}

	return fmt.Sprintf("batch writer error (retriable:%v)", e.retryable)
}

// Unwrap returns the wrapped error, or nil when none was set, so that
// errors.Is and errors.As can inspect the underlying cause.
func (e errBatchWriter) Unwrap() error {
return e.err
}
86 changes: 73 additions & 13 deletions pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/DataDog/KubeHound/pkg/kubehound/graph/edge"
"github.com/DataDog/KubeHound/pkg/kubehound/graph/types"
Expand All @@ -31,19 +32,24 @@ type JanusGraphEdgeWriter struct {
traversalSource *gremlingo.GraphTraversalSource // Transacted graph traversal source
inserts []any // Object data to be inserted in the graph
mu sync.Mutex // Mutex protecting access to the inserts array
consumerChan chan []any // Channel consuming inserts for async writing
consumerChan chan batchItem // Channel consuming inserts for async writing
writingInFlight *sync.WaitGroup // Wait group tracking current unfinished writes
batchSize int // Batchsize of graph DB inserts
qcounter int32 // Track items queued
wcounter int32 // Track items written
tags []string // Telemetry tags
writerTimeout time.Duration // Timeout for the writer
maxRetry int // Maximum number of retries for failed writes
}

// NewJanusGraphAsyncEdgeWriter creates a new bulk edge writer instance.
func NewJanusGraphAsyncEdgeWriter(ctx context.Context, drc *gremlingo.DriverRemoteConnection,
e edge.Builder, opts ...WriterOption) (*JanusGraphEdgeWriter, error) {

options := &writerOptions{}
e edge.Builder, opts ...WriterOption,
) (*JanusGraphEdgeWriter, error) {
options := &writerOptions{
WriterTimeout: defaultWriterTimeout,
MaxRetry: defaultMaxRetry,
}
for _, opt := range opts {
opt(options)
}
Expand All @@ -57,8 +63,10 @@ func NewJanusGraphAsyncEdgeWriter(ctx context.Context, drc *gremlingo.DriverRemo
traversalSource: gremlingo.Traversal_().WithRemote(drc),
batchSize: e.BatchSize(),
writingInFlight: &sync.WaitGroup{},
consumerChan: make(chan []any, e.BatchSize()*channelSizeBatchFactor),
consumerChan: make(chan batchItem, e.BatchSize()*channelSizeBatchFactor),
tags: append(options.Tags, tag.Label(e.Label()), tag.Builder(builder)),
writerTimeout: options.WriterTimeout,
maxRetry: options.MaxRetry,
}

jw.startBackgroundWriter(ctx)
Expand All @@ -71,15 +79,51 @@ func (jgv *JanusGraphEdgeWriter) startBackgroundWriter(ctx context.Context) {
go func() {
for {
select {
case data := <-jgv.consumerChan:
// closing the channel should stop the goroutine
if data == nil {
case batch, ok := <-jgv.consumerChan:
// If the channel is closed, return.
if !ok {
log.Trace(ctx).Info("Closed background janusgraph worker on channel close")
return

Check failure on line 86 in pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go

View workflow job for this annotation

GitHub Actions / linter

return with no blank line before (nlreturn)
}

// If the batch is empty, return.
if len(batch.data) == 0 {
log.Trace(ctx).Warn("Empty edge batch received in background janusgraph worker, skipping")
return

Check failure on line 92 in pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go

View workflow job for this annotation

GitHub Actions / linter

return with no blank line before (nlreturn)
}

_ = statsd.Count(ctx, metric.BackgroundWriterCall, 1, jgv.tags, 1)
err := jgv.batchWrite(ctx, data)
err := jgv.batchWrite(ctx, batch.data)
if err != nil {

Check failure on line 97 in pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go

View workflow job for this annotation

GitHub Actions / linter

`if err != nil` has complex nested blocks (complexity: 9) (nestif)
var e *errBatchWriter
if errors.As(err, &e) && e.retryable {
// If the error is retryable, retry the write operation with a smaller batch.
if batch.retryCount < jgv.maxRetry {
// Compute the new batch size.
newBatchSize := len(batch.data) / 2
batch.retryCount++

log.Trace(ctx).Warnf("Retrying write operation with smaller edge batch (n:%d -> %d, r:%d): %v", len(batch.data), newBatchSize, batch.retryCount, e.Unwrap())

// Split the batch into smaller chunks and requeue them.
if len(batch.data[:newBatchSize]) > 0 {
jgv.consumerChan <- batchItem{
data: batch.data[:newBatchSize],
retryCount: batch.retryCount,
}
}
if len(batch.data[newBatchSize:]) > 0 {
jgv.consumerChan <- batchItem{
data: batch.data[newBatchSize:],
retryCount: batch.retryCount,
}
}
continue

Check failure on line 121 in pkg/kubehound/storage/graphdb/janusgraph_edge_writer.go

View workflow job for this annotation

GitHub Actions / linter

continue with no blank line before (nlreturn)
}

log.Trace(ctx).Errorf("Retry limit exceeded for write operation: %v", err)
}

log.Trace(ctx).Errorf("write data in background batch writer: %v", err)
}

Expand Down Expand Up @@ -109,9 +153,22 @@ func (jgv *JanusGraphEdgeWriter) batchWrite(ctx context.Context, data []any) err

op := jgv.gremlin(jgv.traversalSource, data)
promise := op.Iterate()
err = <-promise
if err != nil {
return fmt.Errorf("%s edge insert: %w", jgv.builder, err)

// Wait for the write operation to complete or timeout.
select {
case <-ctx.Done():
// If the context is cancelled, return the error.
return ctx.Err()
case <-time.After(jgv.writerTimeout):
// If the write operation takes too long, return an error.
return &errBatchWriter{
err: errors.New("edge write operation timed out"),
retryable: true,
}
case err := <-promise:
if err != nil {
return fmt.Errorf("%s edge insert: %w", jgv.builder, err)
}
}

return nil
Expand Down Expand Up @@ -174,7 +231,10 @@ func (jgv *JanusGraphEdgeWriter) Queue(ctx context.Context, v any) error {
copy(copied, jgv.inserts)

jgv.writingInFlight.Add(1)
jgv.consumerChan <- copied
jgv.consumerChan <- batchItem{
data: copied,
retryCount: 0,
}
_ = statsd.Incr(ctx, metric.QueueSize, jgv.tags, 1)

// cleanup the ops array after we have copied it to the channel
Expand Down
31 changes: 6 additions & 25 deletions pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,32 +48,13 @@ type batchItem struct {
retryCount int
}

// errBatchWriter is an error type that wraps an error and indicates whether the
// error is retryable.
//
// The vertex writer's batchWrite returns it as *errBatchWriter when a write
// times out, and the background writer detects it with errors.As to decide
// whether the failed batch should be split and queued again.
type errBatchWriter struct {
err error // Underlying cause; may be nil, which Error() handles explicitly.
retryable bool // True when the failed write may be attempted again.
}

// Error implements the error interface. The message always reports the
// retryable flag and, when a cause is wrapped, appends the cause's own message.
func (e errBatchWriter) Error() string {
	// Handle the wrapped-cause case first so the bare message is the fallthrough.
	if cause := e.err; cause != nil {
		return fmt.Sprintf("batch writer error (retriable:%v): %v", e.retryable, cause.Error())
	}

	return fmt.Sprintf("batch writer error (retriable:%v)", e.retryable)
}

// Unwrap returns the wrapped error, or nil when none was set, so that
// errors.Is and errors.As can inspect the underlying cause.
func (e errBatchWriter) Unwrap() error {
return e.err
}

// NewJanusGraphAsyncVertexWriter creates a new bulk vertex writer instance.
func NewJanusGraphAsyncVertexWriter(ctx context.Context, drc *gremlin.DriverRemoteConnection,
v vertex.Builder, c cache.CacheProvider, opts ...WriterOption,
) (*JanusGraphVertexWriter, error) {
options := &writerOptions{
WriterTimeout: 60 * time.Second,
MaxRetry: 3,
WriterTimeout: defaultWriterTimeout,
MaxRetry: defaultMaxRetry,
}
for _, opt := range opts {
opt(options)
Expand Down Expand Up @@ -118,7 +99,7 @@ func (jgv *JanusGraphVertexWriter) startBackgroundWriter(ctx context.Context) {

// If the batch is empty, return.
if len(batch.data) == 0 {
log.Trace(ctx).Warn("Empty batch received in background janusgraph worker, skipping")
log.Trace(ctx).Warn("Empty vertex batch received in background janusgraph worker, skipping")
return
}

Expand All @@ -127,13 +108,13 @@ func (jgv *JanusGraphVertexWriter) startBackgroundWriter(ctx context.Context) {
if err != nil {

Check failure on line 108 in pkg/kubehound/storage/graphdb/janusgraph_vertex_writer.go

View workflow job for this annotation

GitHub Actions / linter

`if err != nil` has complex nested blocks (complexity: 9) (nestif)
var e *errBatchWriter
if errors.As(err, &e) && e.retryable {
// If the context deadline is exceeded, retry the write operation with a smaller batch.
// If the error is retryable, retry the write operation with a smaller batch.
if batch.retryCount < jgv.maxRetry {
// Compute the new batch size.
newBatchSize := len(batch.data) / 2
batch.retryCount++

log.Trace(ctx).Warnf("Retrying write operation with smaller batch (n:%d -> %d, r:%d): %v", len(batch.data), newBatchSize, batch.retryCount, e.Unwrap())
log.Trace(ctx).Warnf("Retrying write operation with vertex smaller batch (n:%d -> %d, r:%d): %v", len(batch.data), newBatchSize, batch.retryCount, e.Unwrap())

// Split the batch into smaller chunks and requeue them.
if len(batch.data[:newBatchSize]) > 0 {
Expand Down Expand Up @@ -241,7 +222,7 @@ func (jgv *JanusGraphVertexWriter) batchWrite(ctx context.Context, data []any) e
case <-time.After(jgv.writerTimeout):
// If the write operation takes too long, return an error.
return &errBatchWriter{
err: errors.New("write operation timed out"),
err: errors.New("vertex write operation timed out"),
retryable: true,
}
case err = <-errChan:
Expand Down
5 changes: 5 additions & 0 deletions pkg/kubehound/storage/graphdb/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,11 @@ import (
"github.com/DataDog/KubeHound/pkg/kubehound/storage/cache"
)

// Defaults applied by the async writer constructors before any WriterOption
// is evaluated; an option may override either value.
const (
// defaultWriterTimeout is the default WriterTimeout: how long a single batch
// write may run before it is reported as a retryable timeout.
defaultWriterTimeout = 60 * time.Second
// defaultMaxRetry is the default MaxRetry: how many times a batch that failed
// with a retryable error may be split and queued again.
defaultMaxRetry = 3
)

type writerOptions struct {
Tags []string
WriterTimeout time.Duration
Expand Down

0 comments on commit 3cfe99b

Please sign in to comment.