From 9b2f532cd62f4752b89782073f2f3466d691cc95 Mon Sep 17 00:00:00 2001 From: Tolya Korniltsev Date: Fri, 15 Dec 2023 17:51:18 +0700 Subject: [PATCH] fix --- component/pyroscope/java/java.go | 41 ++- component/pyroscope/java/jfr/jfr.go | 263 ++++++++++++++++++ .../pyroscope/java/jfr/profile_builder.go | 247 ++++++++++++++++ component/pyroscope/java/jfr/symbols.go | 41 +++ component/pyroscope/java/process.go | 90 ++++-- 5 files changed, 661 insertions(+), 21 deletions(-) create mode 100644 component/pyroscope/java/jfr/jfr.go create mode 100644 component/pyroscope/java/jfr/profile_builder.go create mode 100644 component/pyroscope/java/jfr/symbols.go diff --git a/component/pyroscope/java/java.go b/component/pyroscope/java/java.go index 66608f0223a5..6fb0ae03740a 100644 --- a/component/pyroscope/java/java.go +++ b/component/pyroscope/java/java.go @@ -28,10 +28,11 @@ func init() { return nil, fmt.Errorf("extract async profiler: %w", err) } a := args.(Arguments) - flowAppendable := pyroscope.NewFanout(a.ForwardTo, opts.ID, opts.Registerer) + forwardTo := pyroscope.NewFanout(a.ForwardTo, opts.ID, opts.Registerer) c := &javaComponent{ opts: opts, - forwardTo: flowAppendable, + args: a, + forwardTo: forwardTo, } c.updateTargets(a.Targets) return c, nil @@ -43,11 +44,38 @@ type Arguments struct { Targets []discovery.Target `river:"targets,attr"` ForwardTo []pyroscope.Appendable `river:"forward_to,attr"` - Interval time.Duration `river:"interval,attr,optional"` + ProfilingConfig ProfilingConfig `river:"profiling_config,block,optional"` +} + +type ProfilingConfig struct { + Interval time.Duration `river:"interval,attr,optional"` + SampleRate int `river:"sample_rate,attr,optional"` + Alloc string `river:"alloc,attr,optional"` + Lock string `river:"lock,attr,optional"` + CPU bool `river:"cpu,attr,optional"` +} + +func (rc *Arguments) UnmarshalRiver(f func(interface{}) error) error { + *rc = defaultArguments() + type config Arguments + return f((*config)(rc)) +} + +func defaultArguments() Arguments { + return Arguments{ + ProfilingConfig: ProfilingConfig{ + Interval: 15 * time.Second, + SampleRate: 100, + Alloc: "", + Lock: "", + CPU: true, + }, + } } type javaComponent struct { opts component.Options + args Arguments forwardTo *pyroscope.Fanout mutex sync.Mutex @@ -82,15 +110,17 @@ func (j *javaComponent) updateTargets(targets []discovery.Target) { active := make(map[int]struct{}) for _, target := range targets { - fmt.Printf("target: %v\n", target) pid, err := strconv.Atoi(target[labelProcessID]) + _ = level.Debug(j.opts.Logger).Log("msg", "active target", + "target", fmt.Sprintf("%+v", target), + "pid", pid) if err != nil { _ = level.Error(j.opts.Logger).Log("msg", "invalid target", "target", fmt.Sprintf("%v", target), "err", err) continue } proc := j.pid2process[pid] if proc == nil { - proc = newProcess(pid, target, j.opts.Logger, j.forwardTo) + proc = newProcess(pid, target, j.opts.Logger, j.forwardTo, j.args.ProfilingConfig) } else { proc.update(target) } @@ -100,6 +130,7 @@ func (j *javaComponent) updateTargets(targets []discovery.Target) { if _, ok := active[pid]; ok { continue } + _ = level.Debug(j.opts.Logger).Log("msg", "inactive target", "pid", pid) j.pid2process[pid].Close() delete(j.pid2process, pid) } diff --git a/component/pyroscope/java/jfr/jfr.go b/component/pyroscope/java/jfr/jfr.go new file mode 100644 index 000000000000..b8f751319a9c --- /dev/null +++ b/component/pyroscope/java/jfr/jfr.go @@ -0,0 +1,263 @@ +package jfr + +import ( + //"github.com/grafana/pyroscope/pkg/distributor/model" + + "fmt" + "io" + "time" + + "github.com/grafana/agent/component/discovery" + "github.com/grafana/agent/component/pyroscope" + jfrparser "github.com/grafana/jfr-parser/parser" + "github.com/grafana/jfr-parser/parser/types" + "github.com/prometheus/prometheus/model/labels" +) + +const ( + sampleTypeCPU = 0 + sampleTypeWall = 1 + + sampleTypeInTLAB = 2 + + sampleTypeOutTLAB = 3 + + sampleTypeLock = 4 + + sampleTypeThreadPark = 5 + + sampleTypeLiveObject = 6 +) + +// labels labels.Labels, samples []*RawSample +type PushRequest struct { + Labels labels.Labels + Samples []*pyroscope.RawSample +} + +func ParseJFR(body []byte, metadata Metadata) (requests []PushRequest, err error) { + defer func() { + if r := recover(); r != nil { + err = fmt.Errorf("jfr parser panic: %v", r) + } + }() + parser := jfrparser.NewParser(body, jfrparser.Options{ + SymbolProcessor: processSymbols, + }) + + var event string + + builders := newJfrPprofBuilders(parser, metadata) + + var values = [2]int64{1, 0} + + for { + typ, err := parser.ParseEvent() + if err != nil { + if err == io.EOF { + break + } + return nil, fmt.Errorf("jfr parser ParseEvent error: %w", err) + } + + switch typ { + case parser.TypeMap.T_EXECUTION_SAMPLE: + ts := parser.GetThreadState(parser.ExecutionSample.State) + if ts != nil && ts.Name != "STATE_SLEEPING" { + builders.addStacktrace(sampleTypeCPU, parser.ExecutionSample.StackTrace, values[:1]) + } + if event == "wall" { + builders.addStacktrace(sampleTypeWall, parser.ExecutionSample.StackTrace, values[:1]) + } + case parser.TypeMap.T_ALLOC_IN_NEW_TLAB: + values[1] = int64(parser.ObjectAllocationInNewTLAB.TlabSize) + builders.addStacktrace(sampleTypeInTLAB, parser.ObjectAllocationInNewTLAB.StackTrace, values[:2]) + case parser.TypeMap.T_ALLOC_OUTSIDE_TLAB: + values[1] = int64(parser.ObjectAllocationOutsideTLAB.AllocationSize) + builders.addStacktrace(sampleTypeOutTLAB, parser.ObjectAllocationOutsideTLAB.StackTrace, values[:2]) + case parser.TypeMap.T_MONITOR_ENTER: + values[1] = int64(parser.JavaMonitorEnter.Duration) + builders.addStacktrace(sampleTypeLock, parser.JavaMonitorEnter.StackTrace, values[:2]) + case parser.TypeMap.T_THREAD_PARK: + values[1] = int64(parser.ThreadPark.Duration) + builders.addStacktrace(sampleTypeThreadPark, parser.ThreadPark.StackTrace, values[:2]) + case parser.TypeMap.T_LIVE_OBJECT: + builders.addStacktrace(sampleTypeLiveObject, parser.LiveObject.StackTrace, values[:1]) + case parser.TypeMap.T_ACTIVE_SETTING: + if parser.ActiveSetting.Name == "event" { + event = parser.ActiveSetting.Value + } + + } + } + + requests, err = builders.build(event) + + return requests, err +} + +type Metadata struct { + StartTime time.Time + EndTime time.Time + SampleRate int + Target discovery.Target +} + +func newJfrPprofBuilders(p *jfrparser.Parser, metadata Metadata) *jfrPprofBuilders { + st := metadata.StartTime.UnixNano() + et := metadata.EndTime.UnixNano() + var period int64 + if metadata.SampleRate == 0 { + period = 0 + } else { + period = 1e9 / int64(metadata.SampleRate) + } + res := &jfrPprofBuilders{ + timeNanos: st, + durationNanos: et - st, + period: period, + parser: p, + sampleType2Builder: make(map[int64]*ProfileBuilder, 6), + metadata: metadata, + } + + return res +} + +type jfrPprofBuilders struct { + timeNanos int64 + durationNanos int64 + + parser *jfrparser.Parser + sampleType2Builder map[int64]*ProfileBuilder + + period int64 + metadata Metadata +} + +func (b *jfrPprofBuilders) addStacktrace(sampleType int64, ref types.StackTraceRef, values []int64) { + e := b.sampleType2Builder[sampleType] + if e == nil { + e = NewProfileBuilder(b.timeNanos) + b.sampleType2Builder[sampleType] = e + } + st := b.parser.GetStacktrace(ref) + if st == nil { + return + } + + addValues := func(dst []int64) { + mul := 1 + if sampleType == sampleTypeCPU || sampleType == sampleTypeWall { + mul = int(b.period) + } + for i, value := range values { + dst[i] += value * int64(mul) + } + } + + sample := e.FindExternalSample(uint32(ref)) + if sample != nil { + addValues(sample.Value) + return + } + + locations := make([]uint64, 0, len(st.Frames)) + for i := 0; i < len(st.Frames); i++ { + f := st.Frames[i] + loc, found := e.FindLocationByExternalID(uint32(f.Method)) + if found { + locations = append(locations, loc) + continue + } + m := b.parser.GetMethod(f.Method) + if m != nil { + + cls := b.parser.GetClass(m.Type) + if cls != nil { + clsName := b.parser.GetSymbolString(cls.Name) + methodName := b.parser.GetSymbolString(m.Name) + frame := clsName + "." + methodName + loc = e.AddExternalFunction(frame, uint32(f.Method)) + locations = append(locations, loc) + } + //todo remove Scratch field from the Method + } + } + vs := make([]int64, len(values)) + addValues(vs) + e.AddExternalSample(locations, vs, uint32(ref)) +} + +func (b *jfrPprofBuilders) build(event string) ([]PushRequest, error) { + defer func() { + for _, builder := range b.sampleType2Builder { + builder.Profile.ReturnToVTPool() + } + b.sampleType2Builder = nil + }() + profiles := make([]PushRequest, 0, len(b.sampleType2Builder)) + + for sampleType, e := range b.sampleType2Builder { + //for _, e := range entries { + e.TimeNanos = b.timeNanos + e.DurationNanos = b.durationNanos + metric := "" + switch sampleType { + case sampleTypeCPU: + e.AddSampleType("cpu", "nanoseconds") + e.PeriodType("cpu", "nanoseconds") + metric = "process_cpu" + case sampleTypeWall: + e.AddSampleType("wall", "nanoseconds") + e.PeriodType("wall", "nanoseconds") + metric = "wall" + case sampleTypeInTLAB: + e.AddSampleType("alloc_in_new_tlab_objects", "count") + e.AddSampleType("alloc_in_new_tlab_bytes", "bytes") + e.PeriodType("space", "bytes") + metric = "memory" + case sampleTypeOutTLAB: + e.AddSampleType("alloc_outside_tlab_objects", "count") + e.AddSampleType("alloc_outside_tlab_bytes", "bytes") + e.PeriodType("space", "bytes") + metric = "memory" + case sampleTypeLock: + e.AddSampleType("contentions", "count") + e.AddSampleType("delay", "nanoseconds") + e.PeriodType("mutex", "count") + metric = "mutex" + case sampleTypeThreadPark: + e.AddSampleType("contentions", "count") + e.AddSampleType("delay", "nanoseconds") + e.PeriodType("block", "count") + metric = "block" + case sampleTypeLiveObject: + e.AddSampleType("live", "count") + e.PeriodType("objects", "count") + metric = "memory" + } + ls := labels.NewBuilder(make(labels.Labels, 0, len(b.metadata.Target)+5)) + ls.Set(labels.MetricName, metric) + ls.Set("__delta__", "false") + ls.Set("jfr_event", event) + ls.Set("pyroscope_spy", "grafana-agent.java") + for k, v := range b.metadata.Target { + ls.Set(k, v) + } + prof, err := e.Profile.MarshalVT() + if err != nil { + return nil, fmt.Errorf("marshal profile error: %w", err) + } + profiles = append(profiles, PushRequest{ + Labels: ls.Labels(), + Samples: []*pyroscope.RawSample{ + { + RawProfile: prof, + }, + }, + }) + //} + } + return profiles, nil +} diff --git a/component/pyroscope/java/jfr/profile_builder.go b/component/pyroscope/java/jfr/profile_builder.go new file mode 100644 index 000000000000..150a9613c408 --- /dev/null +++ b/component/pyroscope/java/jfr/profile_builder.go @@ -0,0 +1,247 @@ +package jfr + +import ( + //"fmt" + //"sort" + + "github.com/google/uuid" + profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1" + //phlaremodel "github.com/grafana/pyroscope/pkg/model" +) + +type ProfileBuilder struct { + *profilev1.Profile + strings map[string]int + + uuid.UUID + //Labels []*typesv1.LabelPair + + externalFunctionID2LocationId map[uint32]uint64 + externalSampleID2SampleIndex map[uint32]uint32 +} + +func NewProfileBuilder(ts int64) *ProfileBuilder { + profile := profilev1.ProfileFromVTPool() + profile.TimeNanos = ts + profile.Mapping = append(profile.Mapping, &profilev1.Mapping{ + Id: 1, HasFunctions: true, + }) + p := &ProfileBuilder{ + Profile: profile, + UUID: uuid.New(), + //Labels: labels, + strings: map[string]int{}, + + externalFunctionID2LocationId: map[uint32]uint64{}, + } + p.addString("") + return p +} + +//func (m *ProfileBuilder) MemoryProfile() *ProfileBuilder { +// m.Profile.PeriodType = &profilev1.ValueType{ +// Unit: m.addString("bytes"), +// Type: m.addString("space"), +// } +// m.Profile.SampleType = []*profilev1.ValueType{ +// { +// Unit: m.addString("count"), +// Type: m.addString("alloc_objects"), +// }, +// { +// Unit: m.addString("bytes"), +// Type: m.addString("alloc_space"), +// }, +// { +// Unit: m.addString("count"), +// Type: m.addString("inuse_objects"), +// }, +// { +// Unit: m.addString("bytes"), +// Type: m.addString("inuse_space"), +// }, +// } +// m.Profile.DefaultSampleType = m.addString("alloc_space") +// +// m.Labels = append(m.Labels, &typesv1.LabelPair{ +// Name: model.MetricNameLabel, +// Value: "memory", +// }) +// +// return m +//} + +//func (m *ProfileBuilder) WithLabels(lv ...string) *ProfileBuilder { +//Outer: +// for i := 0; i < len(lv); i += 2 { +// for _, lbl := range m.Labels { +// if lbl.Name == lv[i] { +// lbl.Value = lv[i+1] +// continue Outer +// } +// } +// m.Labels = append(m.Labels, &typesv1.LabelPair{ +// Name: lv[i], +// Value: lv[i+1], +// }) +// } +// sort.Sort(phlaremodel.Labels(m.Labels)) +// return m +//} + +//func (m *ProfileBuilder) Name() string { +// for _, lbl := range m.Labels { +// if lbl.Name == model.MetricNameLabel { +// return lbl.Value +// } +// } +// return "" +//} + +func (m *ProfileBuilder) AddSampleType(typ, unit string) { + m.Profile.SampleType = append(m.Profile.SampleType, &profilev1.ValueType{ + Type: m.addString(typ), + Unit: m.addString(unit), + }) +} + +//func (m *ProfileBuilder) MetricName(name string) { +// m.Labels = append(m.Labels, &typesv1.LabelPair{ +// Name: model.MetricNameLabel, +// Value: name, +// }) +//} + +func (m *ProfileBuilder) PeriodType(periodType string, periodUnit string) { + m.Profile.PeriodType = &profilev1.ValueType{ + Type: m.addString(periodType), + Unit: m.addString(periodUnit), + } +} + +//func (m *ProfileBuilder) CustomProfile(name, typ, unit, periodType, periodUnit string) { +// m.AddSampleType(typ, unit) +// m.Profile.DefaultSampleType = m.addString(typ) +// +// m.PeriodType(periodType, periodUnit) +// +// m.MetricName(name) +//} +// +//func (m *ProfileBuilder) CPUProfile() *ProfileBuilder { +// m.CustomProfile("process_cpu", "cpu", "nanoseconds", "cpu", "nanoseconds") +// return m +//} + +//func (m *ProfileBuilder) ForStacktraceString(stacktraces ...string) *StacktraceBuilder { +// namePositions := lo.Map(stacktraces, func(stacktrace string, i int) int64 { +// return m.addString(stacktrace) +// }) +// +// // search functions +// functionIds := lo.Map(namePositions, func(namePos int64, i int) uint64 { +// for _, f := range m.Function { +// if f.Name == namePos { +// return f.Id +// } +// } +// f := &profilev1.Function{ +// Name: namePos, +// Id: uint64(len(m.Function)) + 1, +// } +// m.Function = append(m.Function, f) +// return f.Id +// }) +// // search locations +// locationIDs := lo.Map(functionIds, func(functionId uint64, i int) uint64 { +// for _, l := range m.Location { +// if l.Line[0].FunctionId == functionId { +// return l.Id +// } +// } +// l := &profilev1.Location{ +// MappingId: uint64(1), +// Line: []*profilev1.Line{ +// { +// FunctionId: functionId, +// }, +// }, +// Id: uint64(len(m.Location)) + 1, +// } +// m.Location = append(m.Location, l) +// return l.Id +// }) +// return &StacktraceBuilder{ +// locationID: locationIDs, +// ProfileBuilder: m, +// } +//} + +func (m *ProfileBuilder) addString(s string) int64 { + i, ok := m.strings[s] + if !ok { + i = len(m.strings) + m.strings[s] = i + m.StringTable = append(m.StringTable, s) + } + return int64(i) +} + +func (m *ProfileBuilder) FindLocationByExternalID(externalID uint32) (uint64, bool) { + loc, ok := m.externalFunctionID2LocationId[externalID] + return loc, ok +} + +func (m *ProfileBuilder) AddExternalFunction(frame string, externalFunctionID uint32) uint64 { + fname := m.addString(frame) + funcID := uint64(len(m.Function)) + 1 + m.Function = append(m.Function, &profilev1.Function{ + Id: funcID, + Name: fname, + }) + locID := uint64(len(m.Location)) + 1 + m.Location = append(m.Location, &profilev1.Location{ + Id: locID, + MappingId: uint64(1), + Line: []*profilev1.Line{{FunctionId: funcID}}, + }) + m.externalFunctionID2LocationId[externalFunctionID] = locID + return locID +} + +func (m *ProfileBuilder) AddExternalSample(locs []uint64, values []int64, externalSampleID uint32) { + sample := &profilev1.Sample{ + LocationId: locs, + Value: values, + } + if m.externalSampleID2SampleIndex == nil { + m.externalSampleID2SampleIndex = map[uint32]uint32{} + } + m.externalSampleID2SampleIndex[externalSampleID] = uint32(len(m.Profile.Sample)) + m.Profile.Sample = append(m.Profile.Sample, sample) +} + +func (m *ProfileBuilder) FindExternalSample(externalSampleID uint32) *profilev1.Sample { + sampleIndex, ok := m.externalSampleID2SampleIndex[externalSampleID] + if !ok { + return nil + } + sample := m.Profile.Sample[sampleIndex] + return sample +} + +//type StacktraceBuilder struct { +// locationID []uint64 +// *ProfileBuilder +//} +// +//func (s *StacktraceBuilder) AddSamples(samples ...int64) *ProfileBuilder { +// if exp, act := len(s.Profile.SampleType), len(samples); exp != act { +// panic(fmt.Sprintf("profile expects %d sample(s), there was actually %d sample(s) given.", exp, act)) +// } +// s.Profile.Sample = append(s.Profile.Sample, &profilev1.Sample{ +// LocationId: s.locationID, +// Value: samples, +// }) +// return s.ProfileBuilder +//} diff --git a/component/pyroscope/java/jfr/symbols.go b/component/pyroscope/java/jfr/symbols.go new file mode 100644 index 000000000000..b23ed3cbf9a4 --- /dev/null +++ b/component/pyroscope/java/jfr/symbols.go @@ -0,0 +1,41 @@ +package jfr + +import ( + "regexp" + + "github.com/grafana/jfr-parser/parser/types" +) + +// jdk/internal/reflect/GeneratedMethodAccessor31 +var generatedMethodAccessor = regexp.MustCompile("^(jdk/internal/reflect/GeneratedMethodAccessor)(\\d+)$") + +// org/example/rideshare/OrderService$$Lambda$669.0x0000000800fd7318.run +var lambdaGeneratedEnclosingClass = regexp.MustCompile("^(.+\\$\\$Lambda\\$)\\d+[./](0x[\\da-f]+|\\d+)$") + +// libzstd-jni-1.5.1-16931311898282279136.so.Java_com_github_luben_zstd_ZstdInputStreamNoFinalizer_decompressStream +var zstdJniSoLibName = regexp.MustCompile("^(\\.?/tmp/)?(libzstd-jni-\\d+\\.\\d+\\.\\d+-)(\\d+)(\\.so)( \\(deleted\\))?$") + +// ./tmp/libamazonCorrettoCryptoProvider109b39cf33c563eb.so +// ./tmp/amazonCorrettoCryptoProviderNativeLibraries.7382c2f79097f415/libcrypto.so (deleted) +var amazonCorrettoCryptoProvider = regexp.MustCompile("^(\\.?/tmp/)?(lib)?(amazonCorrettoCryptoProvider)(NativeLibraries\\.)?([0-9a-f]{16})" + + "(/libcrypto|/libamazonCorrettoCryptoProvider)?(\\.so)( \\(deleted\\))?$") + +// libasyncProfiler-linux-arm64-17b9a1d8156277a98ccc871afa9a8f69215f92.so +var pyroscopeAsyncProfiler = regexp.MustCompile( + "^(\\.?/tmp/)?(libasyncProfiler)-(linux-arm64|linux-musl-x64|linux-x64|macos)-(17b9a1d8156277a98ccc871afa9a8f69215f92)(\\.so)( \\(deleted\\))?$") + +// ./tmp/snappy-1.1.8-6fb9393a-3093-4706-a7e4-837efe01d078-libsnappyjava.so +func mergeJVMGeneratedClasses(frame string) string { + frame = generatedMethodAccessor.ReplaceAllString(frame, "${1}_") + frame = lambdaGeneratedEnclosingClass.ReplaceAllString(frame, "${1}_") + frame = zstdJniSoLibName.ReplaceAllString(frame, "libzstd-jni-_.so") + frame = amazonCorrettoCryptoProvider.ReplaceAllString(frame, "libamazonCorrettoCryptoProvider_.so") + frame = pyroscopeAsyncProfiler.ReplaceAllString(frame, "libasyncProfiler-_.so") + return frame +} + +func processSymbols(ref *types.SymbolList) { + for i := range ref.Symbol { //todo regex replace inplace + ref.Symbol[i].String = mergeJVMGeneratedClasses(ref.Symbol[i].String) + } +} diff --git a/component/pyroscope/java/process.go b/component/pyroscope/java/process.go index 62f134bc5934..e89f39259469 100644 --- a/component/pyroscope/java/process.go +++ b/component/pyroscope/java/process.go @@ -13,6 +13,7 @@ import ( "github.com/grafana/agent/component/discovery" "github.com/grafana/agent/component/pyroscope" "github.com/grafana/agent/component/pyroscope/java/asprof" + "github.com/grafana/agent/component/pyroscope/java/jfr" "github.com/grafana/agent/pkg/flow/logging/level" ) @@ -22,17 +23,19 @@ type profilingLoop struct { logger log.Logger output *pyroscope.Fanout - wg sync.WaitGroup - mutex sync.Mutex - pid int - target discovery.Target - cancel context.CancelFunc - error error - dist asprof.Distribution - jfrFile asprof.File + cfg ProfilingConfig + wg sync.WaitGroup + mutex sync.Mutex + pid int + target discovery.Target + cancel context.CancelFunc + error error + dist asprof.Distribution + jfrFile asprof.File + startTime time.Time } -func newProcess(pid int, target discovery.Target, logger log.Logger, output *pyroscope.Fanout) *profilingLoop { +func newProcess(pid int, target discovery.Target, logger log.Logger, output *pyroscope.Fanout, cfg ProfilingConfig) *profilingLoop { ctx, cancel := context.WithCancel(context.Background()) p := &profilingLoop{ logger: log.With(logger, "pid", pid), @@ -45,7 +48,9 @@ func newProcess(pid int, target discovery.Target, logger log.Logger, output *pyr Path: fmt.Sprintf("/tmp/asprof-%d-%d.jfr", os.Getpid(), pid), PID: pid, }, + cfg: cfg, } + _ = level.Debug(p.logger).Log("msg", "new process", "target", fmt.Sprintf("%+v", target)) p.wg.Add(1) go func() { @@ -82,13 +87,18 @@ func (p *profilingLoop) loop(ctx context.Context) { } } +//var counter atomic.Int32 + func (p *profilingLoop) reset() error { _ = level.Debug(p.logger).Log("msg", "timer tick") + startTime := p.startTime + endTime := time.Now() + p.startTime = endTime err := p.stop() if err != nil { return fmt.Errorf("failed to stop asprof: %w", err) } - jfr, err := p.jfrFile.Read() + jfrBytes, err := p.jfrFile.Read() if err != nil { return fmt.Errorf("failed to read jfr file: %w", err) } @@ -100,19 +110,59 @@ func (p *profilingLoop) reset() error { if err != nil { return fmt.Errorf("failed to start asprof: %w", err) } - _ = level.Debug(p.logger).Log("msg", "jfr file read", "len", len(jfr)) + _ = level.Debug(p.logger).Log("msg", "jfr file read", "len", len(jfrBytes)) + + //no := counter.Inc() + //fname := fmt.Sprintf("jfr-%d.jfr", no) + //os.WriteFile(fname, jfrBytes, 0644) + + reqs, err := jfr.ParseJFR(jfrBytes, jfr.Metadata{ + StartTime: startTime, + EndTime: endTime, + SampleRate: p.cfg.SampleRate, + Target: p.getTarget(), + }) + if err != nil { + return fmt.Errorf("failed to parse jfr: %w", err) + } + for _, req := range reqs { + go func(req jfr.PushRequest) { + appender := p.output.Appender() + err := appender.Append(context.Background(), req.Labels, req.Samples) + if err != nil { + _ = level.Error(p.logger).Log("msg", "failed to push jfr", "err", err) + return + } + _ = level.Debug(p.logger).Log("msg", "pushed jfr") + }(req) + } return nil } func (p *profilingLoop) start() error { - stdout, stderr, err := profiler.Execute(p.dist, []string{ + p.startTime = time.Now() + argv := make([]string, 0, 14) + argv = append(argv, "-f", p.jfrFile.Path, "-o", "jfr", - "-e", "itimer", + ) + if p.cfg.CPU { + argv = append(argv, "-e", "itimer") + } + if p.cfg.Alloc != "" { + argv = append(argv, "--alloc", p.cfg.Alloc) + } + if p.cfg.Lock != "" { + argv = append(argv, "--lock", p.cfg.Lock) + } + argv = append(argv, "start", "--timeout", strconv.Itoa(int(p.interval().Seconds())), strconv.Itoa(p.pid), - }) + ) + + _ = level.Debug(p.logger).Log("msg", "asprof", "argv", fmt.Sprintf("%+v", argv)) + stdout, stderr, err := profiler.Execute(p.dist, argv) if err != nil { _ = level.Error(p.logger).Log("msg", "asprof failed to run", "err", err, "stdout", stdout, "stderr", stderr) return fmt.Errorf("asprof failed to run: %w", err) @@ -121,10 +171,12 @@ func (p *profilingLoop) start() error { } func (p *profilingLoop) stop() error { - stdout, stderr, err := profiler.Execute(p.dist, []string{ + argv := []string{ "stop", strconv.Itoa(p.pid), - }) + } + _ = level.Debug(p.logger).Log("msg", "asprof", "argv", fmt.Sprintf("%+v", argv)) + stdout, stderr, err := profiler.Execute(p.dist, argv) if err != nil { _ = level.Error(p.logger).Log("msg", "asprof failed to run", "err", err, "stdout", stdout, "stderr", stderr) return fmt.Errorf("asprof failed to run: %w", err) @@ -155,3 +207,9 @@ func (p *profilingLoop) onError(err error) { func (p *profilingLoop) interval() time.Duration { return time.Second * 15 // todo } + +func (p *profilingLoop) getTarget() discovery.Target { + p.mutex.Lock() + defer p.mutex.Unlock() + return p.target +}