Skip to content

Commit

Permalink
Merge branch '2434-iss-trace-stream-feed' into 'dev'
Browse files Browse the repository at this point in the history
feat: add trace-based point feed on ddtrace and otel

See merge request cloudcare-tools/datakit!3250
  • Loading branch information
谭彪 committed Nov 1, 2024
2 parents 895230a + 2d90611 commit 98e20e6
Show file tree
Hide file tree
Showing 25 changed files with 544 additions and 177 deletions.
46 changes: 32 additions & 14 deletions internal/io/dataway/body.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,25 @@ import (
"github.com/GuanceCloud/cliutils/point"
)

type walFrom int8
type (
walFrom int8
gzipFlag int8
bufOnwer int8
)

const (
walFromMem walFrom = iota
walFromDisk
walFromMem walFrom = 0
walFromDisk walFrom = 1
walFromNotSet walFrom = -1

gzipRaw gzipFlag = 0
gzipSet gzipFlag = 1
gzipNotSet gzipFlag = -1

bufOnwerOthers bufOnwer = 0
bufOnwerSelf bufOnwer = 1

encNotSet point.Encoding = -1
)

func (f walFrom) String() string {
Expand All @@ -43,22 +57,22 @@ type body struct {

chksum string

selfBuffer, // buffer that belongs to itself, and we should not drop it when putback
gzon int8
from walFrom
selfBuffer bufOnwer // buffer that belongs to itself, and we should not drop it when putback
gzon gzipFlag
from walFrom
}

func (b *body) reset() {
b.CacheData.Payload = nil
b.CacheData.PayloadType = int32(point.Protobuf)
b.CacheData.Category = int32(point.Protobuf)
b.CacheData.PayloadType = int32(encNotSet)
b.CacheData.Category = int32(point.UnknownCategory)

b.CacheData.Headers = b.CacheData.Headers[:0]
b.CacheData.DynURL = ""
b.CacheData.Pts = 0
b.CacheData.RawLen = 0

if b.selfBuffer != 1 { // buffer not managed by itself
if b.selfBuffer != bufOnwerSelf { // buffer not managed by itself
b.sendBuf = nil
b.marshalBuf = nil
}
Expand All @@ -67,8 +81,8 @@ func (b *body) reset() {
// and WAL protobuf marshal, their len(x) is always its capacity. If len(x) changed,
// this will **panic** body encoding and protobuf marshal.

b.gzon = -1
b.from = walFromMem
b.gzon = gzipNotSet
b.from = walFromNotSet
}

func (b *body) buf() []byte {
Expand Down Expand Up @@ -104,6 +118,10 @@ func (b *body) loadCache(data []byte) error {
return fmt.Errorf("Unmarshal: %w", err)
}

if b.enc() == encNotSet || b.cat() == point.UnknownCategory {
l.Warnf("invalid body: %s", b.pretty())
}

return nil
}

Expand All @@ -129,8 +147,8 @@ func (b *body) String() string {
func (b *body) pretty() string {
var arr []string
arr = append(arr, fmt.Sprintf("\n%p from: %s", b, b.from))
arr = append(arr, fmt.Sprintf("enc: %s", b.enc()))
arr = append(arr, fmt.Sprintf("cat: %s", b.cat()))
arr = append(arr, fmt.Sprintf("enc: %d/%s", b.enc(), b.enc()))
arr = append(arr, fmt.Sprintf("cat: %d/%s", b.cat(), b.cat()))
arr = append(arr, fmt.Sprintf("gzon: %d", b.gzon))
arr = append(arr, fmt.Sprintf("#buf: %d", len(b.buf())))
arr = append(arr, fmt.Sprintf("#send-buf: %d", len(b.sendBuf)))
Expand Down Expand Up @@ -217,7 +235,7 @@ func (w *writer) buildPointsBody() error {

if err := enc.LastErr(); err != nil {
l.Errorf("encode: %s, cat: %s, total points: %d, current part: %d, body cap: %d",
err.Error(), b.cat().Alias(), len(w.points), parts, cap(b.sendBuf))
err.Error(), w.category.Alias(), len(w.points), parts, cap(b.sendBuf))
return err
}

Expand Down
2 changes: 1 addition & 1 deletion internal/io/dataway/body_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ func BenchmarkBuildBody(b *T.B) {
name string
pts []*point.Point
batch int
gz int
gz gzipFlag
enc point.Encoding
}{
{
Expand Down
1 change: 1 addition & 0 deletions internal/io/dataway/dialtesting.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func (d *DialtestingSender) WriteData(url string, pts []*point.Point) error {
w := getWriter(WithPoints(pts),
WithDynamicURL(url),
WithCategory(point.DynamicDWCategory),
WithHTTPEncoding(point.LineProtocol),
WithBodyCallback(func(w *writer, b *body) error {
err := d.ep.writePointData(w, b)
if err != nil {
Expand Down
15 changes: 11 additions & 4 deletions internal/io/dataway/endpoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func TestEndpointRetry(t *T.T) {

func TestEndpointMetrics(t *T.T) {
t.Run("5xx-request", func(t *T.T) {
metricsReset()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusInternalServerError)
}))
Expand All @@ -55,7 +56,6 @@ func TestEndpointMetrics(t *T.T) {

t.Cleanup(func() {
ts.Close()
metricsReset()
})

api := "/some/path"
Expand Down Expand Up @@ -96,6 +96,7 @@ func TestEndpointMetrics(t *T.T) {
})

t.Run("write-points-4xx", func(t *T.T) {
metricsReset()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
assert.NoError(t, err)
Expand Down Expand Up @@ -133,6 +134,7 @@ test-2 f1=1i,f2=false 123
point.NewPointV2("test-1", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))),
point.NewPointV2("test-2", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))),
}),
WithHTTPEncoding(point.LineProtocol),
WithCategory(point.Metric))
defer putWriter(w)

Expand Down Expand Up @@ -164,11 +166,11 @@ test-2 f1=1i,f2=false 123

t.Cleanup(func() {
ts.Close()
metricsReset()
})
})

t.Run("write-n-points-ok", func(t *T.T) {
metricsReset()
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
defer r.Body.Close()
Expand All @@ -189,6 +191,8 @@ test-2 f1=1i,f2=false 123
w.WriteHeader(200)
}))

time.Sleep(time.Second)

urlstr := fmt.Sprintf("%s?token=abc", ts.URL)
ep, err := newEndpoint(urlstr, withAPIs([]string{datakit.Metric}))
assert.NoError(t, err)
Expand All @@ -201,6 +205,7 @@ test-2 f1=1i,f2=false 123
point.NewPointV2("test-1", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))),
point.NewPointV2("test-2", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))),
}),
WithHTTPEncoding(point.LineProtocol),
WithGzip(1),
)
defer putWriter(w)
Expand Down Expand Up @@ -234,11 +239,11 @@ test-2 f1=1i,f2=false 123

