Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Create shared server for loki.source.+ network targets #3581

Merged
merged 29 commits into from
May 3, 2023
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
15a87fd
added commons server
thepalbi Apr 20, 2023
b45f5b2
ported loki.source.gcp
thepalbi Apr 20, 2023
f82292d
migrated heroku and gcp
thepalbi Apr 20, 2023
dc71f63
added server test
thepalbi Apr 20, 2023
6577c4f
simplfyin params
thepalbi Apr 20, 2023
a9cb051
new river config
thepalbi Apr 21, 2023
2536ee8
adapted both heroku and gcp
thepalbi Apr 21, 2023
5000f0f
config tests
thepalbi Apr 24, 2023
5bcefbb
fix imports
thepalbi Apr 24, 2023
ffd59fb
address pr comments
thepalbi Apr 24, 2023
ad11085
fix tests
thepalbi Apr 24, 2023
c1b90d8
fix linter
thepalbi Apr 24, 2023
c44001a
some PR fixes
thepalbi Apr 27, 2023
cb49d8f
renaming loki/http to loki/net
thepalbi Apr 27, 2023
172a8ea
add tests with river raw config
thepalbi Apr 27, 2023
4cab3ad
discussed model implemented
thepalbi Apr 27, 2023
7ece9b4
added test covering all setings
thepalbi Apr 27, 2023
6656625
added docs
thepalbi Apr 28, 2023
1981d4f
pr comments and moved loki-server docs
thepalbi May 2, 2023
0646d90
new embedded docs model + loki.source.gcplog
thepalbi May 2, 2023
83db4f5
adjusting snippets
thepalbi May 2, 2023
deb1773
comments and docs fixes
thepalbi May 2, 2023
b29f96e
assign random grpc ports when not used
thepalbi May 2, 2023
9cf4269
fix imports
thepalbi May 2, 2023
3d1aad3
Update docs/sources/flow/reference/components/loki.source.gcplog.md
thepalbi May 3, 2023
6036b39
Update docs/sources/flow/reference/components/loki.source.heroku.md
thepalbi May 3, 2023
3aaa8b3
Update component/common/loki/net/server.go
thepalbi May 3, 2023
ed62713
docs fixes and other pr stuff
thepalbi May 3, 2023
ef385a2
move net package to component/common
thepalbi May 3, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
115 changes: 115 additions & 0 deletions component/common/loki/net/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Package http contains a River serializable definition of the weaveworks weaveworks config in
// https://github.com/weaveworks/common/blob/master/server/server.go#L62.
package net

import (
"flag"
"time"

weaveworks "github.com/weaveworks/common/server"
)

const (
DefaultHTTPPort = 8080
DefaultGRPCPort = 8081
)

// ServerConfig is a River configuration that allows one to configure a weaveworks.Server. It
// exposes a subset of the available configurations.
type ServerConfig struct {
// HTTP configures the HTTP weaveworks. Note that despite the block being present or not,
// the weaveworks is always started.
HTTP *HTTPConfig `river:"http,block,optional"`

// GRPC configures the gRPC weaveworks. Note that despite the block being present or not,
// the weaveworks is always started.
GRPC *GRPCConfig `river:"grpc,block,optional"`

// GracefulShutdownTimeout configures a timeout to gracefully shut down the server.
GracefulShutdownTimeout time.Duration `river:"graceful_shutdown_timeout,attr,optional"`
}

// HTTPConfig configures the HTTP weaveworks started by weaveworks.Server.
type HTTPConfig struct {
ListenAddress string `river:"listen_address,attr,optional"`
ListenPort int `river:"listen_port,attr,optional"`
ConnLimit int `river:"conn_limit,attr,optional"`
ServerReadTimeout time.Duration `river:"server_read_timeout,attr,optional"`
ServerWriteTimeout time.Duration `river:"server_write_timeout,attr,optional"`
ServerIdleTimeout time.Duration `river:"server_idle_timeout,attr,optional"`
}

