Skip to content

Commit

Permalink
Create shared server for loki.source.+ network targets (#3581)
Browse files Browse the repository at this point in the history
  • Loading branch information
thepalbi authored and clayton-cornell committed Aug 14, 2023
1 parent f2d572b commit a51da80
Show file tree
Hide file tree
Showing 17 changed files with 600 additions and 217 deletions.
114 changes: 114 additions & 0 deletions component/common/net/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
// Package http contains a River serializable definition of the weaveworks weaveworks config in
// https://github.com/weaveworks/common/blob/master/server/server.go#L62.
package net

import (
"flag"
"time"

weaveworks "github.com/weaveworks/common/server"
)

const (
DefaultHTTPPort = 8080
DefaultGRPCPort = 8081
)

// ServerConfig is a River configuration that allows one to configure a weaveworks.Server. It
// exposes a subset of the available configurations.
type ServerConfig struct {
// HTTP configures the HTTP weaveworks. Note that despite the block being present or not,
// the weaveworks is always started.
HTTP *HTTPConfig `river:"http,block,optional"`

// GRPC configures the gRPC weaveworks. Note that despite the block being present or not,
// the weaveworks is always started.
GRPC *GRPCConfig `river:"grpc,block,optional"`

// GracefulShutdownTimeout configures a timeout to gracefully shut down the server.
GracefulShutdownTimeout time.Duration `river:"graceful_shutdown_timeout,attr,optional"`
}

// HTTPConfig configures the HTTP weaveworks started by weaveworks.Server.
type HTTPConfig struct {
ListenAddress string `river:"listen_address,attr,optional"`
ListenPort int `river:"listen_port,attr,optional"`
ConnLimit int `river:"conn_limit,attr,optional"`
ServerReadTimeout time.Duration `river:"server_read_timeout,attr,optional"`
ServerWriteTimeout time.Duration `river:"server_write_timeout,attr,optional"`
ServerIdleTimeout time.Duration `river:"server_idle_timeout,attr,optional"`
}

// Into applies the configs from HTTPConfig into a weaveworks.Into.
func (h *HTTPConfig) Into(c *weaveworks.Config) {
c.HTTPListenAddress = h.ListenAddress
c.HTTPListenPort = h.ListenPort
c.HTTPConnLimit = h.ConnLimit
c.HTTPServerReadTimeout = h.ServerReadTimeout
c.HTTPServerWriteTimeout = h.ServerWriteTimeout
c.HTTPServerIdleTimeout = h.ServerIdleTimeout
}

// GRPCConfig configures the gRPC weaveworks started by weaveworks.Server.
type GRPCConfig struct {
ListenAddress string `river:"listen_address,attr,optional"`
ListenPort int `river:"listen_port,attr,optional"`
ConnLimit int `river:"conn_limit,attr,optional"`
MaxConnectionAge time.Duration `river:"max_connection_age,attr,optional"`
MaxConnectionAgeGrace time.Duration `river:"max_connection_age_grace,attr,optional"`
MaxConnectionIdle time.Duration `river:"max_connection_idle,attr,optional"`
ServerMaxRecvMsg int `river:"server_max_recv_msg_size,attr,optional"`
ServerMaxSendMsg int `river:"server_max_send_msg_size,attr,optional"`
ServerMaxConcurrentStreams uint `river:"server_max_concurrent_streams,attr,optional"`
}

// Into applies the configs from GRPCConfig into a weaveworks.Into.
func (g *GRPCConfig) Into(c *weaveworks.Config) {
c.GRPCListenAddress = g.ListenAddress
c.GRPCListenPort = g.ListenPort
c.GRPCConnLimit = g.ConnLimit
c.GRPCServerMaxConnectionAge = g.MaxConnectionAge
c.GRPCServerMaxConnectionAgeGrace = g.MaxConnectionAgeGrace
c.GRPCServerMaxConnectionIdle = g.MaxConnectionIdle
c.GPRCServerMaxRecvMsgSize = g.ServerMaxRecvMsg
c.GRPCServerMaxSendMsgSize = g.ServerMaxSendMsg
c.GPRCServerMaxConcurrentStreams = g.ServerMaxConcurrentStreams
}

func (c *ServerConfig) UnmarshalRiver(f func(v interface{}) error) error {
type config ServerConfig
if err := f((*config)(c)); err != nil {
return err
}

return nil
}

// Convert converts the River-based ServerConfig into a weaveworks.Config object.
func (c *ServerConfig) Convert() weaveworks.Config {
cfg := newDefaultConfig()
if c.HTTP != nil {
c.HTTP.Into(&cfg)
}
if c.GRPC != nil {
c.GRPC.Into(&cfg)
}
// If set, override. Don't allow a zero-value since it configure a context.WithTimeout, so the user should at least
// give a >0 value to it
if c.GracefulShutdownTimeout != 0 {
cfg.ServerGracefulShutdownTimeout = c.GracefulShutdownTimeout
}
return cfg
}

// newDefaultConfig creates a new weaveworks.Config object with some overridden defaults.
func newDefaultConfig() weaveworks.Config {
c := weaveworks.Config{}
c.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError))
c.HTTPListenPort = DefaultHTTPPort
c.GRPCListenPort = DefaultGRPCPort
// By default, do not register instrumentation since every metric is later registered
// inside a custom register
c.RegisterInstrumentation = false
return c
}
105 changes: 105 additions & 0 deletions component/common/net/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package net

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/agent/pkg/river"
weaveworks "github.com/weaveworks/common/server"
)

