[core] fixing and refactoring monitoring #649

Merged 1 commit on Jan 14, 2025
10 changes: 7 additions & 3 deletions common/ecsmetrics/metrics.go
@@ -5,6 +5,7 @@ import (
"time"

"github.com/AliceO2Group/Control/common/logger"
"github.com/AliceO2Group/Control/common/logger/infologger"
"github.com/AliceO2Group/Control/common/monitoring"
"github.com/sirupsen/logrus"
)
@@ -30,7 +31,6 @@ func gather() monitoring.Metric {
{Name: "/memory/classes/heap/unused:bytes"},
}

-// Collect metrics data
internalmetrics.Read(samples)

metric := NewMetric("golangruntimemetrics")
@@ -42,24 +42,28 @@ func gather() monitoring.Metric {
case internalmetrics.KindFloat64:
metric.AddValue(sample.Name, sample.Value.Float64())
case internalmetrics.KindFloat64Histogram:
log.Warning("Error: Histogram is not supported yet for metric [%s]", sample.Name)
log.WithField("level", infologger.IL_Devel).Warningf("Error: Histogram is not supported yet for metric [%s]", sample.Name)
continue
default:
log.Warning("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name)
log.WithField("level", infologger.IL_Devel).Warningf("Unsupported kind %v for metric %s\n", sample.Value.Kind(), sample.Name)
continue
}
}
return metric
}

func StartGolangMetrics(period time.Duration) {
log.WithField("level", infologger.IL_Devel).Info("Starting golang metrics reporting")
go func() {
log.Debug("Starting golang metrics goroutine")
for {
select {
case <-endRequestChannel:
log.Debug("ending golang metrics")
endRequestChannel <- struct{}{}
return
default:
log.Debug("sending golang metrics")
monitoring.Send(gather())
time.Sleep(period)
}
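For context, gather() above is a thin wrapper over the standard runtime/metrics package: pre-declare samples, Read() them, then switch on each value's kind. A minimal self-contained sketch of that pattern (the two metric names are stdlib identifiers chosen for illustration, not the full ecsmetrics set):

```go
package main

import (
	"fmt"
	"runtime/metrics"
)

func main() {
	// Pre-declare which runtime metrics to read, as gather() does.
	samples := []metrics.Sample{
		{Name: "/gc/cycles/total:gc-cycles"},
		{Name: "/memory/classes/heap/objects:bytes"},
	}
	metrics.Read(samples)
	for _, s := range samples {
		// Each sample reports its own kind; unsupported kinds are skipped,
		// mirroring the KindFloat64Histogram/default branches above.
		switch s.Value.Kind() {
		case metrics.KindUint64:
			fmt.Printf("%s = %d\n", s.Name, s.Value.Uint64())
		case metrics.KindFloat64:
			fmt.Printf("%s = %g\n", s.Name, s.Value.Float64())
		default:
			continue
		}
	}
}
```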
56 changes: 37 additions & 19 deletions common/monitoring/monitoring.go
@@ -22,64 +22,77 @@ var (
// channel used to send metrics into the event loop
metricsChannel chan Metric

-// channel for sending notifications to event loop that new http Request to report metrics arrived
-metricsRequestChannel chan struct{}
+// channel for sending requests to reset actual metrics slice and send it back to caller via metricsExportedToRequest
+metricsRequestedChannel chan struct{}

// channel used to send metrics to be reported by http request from event loop
-metricsToRequest chan []Metric
+metricsExportedToRequest chan []Metric

-Log = logger.New(logrus.StandardLogger(), "metrics")
+log = logger.New(logrus.StandardLogger(), "metrics")
)

func initChannels(messageBufferSize int) {
endChannel = make(chan struct{})
-metricsRequestChannel = make(chan struct{})
+metricsRequestedChannel = make(chan struct{})
+// 100 was chosen arbitrarily as a number that seemed sensible to be high enough to provide a nice buffer if
+// multiple goroutines want to send metrics without blocking each other
metricsChannel = make(chan Metric, 100)
-metricsToRequest = make(chan []Metric)
+metricsExportedToRequest = make(chan []Metric)
metricsLimit = messageBufferSize
}

func closeChannels() {
close(endChannel)
-close(metricsRequestChannel)
+close(metricsRequestedChannel)
close(metricsChannel)
-close(metricsToRequest)
+close(metricsExportedToRequest)
}

+// this eventLoop is the main part that processes all metrics sent to the package
+// 3 events can happen:
+// 1. metricsChannel receives a message from the Send() method. We just add the new metric to the metrics slice
+// 2. metricsRequestedChannel receives a request to dump and reset existing metrics. We send a shallow copy of the existing
+// metrics to the requestor (via the metricsExportedToRequest channel) while resetting the current metrics slice
+// 3. endChannel receives a request to stop monitoring. We send a confirmation through endChannel to notify the caller
+// that the eventLoop stopped
func eventLoop() {
for {
select {
-case <-metricsRequestChannel:
+case <-metricsRequestedChannel:
shallowCopyMetrics := metrics
metrics = make([]Metric, 0)
-metricsToRequest <- shallowCopyMetrics
+metricsExportedToRequest <- shallowCopyMetrics

case metric := <-metricsChannel:
if len(metrics) < metricsLimit {
metrics = append(metrics, metric)
} else {
-Log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?")
+log.Warn("too many metrics waiting to be scraped. Are you sure that metrics scraping is running?")
}

case <-endChannel:
-endChannel <- struct{}{}
+defer func() {
+endChannel <- struct{}{}
+}()
return
}
}
}

func exportMetricsAndReset(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
-metricsRequestChannel <- struct{}{}
-metricsToConvert := <-metricsToRequest
+metricsRequestedChannel <- struct{}{}
+metricsToConvert := <-metricsExportedToRequest
if metricsToConvert == nil {
metricsToConvert = make([]Metric, 0)
}
json.NewEncoder(w).Encode(metricsToConvert)
}

func Send(metric Metric) {
-metricsChannel <- metric
+if IsRunning() {
+metricsChannel <- metric
+}
}

func handleFunc(endpointName string) {
@@ -96,22 +109,22 @@ func handleFunc(endpointName string) {
// \param messageBufferSize size of the buffer where messages are kept between scraping requests.
//
// If we attempt to send more messages than the size of the buffer, the overflowing messages will be ignored and a warning will be logged.
-func Start(port uint16, endpointName string, messageBufferSize int) error {
-if server != nil {
+func Run(port uint16, endpointName string, messageBufferSize int) error {
+if IsRunning() {
return nil
}

initChannels(messageBufferSize)

go eventLoop()

-server := &http.Server{Addr: fmt.Sprintf(":%d", port)}
+server = &http.Server{Addr: fmt.Sprintf(":%d", port)}
handleFunc(endpointName)
return server.ListenAndServe()
}

func Stop() {
-if server == nil {
+if !IsRunning() {
return
}

@@ -122,4 +135,9 @@ func Stop() {
endChannel <- struct{}{}
<-endChannel
server = nil
+metrics = nil
}

+func IsRunning() bool {
+return server != nil
+}
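Design note: after this refactor every piece of mutable state (metrics, the channels, server) is owned by a single goroutine or guarded by channel handshakes, so the package needs no mutex. The new IsRunning() guard also makes Send() a silent no-op before Run() has initialized anything, where a send on the still-nil channel would otherwise block forever. A stripped-down sketch of that single-owner event-loop pattern, with hypothetical names rather than the package's actual API:

```go
package main

import "fmt"

type metric struct{ name string }

var (
	sendCh    = make(chan metric)   // unbuffered here to keep the example deterministic
	requestCh = make(chan struct{}) // ask the loop to hand over and reset its slice
	exportCh  = make(chan []metric) // the loop answers with the accumulated metrics
)

// loop is the only goroutine that ever touches acc, mirroring eventLoop().
func loop() {
	var acc []metric
	for {
		select {
		case m := <-sendCh:
			acc = append(acc, m)
		case <-requestCh:
			out := acc // hand the backing array to the requester...
			acc = nil  // ...and start accumulating into a fresh slice
			exportCh <- out
		}
	}
}

func main() {
	go loop()
	sendCh <- metric{name: "test"}
	requestCh <- struct{}{}
	fmt.Println(<-exportCh) // prints: [{test}]
}
```

The real package buffers metricsChannel (size 100) and enforces metricsLimit on top of that, so producers are never blocked by a slow or absent scraper.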
86 changes: 54 additions & 32 deletions common/monitoring/monitoring_test.go
@@ -8,36 +8,61 @@ import (
"time"
)

+// blocks until either IsRunning() returns true or timeout is triggered
+func isRunningWithTimeout(t *testing.T, timeout time.Duration) {
+timeoutChan := time.After(timeout)
+for !IsRunning() {
+select {
+case <-timeoutChan:
+t.Errorf("Monitoring is not running even after %v", timeout)
+return
+
+default:
+time.Sleep(10 * time.Millisecond)
+}
+}
+}
+
+// blocks until either the length of metrics is the same as \requiredMessages or timeout is triggered
+func hasNumberOfMetrics(t *testing.T, timeout time.Duration, requiredMessages int) {
+timeoutChan := time.After(timeout)
+for len(metrics) != requiredMessages {
+select {
+case <-timeoutChan:
+t.Errorf("Timeout %v triggered when waiting for %v messages, got %v", timeout, requiredMessages, len(metrics))
+return
+
+default:
+time.Sleep(10 * time.Millisecond)
+}
+}
+}

func TestSimpleStartStop(t *testing.T) {
-go Start(1234, "/random", 100)
-time.Sleep(time.Millisecond * 100)
+go Run(1234, "/random", 100)
+isRunningWithTimeout(t, time.Second)
Stop()
}

func TestStartMultipleStop(t *testing.T) {
-go Start(1234, "/random", 100)
-time.Sleep(time.Millisecond * 100)
+go Run(1234, "/random", 100)
+isRunningWithTimeout(t, time.Second)
Stop()
Stop()
}

func cleaningUpAfterTest() {
-endChannel <- struct{}{}
-<-endChannel
-closeChannels()
-metrics = make([]Metric, 0)
+Stop()
}

func initTest() {
-initChannels(100)
-// we need metrics channel to block so we don't end to quickly
-metricsChannel = make(chan Metric, 0)
-go eventLoop()
+go Run(12345, "notimportant", 100)
}

+// decorator function that properly inits and cleans up after a higher-level test of the Monitoring package
func testFunction(t *testing.T, testToRun func(*testing.T)) {
initTest()
+isRunningWithTimeout(t, time.Second)
testToRun(t)
cleaningUpAfterTest()
}
@@ -46,9 +71,7 @@ func TestSendingSingleMetric(t *testing.T) {
testFunction(t, func(t *testing.T) {
metric := Metric{Name: "test"}
Send(metric)
-if len(metrics) != 1 {
-t.Error("wrong number of metrics, should be 1")
-}
+hasNumberOfMetrics(t, time.Second, 1)

if metrics[0].Name != "test" {
t.Errorf("Got wrong name %s in stored metric", metrics[0].Name)
@@ -60,16 +83,17 @@ func TestExportingMetrics(t *testing.T) {
testFunction(t, func(t *testing.T) {
metric := Metric{Name: "test"}
Send(metric)
+hasNumberOfMetrics(t, time.Second, 1)

-metricsRequestChannel <- struct{}{}
-metrics := <-metricsToRequest
+metricsRequestedChannel <- struct{}{}
+metricsToExport := <-metricsExportedToRequest

-if len(metrics) != 1 {
-t.Errorf("Got wrong amount of metrics %d, expected 1", len(metrics))
+if len(metricsToExport) != 1 {
+t.Errorf("Got wrong amount of metrics %d, expected 1", len(metricsToExport))
}

-if metrics[0].Name != "test" {
-t.Errorf("Got wrong name of metric %s, expected test", metrics[0].Name)
+if metricsToExport[0].Name != "test" {
+t.Errorf("Got wrong name of metric %s, expected test", metricsToExport[0].Name)
}
})
}
@@ -81,11 +105,9 @@ func TestBufferLimit(t *testing.T) {
metric.Timestamp = 10
metric.AddTag("tag1", 42)
metric.AddValue("value1", 11)
Send(metric)

-if len(metrics) != 1 {
-t.Errorf("Metrics length is %d, but should be 1 after sending first metric", len(metrics))
-}
+Send(metric)
+hasNumberOfMetrics(t, time.Second, 1)

Send(metric)
time.Sleep(100 * time.Millisecond)
@@ -97,20 +119,20 @@ }
}

func TestHttpRun(t *testing.T) {
-go Start(12345, "/metrics", 10)
+go Run(9876, "/metrics", 10)
defer Stop()

-time.Sleep(time.Second)
+isRunningWithTimeout(t, time.Second)

metric := Metric{Name: "test"}
metric.Timestamp = 10
metric.AddTag("tag1", 42)
metric.AddValue("value1", 11)
Send(metric)

-response, err := http.Get("http://localhost:12345/metrics")
+response, err := http.Get("http://localhost:9876/metrics")
if err != nil {
t.Fatalf("Failed to GET metrics at port 12345: %v", err)
t.Fatalf("Failed to GET metrics at port 9876: %v", err)
}
decoder := json.NewDecoder(response.Body)
var receivedMetrics []Metric
@@ -157,7 +179,7 @@
// PASS
// ok github.com/AliceO2Group/Control/common/monitoring 44.686s
func BenchmarkSendingMetrics(b *testing.B) {
-Start(12345, "/metrics", 100)
+Run(12345, "/metrics", 100)

// this goroutine keeps clearing results so RAM does not get exhausted
go func() {
Expand All @@ -168,8 +190,8 @@ func BenchmarkSendingMetrics(b *testing.B) {
break
default:
if len(metrics) >= 10000000 {
-metricsRequestChannel <- struct{}{}
-<-metricsToRequest
+metricsRequestedChannel <- struct{}{}
+<-metricsExportedToRequest
}
}
time.Sleep(100 * time.Millisecond)
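The two polling helpers added at the top of this file share the same wait-until-condition-or-timeout shape; if more of these accumulate, a generic helper would remove the duplication. A hypothetical sketch (waitFor is not part of this PR; it assumes the same test package and imports):

```go
// waitFor polls cond every 10ms until it returns true or timeout elapses.
func waitFor(t *testing.T, timeout time.Duration, what string, cond func() bool) {
	t.Helper()
	deadline := time.After(timeout)
	for !cond() {
		select {
		case <-deadline:
			t.Errorf("timed out after %v waiting for %s", timeout, what)
			return
		default:
			time.Sleep(10 * time.Millisecond)
		}
	}
}

// Usage, replacing the two concrete helpers:
//   waitFor(t, time.Second, "monitoring to run", IsRunning)
//   waitFor(t, time.Second, "exactly 1 metric", func() bool { return len(metrics) == 1 })
```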
2 changes: 1 addition & 1 deletion core/config.go
@@ -128,7 +128,7 @@ func setDefaults() error {
viper.SetDefault("kafkaEndpoints", []string{"localhost:9092"})
viper.SetDefault("enableKafka", true)
viper.SetDefault("logAllIL", false)
viper.SetDefault("metricsEndpoint", "8086/metrics")
viper.SetDefault("metricsEndpoint", "8088/ecsmetrics")
viper.SetDefault("metricsBufferSize", 10000)
return nil
}
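With the new default, a scraper fetches http://<host>:8088/ecsmetrics and, per exportMetricsAndReset above, each GET also drains the server-side buffer. A minimal client sketch (the localhost URL and the JSON field tag are assumptions, not shown in this diff):

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// Mirrors the package's Metric type as far as this diff shows it;
// the actual JSON tags are an assumption.
type Metric struct {
	Name string `json:"name"`
}

func main() {
	resp, err := http.Get("http://localhost:8088/ecsmetrics")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	var ms []Metric
	if err := json.NewDecoder(resp.Body).Decode(&ms); err != nil {
		panic(err)
	}
	fmt.Printf("scraped %d metrics\n", len(ms))
}
```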
4 changes: 2 additions & 2 deletions core/core.go
@@ -73,7 +73,6 @@ func parseMetricsEndpoint(metricsEndpoint string) (error, uint16, string) {
}

func runMetrics() {
log.Info("Starting run metrics")
metricsEndpoint := viper.GetString("metricsEndpoint")
err, port, endpoint := parseMetricsEndpoint(metricsEndpoint)
if err != nil {
@@ -82,7 +81,8 @@ }
}

go func() {
-if err := monitoring.Start(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed {
+log.Infof("Starting to listen on endpoint %s:%d for metrics", endpoint, port)
+if err := monitoring.Run(port, fmt.Sprintf("/%s", endpoint), viper.GetInt("metricsBufferSize")); err != nil && err != http.ErrServerClosed {
ecsmetrics.StopGolangMetrics()
log.Errorf("failed to run metrics on port %d and endpoint: %s")
}
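parseMetricsEndpoint's body is outside this diff; given the "8088/ecsmetrics" default above and the (error, uint16, string) signature, a plausible reconstruction (an assumption, not the repository's actual code; requires fmt, strconv and strings):

```go
// Hypothetical sketch: split "8088/ecsmetrics" into port 8088 and endpoint "ecsmetrics".
func parseMetricsEndpoint(metricsEndpoint string) (error, uint16, string) {
	parts := strings.SplitN(metricsEndpoint, "/", 2)
	if len(parts) != 2 {
		return fmt.Errorf("metricsEndpoint %q is not in <port>/<endpoint> form", metricsEndpoint), 0, ""
	}
	port, err := strconv.ParseUint(parts[0], 10, 16)
	if err != nil {
		return err, 0, ""
	}
	return nil, uint16(port), parts[1]
}
```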