// Into applies the configs from HTTPConfig into a weaveworks.Into.
func (h *HTTPConfig) Into(c *weaveworks.Config) {
c.HTTPListenAddress = h.ListenAddress
c.HTTPListenPort = h.ListenPort
c.HTTPConnLimit = h.ConnLimit
c.HTTPServerReadTimeout = h.ServerReadTimeout
c.HTTPServerWriteTimeout = h.ServerWriteTimeout
c.HTTPServerIdleTimeout = h.ServerIdleTimeout
}

// GRPCConfig configures the gRPC weaveworks started by weaveworks.Server.
type GRPCConfig struct {
ListenAddress string `river:"listen_address,attr,optional"`
ListenPort int `river:"listen_port,attr,optional"`
ConnLimit int `river:"conn_limit,attr,optional"`
MaxConnectionAge time.Duration `river:"max_connection_age,attr,optional"`
MaxConnectionAgeGrace time.Duration `river:"max_connection_age_grace,attr,optional"`
MaxConnectionIdle time.Duration `river:"max_connection_idle,attr,optional"`
ServerMaxRecvMsg int `river:"server_max_recv_msg_size,attr,optional"`
ServerMaxSendMsg int `river:"server_max_send_msg_size,attr,optional"`
ServerMaxConcurrentStreams uint `river:"server_max_concurrent_streams,attr,optional"`
}

// Into applies the configs from GRPCConfig into a weaveworks.Into.
func (g *GRPCConfig) Into(c *weaveworks.Config) {
c.GRPCListenAddress = g.ListenAddress
c.GRPCListenPort = g.ListenPort
c.GRPCConnLimit = g.ConnLimit
c.GRPCServerMaxConnectionAge = g.MaxConnectionAge
c.GRPCServerMaxConnectionAgeGrace = g.MaxConnectionAgeGrace
c.GRPCServerMaxConnectionIdle = g.MaxConnectionIdle
c.GPRCServerMaxRecvMsgSize = g.ServerMaxRecvMsg
c.GRPCServerMaxSendMsgSize = g.ServerMaxSendMsg
c.GPRCServerMaxConcurrentStreams = g.ServerMaxConcurrentStreams
}

func (c *ServerConfig) UnmarshalRiver(f func(v interface{}) error) error {
type config ServerConfig
if err := f((*config)(c)); err != nil {
return err
}

return nil
}

// Convert converts the River-based ServerConfig into a weaveworks.Config object.
func (c *ServerConfig) Convert() weaveworks.Config {
cfg := newDefaultConfig()
if c.HTTP != nil {
c.HTTP.Into(&cfg)
}
if c.GRPC != nil {
c.GRPC.Into(&cfg)
}
// If set, override. Don't allow a zero-value since it configure a context.WithTimeout, so the user should at least
// give a >0 value to it
if c.GracefulShutdownTimeout != 0 {
cfg.ServerGracefulShutdownTimeout = c.GracefulShutdownTimeout
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just to make sure, what happens when the user does not set this? Is the "30s" default applied? I'm not sure where exactly this happened.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, covered by this test and applied in here that applies the defaults from the server cli flags

return cfg
}

// newDefaultConfig creates a new weaveworks.Config object with some overridden defaults.
func newDefaultConfig() weaveworks.Config {
c := weaveworks.Config{}
c.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError))
// Opting by default 0, which used in net.Listen assigns a random port
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
c.HTTPListenPort = DefaultHTTPPort
c.GRPCListenPort = DefaultGRPCPort
// By default, do not register instrumentation since every metric is later registered
// inside a custom register
c.RegisterInstrumentation = false
return c
}
105 changes: 105 additions & 0 deletions component/common/loki/net/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
package net

import (
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/grafana/agent/pkg/river"
weaveworks "github.com/weaveworks/common/server"
)

