Commit: Skeleton
gshaibi committed Nov 28, 2024
1 parent eddc6cf commit b014b75
Showing 4 changed files with 222 additions and 81 deletions.
89 changes: 8 additions & 81 deletions internal/status-exporter/export/fs/exporter.go
@@ -1,96 +1,23 @@
 package fs
 
 import (
-    "fmt"
-    "log"
-    "os"
-    "path/filepath"
-    "strconv"
-
     "github.com/run-ai/fake-gpu-operator/internal/common/constants"
     "github.com/run-ai/fake-gpu-operator/internal/common/topology"
     "github.com/run-ai/fake-gpu-operator/internal/status-exporter/export"
+    "github.com/run-ai/fake-gpu-operator/internal/status-exporter/export/fs/fake"
+    "github.com/run-ai/fake-gpu-operator/internal/status-exporter/export/fs/real"
     "github.com/run-ai/fake-gpu-operator/internal/status-exporter/watch"
-)
-
-type FsExporter struct {
-    topologyChan <-chan *topology.NodeTopology
-}
-
-var _ export.Interface = &FsExporter{}
+    "github.com/spf13/viper"
+)
 
-func NewFsExporter(watcher watch.Interface) *FsExporter {
+func NewFsExporter(watcher watch.Interface) export.Interface {
     topologyChan := make(chan *topology.NodeTopology)
     watcher.Subscribe(topologyChan)
 
-    return &FsExporter{
-        topologyChan: topologyChan,
-    }
-}
-
-func (e *FsExporter) Run(stopCh <-chan struct{}) {
-    for {
-        select {
-        case nodeTopology := <-e.topologyChan:
-            e.export(nodeTopology)
-        case <-stopCh:
-            return
-        }
-    }
-}
-
-func (e *FsExporter) export(nodeTopology *topology.NodeTopology) {
-    exportPods(nodeTopology)
-    exportEvents()
-}
-
-func exportPods(nodeTopology *topology.NodeTopology) {
-    podProcDir := "/runai/proc/pod"
-    if err := os.RemoveAll(podProcDir); err != nil {
-        log.Printf("Failed deleting %s directory: %s", podProcDir, err.Error())
+    if viper.GetBool(constants.EnvFakeNode) {
+        return fake.NewFakeFsExporter(topologyChan)
     }
 
-    for gpuIdx, gpu := range nodeTopology.Gpus {
-        // Ignoring pods that are not supposed to be seen by runai-container-toolkit
-        if gpu.Status.AllocatedBy.Namespace != constants.ReservationNs {
-            continue
-        }
-
-        for podUuid, gpuUsageStatus := range gpu.Status.PodGpuUsageStatus {
-            log.Printf("Exporting pod %s gpu stats to filesystem", podUuid)
-
-            path := fmt.Sprintf("%s/%s/metrics/gpu/%d", podProcDir, podUuid, gpuIdx)
-            if err := os.MkdirAll(path, 0755); err != nil {
-                log.Printf("Failed creating directory for pod %s: %s", podUuid, err.Error())
-            }
-
-            if err := writeFile(filepath.Join(path, "utilization.sm"), []byte(strconv.Itoa(gpuUsageStatus.Utilization.Random()))); err != nil {
-                log.Printf("Failed exporting utilization for pod %s: %s", podUuid, err.Error())
-            }
-
-            if err := writeFile(filepath.Join(path, "memory.allocated"), []byte(strconv.Itoa(mbToBytes(gpuUsageStatus.FbUsed)))); err != nil {
-                log.Printf("Failed exporting memory for pod %s: %s", podUuid, err.Error())
-            }
-        }
-    }
-}
-
-func exportEvents() {
-    // For now, only creating the directory without exporting any events.
-    // In the future, we might want to export events to the filesystem as well.
-    eventsDir := "/runai/proc/events"
-    if err := os.MkdirAll(eventsDir, 0755); err != nil {
-        log.Printf("Failed creating directory for events: %s", err.Error())
-    }
-}
-
-func writeFile(path string, content []byte) error {
-    if err := os.WriteFile(path, content, 0644); err != nil {
-        return fmt.Errorf("failed writing file %s: %w", path, err)
-    }
-    return nil
-}
-
-func mbToBytes(mb int) int {
-    return mb * (1000 * 1000)
+    return real.NewRealFsExporter(topologyChan)
 }
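
Note on configuration: the viper.GetBool(constants.EnvFakeNode) check above assumes the fake-node flag was already bound to an environment variable during start-up; that wiring is not shown in this commit. A minimal sketch of what it could look like (the "FAKE_NODE" key, package name, and function name are illustrative assumptions, not the actual constant):

package config

import "github.com/spf13/viper"

// initFakeNodeFlag lets viper.GetBool pick up a FAKE_NODE environment variable.
// Key name and default are assumptions for illustration only.
func initFakeNodeFlag() {
    viper.SetDefault("FAKE_NODE", false) // default to the real filesystem exporter
    viper.AutomaticEnv()                 // FAKE_NODE=true in the environment selects the fake exporter
}
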
71 changes: 71 additions & 0 deletions internal/status-exporter/export/fs/fake/fake_exporter.go
@@ -0,0 +1,71 @@
package fake

import (
    "github.com/run-ai/fake-gpu-operator/internal/common/topology"
)

// FakeFsExporter exports fake filesystem-based Prometheus metrics.
type FakeFsExporter struct {
    topologyChan <-chan *topology.NodeTopology
}

func NewFakeFsExporter(topologyChan <-chan *topology.NodeTopology) *FakeFsExporter {
    return &FakeFsExporter{
        topologyChan: topologyChan,
    }
}

func (e *FakeFsExporter) Run(stopCh <-chan struct{}) {
    for {
        select {
        case nodeTopology := <-e.topologyChan:
            e.export(nodeTopology)
        case <-stopCh:
            return
        }
    }
}

func (e *FakeFsExporter) export(nodeTopology *topology.NodeTopology) {
    exportFsBasedMetrics(nodeTopology)
}

func exportFsBasedMetrics(nodeTopology *topology.NodeTopology) {
    // Export the following:
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_pod_gpu_utilization",
    //     "GPU Utilization of Pod",
    //     coreTeamMetricsDir,
    //     "pod/{pod_uuid}/metrics/gpu/{gpu}/utilization.sm"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_pod_gpu_memory_used_bytes",
    //     "GPU Memory Usage of Pod in Bytes",
    //     coreTeamMetricsDir,
    //     "pod/{pod_uuid}/metrics/gpu/{gpu}/memory.allocated"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_pod_gpu_swap_ram_used_bytes",
    //     "GPU Swap Ram Memory Usage of Pod in Bytes",
    //     coreTeamMetricsDir,
    //     "pod/{pod_uuid}/metrics/gpu/{gpu}/memory.swap_ram_used"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_gpu_oomkill_burst_count",
    //     "GPU Burst OOMKill count",
    //     coreTeamMetricsDir,
    //     "metrics/gpu/{gpu}/oom.burst"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_gpu_oomkill_idle_count",
    //     "GPU Idle OOMKill count",
    //     coreTeamMetricsDir,
    //     "metrics/gpu/{gpu}/oom.idle"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_gpu_oomkill_priority_count",
    //     "GPU Priority OOMKill count",
    //     coreTeamMetricsDir,
    //     "metrics/gpu/{gpu}/oom.priority"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_gpu_oomkill_swap_out_of_ram_count",
    //     "GPU swap out of RAM OOMKill count",
    //     coreTeamMetricsDir,
    //     "metrics/gpu/{gpu}/oom.swap_out_of_ram"),
}
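
The body above is still an empty skeleton. A minimal sketch of how it might populate the gauges declared in metrics.go, assuming the topology types expose the same fields the real exporter reads (Gpus, PodGpuUsageStatus, Utilization.Random(), FbUsed) and that strconv is added to the imports; the function name is illustrative:

// Sketch only; not part of this commit.
func exportFsBasedMetricsSketch(nodeTopology *topology.NodeTopology) {
    for gpuIdx, gpu := range nodeTopology.Gpus {
        for podUuid, usage := range gpu.Status.PodGpuUsageStatus {
            gpuLabel := strconv.Itoa(gpuIdx)
            // Publish per-pod SM utilization and memory usage to the Prometheus gauges.
            runaiPodGpuUtil.WithLabelValues(podUuid, gpuLabel).Set(float64(usage.Utilization.Random()))
            runaiPodGpuMemoryUsedBytes.WithLabelValues(podUuid, gpuLabel).Set(float64(usage.FbUsed) * 1000 * 1000) // MB to bytes
        }
    }
}
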
54 changes: 54 additions & 0 deletions internal/status-exporter/export/fs/fake/metrics.go
@@ -0,0 +1,54 @@
package fake

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

var (
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_pod_gpu_utilization",
    //     "GPU Utilization of Pod",
    //     coreTeamMetricsDir,
    //     "pod/{pod_uuid}/metrics/gpu/{gpu}/utilization.sm"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_pod_gpu_memory_used_bytes",
    //     "GPU Memory Usage of Pod in Bytes",
    //     coreTeamMetricsDir,
    //     "pod/{pod_uuid}/metrics/gpu/{gpu}/memory.allocated"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_pod_gpu_swap_ram_used_bytes",
    //     "GPU Swap Ram Memory Usage of Pod in Bytes",
    //     coreTeamMetricsDir,
    //     "pod/{pod_uuid}/metrics/gpu/{gpu}/memory.swap_ram_used"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_gpu_oomkill_burst_count",
    //     "GPU Burst OOMKill count",
    //     coreTeamMetricsDir,
    //     "metrics/gpu/{gpu}/oom.burst"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_gpu_oomkill_idle_count",
    //     "GPU Idle OOMKill count",
    //     coreTeamMetricsDir,
    //     "metrics/gpu/{gpu}/oom.idle"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_gpu_oomkill_priority_count",
    //     "GPU Priority OOMKill count",
    //     coreTeamMetricsDir,
    //     "metrics/gpu/{gpu}/oom.priority"),
    // core_team_metric.NewCoreTeamMetric(
    //     "runai_gpu_oomkill_swap_out_of_ram_count",
    //     "GPU swap out of RAM OOMKill count",
    //     coreTeamMetricsDir,
    //     "metrics/gpu/{gpu}/oom.swap_out_of_ram"),

    runaiPodGpuUtil = promauto.NewGaugeVec(prometheus.GaugeOpts{
        Name: "runai_pod_gpu_utilization",
        Help: "GPU Utilization of Pod",
    }, []string{"pod_uuid", "gpu"})

    runaiPodGpuMemoryUsedBytes = promauto.NewGaugeVec(prometheus.GaugeOpts{
        Name: "runai_pod_gpu_memory_used_bytes",
        Help: "GPU Memory Usage of Pod in Bytes",
    }, []string{"pod_uuid", "gpu"}) // label set assumed to mirror runaiPodGpuUtil
)
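
Because these collectors are created with promauto, they register themselves with the default Prometheus registry; exposing them over HTTP still needs a handler somewhere in the exporter process. An illustrative wiring, not part of this commit (port, package, and function name are assumptions):

package main

import (
    "net/http"

    "github.com/prometheus/client_golang/prometheus/promhttp"
)

// serveMetrics exposes everything in the default registry, including the gauges above.
func serveMetrics() {
    http.Handle("/metrics", promhttp.Handler())
    _ = http.ListenAndServe(":9400", nil) // port is illustrative
}
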
89 changes: 89 additions & 0 deletions internal/status-exporter/export/fs/real/real_exporter.go
@@ -0,0 +1,89 @@
package real

import (
    "fmt"
    "log"
    "os"
    "path/filepath"
    "strconv"

    "github.com/run-ai/fake-gpu-operator/internal/common/constants"
    "github.com/run-ai/fake-gpu-operator/internal/common/topology"
)

type RealFsExporter struct {
    topologyChan <-chan *topology.NodeTopology
}

func NewRealFsExporter(topologyChan <-chan *topology.NodeTopology) *RealFsExporter {
    return &RealFsExporter{
        topologyChan: topologyChan,
    }
}

func (e *RealFsExporter) Run(stopCh <-chan struct{}) {
    for {
        select {
        case nodeTopology := <-e.topologyChan:
            e.export(nodeTopology)
        case <-stopCh:
            return
        }
    }
}

func (e *RealFsExporter) export(nodeTopology *topology.NodeTopology) {
    exportPods(nodeTopology)
    exportEvents()
}

func exportPods(nodeTopology *topology.NodeTopology) {
    podProcDir := "/runai/proc/pod"
    if err := os.RemoveAll(podProcDir); err != nil {
        log.Printf("Failed deleting %s directory: %s", podProcDir, err.Error())
    }

    for gpuIdx, gpu := range nodeTopology.Gpus {
        // Ignoring pods that are not supposed to be seen by runai-container-toolkit
        if gpu.Status.AllocatedBy.Namespace != constants.ReservationNs {
            continue
        }

        for podUuid, gpuUsageStatus := range gpu.Status.PodGpuUsageStatus {
            log.Printf("Exporting pod %s gpu stats to filesystem", podUuid)

            path := fmt.Sprintf("%s/%s/metrics/gpu/%d", podProcDir, podUuid, gpuIdx)
            if err := os.MkdirAll(path, 0755); err != nil {
                log.Printf("Failed creating directory for pod %s: %s", podUuid, err.Error())
            }

            if err := writeFile(filepath.Join(path, "utilization.sm"), []byte(strconv.Itoa(gpuUsageStatus.Utilization.Random()))); err != nil {
                log.Printf("Failed exporting utilization for pod %s: %s", podUuid, err.Error())
            }

            if err := writeFile(filepath.Join(path, "memory.allocated"), []byte(strconv.Itoa(mbToBytes(gpuUsageStatus.FbUsed)))); err != nil {
                log.Printf("Failed exporting memory for pod %s: %s", podUuid, err.Error())
            }
        }
    }
}

func exportEvents() {
    // For now, only creating the directory without exporting any events.
    // In the future, we might want to export events to the filesystem as well.
    eventsDir := "/runai/proc/events"
    if err := os.MkdirAll(eventsDir, 0755); err != nil {
        log.Printf("Failed creating directory for events: %s", err.Error())
    }
}

func writeFile(path string, content []byte) error {
    if err := os.WriteFile(path, content, 0644); err != nil {
        return fmt.Errorf("failed writing file %s: %w", path, err)
    }
    return nil
}

func mbToBytes(mb int) int {
    return mb * (1000 * 1000)
}
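
For a pod with an illustrative UUID of abc-123 occupying GPU 0, the exporter above produces a tree along these lines:

/runai/proc/pod/abc-123/metrics/gpu/0/utilization.sm    - randomized SM utilization sample
/runai/proc/pod/abc-123/metrics/gpu/0/memory.allocated  - framebuffer usage, converted from MB to bytes
/runai/proc/events/                                     - created empty for now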
