enhance: dump pprof info if component stop progress timeout #39726

Merged: 2 commits, Feb 13, 2025
Changes from all commits
133 changes: 131 additions & 2 deletions cmd/components/util.go
@@ -2,20 +2,27 @@ package components

import (
"context"
"fmt"
"os"
"path/filepath"
"runtime/pprof"
"time"

"github.com/cockroachdb/errors"
"go.uber.org/zap"

"github.com/milvus-io/milvus/pkg/log"
"github.com/milvus-io/milvus/pkg/util/conc"
"github.com/milvus-io/milvus/pkg/util/paramtable"
)

var errStopTimeout = errors.New("stop timeout")

// exitWhenStopTimeout stops a component with timeout and exit progress when timeout.
func exitWhenStopTimeout(stop func() error, timeout time.Duration) error {
err := stopWithTimeout(stop, timeout)
err := dumpPprof(func() error { return stopWithTimeout(stop, timeout) })
if errors.Is(err, errStopTimeout) {
log.Info("stop progress timeout, force exit")
os.Exit(1)
}
return err
@@ -27,7 +34,7 @@ func stopWithTimeout(stop func() error, timeout time.Duration) error {
defer cancel()

future := conc.Go(func() (struct{}, error) {
return struct{}{}, stop()
return struct{}{}, dumpPprof(stop)
})
select {
case <-future.Inner():
@@ -36,3 +43,125 @@ func stopWithTimeout(stop func() error, timeout time.Duration) error {
return errStopTimeout
}
}

// profileType defines the structure for each type of profile to be collected
type profileType struct {
name string // Name of the profile type
filename string // File path for the profile
dump func(*os.File) error // Function to dump the profile data
}

// dumpPprof wraps the execution of a function with pprof profiling
// It collects various performance profiles only if the execution fails
func dumpPprof(exec func() error) error {
// Get pprof directory from configuration
pprofDir := paramtable.Get().ServiceParam.ProfileCfg.PprofPath.GetValue()
if err := os.MkdirAll(pprofDir, 0o755); err != nil {
log.Error("failed to create pprof directory", zap.Error(err))
return exec()
}

// Generate base file path with timestamp
baseFilePath := filepath.Join(
pprofDir,
fmt.Sprintf("%s_pprof_%s",
paramtable.GetRole(),
time.Now().Format("20060102_150405"),
),
)

// Define all profile types to be collected
profiles := []profileType{
{
name: "cpu",
filename: baseFilePath + "_cpu.prof",
dump: func(f *os.File) error {
// Ensure no other CPU profiling is active before starting a new one.
// This prevents the "cpu profiling already in use" error.
pprof.StopCPUProfile()
return pprof.StartCPUProfile(f)
},
},
{
name: "goroutine",
filename: baseFilePath + "_goroutine.prof",
dump: func(f *os.File) error {
return pprof.Lookup("goroutine").WriteTo(f, 0)
},
},
{
name: "heap",
filename: baseFilePath + "_heap.prof",
dump: func(f *os.File) error {
return pprof.WriteHeapProfile(f)
},
},
{
name: "block",
filename: baseFilePath + "_block.prof",
dump: func(f *os.File) error {
return pprof.Lookup("block").WriteTo(f, 0)
},
},
{
name: "mutex",
filename: baseFilePath + "_mutex.prof",
dump: func(f *os.File) error {
return pprof.Lookup("mutex").WriteTo(f, 0)
},
},
}

// Create all profile files and store file handles
files := make(map[string]*os.File)
for _, p := range profiles {
f, err := os.Create(p.filename)
if err != nil {
log.Error("could not create profile file",
zap.String("profile", p.name),
zap.Error(err))
for filename, f := range files {
f.Close()
os.Remove(filename)
}
return exec()
}
files[p.filename] = f
}
// Ensure all files are closed when function returns
defer func() {
for _, f := range files {
f.Close()
}
}()

// Start CPU profiling
cpuProfile := profiles[0]
if err := cpuProfile.dump(files[cpuProfile.filename]); err != nil {
log.Error("could not start CPU profiling", zap.Error(err))
return exec()
}
defer pprof.StopCPUProfile()

// Execute the target function
execErr := exec()

// Only save profiles and collect additional data if execution fails
if execErr != nil {
// Start from index 1 to skip CPU profile (already running)
for _, p := range profiles[1:] {
if err := p.dump(files[p.filename]); err != nil {
log.Error("could not write profile",
zap.String("profile", p.name),
zap.Error(err))
}
}
} else {
// Remove all files if execution succeeds
for _, p := range profiles {
os.Remove(p.filename)
}
}

return execErr
}
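
For orientation, here is a minimal sketch of how a component shutdown might be wired through exitWhenStopTimeout. The dummyComponent type and stopComponent function are hypothetical and not part of this PR; the timeout lookup mirrors the GracefulStopTimeout accessor used in GracefulStopGRPCServer further down in this diff.

```go
package components

import (
	"time"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

// dummyComponent is a hypothetical component used only to illustrate the call site.
type dummyComponent struct{}

// Stop simulates a shutdown that may exceed the graceful-stop timeout.
func (c *dummyComponent) Stop() error {
	time.Sleep(5 * time.Second)
	return nil
}

// stopComponent wraps the component's Stop with exitWhenStopTimeout, so a hung
// shutdown dumps cpu/goroutine/heap/block/mutex profiles before the forced exit.
func stopComponent() error {
	timeout := paramtable.Get().ProxyGrpcServerCfg.GracefulStopTimeout.GetAsDuration(time.Second)
	c := &dummyComponent{}
	return exitWhenStopTimeout(c.Stop, timeout)
}
```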
2 changes: 1 addition & 1 deletion configs/milvus.yaml
@@ -773,7 +773,7 @@ log:
grpc:
log:
level: WARNING
gracefulStopTimeout: 10 # second, time to wait graceful stop finish
gracefulStopTimeout: 3 # second, time to wait graceful stop finish
client:
compressionEnabled: false
dialTimeout: 200
2 changes: 1 addition & 1 deletion internal/distributed/proxy/service_test.go
@@ -1191,7 +1191,7 @@ func Test_Service_GracefulStop(t *testing.T) {
mockProxy.ExpectedCalls = nil
mockProxy.EXPECT().GetComponentStates(mock.Anything, mock.Anything).Run(func(_a0 context.Context, _a1 *milvuspb.GetComponentStatesRequest) {
fmt.Println("rpc start")
time.Sleep(10 * time.Second)
time.Sleep(3 * time.Second)
atomic.AddInt32(&count, 1)
fmt.Println("rpc done")
}).Return(&milvuspb.ComponentStates{Status: &commonpb.Status{ErrorCode: commonpb.ErrorCode_Success}}, nil)
4 changes: 2 additions & 2 deletions internal/distributed/utils/util.go
@@ -23,15 +23,15 @@ func GracefulStopGRPCServer(s *grpc.Server) {
ch := make(chan struct{})
go func() {
defer close(ch)
log.Debug("try to graceful stop grpc server...")
log.Info("try to graceful stop grpc server...")
// will block until all rpc finished.
s.GracefulStop()
}()
select {
case <-ch:
case <-time.After(paramtable.Get().ProxyGrpcServerCfg.GracefulStopTimeout.GetAsDuration(time.Second)):
// took too long, manually close grpc server
log.Debug("stop grpc server...")
log.Info("force to stop grpc server...")
s.Stop()
// concurrent GracefulStop should be interrupted
<-ch
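
As a usage note, the sketch below shows how a gRPC server shutdown might go through this helper. The main wrapper, listen address, paramtable bootstrap, and the utils import path are assumptions for illustration; GracefulStopGRPCServer itself is the function changed above.

```go
package main

import (
	"net"

	"google.golang.org/grpc"

	"github.com/milvus-io/milvus/internal/distributed/utils" // assumed import path for this package
	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init() // assumption: the usual global param bootstrap so GracefulStopTimeout is resolvable

	lis, err := net.Listen("tcp", ":19530") // hypothetical listen address
	if err != nil {
		panic(err)
	}
	s := grpc.NewServer()
	go func() { _ = s.Serve(lis) }()

	// Blocks until in-flight RPCs finish, or until grpc.gracefulStopTimeout
	// (default lowered to 3 seconds by this PR) elapses and a hard Stop is issued.
	utils.GracefulStopGRPCServer(s)
}
```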
2 changes: 1 addition & 1 deletion pkg/util/paramtable/grpc_param.go
@@ -203,7 +203,7 @@ func (p *GrpcServerConfig) Init(domain string, base *BaseTable) {
p.GracefulStopTimeout = ParamItem{
Key: "grpc.gracefulStopTimeout",
Version: "2.3.1",
DefaultValue: "10",
DefaultValue: "3",
Doc: "second, time to wait graceful stop finish",
Export: true,
}
3 changes: 1 addition & 2 deletions pkg/util/paramtable/grpc_param_test.go
@@ -67,8 +67,7 @@ func TestGrpcServerParams(t *testing.T) {
base.Save("grpc.serverMaxSendSize", "a")
assert.Equal(t, serverConfig.ServerMaxSendSize.GetAsInt(), DefaultServerMaxSendSize)

base.Save(serverConfig.GracefulStopTimeout.Key, "1")
assert.Equal(t, serverConfig.GracefulStopTimeout.GetAsInt(), 1)
assert.Equal(t, serverConfig.GracefulStopTimeout.GetAsInt(), 3)
}

func TestGrpcClientParams(t *testing.T) {
24 changes: 24 additions & 0 deletions pkg/util/paramtable/service_param.go
@@ -51,6 +51,7 @@ type ServiceParam struct {
RocksmqCfg RocksmqConfig
NatsmqCfg NatsmqConfig
MinioCfg MinioConfig
ProfileCfg ProfileConfig
}

func (p *ServiceParam) init(bt *BaseTable) {
@@ -64,6 +65,7 @@ func (p *ServiceParam) init(bt *BaseTable) {
p.RocksmqCfg.Init(bt)
p.NatsmqCfg.Init(bt)
p.MinioCfg.Init(bt)
p.ProfileCfg.Init(bt)
}

func (p *ServiceParam) RocksmqEnable() bool {
@@ -1402,3 +1404,25 @@ Leave it empty if you want to use AWS default endpoint`,
}
p.ListObjectsMaxKeys.Init(base.mgr)
}

// profile config
type ProfileConfig struct {
PprofPath ParamItem `refreshable:"false"`
}

func (p *ProfileConfig) Init(base *BaseTable) {
p.PprofPath = ParamItem{
Key: "profile.pprof.path",
Version: "2.5.5",
DefaultValue: "",
Doc: "The folder that storing pprof files, by default will use localStoragePath/pprof",
Formatter: func(v string) string {
if len(v) == 0 {
return path.Join(base.Get("localStorage.path"), "pprof")
}
return v
},
Export: true,
}
p.PprofPath.Init(base.mgr)
}
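
A small sketch of reading the new profile.pprof.path item, assuming the standard paramtable bootstrap: with no explicit value, the formatter falls back to localStorage.path joined with "pprof", which is what the service_param_test.go case below asserts.

```go
package main

import (
	"fmt"

	"github.com/milvus-io/milvus/pkg/util/paramtable"
)

func main() {
	paramtable.Init() // assumption: standard global param initialization

	// dumpPprof resolves its output directory through this same accessor.
	pprofDir := paramtable.Get().ServiceParam.ProfileCfg.PprofPath.GetValue()
	fmt.Println("pprof dumps will be written to:", pprofDir)
}
```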
7 changes: 7 additions & 0 deletions pkg/util/paramtable/service_param_test.go
@@ -233,6 +233,13 @@ func TestServiceParam(t *testing.T) {
assert.Equal(t, 100000, Params.PaginationSize.GetAsInt())
assert.Equal(t, 32, Params.ReadConcurrency.GetAsInt())
})

t.Run("test profile config", func(t *testing.T) {
params := &SParams.ProfileCfg
assert.Equal(t, "/var/lib/milvus/data/pprof", params.PprofPath.GetValue())
bt.Save(params.PprofPath.Key, "/tmp/pprof")
assert.Equal(t, "/tmp/pprof", params.PprofPath.GetValue())
})
}

func TestRuntimConfig(t *testing.T) {