func TestConfig(t *testing.T) {
type testcase struct {
raw string
errExpected bool
assert func(t *testing.T, config weaveworks.Config)
}
var cases = map[string]testcase{
"empty config applies defaults": {
raw: ``,
assert: func(t *testing.T, config weaveworks.Config) {
// custom defaults
require.Equal(t, DefaultHTTPPort, config.HTTPListenPort)
require.Equal(t, DefaultGRPCPort, config.GRPCListenPort)
// defaults inherited from weaveworks
require.Equal(t, "", config.HTTPListenAddress)
require.Equal(t, "", config.GRPCListenAddress)
require.False(t, config.RegisterInstrumentation)
require.Equal(t, time.Second*30, config.ServerGracefulShutdownTimeout)
},
},
"overriding defaults": {
raw: `
graceful_shutdown_timeout = "1m"
http {
listen_port = 8080
listen_address = "0.0.0.0"
conn_limit = 10
server_write_timeout = "10s"
}`,
assert: func(t *testing.T, config weaveworks.Config) {
require.Equal(t, 8080, config.HTTPListenPort)
require.Equal(t, "0.0.0.0", config.HTTPListenAddress)
require.Equal(t, 10, config.HTTPConnLimit)
require.Equal(t, time.Second*10, config.HTTPServerWriteTimeout)

require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout)
},
},
"all params": {
raw: `
graceful_shutdown_timeout = "1m"
http {
listen_address = "0.0.0.0"
listen_port = 1
conn_limit = 2
server_read_timeout = "2m"
server_write_timeout = "3m"
server_idle_timeout = "4m"
}
grpc {
listen_address = "0.0.0.1"
listen_port = 3
conn_limit = 4
max_connection_age = "5m"
max_connection_age_grace = "6m"
max_connection_idle = "7m"
server_max_recv_msg_size = 5
server_max_send_msg_size = 6
server_max_concurrent_streams = 7
}`,
assert: func(t *testing.T, config weaveworks.Config) {
// general
require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout)
// http
require.Equal(t, "0.0.0.0", config.HTTPListenAddress)
require.Equal(t, 1, config.HTTPListenPort)
require.Equal(t, 2, config.HTTPConnLimit)
require.Equal(t, time.Minute*2, config.HTTPServerReadTimeout)
require.Equal(t, time.Minute*3, config.HTTPServerWriteTimeout)
require.Equal(t, time.Minute*4, config.HTTPServerIdleTimeout)
// grpc
require.Equal(t, "0.0.0.1", config.GRPCListenAddress)
require.Equal(t, 3, config.GRPCListenPort)
require.Equal(t, 5*time.Minute, config.GRPCServerMaxConnectionAge)
require.Equal(t, 6*time.Minute, config.GRPCServerMaxConnectionAgeGrace)
require.Equal(t, 7*time.Minute, config.GRPCServerMaxConnectionIdle)
require.Equal(t, 5, config.GPRCServerMaxRecvMsgSize)
require.Equal(t, 6, config.GRPCServerMaxSendMsgSize)
require.Equal(t, uint(7), config.GPRCServerMaxConcurrentStreams)
},
},
}
for name, tc := range cases {
t.Run(name, func(t *testing.T) {
cfg := ServerConfig{}
err := river.Unmarshal([]byte(tc.raw), &cfg)
require.Equal(t, tc.errExpected, err != nil)
wConfig := cfg.Convert()
tc.assert(t, wConfig)
})
}
}
92 changes: 92 additions & 0 deletions component/common/loki/net/server.go
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package net

import (
"fmt"

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/logging"
weaveworks "github.com/weaveworks/common/server"
)

// TargetServer is wrapper around weaveworks.Server that handles some common configuration used in all flow loki source
// components that expose a network server. It just handles configuration and initialization, the handlers implementation
// are left to the consumer.
type TargetServer struct {
logger log.Logger
config *weaveworks.Config
metricsNamespace string
server *weaveworks.Server
}