t.Cleanup(func() {
ts.Close()
metricsReset()
})
})

t.Run("with-proxy", func(t *T.T) {
metricsReset()
backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
body, err := io.ReadAll(r.Body)
assert.NoError(t, err)
Expand Down Expand Up @@ -274,7 +279,9 @@ test-2 f1=1i,f2=false 123
WithPoints([]*point.Point{
point.NewPointV2("test-1", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))),
point.NewPointV2("test-2", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))),
}), WithGzip(1))
}),
WithHTTPEncoding(point.LineProtocol),
WithGzip(1))
defer putWriter(w)

reg := prometheus.NewRegistry()
Expand Down
4 changes: 2 additions & 2 deletions internal/io/dataway/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,9 +130,9 @@ func (f *flusher) start() {
}

func (f *flusher) do(b *body, opts ...WriteOption) error {
gzOn := 0
gzOn := gzipNotSet
if f.dw.GZip {
gzOn = 1
gzOn = gzipSet
}

w := getWriter(
Expand Down
8 changes: 4 additions & 4 deletions internal/io/dataway/gz.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,15 @@ func (z *gzipWriter) zip(data []byte) ([]byte, error) {
return z.buf.Bytes(), nil
}

func isGzip(data []byte) int8 {
func isGzip(data []byte) gzipFlag {
if len(data) < 2 {
return -1
return gzipNotSet
}

// See: https://stackoverflow.com/a/6059342/342348
if data[0] == 0x1f && data[1] == 0x8b {
return 1
return gzipSet
} else {
return 0
return gzipRaw
}
}
4 changes: 2 additions & 2 deletions internal/io/dataway/gz_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestEqualGZip(t *T.T) {
pgzBytes := buf.Bytes()
t.Logf("raw data: %d bytes, pgzip: %d", len(arr[0]), len(pgzBytes))

assert.Equal(t, int8(1), isGzip(pgzBytes))
assert.Equal(t, gzipSet, isGzip(pgzBytes))

// unzip pgzBytes with go's gzip
gzr := bytes.NewBuffer(pgzBytes)
Expand All @@ -69,7 +69,7 @@ func TestEqualGZip(t *T.T) {
kgzBytes := buf.Bytes()
t.Logf("raw data: %d bytes, kgzip: %d", len(arr[0]), len(kgzBytes))

assert.Equal(t, int8(1), isGzip(kgzBytes))
assert.Equal(t, gzipSet, isGzip(kgzBytes))

// unzip kgzBytes with go's gzip
gzr = bytes.NewBuffer(kgzBytes)
Expand Down
31 changes: 8 additions & 23 deletions internal/io/dataway/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ package dataway

import (
sync "sync"

"github.com/GuanceCloud/cliutils/point"
)

var (
Expand All @@ -29,7 +27,7 @@ func withNewBuffer(n int) bodyOpt {
// +10% on marshal buffer: we need more bytes for meta-info about the body
extra := int(float64(n) * .1)
b.marshalBuf = make([]byte, n+extra)
b.selfBuffer = 1
b.selfBuffer = bufOnwerSelf
}
}
}
Expand All @@ -42,7 +40,7 @@ func withReusableBuffer(send, marshal []byte) bodyOpt {
if len(send) > 0 && len(marshal) > 0 { // sendBuf and marshalBuf should not nil
b.sendBuf = send
b.marshalBuf = marshal
b.selfBuffer = 0 // buffer not comes from new buffer
b.selfBuffer = bufOnwerOthers // buffer not comes from new buffer
}
}
}
Expand All @@ -51,7 +49,7 @@ func getNewBufferBody(opts ...bodyOpt) *body {
var b *body
if x := newBufferBodyPool.Get(); x == nil {
b = &body{
selfBuffer: 1,
selfBuffer: bufOnwerSelf,
}

bodyCounterVec.WithLabelValues("malloc", "get", "new").Inc()
Expand All @@ -75,7 +73,7 @@ func getReuseBufferBody(opts ...bodyOpt) *body {
var b *body
if x := reuseBufferBodyPool.Get(); x == nil {
b = &body{
selfBuffer: 0,
selfBuffer: bufOnwerOthers,
}

bodyCounterVec.WithLabelValues("malloc", "get", "reuse").Inc()
Expand All @@ -99,7 +97,7 @@ func putBody(b *body) {
if b != nil {
b.reset()

if b.selfBuffer == 1 {
if b.selfBuffer == bufOnwerSelf {
newBufferBodyPool.Put(b)
bodyCounterVec.WithLabelValues("pool", "put", "new").Inc()
} else {
Expand All @@ -114,9 +112,9 @@ func getWriter(opts ...WriteOption) *writer {

if x := wpool.Get(); x == nil {
w = &writer{
httpHeaders: map[string]string{},
batchBytesSize: defaultBatchSize,
httpHeaders: map[string]string{},
}
w.reset()
} else {
w = x.(*writer)
}
Expand All @@ -131,19 +129,6 @@ func getWriter(opts ...WriteOption) *writer {
}

func putWriter(w *writer) {
w.category = point.UnknownCategory
w.dynamicURL = ""
w.points = w.points[:0]
w.gzip = -1
w.cacheClean = false
w.cacheAll = false
w.batchBytesSize = 1 << 20
w.batchSize = 0
w.bcb = nil

for k := range w.httpHeaders {
delete(w.httpHeaders, k)
}
w.httpEncoding = point.LineProtocol
w.reset()
wpool.Put(w)
}
2 changes: 1 addition & 1 deletion internal/io/dataway/wal.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (q *WALQueue) Get(opts ...bodyOpt) (*body, error) {
return nil, nil
}

l.Debugf("from queue %d get bytes", len(raw))
l.Debugf("from queue get %d bytes", len(raw))

if err := b.loadCache(raw); err != nil {
return nil, err
Expand Down
Loading

0 comments on commit 98e20e6

Please sign in to comment.