Skip to content

Commit

Permalink
Merge pull request #137 from MichaHoffmann/mhoffmann/timing-reader-ig…
Browse files Browse the repository at this point in the history
…nores-initial-get-range-call

fix: timing reader ignores initial get_range
  • Loading branch information
yeya24 authored Sep 13, 2024
2 parents 7adf08b + 1a62b9e commit d29b6ed
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 7 deletions.
15 changes: 13 additions & 2 deletions objstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -566,14 +566,18 @@ func (b *metricBucket) Get(ctx context.Context, name string) (io.ReadCloser, err
const op = OpGet
b.metrics.ops.WithLabelValues(op).Inc()

start := time.Now()

rc, err := b.bkt.Get(ctx, name)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
b.metrics.opsDuration.WithLabelValues(op).Observe(float64(time.Since(start)))
return nil, err
}
return newTimingReader(
start,
rc,
true,
op,
Expand All @@ -589,14 +593,18 @@ func (b *metricBucket) GetRange(ctx context.Context, name string, off, length in
const op = OpGetRange
b.metrics.ops.WithLabelValues(op).Inc()

start := time.Now()

rc, err := b.bkt.GetRange(ctx, name, off, length)
if err != nil {
if !b.metrics.isOpFailureExpected(err) && ctx.Err() != context.Canceled {
b.metrics.opsFailures.WithLabelValues(op).Inc()
}
b.metrics.opsDuration.WithLabelValues(op).Observe(float64(time.Since(start)))
return nil, err
}
return newTimingReader(
start,
rc,
true,
op,
Expand Down Expand Up @@ -628,7 +636,10 @@ func (b *metricBucket) Upload(ctx context.Context, name string, r io.Reader) err
const op = OpUpload
b.metrics.ops.WithLabelValues(op).Inc()

start := time.Now()

trc := newTimingReader(
start,
r,
false,
op,
Expand Down Expand Up @@ -705,7 +716,7 @@ type timingReader struct {
transferredBytes *prometheus.HistogramVec
}

func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
func newTimingReader(start time.Time, r io.Reader, closeReader bool, op string, dur *prometheus.HistogramVec, failed *prometheus.CounterVec, isFailureExpected IsOpFailureExpectedFunc, fetchedBytes *prometheus.CounterVec, transferredBytes *prometheus.HistogramVec) io.ReadCloser {
// Initialize the metrics with 0.
dur.WithLabelValues(op)
failed.WithLabelValues(op)
Expand All @@ -716,7 +727,7 @@ func newTimingReader(r io.Reader, closeReader bool, op string, dur *prometheus.H
closeReader: closeReader,
objSize: objSize,
objSizeErr: objSizeErr,
start: time.Now(),
start: start,
op: op,
duration: dur,
failed: failed,
Expand Down
11 changes: 6 additions & 5 deletions objstore_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path/filepath"
"strings"
"testing"
"time"

"github.com/efficientgo/core/testutil"
"github.com/go-kit/log"
Expand Down Expand Up @@ -412,7 +413,7 @@ func TestDownloadUploadDirConcurrency(t *testing.T) {
func TestTimingReader(t *testing.T) {
m := WrapWithMetrics(NewInMemBucket(), nil, "")
r := bytes.NewReader([]byte("hello world"))
tr := newTimingReader(r, true, OpGet, m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool {
tr := newTimingReader(time.Now(), r, true, OpGet, m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool {
return false
}, m.metrics.opsFetchedBytes, m.metrics.opsTransferredBytes)

Expand Down Expand Up @@ -447,7 +448,7 @@ func TestTimingReader_ExpectedError(t *testing.T) {

m := WrapWithMetrics(NewInMemBucket(), nil, "")
r := dummyReader{readerErr}
tr := newTimingReader(r, true, OpGet, m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool { return errors.Is(err, readerErr) }, m.metrics.opsFetchedBytes, m.metrics.opsTransferredBytes)
tr := newTimingReader(time.Now(), r, true, OpGet, m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool { return errors.Is(err, readerErr) }, m.metrics.opsFetchedBytes, m.metrics.opsTransferredBytes)

buf := make([]byte, 1)
_, err := io.ReadFull(tr, buf)
Expand All @@ -461,7 +462,7 @@ func TestTimingReader_UnexpectedError(t *testing.T) {

m := WrapWithMetrics(NewInMemBucket(), nil, "")
r := dummyReader{readerErr}
tr := newTimingReader(r, true, OpGet, m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool { return false }, m.metrics.opsFetchedBytes, m.metrics.opsTransferredBytes)
tr := newTimingReader(time.Now(), r, true, OpGet, m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool { return false }, m.metrics.opsFetchedBytes, m.metrics.opsTransferredBytes)

buf := make([]byte, 1)
_, err := io.ReadFull(tr, buf)
Expand All @@ -476,7 +477,7 @@ func TestTimingReader_ContextCancellation(t *testing.T) {

m := WrapWithMetrics(NewInMemBucket(), nil, "")
r := dummyReader{ctx.Err()}
tr := newTimingReader(r, true, OpGet, m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool { return false }, m.metrics.opsFetchedBytes, m.metrics.opsTransferredBytes)
tr := newTimingReader(time.Now(), r, true, OpGet, m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool { return false }, m.metrics.opsFetchedBytes, m.metrics.opsTransferredBytes)

buf := make([]byte, 1)
_, err := io.ReadFull(tr, buf)
Expand Down Expand Up @@ -506,7 +507,7 @@ func TestTimingReader_ShouldCorrectlyWrapFile(t *testing.T) {
})

m := WrapWithMetrics(NewInMemBucket(), nil, "")
r := newTimingReader(file, true, "", m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool {
r := newTimingReader(time.Now(), file, true, "", m.metrics.opsDuration, m.metrics.opsFailures, func(err error) bool {
return false
}, m.metrics.opsFetchedBytes, m.metrics.opsTransferredBytes)

Expand Down

0 comments on commit d29b6ed

Please sign in to comment.