// NewTargetServer creates a new TargetServer, applying some defaults to the server configuration.
func NewTargetServer(logger log.Logger, metricsNamespace string, reg prometheus.Registerer, config *ServerConfig) (*TargetServer, error) {
if !model.IsValidMetricName(model.LabelValue(metricsNamespace)) {
return nil, fmt.Errorf("metrics namespace is not prometheus compatiible: %s", metricsNamespace)
}

ts := &TargetServer{
logger: logger,
metricsNamespace: metricsNamespace,
}

// convert from River into the weaveworks config
serverCfg := config.Convert()
// Set the config to the new combined config.
// Avoid logging entire received request on failures
serverCfg.ExcludeRequestInLog = true
// Configure dedicated metrics registerer
serverCfg.Registerer = reg
// Persist crafter config in server
ts.config = &serverCfg
// To prevent metric collisions because all metrics are going to be registered in the global Prometheus registry.
ts.config.MetricsNamespace = ts.metricsNamespace
// We don't want the /debug and /metrics endpoints running, since this is not the main promtail HTTP server.
thepalbi marked this conversation as resolved.
Show resolved Hide resolved
// We want this target to expose the least surface area possible, hence disabling WeaveWorks HTTP server metrics
// and debugging functionality.
ts.config.RegisterInstrumentation = false
// Add logger to weaveworks
ts.config.Log = logging.GoKit(ts.logger)

return ts, nil
}

// MountAndRun mounts the handlers and starting the server.
func (ts *TargetServer) MountAndRun(mountRoute func(router *mux.Router)) error {
level.Info(ts.logger).Log("msg", "starting server")
srv, err := weaveworks.New(*ts.config)
if err != nil {
return err
}

ts.server = srv
mountRoute(ts.server.HTTP)

go func() {
err := srv.Run()
if err != nil {
level.Error(ts.logger).Log("msg", "server shutdown with error", "err", err)
}
}()

return nil
}

// HTTPListenAddr returns the listen address of the HTTP server.
func (ts *TargetServer) HTTPListenAddr() string {
return ts.server.HTTPListenAddr().String()
}

// GRPCListenAddr returns the listen address of the gRPC server.
func (ts *TargetServer) GRPCListenAddr() string {
return ts.server.GRPCListenAddr().String()
}

// StopAndShutdown stops and shuts down the underlying server.
func (ts *TargetServer) StopAndShutdown() {
ts.server.Stop()
ts.server.Shutdown()
}
45 changes: 45 additions & 0 deletions component/common/loki/net/server_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
package net

import (
"fmt"
"net/http"
"os"
"strings"
"testing"

"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
)

func TestTargetServer(t *testing.T) {
// dependencies
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
reg := prometheus.NewRegistry()

ts, err := NewTargetServer(logger, "test_namespace", reg, &ServerConfig{})
require.NoError(t, err)

ts.MountAndRun(func(router *mux.Router) {
router.Methods("GET").Path("/hello").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) {
w.WriteHeader(http.StatusOK)
}))
})
defer ts.StopAndShutdown()

// test mounted endpoint
req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/hello", ts.HTTPListenAddr()), nil)
require.NoError(t, err)
res, err := http.DefaultClient.Do(req)
require.NoError(t, err)
require.Equal(t, 200, res.StatusCode)

// assert all metrics have the prefix applied
metrics, err := reg.Gather()
require.NoError(t, err)
for _, m := range metrics {
require.True(t, strings.HasPrefix(m.GetName(), "test_namespace"))
}
}
5 changes: 3 additions & 2 deletions component/loki/source/gcplog/gcplog.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,14 @@ import (
"sync"

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/relabel"

"github.com/grafana/agent/component"
"github.com/grafana/agent/component/common/loki"
flow_relabel "github.com/grafana/agent/component/common/relabel"
gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/model/relabel"
)

func init() {
Expand Down
Loading