Skip to content

Commit

Permalink
add updown counters
Browse files Browse the repository at this point in the history
  • Loading branch information
moh-osman3 committed Feb 8, 2024
1 parent 39896a2 commit b54c851
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 9 deletions.
76 changes: 68 additions & 8 deletions collector/receiver/otelarrowreceiver/internal/arrow/arrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,15 @@ import (
"go.opentelemetry.io/collector/receiver"
"go.opentelemetry.io/collector/receiver/receiverhelper"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/propagation"
"go.opentelemetry.io/otel/trace"
)

const (
streamFormat = "arrow"
hpackMaxDynamicSize = 4096
scopeName = "github.com/open-telemetry/otel-arrow/collector/receiver/otelarrowreceiver"
)

var (
Expand Down Expand Up @@ -71,6 +73,8 @@ type Receiver struct {
authServer auth.Server
newConsumer func() arrowRecord.ConsumerAPI
netReporter netstats.Interface
recvInFlightBytes metric.Int64UpDownCounter
recvInFlightItems metric.Int64UpDownCounter
}

// New creates a new Receiver reference.
Expand All @@ -82,9 +86,10 @@ func New(
authServer auth.Server,
newConsumer func() arrowRecord.ConsumerAPI,
netReporter netstats.Interface,
) *Receiver {
) (*Receiver, error) {
tracer := set.TelemetrySettings.TracerProvider.Tracer("otel-arrow-receiver")
return &Receiver{
var errors, err error
recv := &Receiver{
Consumers: cs,
obsrecv: obsrecv,
telemetry: set.TelemetrySettings,
Expand All @@ -94,6 +99,26 @@ func New(
gsettings: gsettings,
netReporter: netReporter,
}

meter := recv.telemetry.MeterProvider.Meter(scopeName)
recv.recvInFlightBytes, err = meter.Int64UpDownCounter(
"otelarrow_receiver_in_flight_bytes",
metric.WithDescription("Number of bytes in flight"),
metric.WithUnit("By"),
)
errors = multierr.Append(errors, err)

recv.recvInFlightItems, err = meter.Int64UpDownCounter(
"otelarrow_receiver_in_flight_items",
metric.WithDescription("Number of items in flight"),
)
errors = multierr.Append(errors, err)

if errors != nil {
return nil, errors
}

return recv, nil
}

// headerReceiver contains the state necessary to decode per-request metadata
Expand Down Expand Up @@ -452,13 +477,25 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
err = consumererror.NewPermanent(err)
} else {
for _, metrics := range data {
numPts += metrics.DataPointCount()
items := metrics.DataPointCount()
sz := int64(sizer.MetricsSize(metrics))

r.recvInFlightBytes.Add(ctx, sz)
r.recvInFlightItems.Add(ctx, int64(items))

numPts += items
if r.telemetry.MetricsLevel > configtelemetry.LevelNormal {
uncompSize += int64(sizer.MetricsSize(metrics))
uncompSize += sz
}
err = multierr.Append(err,
r.Metrics().ConsumeMetrics(ctx, metrics),
)

// Once ConsumeMetrics returns we can decrement
// the updown counters as this memory will no
// longer be held by the receiver.
r.recvInFlightBytes.Add(ctx, -sz)
r.recvInFlightItems.Add(ctx, -int64(items))
}
}
r.obsrecv.EndMetricsOp(ctx, streamFormat, numPts, err)
Expand All @@ -477,13 +514,24 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
err = consumererror.NewPermanent(err)
} else {
for _, logs := range data {
numLogs += logs.LogRecordCount()
items := logs.LogRecordCount()
sz := int64(sizer.LogsSize(logs))

r.recvInFlightBytes.Add(ctx, sz)
r.recvInFlightItems.Add(ctx, int64(items))
numLogs += items
if r.telemetry.MetricsLevel > configtelemetry.LevelNormal {
uncompSize += int64(sizer.LogsSize(logs))
uncompSize += sz
}
err = multierr.Append(err,
r.Logs().ConsumeLogs(ctx, logs),
)

// Once ConsumeLogs returns we can decrement
// the updown counters as this memory will no
// longer be held by the receiver.
r.recvInFlightBytes.Add(ctx, -sz)
r.recvInFlightItems.Add(ctx, int64(-items))
}
}
r.obsrecv.EndLogsOp(ctx, streamFormat, numLogs, err)
Expand All @@ -502,13 +550,25 @@ func (r *Receiver) processRecords(ctx context.Context, method string, arrowConsu
err = consumererror.NewPermanent(err)
} else {
for _, traces := range data {
numSpans += traces.SpanCount()
items := traces.SpanCount()
sz := int64(sizer.TracesSize(traces))

r.recvInFlightBytes.Add(ctx, sz)
r.recvInFlightItems.Add(ctx, int64(items))

numSpans += items
if r.telemetry.MetricsLevel > configtelemetry.LevelNormal {
uncompSize += int64(sizer.TracesSize(traces))
uncompSize += sz
}
err = multierr.Append(err,
r.Traces().ConsumeTraces(ctx, traces),
)

// Once ConsumeTraces returns we can decrement
// the updown counters as this memory will no
// longer be held by the receiver.
r.recvInFlightBytes.Add(ctx, -sz)
r.recvInFlightItems.Add(ctx, int64(-items))
}
}
r.obsrecv.EndTracesOp(ctx, streamFormat, numSpans, err)
Expand Down
6 changes: 5 additions & 1 deletion collector/receiver/otelarrowreceiver/otelarrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func (r *otelArrowReceiver) startProtocolServers(host component.Host) error {
}
}

r.arrowReceiver = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI {
r.arrowReceiver, err = arrow.New(arrow.Consumers(r), r.settings, r.obsrepGRPC, r.cfg.GRPC, authServer, func() arrowRecord.ConsumerAPI {
var opts []arrowRecord.Option
if r.cfg.Arrow.MemoryLimitMiB != 0 {
// in which case the default is selected in the arrowRecord package.
Expand All @@ -126,6 +126,10 @@ func (r *otelArrowReceiver) startProtocolServers(host component.Host) error {
return arrowRecord.NewConsumer(opts...)
}, r.netReporter)

if err != nil {
return err
}

if r.tracesReceiver != nil {
ptraceotlp.RegisterGRPCServer(r.serverGRPC, r.tracesReceiver)

Expand Down

0 comments on commit b54c851

Please sign in to comment.