Skip to content

Commit

Permalink
Iss 2375 jaeger binary
Browse files Browse the repository at this point in the history
  • Loading branch information
宋龙奇 authored and 谭彪 committed Sep 4, 2024
1 parent e4b5e9f commit 7ec9480
Show file tree
Hide file tree
Showing 6 changed files with 310 additions and 34 deletions.
3 changes: 3 additions & 0 deletions internal/export/doc/en/inputs/jaeger.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ The Jaeger Agent embedded in Datakit is used to receive, calculate and analyze J

<!-- markdownlint-enable -->

When using UDP protocol, pay attention to the data format in the protocol. By default, the protocol used for port 6831 is `Thrift CompactProtocol` format, while the protocol used for port 6832 is `Thrift Binary Protocol`.
Jaeger uses the protocol from port 6831 by default.

### Configure Jaeger HTTP Agent {#config-http-agent}

endpoint represents Jaeger HTTP Agent routing
Expand Down
3 changes: 3 additions & 0 deletions internal/export/doc/zh/inputs/jaeger.md
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,9 @@ Datakit 内嵌的 Jaeger Agent 用于接收,运算,分析 Jaeger Tracing 协

<!-- markdownlint-enable -->

在使用 UDP 协议的时候,注意协议中的数据格式,默认情况下使用 6831 端口使用的是 `thrift CompactProtocol` 格式,使用 6832 端口时的协议为 `thrift BinaryProtocol`
Jaeger 默认情况下使用的是 6831 端口中的协议,所以 当您不使用 6832 端口时,请不要打开注释。

### 配置 Jaeger HTTP Agent {#config-http-agent}

endpoint 代表 Jaeger HTTP Agent 路由
Expand Down
40 changes: 24 additions & 16 deletions internal/plugins/inputs/jaeger/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ package jaeger

