Commit 85ac7b5

Fixes #RHIROS-1401 - Dropping csv records with missing resource usage…
patilsuraj767 authored Nov 15, 2023
1 parent 325da05 commit 85ac7b5
Showing 5 changed files with 255 additions and 29 deletions.
internal/logging/logging.go (3 changes: 2 additions & 1 deletion)
@@ -62,12 +62,13 @@ func GetLogger() *logrus.Entry {
 }
 
 func Set_request_details(data types.KafkaMsg) *logrus.Entry {
-	return log.WithFields(logrus.Fields{
+	log = log.WithFields(logrus.Fields{
 		"request_id":    data.Request_id,
 		"account":       data.Metadata.Account,
 		"org_id":        data.Metadata.Org_id,
 		"source_id":     data.Metadata.Source_id,
 		"cluster_uuid":  data.Metadata.Cluster_uuid,
 		"cluster_alias": data.Metadata.Cluster_alias,
 	})
+	return log
 }
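For context, a minimal sketch (illustrative, not from the repository) of what the reassignment changes, assuming `log` is the package-level `*logrus.Entry` behind `GetLogger`: the request fields now stick to the shared entry rather than only to the value returned to the caller.

package main

import "github.com/sirupsen/logrus"

// Stand-in for the package-level entry that GetLogger hands out.
var log = logrus.NewEntry(logrus.New())

func setRequestDetails(requestID string) *logrus.Entry {
	// Reassigning the shared entry means later users of the package
	// logger also see these fields, not just callers of this function.
	log = log.WithFields(logrus.Fields{"request_id": requestID})
	return log
}

func main() {
	setRequestDetails("abc-123")
	log.Info("processing upload") // logged with request_id=abc-123
}

One consequence of mutating the shared entry is that the fields persist on every later message until the entry is replaced again.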
internal/types/workload/workload.go (2 changes: 1 addition & 1 deletion)
@@ -33,7 +33,7 @@ func (p WorkloadType) String() string {
 	case Replicationcontroller:
 		return "replicationcontroller"
 	case Statefulset:
-		return "statefulsets"
+		return "statefulset"
 	case Daemonset:
 		return "daemonset"
 	}
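The singular form matters because the new filter_valid_k8s_object_types in aggregator.go (below) matches the lowercase workload_type column against these String() values, so the plural "statefulsets" would have silently dropped every StatefulSet row in that filter. A minimal sketch (illustrative only) of the interaction:

package main

import (
	"fmt"

	"github.com/go-gota/gota/dataframe"
	"github.com/go-gota/gota/series"
)

func main() {
	df := dataframe.New(
		series.New([]string{"statefulset", "daemonset"}, series.String, "k8s_object_type"),
	)
	// With the old plural "statefulsets" in this list, the first row vanished.
	kept := df.Filter(dataframe.F{
		Colname:    "k8s_object_type",
		Comparator: series.In,
		Comparando: []string{"statefulset", "daemonset"},
	})
	fmt.Println(kept.Nrow()) // 2
}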
internal/utils/aggregator.go (114 changes: 87 additions & 27 deletions)
@@ -6,40 +6,28 @@ import (
 	"github.com/go-gota/gota/dataframe"
 	"github.com/go-gota/gota/series"
 
+	"github.com/redhatinsights/ros-ocp-backend/internal/logging"
 	w "github.com/redhatinsights/ros-ocp-backend/internal/types/workload"
 )
 
 func Aggregate_data(df dataframe.DataFrame) dataframe.DataFrame {
-	df = df.FilterAggregation(
-		dataframe.And,
-		dataframe.F{Colname: "owner_kind", Comparator: series.Neq, Comparando: ""},
-		dataframe.F{Colname: "owner_name", Comparator: series.Neq, Comparando: ""},
-		dataframe.F{Colname: "workload", Comparator: series.Neq, Comparando: ""},
-		dataframe.F{Colname: "workload_type", Comparator: series.Neq, Comparando: ""},
-	)
+	log = logging.GetLogger()
+	df = determine_k8s_object_type(df)
 
-	columns := df.Names()
-	index_of_owner_name := findInStringSlice("owner_name", columns)
-	index_of_owner_kind := findInStringSlice("owner_kind", columns)
-	index_of_workload := findInStringSlice("workload", columns)
-	index_of_workload_type := findInStringSlice("workload_type", columns)
+	// filter out only valid workload type
+	df = filter_valid_k8s_object_types(df)
 
-	s := df.Rapply(func(s series.Series) series.Series {
-		owner_name := s.Elem(index_of_owner_name).String()
-		owner_kind := s.Elem(index_of_owner_kind).String()
-		workload := s.Elem(index_of_workload).String()
-		workload_type := s.Elem(index_of_workload_type).String()
-		if strings.ToLower(owner_kind) == string(w.Replicaset) && workload == "<none>" {
-			return series.Strings([]string{string(w.Replicaset), owner_name})
-		} else if strings.ToLower(owner_kind) == string(w.Replicationcontroller) && workload == "<none>" {
-			return series.Strings([]string{string(w.Replicationcontroller), owner_name})
-		} else {
-			return series.Strings([]string{workload_type, workload})
-		}
-	})
+	// Validation to check if metrics for cpuUsage, memoryUsage and memoryRSS are missing
+	df, no_of_dropped_records := filter_valid_csv_records(df)
+	if no_of_dropped_records != 0 {
+		invalidDataPoints.Add(float64(no_of_dropped_records))
+		log.Infof("Invalid records in CSV - %v", no_of_dropped_records)
+	}
+
+	if df.Nrow() == 0 {
+		return df
+	}
 
-	df = df.Mutate(s.Col("X0")).Rename("k8s_object_type", "X0")
-	df = df.Mutate(s.Col("X1")).Rename("k8s_object_name", "X1")
 	dfGroups := df.GroupBy(
 		"namespace",
 		"k8s_object_type",
@@ -87,3 +75,75 @@
 	df = dfGroups.Aggregation(columnsAggregationType, columnsToAggregate)
 	return df
 }
+
+func filter_valid_csv_records(main_df dataframe.DataFrame) (dataframe.DataFrame, int) {
+	df := main_df.FilterAggregation(
+		dataframe.And,
+		dataframe.F{Colname: "memory_rss_usage_container_sum", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "memory_rss_usage_container_max", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "memory_rss_usage_container_min", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "memory_rss_usage_container_avg", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "memory_usage_container_sum", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "memory_usage_container_max", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "memory_usage_container_min", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "memory_usage_container_avg", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "cpu_usage_container_sum", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "cpu_usage_container_max", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "cpu_usage_container_min", Comparator: series.GreaterEq, Comparando: 0},
+		dataframe.F{Colname: "cpu_usage_container_avg", Comparator: series.GreaterEq, Comparando: 0},
+	)
+
+	no_of_dropped_records := main_df.Nrow() - df.Nrow()
+
+	return df, no_of_dropped_records
+}
+
+func filter_valid_k8s_object_types(df dataframe.DataFrame) dataframe.DataFrame {
+	return df.Filter(
+		dataframe.F{
+			Colname:    "k8s_object_type",
+			Comparator: series.In,
+			Comparando: []string{
+				w.Daemonset.String(),
+				w.Deployment.String(),
+				w.Deploymentconfig.String(),
+				w.Replicaset.String(),
+				w.Replicationcontroller.String(),
+				w.Statefulset.String(),
+			}},
+	)
+}
+
+func determine_k8s_object_type(df dataframe.DataFrame) dataframe.DataFrame {
+	df = df.FilterAggregation(
+		dataframe.And,
+		dataframe.F{Colname: "owner_kind", Comparator: series.Neq, Comparando: ""},
+		dataframe.F{Colname: "owner_name", Comparator: series.Neq, Comparando: ""},
+		dataframe.F{Colname: "workload", Comparator: series.Neq, Comparando: ""},
+		dataframe.F{Colname: "workload_type", Comparator: series.Neq, Comparando: ""},
+	)
+
+	columns := df.Names()
+	index_of_owner_name := findInStringSlice("owner_name", columns)
+	index_of_owner_kind := findInStringSlice("owner_kind", columns)
+	index_of_workload := findInStringSlice("workload", columns)
+	index_of_workload_type := findInStringSlice("workload_type", columns)
+
+	s := df.Rapply(func(s series.Series) series.Series {
+		owner_name := s.Elem(index_of_owner_name).String()
+		owner_kind := s.Elem(index_of_owner_kind).String()
+		workload := s.Elem(index_of_workload).String()
+		workload_type := s.Elem(index_of_workload_type).String()
+		if strings.ToLower(owner_kind) == string(w.Replicaset) && workload == "<none>" {
+			return series.Strings([]string{string(w.Replicaset), owner_name})
+		} else if strings.ToLower(owner_kind) == string(w.Replicationcontroller) && workload == "<none>" {
+			return series.Strings([]string{string(w.Replicationcontroller), owner_name})
+		} else {
+			return series.Strings([]string{workload_type, workload})
+		}
+	})
+
+	df = df.Mutate(s.Col("X0")).Rename("k8s_object_type", "X0")
+	df = df.Mutate(s.Col("X1")).Rename("k8s_object_name", "X1")
+	return df
+}
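The core of the fix is that filter_valid_csv_records leans on gota's NaN semantics: a missing CSV value in a float column parses to NaN, NaN compares false against GreaterEq 0, and so rows with absent cpu/memory/RSS metrics (and any negative values) are dropped, while genuine zero usage passes. A minimal sketch (illustrative, assuming gota's usual parsing behavior):

package main

import (
	"fmt"

	"github.com/go-gota/gota/dataframe"
	"github.com/go-gota/gota/series"
)

func main() {
	// "" fails to parse as a float and becomes NaN in the series.
	df := dataframe.New(
		series.New([]string{"0.5", "", "0"}, series.Float, "cpu_usage_container_avg"),
	)
	kept := df.Filter(dataframe.F{
		Colname:    "cpu_usage_container_avg",
		Comparator: series.GreaterEq,
		Comparando: 0,
	})
	// NaN >= 0 is false, so only the "0.5" and "0" rows survive.
	fmt.Println(kept.Nrow()) // 2
}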
internal/utils/aggregator_test.go (152 changes: 152 additions & 0 deletions)
@@ -0,0 +1,152 @@
package utils

import (
	"fmt"
	"testing"

	"github.com/go-gota/gota/dataframe"
)

type UsageData struct {
	Report_period_start            string `dataframe:"report_period_start,string"`
	Report_period_end              string `dataframe:"report_period_end,string"`
	Interval_start                 string `dataframe:"interval_start,string"`
	Interval_end                   string `dataframe:"interval_end,string"`
	Container_name                 string `dataframe:"container_name,string"`
	Pod                            string `dataframe:"pod,string"`
	Owner_name                     string `dataframe:"owner_name,string"`
	Owner_kind                     string `dataframe:"owner_kind,string"`
	Workload                       string `dataframe:"workload,string"`
	Workload_type                  string `dataframe:"workload_type,string"`
	Namespace                      string `dataframe:"namespace,string"`
	Image_name                     string `dataframe:"image_name,string"`
	Node                           string `dataframe:"node,string"`
	Resource_id                    string `dataframe:"resource_id,string"`
	Cpu_request_container_avg      string `dataframe:"cpu_request_container_avg,float"`
	Cpu_request_container_sum      string `dataframe:"cpu_request_container_sum,float"`
	Cpu_limit_container_avg        string `dataframe:"cpu_limit_container_avg,float"`
	Cpu_limit_container_sum        string `dataframe:"cpu_limit_container_sum,float"`
	Cpu_usage_container_avg        string `dataframe:"cpu_usage_container_avg,float"`
	Cpu_usage_container_min        string `dataframe:"cpu_usage_container_min,float"`
	Cpu_usage_container_max        string `dataframe:"cpu_usage_container_max,float"`
	Cpu_usage_container_sum        string `dataframe:"cpu_usage_container_sum,float"`
	Cpu_throttle_container_avg     string `dataframe:"cpu_throttle_container_avg,float"`
	Cpu_throttle_container_max     string `dataframe:"cpu_throttle_container_max,float"`
	Cpu_throttle_container_sum     string `dataframe:"cpu_throttle_container_sum,float"`
	Memory_request_container_avg   string `dataframe:"memory_request_container_avg,float"`
	Memory_request_container_sum   string `dataframe:"memory_request_container_sum,float"`
	Memory_limit_container_avg     string `dataframe:"memory_limit_container_avg,float"`
	Memory_limit_container_sum     string `dataframe:"memory_limit_container_sum,float"`
	Memory_usage_container_avg     string `dataframe:"memory_usage_container_avg,float"`
	Memory_usage_container_min     string `dataframe:"memory_usage_container_min,float"`
	Memory_usage_container_max     string `dataframe:"memory_usage_container_max,float"`
	Memory_usage_container_sum     string `dataframe:"memory_usage_container_sum,float"`
	Memory_rss_usage_container_avg string `dataframe:"memory_rss_usage_container_avg,float"`
	Memory_rss_usage_container_min string `dataframe:"memory_rss_usage_container_min,float"`
	Memory_rss_usage_container_max string `dataframe:"memory_rss_usage_container_max,float"`
	Memory_rss_usage_container_sum string `dataframe:"memory_rss_usage_container_sum,float"`
}

func Test_filter_valid_k8s_object_types(t *testing.T) {
	// Check valid k8s object type
	usage_data := []UsageData{
		// k8s object type DaemonSet
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "DaemonSet", "testdeploymentconfig", "daemonset", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
		},
		// k8s object type Replicaset
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "<none>", "deployment", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
		},
		// k8s object type Deployment
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
		},
		// k8s object type ReplicationController
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicationController", "<none>", "deploymentconfig", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
		},
		// k8s object type Deploymentconfig
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicationController", "testdeploymentconfig", "deploymentconfig", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
		},
		// k8s object type StatefulSet
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "StatefulSet", "testdeploymentconfig", "statefulset", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
		},
	}
	df := dataframe.LoadStructs(usage_data)
	df = determine_k8s_object_type(df)
	result := filter_valid_k8s_object_types(df)
	fmt.Println(result.Nrow())
	if result.Nrow() != 6 {
		t.Error("Data not filtered properly. Some of the valid k8s object type got dropped")
	}

	// check if Invalid k8s object type is dropped
	usage_data = []UsageData{
		// k8s object type Job
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "Job", "testdeploymentconfig", "job", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1",
		},
	}
	df = dataframe.LoadStructs(usage_data)
	df = determine_k8s_object_type(df)
	result = filter_valid_k8s_object_types(df)
	if result.Nrow() != 0 {
		t.Error("Invalid k8s object type did not get dropped")
	}
}

func Test_filter_valid_csv_records(t *testing.T) {
	usage_data := []UsageData{
		// k8s object with missing data
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "1", "",
		},
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "", "", "", "", "1", "1", "1", "1", "1", "1", "1", "", "", "", "", "", "", "", "",
		},
		// k8s object with 0 CPU, Memory and RSS usage
		{
			"2023-02-01 00:00:00 +0000 UTC", "2023-03-01 00:00:00 +0000 UTC", "2023-06-02 00:00:01 +0000 UTC", "2023-06-02 00:15:00 +0000 UTC",
			"Yuptoo-service", "Yuptoo-app-standalone-1", "Yuptoo-app", "ReplicaSet", "testdeployment", "deployment", "Yuptoo-prod",
			"quay.io/cloudservices/yuptoo", "ip-10-0-176-227.us-east-2.compute.internal", "i-0dfbb3fa4d0e8fc94",
			"1", "1", "1", "1", "0", "0", "0", "0", "1", "1", "1", "1", "1", "1", "1", "0", "0", "0", "0", "0", "0", "0", "0",
		},
	}
	df := dataframe.LoadStructs(usage_data)
	df = determine_k8s_object_type(df)
	result, no_of_dropped_records := filter_valid_csv_records(df)
	if result.Nrow() != 1 || no_of_dropped_records != 2 {
		t.Error("Invalid k8s object type did not get dropped")
	}
}
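A note on the fixtures: the UsageData fields are declared as Go strings but tagged with a float dataframe type, so LoadStructs parses each value into a float column and an empty string becomes NaN, which is exactly what the missing-metric rows above rely on. A minimal sketch of the same pattern (assuming gota honors the type in the struct tag, as the test does):

package main

import (
	"fmt"

	"github.com/go-gota/gota/dataframe"
)

type row struct {
	// String in Go, parsed as float by gota via the struct tag;
	// an empty string becomes NaN in the resulting column.
	CpuAvg string `dataframe:"cpu_usage_container_avg,float"`
}

func main() {
	df := dataframe.LoadStructs([]row{{"1"}, {""}})
	fmt.Println(df.Types()) // [float]
}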
internal/utils/metrics.go (13 changes: 13 additions & 0 deletions)
@@ -0,0 +1,13 @@
package utils

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

var (
	invalidDataPoints = promauto.NewCounter(prometheus.CounterOpts{
		Name: "rosocp_invalid_datapoints_total",
		Help: "The total number of invalid datapoints(rows) found in received CSVs",
	})
)
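promauto registers the counter with the default Prometheus registry at init time, so it only needs an exposed /metrics endpoint to be scraped. A minimal sketch (the HTTP wiring and port here are illustrative, not the service's actual setup):

package main

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var invalidDataPoints = promauto.NewCounter(prometheus.CounterOpts{
	Name: "rosocp_invalid_datapoints_total",
	Help: "The total number of invalid datapoints(rows) found in received CSVs",
})

func main() {
	invalidDataPoints.Add(2) // e.g. two CSV rows dropped by validation
	http.Handle("/metrics", promhttp.Handler())
	_ = http.ListenAndServe(":9000", nil)
}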
