Skip to content

Commit

Permalink
allow Pid to be wrapped for mocks
Browse files Browse the repository at this point in the history
  • Loading branch information
atoulme committed Jan 23, 2024
1 parent df42199 commit 58e8ffe
Show file tree
Hide file tree
Showing 3 changed files with 67 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,14 @@ package processscraper // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"time"

"github.com/shirou/gopsutil/v3/common"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/process"
"go.opentelemetry.io/collector/pdata/pcommon"
Expand Down Expand Up @@ -93,14 +97,15 @@ type processHandle interface {
NumFDsWithContext(context.Context) (int32, error)
// If gatherUsed is true, the currently used value will be gathered and added to the resulting RlimitStat.
RlimitUsageWithContext(ctx context.Context, gatherUsed bool) ([]process.RlimitStat, error)
CgroupWithContext(ctx context.Context) (string, error)
}

type gopsProcessHandles struct {
handles []*process.Process
handles []wrappedProcessHandle
}

func (p *gopsProcessHandles) Pid(index int) int32 {
return p.handles[index].Pid
return p.handles[index].Process.Pid
}

func (p *gopsProcessHandles) At(index int) processHandle {
Expand All @@ -111,13 +116,51 @@ func (p *gopsProcessHandles) Len() int {
return len(p.handles)
}

type wrappedProcessHandle struct {
*process.Process
}

func (p wrappedProcessHandle) CgroupWithContext(ctx context.Context) (string, error) {
pid := p.Process.Pid
statPath := getEnvWithContext(ctx, string(common.HostProcEnvKey), "/proc", strconv.Itoa(int(pid)), "cgroup")
contents, err := os.ReadFile(statPath)
if err != nil {
return "", err
}

return strings.TrimSuffix(string(contents), "\n"), nil
}

// copied from gopsutil:
// GetEnvWithContext retrieves the environment variable key. If it does not exist it returns the default.
// The context may optionally contain a map superseding os.EnvKey.
func getEnvWithContext(ctx context.Context, key string, dfault string, combineWith ...string) string {
var value string
if env, ok := ctx.Value(common.EnvKey).(common.EnvMap); ok {
value = env[common.EnvKeyType(key)]
}
if value == "" {
value = os.Getenv(key)
}
if value == "" {
value = dfault
}
segments := append([]string{value}, combineWith...)

return filepath.Join(segments...)
}

func getProcessHandlesInternal(ctx context.Context) (processHandles, error) {
processes, err := process.ProcessesWithContext(ctx)
if err != nil {
return nil, err
}
wrapped := make([]wrappedProcessHandle, len(processes))
for i, p := range processes {
wrapped[i] = wrappedProcessHandle{Process: p}
}

return &gopsProcessHandles{handles: processes}, nil
return &gopsProcessHandles{handles: wrapped}, nil
}

func parentPid(ctx context.Context, handle processHandle, pid int32) (int32, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,8 @@ package processscraper // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"os"
"path/filepath"
"strconv"
"strings"

"github.com/shirou/gopsutil/v3/common"
"github.com/shirou/gopsutil/v3/cpu"
"github.com/shirou/gopsutil/v3/process"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/receiver/hostmetricsreceiver/internal/scraper/processscraper/internal/metadata"
Expand Down Expand Up @@ -53,33 +47,12 @@ func getProcessExecutable(ctx context.Context, proc processHandle) (string, erro
}

func getProcessCgroup(ctx context.Context, proc processHandle) (string, error) {
pid := proc.(*process.Process).Pid
statPath := getEnvWithContext(ctx, string(common.HostProcEnvKey), "/proc", strconv.Itoa(int(pid)), "cgroup")
contents, err := os.ReadFile(statPath)
cgroup, err := proc.CgroupWithContext(ctx)
if err != nil {
return "", err
}

return strings.TrimSuffix(string(contents), "\n"), nil
}

// copied from gopsutil:
// GetEnvWithContext retrieves the environment variable key. If it does not exist it returns the default.
// The context may optionally contain a map superseding os.EnvKey.
func getEnvWithContext(ctx context.Context, key string, dfault string, combineWith ...string) string {
var value string
if env, ok := ctx.Value(common.EnvKey).(common.EnvMap); ok {
value = env[common.EnvKeyType(key)]
}
if value == "" {
value = os.Getenv(key)
}
if value == "" {
value = dfault
}
segments := append([]string{value}, combineWith...)

return filepath.Join(segments...)
return cgroup, nil
}

func getProcessCommand(ctx context.Context, proc processHandle) (*commandMetadata, error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,11 @@ type processHandleMock struct {
mock.Mock
}

func (p *processHandleMock) CgroupWithContext(ctx context.Context) (string, error) {
args := p.MethodCalled("CgroupWithContext", ctx)
return args.String(0), args.Error(1)
}

func (p *processHandleMock) NameWithContext(ctx context.Context) (ret string, err error) {
args := p.MethodCalled("NameWithContext", ctx)
return args.String(0), args.Error(1)
Expand Down Expand Up @@ -527,6 +532,9 @@ func initDefaultsHandleMock(t mock.TestingT, handleMock *processHandleMock) {
if !handleMock.IsMethodCallable(t, "ExeWithContext", mock.Anything) {
handleMock.On("ExeWithContext", mock.Anything).Return("processname", nil)
}
if !handleMock.IsMethodCallable(t, "CgroupWithContext", mock.Anything) {
handleMock.On("CgroupWithContext", mock.Anything).Return("cgroup", nil)
}
}

func TestScrapeMetrics_Filtered(t *testing.T) {
Expand Down Expand Up @@ -676,6 +684,7 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) {
osFilter string
nameError error
exeError error
cgroupError error
usernameError error
cmdlineError error
timesError error
Expand Down Expand Up @@ -711,6 +720,14 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) {
return `error reading process executable for pid 1: err1`
}(),
},
{
name: "Cgroup Error",
osFilter: "linux",
cgroupError: errors.New("err1"),
expectedError: func() string {
return `error reading cgroup for pid 1: err1`
}(),
},
{
name: "Cmdline Error",
osFilter: "darwin",
Expand Down Expand Up @@ -851,6 +868,7 @@ func TestScrapeMetrics_ProcessErrors(t *testing.T) {
handleMock := &processHandleMock{}
handleMock.On("NameWithContext", mock.Anything).Return("test", test.nameError)
handleMock.On("ExeWithContext", mock.Anything).Return("test", test.exeError)
handleMock.On("CgroupWithContext", mock.Anything).Return("test", test.cgroupError)
handleMock.On("UsernameWithContext", mock.Anything).Return(username, test.usernameError)
handleMock.On("CmdlineWithContext", mock.Anything).Return("cmdline", test.cmdlineError)
handleMock.On("CmdlineSliceWithContext", mock.Anything).Return([]string{"cmdline"}, test.cmdlineError)
Expand Down Expand Up @@ -1159,6 +1177,7 @@ func TestScrapeMetrics_DontCheckDisabledMetrics(t *testing.T) {
require.NoError(t, err, "Failed to initialize process scraper: %v", err)

handleMock := newErroringHandleMock()
handleMock.On("CgroupWithContext", mock.Anything).Return("test", nil)
handleMock.On("NameWithContext", mock.Anything).Return("test", nil)
handleMock.On("ExeWithContext", mock.Anything).Return("test", nil)
handleMock.On("CreateTimeWithContext", mock.Anything).Return(time.Now().UnixMilli(), nil)
Expand Down

0 comments on commit 58e8ffe

Please sign in to comment.