import (
"bytes"
"context"
"io"
"net"
"net/http"
"net/url"
"regexp"
"time"

"gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/goroutine"

"github.com/GuanceCloud/cliutils"
"github.com/GuanceCloud/cliutils/logger"
"github.com/GuanceCloud/cliutils/point"
Expand Down Expand Up @@ -43,6 +45,7 @@ const (
# Jaeger agent host:port address for UDP transport.
# address = "127.0.0.1:6831"
# binary_address = "127.0.0.1:6832"
## ignore_tags will work as a blacklist to prevent tags send to data center.
## Every value in this list is a valid string of regular expression.
Expand Down Expand Up @@ -111,6 +114,7 @@ type Input struct {
CustomerTags []string `toml:"customer_tags"` // deprecated
Endpoint string `toml:"endpoint"`
Address string `toml:"address"`
BinaryAddress string `toml:"binary_address"`
IgnoreTags []string `toml:"ignore_tags"`
DelMessage bool `toml:"del_message"`
KeepRareResource bool `toml:"keep_rare_resource"`
Expand All @@ -120,10 +124,9 @@ type Input struct {
WPConfig *workerpool.WorkerPoolConfig `toml:"threads"`
LocalCacheConfig *storage.StorageConfig `toml:"storage"`

feeder dkio.Feeder
semStop *cliutils.Sem // start stop signal
Tagger datakit.GlobalTagger
udpListener *net.UDPConn
feeder dkio.Feeder
semStop *cliutils.Sem // start stop signal
Tagger datakit.GlobalTagger
}

func (*Input) Catalog() string { return inputName }
Expand Down Expand Up @@ -246,10 +249,22 @@ func (ipt *Input) Run() {
delMessage = ipt.DelMessage
if ipt.Address != "" {
log.Debugf("### %s UDP agent is starting...", inputName)
// itrace.StartTracingStatistic()
if err := StartUDPAgent(ipt.udpListener, ipt.Address, ipt.semStop); err != nil {
log.Errorf("### start %s UDP agent failed: %s", inputName, err.Error())
}
g := goroutine.NewGroup(goroutine.Option{Name: inputName})
g.Go(func(ctx context.Context) error {
if err := StartUDPAgent(CompactProtocol, ipt.Address, ipt.semStop); err != nil {
log.Errorf("### start %s UDP agent failed: %s", inputName, err.Error())
}
return nil
})
}
if ipt.BinaryAddress != "" {
g := goroutine.NewGroup(goroutine.Option{Name: inputName})
g.Go(func(ctx context.Context) error {
if err := StartUDPAgent(BinaryProtocol, ipt.BinaryAddress, ipt.semStop); err != nil {
log.Errorf("### start %s UDP agent failed: %s", inputName, err.Error())
}
return nil
})
}

log.Debugf("### %s agent is running...", inputName)
Expand Down Expand Up @@ -277,13 +292,6 @@ func (ipt *Input) exit() {
}
log.Debug("### storage closed")
}

if ipt.udpListener != nil {
if err := ipt.udpListener.Close(); err != nil {
log.Errorf("UDP close error: %v", err)
}
ipt.udpListener = nil
}
}

func (ipt *Input) Terminate() {
Expand Down
3 changes: 1 addition & 2 deletions internal/plugins/inputs/jaeger/integrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -338,11 +338,10 @@ func (cs *caseSpec) run() error {
}()

case AGENT_UDP:
conn, randPort, err := testutils.RandPortUDP()
_, randPort, err := testutils.RandPortUDP()
require.NoError(cs.t, err)
randPortStr = fmt.Sprintf("%d", randPort)
cs.ipt.Address = extIP + ":" + randPortStr
cs.ipt.udpListener = conn
}

shutdownFunc := func() {
Expand Down
37 changes: 21 additions & 16 deletions internal/plugins/inputs/jaeger/jaeger_udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ import (
itrace "gitlab.jiagouyun.com/cloudcare-tools/datakit/internal/trace"
)

func StartUDPAgent(udpConn *net.UDPConn, addr string, semStop *cliutils.Sem) error {
if udpConn == nil {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return err
}
const (
BinaryProtocol = "binary"
CompactProtocol = "compact"
)

udpConn, err = net.ListenUDP("udp", udpAddr)
if err != nil {
return err
}
func StartUDPAgent(protocol string, addr string, semStop *cliutils.Sem) error {
udpAddr, err := net.ResolveUDPAddr("udp", addr)
if err != nil {
return err
}

udpConn, err := net.ListenUDP("udp", udpAddr)
if err != nil {
return err
}

log.Debugf("%s(UDP): listen on path: %s", inputName, addr)
Expand Down Expand Up @@ -68,7 +71,7 @@ func StartUDPAgent(udpConn *net.UDPConn, addr string, semStop *cliutils.Sem) err
log.Debugf("### read from udp server:%s %d bytes", addr, n)

param := &itrace.TraceParameters{Body: bytes.NewBuffer(buf[:n])}
if err = parseJaegerTraceUDP(param); err != nil {
if err = parseJaegerTraceUDP(protocol, param); err != nil {
log.Errorf("### parse jaeger trace from UDP failed: %s", err.Error())
}
}
Expand All @@ -80,17 +83,19 @@ func udpExit(udpConn *net.UDPConn) {
}
}

func parseJaegerTraceUDP(param *itrace.TraceParameters) error {
func parseJaegerTraceUDP(protocol string, param *itrace.TraceParameters) error {
tmbuf := thrift.NewTMemoryBufferLen(param.Body.Len())
_, err := tmbuf.Write(param.Body.Bytes())
if err != nil {
return err
}

var (
tprot = thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{}).GetProtocol(tmbuf)
ctx = context.Background()
)
tprot := thrift.NewTCompactProtocolFactoryConf(&thrift.TConfiguration{}).GetProtocol(tmbuf)
if protocol == "binary" {
tprot = thrift.NewTBinaryProtocolFactoryConf(&thrift.TConfiguration{}).GetProtocol(tmbuf)
}
ctx := context.Background()

if _, _, _, err = tprot.ReadMessageBegin(ctx); err != nil { //nolint:dogsled
return err
}
Expand Down
Loading

0 comments on commit 7ec9480

Please sign in to comment.