From c3b244cd11978062b90d8aba4853df4dcacbdcc1 Mon Sep 17 00:00:00 2001 From: Anshul Date: Sun, 7 Apr 2024 16:15:25 +0400 Subject: [PATCH 1/2] fix: concurrently fetch metrics --- internal/insight/insight.go | 39 ++++++++++++++++++++++++++++--------- 1 file changed, 30 insertions(+), 9 deletions(-) diff --git a/internal/insight/insight.go b/internal/insight/insight.go index 92c8069..d0edd2c 100644 --- a/internal/insight/insight.go +++ b/internal/insight/insight.go @@ -10,6 +10,7 @@ package insight import ( "context" + "sync" "time" "github.com/aws/aws-sdk-go-v2/aws" @@ -65,17 +66,37 @@ func (in *Insight) Fetch(ctx context.Context, dbiResourceId string, dur time.Dur chunks = append(chunks, metrics[i:end]) } - // TODO: concurrent + childContext, cancel := context.WithCancel(ctx) + defer cancel() + samples := map[string]Samples{} - for _, chunk := range chunks { - set, err := in.fetch(ctx, dbiResourceId, dur, chunk...) - if err != nil { - return nil, err - } + var wg sync.WaitGroup + var err error - for k, v := range set { - samples[k] = v - } + for _, chunk := range chunks { + chunk := chunk + wg.Add(1) + + go func() { + defer wg.Done() + + set, e := in.fetch(childContext, dbiResourceId, dur, chunk...) + if e != nil { + if e != context.Canceled { + cancel() + err = e + } + return + } + + for k, v := range set { + samples[k] = v + } + }() + } + wg.Wait() + if err != nil { + return nil, err } return samples, nil From 4bb8c8e3fa6c6c69b12e992e4975406dbf1d0bdc Mon Sep 17 00:00:00 2001 From: Anshul Date: Sun, 7 Apr 2024 22:49:54 +0400 Subject: [PATCH 2/2] chore: review comments --- internal/insight/insight.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/internal/insight/insight.go b/internal/insight/insight.go index d0edd2c..5dd3873 100644 --- a/internal/insight/insight.go +++ b/internal/insight/insight.go @@ -72,6 +72,7 @@ func (in *Insight) Fetch(ctx context.Context, dbiResourceId string, dur time.Dur samples := map[string]Samples{} var wg sync.WaitGroup var err error + var mu sync.Mutex for _, chunk := range chunks { chunk := chunk @@ -81,8 +82,12 @@ func (in *Insight) Fetch(ctx context.Context, dbiResourceId string, dur time.Dur defer wg.Done() set, e := in.fetch(childContext, dbiResourceId, dur, chunk...) + + mu.Lock() + defer mu.Unlock() + if e != nil { - if e != context.Canceled { + if err == nil { cancel() err = e }