diff --git a/internal/io/dataway/body.go b/internal/io/dataway/body.go index 04dcdb0b80..880a8bd9ba 100644 --- a/internal/io/dataway/body.go +++ b/internal/io/dataway/body.go @@ -14,11 +14,25 @@ import ( "github.com/GuanceCloud/cliutils/point" ) -type walFrom int8 +type ( + walFrom int8 + gzipFlag int8 + bufOnwer int8 +) const ( - walFromMem walFrom = iota - walFromDisk + walFromMem walFrom = 0 + walFromDisk walFrom = 1 + walFromNotSet walFrom = -1 + + gzipRaw gzipFlag = 0 + gzipSet gzipFlag = 1 + gzipNotSet gzipFlag = -1 + + bufOnwerOthers bufOnwer = 0 + bufOnwerSelf bufOnwer = 1 + + encNotSet point.Encoding = -1 ) func (f walFrom) String() string { @@ -43,22 +57,22 @@ type body struct { chksum string - selfBuffer, // buffer that belongs to itself, and we should not drop it when putback - gzon int8 - from walFrom + selfBuffer bufOnwer // buffer that belongs to itself, and we should not drop it when putback + gzon gzipFlag + from walFrom } func (b *body) reset() { b.CacheData.Payload = nil - b.CacheData.PayloadType = int32(point.Protobuf) - b.CacheData.Category = int32(point.Protobuf) + b.CacheData.PayloadType = int32(encNotSet) + b.CacheData.Category = int32(point.UnknownCategory) b.CacheData.Headers = b.CacheData.Headers[:0] b.CacheData.DynURL = "" b.CacheData.Pts = 0 b.CacheData.RawLen = 0 - if b.selfBuffer != 1 { // buffer not managed by itself + if b.selfBuffer != bufOnwerSelf { // buffer not managed by itself b.sendBuf = nil b.marshalBuf = nil } @@ -67,8 +81,8 @@ func (b *body) reset() { // and WAL protobuf marshal, their len(x) is always it's capacity. If len(x) changed, // this will **panic** body encoding and protobuf marshal. 
- b.gzon = -1 - b.from = walFromMem + b.gzon = gzipNotSet + b.from = walFromNotSet } func (b *body) buf() []byte { @@ -104,6 +118,10 @@ func (b *body) loadCache(data []byte) error { return fmt.Errorf("Unmarshal: %w", err) } + if b.enc() == encNotSet || b.cat() == point.UnknownCategory { + l.Warnf("invalid body: %s", b.pretty()) + } + return nil } @@ -129,8 +147,8 @@ func (b *body) String() string { func (b *body) pretty() string { var arr []string arr = append(arr, fmt.Sprintf("\n%p from: %s", b, b.from)) - arr = append(arr, fmt.Sprintf("enc: %s", b.enc())) - arr = append(arr, fmt.Sprintf("cat: %s", b.cat())) + arr = append(arr, fmt.Sprintf("enc: %d/%s", b.enc(), b.enc())) + arr = append(arr, fmt.Sprintf("cat: %d/%s", b.cat(), b.cat())) arr = append(arr, fmt.Sprintf("gzon: %d", b.gzon)) arr = append(arr, fmt.Sprintf("#buf: %d", len(b.buf()))) arr = append(arr, fmt.Sprintf("#send-buf: %d", len(b.sendBuf))) @@ -217,7 +235,7 @@ func (w *writer) buildPointsBody() error { if err := enc.LastErr(); err != nil { l.Errorf("encode: %s, cat: %s, total points: %d, current part: %d, body cap: %d", - err.Error(), b.cat().Alias(), len(w.points), parts, cap(b.sendBuf)) + err.Error(), w.category.Alias(), len(w.points), parts, cap(b.sendBuf)) return err } diff --git a/internal/io/dataway/body_test.go b/internal/io/dataway/body_test.go index 95f8214aef..898b5aca0e 100644 --- a/internal/io/dataway/body_test.go +++ b/internal/io/dataway/body_test.go @@ -304,7 +304,7 @@ func BenchmarkBuildBody(b *T.B) { name string pts []*point.Point batch int - gz int + gz gzipFlag enc point.Encoding }{ { diff --git a/internal/io/dataway/dialtesting.go b/internal/io/dataway/dialtesting.go index bef8289063..a7b67bb644 100644 --- a/internal/io/dataway/dialtesting.go +++ b/internal/io/dataway/dialtesting.go @@ -42,6 +42,7 @@ func (d *DialtestingSender) WriteData(url string, pts []*point.Point) error { w := getWriter(WithPoints(pts), WithDynamicURL(url), WithCategory(point.DynamicDWCategory), + 
WithHTTPEncoding(point.LineProtocol), WithBodyCallback(func(w *writer, b *body) error { err := d.ep.writePointData(w, b) if err != nil { diff --git a/internal/io/dataway/endpoint_test.go b/internal/io/dataway/endpoint_test.go index 818f420844..fc1759472c 100644 --- a/internal/io/dataway/endpoint_test.go +++ b/internal/io/dataway/endpoint_test.go @@ -46,6 +46,7 @@ func TestEndpointRetry(t *T.T) { func TestEndpointMetrics(t *T.T) { t.Run("5xx-request", func(t *T.T) { + metricsReset() ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusInternalServerError) })) @@ -55,7 +56,6 @@ func TestEndpointMetrics(t *T.T) { t.Cleanup(func() { ts.Close() - metricsReset() }) api := "/some/path" @@ -96,6 +96,7 @@ func TestEndpointMetrics(t *T.T) { }) t.Run("write-points-4xx", func(t *T.T) { + metricsReset() ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) assert.NoError(t, err) @@ -133,6 +134,7 @@ test-2 f1=1i,f2=false 123 point.NewPointV2("test-1", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))), point.NewPointV2("test-2", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))), }), + WithHTTPEncoding(point.LineProtocol), WithCategory(point.Metric)) defer putWriter(w) @@ -164,11 +166,11 @@ test-2 f1=1i,f2=false 123 t.Cleanup(func() { ts.Close() - metricsReset() }) }) t.Run("write-n-points-ok", func(t *T.T) { + metricsReset() ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) defer r.Body.Close() @@ -189,6 +191,8 @@ test-2 f1=1i,f2=false 123 w.WriteHeader(200) })) + time.Sleep(time.Second) + urlstr := fmt.Sprintf("%s?token=abc", ts.URL) ep, err := newEndpoint(urlstr, withAPIs([]string{datakit.Metric})) assert.NoError(t, err) @@ -201,6 +205,7 @@ test-2 f1=1i,f2=false 123 point.NewPointV2("test-1", 
point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))), point.NewPointV2("test-2", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))), }), + WithHTTPEncoding(point.LineProtocol), WithGzip(1), ) defer putWriter(w) @@ -234,11 +239,11 @@ test-2 f1=1i,f2=false 123 t.Cleanup(func() { ts.Close() - metricsReset() }) }) t.Run("with-proxy", func(t *T.T) { + metricsReset() backend := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { body, err := io.ReadAll(r.Body) assert.NoError(t, err) @@ -274,7 +279,9 @@ test-2 f1=1i,f2=false 123 WithPoints([]*point.Point{ point.NewPointV2("test-1", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))), point.NewPointV2("test-2", point.NewKVs(map[string]any{"f1": 1, "f2": false}), point.WithTime(time.Unix(0, 123))), - }), WithGzip(1)) + }), + WithHTTPEncoding(point.LineProtocol), + WithGzip(1)) defer putWriter(w) reg := prometheus.NewRegistry() diff --git a/internal/io/dataway/flush.go b/internal/io/dataway/flush.go index b04d52c7dc..507ce255c6 100644 --- a/internal/io/dataway/flush.go +++ b/internal/io/dataway/flush.go @@ -130,9 +130,9 @@ func (f *flusher) start() { } func (f *flusher) do(b *body, opts ...WriteOption) error { - gzOn := 0 + gzOn := gzipNotSet if f.dw.GZip { - gzOn = 1 + gzOn = gzipSet } w := getWriter( diff --git a/internal/io/dataway/gz.go b/internal/io/dataway/gz.go index bc8af6b135..a30aad9f17 100644 --- a/internal/io/dataway/gz.go +++ b/internal/io/dataway/gz.go @@ -54,15 +54,15 @@ func (z *gzipWriter) zip(data []byte) ([]byte, error) { return z.buf.Bytes(), nil } -func isGzip(data []byte) int8 { +func isGzip(data []byte) gzipFlag { if len(data) < 2 { - return -1 + return gzipNotSet } // See: https://stackoverflow.com/a/6059342/342348 if data[0] == 0x1f && data[1] == 0x8b { - return 1 + return gzipSet } else { - return 0 + return gzipRaw } } diff --git a/internal/io/dataway/gz_test.go 
b/internal/io/dataway/gz_test.go index d9c6f68cb9..1da902050f 100644 --- a/internal/io/dataway/gz_test.go +++ b/internal/io/dataway/gz_test.go @@ -45,7 +45,7 @@ func TestEqualGZip(t *T.T) { pgzBytes := buf.Bytes() t.Logf("raw data: %d bytes, pgzip: %d", len(arr[0]), len(pgzBytes)) - assert.Equal(t, int8(1), isGzip(pgzBytes)) + assert.Equal(t, gzipSet, isGzip(pgzBytes)) // unzip pgzBytes with go's gzip gzr := bytes.NewBuffer(pgzBytes) @@ -69,7 +69,7 @@ func TestEqualGZip(t *T.T) { kgzBytes := buf.Bytes() t.Logf("raw data: %d bytes, kgzip: %d", len(arr[0]), len(kgzBytes)) - assert.Equal(t, int8(1), isGzip(kgzBytes)) + assert.Equal(t, gzipSet, isGzip(kgzBytes)) // unzip pgzBytes with go's gzip gzr = bytes.NewBuffer(kgzBytes) diff --git a/internal/io/dataway/pool.go b/internal/io/dataway/pool.go index 90a75c8ce7..b0ac9b887b 100644 --- a/internal/io/dataway/pool.go +++ b/internal/io/dataway/pool.go @@ -7,8 +7,6 @@ package dataway import ( sync "sync" - - "github.com/GuanceCloud/cliutils/point" ) var ( @@ -29,7 +27,7 @@ func withNewBuffer(n int) bodyOpt { // +10% on marshal buffer: we need more bytes for meta-info about the body extra := int(float64(n) * .1) b.marshalBuf = make([]byte, n+extra) - b.selfBuffer = 1 + b.selfBuffer = bufOnwerSelf } } } @@ -42,7 +40,7 @@ func withReusableBuffer(send, marshal []byte) bodyOpt { if len(send) > 0 && len(marshal) > 0 { // sendBuf and marshalBuf should not nil b.sendBuf = send b.marshalBuf = marshal - b.selfBuffer = 0 // buffer not comes from new buffer + b.selfBuffer = bufOnwerOthers // buffer not comes from new buffer } } } @@ -51,7 +49,7 @@ func getNewBufferBody(opts ...bodyOpt) *body { var b *body if x := newBufferBodyPool.Get(); x == nil { b = &body{ - selfBuffer: 1, + selfBuffer: bufOnwerSelf, } bodyCounterVec.WithLabelValues("malloc", "get", "new").Inc() @@ -75,7 +73,7 @@ func getReuseBufferBody(opts ...bodyOpt) *body { var b *body if x := reuseBufferBodyPool.Get(); x == nil { b = &body{ - selfBuffer: 0, + selfBuffer: 
bufOnwerOthers, } bodyCounterVec.WithLabelValues("malloc", "get", "reuse").Inc() @@ -99,7 +97,7 @@ func putBody(b *body) { if b != nil { b.reset() - if b.selfBuffer == 1 { + if b.selfBuffer == bufOnwerSelf { newBufferBodyPool.Put(b) bodyCounterVec.WithLabelValues("pool", "put", "new").Inc() } else { @@ -114,9 +112,9 @@ func getWriter(opts ...WriteOption) *writer { if x := wpool.Get(); x == nil { w = &writer{ - httpHeaders: map[string]string{}, - batchBytesSize: defaultBatchSize, + httpHeaders: map[string]string{}, } + w.reset() } else { w = x.(*writer) } @@ -131,19 +129,6 @@ func getWriter(opts ...WriteOption) *writer { } func putWriter(w *writer) { - w.category = point.UnknownCategory - w.dynamicURL = "" - w.points = w.points[:0] - w.gzip = -1 - w.cacheClean = false - w.cacheAll = false - w.batchBytesSize = 1 << 20 - w.batchSize = 0 - w.bcb = nil - - for k := range w.httpHeaders { - delete(w.httpHeaders, k) - } - w.httpEncoding = point.LineProtocol + w.reset() wpool.Put(w) } diff --git a/internal/io/dataway/wal.go b/internal/io/dataway/wal.go index 19388f4772..e4cbd9a0d2 100644 --- a/internal/io/dataway/wal.go +++ b/internal/io/dataway/wal.go @@ -139,7 +139,7 @@ func (q *WALQueue) Get(opts ...bodyOpt) (*body, error) { return nil, nil } - l.Debugf("from queue %d get bytes", len(raw)) + l.Debugf("from queue get %d bytes", len(raw)) if err := b.loadCache(raw); err != nil { return nil, err diff --git a/internal/io/dataway/write.go b/internal/io/dataway/write.go index 1201dd4e71..6450cbefac 100644 --- a/internal/io/dataway/write.go +++ b/internal/io/dataway/write.go @@ -9,8 +9,6 @@ import ( "github.com/GuanceCloud/cliutils/point" ) -var MaxKodoBody = 10 * 1000 * 1000 - type WriteOption func(w *writer) func WithCategory(cat point.Category) WriteOption { @@ -49,7 +47,7 @@ func WithCacheClean(on bool) WriteOption { } } -func WithGzip(on int) WriteOption { +func WithGzip(on gzipFlag) WriteOption { return func(w *writer) { w.gzip = on } @@ -93,7 +91,7 @@ type writer struct 
{ httpEncoding point.Encoding - gzip int + gzip gzipFlag cacheClean, cacheAll bool httpHeaders map[string]string @@ -101,6 +99,23 @@ type writer struct { bcb bodyCallback } +func (w *writer) reset() { + w.category = point.UnknownCategory + w.dynamicURL = "" + w.points = w.points[:0] + w.gzip = gzipNotSet + w.cacheClean = false + w.cacheAll = false + w.batchBytesSize = defaultBatchSize + w.batchSize = 0 + w.bcb = nil + + for k := range w.httpHeaders { + delete(w.httpHeaders, k) + } + w.httpEncoding = encNotSet +} + func (dw *Dataway) doGroupPoints(ptg *ptGrouper, cat point.Category, points []*point.Point) { for _, pt := range points { // clear kvs for current pt @@ -127,9 +142,9 @@ func (dw *Dataway) groupPoints(ptg *ptGrouper, } func (dw *Dataway) Write(opts ...WriteOption) error { - gzOn := 0 + gzOn := gzipNotSet if dw.GZip { - gzOn = 1 + gzOn = gzipSet } w := getWriter( diff --git a/internal/io/dataway/write_test.go b/internal/io/dataway/write_test.go index 3eb75f1f11..5377ff29bd 100644 --- a/internal/io/dataway/write_test.go +++ b/internal/io/dataway/write_test.go @@ -30,7 +30,7 @@ func TestIsGZip(t *T.T) { gz, err := datakit.GZip(data) assert.NoError(t, err) - assert.Equal(t, int8(1), isGzip(gz)) + assert.Equal(t, gzipFlag(1), isGzip(gz)) }) } @@ -112,7 +112,7 @@ func TestFailCache(t *T.T) { } // check cached data - assert.Equal(t, int8(1), isGzip(b.buf())) + assert.Equal(t, gzipFlag(1), isGzip(b.buf())) assert.Equal(t, cat, b.cat()) assert.Equal(t, point.Protobuf, b.enc()) diff --git a/internal/monitor/app.go b/internal/monitor/app.go index d842b5871e..7d0f6d774d 100644 --- a/internal/monitor/app.go +++ b/internal/monitor/app.go @@ -28,7 +28,7 @@ var ( walStatsCols = strings.Split("Cat|Points(mem/disk/drop/total)", "|") enabledInputCols = strings.Split(`Input|Count|Crashed`, "|") goroutineCols = strings.Split(`Name|Running|Done|TotalCost`, "|") - httpAPIStatCols = strings.Split(`API|Status|Total|Latency|BodySize`, "|") + httpAPIStatCols = 
strings.Split(`API|Status|Total|Latency|BodySize(P90/Total)`, "|") filterRuleCols = strings.Split("Cat|Total|Filtered(%)|Cost", "|") dwptsStatCols = strings.Split(`Cat|Points(ok/total)|Bytes(ok/total/gz)`, "|") dwCols = strings.Split(`API|Status|Count|Latency|Retry`, "|") @@ -209,6 +209,10 @@ func number(i interface{}) string { } func metricWithLabel(mf *dto.MetricFamily, vals ...string) *dto.Metric { + if mf == nil { + return nil + } + labelMatch := func(lps []*dto.LabelPair) bool { if len(lps) < len(vals) { return false diff --git a/internal/monitor/view_feed.go b/internal/monitor/view_feed.go index 25c943a8c7..ab442e00cd 100644 --- a/internal/monitor/view_feed.go +++ b/internal/monitor/view_feed.go @@ -114,17 +114,19 @@ func (app *monitorAPP) renderInputsFeedTable(mfs map[string]*dto.MetricFamily, c col++ // P90Lat - feedSum := metricWithLabel(feedCost, cat, inputName) - feedLat := "-" - if feedSum != nil { - q := feedSum.GetSummary().GetQuantile()[1] // p90 - if v := q.GetValue(); math.IsNaN(v) { - feedLat = "NaN" - } else { - feedLat = time.Duration(v * float64(time.Second)).String() + if feedCost != nil { + x := metricWithLabel(feedCost, cat, inputName) + feedLat := "-" + if x != nil { + q := x.GetSummary().GetQuantile()[1] // p90 + if v := q.GetValue(); math.IsNaN(v) { + feedLat = "NaN" + } else { + feedLat = time.Duration(v * float64(time.Second)).String() + } } + table.SetCell(row, col, tview.NewTableCell(feedLat).SetMaxWidth(app.maxTableWidth).SetAlign(tview.AlignRight)) } - table.SetCell(row, col, tview.NewTableCell(feedLat).SetMaxWidth(app.maxTableWidth).SetAlign(tview.AlignRight)) col++ // P90Pts diff --git a/internal/monitor/view_http.go b/internal/monitor/view_http.go index 9d8c7cb3c2..13b32572f8 100644 --- a/internal/monitor/view_http.go +++ b/internal/monitor/view_http.go @@ -93,7 +93,13 @@ func (app *monitorAPP) renderHTTPStatTable(mfs map[string]*dto.MetricFamily, col table.SetCell(row, col, tview.NewTableCell("-"). 
SetMaxWidth(app.maxTableWidth).SetAlign(tview.AlignRight)) } else { - table.SetCell(row, col, tview.NewTableCell(number(x.GetSummary().GetSampleSum())). + p90 := x.GetSummary().GetQuantile()[1] + cellVal := fmt.Sprintf("%s/%s", + number(p90.GetValue()), + number(x.GetSummary().GetSampleSum()), + ) + + table.SetCell(row, col, tview.NewTableCell(cellVal). SetMaxWidth(app.maxTableWidth).SetAlign(tview.AlignRight)) } } diff --git a/internal/plugins/inputs/ddtrace/ddtrace_http.go b/internal/plugins/inputs/ddtrace/ddtrace_http.go index 79471c88ae..5e988651e4 100644 --- a/internal/plugins/inputs/ddtrace/ddtrace_http.go +++ b/internal/plugins/inputs/ddtrace/ddtrace_http.go @@ -55,17 +55,36 @@ func httpStatusRespFunc(resp http.ResponseWriter, req *http.Request, err error) } func handleDDTraces(resp http.ResponseWriter, req *http.Request) { - if req.Header.Get("Content-Length") == "0" || req.Header.Get("X-Datadog-Trace-Count") == "0" { + clStr := req.Header.Get("Content-Length") + ntraceStr := req.Header.Get("X-Datadog-Trace-Count") + if clStr == "0" || ntraceStr == "0" { log.Debug("empty request body") httpStatusRespFunc(resp, req, nil) return } + ntrace, err := strconv.ParseInt(ntraceStr, 10, 64) + if err != nil { + log.Warnf("invalid X-Datadog-Trace-Count: %q, ignored", ntraceStr) + } + + cl, err := strconv.ParseInt(clStr, 10, 64) + if err != nil { + log.Warnf("invalid Content-Length: %q, ignored", clStr) + } else if maxTraceBody > 0 && cl > maxTraceBody { + if ntrace > 0 { + droppedTraces.WithLabelValues(req.URL.Path).Add(float64(ntrace)) + } + + log.Warnf("dropped %d trace: too large request body(%q bytes > %d bytes)", ntrace, clStr, maxTraceBody) + return + } + pbuf := bufpool.GetBuffer() defer bufpool.PutBuffer(pbuf) - _, err := io.Copy(pbuf, req.Body) + _, err = io.Copy(pbuf, req.Body) if err != nil { log.Error(err.Error()) resp.WriteHeader(http.StatusBadRequest) @@ -78,7 +97,9 @@ func handleDDTraces(resp http.ResponseWriter, req *http.Request) { Media: 
req.Header.Get("Content-Type"), Body: pbuf, } + log.Debugf("param body len=%d", param.Body.Len()) + if err = parseDDTraces(param); err != nil { if errors.Is(err, msgp.ErrShortBytes) { log.Warn(err.Error()) @@ -122,6 +143,7 @@ func parseDDTraces(param *itrace.TraceParameters) error { } if len(dktraces) != 0 && afterGatherRun != nil { + log.Debugf("feed %d traces", len(dktraces)) afterGatherRun.Run(inputName, dktraces) } @@ -157,17 +179,42 @@ func decodeDDTraces(param *itrace.TraceParameters) (itrace.DatakitTraces, error) } } + curSpans := 0 + maxBatch := 100 + + log.Debugf("transform ddtrace to dkspan, noStreaming=%v", noStreaming) + if len(traces) != 0 { for _, trace := range traces { if len(trace) == 0 { log.Debug("### empty trace in traces") continue } - if dktrace := ddtraceToDkTrace(trace); len(dktrace) != 0 { - dktraces = append(dktraces, dktrace) + + // decode single ddtrace into dktrace + dktrace := ddtraceToDkTrace(trace) + if nspan := len(dktrace); nspan > 0 { + if nspan > maxBatch && !noStreaming { // flush large trace ASAP. + log.Debugf("streaming feed %d spans", nspan) + afterGatherRun.Run(inputName, itrace.DatakitTraces{dktrace}) + } else { + dktraces = append(dktraces, dktrace) + curSpans += nspan + + if curSpans > maxBatch && !noStreaming { // multiple traces got too many spans, flush ASAP. 
+ log.Debugf("streaming feed %d spans within %d traces", curSpans, len(dktraces)) + afterGatherRun.Run(inputName, dktraces) + + // clear and reset + dktraces = dktraces[:0] + curSpans = 0 + } + } } } } + + log.Debugf("curSpans: %d", curSpans) return dktraces, err } @@ -232,15 +279,38 @@ var ( func ddtraceToDkTrace(trace DDTrace) itrace.DatakitTrace { var ( - dktrace itrace.DatakitTrace - parentIDs, spanIDs = gatherSpansInfo(trace) + parentIDs, spanIDs = gatherSpansInfo(trace) // NOTE: we should gather before truncate + dktrace = make(itrace.DatakitTrace, 0, len(trace)) + strTraceID = "" ) + traceSpans.WithLabelValues(inputName).Observe(float64(len(trace))) + + // truncate too large spans + if traceMaxSpans > 0 && len(trace) > traceMaxSpans { + // append info to last span's meta + lastSpan := trace[traceMaxSpans-1] + if lastSpan.Meta == nil { + lastSpan.Meta = map[string]string{} + } + + lastSpan.Meta["__datakit_span_truncated"] = fmt.Sprintf("large trace that got spans %d, max span limit is %d(%d spans truncated)", + len(trace), traceMaxSpans, len(trace)-traceMaxSpans) + + log.Warnf("truncate %d spans from service %q", len(trace)-traceMaxSpans, trace[0].Service) + truncatedTraceSpans.WithLabelValues(inputName).Add(float64(len(trace) - traceMaxSpans)) + trace = trace[:traceMaxSpans] // truncated too large spans + } + for _, span := range trace { if span == nil { continue } + if strTraceID == "" { + strTraceID = strconv.FormatUint(span.TraceID, traceBase) + } + var spanKV point.KVs priority, ok := span.Metrics[keyPriority] if ok { @@ -255,15 +325,24 @@ func ddtraceToDkTrace(trace DDTrace) itrace.DatakitTrace { } } - spanKV = spanKV.Add(itrace.FieldTraceID, strconv.FormatUint(span.TraceID, traceBase), false, false). + resource := span.Resource + if strings.Contains(span.Resource, "\n") { + resource = strings.ReplaceAll(span.Resource, "\n", " ") + } + + spanKV = spanKV.Add(itrace.FieldTraceID, strTraceID, false, false). 
Add(itrace.FieldParentID, strconv.FormatUint(span.ParentID, spanBase), false, false). Add(itrace.FieldSpanid, strconv.FormatUint(span.SpanID, spanBase), false, false). AddTag(itrace.TagService, span.Service). - Add(itrace.FieldResource, strings.ReplaceAll(span.Resource, "\n", " "), false, false). + Add(itrace.FieldResource, resource, false, false). AddTag(itrace.TagOperation, span.Name). AddTag(itrace.TagSource, inputName). Add(itrace.TagSpanType, - itrace.FindSpanTypeInMultiServersIntSpanID(span.SpanID, span.ParentID, span.Service, spanIDs, parentIDs), true, false). + itrace.FindSpanTypeInMultiServersIntSpanID(span.SpanID, + span.ParentID, + span.Service, + spanIDs, + parentIDs), true, false). AddTag(itrace.TagSourceType, itrace.GetSpanSourceType(span.Type)). Add(itrace.FieldStart, span.Start/int64(time.Microsecond), false, false). Add(itrace.FieldDuration, span.Duration/int64(time.Microsecond), false, false) @@ -274,11 +353,12 @@ func ddtraceToDkTrace(trace DDTrace) itrace.DatakitTrace { spanKV = spanKV.AddTag("runtime_id", v).AddTag(runTimeIDKey, v) delete(span.Meta, runTimeIDKey) } + if v, ok := span.Meta["trace_128_bit_id"]; !ignoreTraceIDFromTag && ok { spanKV = spanKV.Add(itrace.FieldTraceID, v, false, true) } - for k, v := range tags { + for k, v := range inputTags { spanKV = spanKV.AddTag(k, v) } @@ -310,11 +390,14 @@ func ddtraceToDkTrace(trace DDTrace) itrace.DatakitTrace { } } - t := time.Unix(span.Start/1e9, span.Start%1e9) + t := time.Unix(0, span.Start) pt := point.NewPointV2(inputName, spanKV, append(traceOpts, point.WithTime(t))...) 
dktrace = append(dktrace, &itrace.DkSpan{Point: pt}) } + log.Debugf("build %d trace point from %d trace spans", + len(dktrace), cap(dktrace)) // cap(dktrace) is the origin trace span count + return dktrace } diff --git a/internal/plugins/inputs/ddtrace/env.go b/internal/plugins/inputs/ddtrace/env.go index 7955898801..86f9807b73 100644 --- a/internal/plugins/inputs/ddtrace/env.go +++ b/internal/plugins/inputs/ddtrace/env.go @@ -35,6 +35,26 @@ func (ipt *Input) GetENVDoc() []*inputs.ENVInfo { {FieldName: "WPConfig", ENVName: "THREADS", Type: doc.JSON, Example: `{"buffer":1000, "threads":100}`, Desc: "Total number of threads and buffer", DescZh: "线程和缓存的数量"}, {FieldName: "LocalCacheConfig", ENVName: "STORAGE", Type: doc.JSON, Example: `{"storage":"./ddtrace_storage", "capacity": 5120}`, Desc: "Local cache file path and size (MB) ", DescZh: "本地缓存路径和大小(MB)"}, {FieldName: "Tags", Type: doc.JSON, Example: `{"k1":"v1", "k2":"v2", "k3":"v3"}`}, + + { + FieldName: "TraceMaxSpans", + ENVName: "ENV_INPUT_DDTRACE_MAX_SPANS", + Type: doc.Int, + Example: `1000`, + Default: "100000", + Desc: "Max spans of single trace. Set to -1 to remove this limit", + DescZh: "单个 trace 最大 span 个数,如果超过该限制,多余的 span 将截断,置为 -1 可关闭该限制", + }, + + { + FieldName: "MaxTraceBodyMB", + ENVName: "ENV_INPUT_DDTRACE_MAX_BODY_MB", + Type: doc.Int, + Example: `32`, + Default: "32", + Desc: "Max body(in MiB) of single trace POST. 
Set to -1 to remove this limit", + DescZh: "单个 trace API 请求最大 body 字节数(单位 MiB),置为 -1 可关闭该限制", + }, } return doc.SetENVDoc("ENV_INPUT_DDTRACE_", infos) @@ -52,6 +72,7 @@ func (ipt *Input) GetENVDoc() []*inputs.ENVInfo { // ENV_INPUT_DDTRACE_TAGS : JSON string // ENV_INPUT_DDTRACE_THREADS : JSON string // ENV_INPUT_DDTRACE_STORAGE : JSON string +// ENV_INPUT_DDTRACE_MAX_SPANS: int // below is a complete example for env in shell // export ENV_INPUT_DDTRACE_ENDPOINTS=`["/v0.3/traces", "/v0.4/traces", "/v0.5/traces"]` // export ENV_INPUT_DDTRACE_IGNORE_TAGS=`["block1", "block2"]` @@ -66,16 +87,40 @@ func (ipt *Input) ReadEnv(envs map[string]string) { log = logger.SLogger(inputName) for _, key := range []string{ - "ENV_INPUT_DDTRACE_ENDPOINTS", "ENV_INPUT_DDTRACE_COMPATIBLE_OTEL", "ENV_INPUT_DDTRACE_CUSTOMER_TAGS", "ENV_INPUT_DDTRACE_KEEP_RARE_RESOURCE", - "ENV_INPUT_DDTRACE_OMIT_ERR_STATUS", "ENV_INPUT_DDTRACE_CLOSE_RESOURCE", "ENV_INPUT_DDTRACE_SAMPLER", - "ENV_INPUT_DDTRACE_TAGS", "ENV_INPUT_DDTRACE_THREADS", "ENV_INPUT_DDTRACE_STORAGE", "ENV_INPUT_DDTRACE_DEL_MESSAGE", + "ENV_INPUT_DDTRACE_ENDPOINTS", + "ENV_INPUT_DDTRACE_COMPATIBLE_OTEL", + "ENV_INPUT_DDTRACE_CUSTOMER_TAGS", + "ENV_INPUT_DDTRACE_KEEP_RARE_RESOURCE", + "ENV_INPUT_DDTRACE_OMIT_ERR_STATUS", + "ENV_INPUT_DDTRACE_CLOSE_RESOURCE", + "ENV_INPUT_DDTRACE_SAMPLER", + "ENV_INPUT_DDTRACE_TAGS", + "ENV_INPUT_DDTRACE_THREADS", + "ENV_INPUT_DDTRACE_STORAGE", + "ENV_INPUT_DDTRACE_DEL_MESSAGE", "ENV_INPUT_DDTRACE_TRACE_ID_64_BIT_HEX", + "ENV_INPUT_DDTRACE_MAX_SPANS", + "ENV_INPUT_DDTRACE_MAX_BODY_MB", } { value, ok := envs[key] if !ok { continue } switch key { + case "ENV_INPUT_DDTRACE_MAX_SPANS": + if n, err := strconv.ParseInt(value, 10, 64); err != nil { + log.Warnf("parse %s=%s failed: %s", key, value, err.Error()) + } else { + ipt.TraceMaxSpans = int(n) + } + + case "ENV_INPUT_DDTRACE_MAX_BODY_MB": + if n, err := strconv.ParseInt(value, 10, 64); err != nil { + log.Warnf("parse %s=%s failed: %s", key, value, 
err.Error()) + } else { + ipt.MaxTraceBodyMB = n + } + case "ENV_INPUT_DDTRACE_ENDPOINTS": var list []string if err := json.Unmarshal([]byte(value), &list); err != nil { diff --git a/internal/plugins/inputs/ddtrace/env_test.go b/internal/plugins/inputs/ddtrace/env_test.go index a939a1de15..7457813ef5 100644 --- a/internal/plugins/inputs/ddtrace/env_test.go +++ b/internal/plugins/inputs/ddtrace/env_test.go @@ -48,6 +48,8 @@ func TestReadEnv(t *testing.T) { "ENV_INPUT_DDTRACE_TAGS": `{"k1":"v1", "k2":"v2", "k3":"v3"}`, "ENV_INPUT_DDTRACE_THREADS": `{"buffer":1000, "threads":100}`, "ENV_INPUT_DDTRACE_STORAGE": `{"storage":"./ddtrace_storage", "capacity": 5120}`, + "ENV_INPUT_DDTRACE_MAX_SPANS": `10`, + "ENV_INPUT_DDTRACE_MAX_BODY_MB": `11`, }, expected: &Input{ Endpoints: []string{"/v0.3/traces", "/v0.4/traces", "/v0.5/traces"}, @@ -60,6 +62,8 @@ func TestReadEnv(t *testing.T) { Tags: map[string]string{"k1": "v1", "k2": "v2", "k3": "v3"}, WPConfig: &workerpool.WorkerPoolConfig{Buffer: 1000, Threads: 100}, LocalCacheConfig: &storage.StorageConfig{Path: "./ddtrace_storage", Capacity: 5120}, + TraceMaxSpans: 10, + MaxTraceBodyMB: 11, }, }, } diff --git a/internal/plugins/inputs/ddtrace/input.go b/internal/plugins/inputs/ddtrace/input.go index e6b9675ef6..54c9ac20aa 100644 --- a/internal/plugins/inputs/ddtrace/input.go +++ b/internal/plugins/inputs/ddtrace/input.go @@ -66,40 +66,46 @@ const ( ## delete trace message # del_message = true + ## max spans limit on each trace. default 100000 or set to -1 to remove this limit. + # trace_max_spans = 100000 + + ## max trace body(Content-Length) limit. default 32MiB or set to -1 to remove this limit. + # max_trace_body_mb = 32 + ## Ignore tracing resources map like service:[resources...]. ## The service name is the full service name in current application. ## The resource list is regular expressions uses to block resource names. 
## If you want to block some resources universally under all services, you can set the ## service name as "*". Note: double quotes "" cannot be omitted. # [inputs.ddtrace.close_resource] - # service1 = ["resource1", "resource2", ...] - # service2 = ["resource1", "resource2", ...] - # "*" = ["close_resource_under_all_services"] - # ... + # service1 = ["resource1", "resource2", ...] + # service2 = ["resource1", "resource2", ...] + # "*" = ["close_resource_under_all_services"] + # ... ## Sampler config uses to set global sampling strategy. ## sampling_rate used to set global sampling rate. # [inputs.ddtrace.sampler] - # sampling_rate = 1.0 + # sampling_rate = 1.0 # [inputs.ddtrace.tags] - # key1 = "value1" - # key2 = "value2" - # ... + # key1 = "value1" + # key2 = "value2" + # ... ## Threads config controls how many goroutines an agent cloud start to handle HTTP request. ## buffer is the size of jobs' buffering of worker channel. ## threads is the total number fo goroutines at running time. # [inputs.ddtrace.threads] - # buffer = 100 - # threads = 8 + # buffer = 100 + # threads = 8 ## Storage config a local storage space in hard dirver to cache trace data. ## path is the local file path used to cache data. ## capacity is total space size(MB) used to store data. 
# [inputs.ddtrace.storage] - # path = "./ddtrace_storage" - # capacity = 5120 + # path = "./ddtrace_storage" + # capacity = 5120 ` ) @@ -108,12 +114,15 @@ var ( v1, v2, v3, v4, v5 = "/v0.1/spans", "/v0.2/traces", "/v0.3/traces", "/v0.4/traces", "/v0.5/traces" stats, apmTelemetry = "/v0.6/stats", "/telemetry/proxy/api/v2/apmtelemetry" afterGatherRun itrace.AfterGatherHandler - tags map[string]string + inputTags map[string]string wkpool *workerpool.WorkerPool localCache *storage.Storage traceBase = 10 spanBase = 10 delMessage bool + traceMaxSpans = 100000 + maxTraceBody = int64(32 * (1 << 20)) + noStreaming = false ) type Input struct { @@ -135,6 +144,11 @@ type Input struct { WPConfig *workerpool.WorkerPoolConfig `toml:"threads"` LocalCacheConfig *storage.StorageConfig `toml:"storage"` + TraceMaxSpans int `toml:"trace_max_spans"` + MaxTraceBodyMB int64 `toml:"max_trace_body_mb"` + + NoStreaming bool `toml:"no_streaming,omitempty"` + feeder dkio.Feeder semStop *cliutils.Sem // start stop signal Tagger datakit.GlobalTagger @@ -154,7 +168,7 @@ func (*Input) SampleMeasurement() []inputs.Measurement { func (ipt *Input) RegHTTPHandler() { log = logger.SLogger(inputName) log.Infof("DdTrace start init and register HTTP. 
Input=%s", ipt.string()) - tags = ipt.Tags + inputTags = ipt.Tags if ipt.CompatibleOTEL { spanBase = 16 } @@ -166,6 +180,16 @@ func (ipt *Input) RegHTTPHandler() { if len(ipt.CustomerTags) != 0 { setCustomTags(ipt.CustomerTags) } + + if ipt.TraceMaxSpans != 0 { + traceMaxSpans = ipt.TraceMaxSpans + } + + if ipt.MaxTraceBodyMB != 0 { + maxTraceBody = ipt.MaxTraceBodyMB * (1 << 20) + } + + noStreaming = ipt.NoStreaming delMessage = ipt.DelMessage traceOpts = append(point.CommonLoggingOptions(), point.WithExtraTags(ipt.Tagger.HostTags())) @@ -344,9 +368,10 @@ func (ipt *Input) string() string { func defaultInput() *Input { return &Input{ - feeder: dkio.DefaultFeeder(), - semStop: cliutils.NewSem(), - Tagger: datakit.DefaultGlobalTagger(), + feeder: dkio.DefaultFeeder(), + semStop: cliutils.NewSem(), + Tagger: datakit.DefaultGlobalTagger(), + TraceMaxSpans: traceMaxSpans, } } diff --git a/internal/plugins/inputs/ddtrace/metrics.go b/internal/plugins/inputs/ddtrace/metrics.go new file mode 100644 index 0000000000..18c1c779cf --- /dev/null +++ b/internal/plugins/inputs/ddtrace/metrics.go @@ -0,0 +1,65 @@ +// Unless explicitly stated otherwise all files in this repository are licensed +// under the MIT License. +// This product includes software developed at Guance Cloud (https://www.guance.com/). +// Copyright 2021-present Guance, Inc. 
+ +package ddtrace + +import ( + "github.com/GuanceCloud/cliutils/metrics" + p8s "github.com/prometheus/client_golang/prometheus" +) + +var ( + droppedTraces, + truncatedTraceSpans *p8s.CounterVec + traceSpans *p8s.SummaryVec +) + +func metricsSetup() { + truncatedTraceSpans = p8s.NewCounterVec( + p8s.CounterOpts{ + Namespace: "datakit", + Subsystem: "input_ddtrace", + Name: "truncated_spans_total", + Help: "Truncated trace spans", + }, + []string{"input"}, + ) + + droppedTraces = p8s.NewCounterVec( + p8s.CounterOpts{ + Namespace: "datakit", + Subsystem: "input_ddtrace", + Name: "dropped_trace_total", + Help: "Dropped illegal traces", + }, + []string{"url"}, + ) + + traceSpans = p8s.NewSummaryVec( + p8s.SummaryOpts{ + Namespace: "datakit", + Subsystem: "input_ddtrace", + Name: "trace_spans", + Help: "Trace spans(include truncated spans)", + Objectives: map[float64]float64{ + 0.5: 0.05, + 0.9: 0.01, + 0.99: 0.001, + }, + }, + []string{"input"}, + ) +} + +// nolint: gochecknoinits +func init() { + metricsSetup() + + metrics.MustRegister( + truncatedTraceSpans, + droppedTraces, + traceSpans, + ) +} diff --git a/internal/plugins/inputs/sqlserver/input.go b/internal/plugins/inputs/sqlserver/input.go index f139df7023..4a8a051d1b 100644 --- a/internal/plugins/inputs/sqlserver/input.go +++ b/internal/plugins/inputs/sqlserver/input.go @@ -213,8 +213,8 @@ func (ipt *Input) getPerformanceCounters() { if key == "cntr_value" { // the raw value is a number and the key is cntr_value, store in fields if v, err := strconv.ParseFloat(raw, 64); err == nil { - if v > math.MaxInt64 { - l.Warnf("%s exceed maxint64: %d > %d, ignored", key, v, math.MaxInt64) + if v > float64(math.MaxInt64) { + l.Warnf("%s exceed maxint64: %v > %d, ignored", key, v, int64(math.MaxInt64)) continue } // store the counter_name and cntr_value as fields diff --git a/internal/trace/aftergather.go b/internal/trace/aftergather.go index 07e1b84ab9..06ffa07928 100644 --- a/internal/trace/aftergather.go +++ 
b/internal/trace/aftergather.go @@ -7,7 +7,6 @@ package trace import ( - "errors" "strings" "sync" "time" @@ -74,6 +73,18 @@ func (aga *AfterGather) AppendFilter(filter ...FilterFunc) { aga.filters = append(aga.filters, filter...) } +func (aga *AfterGather) doFeed(iname string, dktrace DatakitTrace) { + var pts []*point.Point + for _, span := range dktrace { + span.Point.AddTag(TagDKFingerprintKey, datakit.DatakitHostName) + pts = append(pts, span.Point) + } + + if err := aga.feeder.FeedV2(point.Tracing, pts, dkio.WithInputName(iname)); err != nil { + aga.log.Warnf("feed %d points failed: %s, ignored", len(pts), err.Error()) + } +} + func (aga *AfterGather) Run(inputName string, dktraces DatakitTraces) { if len(dktraces) == 0 { aga.log.Debug("empty dktraces") @@ -86,51 +97,31 @@ func (aga *AfterGather) Run(inputName string, dktraces DatakitTraces) { afterFilters = dktraces } else { afterFilters = make(DatakitTraces, 0, len(dktraces)) + for k := range dktraces { aga.log.Debugf("len = %d spans", len(dktraces[k])) serviceName := dktraces[k][0].GetTag(TagService) + TracingProcessCount.WithLabelValues(inputName, serviceName).Add(1) - var temp DatakitTrace + + var singleTrace DatakitTrace for i := range aga.filters { var skip bool - if temp, skip = aga.filters[i](aga.log, dktraces[k]); skip { + if singleTrace, skip = aga.filters[i](aga.log, dktraces[k]); skip { tracingSamplerCount.WithLabelValues(inputName, serviceName).Add(1) - break + + break // skip current trace } } - if temp != nil { - afterFilters = append(afterFilters, temp) + + if singleTrace != nil { + afterFilters = append(afterFilters, singleTrace) } } } - if len(afterFilters) == 0 { - return - } - pts := make([]*point.Point, 0) - for _, filter := range afterFilters { - for _, span := range filter { - span.Point.AddTag(TagDKFingerprintKey, datakit.DatakitHostName) - pts = append(pts, span.Point) - } - } - if len(pts) != 0 { - var ( - start = time.Now() - err error - ) - IO_FEED_RETRY: - if err = 
aga.feeder.FeedV2(point.Tracing, pts, - dkio.WithCollectCost(time.Since(start)), - dkio.WithInputName(inputName)); err != nil { - if aga.retry > 0 && errors.Is(err, dkio.ErrIOBusy) { - time.Sleep(aga.retry) - goto IO_FEED_RETRY - } - } else { - aga.log.Debugf("### send %d points cost %dms", len(pts), time.Since(start)/time.Millisecond) - } - } else { - aga.log.Debug("BuildPointsBatch return empty points array") + + for _, trace := range afterFilters { + aga.doFeed(inputName, trace) } } diff --git a/internal/trace/trace.go b/internal/trace/trace.go index d84acd5a52..e64122cf7b 100644 --- a/internal/trace/trace.go +++ b/internal/trace/trace.go @@ -191,7 +191,13 @@ func GetSpanSourceType(app string) string { } } -func FindSpanTypeInMultiServersIntSpanID(spanID, parentID uint64, service string, spanIDs map[uint64]string, parentIDs map[uint64]bool) string { +func FindSpanTypeInMultiServersIntSpanID( + spanID, + parentID uint64, + service string, + spanIDs map[uint64]string, + parentIDs map[uint64]bool, +) string { if parentID != 0 { if ss, ok := spanIDs[parentID]; ok { if service != ss { diff --git a/scripts/Makefile b/scripts/Makefile index f01c707439..5d42ccc35d 100644 --- a/scripts/Makefile +++ b/scripts/Makefile @@ -2,7 +2,11 @@ all: GOOS=linux GOARCH=amd64 go build -o v1-write-linux-amd64.out v1-write.go + GOOS=darwin GOARCH=arm64 go build -o v1-write-darwin-arm64.out v1-write.go + GOOS=windows GOARCH=amd64 go build -o v1-write-windows-amd64.ext v1-write.go GOOS=linux GOARCH=amd64 go build -o v1-data-linux-amd64.out v1-data.go + GOOS=darwin GOARCH=arm64 go build -o v1-data-darwin-arm64.out v1-data.go + GOOS=windows GOARCH=amd64 go build -o v1-data-windows-amd64.ext v1-data.go #sudo docker buildx build --platform linux/arm64,linux/amd64 -t registry.jiagouyun.com/datakit-tools/v1-write:v0.01 -f v1-write.dockerfile . #sudo docker build --platform linux/arm64 -t registry.jiagouyun.com/datakit-tools/v1-write:v0.0.1 -f v1-write.dockerfile . 
diff --git a/scripts/v1-data.go b/scripts/v1-data.go index 3c02c36254..17c7af760f 100644 --- a/scripts/v1-data.go +++ b/scripts/v1-data.go @@ -8,9 +8,12 @@ package main import ( "flag" "fmt" + "math/rand" "os" + "time" "github.com/GuanceCloud/cliutils" + "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/plugins/inputs/ddtrace" ) var ( @@ -18,6 +21,12 @@ var ( flagCount = flag.Int("count", 3, "generated data count") flagP8s = flag.Bool("p8s", false, "generate promethues metric text") flagFile = flag.String("output", "v1.data", "data output to file") + + flagDDTrace = flag.Bool("ddtrace", false, "generate ddtrace msgpack payload file") + flagDDTraceTrace = flag.Uint64("ddtrace-trace", 100, "generate N ddtrace trace") + flagDDTraceSpan = flag.Uint64("ddtrace-span", 100, "generate N ddtrace dd-span within each trace") + flagDDTraceRandSpan = flag.Uint64("ddtrace-rand-span", 100, "generate 0~N dd-spans within each trace ") + flagDDTraceFile = flag.String("ddtrace-msg-file", "ddtrace.msgp", "output file of DDTrace msgpack raw data") ) func genLargeLog() { @@ -120,6 +129,67 @@ go_gc_heap_allocs_by_size_bytes_count{tag1="%05d",tag2="%05d"} 4.35160484e+08 } } +var ( + meta = map[string]string{ + "_dd.p.dm": "-1", + "_dd.p.tid": "671f024100000000", + "_dd.tracer_hostname": "kodo-servicemap-2.kodo-servicemap.forethought-kodo.svc.cluster.local", + "language": "go", + } + metrics = map[string]float64{ + "_dd.profiling.enabled": 0, + "_dd.trace_span_attribute_schema": 0, + "_dd.top_level": 1, + "_sampling_priority_v1": 1, + "_dd.agent_psr": 1, + "process_id": 1, + } +) + +func getTrace(nspan uint64) (trace ddtrace.DDTrace) { + tid := rand.Uint64() + pid := rand.Uint64() + start := time.Now().UnixMicro() + for i := uint64(0); i < nspan; i++ { + trace = append(trace, &ddtrace.DDSpan{ + Service: "v1-data", + Name: fmt.Sprintf("span-%d", i), + Resource: "v1-data-resource", + TraceID: tid, + SpanID: rand.Uint64(), + ParentID: pid, + Start: start + 1, + Duration: 1, + Meta: meta, 
+ Metrics: metrics, + Type: "not-set", + }) + } + return +} + +func genLargeDDTrace() { + ntrace, nspan, randSpan := *flagDDTraceTrace, *flagDDTraceSpan, *flagDDTraceRandSpan + rand.Seed(time.Now().UnixNano()) + var traces ddtrace.DDTraces + for i := uint64(0); i < ntrace; i++ { + if randSpan > 0 { + traces = append(traces, getTrace(rand.Uint64()%randSpan)) + } else { + traces = append(traces, getTrace(uint64(nspan))) + } + } + + buf, err := traces.MarshalMsg(nil) + if err != nil { + panic(fmt.Sprintf("error: %s", err.Error())) + } + + if err := os.WriteFile(*flagDDTraceFile, buf, 0600); err != nil { + panic(fmt.Sprintf("error: %s", err.Error())) + } +} + // nolint: typecheck func main() { flag.Parse() @@ -127,5 +197,11 @@ func main() { genLargeP8sMetric() return } + + if *flagDDTrace { + genLargeDDTrace() + return + } + genLargeLog() } diff --git a/scripts/v1-write.go b/scripts/v1-write.go index c9c6a8b8d1..b3e720a9ec 100644 --- a/scripts/v1-write.go +++ b/scripts/v1-write.go @@ -12,9 +12,9 @@ import ( "log" "net/http" "os" + "strconv" "strings" "sync/atomic" - "syscall" "time" uhttp "github.com/GuanceCloud/cliutils/network/http" @@ -28,10 +28,11 @@ var ( flagGinLog = flag.Bool("gin-log", false, "enable or disable gin log") flagMaxBody = flag.Int("max-body", 0, "set max body size(kb)") flagDecode = flag.Bool("decode", false, "try decode request") + flagHeader = flag.Bool("header", false, "show HTTP request headers") flag5XXRatio = flag.Int("5xx-ratio", 0, "fail request ratio(minimal is 1/1000)") flagLatency = flag.Duration("latency", time.Millisecond*10, "latency used on API cost") - MPts, LPts, TPts, totalReq, req5xx atomic.Int64 + MPts, LPts, TPts, totalReq, req5xx, decErr, decErrPts atomic.Int64 ) func benchHTTPServer() { @@ -53,10 +54,19 @@ func benchHTTPServer() { c.Data(http.StatusOK, "application/json", []byte(`{}`)) }) + go func() { + showTicker := time.NewTicker(time.Second * 3) + defer showTicker.Stop() + for { + select { + case <-showTicker.C: + 
showInfo() + } + } + }() + router.POST("/v1/write/:category", func(c *gin.Context) { - log.Printf("************************************************") - if *flagLatency > 0 { time.Sleep(*flagLatency) } @@ -79,32 +89,35 @@ func benchHTTPServer() { } var ( - start = time.Now() - encoding point.Encoding - dec *point.Decoder - headerArr []string + //start = time.Now() + encoding point.Encoding + dec *point.Decoder ) if body, err := io.ReadAll(c.Request.Body); err != nil { c.Status(http.StatusInternalServerError) return } else { - elapsed := time.Since(start) - if len(body) > 0 { - log.Printf("copy elapsed %s, bandwidth %fKB/S", elapsed, float64(len(body))/(float64(elapsed)/float64(time.Second))/1024.0) + //elapsed := time.Since(start) + //if len(body) > 0 { + // log.Printf("copy elapsed %s, bandwidth %fKB/S", elapsed, float64(len(body))/(float64(elapsed)/float64(time.Second))/1024.0) + //} + + var headerPts int64 + if x := c.Request.Header.Get("X-Points"); x != "" { + if n, err := strconv.ParseInt(x, 10, 64); err == nil { + headerPts = n + } } if !*flagDecode { goto end } - for k, _ := range c.Request.Header { - headerArr = append(headerArr, fmt.Sprintf("%s: %s", k, c.Request.Header.Get(k))) + if *flagHeader { + showHeaders(c) } - log.Printf("URL: %s", c.Request.URL) - log.Printf("headers:\n%s", strings.Join(headerArr, "\n")) - if c.Request.Header.Get("Content-Encoding") == "gzip" { unzipbody, err := uhttp.Unzip(body) if err != nil { @@ -114,7 +127,7 @@ func benchHTTPServer() { return } - log.Printf("[INFO] unzip body: %d => %d(%.4f)", len(body), len(unzipbody), float64(len(body))/float64(len(unzipbody))) + //log.Printf("[INFO] unzip body: %d => %d(%.4f)", len(body), len(unzipbody), float64(len(body))/float64(len(unzipbody))) body = unzipbody } @@ -137,6 +150,10 @@ func benchHTTPServer() { if dec != nil { if pts, err := dec.Decode(body); err != nil { log.Printf("[ERROR] decode on %s error: %s", encoding, err) + decErr.Add(1) + decErrPts.Add(headerPts) + showHeaders(c) + 
log.Printf("body: %s", string(body[32])) } else { nwarns := 0 for _, pt := range pts { @@ -144,7 +161,7 @@ func benchHTTPServer() { nwarns++ } - log.Println(pt.LineProto()) + //log.Println(pt.LineProto()) } cat := point.CatURL(c.Request.URL.Path) @@ -158,10 +175,10 @@ func benchHTTPServer() { TPts.Add(int64(len(pts))) } - log.Printf("[INFO] decode %d points, %d with warnnings", len(pts), nwarns) + if nwarns > 0 { + log.Printf("[WARN] decode %d points, %d with warnnings", len(pts), nwarns) + } } - - showInfo() } end: @@ -183,13 +200,15 @@ func showInfo() { //log.Printf("total M/%s, L/%s, req/%d, 5xx/%d", //humanize.SI(float64(MPts.Load()), ""), //humanize.SI(float64(LPts.Load()), ""), - log.Printf("total M/%d, L/%d, T/%d req/%d, 5xx/%d, 5xx ratio: %d/1000", + log.Printf("total M/%d, L/%d, T/%d req/%d, 5xx/%d, 5xx ratio: %d/1000, decErr: %d, decErrPts: %d", MPts.Load(), LPts.Load(), TPts.Load(), totalReq.Load(), req5xx.Load(), *flag5XXRatio, + decErr.Load(), + decErrPts.Load(), ) } @@ -199,28 +218,39 @@ func showENVs() { } } +func showHeaders(c *gin.Context) { + var headerArr []string + for k, _ := range c.Request.Header { + headerArr = append(headerArr, fmt.Sprintf("%s: %s", k, c.Request.Header.Get(k))) + } + + log.Println("-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=") + log.Printf("URL: %s", c.Request.URL) + log.Printf("headers:\n%s", strings.Join(headerArr, "\n")) +} + // nolint: typecheck func main() { log.SetFlags(log.LstdFlags | log.Lshortfile) showENVs() - var rLimit syscall.Rlimit - err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) - if err != nil { - panic(fmt.Sprintf("Error Getting Rlimit: %s", err)) - } - - fmt.Println(rLimit) - rLimit.Max = 10240 - rLimit.Cur = 10240 - err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit) - if err != nil { - panic(fmt.Sprintf("Error Setting Rlimit: %s", err)) - } - err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) - if err != nil { - panic(fmt.Sprintf("Error Getting Rlimit: %s", err)) - } + //var 
rLimit syscall.Rlimit + //err := syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) + //if err != nil { + // panic(fmt.Sprintf("Error Getting Rlimit: %s", err)) + //} + + //fmt.Println(rLimit) + //rLimit.Max = 10240 + //rLimit.Cur = 10240 + //err = syscall.Setrlimit(syscall.RLIMIT_NOFILE, &rLimit) + //if err != nil { + // panic(fmt.Sprintf("Error Setting Rlimit: %s", err)) + //} + //err = syscall.Getrlimit(syscall.RLIMIT_NOFILE, &rLimit) + //if err != nil { + // panic(fmt.Sprintf("Error Getting Rlimit: %s", err)) + //} flag.Parse() benchHTTPServer()