forked from grafana/agent
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Create shared server for
loki.source.+
network targets (grafana#3581)
(cherry picked from commit 4a3c4a7)
- Loading branch information
Showing
17 changed files
with
600 additions
and
217 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
// Package http contains a River serializable definition of the weaveworks weaveworks config in | ||
// https://github.com/weaveworks/common/blob/master/server/server.go#L62. | ||
package net | ||
|
||
import ( | ||
"flag" | ||
"time" | ||
|
||
weaveworks "github.com/weaveworks/common/server" | ||
) | ||
|
||
const ( | ||
DefaultHTTPPort = 8080 | ||
DefaultGRPCPort = 8081 | ||
) | ||
|
||
// ServerConfig is a River configuration that allows one to configure a weaveworks.Server. It | ||
// exposes a subset of the available configurations. | ||
type ServerConfig struct { | ||
// HTTP configures the HTTP weaveworks. Note that despite the block being present or not, | ||
// the weaveworks is always started. | ||
HTTP *HTTPConfig `river:"http,block,optional"` | ||
|
||
// GRPC configures the gRPC weaveworks. Note that despite the block being present or not, | ||
// the weaveworks is always started. | ||
GRPC *GRPCConfig `river:"grpc,block,optional"` | ||
|
||
// GracefulShutdownTimeout configures a timeout to gracefully shut down the server. | ||
GracefulShutdownTimeout time.Duration `river:"graceful_shutdown_timeout,attr,optional"` | ||
} | ||
|
||
// HTTPConfig configures the HTTP weaveworks started by weaveworks.Server. | ||
type HTTPConfig struct { | ||
ListenAddress string `river:"listen_address,attr,optional"` | ||
ListenPort int `river:"listen_port,attr,optional"` | ||
ConnLimit int `river:"conn_limit,attr,optional"` | ||
ServerReadTimeout time.Duration `river:"server_read_timeout,attr,optional"` | ||
ServerWriteTimeout time.Duration `river:"server_write_timeout,attr,optional"` | ||
ServerIdleTimeout time.Duration `river:"server_idle_timeout,attr,optional"` | ||
} | ||
|
||
// Into applies the configs from HTTPConfig into a weaveworks.Into. | ||
func (h *HTTPConfig) Into(c *weaveworks.Config) { | ||
c.HTTPListenAddress = h.ListenAddress | ||
c.HTTPListenPort = h.ListenPort | ||
c.HTTPConnLimit = h.ConnLimit | ||
c.HTTPServerReadTimeout = h.ServerReadTimeout | ||
c.HTTPServerWriteTimeout = h.ServerWriteTimeout | ||
c.HTTPServerIdleTimeout = h.ServerIdleTimeout | ||
} | ||
|
||
// GRPCConfig configures the gRPC weaveworks started by weaveworks.Server. | ||
type GRPCConfig struct { | ||
ListenAddress string `river:"listen_address,attr,optional"` | ||
ListenPort int `river:"listen_port,attr,optional"` | ||
ConnLimit int `river:"conn_limit,attr,optional"` | ||
MaxConnectionAge time.Duration `river:"max_connection_age,attr,optional"` | ||
MaxConnectionAgeGrace time.Duration `river:"max_connection_age_grace,attr,optional"` | ||
MaxConnectionIdle time.Duration `river:"max_connection_idle,attr,optional"` | ||
ServerMaxRecvMsg int `river:"server_max_recv_msg_size,attr,optional"` | ||
ServerMaxSendMsg int `river:"server_max_send_msg_size,attr,optional"` | ||
ServerMaxConcurrentStreams uint `river:"server_max_concurrent_streams,attr,optional"` | ||
} | ||
|
||
// Into applies the configs from GRPCConfig into a weaveworks.Into. | ||
func (g *GRPCConfig) Into(c *weaveworks.Config) { | ||
c.GRPCListenAddress = g.ListenAddress | ||
c.GRPCListenPort = g.ListenPort | ||
c.GRPCConnLimit = g.ConnLimit | ||
c.GRPCServerMaxConnectionAge = g.MaxConnectionAge | ||
c.GRPCServerMaxConnectionAgeGrace = g.MaxConnectionAgeGrace | ||
c.GRPCServerMaxConnectionIdle = g.MaxConnectionIdle | ||
c.GPRCServerMaxRecvMsgSize = g.ServerMaxRecvMsg | ||
c.GRPCServerMaxSendMsgSize = g.ServerMaxSendMsg | ||
c.GPRCServerMaxConcurrentStreams = g.ServerMaxConcurrentStreams | ||
} | ||
|
||
func (c *ServerConfig) UnmarshalRiver(f func(v interface{}) error) error { | ||
type config ServerConfig | ||
if err := f((*config)(c)); err != nil { | ||
return err | ||
} | ||
|
||
return nil | ||
} | ||
|
||
// Convert converts the River-based ServerConfig into a weaveworks.Config object. | ||
func (c *ServerConfig) Convert() weaveworks.Config { | ||
cfg := newDefaultConfig() | ||
if c.HTTP != nil { | ||
c.HTTP.Into(&cfg) | ||
} | ||
if c.GRPC != nil { | ||
c.GRPC.Into(&cfg) | ||
} | ||
// If set, override. Don't allow a zero-value since it configure a context.WithTimeout, so the user should at least | ||
// give a >0 value to it | ||
if c.GracefulShutdownTimeout != 0 { | ||
cfg.ServerGracefulShutdownTimeout = c.GracefulShutdownTimeout | ||
} | ||
return cfg | ||
} | ||
|
||
// newDefaultConfig creates a new weaveworks.Config object with some overridden defaults. | ||
func newDefaultConfig() weaveworks.Config { | ||
c := weaveworks.Config{} | ||
c.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError)) | ||
c.HTTPListenPort = DefaultHTTPPort | ||
c.GRPCListenPort = DefaultGRPCPort | ||
// By default, do not register instrumentation since every metric is later registered | ||
// inside a custom register | ||
c.RegisterInstrumentation = false | ||
return c | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,105 @@ | ||
package net | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
|
||
"github.com/grafana/agent/pkg/river" | ||
weaveworks "github.com/weaveworks/common/server" | ||
) | ||
|
||
func TestConfig(t *testing.T) { | ||
type testcase struct { | ||
raw string | ||
errExpected bool | ||
assert func(t *testing.T, config weaveworks.Config) | ||
} | ||
var cases = map[string]testcase{ | ||
"empty config applies defaults": { | ||
raw: ``, | ||
assert: func(t *testing.T, config weaveworks.Config) { | ||
// custom defaults | ||
require.Equal(t, DefaultHTTPPort, config.HTTPListenPort) | ||
require.Equal(t, DefaultGRPCPort, config.GRPCListenPort) | ||
// defaults inherited from weaveworks | ||
require.Equal(t, "", config.HTTPListenAddress) | ||
require.Equal(t, "", config.GRPCListenAddress) | ||
require.False(t, config.RegisterInstrumentation) | ||
require.Equal(t, time.Second*30, config.ServerGracefulShutdownTimeout) | ||
}, | ||
}, | ||
"overriding defaults": { | ||
raw: ` | ||
graceful_shutdown_timeout = "1m" | ||
http { | ||
listen_port = 8080 | ||
listen_address = "0.0.0.0" | ||
conn_limit = 10 | ||
server_write_timeout = "10s" | ||
}`, | ||
assert: func(t *testing.T, config weaveworks.Config) { | ||
require.Equal(t, 8080, config.HTTPListenPort) | ||
require.Equal(t, "0.0.0.0", config.HTTPListenAddress) | ||
require.Equal(t, 10, config.HTTPConnLimit) | ||
require.Equal(t, time.Second*10, config.HTTPServerWriteTimeout) | ||
|
||
require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout) | ||
}, | ||
}, | ||
"all params": { | ||
raw: ` | ||
graceful_shutdown_timeout = "1m" | ||
http { | ||
listen_address = "0.0.0.0" | ||
listen_port = 1 | ||
conn_limit = 2 | ||
server_read_timeout = "2m" | ||
server_write_timeout = "3m" | ||
server_idle_timeout = "4m" | ||
} | ||
grpc { | ||
listen_address = "0.0.0.1" | ||
listen_port = 3 | ||
conn_limit = 4 | ||
max_connection_age = "5m" | ||
max_connection_age_grace = "6m" | ||
max_connection_idle = "7m" | ||
server_max_recv_msg_size = 5 | ||
server_max_send_msg_size = 6 | ||
server_max_concurrent_streams = 7 | ||
}`, | ||
assert: func(t *testing.T, config weaveworks.Config) { | ||
// general | ||
require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout) | ||
// http | ||
require.Equal(t, "0.0.0.0", config.HTTPListenAddress) | ||
require.Equal(t, 1, config.HTTPListenPort) | ||
require.Equal(t, 2, config.HTTPConnLimit) | ||
require.Equal(t, time.Minute*2, config.HTTPServerReadTimeout) | ||
require.Equal(t, time.Minute*3, config.HTTPServerWriteTimeout) | ||
require.Equal(t, time.Minute*4, config.HTTPServerIdleTimeout) | ||
// grpc | ||
require.Equal(t, "0.0.0.1", config.GRPCListenAddress) | ||
require.Equal(t, 3, config.GRPCListenPort) | ||
require.Equal(t, 5*time.Minute, config.GRPCServerMaxConnectionAge) | ||
require.Equal(t, 6*time.Minute, config.GRPCServerMaxConnectionAgeGrace) | ||
require.Equal(t, 7*time.Minute, config.GRPCServerMaxConnectionIdle) | ||
require.Equal(t, 5, config.GPRCServerMaxRecvMsgSize) | ||
require.Equal(t, 6, config.GRPCServerMaxSendMsgSize) | ||
require.Equal(t, uint(7), config.GPRCServerMaxConcurrentStreams) | ||
}, | ||
}, | ||
} | ||
for name, tc := range cases { | ||
t.Run(name, func(t *testing.T) { | ||
cfg := ServerConfig{} | ||
err := river.Unmarshal([]byte(tc.raw), &cfg) | ||
require.Equal(t, tc.errExpected, err != nil) | ||
wConfig := cfg.Convert() | ||
tc.assert(t, wConfig) | ||
}) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,92 @@ | ||
package net | ||
|
||
import ( | ||
"fmt" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/go-kit/log/level" | ||
"github.com/gorilla/mux" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/common/model" | ||
"github.com/weaveworks/common/logging" | ||
weaveworks "github.com/weaveworks/common/server" | ||
) | ||
|
||
// TargetServer is wrapper around weaveworks.Server that handles some common configuration used in all flow components | ||
// that expose a network server. It just handles configuration and initialization, the handlers implementation are left | ||
// to the consumer. | ||
type TargetServer struct { | ||
logger log.Logger | ||
config *weaveworks.Config | ||
metricsNamespace string | ||
server *weaveworks.Server | ||
} | ||
|
||
// NewTargetServer creates a new TargetServer, applying some defaults to the server configuration. | ||
func NewTargetServer(logger log.Logger, metricsNamespace string, reg prometheus.Registerer, config *ServerConfig) (*TargetServer, error) { | ||
if !model.IsValidMetricName(model.LabelValue(metricsNamespace)) { | ||
return nil, fmt.Errorf("metrics namespace is not prometheus compatiible: %s", metricsNamespace) | ||
} | ||
|
||
ts := &TargetServer{ | ||
logger: logger, | ||
metricsNamespace: metricsNamespace, | ||
} | ||
|
||
// convert from River into the weaveworks config | ||
serverCfg := config.Convert() | ||
// Set the config to the new combined config. | ||
// Avoid logging entire received request on failures | ||
serverCfg.ExcludeRequestInLog = true | ||
// Configure dedicated metrics registerer | ||
serverCfg.Registerer = reg | ||
// Persist crafter config in server | ||
ts.config = &serverCfg | ||
// To prevent metric collisions because all metrics are going to be registered in the global Prometheus registry. | ||
ts.config.MetricsNamespace = ts.metricsNamespace | ||
// We don't want the /debug and /metrics endpoints running, since this is not the main Flow HTTP server. | ||
// We want this target to expose the least surface area possible, hence disabling WeaveWorks HTTP server metrics | ||
// and debugging functionality. | ||
ts.config.RegisterInstrumentation = false | ||
// Add logger to weaveworks | ||
ts.config.Log = logging.GoKit(ts.logger) | ||
|
||
return ts, nil | ||
} | ||
|
||
// MountAndRun mounts the handlers and starting the server. | ||
func (ts *TargetServer) MountAndRun(mountRoute func(router *mux.Router)) error { | ||
level.Info(ts.logger).Log("msg", "starting server") | ||
srv, err := weaveworks.New(*ts.config) | ||
if err != nil { | ||
return err | ||
} | ||
|
||
ts.server = srv | ||
mountRoute(ts.server.HTTP) | ||
|
||
go func() { | ||
err := srv.Run() | ||
if err != nil { | ||
level.Error(ts.logger).Log("msg", "server shutdown with error", "err", err) | ||
} | ||
}() | ||
|
||
return nil | ||
} | ||
|
||
// HTTPListenAddr returns the listen address of the HTTP server. | ||
func (ts *TargetServer) HTTPListenAddr() string { | ||
return ts.server.HTTPListenAddr().String() | ||
} | ||
|
||
// GRPCListenAddr returns the listen address of the gRPC server. | ||
func (ts *TargetServer) GRPCListenAddr() string { | ||
return ts.server.GRPCListenAddr().String() | ||
} | ||
|
||
// StopAndShutdown stops and shuts down the underlying server. | ||
func (ts *TargetServer) StopAndShutdown() { | ||
ts.server.Stop() | ||
ts.server.Shutdown() | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
package net | ||
|
||
import ( | ||
"fmt" | ||
"net/http" | ||
"os" | ||
"strings" | ||
"testing" | ||
|
||
"github.com/go-kit/log" | ||
"github.com/gorilla/mux" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/stretchr/testify/require" | ||
) | ||
|
||
func TestTargetServer(t *testing.T) { | ||
// dependencies | ||
w := log.NewSyncWriter(os.Stderr) | ||
logger := log.NewLogfmtLogger(w) | ||
reg := prometheus.NewRegistry() | ||
|
||
ts, err := NewTargetServer(logger, "test_namespace", reg, &ServerConfig{}) | ||
require.NoError(t, err) | ||
|
||
ts.MountAndRun(func(router *mux.Router) { | ||
router.Methods("GET").Path("/hello").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { | ||
w.WriteHeader(http.StatusOK) | ||
})) | ||
}) | ||
defer ts.StopAndShutdown() | ||
|
||
// test mounted endpoint | ||
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/hello", ts.HTTPListenAddr()), nil) | ||
require.NoError(t, err) | ||
res, err := http.DefaultClient.Do(req) | ||
require.NoError(t, err) | ||
require.Equal(t, 200, res.StatusCode) | ||
|
||
// assert all metrics have the prefix applied | ||
metrics, err := reg.Gather() | ||
require.NoError(t, err) | ||
for _, m := range metrics { | ||
require.True(t, strings.HasPrefix(m.GetName(), "test_namespace")) | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.