Skip to content

Commit

Permalink
add in config auto-reload to flow
Browse files Browse the repository at this point in the history
document the changes and add to CL

run lint

better wording in docs for disabling auto reload

Co-authored-by: Clayton Cornell <[email protected]>
handle inline instead of throwing around channels

gofmt-ed

prrevent main proc from finishing before watcher is closed

Co-authored-by: Robert Fratto <[email protected]>
allow polling with override and rework to a separate utility for clean


adding in force polling option is too granular


rely on the context for sig events
  • Loading branch information
joshuapare committed Jul 2, 2023
1 parent a2226c1 commit 2138d10
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 8 deletions.
35 changes: 28 additions & 7 deletions cmd/internal/flowmode/cmd_run.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"path"
"sync"
"syscall"
"time"

"github.com/grafana/agent/component"
"github.com/grafana/agent/converter"
Expand All @@ -33,6 +34,7 @@ import (
"github.com/grafana/agent/pkg/flow/tracing"
"github.com/grafana/agent/pkg/river/diag"
"github.com/grafana/agent/pkg/usagestats"
"github.com/grafana/agent/pkg/util"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
Expand All @@ -44,13 +46,15 @@ import (

func runCommand() *cobra.Command {
r := &flowRun{
inMemoryAddr: "agent.internal:12345",
httpListenAddr: "127.0.0.1:12345",
storagePath: "data-agent/",
uiPrefix: "/",
disableReporting: false,
enablePprof: true,
configFormat: "flow",
inMemoryAddr: "agent.internal:12345",
httpListenAddr: "127.0.0.1:12345",
storagePath: "data-agent/",
uiPrefix: "/",
disableReporting: false,
enablePprof: true,
configFormat: "flow",
configAutoReload: true,
configPollInterval: 3 * time.Second,
}

cmd := &cobra.Command{
Expand Down Expand Up @@ -87,6 +91,10 @@ depending on the nature of the reload error.
},
}

cmd.Flags().
BoolVar(&r.configAutoReload, "config.auto-reload", r.configAutoReload, "Automatically reload the config file when it changes on disk")
cmd.Flags().
DurationVar(&r.configPollInterval, "config.auto-reload.poll-interval", r.configPollInterval, "Interval at which to poll the config file for changes when polling is used")
cmd.Flags().
StringVar(&r.httpListenAddr, "server.http.listen-addr", r.httpListenAddr, "Address to listen for HTTP traffic on")
cmd.Flags().StringVar(&r.inMemoryAddr, "server.http.memory-addr", r.inMemoryAddr, "Address to listen for in-memory HTTP traffic on. Change if it collides with a real address")
Expand Down Expand Up @@ -122,6 +130,8 @@ type flowRun struct {
clusterJoinAddr string
configFormat string
configBypassConversionWarnings bool
configAutoReload bool
configPollInterval time.Duration
}

func (fr *flowRun) Run(configFile string) error {
Expand Down Expand Up @@ -344,6 +354,17 @@ func (fr *flowRun) Run(configFile string) error {
return err
}

// Setup config file watching
if fr.configAutoReload {
fw := util.NewFileWatcher(configFile, reload, fr.configPollInterval)
wg.Add(1)
go func() {
defer wg.Done()
fw.Watch(l, ctx)
}()
}

// Reload on SIGHUP
reloadSignal := make(chan os.Signal, 1)
signal.Notify(reloadSignal, syscall.SIGHUP)
defer signal.Stop(reloadSignal)
Expand Down
7 changes: 6 additions & 1 deletion docs/sources/flow/reference/cli/run.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ The following flags are supported:
* `--server.http.ui-path-prefix`: Base path where the UI will be exposed (default `/`).
* `--storage.path`: Base directory where components can store data (default `data-agent/`).
* `--disable-reporting`: Disable [usage reporting][] of enabled [components][] to Grafana (default `false`).
* `--config.auto-reload.poll-interval`: Interval at which to poll the config file for changes when polling is used (default `5s`).
* `--config.auto-reload`: Automatically reload the config file when it changes on disk (default `true`).
* `--cluster.enabled`: Start the Agent in clustered mode (default `false`).
* `--cluster.node-name`: The name to use for this node (defaults to the environment's hostname).
* `--cluster.join-addresses`: Comma-separated list of addresses to join the cluster at (default `""`).
Expand All @@ -47,7 +49,10 @@ The following flags are supported:

## Updating the config file

The config file can be reloaded from disk by either:
By default, Grafana Agent Flow will automatically reload the config file when it
changes on disk. To disable this feature, use the `--config.auto-reload=false` flag.

In addition, the config file can be reloaded from disk by either:

* Sending an HTTP POST request to the `/-/reload` endpoint.
* Sending a `SIGHUP` signal to the Grafana Agent process.
Expand Down
77 changes: 77 additions & 0 deletions pkg/util/filewatch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package util

import (
"context"
"os"
"time"

"github.com/fsnotify/fsnotify"
"github.com/go-kit/log/level"
"github.com/grafana/agent/pkg/flow/logging"
)

type ReloadFunc func() error

type FileWatcher struct {
filePath string
reloadFunc ReloadFunc
pollInterval time.Duration
}

// NewFileWatcher creates a new file watcher that will call reloadFunc when the file at filePath is modified.
func NewFileWatcher(filePath string, reloadFunc ReloadFunc, pollInterval time.Duration) *FileWatcher {
return &FileWatcher{
filePath: filePath,
reloadFunc: reloadFunc,
pollInterval: pollInterval,
}
}

// Watch starts watching the file for changes and calls reloadFunc when the file is modified.
func (fw *FileWatcher) Watch(l *logging.Logger, ctx context.Context) {
watcher, err := fsnotify.NewWatcher()
if err != nil {
level.Error(l).Log("msg", "failed to create watcher", "err", err)
return
}
defer watcher.Close()

err = watcher.Add(fw.filePath)
if err != nil {
level.Error(l).Log("msg", "failed to add file to watcher", "err", err)
return
}

ticker := time.NewTicker(fw.pollInterval)
defer ticker.Stop()

var lastModTime time.Time
if fileInfo, err := os.Stat(fw.filePath); err == nil {
lastModTime = fileInfo.ModTime()
}

for {
select {
case <-ctx.Done():
return
case event := <-watcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
if err := fw.reloadFunc(); err != nil {
level.Error(l).Log("msg", "failed to reload", "err", err)
}
}
case err := <-watcher.Errors:
level.Error(l).Log("msg", "watcher error", "err", err)
case <-ticker.C:
if fileInfo, err := os.Stat(fw.filePath); err == nil {
modTime := fileInfo.ModTime()
if modTime.After(lastModTime) {
lastModTime = modTime
if err := fw.reloadFunc(); err != nil {
level.Error(l).Log("msg", "failed to reload", "err", err)
}
}
}
}
}
}

0 comments on commit 2138d10

Please sign in to comment.