service/cluster: migrate pkg/cluster to a Flow-native service (grafana#4870)

* service/http: allow dependent services to register custom handlers

This updates the HTTP service to allow downstream services to register
custom handlers.

This will replace the custom logic for wiring methods into the HTTP
service, which currently requires the clustering service to be
explicitly passed to the HTTP service.
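
As a rough sketch of the pattern (the interface name, method signature,
and wiring below are illustrative assumptions, not the exact API from
this commit): a downstream service opts in by exposing a handler, and
the HTTP service mounts it.

package httpservice

import (
	"net/http"

	"github.com/grafana/agent/service"
)

// serviceHandler is a hypothetical extension interface; a dependent
// service implements it to expose a custom HTTP handler under a base
// route.
type serviceHandler interface {
	service.Service

	// ServiceHandler returns the base route and the handler to mount there.
	ServiceHandler(host service.Host) (base string, handler http.Handler)
}

// registerServiceHandlers mounts the handler of every dependent service
// which implements serviceHandler.
func registerServiceHandlers(host service.Host, mux *http.ServeMux, dependants []service.Service) {
	for _, svc := range dependants {
		if sh, ok := svc.(serviceHandler); ok {
			base, handler := sh.ServiceHandler(host)
			mux.Handle(base+"/", handler)
		}
	}
}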

* service/http: move UI-specific routes to a dedicated UI service.

The UI service depends on both the HTTP and clustering services to power
the API, which means it should be moved out into a dedicated package to
avoid a cyclic dependency.

* service/cluster: introduce cluster service

The cluster service is a replacement for pkg/cluster that implements the
github.com/grafana/agent/service.Service API.

It has the following changes compared to the previous implementation:

* service/cluster always starts a ckit.Node for consistency around the
  API. When clustering is disabled, service/cluster never joins any
  peers, and never exposes its cluster HTTP handlers to peers in the
  cluster.

* Much of the behavior previously provided by pkg/cluster is now
  deferred to the caller:

  * The caller is expected to determine their own advertise address,
    including defaulting to a specific interface.

  * The caller is expected to provide their own function to discover
    peers, even if that function returns a static list.

* The new implementation, as a Flow service, directly informs components
  about changes to the cluster state instead of having the Flow
  controller inform components.

While the new implementation puts more work on the caller, its API is
simplified. Mutually exclusive mechanisms of the previous API (such as
configuring JoinPeers and DiscoverPeers) have been simplified into a
single, unified mechanism.
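
For illustration, a minimal sketch of the unified mechanism, using the
Options fields that appear in the diff below (the surrounding wiring is
an assumption): a static join list and dynamic discovery both reduce to
a single peer-discovery function.

package example

import "github.com/grafana/agent/service/cluster"

// buildOptions shows how a static peer list collapses into the single
// DiscoverPeers hook on cluster.Options; dynamic discovery supplies a
// different function with the same signature. Other Options fields are
// omitted for brevity.
func buildOptions(staticPeers []string) cluster.Options {
	return cluster.Options{
		EnableClustering: true,

		// A static join list is just a discovery function that returns it.
		// Returning an empty list is valid when starting a seed node.
		DiscoverPeers: func() ([]string, error) {
			return staticPeers, nil
		},
	}
}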

While this commit introduces the new service, existing code is not yet
updated to use it.

* flowmode: add utilities to build a cluster.Service implementation

The implementation of service/cluster shifts more responsibility to the
caller than pkg/cluster previously required. This commit adds helper
utilities that implement that responsibility, retaining the same
behavior with the new service.

* all: replace usages of pkg/cluster.Clusterer with pkg/cluster.Node

This changes all usages of pkg/cluster.Clusterer to go through an
interface instead. The interface permits swapping out the
implementation, in particular for an implementation from
service/cluster.

This is a temporary change for the purposes of atomic commits. As such,
the Clusterer field is not renamed to the more appropriate name of
"Node."

* service/cluster: temporarily match pkg/cluster.Node interface

This is a temporary commit for service/cluster.Cluster to match the
pkg/cluster.Node interface.

The purpose of this commit is to atomically swap out the implementation
of pkg/cluster with service/cluster without needing to remove the
hard-coded usage of the interface yet.

* flowmode: Replace usage of pkg/cluster with service/cluster.

* flow: use GetServiceData API and remove hard-wired cluster usage

* flow: remove remaining references to pkg/cluster

* service/cluster: remove unused methods

Remove methods which were temporarily added to support atomically
swapping pkg/cluster usage with service/cluster.

* misc: remove pkg/cluster

pkg/cluster is now considered dead code in favor of the Flow-specific
service/cluster API.

* component: remove reference to old ClusteredComponent interface

The ClusteredComponent interface has been replaced with the interface
definition in cluster.Component.
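
For reference, a sketch of what the replacement likely looks like from
a component's point of view; the shape below is inferred from the
removed ClusteredComponent definition in this diff, not copied from
service/cluster.

package cluster

import "github.com/grafana/agent/component"

// Component mirrors the removed ClusteredComponent interface: a
// component which subscribes to clustering updates.
type Component interface {
	component.Component

	// NotifyClusterChange notifies the component that the state of the
	// cluster has changed. Components configured with clustering disabled
	// should treat this as a no-op.
	NotifyClusterChange()
}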
rfratto authored Aug 24, 2023
1 parent 7357b37 commit a3544fe
Showing 37 changed files with 931 additions and 1,109 deletions.
184 changes: 184 additions & 0 deletions cmd/internal/flowmode/cluster_builder.go
@@ -0,0 +1,184 @@
package flowmode

import (
"fmt"
stdlog "log"
"net"
"os"
"strconv"
"time"

"github.com/go-kit/log"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/ckit/advertise"
"github.com/hashicorp/go-discover"
"github.com/hashicorp/go-discover/provider/k8s"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
)

type clusterOptions struct {
Log log.Logger
Metrics prometheus.Registerer
Tracer trace.TracerProvider

EnableClustering bool
NodeName string
AdvertiseAddress string
ListenAddress string
JoinPeers []string
DiscoverPeers string
RejoinInterval time.Duration
}

func buildClusterService(opts clusterOptions) (*cluster.Service, error) {
listenPort := findPort(opts.ListenAddress, 80)

config := cluster.Options{
Log: opts.Log,
Metrics: opts.Metrics,
Tracer: opts.Tracer,

EnableClustering: opts.EnableClustering,
NodeName: opts.NodeName,
AdvertiseAddress: opts.AdvertiseAddress,
RejoinInterval: opts.RejoinInterval,
}

if config.NodeName == "" {
hostname, err := os.Hostname()
if err != nil {
return nil, fmt.Errorf("generating node name: %w", err)
}
config.NodeName = hostname
}

if config.AdvertiseAddress == "" {
// TODO(rfratto): allow advertise interfaces to be configurable.
addr, err := advertise.FirstAddress(advertise.DefaultInterfaces)
if err != nil {
return nil, fmt.Errorf("determining advertise address: %w", err)
}
config.AdvertiseAddress = fmt.Sprintf("%s:%d", addr.String(), listenPort)
} else {
config.AdvertiseAddress = appendDefaultPort(config.AdvertiseAddress, listenPort)
}

switch {
case len(opts.JoinPeers) > 0 && opts.DiscoverPeers != "":
return nil, fmt.Errorf("at most one of join peers and discover peers may be set")

case len(opts.JoinPeers) > 0:
config.DiscoverPeers = newStaticDiscovery(opts.JoinPeers, listenPort)

case opts.DiscoverPeers != "":
discoverFunc, err := newDynamicDiscovery(config.Log, opts.DiscoverPeers, listenPort)
if err != nil {
return nil, err
}
config.DiscoverPeers = discoverFunc

default:
// Here, both JoinPeers and DiscoverPeers are empty. This is desirable when
// starting a seed node that other nodes connect to, so we don't require
// one of the fields to be set.
}

return cluster.New(config)
}

func findPort(addr string, defaultPort int) int {
_, portStr, err := net.SplitHostPort(addr)
if err != nil {
return defaultPort
}
port, err := strconv.Atoi(portStr)
if err != nil {
return defaultPort
}
return port
}

func appendDefaultPort(addr string, port int) string {
_, _, err := net.SplitHostPort(addr)
if err == nil {
// No error means there was a port in the string
return addr
}
return fmt.Sprintf("%s:%d", addr, port)
}

type discoverFunc func() ([]string, error)

func newStaticDiscovery(peers []string, defaultPort int) discoverFunc {
return func() ([]string, error) {
var addrs []string

for _, addr := range peers {
addrs = appendJoinAddr(addrs, addr)
}

for i := range addrs {
// Default to using the same advertise port as the local node. This may
// break in some cases, so the user should make sure the port numbers
// align on as many nodes as possible.
addrs[i] = appendDefaultPort(addrs[i], defaultPort)
}

return addrs, nil
}
}

func appendJoinAddr(addrs []string, in string) []string {
_, _, err := net.SplitHostPort(in)
if err == nil {
addrs = append(addrs, in)
return addrs
}

ip := net.ParseIP(in)
if ip != nil {
addrs = append(addrs, ip.String())
return addrs
}

_, srvs, err := net.LookupSRV("", "", in)
if err == nil {
for _, srv := range srvs {
addrs = append(addrs, srv.Target)
}
}

return addrs
}

func newDynamicDiscovery(l log.Logger, config string, defaultPort int) (discoverFunc, error) {
providers := make(map[string]discover.Provider, len(discover.Providers)+1)
for k, v := range discover.Providers {
providers[k] = v
}

// Custom providers that aren't enabled by default
providers["k8s"] = &k8s.Provider{}

discoverer, err := discover.New(discover.WithProviders(providers))
if err != nil {
return nil, fmt.Errorf("bootstrapping peer discovery: %w", err)
}

return func() ([]string, error) {
addrs, err := discoverer.Addrs(config, stdlog.New(log.NewStdlibAdapter(l), "", 0))
if err != nil {
return nil, fmt.Errorf("discovering peers: %w", err)
}

for i := range addrs {
// Default to using the same advertise port as the local node. This may
// break in some cases, so the user should make sure the port numbers
// align on as many nodes as possible.
addrs[i] = appendDefaultPort(addrs[i], defaultPort)
}

return addrs, nil
}, nil
}
62 changes: 35 additions & 27 deletions cmd/internal/flowmode/cmd_run.go
@@ -6,6 +6,7 @@ import (
"fmt"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
@@ -17,15 +18,16 @@ import (
"github.com/grafana/agent/converter"
convert_diag "github.com/grafana/agent/converter/diag"
"github.com/grafana/agent/pkg/boringcrypto"
"github.com/grafana/agent/pkg/cluster"
"github.com/grafana/agent/pkg/config/instrumentation"
"github.com/grafana/agent/pkg/flow"
"github.com/grafana/agent/pkg/flow/logging"
"github.com/grafana/agent/pkg/flow/tracing"
"github.com/grafana/agent/pkg/river/diag"
"github.com/grafana/agent/pkg/usagestats"
"github.com/grafana/agent/service"
"github.com/grafana/agent/service/cluster"
httpservice "github.com/grafana/agent/service/http"
uiservice "github.com/grafana/agent/service/ui"
"github.com/grafana/ckit/peer"
"github.com/prometheus/client_golang/prometheus"
"github.com/spf13/cobra"
@@ -172,17 +174,6 @@ func (fr *flowRun) Run(configFile string) error {
reg := prometheus.DefaultRegisterer
reg.MustRegister(newResourcesCollector(l))

clusterer, err := cluster.New(l, reg, fr.clusterEnabled, fr.clusterNodeName, fr.httpListenAddr, fr.clusterAdvAddr, fr.clusterJoinAddr, fr.clusterDiscoverPeers, fr.clusterRejoinInterval)
if err != nil {
return fmt.Errorf("building clusterer: %w", err)
}
defer func() {
err := clusterer.Stop()
if err != nil {
level.Error(l).Log("msg", "failed to terminate clusterer", "err", err)
}
}()

// There's a cyclic dependency between the definition of the Flow controller,
// the reload/ready functions, and the HTTP service.
//
@@ -193,28 +184,51 @@ func (fr *flowRun) Run(configFile string) error {
ready func() bool
)

clusterService, err := buildClusterService(clusterOptions{
Log: l,
Tracer: t,
Metrics: reg,

EnableClustering: fr.clusterEnabled,
NodeName: fr.clusterNodeName,
AdvertiseAddress: fr.clusterAdvAddr,
ListenAddress: fr.httpListenAddr,
JoinPeers: strings.Split(fr.clusterJoinAddr, ","),
DiscoverPeers: fr.clusterDiscoverPeers,
RejoinInterval: fr.clusterRejoinInterval,
})
if err != nil {
return err
}

httpService := httpservice.New(httpservice.Options{
Logger: log.With(l, "service", "http"),
Tracer: t,
Gatherer: prometheus.DefaultGatherer,

Clusterer: clusterer,
ReadyFunc: func() bool { return ready() },
ReloadFunc: func() error { return reload() },

HTTPListenAddr: fr.httpListenAddr,
MemoryListenAddr: fr.inMemoryAddr,
UIPrefix: fr.uiPrefix,
EnablePProf: fr.enablePprof,
})

uiService := uiservice.New(uiservice.Options{
UIPrefix: fr.uiPrefix,
Cluster: clusterService.Data().(cluster.Cluster),
})

f := flow.New(flow.Options{
Logger: l,
Tracer: t,
Clusterer: clusterer,
DataPath: fr.storagePath,
Reg: reg,
Services: []service.Service{httpService},
Logger: l,
Tracer: t,
DataPath: fr.storagePath,
Reg: reg,
Services: []service.Service{
httpService,
uiService,
clusterService,
},
})

ready = f.Ready
@@ -255,12 +269,6 @@ }()
}()
}

// Start the Clusterer's Node implementation.
err = clusterer.Start(ctx)
if err != nil {
return fmt.Errorf("failed to start the clusterer: %w", err)
}

// Perform the initial reload. This is done after starting the HTTP server so
// that /metric and pprof endpoints are available while the Flow controller
// is loading.
@@ -290,7 +298,7 @@ func (fr *flowRun) Run(configFile string) error {
// Nodes initially join in the Viewer state. After the graph has been
// loaded successfully, we can move to the Participant state to signal that
// we wish to participate in reading or writing data.
err = clusterer.ChangeState(peer.StateParticipant)
err = clusterService.ChangeState(ctx, peer.StateParticipant)
if err != nil {
return fmt.Errorf("failed to set clusterer state to Participant after initial load")
}
13 changes: 0 additions & 13 deletions component/component.go
@@ -113,16 +113,3 @@ type DebugComponent interface {
// DebugInfo must be safe for calling concurrently.
DebugInfo() interface{}
}

// ClusteredComponent is an extension interface for components which implement
// clustering-specific behavior.
type ClusteredComponent interface {
Component

// NotifyClusterChange notifies the component that the state of the cluster
// has changed.
//
// Implementations of ClusteredComponent should ignore calls to this method
// if they are configured to not utilize clustering.
NotifyClusterChange()
}
12 changes: 6 additions & 6 deletions component/discovery/discovery.go
@@ -8,7 +8,7 @@ import (
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/pkg/cluster"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/ckit/shard"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/discovery"
@@ -24,13 +24,13 @@ type Target map[string]string
// targets when a Flow component runs in a cluster.
type DistributedTargets struct {
useClustering bool
node cluster.Node
cluster cluster.Cluster
targets []Target
}

// NewDistributedTargets creates the abstraction that allows components to
// dynamically shard targets between components.
func NewDistributedTargets(e bool, n cluster.Node, t []Target) DistributedTargets {
func NewDistributedTargets(e bool, n cluster.Cluster, t []Target) DistributedTargets {
return DistributedTargets{e, n, t}
}

@@ -39,16 +39,16 @@ func NewDistributedTargets(e bool, n cluster.Node, t []Target) DistributedTarget
// If a cluster size is 1, then all targets will be returned.
func (t *DistributedTargets) Get() []Target {
// TODO(@tpaschalis): Make this into a single code-path to simplify logic.
if !t.useClustering || t.node == nil {
if !t.useClustering || t.cluster == nil {
return t.targets
}

res := make([]Target, 0, (len(t.targets)+1)/len(t.node.Peers()))
res := make([]Target, 0, (len(t.targets)+1)/len(t.cluster.Peers()))

// TODO(@tpaschalis): Make sure OpReadWrite is the correct operation;
// eg. this determines how clustering behaves when nodes are shutting down.
for _, tgt := range t.targets {
peers, err := t.node.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite)
peers, err := t.cluster.Lookup(shard.StringKey(tgt.NonMetaLabels().String()), 1, shard.OpReadWrite)
if err != nil {
// This can only fail in case we ask for more owners than the
// available peers. This will never happen, but in any case we fall
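As a usage note, a minimal sketch of how a component shards work with
the renamed cluster.Cluster dependency (function and variable names are
illustrative; the signatures match the diff above):

package example

import (
	"github.com/grafana/agent/component/discovery"
	"github.com/grafana/agent/service/cluster"
)

// localTargets returns only the targets owned by this node. With
// clustering disabled or no cluster available, every target is
// returned unchanged.
func localTargets(clusteringEnabled bool, c cluster.Cluster, all []discovery.Target) []discovery.Target {
	dt := discovery.NewDistributedTargets(clusteringEnabled, c, all)
	return dt.Get()
}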
3 changes: 2 additions & 1 deletion component/module/file/file.go
@@ -10,6 +10,7 @@ import (
"github.com/grafana/agent/component/local/file"
"github.com/grafana/agent/component/module"
"github.com/grafana/agent/pkg/river/rivertypes"
"github.com/grafana/agent/service/cluster"
"github.com/grafana/agent/service/http"
)

@@ -18,7 +19,7 @@ func init() {
Name: "module.file",
Args: Arguments{},
Exports: module.Exports{},
NeedsServices: []string{http.ServiceName},
NeedsServices: []string{http.ServiceName, cluster.ServiceName},

Build: func(opts component.Options, args component.Arguments) (component.Component, error) {
return New(opts, args.(Arguments))