func TestConfig(t *testing.T) {
type testcase struct {
raw string
errExpected bool
assert func(t *testing.T, config weaveworks.Config)
}
var cases = map[string]testcase{
"empty config applies defaults": {
raw: ``,
assert: func(t *testing.T, config weaveworks.Config) {
// custom defaults
require.Equal(t, DefaultHTTPPort, config.HTTPListenPort)
require.Equal(t, DefaultGRPCPort, config.GRPCListenPort)
// defaults inherited from weaveworks
require.Equal(t, "", config.HTTPListenAddress)
require.Equal(t, "", config.GRPCListenAddress)
require.False(t, config.RegisterInstrumentation)
require.Equal(t, time.Second*30, config.ServerGracefulShutdownTimeout)
},
},
"overriding defaults": {
raw: `
graceful_shutdown_timeout = "1m"
http {
listen_port = 8080
listen_address = "0.0.0.0"
conn_limit = 10
server_write_timeout = "10s"
}`,
assert: func(t *testing.T, config weaveworks.Config) {
require.Equal(t, 8080, config.HTTPListenPort)
require.Equal(t, "0.0.0.0", config.HTTPListenAddress)
require.Equal(t, 10, config.HTTPConnLimit)
require.Equal(t, time.Second*10, config.HTTPServerWriteTimeout)

require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout)
},
},
"all params": {
raw: `
graceful_shutdown_timeout = "1m"
http {
listen_address = "0.0.0.0"
listen_port = 1
conn_limit = 2
server_read_timeout = "2m"
server_write_timeout = "3m"
server_idle_timeout = "4m"
}
grpc {
listen_address = "0.0.0.1"
listen_port = 3
conn_limit = 4
max_connection_age = "5m"
max_connection_age_grace = "6m"
max_connection_idle = "7m"
server_max_recv_msg_size = 5
server_max_send_msg_size = 6
server_max_concurrent_streams = 7
}`,
assert: func(t *testing.T, config weaveworks.Config) {
// general
require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout)
// http
require.Equal(t, "0.0.0.0", config.HTTPListenAddress)
require.Equal(t, 1, config.HTTPListenPort)
require.Equal(t, 2, config.HTTPConnLimit)
require.Equal(t, time.Minute*2, config.HTTPServerReadTimeout)
require.Equal(t, time.Minute*3, config.HTTPServerWriteTimeout)
require.Equal(t, time.Minute*4, config.HTTPServerIdleTimeout)
// grpc
require.Equal(t, "0.0.0.1", config.GRPCListenAddress)
require.Equal(t, 3, config.GRPCListenPort)
require.Equal(t, 5*time.Minute, config.GRPCServerMaxConnectionAge)
require.Equal(t, 6*time.Minute, config.GRPCServerMaxConnectionAgeGrace)
require.Equal(t, 7*time.Minute, config.GRPCServerMaxConnectionIdle)
require.Equal(t, 5, config.GPRCServerMaxRecvMsgSize)
require.Equal(t, 6, config.GRPCServerMaxSendMsgSize)
require.Equal(t, uint(7), config.GPRCServerMaxConcurrentStreams)
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
cfg := ServerConfig{}
err := river.Unmarshal([]byte(tc.raw), &cfg)
require.Equal(t, tc.errExpected, err != nil)
wConfig := cfg.Convert()
tc.assert(t, wConfig)
})
}
}
92 changes: 92 additions & 0 deletions component/common/net/server.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package net

