Skip to content

Commit

Permalink
Prometheus [Input] plugin - Optimizing for bigger kubernetes clusters…
Browse files Browse the repository at this point in the history
… (500+ pods) when scraping thru 'monitor_kubernetes_pods' (influxdata#8762)
  • Loading branch information
gracewehner authored Mar 8, 2021
1 parent aabec05 commit d7df2c5
Show file tree
Hide file tree
Showing 6 changed files with 396 additions and 11 deletions.
2 changes: 2 additions & 0 deletions docs/LICENSE_OF_DEPENDENCIES.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ following works:
- gopkg.in/tomb.v2 [BSD 3-Clause Clear License](https://github.com/go-tomb/tomb/blob/v2/LICENSE)
- gopkg.in/yaml.v2 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v2.2.2/LICENSE)
- gopkg.in/yaml.v3 [Apache License 2.0](https://github.com/go-yaml/yaml/blob/v3/LICENSE)
- k8s.io/apimachinery [Apache License 2.0](https://github.com/kubernetes/apimachinery/blob/master/LICENSE)
- k8s.io/klog [Apache License 2.0](https://github.com/kubernetes/klog/blob/master/LICENSE)
- modernc.org/libc [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/libc/-/blob/master/LICENSE)
- modernc.org/memory [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/memory/-/blob/master/LICENSE)
- modernc.org/sqlite [BSD 3-Clause "New" or "Revised" License](https://gitlab.com/cznic/sqlite/-/blob/master/LICENSE)
Expand Down
21 changes: 21 additions & 0 deletions plugins/inputs/prometheus/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,16 @@ in Prometheus format.
## - prometheus.io/path: If the metrics path is not /metrics, define it with this annotation.
## - prometheus.io/port: If port is not 9102 use this annotation
# monitor_kubernetes_pods = true
## Get the list of pods to scrape with either the scope of
## - cluster: the kubernetes watch api (default), no need to specify
## - node: the local cadvisor api; for scalability. Note that the config node_ip or the environment variable NODE_IP must be set to the host IP.
# pod_scrape_scope = "cluster"
## Only for node scrape scope: node IP of the node that telegraf is running on.
## Either this config or the environment variable NODE_IP must be set.
# node_ip = "10.180.1.1"
## Only for node scrape scope: interval in seconds for how often to get updated pod list for scraping
## Default is 60 seconds.
# pod_scrape_interval = 60
## Restricts Kubernetes monitoring to a single namespace
## ex: monitor_kubernetes_pods_namespace = "default"
# monitor_kubernetes_pods_namespace = ""
Expand Down Expand Up @@ -88,6 +98,17 @@ Currently the following annotation are supported:

Using the `monitor_kubernetes_pods_namespace` option allows you to limit which pods you are scraping.

Using `pod_scrape_scope = "node"` allows more scalable scraping: Telegraf will scrape only the pods on the node that it is running on, fetching the pod list locally from the node's kubelet. This will require running Telegraf in every node of the cluster. Note that either `node_ip` must be specified in the config or the environment variable `NODE_IP` must be set to the host IP. The latter can be done in the yaml of the pod running telegraf:
```
env:
- name: NODE_IP
valueFrom:
fieldRef:
fieldPath: status.hostIP
```

If using node level scrape scope, `pod_scrape_interval` specifies how often (in seconds) the pod list for scraping should be updated. If not specified, the default is 60 seconds.

#### Bearer Token

If set, the file specified by the `bearer_token` parameter will be read on
Expand Down
181 changes: 176 additions & 5 deletions plugins/inputs/prometheus/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package prometheus

import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"log"
"net"
"net/http"
"net/url"
"os/user"
"path/filepath"
Expand All @@ -15,13 +17,29 @@ import (
"github.com/ericchiang/k8s"
corev1 "github.com/ericchiang/k8s/apis/core/v1"
"github.com/ghodss/yaml"
"github.com/kubernetes/apimachinery/pkg/fields"
"github.com/kubernetes/apimachinery/pkg/labels"
)

// payload couples a pod with an event type string.
// NOTE(review): presumably filled in by the pod watch path, which is not
// visible in this excerpt — confirm against the rest of the file.
type payload struct {
eventype string
pod *corev1.Pod
}

// podMetadata holds the metadata object of a pod list response, decoded from
// the JSON keys "resourceVersion" and "selfLink".
type podMetadata struct {
ResourceVersion string `json:"resourceVersion"`
SelfLink string `json:"selfLink"`
}

// podResponse is the JSON envelope of a pod list response: the list's kind,
// API version and metadata, plus the pods themselves under "items".
type podResponse struct {
	Kind       string      `json:"kind"`
	ApiVersion string      `json:"apiVersion"`
	Metadata   podMetadata `json:"metadata"`
	// The ",string" tag option only applies to string, numeric and boolean
	// fields; encoding/json ignores it on a slice, so it is omitted here.
	Items []*corev1.Pod `json:"items,omitempty"`
}

// cAdvisorPodListDefaultInterval is the default number of seconds between pod
// list refreshes in node scrape scope, used when no pod scrape interval is
// configured.
const cAdvisorPodListDefaultInterval = 60

// loadClient parses a kubeconfig from a file and returns a Kubernetes
// client. It does not support extensions or client auth providers.
func loadClient(kubeconfigPath string) (*k8s.Client, error) {
Expand Down Expand Up @@ -66,9 +84,16 @@ func (p *Prometheus) start(ctx context.Context) error {
case <-ctx.Done():
return
case <-time.After(time.Second):
err := p.watch(ctx, client)
if err != nil {
p.Log.Errorf("Unable to watch resources: %s", err.Error())
if p.isNodeScrapeScope {
err = p.cAdvisor(ctx, client)
if err != nil {
p.Log.Errorf("Unable to monitor pods with node scrape scope: %s", err.Error())
}
} else {
err = p.watch(ctx, client)
if err != nil {
p.Log.Errorf("Unable to watch resources: %s", err.Error())
}
}
}
}
Expand Down Expand Up @@ -126,6 +151,147 @@ func (p *Prometheus) watch(ctx context.Context, client *k8s.Client) error {
}
}

// cAdvisor keeps the plugin's pod scrape targets up to date in node scrape
// scope by polling the pod list endpoint on the local node
// (https://<NodeIP>:10250/pods).
//
// The list is refreshed once immediately and then every PodScrapeInterval
// seconds; cAdvisorPodListDefaultInterval is used when the configured value is
// not a positive number.
//
// It returns nil once ctx is cancelled. It returns an error when the client's
// transport cannot be configured, when the request cannot be built, or as soon
// as any refresh of the pod list fails.
func (p *Prometheus) cAdvisor(ctx context.Context, client *k8s.Client) error {
	// Set InsecureSkipVerify for cAdvisor client since Node IP will not be a SAN for the CA cert.
	// The assertion and the config pointer are checked so that an unexpected
	// client setup is reported as an error instead of a panic.
	transport, ok := client.Client.Transport.(*http.Transport)
	if !ok {
		return fmt.Errorf("Error when configuring TLS for pod list request: unexpected transport type %T", client.Client.Transport)
	}
	if transport.TLSClientConfig == nil {
		return fmt.Errorf("Error when configuring TLS for pod list request: transport has no TLS configuration")
	}
	transport.TLSClientConfig.InsecureSkipVerify = true

	// The request will be the same each time.
	// JoinHostPort brackets IPv6 literals so the URL stays valid for them;
	// for IPv4 addresses and host names the result is the plain "host:port".
	podsURL := fmt.Sprintf("https://%s/pods", net.JoinHostPort(p.NodeIP, "10250"))
	// Bind the request to ctx so an in-flight call is abandoned on shutdown.
	req, err := http.NewRequestWithContext(ctx, "GET", podsURL, nil)
	if err != nil {
		return fmt.Errorf("Error when creating request to %s to get pod list: %w", podsURL, err)
	}
	client.SetHeaders(req.Header)

	// Update right away so code is not waiting the length of the specified scrape interval initially
	if err := updateCadvisorPodList(ctx, p, client, req); err != nil {
		return fmt.Errorf("Error initially updating pod list: %w", err)
	}

	// Only a positive interval is honoured: zero means "not configured" and a
	// negative value would make the timer below fire immediately in a tight loop.
	scrapeInterval := cAdvisorPodListDefaultInterval
	if p.PodScrapeInterval > 0 {
		scrapeInterval = p.PodScrapeInterval
	}

	for {
		select {
		case <-ctx.Done():
			return nil
		case <-time.After(time.Duration(scrapeInterval) * time.Second):
			if err := updateCadvisorPodList(ctx, p, client, req); err != nil {
				return fmt.Errorf("Error updating pod list: %w", err)
			}
		}
	}
}

// updateCadvisorPodList sends req through client, decodes the returned pod
// list and replaces p.kubernetesPods with the pods that should be scraped.
//
// A pod is registered only if all of its used fields are present, it carries
// the annotation prometheus.io/scrape == "true", it is ready, and it matches
// the configured namespace, label selector and field selector.
//
// It returns an error when the request fails or the response status is not
// 200. JSON decoding errors are deliberately ignored (see below). p.lock is
// held while the pod map is rebuilt.
func updateCadvisorPodList(ctx context.Context, p *Prometheus, client *k8s.Client, req *http.Request) error {
	// Tie the call to ctx so it is cancelled together with the plugin.
	resp, err := client.Client.Do(req.WithContext(ctx))
	if err != nil {
		return fmt.Errorf("Error when making request for pod list: %w", err)
	}
	// Deferred before the status check so the body is also closed when a
	// non-200 response makes this function return early.
	defer resp.Body.Close()

	// If err is nil, still check response code
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("Error when making request for pod list with status %s", resp.Status)
	}

	cadvisorPodsResponse := podResponse{}

	// Will have expected type errors for some parts of corev1.Pod struct for some unused fields
	// Instead have nil checks for every used field in case of incorrect decoding
	_ = json.NewDecoder(resp.Body).Decode(&cadvisorPodsResponse)
	pods := cadvisorPodsResponse.Items

	// Updating pod list to be latest cadvisor response
	p.lock.Lock()
	defer p.lock.Unlock()
	p.kubernetesPods = make(map[string]URLAndAddress)

	// Register pod only if it has an annotation to scrape, if it is ready,
	// and if namespace and selectors are specified and match
	for _, pod := range pods {
		if necessaryPodFieldsArePresent(pod) &&
			pod.GetMetadata().GetAnnotations()["prometheus.io/scrape"] == "true" &&
			podReady(pod.GetStatus().GetContainerStatuses()) &&
			podHasMatchingNamespace(pod, p) &&
			podHasMatchingLabelSelector(pod, p.podLabelSelector) &&
			podHasMatchingFieldSelector(pod, p.podFieldSelector) {
			registerPod(pod, p)
		}
	}

	// No errors
	return nil
}

// necessaryPodFieldsArePresent reports whether pod has non-nil metadata
// (including annotations and labels), spec, and status (including container
// statuses). Any nil among these yields false.
func necessaryPodFieldsArePresent(pod *corev1.Pod) bool {
	meta := pod.GetMetadata()
	if meta == nil || meta.GetAnnotations() == nil || meta.GetLabels() == nil {
		return false
	}

	if pod.GetSpec() == nil {
		return false
	}

	status := pod.GetStatus()
	return status != nil && status.GetContainerStatuses() != nil
}

/* podHasMatchingLabelSelector reports whether the pod's labels satisfy
 * labelSelector. A nil selector matches every pod.
 * See the docs on kubernetes label selectors:
 * https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors
 */
func podHasMatchingLabelSelector(pod *corev1.Pod, labelSelector labels.Selector) bool {
	if labelSelector != nil {
		podLabels := labels.Set(pod.GetMetadata().GetLabels())
		return labelSelector.Matches(podLabels)
	}

	return true
}

/* podHasMatchingFieldSelector reports whether the pod satisfies fieldSelector.
 * A nil selector matches every pod; a pod with a nil spec or status never
 * matches a non-nil selector.
 * See ToSelectableFields() for list of fields that are selectable:
 * https://github.com/kubernetes/kubernetes/blob/release-1.20/pkg/registry/core/pod/strategy.go
 * See docs on kubernetes field selectors:
 * https://kubernetes.io/docs/concepts/overview/working-with-objects/field-selectors/
 */
func podHasMatchingFieldSelector(pod *corev1.Pod, fieldSelector fields.Selector) bool {
	if fieldSelector == nil {
		return true
	}

	// Spec and Status shouldn't be nil; bail out instead of crashing telegraf
	// if something went wrong while decoding the pod.
	spec, status := pod.GetSpec(), pod.GetStatus()
	if spec == nil || status == nil {
		return false
	}

	selectable := fields.Set{
		"spec.nodeName":            spec.GetNodeName(),
		"spec.restartPolicy":       spec.GetRestartPolicy(),
		"spec.schedulerName":       spec.GetSchedulerName(),
		"spec.serviceAccountName":  spec.GetServiceAccountName(),
		"status.phase":             status.GetPhase(),
		"status.podIP":             status.GetPodIP(),
		"status.nominatedNodeName": status.GetNominatedNodeName(),
	}

	return fieldSelector.Matches(selectable)
}

/*
 * Returns true when no namespace is configured on the plugin, or when the
 * pod's namespace equals the configured one; returns false otherwise.
 */
func podHasMatchingNamespace(pod *corev1.Pod, p *Prometheus) bool {
	return p.PodNamespace == "" || pod.GetMetadata().GetNamespace() == p.PodNamespace
}

func podReady(statuss []*corev1.ContainerStatus) bool {
if len(statuss) == 0 {
return false
Expand Down Expand Up @@ -180,14 +346,19 @@ func registerPod(pod *corev1.Pod, p *Prometheus) {
return
}
podURL := p.AddressToURL(URL, URL.Hostname())
p.lock.Lock()

// Locks earlier if using cAdvisor calls - makes a new list each time
// rather than updating and removing from the same list
if !p.isNodeScrapeScope {
p.lock.Lock()
defer p.lock.Unlock()
}
p.kubernetesPods[podURL.String()] = URLAndAddress{
URL: podURL,
Address: URL.Hostname(),
OriginalURL: URL,
Tags: tags,
}
p.lock.Unlock()
}

func getScrapeURL(pod *corev1.Pod) *string {
Expand Down
59 changes: 59 additions & 0 deletions plugins/inputs/prometheus/kubernetes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ import (

v1 "github.com/ericchiang/k8s/apis/core/v1"
metav1 "github.com/ericchiang/k8s/apis/meta/v1"

"github.com/kubernetes/apimachinery/pkg/fields"
"github.com/kubernetes/apimachinery/pkg/labels"
)

func TestScrapeURLNoAnnotations(t *testing.T) {
Expand Down Expand Up @@ -142,6 +145,62 @@ func TestPodSelector(t *testing.T) {
}
}

// TestPodHasMatchingNamespace checks that, with a namespace configured, only
// pods in that namespace are accepted.
func TestPodHasMatchingNamespace(t *testing.T) {
	plugin := &Prometheus{Log: testutil.Logger{}, PodNamespace: "default"}
	testPod := pod()

	// Pod in the configured namespace is accepted.
	testPod.Metadata.Name = str("Pod1")
	testPod.Metadata.Namespace = str("default")
	assert.Equal(t, true, podHasMatchingNamespace(testPod, plugin))

	// Pod in any other namespace is rejected.
	testPod.Metadata.Name = str("Pod2")
	testPod.Metadata.Namespace = str("namespace")
	assert.Equal(t, false, podHasMatchingNamespace(testPod, plugin))
}

// TestPodHasMatchingLabelSelector checks a selector that combines equality,
// inequality, set-based and existence requirements against a pod whose labels
// satisfy all of them.
func TestPodHasMatchingLabelSelector(t *testing.T) {
	const selectorText = "label0==label0,label1=label1,label2!=label,label3 in (label1,label2, label3),label4 notin (label1, label2,label3),label5,!label6"
	plugin := &Prometheus{Log: testutil.Logger{}, KubernetesLabelSelector: selectorText}

	testPod := pod()
	testPod.Metadata.Labels = map[string]string{
		"label0": "label0",
		"label1": "label1",
		"label2": "label2",
		"label3": "label3",
		"label4": "label4",
		"label5": "label5",
	}

	parsed, err := labels.Parse(plugin.KubernetesLabelSelector)
	assert.Equal(t, err, nil)

	matched := podHasMatchingLabelSelector(testPod, parsed)
	assert.Equal(t, true, matched)
}

// TestPodHasMatchingFieldSelector checks that a pod whose IP and restart
// policy equal the selector's values is reported as matching.
func TestPodHasMatchingFieldSelector(t *testing.T) {
	const selectorText = "status.podIP=127.0.0.1,spec.restartPolicy=Always,spec.NodeName!=nodeName"
	plugin := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: selectorText}

	testPod := pod()
	testPod.Spec.RestartPolicy = str("Always")
	testPod.Spec.NodeName = str("node1000")

	parsed, err := fields.ParseSelector(plugin.KubernetesFieldSelector)
	assert.Equal(t, err, nil)

	matched := podHasMatchingFieldSelector(testPod, parsed)
	assert.Equal(t, true, matched)
}

// TestInvalidFieldSelector checks that a selector ending in a bare field name
// (no operator or value) is rejected by the parser.
func TestInvalidFieldSelector(t *testing.T) {
	const selectorText = "status.podIP=127.0.0.1,spec.restartPolicy=Always,spec.NodeName!=nodeName,spec.nodeName"
	plugin := &Prometheus{Log: testutil.Logger{}, KubernetesFieldSelector: selectorText}

	testPod := pod()
	testPod.Spec.RestartPolicy = str("Always")
	testPod.Spec.NodeName = str("node1000")

	_, parseErr := fields.ParseSelector(plugin.KubernetesFieldSelector)
	assert.NotEqual(t, parseErr, nil)
}

func pod() *v1.Pod {
p := &v1.Pod{Metadata: &metav1.ObjectMeta{}, Status: &v1.PodStatus{}, Spec: &v1.PodSpec{}}
p.Status.PodIP = str("127.0.0.1")
Expand Down
Loading

0 comments on commit d7df2c5

Please sign in to comment.