Skip to content

Commit

Permalink
add lock
Browse files Browse the repository at this point in the history
  • Loading branch information
宋龙奇 authored and 谭彪 committed Sep 4, 2024
1 parent e4b5e9f commit 8c2431a
Showing 1 changed file with 12 additions and 3 deletions.
15 changes: 12 additions & 3 deletions internal/plugins/inputs/ddtrace/api_telemetry.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
)

type Telemetry struct {
lock sync.RWMutex
host Host
application Application
traceTime time.Time
Expand All @@ -32,12 +33,20 @@ type Telemetry struct {
}

func (ob *Telemetry) toPoint() *point.Point {
ob.lock.RLock()
defer ob.lock.RUnlock()
opts := point.DefaultObjectOptions()
opts = append(opts, point.WithTime(ob.traceTime))
kvs := append(point.NewTags(ob.tags), point.NewKVs(ob.fields)...)
return point.NewPointV2("tracing_service", kvs, opts...)
}

func (ob *Telemetry) setField(key string, val interface{}) {
ob.lock.Lock()
defer ob.lock.Unlock()
ob.fields[key] = val
}

func (ob *Telemetry) Info() *inputs.MeasurementInfo {
return &inputs.MeasurementInfo{
Name: "DdTrace APM Telemetry",
Expand Down Expand Up @@ -130,7 +139,7 @@ func (ob *Telemetry) parseEvent(requestType RequestType, payload interface{}) {
log.Errorf("err=%v", err)
return
}
ob.fields[string(requestType)] = string(bts)
ob.setField(string(requestType), string(bts))
ob.change = true
case RequestTypeAppHeartbeat,
RequestTypeDistributions:
Expand Down Expand Up @@ -158,12 +167,12 @@ func (ob *Telemetry) parseEvent(requestType RequestType, payload interface{}) {
for _, points := range series.Points {
m += points[1]
}
ob.fields[series.Metric+seriesTag] = m
ob.setField(series.Metric+seriesTag, m)
}
}
ob.change = true
case RequestTypeAppClosing:
ob.fields[string(requestType)] = "service closing"
ob.setField(string(requestType), "service closing")
ob.change = true
case RequestTypeMessageBatch:
bts, err := json.Marshal(payload)
Expand Down

0 comments on commit 8c2431a

Please sign in to comment.