autodiscovery: add GPU provider #32915

Merged · 8 commits · Jan 16, 2025
2 changes: 2 additions & 0 deletions .gitlab-ci.yml
@@ -1171,6 +1171,8 @@ workflow:
         - test/new-e2e/tests/gpu/**/*
         - pkg/collector/corechecks/gpu/**/*
         - comp/core/workloadmeta/collectors/internal/nvml/**/*
+        - comp/core/autodiscovery/providers/gpu.go
+        - pkg/config/autodiscovery/autodiscovery.go
       compare_to: main # TODO: use a variable, when this is supported https://gitlab.com/gitlab-org/gitlab/-/issues/369916

 # windows_docker_2022 configures the job to use the Windows Server 2022 runners.
4 changes: 4 additions & 0 deletions comp/core/autodiscovery/providers/README.md
@@ -75,3 +75,7 @@ The `ZookeeperConfigProvider` reads the check configs from zookeeper.
 ### `RemoteConfigProvider`
 
 The `RemoteConfigProvider` reads the check configs from remote-config.
+
+### `GPUConfigProvider`
+
+The `GPUConfigProvider` generates check configs from visible GPUs on the host.
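In practice the provider schedules a single config for the `gpu` check. As a rough sketch (mirroring the `integration.Config` literal built in `gpu.go` further down this diff; the variable name is only for illustration), the generated config is equivalent to:

```go
// Sketch, not part of the PR: one "gpu" check instance with empty init
// and instance data, tagged as coming from the GPU config provider.
cfg := integration.Config{
	Name:       "gpu",
	Instances:  []integration.Data{[]byte{}},
	InitConfig: []byte{},
	Provider:   names.GPU,
	Source:     names.GPU,
}
_ = cfg
```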
142 changes: 142 additions & 0 deletions comp/core/autodiscovery/providers/gpu.go
@@ -0,0 +1,142 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2016-present Datadog, Inc.

//go:build !serverless

package providers

import (
	"context"
	"sync"

	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/providers/names"
	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/telemetry"
	workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
	pkgconfigsetup "github.com/DataDog/datadog-agent/pkg/config/setup"
	"github.com/DataDog/datadog-agent/pkg/util/log"
)

// gpuCheckName is the name of the GPU check, to avoid importing the code from the GPU package
const gpuCheckName = "gpu"

// GPUConfigProvider implements the ConfigProvider interface for GPUs. This provider listens
// for GPU events in workloadmeta. If any GPU is detected, it generates a config to schedule
// the GPU check. As the GPU check covers all GPUs automatically, additional GPUs do not
// trigger new configs.
type GPUConfigProvider struct {
	workloadmetaStore workloadmeta.Component

	// scheduledConfig is the config that is scheduled for the GPU check. Stored here for
	// unscheduling purposes.
	scheduledConfig *integration.Config

	// gpuDeviceCache is a cache of GPU devices that have been seen. If all GPU devices
	// stop being reported, we unschedule the GPU check.
	gpuDeviceCache map[string]struct{}
	mu             sync.RWMutex
}

var _ ConfigProvider = &GPUConfigProvider{}
var _ StreamingConfigProvider = &GPUConfigProvider{}

// NewGPUConfigProvider returns a new ConfigProvider subscribed to GPU events
func NewGPUConfigProvider(_ *pkgconfigsetup.ConfigurationProviders, wmeta workloadmeta.Component, _ *telemetry.Store) (ConfigProvider, error) {
	return &GPUConfigProvider{
		workloadmetaStore: wmeta,
		gpuDeviceCache:    make(map[string]struct{}),
	}, nil
}

// String returns a string representation of the GPUConfigProvider
func (k *GPUConfigProvider) String() string {
	return names.GPU
}

// Stream starts listening to workloadmeta to generate configs as they come
// instead of relying on a periodic call to Collect.
func (k *GPUConfigProvider) Stream(ctx context.Context) <-chan integration.ConfigChanges {
const name = "ad-gpuprovider"

// outCh must be unbuffered. processing of workloadmeta events must not
// proceed until the config is processed by autodiscovery, as configs
// need to be generated before any associated services.
outCh := make(chan integration.ConfigChanges)

filter := workloadmeta.NewFilterBuilder().
AddKind(workloadmeta.KindGPU).
Build()
inCh := k.workloadmetaStore.Subscribe(name, workloadmeta.ConfigProviderPriority, filter)

go func() {
for {
select {
case <-ctx.Done():
k.workloadmetaStore.Unsubscribe(inCh)

case evBundle, ok := <-inCh:
if !ok {
return
}

// send changes even when they're empty, as we
// need to signal that an event has been
// received, for flow control reasons
outCh <- k.processEvents(evBundle)
evBundle.Acknowledge()
}
}
}()

return outCh
}

func (k *GPUConfigProvider) processEvents(evBundle workloadmeta.EventBundle) integration.ConfigChanges {
	k.mu.Lock()
	defer k.mu.Unlock()

	changes := integration.ConfigChanges{}

	for _, event := range evBundle.Events {
		gpuUUID := event.Entity.GetID().ID

		switch event.Type {
		case workloadmeta.EventTypeSet:
			// Track seen GPU devices
			k.gpuDeviceCache[gpuUUID] = struct{}{}

			// We only need to schedule the check once
			if k.scheduledConfig != nil {
				continue
			}

			k.scheduledConfig = &integration.Config{
				Name:       gpuCheckName,
				Instances:  []integration.Data{[]byte{}},
				InitConfig: []byte{},
				Provider:   names.GPU,
				Source:     names.GPU,
			}

			changes.ScheduleConfig(*k.scheduledConfig)
		case workloadmeta.EventTypeUnset:
			delete(k.gpuDeviceCache, gpuUUID)

			// Unschedule the check if no more GPUs are detected
			if len(k.gpuDeviceCache) == 0 && k.scheduledConfig != nil {
				changes.UnscheduleConfig(*k.scheduledConfig)
			}
		default:
			log.Errorf("cannot handle event of type %d", event.Type)
		}
	}

	return changes
}

// GetConfigErrors returns a map of configuration errors, which is always empty for the GPUConfigProvider
func (k *GPUConfigProvider) GetConfigErrors() map[string]ErrorMsgSet {
	return make(map[string]ErrorMsgSet)
}
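Since the config flow is easy to miss in the diff, here is a minimal, hypothetical consumer of `Stream` (the `consume` helper and `apply` callback are assumptions, not part of this PR; autodiscovery's real scheduling loop is more involved). Receiving from the provider's unbuffered channel is what lets its goroutine move on to the next workloadmeta event bundle.

```go
package providers

import (
	"context"

	"github.com/DataDog/datadog-agent/comp/core/autodiscovery/integration"
)

// consume drains a streaming provider and hands each batch of changes
// to a callback until the context is cancelled.
func consume(ctx context.Context, p StreamingConfigProvider, apply func(integration.ConfigChanges)) {
	ch := p.Stream(ctx)
	for {
		select {
		case <-ctx.Done():
			return
		case changes := <-ch:
			apply(changes)
		}
	}
}
```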
74 changes: 74 additions & 0 deletions comp/core/autodiscovery/providers/gpu_test.go
@@ -0,0 +1,74 @@
// Unless explicitly stated otherwise all files in this repository are licensed
// under the Apache License Version 2.0.
// This product includes software developed at Datadog (https://www.datadoghq.com/).
// Copyright 2025-present Datadog, Inc.

package providers

import (
	"testing"

	"github.com/stretchr/testify/require"

	workloadmeta "github.com/DataDog/datadog-agent/comp/core/workloadmeta/def"
	"github.com/DataDog/datadog-agent/pkg/collector/corechecks/gpu"
)

func TestGPUProcessEvents(t *testing.T) {
	// the processEvents function doesn't need any of the deps, so make them nil
	provider, err := NewGPUConfigProvider(nil, nil, nil)
	require.NoError(t, err)

	// Cast from the generic factory method
	gpuProvider, ok := provider.(*GPUConfigProvider)
	require.True(t, ok)

	gpuIDs := []string{"gpu-1234", "gpu-5678"}

	var gpuCreateEvents []workloadmeta.Event
	var gpuDestroyEvents []workloadmeta.Event
	for _, gpuID := range gpuIDs {
		entityID := workloadmeta.EntityID{
			Kind: workloadmeta.KindGPU,
			ID:   gpuID,
		}

		entity := &workloadmeta.GPU{
			EntityID: entityID,
			EntityMeta: workloadmeta.EntityMeta{
				Name: entityID.ID,
			},
			Vendor: "nvidia",
			Device: "tesla-v100",
		}

		gpuCreateEvents = append(gpuCreateEvents, workloadmeta.Event{Type: workloadmeta.EventTypeSet, Entity: entity})
		gpuDestroyEvents = append(gpuDestroyEvents, workloadmeta.Event{Type: workloadmeta.EventTypeUnset, Entity: entity})
	}

	createBundle := workloadmeta.EventBundle{Events: gpuCreateEvents}
	destroyBundle1 := workloadmeta.EventBundle{Events: gpuDestroyEvents[0:1]}
	destroyBundle2 := workloadmeta.EventBundle{Events: gpuDestroyEvents[1:2]}

	// Multiple events should only create one config
	changes := gpuProvider.processEvents(createBundle)
	require.Len(t, changes.Schedule, 1)
	require.Len(t, changes.Unschedule, 0)
	require.Equal(t, changes.Schedule[0].Name, gpu.CheckName)

	// More events should not create more configs
	changes = gpuProvider.processEvents(createBundle)
	require.Len(t, changes.Schedule, 0)
	require.Len(t, changes.Unschedule, 0)

	// Destroying one GPU should not unschedule the check
	changes = gpuProvider.processEvents(destroyBundle1)
	require.Len(t, changes.Schedule, 0)
	require.Len(t, changes.Unschedule, 0)

	// Destroying the last GPU should unschedule the check
	changes = gpuProvider.processEvents(destroyBundle2)
	require.Len(t, changes.Schedule, 0)
	require.Len(t, changes.Unschedule, 1)
	require.Equal(t, changes.Unschedule[0].Name, gpu.CheckName)
}
@@ -27,6 +27,7 @@ const (
 	RemoteConfig = "remote-config"
 	SNMP = "snmp"
 	Zookeeper = "zookeeper"
+	GPU = "gpu"
 )
 
 // Internal Autodiscovery names for the config providers
1 change: 1 addition & 0 deletions comp/core/autodiscovery/providers/providers.go
@@ -58,6 +58,7 @@ func RegisterProviders(providerCatalog map[string]ConfigProviderFactory) {
 	RegisterProvider(names.PrometheusPodsRegisterName, NewPrometheusPodsConfigProvider, providerCatalog)
 	RegisterProvider(names.PrometheusServicesRegisterName, NewPrometheusServicesConfigProvider, providerCatalog)
 	RegisterProvider(names.ZookeeperRegisterName, NewZookeeperConfigProvider, providerCatalog)
+	RegisterProviderWithComponents(names.GPU, NewGPUConfigProvider, providerCatalog)
 }
 
 // ConfigProviderFactory is any function capable to create a ConfigProvider instance
6 changes: 6 additions & 0 deletions pkg/config/autodiscovery/autodiscovery.go
@@ -125,5 +125,11 @@ func DiscoverComponentsFromEnv() ([]pkgconfigsetup.ConfigurationProviders, []pkg
 		log.Info("Adding Kubelet listener from environment")
 	}
 
+	isGPUEnv := env.IsFeaturePresent(env.NVML)
+	if isGPUEnv {
+		detectedProviders = append(detectedProviders, pkgconfigsetup.ConfigurationProviders{Name: names.GPU})
+		log.Info("Adding GPU provider from environment")
+	}
+
 	return detectedProviders, detectedListeners
 }
18 changes: 11 additions & 7 deletions test/new-e2e/tests/gpu/gpu_test.go
@@ -102,15 +102,19 @@ func (v *gpuSuite) runCudaDockerWorkload() string {
 }
 
 func (v *gpuSuite) TestGPUCheckIsEnabled() {
-	statusOutput := v.Env().Agent.Client.Status(agentclient.WithArgs([]string{"collector", "--json"}))
+	// Note that the GPU check should be enabled by autodiscovery, so it can take some time to be enabled
+	v.EventuallyWithT(func(c *assert.CollectT) {
+		statusOutput := v.Env().Agent.Client.Status(agentclient.WithArgs([]string{"collector", "--json"}))
 
-	var status collectorStatus
-	err := json.Unmarshal([]byte(statusOutput.Content), &status)
-	v.Require().NoError(err, "failed to unmarshal agent status")
-	v.Require().Contains(status.RunnerStats.Checks, "gpu")
+		var status collectorStatus
+		err := json.Unmarshal([]byte(statusOutput.Content), &status)
+		assert.NoError(c, err, "failed to unmarshal agent status")
+		assert.Contains(c, status.RunnerStats.Checks, "gpu")
 
-	gpuCheckStatus := status.RunnerStats.Checks["gpu"]
-	v.Require().Equal(gpuCheckStatus.LastError, "")
+		gpuCheckStatus := status.RunnerStats.Checks["gpu"]
+		assert.Equal(c, gpuCheckStatus.LastError, "")
+	}, 2*time.Minute, 10*time.Second)
 }
 
 func (v *gpuSuite) TestGPUSysprobeEndpointIsResponding() {
9 changes: 0 additions & 9 deletions test/new-e2e/tests/gpu/provisioner.go
@@ -55,14 +55,6 @@ const nvidiaSMIValidationCmd = "nvidia-smi -L | grep GPU"
 // and can be used to identify the validation commands.
 const validationCommandMarker = "echo 'gpu-validation-command'"
 
-const defaultGpuCheckConfig = `
-init_config:
-  min_collection_interval: 5
-
-instances:
-  - {}
-`
-
 const defaultSysprobeConfig = `
 gpu_monitoring:
   enabled: true
@@ -79,7 +71,6 @@ type provisionerParams struct {
 func getDefaultProvisionerParams() *provisionerParams {
 	return &provisionerParams{
 		agentOptions: []agentparams.Option{
-			agentparams.WithIntegration("gpu.d", defaultGpuCheckConfig),
 			agentparams.WithSystemProbeConfig(defaultSysprobeConfig),
 		},
 		ami: gpuEnabledAMI,