Skip to content
This repository has been archived by the owner on Apr 20, 2023. It is now read-only.

Commit

Permalink
send metadata in headers (#52)
Browse files Browse the repository at this point in the history
* metadata headers

* push changes

* metadata

* running with 0.0.2
  • Loading branch information
AdheipSingh authored Sep 12, 2022
1 parent aa9f4db commit 48c2815
Show file tree
Hide file tree
Showing 6 changed files with 76 additions and 58 deletions.
9 changes: 6 additions & 3 deletions cmd/kubecollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func kubeCollector(url, user, pwd string, stream *LogStream) {
}
for _, po := range podsList {
for _, p := range po.Items {
logs, err := collector.GetPodLogs(p, url, user, pwd, stream.Name)
logs, meta, err := collector.GetPodLogs(p, url, user, pwd, stream.Name)
if err != nil {
log.Error(err)
return
Expand All @@ -71,14 +71,17 @@ func kubeCollector(url, user, pwd string, stream *LogStream) {
} else {
log.Infof("Successfully collected log from [%s] in [%s] namespace", p.GetName(), p.Namespace)
}

jLogs, err := json.Marshal(logs)
if err != nil {
return
}
if err := parseable.PostLogs(url, user, pwd, stream.Name, jLogs, stream.Labels); err != nil {
if err := parseable.PostLogs(url, user, pwd, stream.Name, jLogs, stream.Labels, meta); err != nil {
log.Error(err)
} else {
log.Infof("Successfully sent log from [%s] in [%s] namespace to server [%s]", p.GetName(), p.GetNamespace(), url)
}
log.Infof("Successfully sent log from [%s] in [%s] namespace to server [%s]", p.GetName(), p.GetNamespace(), url)

}
}
}
Expand Down
10 changes: 10 additions & 0 deletions example.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
logStreams:
- name: a11
collectInterval: 10s
collectFrom:
namespace: go-apps
podSelector:
app: go-app
labels:
language: golang
app: ingress
14 changes: 7 additions & 7 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,17 +46,17 @@ func main() {
var wg sync.WaitGroup

for _, stream := range config.LogStreams {
wg.Add(2)
wg.Add(1)
go func(stream cmd.LogStream) {
defer wg.Done()
cmd.RunKubeCollector(config.Server, config.Username, config.Password, &stream)
}(stream)
for k, v := range stream.CollectFrom.PodSelector {
go func(namespace, selector string) {
defer wg.Done()
cmd.ExecCleanStore(namespace, selector)
}(stream.CollectFrom.Namespace, k+"="+v)
}
// for k, v := range stream.CollectFrom.PodSelector {
// go func(namespace, selector string) {
// defer wg.Done()
// cmd.ExecCleanStore(namespace, selector)
// }(stream.CollectFrom.Namespace, k+"="+v)
// }

}
wg.Wait()
Expand Down
57 changes: 24 additions & 33 deletions pkg/collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,11 @@ import (

// logMessage represents a single log message entry
type logMessage struct {
Timestamp string `json:"time"`
Log string `json:"log"`
LogMeta logMetadata `json:"meta"`
Timestamp string `json:"time"`
Log string `json:"log"`
}

type logMetadata struct {
Host string `json:"host"`
Source string `json:"source"`
ContainerName string `json:"containername"`
ContainerImage string `json:"containerimage"`
PodName string `json:"podname"`
Namespace string `json:"namespace"`
PodLabels string `json:"podlabels"`
}

func GetPodLogs(pod corev1.Pod, url, user, pwd, streamName string) ([]logMessage, error) {
func GetPodLogs(pod corev1.Pod, url, user, pwd, streamName string) ([]logMessage, map[string]string, error) {

for _, container := range pod.Spec.Containers {
podContainerName := pod.GetName() + "/" + container.Name
Expand All @@ -55,14 +44,15 @@ func GetPodLogs(pod corev1.Pod, url, user, pwd, streamName string) ([]logMessage
// time stamp. This ensure we can uniquely fetch a container's log
lastLogTime, ok := store.LastTimestamp(podContainerName)
if lastLogTime == (time.Time{}) || !ok {
mtq, err := parseable.LastLogTime(url, user, pwd, streamName, pod.Name, container.Name)
if err != nil {
return nil, err
}
if len(mtq) > 1 {
mtq, _ := parseable.LastLogTime(url, user, pwd, streamName, pod.Name, container.Name)
// if err != nil {
// //return nil, nil, err
// }
if len(mtq) > 3 {
time, err := time.Parse(time.RFC3339, mtq[0].MAXSystemsTime)
if err != nil {
return nil, err

return nil, nil, err
}
store.SetLastTimestamp(podContainerName, time)
lastLogTime = time
Expand All @@ -76,37 +66,38 @@ func GetPodLogs(pod corev1.Pod, url, user, pwd, streamName string) ([]logMessage

podLogs, err := client.KubeClient.GetPodLogs(pod, podLogOpts)
if err != nil {
return nil, err
return nil, nil, err
}
if len(podLogs) > 1 {
// last line of the log
if err := putTimeStamp(podContainerName, podLogs); err != nil {
return nil, err
return nil, nil, err
}
var logMessages []logMessage
var LogMeta map[string]string
for _, lm := range podLogs {
newLogMessage := strings.Fields(lm)
if len(newLogMessage) > 1 {
log := logMessage{
Timestamp: newLogMessage[0],
Log: strings.Join(newLogMessage[1:], " "),
LogMeta: logMetadata{
Namespace: pod.GetNamespace(),
Host: pod.Status.HostIP,
Source: pod.Status.PodIP,
ContainerName: container.Name,
ContainerImage: container.Image,
PodName: pod.GetName(),
PodLabels: map2string(pod.GetLabels()),
},
}
logMessages = append(logMessages, log)
LogMeta = map[string]string{
"Namespace": pod.GetNamespace(),
"Host": pod.Status.HostIP,
"Source": pod.Status.PodIP,
"ContainerName": container.Name,
"ContainerImage": container.Image,
"PodName": pod.GetName(),
"PodLabels": map2string(pod.GetLabels()),
}
}
}
return logMessages, nil
return logMessages, LogMeta, nil
}
}
return []logMessage{}, nil
return []logMessage{}, nil, nil
}

func putTimeStamp(podName string, podLogs []string) error {
Expand Down
33 changes: 23 additions & 10 deletions pkg/parseable/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,17 @@ package parseable

import (
"bytes"
"fmt"
"net/http"
"net/http/httputil"
"runtime"
"strings"
)

const METADATA_LABEL = "X-P-META-"
const (
METADATA_LABEL = "X-P-META-"
TAG_LABEL = "X-P-TAGS-"
)

type HttpParseable interface {
Do() (*http.Response, error)
Expand All @@ -31,14 +36,15 @@ type HttpParseable interface {
// httpRequest holds all the fields needed for a HTTP request
// to parseable server.
type httpRequest struct {
method string
url string
labels map[string]string
body []byte
method string
url string
tags map[string]string
metaLabels map[string]string
body []byte
}

func newRequest(method, url string, labels map[string]string, body []byte) *httpRequest {
return &httpRequest{method: method, url: url, labels: labels, body: body}
func newRequest(method, url string, tags, metaLabels map[string]string, body []byte) *httpRequest {
return &httpRequest{method: method, url: url, tags: tags, metaLabels: metaLabels, body: body}
}

func (h *httpRequest) Do(user, pwd string) (*http.Response, error) {
Expand All @@ -52,12 +58,19 @@ func (h *httpRequest) Do(user, pwd string) (*http.Response, error) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("User-Agent", getUserAgent())

if h.labels != nil {
for key, value := range h.labels {
req.Header.Add(METADATA_LABEL+key, value)
if h.tags != nil {
for key, value := range h.tags {
req.Header.Add(TAG_LABEL+key, value)
}
}

if h.metaLabels != nil {
for key, value := range h.metaLabels {
req.Header.Add(METADATA_LABEL+key, value)
}
}
r, _ := httputil.DumpRequest(req, true)
fmt.Printf("%s", string(r))
resp, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
Expand Down
11 changes: 6 additions & 5 deletions pkg/parseable/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ func queryURL(url string) string {
}

func CreateStream(url, user, pwd, streamName string) error {
req := newRequest("PUT", streamURL(url, streamName), nil, nil)
req := newRequest("PUT", streamURL(url, streamName), nil, nil, nil)
if resp, err := req.Do(user, pwd); err != nil {
return err
} else if resp.StatusCode == 400 {
Expand All @@ -48,8 +48,8 @@ func CreateStream(url, user, pwd, streamName string) error {
return nil
}

func PostLogs(url, user, pwd, streamName string, logs []byte, labels map[string]string) error {
req := newRequest("POST", streamURL(url, streamName), labels, logs)
func PostLogs(url, user, pwd, streamName string, logs []byte, tags, metaLabels map[string]string) error {
req := newRequest("POST", streamURL(url, streamName), tags, metaLabels, logs)
if resp, err := req.Do(user, pwd); err != nil {
return err
} else if resp.StatusCode != 200 {
Expand All @@ -61,7 +61,7 @@ func PostLogs(url, user, pwd, streamName string, logs []byte, labels map[string]
func LastLogTime(url, user, pwd, streamName, podName, containerName string) (MaxTimeQuery, error) {

query := map[string]string{
"query": fmt.Sprintf("select max(time) from %s where meta_podname = '%s' and meta_containername = '%s'", streamName, podName, containerName),
"query": fmt.Sprintf("select max(time) from %s where p_metadata like '%s=%s' and p_metadata like '%s=%s'", streamName, "%podname", podName+"%", "%containerimage", containerName+"%"),
"startTime": time.Now().UTC().Add(time.Duration(-10) * time.Minute).Format(time.RFC3339),
"endTime": time.Now().UTC().Format(time.RFC3339),
}
Expand All @@ -71,7 +71,7 @@ func LastLogTime(url, user, pwd, streamName, podName, containerName string) (Max
return nil, err
}

req := newRequest("POST", queryURL(url), nil, queryJson)
req := newRequest("POST", queryURL(url), nil, nil, queryJson)
resp, err := req.Do(user, pwd)
if err != nil {
return nil, err
Expand All @@ -96,6 +96,7 @@ func LastLogTime(url, user, pwd, streamName, podName, containerName string) (Max

var mtq MaxTimeQuery

fmt.Println(string(respData))
if len(respData) > 0 {
err = json.Unmarshal(respData, &mtq)
if err != nil {
Expand Down

0 comments on commit 48c2815

Please sign in to comment.