forked from kata-containers/kata-containers
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathshim_management.go
197 lines (164 loc) · 5.26 KB
/
shim_management.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
// Copyright (c) 2020 Ant Financial
//
// SPDX-License-Identifier: Apache-2.0
//
package containerdshim
import (
"context"
"expvar"
"fmt"
"io"
"net/http"
"net/http/pprof"
"path/filepath"
"strconv"
"strings"
"github.com/containerd/containerd/namespaces"
cdshim "github.com/containerd/containerd/runtime/v2/shim"
vc "github.com/kata-containers/kata-containers/src/runtime/virtcontainers"
vcAnnotations "github.com/kata-containers/kata-containers/src/runtime/virtcontainers/pkg/annotations"
"github.com/opencontainers/runtime-spec/specs-go"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
"google.golang.org/grpc/codes"
mutils "github.com/kata-containers/kata-containers/src/runtime/pkg/utils"
)
// ifSupportAgentMetricsAPI records whether the agent is still believed to
// serve the metrics API. serveMetrics clears it once GetAgentMetrics fails
// with codes.NotFound, after which agent metrics are no longer requested.
// NOTE(review): it is read and written from HTTP handler goroutines without
// synchronization — consider sync/atomic.
var ifSupportAgentMetricsAPI = true

// shimMgtLog is the logger used by the shim-management subsystem.
var shimMgtLog = shimLog.WithField("subsystem", "shim-management")
// agentURL handles /agent-url requests. On success it writes the sandbox's
// agent URL as the response body; otherwise it responds with status 500 and
// the error text as the body.
func (s *service) agentURL(w http.ResponseWriter, r *http.Request) {
	agentAddr, err := s.sandbox.GetAgentURL()
	if err == nil {
		fmt.Fprint(w, agentAddr)
		return
	}

	w.WriteHeader(http.StatusInternalServerError)
	w.Write([]byte(err.Error()))
}
// serveMetrics handles /metrics requests.
//
// It refreshes the sandbox runtime metrics and the shim process metrics,
// writes everything gathered by prometheus.DefaultGatherer in the text
// exposition format and then, unless the agent has been found not to support
// the metrics API, appends the agent's metrics and kicks off an asynchronous
// pod overhead collection.
//
// If gathering fails, the error is logged and a 500 is returned so the scrape
// is reported as failed instead of as an empty, healthy target. Failures to
// encode individual metric families are logged and skipped.
func (s *service) serveMetrics(w http.ResponseWriter, r *http.Request) {
	// update metrics from sandbox
	// NOTE(review): any error these updaters may return is discarded here.
	s.sandbox.UpdateRuntimeMetrics()

	// update metrics for shim process
	updateShimMetrics()

	// metrics gathered by shim
	mfs, err := prometheus.DefaultGatherer.Gather()
	if err != nil {
		shimMgtLog.WithError(err).Error("failed to gather metrics")
		// Nothing has been written to w yet, so the status can still be set.
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}

	// encode the metrics; a family that fails to encode is logged and
	// skipped rather than aborting the whole response.
	encoder := expfmt.NewEncoder(w, expfmt.FmtText)
	for _, mf := range mfs {
		if err := encoder.Encode(mf); err != nil {
			shimMgtLog.WithError(err).Warnf("failed to encode metric family %q", mf.GetName())
		}
	}

	// if using an old agent, only collect shim/sandbox metrics.
	// NOTE(review): ifSupportAgentMetricsAPI is read and written here from
	// concurrent handler goroutines without synchronization.
	if !ifSupportAgentMetricsAPI {
		return
	}

	// get metrics from agent
	agentMetrics, err := s.sandbox.GetAgentMetrics()
	if err != nil {
		shimMgtLog.WithError(err).Error("failed GetAgentMetrics")
		if isGRPCErrorCode(codes.NotFound, err) {
			shimMgtLog.Warn("metrics API not supportted by this agent.")
			ifSupportAgentMetricsAPI = false
			return
		}
		// Other errors fall through: agentMetrics is decoded as returned
		// (presumably empty) and pod overhead collection is still triggered.
	}

	// decode and parse metrics from agent, then encode them to output
	for _, mf := range decodeAgentMetrics(agentMetrics) {
		if err := encoder.Encode(mf); err != nil {
			shimMgtLog.WithError(err).Warnf("failed to encode agent metric family %q", mf.GetName())
		}
	}

	// collect pod overhead metrics need sleep to get the changes of cpu/memory resources usage
	// so here only trigger the collect operation, and the data will be gathered
	// next time collection request from Prometheus server
	go s.setPodOverheadMetrics()
}
// decodeAgentMetrics parses body, which is expected to be in the Prometheus
// text exposition format, into a list of metric families.
//
// Families whose name starts with "go_" or "process_" are renamed with a
// "kata_agent_" prefix to avoid naming conflicts with the shim's own families.
// The "go_" case only matters for a Go agent (Kata 1.x); the Rust agent also
// creates process metrics with the "process_" prefix.
//
// Decoding stops at io.EOF or at the first decode error. A decode error is
// logged, and the families returned before it are kept. The result is never
// nil (an empty body yields an empty list).
func decodeAgentMetrics(body string) []*dto.MetricFamily {
	decoder := expfmt.NewDecoder(strings.NewReader(body), expfmt.FmtText)
	list := make([]*dto.MetricFamily, 0)

	for {
		mf := &dto.MetricFamily{}
		err := decoder.Decode(mf)
		if err == io.EOF {
			break
		}
		if err != nil {
			// The stream could not be parsed. Continuing would resume at an
			// arbitrary position of the input, so stop here instead of
			// silently looping on.
			shimMgtLog.WithError(err).Warn("failed to decode agent metrics")
			break
		}

		if mf.Name != nil && (strings.HasPrefix(*mf.Name, "go_") || strings.HasPrefix(*mf.Name, "process_")) {
			mf.Name = mutils.String2Pointer("kata_agent_" + *mf.Name)
		}
		list = append(list, mf)
	}

	return list
}
// startManagementServer creates the shim management socket and writes its
// address with cdshim.WriteAddress("monitor_address", ...). It then registers
// the shim and sandbox Prometheus metrics and serves HTTP on the socket:
//
//	/metrics       - shim, sandbox and agent metrics (serveMetrics)
//	/agent-url     - the sandbox's agent URL (agentURL)
//	/debug/...     - expvar and pprof, only when enabled (mountPprofHandle)
//
// Failures are logged, not returned. The call blocks in http.Server.Serve
// until the server stops.
func (s *service) startManagementServer(ctx context.Context, ociSpec *specs.Spec) {
	// The socket address is derived from the containerd namespace in ctx and
	// the shim id (see socketAddress).
	metricsAddress, err := socketAddress(ctx, s.id)
	if err != nil {
		shimMgtLog.WithError(err).Error("failed to create socket address")
		return
	}

	listener, err := cdshim.NewSocket(metricsAddress)
	if err != nil {
		shimMgtLog.WithError(err).Error("failed to create listener")
		return
	}

	// write metrics address to filesystem
	if err := cdshim.WriteAddress("monitor_address", metricsAddress); err != nil {
		shimMgtLog.WithError(err).Error("failed to write metrics address")
		// Nothing will ever serve on this socket. Release it instead of
		// leaking it. The close error is irrelevant on this failure path.
		_ = listener.Close()
		return
	}

	shimMgtLog.Info("kata management inited")

	// bind handlers
	m := http.NewServeMux()
	m.Handle("/metrics", http.HandlerFunc(s.serveMetrics))
	m.Handle("/agent-url", http.HandlerFunc(s.agentURL))
	s.mountPprofHandle(m, ociSpec)

	// register shim metrics
	registerMetrics()
	// register sandbox metrics
	vc.RegisterMetrics()

	// start serve; Serve always returns a non-nil error, so surface it
	// instead of dropping it.
	svr := &http.Server{Handler: m}
	if err := svr.Serve(listener); err != nil && err != http.ErrServerClosed {
		shimMgtLog.WithError(err).Error("management server stopped")
	}
}
// mountPprofHandle registers the expvar (/debug/vars) and net/http/pprof
// (/debug/pprof/...) handlers on m when profiling is enabled. It is enabled
// either by s.config.EnablePprof or, failing that, by the
// vcAnnotations.EnablePprof annotation on ociSpec parsing as true with
// strconv.ParseBool. A missing or unparsable annotation, or a nil ociSpec,
// leaves profiling disabled.
func (s *service) mountPprofHandle(m *http.ServeMux, ociSpec *specs.Spec) {
	// return if not enabled
	if !s.config.EnablePprof {
		// Without a spec there are no annotations to consult. Guarding here
		// avoids a nil-pointer panic on ociSpec.Annotations.
		if ociSpec == nil {
			return
		}
		value, ok := ociSpec.Annotations[vcAnnotations.EnablePprof]
		if !ok {
			return
		}
		enabled, err := strconv.ParseBool(value)
		if err != nil || !enabled {
			return
		}
	}

	m.Handle("/debug/vars", expvar.Handler())
	m.HandleFunc("/debug/pprof/", pprof.Index)
	m.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline)
	m.HandleFunc("/debug/pprof/profile", pprof.Profile)
	m.HandleFunc("/debug/pprof/symbol", pprof.Symbol)
	m.HandleFunc("/debug/pprof/trace", pprof.Trace)
}
// socketAddress returns the management socket address for shim id:
// /containerd-shim/<namespace>/<id>/shim-monitor.sock, where <namespace> is
// the containerd namespace carried by ctx. Any error from
// namespaces.NamespaceRequired is returned unchanged.
func socketAddress(ctx context.Context, id string) (string, error) {
	namespace, nsErr := namespaces.NamespaceRequired(ctx)
	if nsErr != nil {
		return "", nsErr
	}

	parts := []string{
		string(filepath.Separator),
		"containerd-shim",
		namespace,
		id,
		"shim-monitor.sock",
	}
	return filepath.Join(parts...), nil
}