diff --git a/component/common/net/config.go b/component/common/net/config.go new file mode 100644 index 000000000000..404d1b14222e --- /dev/null +++ b/component/common/net/config.go @@ -0,0 +1,114 @@ +// Package http contains a River serializable definition of the weaveworks weaveworks config in +// https://github.com/weaveworks/common/blob/master/server/server.go#L62. +package net + +import ( + "flag" + "time" + + weaveworks "github.com/weaveworks/common/server" +) + +const ( + DefaultHTTPPort = 8080 + DefaultGRPCPort = 8081 +) + +// ServerConfig is a River configuration that allows one to configure a weaveworks.Server. It +// exposes a subset of the available configurations. +type ServerConfig struct { + // HTTP configures the HTTP weaveworks. Note that despite the block being present or not, + // the weaveworks is always started. + HTTP *HTTPConfig `river:"http,block,optional"` + + // GRPC configures the gRPC weaveworks. Note that despite the block being present or not, + // the weaveworks is always started. + GRPC *GRPCConfig `river:"grpc,block,optional"` + + // GracefulShutdownTimeout configures a timeout to gracefully shut down the server. + GracefulShutdownTimeout time.Duration `river:"graceful_shutdown_timeout,attr,optional"` +} + +// HTTPConfig configures the HTTP weaveworks started by weaveworks.Server. +type HTTPConfig struct { + ListenAddress string `river:"listen_address,attr,optional"` + ListenPort int `river:"listen_port,attr,optional"` + ConnLimit int `river:"conn_limit,attr,optional"` + ServerReadTimeout time.Duration `river:"server_read_timeout,attr,optional"` + ServerWriteTimeout time.Duration `river:"server_write_timeout,attr,optional"` + ServerIdleTimeout time.Duration `river:"server_idle_timeout,attr,optional"` +} + +// Into applies the configs from HTTPConfig into a weaveworks.Into. +func (h *HTTPConfig) Into(c *weaveworks.Config) { + c.HTTPListenAddress = h.ListenAddress + c.HTTPListenPort = h.ListenPort + c.HTTPConnLimit = h.ConnLimit + c.HTTPServerReadTimeout = h.ServerReadTimeout + c.HTTPServerWriteTimeout = h.ServerWriteTimeout + c.HTTPServerIdleTimeout = h.ServerIdleTimeout +} + +// GRPCConfig configures the gRPC weaveworks started by weaveworks.Server. +type GRPCConfig struct { + ListenAddress string `river:"listen_address,attr,optional"` + ListenPort int `river:"listen_port,attr,optional"` + ConnLimit int `river:"conn_limit,attr,optional"` + MaxConnectionAge time.Duration `river:"max_connection_age,attr,optional"` + MaxConnectionAgeGrace time.Duration `river:"max_connection_age_grace,attr,optional"` + MaxConnectionIdle time.Duration `river:"max_connection_idle,attr,optional"` + ServerMaxRecvMsg int `river:"server_max_recv_msg_size,attr,optional"` + ServerMaxSendMsg int `river:"server_max_send_msg_size,attr,optional"` + ServerMaxConcurrentStreams uint `river:"server_max_concurrent_streams,attr,optional"` +} + +// Into applies the configs from GRPCConfig into a weaveworks.Into. +func (g *GRPCConfig) Into(c *weaveworks.Config) { + c.GRPCListenAddress = g.ListenAddress + c.GRPCListenPort = g.ListenPort + c.GRPCConnLimit = g.ConnLimit + c.GRPCServerMaxConnectionAge = g.MaxConnectionAge + c.GRPCServerMaxConnectionAgeGrace = g.MaxConnectionAgeGrace + c.GRPCServerMaxConnectionIdle = g.MaxConnectionIdle + c.GPRCServerMaxRecvMsgSize = g.ServerMaxRecvMsg + c.GRPCServerMaxSendMsgSize = g.ServerMaxSendMsg + c.GPRCServerMaxConcurrentStreams = g.ServerMaxConcurrentStreams +} + +func (c *ServerConfig) UnmarshalRiver(f func(v interface{}) error) error { + type config ServerConfig + if err := f((*config)(c)); err != nil { + return err + } + + return nil +} + +// Convert converts the River-based ServerConfig into a weaveworks.Config object. +func (c *ServerConfig) Convert() weaveworks.Config { + cfg := newDefaultConfig() + if c.HTTP != nil { + c.HTTP.Into(&cfg) + } + if c.GRPC != nil { + c.GRPC.Into(&cfg) + } + // If set, override. Don't allow a zero-value since it configure a context.WithTimeout, so the user should at least + // give a >0 value to it + if c.GracefulShutdownTimeout != 0 { + cfg.ServerGracefulShutdownTimeout = c.GracefulShutdownTimeout + } + return cfg +} + +// newDefaultConfig creates a new weaveworks.Config object with some overridden defaults. +func newDefaultConfig() weaveworks.Config { + c := weaveworks.Config{} + c.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError)) + c.HTTPListenPort = DefaultHTTPPort + c.GRPCListenPort = DefaultGRPCPort + // By default, do not register instrumentation since every metric is later registered + // inside a custom register + c.RegisterInstrumentation = false + return c +} diff --git a/component/common/net/config_test.go b/component/common/net/config_test.go new file mode 100644 index 000000000000..9d137fcd8503 --- /dev/null +++ b/component/common/net/config_test.go @@ -0,0 +1,105 @@ +package net + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/agent/pkg/river" + weaveworks "github.com/weaveworks/common/server" +) + +func TestConfig(t *testing.T) { + type testcase struct { + raw string + errExpected bool + assert func(t *testing.T, config weaveworks.Config) + } + var cases = map[string]testcase{ + "empty config applies defaults": { + raw: ``, + assert: func(t *testing.T, config weaveworks.Config) { + // custom defaults + require.Equal(t, DefaultHTTPPort, config.HTTPListenPort) + require.Equal(t, DefaultGRPCPort, config.GRPCListenPort) + // defaults inherited from weaveworks + require.Equal(t, "", config.HTTPListenAddress) + require.Equal(t, "", config.GRPCListenAddress) + require.False(t, config.RegisterInstrumentation) + require.Equal(t, time.Second*30, config.ServerGracefulShutdownTimeout) + }, + }, + "overriding defaults": { + raw: ` + graceful_shutdown_timeout = "1m" + http { + listen_port = 8080 + listen_address = "0.0.0.0" + conn_limit = 10 + server_write_timeout = "10s" + }`, + assert: func(t *testing.T, config weaveworks.Config) { + require.Equal(t, 8080, config.HTTPListenPort) + require.Equal(t, "0.0.0.0", config.HTTPListenAddress) + require.Equal(t, 10, config.HTTPConnLimit) + require.Equal(t, time.Second*10, config.HTTPServerWriteTimeout) + + require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout) + }, + }, + "all params": { + raw: ` + graceful_shutdown_timeout = "1m" + http { + listen_address = "0.0.0.0" + listen_port = 1 + conn_limit = 2 + server_read_timeout = "2m" + server_write_timeout = "3m" + server_idle_timeout = "4m" + } + + grpc { + listen_address = "0.0.0.1" + listen_port = 3 + conn_limit = 4 + max_connection_age = "5m" + max_connection_age_grace = "6m" + max_connection_idle = "7m" + server_max_recv_msg_size = 5 + server_max_send_msg_size = 6 + server_max_concurrent_streams = 7 + }`, + assert: func(t *testing.T, config weaveworks.Config) { + // general + require.Equal(t, time.Minute, config.ServerGracefulShutdownTimeout) + // http + require.Equal(t, "0.0.0.0", config.HTTPListenAddress) + require.Equal(t, 1, config.HTTPListenPort) + require.Equal(t, 2, config.HTTPConnLimit) + require.Equal(t, time.Minute*2, config.HTTPServerReadTimeout) + require.Equal(t, time.Minute*3, config.HTTPServerWriteTimeout) + require.Equal(t, time.Minute*4, config.HTTPServerIdleTimeout) + // grpc + require.Equal(t, "0.0.0.1", config.GRPCListenAddress) + require.Equal(t, 3, config.GRPCListenPort) + require.Equal(t, 5*time.Minute, config.GRPCServerMaxConnectionAge) + require.Equal(t, 6*time.Minute, config.GRPCServerMaxConnectionAgeGrace) + require.Equal(t, 7*time.Minute, config.GRPCServerMaxConnectionIdle) + require.Equal(t, 5, config.GPRCServerMaxRecvMsgSize) + require.Equal(t, 6, config.GRPCServerMaxSendMsgSize) + require.Equal(t, uint(7), config.GPRCServerMaxConcurrentStreams) + }, + }, + } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + cfg := ServerConfig{} + err := river.Unmarshal([]byte(tc.raw), &cfg) + require.Equal(t, tc.errExpected, err != nil) + wConfig := cfg.Convert() + tc.assert(t, wConfig) + }) + } +} diff --git a/component/common/net/server.go b/component/common/net/server.go new file mode 100644 index 000000000000..3c861d2f0511 --- /dev/null +++ b/component/common/net/server.go @@ -0,0 +1,92 @@ +package net + +import ( + "fmt" + + "github.com/go-kit/log" + "github.com/go-kit/log/level" + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/weaveworks/common/logging" + weaveworks "github.com/weaveworks/common/server" +) + +// TargetServer is wrapper around weaveworks.Server that handles some common configuration used in all flow components +// that expose a network server. It just handles configuration and initialization, the handlers implementation are left +// to the consumer. +type TargetServer struct { + logger log.Logger + config *weaveworks.Config + metricsNamespace string + server *weaveworks.Server +} + +// NewTargetServer creates a new TargetServer, applying some defaults to the server configuration. +func NewTargetServer(logger log.Logger, metricsNamespace string, reg prometheus.Registerer, config *ServerConfig) (*TargetServer, error) { + if !model.IsValidMetricName(model.LabelValue(metricsNamespace)) { + return nil, fmt.Errorf("metrics namespace is not prometheus compatiible: %s", metricsNamespace) + } + + ts := &TargetServer{ + logger: logger, + metricsNamespace: metricsNamespace, + } + + // convert from River into the weaveworks config + serverCfg := config.Convert() + // Set the config to the new combined config. + // Avoid logging entire received request on failures + serverCfg.ExcludeRequestInLog = true + // Configure dedicated metrics registerer + serverCfg.Registerer = reg + // Persist crafter config in server + ts.config = &serverCfg + // To prevent metric collisions because all metrics are going to be registered in the global Prometheus registry. + ts.config.MetricsNamespace = ts.metricsNamespace + // We don't want the /debug and /metrics endpoints running, since this is not the main Flow HTTP server. + // We want this target to expose the least surface area possible, hence disabling WeaveWorks HTTP server metrics + // and debugging functionality. + ts.config.RegisterInstrumentation = false + // Add logger to weaveworks + ts.config.Log = logging.GoKit(ts.logger) + + return ts, nil +} + +// MountAndRun mounts the handlers and starting the server. +func (ts *TargetServer) MountAndRun(mountRoute func(router *mux.Router)) error { + level.Info(ts.logger).Log("msg", "starting server") + srv, err := weaveworks.New(*ts.config) + if err != nil { + return err + } + + ts.server = srv + mountRoute(ts.server.HTTP) + + go func() { + err := srv.Run() + if err != nil { + level.Error(ts.logger).Log("msg", "server shutdown with error", "err", err) + } + }() + + return nil +} + +// HTTPListenAddr returns the listen address of the HTTP server. +func (ts *TargetServer) HTTPListenAddr() string { + return ts.server.HTTPListenAddr().String() +} + +// GRPCListenAddr returns the listen address of the gRPC server. +func (ts *TargetServer) GRPCListenAddr() string { + return ts.server.GRPCListenAddr().String() +} + +// StopAndShutdown stops and shuts down the underlying server. +func (ts *TargetServer) StopAndShutdown() { + ts.server.Stop() + ts.server.Shutdown() +} diff --git a/component/common/net/server_test.go b/component/common/net/server_test.go new file mode 100644 index 000000000000..0cd20b685b03 --- /dev/null +++ b/component/common/net/server_test.go @@ -0,0 +1,45 @@ +package net + +import ( + "fmt" + "net/http" + "os" + "strings" + "testing" + + "github.com/go-kit/log" + "github.com/gorilla/mux" + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/require" +) + +func TestTargetServer(t *testing.T) { + // dependencies + w := log.NewSyncWriter(os.Stderr) + logger := log.NewLogfmtLogger(w) + reg := prometheus.NewRegistry() + + ts, err := NewTargetServer(logger, "test_namespace", reg, &ServerConfig{}) + require.NoError(t, err) + + ts.MountAndRun(func(router *mux.Router) { + router.Methods("GET").Path("/hello").Handler(http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { + w.WriteHeader(http.StatusOK) + })) + }) + defer ts.StopAndShutdown() + + // test mounted endpoint + req, err := http.NewRequest("GET", fmt.Sprintf("http://%s/hello", ts.HTTPListenAddr()), nil) + require.NoError(t, err) + res, err := http.DefaultClient.Do(req) + require.NoError(t, err) + require.Equal(t, 200, res.StatusCode) + + // assert all metrics have the prefix applied + metrics, err := reg.Gather() + require.NoError(t, err) + for _, m := range metrics { + require.True(t, strings.HasPrefix(m.GetName(), "test_namespace")) + } +} diff --git a/component/loki/source/gcplog/gcplog.go b/component/loki/source/gcplog/gcplog.go index 3d6dc54ba724..ed6dcbd96089 100644 --- a/component/loki/source/gcplog/gcplog.go +++ b/component/loki/source/gcplog/gcplog.go @@ -7,13 +7,14 @@ import ( "sync" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/model/relabel" + "github.com/grafana/agent/component" "github.com/grafana/agent/component/common/loki" flow_relabel "github.com/grafana/agent/component/common/relabel" gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget" "github.com/grafana/agent/pkg/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/prometheus/model/relabel" ) func init() { diff --git a/component/loki/source/gcplog/gcplog_test.go b/component/loki/source/gcplog/gcplog_test.go index ee0740244688..f3dee5c39ce6 100644 --- a/component/loki/source/gcplog/gcplog_test.go +++ b/component/loki/source/gcplog/gcplog_test.go @@ -10,8 +10,10 @@ import ( "github.com/grafana/agent/component" "github.com/grafana/agent/component/common/loki" + fnet "github.com/grafana/agent/component/common/net" flow_relabel "github.com/grafana/agent/component/common/relabel" gt "github.com/grafana/agent/component/loki/source/gcplog/internal/gcplogtarget" + "github.com/grafana/agent/pkg/util" "github.com/grafana/regexp" "github.com/phayes/freeport" @@ -37,8 +39,14 @@ func TestPush(t *testing.T) { port, err := freeport.GetFreePort() require.NoError(t, err) args.PushTarget = >.PushConfig{ - HTTPListenAddress: "localhost", - HTTPListenPort: port, + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + // assign random grpc port + GRPC: &fnet.GRPCConfig{ListenPort: 0}, + }, Labels: map[string]string{ "foo": "bar", }, diff --git a/component/loki/source/gcplog/internal/gcplogtarget/push_target.go b/component/loki/source/gcplog/internal/gcplogtarget/push_target.go index 2734f4c9796c..5fa5db37bb9e 100644 --- a/component/loki/source/gcplog/internal/gcplogtarget/push_target.go +++ b/component/loki/source/gcplog/internal/gcplogtarget/push_target.go @@ -14,13 +14,13 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/agent/component/common/loki" - "github.com/grafana/loki/clients/pkg/promtail/targets/serverutils" + "github.com/gorilla/mux" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" - "github.com/weaveworks/common/logging" - "github.com/weaveworks/common/server" + + "github.com/grafana/agent/component/common/loki" + fnet "github.com/grafana/agent/component/common/net" ) // PushTarget defines a server for receiving messages from a GCP PubSub push @@ -33,14 +33,19 @@ type PushTarget struct { entries chan<- loki.Entry handler loki.EntryHandler relabelConfigs []*relabel.Config - server *server.Server - serverConfig server.Config + server *fnet.TargetServer } // NewPushTarget constructs a PushTarget. func NewPushTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, jobName string, config *PushConfig, relabel []*relabel.Config, reg prometheus.Registerer) (*PushTarget, error) { + wrappedLogger := log.With(logger, "component", "gcp_push") + srv, err := fnet.NewTargetServer(wrappedLogger, jobName+"_push_target", reg, config.Server) + if err != nil { + return nil, fmt.Errorf("failed to create loki http server: %w", err) + } pt := &PushTarget{ - logger: logger, + server: srv, + logger: wrappedLogger, jobName: jobName, metrics: metrics, config: config, @@ -49,22 +54,9 @@ func NewPushTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandle relabelConfigs: relabel, } - srvCfg := server.Config{ - HTTPListenPort: config.HTTPListenPort, - HTTPListenAddress: config.HTTPListenAddress, - - // Avoid logging entire received request on failures - ExcludeRequestInLog: true, - } - mergedServerConfigs, err := serverutils.MergeWithDefaults(srvCfg) - if err != nil { - return nil, fmt.Errorf("failed to parse configs and override defaults when configuring gcp push target: %w", err) - } - - pt.serverConfig = mergedServerConfigs - pt.serverConfig.Registerer = reg - - err = pt.run() + err = pt.server.MountAndRun(func(router *mux.Router) { + router.Path("/gcp/api/v1/push").Methods("POST").Handler(http.HandlerFunc(pt.push)) + }) if err != nil { return nil, err } @@ -72,41 +64,6 @@ func NewPushTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandle return pt, nil } -func (p *PushTarget) run() error { - level.Info(p.logger).Log("msg", "starting gcp push target", "job", p.jobName) - - // To prevent metric collisions registering in the global Prometheus registry. - tentativeServerMetricNamespace := p.jobName + "_push_target" - if !model.IsValidMetricName(model.LabelValue(tentativeServerMetricNamespace)) { - return fmt.Errorf("invalid prometheus-compatible job name: %s", p.jobName) - } - p.serverConfig.MetricsNamespace = tentativeServerMetricNamespace - - // We don't want the /debug and /metrics endpoints running, since this is - // not the main Flow HTTP server. We want this target to expose the least - // surface area possible, hence disabling WeaveWorks HTTP server metrics - // and debugging functionality. - p.serverConfig.RegisterInstrumentation = false - - p.serverConfig.Log = logging.GoKit(p.logger) - srv, err := server.New(p.serverConfig) - if err != nil { - return err - } - p.server = srv - - p.server.HTTP.Path("/gcp/api/v1/push").Methods("POST").Handler(http.HandlerFunc(p.push)) - - go func() { - err := srv.Run() - if err != nil { - level.Error(p.logger).Log("msg", "loki.source.gcplog push target shutdown with error", "err", err) - } - }() - - return nil -} - func (p *PushTarget) push(w http.ResponseWriter, r *http.Request) { defer r.Body.Close() @@ -187,15 +144,14 @@ func (p *PushTarget) Details() map[string]string { return map[string]string{ "strategy": "push", "labels": p.Labels().String(), - "server_address": p.server.HTTPListenAddr().String(), + "server_address": p.server.HTTPListenAddr(), } } // Stop shuts down the push target. func (p *PushTarget) Stop() error { level.Info(p.logger).Log("msg", "stopping gcp push target", "job", p.jobName) - p.server.Stop() - p.server.Shutdown() + p.server.StopAndShutdown() p.handler.Stop() return nil } diff --git a/component/loki/source/gcplog/internal/gcplogtarget/push_target_test.go b/component/loki/source/gcplog/internal/gcplogtarget/push_target_test.go index 88da26140db9..1e47fbc29ebe 100644 --- a/component/loki/source/gcplog/internal/gcplogtarget/push_target_test.go +++ b/component/loki/source/gcplog/internal/gcplogtarget/push_target_test.go @@ -11,8 +11,10 @@ import ( "github.com/grafana/agent/component/loki/internal/fake" - "github.com/go-kit/log" "github.com/grafana/agent/component/common/loki" + fnet "github.com/grafana/agent/component/common/net" + + "github.com/go-kit/log" "github.com/phayes/freeport" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -165,8 +167,14 @@ func TestPushTarget(t *testing.T) { config := &PushConfig{ Labels: lbls, UseIncomingTimestamp: false, - HTTPListenAddress: "localhost", - HTTPListenPort: port, + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + // assign random grpc port + GRPC: &fnet.GRPCConfig{ListenPort: 0}, + }, } prometheus.DefaultRegisterer = prometheus.NewRegistry() @@ -228,8 +236,14 @@ func TestPushTarget_UseIncomingTimestamp(t *testing.T) { config := &PushConfig{ Labels: nil, UseIncomingTimestamp: true, - HTTPListenAddress: "localhost", - HTTPListenPort: port, + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + // assign random grpc port + GRPC: &fnet.GRPCConfig{ListenPort: 0}, + }, } prometheus.DefaultRegisterer = prometheus.NewRegistry() @@ -272,8 +286,14 @@ func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) { config := &PushConfig{ Labels: nil, UseIncomingTimestamp: true, - HTTPListenAddress: "localhost", - HTTPListenPort: port, + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + // assign random grpc port + GRPC: &fnet.GRPCConfig{ListenPort: 0}, + }, } prometheus.DefaultRegisterer = prometheus.NewRegistry() @@ -323,9 +343,15 @@ func TestPushTarget_ErroneousPayloadsAreRejected(t *testing.T) { port, err := freeport.GetFreePort() require.NoError(t, err) config := &PushConfig{ - Labels: nil, - HTTPListenAddress: "localhost", - HTTPListenPort: port, + Labels: nil, + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + // assign random grpc port + GRPC: &fnet.GRPCConfig{ListenPort: 0}, + }, } prometheus.DefaultRegisterer = prometheus.NewRegistry() @@ -406,8 +432,14 @@ func TestPushTarget_UsePushTimeout(t *testing.T) { Labels: nil, UseIncomingTimestamp: true, PushTimeout: time.Second, - HTTPListenAddress: "localhost", - HTTPListenPort: port, + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: "localhost", + ListenPort: port, + }, + // assign random grpc port + GRPC: &fnet.GRPCConfig{ListenPort: 0}, + }, } prometheus.DefaultRegisterer = prometheus.NewRegistry() diff --git a/component/loki/source/gcplog/internal/gcplogtarget/types.go b/component/loki/source/gcplog/internal/gcplogtarget/types.go index eda6884d83e5..df3f2e36be57 100644 --- a/component/loki/source/gcplog/internal/gcplogtarget/types.go +++ b/component/loki/source/gcplog/internal/gcplogtarget/types.go @@ -3,6 +3,8 @@ package gcplogtarget import ( "fmt" "time" + + fnet "github.com/grafana/agent/component/common/net" ) // Target is a common interface implemented by both GCPLog targets. @@ -21,22 +23,14 @@ type PullConfig struct { // PushConfig configures a GCPLog target with the 'push' strategy. type PushConfig struct { - HTTPListenAddress string `river:"http_listen_address,attr,optional"` - HTTPListenPort int `river:"http_listen_port,attr,optional"` - PushTimeout time.Duration `river:"push_timeout,attr,optional"` - Labels map[string]string `river:"labels,attr,optional"` - UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"` -} - -// DefaultPushConfig sets the default listen address and port. -var DefaultPushConfig = PushConfig{ - HTTPListenAddress: "0.0.0.0", - HTTPListenPort: 8080, + Server *fnet.ServerConfig `river:",squash"` + PushTimeout time.Duration `river:"push_timeout,attr,optional"` + Labels map[string]string `river:"labels,attr,optional"` + UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"` } // UnmarshalRiver implements the unmarshaller func (p *PushConfig) UnmarshalRiver(f func(v interface{}) error) error { - *p = DefaultPushConfig type pushCfg PushConfig err := f((*pushCfg)(p)) if err != nil { diff --git a/component/loki/source/heroku/heroku.go b/component/loki/source/heroku/heroku.go index 84a285264a0c..f980f990a4eb 100644 --- a/component/loki/source/heroku/heroku.go +++ b/component/loki/source/heroku/heroku.go @@ -2,20 +2,20 @@ package heroku import ( "context" - "fmt" "reflect" "sync" "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/model/relabel" + "github.com/grafana/agent/component" "github.com/grafana/agent/component/common/loki" + fnet "github.com/grafana/agent/component/common/net" flow_relabel "github.com/grafana/agent/component/common/relabel" ht "github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget" "github.com/grafana/agent/pkg/util" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/model" - "github.com/prometheus/prometheus/model/relabel" - sv "github.com/weaveworks/common/server" ) func init() { @@ -32,38 +32,13 @@ func init() { // Arguments holds values which are used to configure the loki.source.heroku // component. type Arguments struct { - HerokuListener ListenerConfig `river:"listener,block"` + Server *fnet.ServerConfig `river:",squash"` Labels map[string]string `river:"labels,attr,optional"` UseIncomingTimestamp bool `river:"use_incoming_timestamp,attr,optional"` ForwardTo []loki.LogsReceiver `river:"forward_to,attr"` RelabelRules flow_relabel.Rules `river:"relabel_rules,attr,optional"` } -// ListenerConfig defines a heroku listener. -type ListenerConfig struct { - ListenAddress string `river:"address,attr,optional"` - ListenPort int `river:"port,attr"` - // TODO - add the rest of the server config from Promtail -} - -// DefaultListenerConfig provides the default arguments for a heroku listener. -var DefaultListenerConfig = ListenerConfig{ - ListenAddress: "0.0.0.0", -} - -// UnmarshalRiver implements river.Unmarshaler. -func (lc *ListenerConfig) UnmarshalRiver(f func(interface{}) error) error { - *lc = DefaultListenerConfig - - type herokucfg ListenerConfig - err := f((*herokucfg)(lc)) - if err != nil { - return err - } - - return nil -} - // Component implements the loki.source.heroku component. type Component struct { opts component.Options @@ -143,7 +118,7 @@ func (c *Component) Update(args component.Arguments) error { rcs = flow_relabel.ComponentToPromRelabelConfigs(newArgs.RelabelRules) } - if listenerChanged(c.args.HerokuListener, newArgs.HerokuListener) || relabelRulesChanged(c.args.RelabelRules, newArgs.RelabelRules) { + if listenerChanged(c.args.Server, newArgs.Server) || relabelRulesChanged(c.args.RelabelRules, newArgs.RelabelRules) { if c.target != nil { err := c.target.Stop() if err != nil { @@ -180,10 +155,7 @@ func (args *Arguments) Convert() *ht.HerokuDrainTargetConfig { } return &ht.HerokuDrainTargetConfig{ - Server: sv.Config{ - HTTPListenAddress: args.HerokuListener.ListenAddress, - HTTPListenPort: args.HerokuListener.ListenPort, - }, + Server: args.Server, Labels: lbls, UseIncomingTimestamp: args.UseIncomingTimestamp, } @@ -196,7 +168,7 @@ func (c *Component) DebugInfo() interface{} { var res readerDebugInfo = readerDebugInfo{ Ready: c.target.Ready(), - Address: fmt.Sprintf("%s:%d", c.target.ListenAddress(), c.target.ListenPort()), + Address: c.target.HTTPListenAddress(), } return res @@ -207,7 +179,7 @@ type readerDebugInfo struct { Address string `river:"address,attr"` } -func listenerChanged(prev, next ListenerConfig) bool { +func listenerChanged(prev, next *fnet.ServerConfig) bool { return !reflect.DeepEqual(prev, next) } func relabelRulesChanged(prev, next flow_relabel.Rules) bool { diff --git a/component/loki/source/heroku/heroku_test.go b/component/loki/source/heroku/heroku_test.go index 71d38ac2f6b0..402298aa51af 100644 --- a/component/loki/source/heroku/heroku_test.go +++ b/component/loki/source/heroku/heroku_test.go @@ -10,6 +10,7 @@ import ( "github.com/grafana/agent/component" "github.com/grafana/agent/component/common/loki" + fnet "github.com/grafana/agent/component/common/net" flow_relabel "github.com/grafana/agent/component/common/relabel" "github.com/grafana/agent/component/loki/source/heroku/internal/herokutarget" "github.com/grafana/agent/pkg/util" @@ -28,9 +29,13 @@ func TestPush(t *testing.T) { ch1, ch2 := make(chan loki.Entry), make(chan loki.Entry) args := Arguments{ - HerokuListener: ListenerConfig{ - ListenAddress: address, - ListenPort: port, + Server: &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: address, + ListenPort: port, + }, + // assign random grpc port + GRPC: &fnet.GRPCConfig{ListenPort: 0}, }, UseIncomingTimestamp: false, Labels: map[string]string{"foo": "bar"}, diff --git a/component/loki/source/heroku/internal/herokutarget/herokutarget.go b/component/loki/source/heroku/internal/herokutarget/herokutarget.go index 2d9903a5c632..cad24fdc18a4 100644 --- a/component/loki/source/heroku/internal/herokutarget/herokutarget.go +++ b/component/loki/source/heroku/internal/herokutarget/herokutarget.go @@ -12,26 +12,24 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" + "github.com/gorilla/mux" herokuEncoding "github.com/heroku/x/logplex/encoding" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/model/relabel" - "github.com/weaveworks/common/logging" - "github.com/weaveworks/common/server" "github.com/grafana/agent/component/common/loki" + fnet "github.com/grafana/agent/component/common/net" "github.com/grafana/loki/pkg/logproto" - util_log "github.com/grafana/loki/pkg/util/log" ) const ReservedLabelTenantID = "__tenant_id__" // HerokuDrainTargetConfig describes a scrape config to listen and consume heroku logs, in the HTTPS drain manner. type HerokuDrainTargetConfig struct { - // Server is the weaveworks server config for listening connections - Server server.Config + Server *fnet.ServerConfig // Labels optionally holds labels to associate with each record received on the push api. Labels model.LabelSet @@ -45,16 +43,22 @@ type HerokuTarget struct { logger log.Logger handler loki.EntryHandler config *HerokuDrainTargetConfig - server *server.Server metrics *Metrics relabelConfigs []*relabel.Config + server *fnet.TargetServer } // NewTarget creates a brand new Heroku Drain target, capable of receiving logs from a Heroku application through an HTTP drain. func NewHerokuTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHandler, relabel []*relabel.Config, config *HerokuDrainTargetConfig, reg prometheus.Registerer) (*HerokuTarget, error) { wrappedLogger := log.With(logger, "component", "heroku_drain") + srv, err := fnet.NewTargetServer(wrappedLogger, "loki_source_heroku_drain_target", reg, config.Server) + if err != nil { + return nil, fmt.Errorf("failed to create loki server: %w", err) + } + ht := &HerokuTarget{ + server: srv, metrics: metrics, logger: wrappedLogger, handler: handler, @@ -62,9 +66,10 @@ func NewHerokuTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHand relabelConfigs: relabel, } - config.Server.Registerer = reg - - err := ht.run() + err = ht.server.MountAndRun(func(router *mux.Router) { + router.Path(ht.DrainEndpoint()).Methods("POST").Handler(http.HandlerFunc(ht.drain)) + router.Path(ht.HealthyEndpoint()).Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) + }) if err != nil { return nil, err } @@ -72,38 +77,6 @@ func NewHerokuTarget(metrics *Metrics, logger log.Logger, handler loki.EntryHand return ht, nil } -func (h *HerokuTarget) run() error { - level.Info(h.logger).Log("msg", "starting heroku drain target") - - h.config.Server.MetricsNamespace = "loki_source_heroku_drain_target" - - // We don't want the /debug and /metrics endpoints running, since this is not the main promtail HTTP server. - // We want this target to expose the least surface area possible, hence disabling WeaveWorks HTTP server metrics - // and debugging functionality. - h.config.Server.RegisterInstrumentation = false - - // Wrapping util logger with component-specific key vals, and the expected GoKit logging interface - h.config.Server.Log = logging.GoKit(log.With(util_log.Logger, "component", "heroku_drain")) - - srv, err := server.New(h.config.Server) - if err != nil { - return err - } - - h.server = srv - h.server.HTTP.Path(h.DrainEndpoint()).Methods("POST").Handler(http.HandlerFunc(h.drain)) - h.server.HTTP.Path(h.HealthyEndpoint()).Methods("GET").Handler(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })) - - go func() { - err := srv.Run() - if err != nil { - level.Error(h.logger).Log("msg", "heroku drain target shutdown with error", "err", err) - } - }() - - return nil -} - func (h *HerokuTarget) drain(w http.ResponseWriter, r *http.Request) { entries := h.handler.Chan() defer r.Body.Close() @@ -172,12 +145,8 @@ func (h *HerokuTarget) Labels() model.LabelSet { return h.config.Labels } -func (h *HerokuTarget) ListenAddress() string { - return h.config.Server.HTTPListenAddress -} - -func (h *HerokuTarget) ListenPort() int { - return h.config.Server.HTTPListenPort +func (h *HerokuTarget) HTTPListenAddress() string { + return h.server.HTTPListenAddr() } func (h *HerokuTarget) DrainEndpoint() string { @@ -189,7 +158,7 @@ func (h *HerokuTarget) HealthyEndpoint() string { } func (h *HerokuTarget) Ready() bool { - req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s:%d%s", h.ListenAddress(), h.ListenPort(), h.HealthyEndpoint()), nil) + req, err := http.NewRequest(http.MethodGet, fmt.Sprintf("%s%s", h.HTTPListenAddress(), h.HealthyEndpoint()), nil) if err != nil { return false } @@ -208,7 +177,7 @@ func (h *HerokuTarget) Details() interface{} { func (h *HerokuTarget) Stop() error { level.Info(h.logger).Log("msg", "stopping heroku drain target") - h.server.Shutdown() + h.server.StopAndShutdown() h.handler.Stop() return nil } diff --git a/component/loki/source/heroku/internal/herokutarget/target_test.go b/component/loki/source/heroku/internal/herokutarget/target_test.go index 727d63fda7e1..7b47097f94fa 100644 --- a/component/loki/source/heroku/internal/herokutarget/target_test.go +++ b/component/loki/source/heroku/internal/herokutarget/target_test.go @@ -5,7 +5,6 @@ package herokutarget // to other loki components. import ( - "flag" "fmt" "net" "net/http" @@ -23,7 +22,8 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/relabel" "github.com/stretchr/testify/require" - "github.com/weaveworks/common/server" + + fnet "github.com/grafana/agent/component/common/net" ) const localhost = "127.0.0.1" @@ -429,7 +429,7 @@ func waitForMessages(eh *fake.Client) { } } -func getServerConfigWithAvailablePort() (cfg server.Config, port int, err error) { +func getServerConfigWithAvailablePort() (cfg *fnet.ServerConfig, port int, err error) { // Get a randomly available port by open and closing a TCP socket addr, err := net.ResolveTCPAddr("tcp", localhost+":0") if err != nil { @@ -445,12 +445,14 @@ func getServerConfigWithAvailablePort() (cfg server.Config, port int, err error) return } - // Adjust some of the defaults - cfg.RegisterFlags(flag.NewFlagSet("empty", flag.ContinueOnError)) - cfg.HTTPListenAddress = localhost - cfg.HTTPListenPort = port - cfg.GRPCListenAddress = localhost - cfg.GRPCListenPort = 0 // Not testing GRPC, a random port will be assigned + cfg = &fnet.ServerConfig{ + HTTP: &fnet.HTTPConfig{ + ListenAddress: localhost, + ListenPort: port, + }, + // assign random grpc port + GRPC: &fnet.GRPCConfig{ListenPort: 0}, + } return } diff --git a/docs/sources/flow/reference/components/loki.source.gcplog.md b/docs/sources/flow/reference/components/loki.source.gcplog.md index 20d6203c789c..62dd3acfebbe 100644 --- a/docs/sources/flow/reference/components/loki.source.gcplog.md +++ b/docs/sources/flow/reference/components/loki.source.gcplog.md @@ -41,16 +41,21 @@ Name | Type | Description | Default | Requir The following blocks are supported inside the definition of `loki.source.gcplog`: -Hierarchy | Name | Description | Required ---------- | ---- | ----------- | -------- -pull | [pull][] | Configures a target to pull logs from a GCP Pub/Sub subscription. | no -push | [push][] | Configures a server to receive logs as GCP Pub/Sub push requests. | no +Hierarchy | Name | Description | Required +--------- |----------|-------------------------------------------------------------------------------| -------- +pull | [pull][] | Configures a target to pull logs from a GCP Pub/Sub subscription. | no +push | [push][] | Configures a server to receive logs as GCP Pub/Sub push requests. | no +push > http | [http][] | Configures the HTTP server that receives requests when using the `push` mode. | no +push > grpc | [grpc][] | Configures the gRPC server that receives requests when using the `push` mode. | no The `pull` and `push` inner blocks are mutually exclusive; a component must -contain exactly one of the two in its definition. +contain exactly one of the two in its definition. The `http` and `grpc` block +are just used when the `push` block is configured. [pull]: #pull-block [push]: #push-block +[http]: #http +[grpc]: #grpc ### pull block @@ -80,19 +85,18 @@ a service account key. ### push block -The `push` block defines the configuration of the HTTP server that receives +The `push` block defines the configuration of the server that receives push requests from GCP's Pub/Sub servers. The following arguments can be used to configure the `push` block. Any omitted fields take their default values. -Name | Type | Description | Default | Required ------------------------- | ------------- | ----------- | ------- | -------- -`http_listen_address` | `string` | The address the server listens to. | `"0.0.0.0"` | no -`http_listen_port` | `int` | The port the server listens to. | `8080` | no +Name | Type | Description | Default | Required +------------------------ |---------------|-----------------------------------------------------------------| ------- | -------- +`graceful_shutdown_timeout` | `duration` | Timeout for servers graceful shutdown. If configured, should be greater than zero. | "30s" | no `push_timeout` | `duration` | Sets a maximum processing time for each incoming GCP log entry. | `"0s"` | no -`labels` | `map(string)` | Additional labels to associate with incoming entries. | `"{}"` | no -`use_incoming_timestamp` | `bool` | Whether to use the incoming entry timestamp. | `false` | no +`labels` | `map(string)` | Additional labels to associate with incoming entries. | `"{}"` | no +`use_incoming_timestamp` | `bool` | Whether to use the incoming entry timestamp. | `false` | no The server listens for POST requests from GCP's Push subscriptions on `HOST:PORT/gcp/api/v1/push`. @@ -103,6 +107,13 @@ true. The `labels` map is applied to every entry that passes through the component. +### http + +{{< docs/shared lookup="flow/reference/components/loki-server-http.md" source="agent" >}} + +### grpc + +{{< docs/shared lookup="flow/reference/components/loki-server-grpc.md" source="agent" >}} ## Exported fields @@ -153,3 +164,23 @@ loki.write "local" { } ``` +On the other hand, if we need the server to listen on `0.0.0.0:4040`, and forwards them +to a `loki.write` component. + +```river +loki.source.gcplog "local" { + push { + http { + listen_port = 4040 + } + } + + forward_to = [loki.write.local.receiver] +} + +loki.write "local" { + endpoint { + url = "loki:3100/api/v1/push" + } +} +``` diff --git a/docs/sources/flow/reference/components/loki.source.heroku.md b/docs/sources/flow/reference/components/loki.source.heroku.md index 17946b08056b..68996cca82e0 100644 --- a/docs/sources/flow/reference/components/loki.source.heroku.md +++ b/docs/sources/flow/reference/components/loki.source.heroku.md @@ -35,12 +35,13 @@ loki.source.heroku "LABEL" { `loki.source.heroku` supports the following arguments: -Name | Type | Description | Default | Required ------------------------- | ---------------------- | -------------------- | ------- | -------- -`use_incoming_timestamp` | `bool` | Whether or not to use the timestamp received from Heroku. | `false` | no -`labels` | `map(string)` | The labels to associate with each received Heroku record. | `{}` | no -`forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes -`relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no +Name | Type | Description | Default | Required +------------------------ | ---------------------- |------------------------------------------------------------------------------------| ------- | -------- +`use_incoming_timestamp` | `bool` | Whether or not to use the timestamp received from Heroku. | `false` | no +`labels` | `map(string)` | The labels to associate with each received Heroku record. | `{}` | no +`forward_to` | `list(LogsReceiver)` | List of receivers to send log entries to. | | yes +`relabel_rules` | `RelabelRules` | Relabeling rules to apply on log entries. | `{}` | no +`graceful_shutdown_timeout` | `duration` | Timeout for servers graceful shutdown. If configured, should be greater than zero. | "30s" | no The `relabel_rules` field can make use of the `rules` export value from a `loki.relabel` component to apply one or more relabeling rules to log entries @@ -52,19 +53,19 @@ The following blocks are supported inside the definition of `loki.source.heroku` Hierarchy | Name | Description | Required --------- | ---- | ----------- | -------- -listener | [listener] | Configures a listener for Heroku messages. | yes +`http` | [http][] | Configures the HTTP server that receives requests. | | no +`grpc` | [grpc][] | Configures the gRPC server that receives requests. | | no -[listener]: #listener-block +[http]: #http +[grpc]: #grpc -### listener block +### http -The `listener` block defines the listen address and port where the listener -expects Heroku messages to be sent to. +{{< docs/shared lookup="flow/reference/components/loki-server-http.md" source="agent" >}} -Name | Type | Description | Default | Required ------------------------- | ------------- | ----------- | ------- | -------- -`address` | `string` | The `` address to listen to for heroku messages. | `0.0.0.0` | no -`port` | `int` | The `` to listen to for heroku messages. | | yes +### grpc + +{{< docs/shared lookup="flow/reference/components/loki-server-grpc.md" source="agent" >}} ## Labels @@ -105,9 +106,9 @@ This example listens for Heroku messages over TCP in the specified port and forw ```river loki.source.heroku "local" { - listener { + http { address = "0.0.0.0" - port = 8080 + port = 4040 } use_incoming_timestamp = true labels = {component = "loki.source.heroku"} @@ -121,3 +122,18 @@ loki.write "local" { } ``` +When using the default `http` block settings, the server listen for new connection on port `8080`. + +```river +loki.source.heroku "local" { + use_incoming_timestamp = true + labels = {component = "loki.source.heroku"} + forward_to = [loki.write.local.receiver] +} + +loki.write "local" { + endpoint { + url = "loki:3100/api/v1/push" + } +} +``` diff --git a/docs/sources/shared/flow/reference/components/loki-server-grpc.md b/docs/sources/shared/flow/reference/components/loki-server-grpc.md new file mode 100644 index 000000000000..471fc4ff0e89 --- /dev/null +++ b/docs/sources/shared/flow/reference/components/loki-server-grpc.md @@ -0,0 +1,22 @@ +--- +aliases: +- /docs/agent/shared/flow/reference/components/loki-server-grpc/ +headless: true +--- + +The `grpc` block configures the gRPC server. + +The following arguments can be used to configure the `grpc` block. Any omitted +fields take their default values. + + Name | Type | Description | Default | Required +---------------------------------|------------|----------------------------------------------------------------------------------------------------------------------|--------------|---------- + `listen_address` | `string` | Network address on which the server will listen for new connections. Defaults to accepting all incoming connections. | `""` | no + `listen_port` | `int` | Port number on which the server will listen for new connections. | `8081` | no + `conn_limit` | `int` | Maximum number of simultaneous http connections. Defaults to no limit. | `0` | no + `max_connection_age` | `duration` | The duration for the maximum amount of time a connection may exist before it will be closed. | `"infinity"` | no + `max_connection_age_grace` | `duration` | An additive period after `max_connection_age` after which the connection will be forcibly closed. | `"infinity"` | no + `max_connection_idle` | `duration` | The duration after which an idle connection should be closed. | `"infinity"` | no + `server_max_recv_msg_size` | `int` | Limit on the size of a gRPC message this server can receive (bytes). | `4MB` | no + `server_max_send_msg_size` | `int` | Limit on the size of a gRPC message this server can send (bytes). | `4MB` | no + `server_max_concurrent_streams` | `int` | Limit on the number of concurrent streams for gRPC calls (0 = unlimited). | `100` | no diff --git a/docs/sources/shared/flow/reference/components/loki-server-http.md b/docs/sources/shared/flow/reference/components/loki-server-http.md new file mode 100644 index 000000000000..761887f9a4ea --- /dev/null +++ b/docs/sources/shared/flow/reference/components/loki-server-http.md @@ -0,0 +1,19 @@ +--- +aliases: +- /docs/agent/shared/flow/reference/components/loki-server-http/ +headless: true +--- + +The `http` block configures the HTTP server. + +The following arguments can be used to configure the `http` block. Any omitted +fields take their default values. + + Name | Type | Description | Default | Required +------------------------|------------|----------------------------------------------------------------------------------------------------------------------|----------|---------- + `listen_address` | `string` | Network address on which the server will listen for new connections. Defaults to accepting all incoming connections. | `""` | no + `listen_port` | `int` | Port number on which the server will listen for new connections. | `8080` | no + `conn_limit` | `int` | Maximum number of simultaneous http connections. Defaults to no limit. | `0` | no + `server_read_timeout` | `duration` | Read timeout for HTTP server. | `"30s"` | no + `server_write_timeout` | `duration` | Write timeout for HTTP server. | `"30s"` | no + `server_idle_timeout` | `duration` | Idle timeout for HTTP server. | `"120s"` | no