Commit

Merge branch 'main' into prom/ui_sd_AlertManager
liguozhong committed Oct 10, 2023
2 parents 4b204e3 + 4b9c19f commit d3d5af2
Showing 7 changed files with 366 additions and 145 deletions.
3 changes: 2 additions & 1 deletion RELEASE.md
@@ -52,7 +52,8 @@ Release cadence of first pre-releases being cut is 6 weeks.
| v2.45 LTS | 2023-05-31 | Jesus Vazquez (Github: @jesusvazquez) |
| v2.46 | 2023-07-12 | Julien Pivotto (GitHub: @roidelapluie) |
| v2.47 | 2023-08-23 | Bryan Boreham (GitHub: @bboreham) |
| v2.48 | 2023-10-04 | **searching for volunteer** |
| v2.48 | 2023-10-04 | Levi Harrison (GitHub: @LeviHarrison) |
| v2.49 | 2023-11-15 | **searching for volunteer** |

If you are interested in volunteering please create a pull request against the [prometheus/prometheus](https://github.com/prometheus/prometheus) repository and propose yourself for the release series of your choice.

389 changes: 286 additions & 103 deletions prompb/io/prometheus/client/metrics.pb.go

Large diffs are not rendered by default.

8 changes: 7 additions & 1 deletion prompb/io/prometheus/client/metrics.proto
@@ -51,6 +51,8 @@ message Gauge {
message Counter {
double value = 1;
Exemplar exemplar = 2;

google.protobuf.Timestamp created_timestamp = 3 [(gogoproto.nullable) = true];
}

message Quantile {
@@ -62,6 +64,8 @@ message Summary {
uint64 sample_count = 1;
double sample_sum = 2;
repeated Quantile quantile = 3 [(gogoproto.nullable) = false];

google.protobuf.Timestamp created_timestamp = 4 [(gogoproto.nullable) = true];
}

message Untyped {
@@ -75,6 +79,8 @@ message Histogram {
// Buckets for the conventional histogram.
repeated Bucket bucket = 3 [(gogoproto.nullable) = false]; // Ordered in increasing order of upper_bound, +Inf bucket is optional.

google.protobuf.Timestamp created_timestamp = 15 [(gogoproto.nullable) = true];

// Everything below here is for native histograms (also known as sparse histograms).
// Native histograms are an experimental feature without stability guarantees.

@@ -147,4 +153,4 @@ message MetricFamily {
string help = 2;
MetricType type = 3;
repeated Metric metric = 4 [(gogoproto.nullable) = false];
}
}
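
The metrics.proto changes above add an optional created_timestamp to Counter, Summary, and Histogram so that exposition can carry the creation time of a metric. A minimal sketch of setting the field, assuming the gogoproto-generated Go names (Counter.CreatedTimestamp as a *types.Timestamp); since the field is nullable, exposers that predate it simply leave the pointer nil:

package main

import (
	"fmt"
	"time"

	"github.com/gogo/protobuf/types"

	dto "github.com/prometheus/prometheus/prompb/io/prometheus/client"
)

func main() {
	// Assumed generated names: Counter.Value and Counter.CreatedTimestamp.
	// nullable = true makes the timestamp a pointer, so it may be omitted.
	created, err := types.TimestampProto(time.Now().Add(-10 * time.Minute))
	if err != nil {
		panic(err)
	}
	c := &dto.Counter{
		Value:            42,
		CreatedTimestamp: created,
	}
	fmt.Println(c.GetCreatedTimestamp() != nil) // true once the exposer sets it
}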
41 changes: 24 additions & 17 deletions scrape/scrape.go
@@ -785,7 +785,8 @@ func appender(app storage.Appender, sampleLimit, bucketLimit int) storage.Append

// A scraper retrieves samples and accepts a status report at the end.
type scraper interface {
scrape(ctx context.Context, w io.Writer) (string, error)
scrape(ctx context.Context) (*http.Response, error)
readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error)
Report(start time.Time, dur time.Duration, err error)
offset(interval time.Duration, offsetSeed uint64) time.Duration
}
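
Splitting the old single-call scrape(ctx, w) into scrape plus readResponse lets a caller issue the HTTP request first and commit a buffer only once the request has succeeded. A sketch of driving the split interface; scrapeOnce is illustrative and would live alongside scraper in package scrape:

// scrapeOnce drives the two-step API: fetch first, read second.
func scrapeOnce(ctx context.Context, sc scraper, w io.Writer) (string, error) {
	resp, err := sc.scrape(ctx) // step 1: perform the HTTP request
	if err != nil {
		return "", err // nothing was fetched, so no buffer is needed
	}
	// Step 2: stream the (possibly gzip-compressed) body into w.
	return sc.readResponse(ctx, resp, w)
}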
@@ -814,11 +815,11 @@ const (

var UserAgent = fmt.Sprintf("Prometheus/%s", version.Version)

func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
func (s *targetScraper) scrape(ctx context.Context) (*http.Response, error) {
if s.req == nil {
req, err := http.NewRequest("GET", s.URL().String(), nil)
if err != nil {
return "", err
return nil, err
}
req.Header.Add("Accept", s.acceptHeader)
req.Header.Add("Accept-Encoding", "gzip")
@@ -828,10 +829,10 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)
s.req = req
}

resp, err := s.client.Do(s.req.WithContext(ctx))
if err != nil {
return "", err
}
return s.client.Do(s.req.WithContext(ctx))
}

func (s *targetScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) {
defer func() {
io.Copy(io.Discard, resp.Body)
resp.Body.Close()
@@ -858,13 +859,14 @@ func (s *targetScraper) scrape(ctx context.Context, w io.Writer) (string, error)

if s.gzipr == nil {
s.buf = bufio.NewReader(resp.Body)
var err error
s.gzipr, err = gzip.NewReader(s.buf)
if err != nil {
return "", err
}
} else {
s.buf.Reset(resp.Body)
if err = s.gzipr.Reset(s.buf); err != nil {
if err := s.gzipr.Reset(s.buf); err != nil {
return "", err
}
}
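
readResponse keeps one bufio.Reader and one gzip.Reader alive across scrapes and resets them against each new response body; the diff also tightens err scoping with `if err :=` in the reset branch. A standalone sketch of that reuse pattern, with illustrative names:

package main

import (
	"bufio"
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipReuse mirrors the pattern above: allocate the readers once, then
// Reset them onto each new body instead of reallocating per scrape.
type gzipReuse struct {
	buf   *bufio.Reader
	gzipr *gzip.Reader
}

func (g *gzipReuse) reader(body io.Reader) (io.Reader, error) {
	if g.gzipr == nil {
		g.buf = bufio.NewReader(body)
		gz, err := gzip.NewReader(g.buf)
		if err != nil {
			return nil, err
		}
		g.gzipr = gz
		return gz, nil
	}
	g.buf.Reset(body)
	// err is scoped to the if statement, matching the fix in the diff.
	if err := g.gzipr.Reset(g.buf); err != nil {
		return nil, err
	}
	return g.gzipr, nil
}

func main() {
	var compressed bytes.Buffer
	zw := gzip.NewWriter(&compressed)
	zw.Write([]byte("metric_a 1\n"))
	zw.Close()

	var g gzipReuse
	r, err := g.reader(&compressed)
	if err != nil {
		panic(err)
	}
	out, _ := io.ReadAll(r)
	fmt.Print(string(out)) // metric_a 1
}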
@@ -1326,11 +1328,7 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
)
}

b := sl.buffers.Get(sl.lastScrapeSize).([]byte)
defer sl.buffers.Put(b)
buf := bytes.NewBuffer(b)

var total, added, seriesAdded, bytes int
var total, added, seriesAdded, bytesRead int
var err, appErr, scrapeErr error

app := sl.appender(sl.appenderCtx)
@@ -1346,7 +1344,7 @@
}()

defer func() {
if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytes, scrapeErr); err != nil {
if err = sl.report(app, appendTime, time.Since(start), total, added, seriesAdded, bytesRead, scrapeErr); err != nil {
level.Warn(sl.l).Log("msg", "Appending scrape report failed", "err", err)
}
}()
@@ -1367,8 +1365,17 @@
}

var contentType string
var resp *http.Response
var b []byte
var buf *bytes.Buffer
scrapeCtx, cancel := context.WithTimeout(sl.parentCtx, sl.timeout)
contentType, scrapeErr = sl.scraper.scrape(scrapeCtx, buf)
resp, scrapeErr = sl.scraper.scrape(scrapeCtx)
if scrapeErr == nil {
b = sl.buffers.Get(sl.lastScrapeSize).([]byte)
defer sl.buffers.Put(b)
buf = bytes.NewBuffer(b)
contentType, scrapeErr = sl.scraper.readResponse(scrapeCtx, resp, buf)
}
cancel()

if scrapeErr == nil {
@@ -1379,14 +1386,14 @@
if len(b) > 0 {
sl.lastScrapeSize = len(b)
}
bytes = len(b)
bytesRead = len(b)
} else {
level.Debug(sl.l).Log("msg", "Scrape failed", "err", scrapeErr)
if errc != nil {
errc <- scrapeErr
}
if errors.Is(scrapeErr, errBodySizeLimit) {
bytes = -1
bytesRead = -1
}
}

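In scrapeAndReport the pooled byte slice is now borrowed only after scrape returns successfully, so a failed request never touches the buffer pool, and the local counter is renamed from bytes to bytesRead so it no longer shadows the bytes package (which the same function uses via bytes.NewBuffer). A standalone sketch of the borrow-after-success pattern, using sync.Pool in place of Prometheus's internal pool and illustrative names throughout:

package main

import (
	"bytes"
	"errors"
	"fmt"
	"sync"
)

var bufPool = sync.Pool{New: func() any { return new(bytes.Buffer) }}

// scrapeAndRead mimics the reordered flow: the buffer is taken from the
// pool only on the success path.
func scrapeAndRead(fail bool) (bytesRead int, err error) {
	if fail {
		return -1, errors.New("scrape failed") // no pool traffic on failure
	}
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	defer bufPool.Put(buf)
	buf.WriteString("metric_a 1\n") // stands in for readResponse filling buf
	return buf.Len(), nil
}

func main() {
	fmt.Println(scrapeAndRead(false)) // 11 <nil>
	fmt.Println(scrapeAndRead(true))  // -1 scrape failed
}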
32 changes: 24 additions & 8 deletions scrape/scrape_test.go
@@ -2619,7 +2619,9 @@ func TestTargetScraperScrapeOK(t *testing.T) {
}
var buf bytes.Buffer

contentType, err := ts.scrape(context.Background(), &buf)
resp, err := ts.scrape(context.Background())
require.NoError(t, err)
contentType, err := ts.readResponse(context.Background(), resp, &buf)
require.NoError(t, err)
require.Equal(t, "text/plain; version=0.0.4", contentType)
require.Equal(t, "metric_a 1\nmetric_b 2\n", buf.String())
@@ -2665,7 +2667,7 @@ func TestTargetScrapeScrapeCancel(t *testing.T) {
}()

go func() {
_, err := ts.scrape(ctx, io.Discard)
_, err := ts.scrape(ctx)
switch {
case err == nil:
errc <- errors.New("Expected error but got nil")
@@ -2711,7 +2713,9 @@ func TestTargetScrapeScrapeNotFound(t *testing.T) {
acceptHeader: scrapeAcceptHeader,
}

_, err = ts.scrape(context.Background(), io.Discard)
resp, err := ts.scrape(context.Background())
require.NoError(t, err)
_, err = ts.readResponse(context.Background(), resp, io.Discard)
require.Contains(t, err.Error(), "404", "Expected \"404 NotFound\" error but got: %s", err)
}

@@ -2755,26 +2759,34 @@ func TestTargetScraperBodySizeLimit(t *testing.T) {
var buf bytes.Buffer

// Target response uncompressed body, scrape with body size limit.
_, err = ts.scrape(context.Background(), &buf)
resp, err := ts.scrape(context.Background())
require.NoError(t, err)
_, err = ts.readResponse(context.Background(), resp, &buf)
require.ErrorIs(t, err, errBodySizeLimit)
require.Equal(t, bodySizeLimit, buf.Len())
// Target response gzip compressed body, scrape with body size limit.
gzipResponse = true
buf.Reset()
_, err = ts.scrape(context.Background(), &buf)
resp, err = ts.scrape(context.Background())
require.NoError(t, err)
_, err = ts.readResponse(context.Background(), resp, &buf)
require.ErrorIs(t, err, errBodySizeLimit)
require.Equal(t, bodySizeLimit, buf.Len())
// Target response uncompressed body, scrape without body size limit.
gzipResponse = false
buf.Reset()
ts.bodySizeLimit = 0
_, err = ts.scrape(context.Background(), &buf)
resp, err = ts.scrape(context.Background())
require.NoError(t, err)
_, err = ts.readResponse(context.Background(), resp, &buf)
require.NoError(t, err)
require.Equal(t, len(responseBody), buf.Len())
// Target response gzip compressed body, scrape without body size limit.
gzipResponse = true
buf.Reset()
_, err = ts.scrape(context.Background(), &buf)
resp, err = ts.scrape(context.Background())
require.NoError(t, err)
_, err = ts.readResponse(context.Background(), resp, &buf)
require.NoError(t, err)
require.Equal(t, len(responseBody), buf.Len())
}
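
The test asserts that with a limit in place exactly bodySizeLimit bytes are kept and errBodySizeLimit is returned, for both plain and gzip-compressed bodies. The limit semantics can be sketched generically with io.LimitReader; this is an illustration of the behaviour under test, not the scrape package's actual implementation:

package main

import (
	"errors"
	"fmt"
	"io"
	"strings"
)

var errBodySizeLimit = errors.New("body size limit exceeded")

// copyLimited copies at most limit bytes from r to w and reports the limit
// error when the body is larger, matching what the test asserts.
func copyLimited(w io.Writer, r io.Reader, limit int64) (int64, error) {
	n, err := io.Copy(w, io.LimitReader(r, limit))
	if err != nil {
		return n, err
	}
	if n == limit {
		// Peek one byte to distinguish "exactly limit" from "over limit".
		var one [1]byte
		if m, _ := r.Read(one[:]); m > 0 {
			return n, errBodySizeLimit
		}
	}
	return n, nil
}

func main() {
	var sb strings.Builder
	n, err := copyLimited(&sb, strings.NewReader("0123456789"), 4)
	fmt.Println(n, err) // 4 body size limit exceeded
}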
@@ -2802,7 +2814,11 @@ func (ts *testScraper) Report(start time.Time, duration time.Duration, err error
ts.lastError = err
}

func (ts *testScraper) scrape(ctx context.Context, w io.Writer) (string, error) {
func (ts *testScraper) scrape(ctx context.Context) (*http.Response, error) {
return nil, ts.scrapeErr
}

func (ts *testScraper) readResponse(ctx context.Context, resp *http.Response, w io.Writer) (string, error) {
if ts.scrapeFunc != nil {
return "", ts.scrapeFunc(ctx, w)
}
36 changes: 21 additions & 15 deletions tsdb/index/postings.go
@@ -413,6 +413,7 @@ type Postings interface {
Seek(v storage.SeriesRef) bool

// At returns the value at the current iterator position.
// At should only be called after a successful call to Next or Seek.
At() storage.SeriesRef

// Err returns the last error of the iterator.
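
The added comment pins down the iterator contract: At is only meaningful after Next or Seek has returned true. The canonical consumption loop, using the exported NewListPostings constructor from this file:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	p := index.NewListPostings([]storage.SeriesRef{1, 3, 5})
	var refs []storage.SeriesRef
	for p.Next() { // At is read only after Next reports true
		refs = append(refs, p.At())
	}
	fmt.Println(refs, p.Err()) // [1 3 5] <nil>
}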
@@ -747,7 +748,7 @@ func (rp *removedPostings) Err() error {
// ListPostings implements the Postings interface over a plain list.
type ListPostings struct {
list []storage.SeriesRef
pos int
cur storage.SeriesRef
}

func NewListPostings(list []storage.SeriesRef) Postings {
@@ -759,34 +760,39 @@ func newListPostings(list ...storage.SeriesRef) *ListPostings {
}

func (it *ListPostings) At() storage.SeriesRef {
return it.list[it.pos-1]
return it.cur
}

func (it *ListPostings) Next() bool {
if it.pos < len(it.list) {
it.pos++
if len(it.list) > 0 {
it.cur = it.list[0]
it.list = it.list[1:]
return true
}
it.cur = 0
return false
}

func (it *ListPostings) Seek(x storage.SeriesRef) bool {
if it.pos == 0 {
it.pos++
}
if it.pos > len(it.list) {
return false
}
// If the current value satisfies, then return.
if it.list[it.pos-1] >= x {
if it.cur >= x {
return true
}
if len(it.list) == 0 {
return false
}

// Do binary search between current position and end.
it.pos = sort.Search(len(it.list[it.pos-1:]), func(i int) bool {
return it.list[i+it.pos-1] >= x
}) + it.pos
return it.pos-1 < len(it.list)
i := sort.Search(len(it.list), func(i int) bool {
return it.list[i] >= x
})
if i < len(it.list) {
it.cur = it.list[i]
it.list = it.list[i+1:]
return true
}
it.list = nil
return false
}

func (it *ListPostings) Err() error {
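The rewrite drops the pos index in favour of a consumed slice plus a cached cur: At becomes a plain field read with no index arithmetic, and Seek binary-searches only the remaining tail. The observable contract is unchanged, as a quick check of Seek shows:

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func main() {
	// Seek advances to the first value >= x and leaves the iterator there.
	p := index.NewListPostings([]storage.SeriesRef{10, 20, 30, 40})

	fmt.Println(p.Seek(25)) // true
	fmt.Println(p.At())     // 30
	fmt.Println(p.Next())   // true
	fmt.Println(p.At())     // 40
	fmt.Println(p.Seek(99)) // false
	fmt.Println(p.Err())    // <nil>
}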
2 changes: 2 additions & 0 deletions tsdb/index/postings_test.go
@@ -1242,6 +1242,8 @@ func TestListPostings(t *testing.T) {
})
}

// BenchmarkListPostings benchmarks ListPostings by iterating Next/At sequentially.
// See also BenchmarkIntersect as it performs more `At` calls than `Next` calls when intersecting.
func BenchmarkListPostings(b *testing.B) {
const maxCount = 1e6
input := make([]storage.SeriesRef, maxCount)
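As the new comment notes, intersection workloads call At more often than Next, which is exactly what the field-read At in the refactor keeps cheap. A sketch of an At-heavy benchmark in that spirit; the benchmark name and access pattern are illustrative:

package index_test

import (
	"testing"

	"github.com/prometheus/prometheus/storage"
	"github.com/prometheus/prometheus/tsdb/index"
)

func BenchmarkListPostingsAtHeavy(b *testing.B) {
	input := make([]storage.SeriesRef, 1e5)
	for i := range input {
		input[i] = storage.SeriesRef(i)
	}
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		p := index.NewListPostings(input)
		for p.Next() {
			// Read At twice per advance to mimic Intersect's access pattern.
			_ = p.At()
			_ = p.At()
		}
	}
}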