import (
"fmt"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/logging"
weaveworks "github.com/weaveworks/common/server"
)

// TargetServer is wrapper around weaveworks.Server that handles some common configuration used in all flow components
// that expose a network server. It just handles configuration and initialization, the handlers implementation are left
// to the consumer.
type TargetServer struct {
logger log.Logger
config *weaveworks.Config
metricsNamespace string
server *weaveworks.Server
}

// NewTargetServer creates a new TargetServer, applying some defaults to the server configuration.
func NewTargetServer(logger log.Logger, metricsNamespace string, reg prometheus.Registerer, config *ServerConfig) (*TargetServer, error) {
if !model.IsValidMetricName(model.LabelValue(metricsNamespace)) {
return nil, fmt.Errorf("metrics namespace is not prometheus compatiible: %s", metricsNamespace)
}

ts := &TargetServer{
logger: logger,
metricsNamespace: metricsNamespace,
}

// convert from River into the weaveworks config
serverCfg := config.Convert()
// Set the config to the new combined config.
// Avoid logging entire received request on failures
serverCfg.ExcludeRequestInLog = true
// Configure dedicated metrics registerer
serverCfg.Registerer = reg
// Persist crafter config in server
ts.config = &serverCfg
// To prevent metric collisions because all metrics are going to be registered in the global Prometheus registry.
ts.config.MetricsNamespace = ts.metricsNamespace
// We don't want the /debug and /metrics endpoints running, since this is not the main Flow HTTP server.
// We want this target to expose the least surface area possible, hence disabling WeaveWorks HTTP server metrics
// and debugging functionality.
ts.config.RegisterInstrumentation = false
// Add logger to weaveworks
ts.config.Log = logging.GoKit(ts.logger)

return ts, nil
}

// MountAndRun mounts the handlers and starting the server.
func (ts *TargetServer) MountAndRun(mountRoute func(router *mux.Router)) error {
level.Info(ts.logger).Log("msg", "starting server")
srv, err := weaveworks.New(*ts.config)
if err != nil {
return err
}

ts.server = srv
mountRoute(ts.server.HTTP)

go func() {
err := srv.Run()
if err != nil {
level.Error(ts.logger).Log("msg", "server shutdown with error", "err", err)
}
}()

return nil
}

// HTTPListenAddr returns the listen address of the HTTP server.
func (ts *TargetServer) HTTPListenAddr() string {
return ts.server.HTTPListenAddr().String()
}

// GRPCListenAddr returns the listen address of the gRPC server.
func (ts *TargetServer) GRPCListenAddr() string {
return ts.server.GRPCListenAddr().String()
}

// StopAndShutdown stops and shuts down the underlying server.
func (ts *TargetServer) StopAndShutdown() {
ts.server.Stop()
ts.server.Shutdown()
}
45 changes: 45 additions & 0 deletions component/common/net/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package net

import (
"fmt"
"net/http"
"os"
"strings"
"testing"

"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func TestTargetServer(t *testing.T) {
// dependencies
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
reg := prometheus.NewRegistry()

ts, err := NewTargetServer(logger, "test_namespace", reg, &ServerConfig{})
require.NoError(t, err)

ts.MountAndRun(func(router *mux.Router) {
router.Methods("GET").Path("/hello").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}))
})
defer ts.StopAndShutdown()

// test mounted endpoint
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/hello", ts.HTTPListenAddr()), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// assert all metrics have the prefix applied
metrics, err := reg.Gather()
require.NoError(t, err)
for _, m := range metrics {
require.True(t, strings.HasPrefix(m.GetName(), "test_namespace"))
}
}
5 changes: 3 additions & 2 deletions component/loki/source/gcplog/gcplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"sync"

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
flow_relabel "github.com/grafana/agent/component/common/relabel"
gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/relabel"
)

func init() {
Expand Down
Loading

0 comments on commit a51da80

Please sign in to comment.