Skip to content

Commit

Permalink
refact: avoid use of defer calls in loops (#3466)
Browse files Browse the repository at this point in the history
* refact apic.Send()
* refact Papi.SendDeletedDecisions()
* refact MetricsProvider.Run()
* refact PluginBroker.pushNotificationsToPlugin()
* refact leakybucket.LoadBuckets()
  • Loading branch information
mmetc authored Feb 19, 2025
1 parent c4ff422 commit 8a10e2c
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 182 deletions.
2 changes: 1 addition & 1 deletion cmd/crowdsec-cli/clipapi/papi.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (cli *cliPapi) sync(ctx context.Context, out io.Writer, db *database.Client
return fmt.Errorf("unable to initialize PAPI client: %w", err)
}

t.Go(papi.SyncDecisions)
t.Go(func() error { return papi.SyncDecisions(ctx) })

err = papi.PullOnce(ctx, time.Time{}, true)
if err != nil {
Expand Down
49 changes: 22 additions & 27 deletions cmd/crowdsec/lpmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,27 @@ func (m *MetricsProvider) metricsPayload() *models.AllMetrics {
}
}

func (m *MetricsProvider) sendMetrics(ctx context.Context, met *models.AllMetrics) {
defer trace.CatchPanic("crowdsec/MetricsProvider.sendMetrics")

ctxTime, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

_, resp, err := m.apic.UsageMetrics.Add(ctxTime, met)
switch {
case errors.Is(err, context.DeadlineExceeded):
m.logger.Warnf("timeout sending lp metrics")
case err != nil && resp != nil && resp.Response.StatusCode == http.StatusNotFound:
m.logger.Warnf("metrics endpoint not found, older LAPI?")
case err != nil:
m.logger.Warnf("failed to send lp metrics: %s", err)
case resp.Response.StatusCode != http.StatusCreated:
m.logger.Warnf("failed to send lp metrics: %s", resp.Response.Status)
default:
m.logger.Tracef("lp usage metrics sent")
}
}

func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error {
defer trace.CatchPanic("crowdsec/MetricsProvider.Run")

Expand All @@ -144,34 +165,8 @@ func (m *MetricsProvider) Run(ctx context.Context, myTomb *tomb.Tomb) error {
for {
select {
case <-ticker.C:
ctxTime, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()

_, resp, err := m.apic.UsageMetrics.Add(ctxTime, met)
switch {
case errors.Is(err, context.DeadlineExceeded):
m.logger.Warnf("timeout sending lp metrics")
ticker.Reset(m.interval)
continue
case err != nil && resp != nil && resp.Response.StatusCode == http.StatusNotFound:
m.logger.Warnf("metrics endpoint not found, older LAPI?")
ticker.Reset(m.interval)
continue
case err != nil:
m.logger.Warnf("failed to send lp metrics: %s", err)
ticker.Reset(m.interval)
continue
}

if resp.Response.StatusCode != http.StatusCreated {
m.logger.Warnf("failed to send lp metrics: %s", resp.Response.Status)
ticker.Reset(m.interval)
continue
}

m.sendMetrics(ctx, met)
ticker.Reset(m.interval)

m.logger.Tracef("lp usage metrics sent")
case <-myTomb.Dying():
ticker.Stop()
return nil
Expand Down
50 changes: 18 additions & 32 deletions pkg/apiserver/apic.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import (
const (
// delta values must be smaller than the interval
pullIntervalDefault = time.Hour * 2
pullIntervalDelta = 5 * time.Minute
pullIntervalDelta = time.Minute * 5
pushIntervalDefault = time.Second * 10
pushIntervalDelta = time.Second * 7
metricsIntervalDefault = time.Minute * 30
Expand Down Expand Up @@ -363,6 +363,15 @@ func shouldShareAlert(alert *models.Alert, consoleConfig *csconfig.ConsoleConfig
return true
}

func (a *apic) sendBatch(ctx context.Context, signals []*models.AddSignalsRequestItem) error {
ctxBatch, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

_, _, err := a.apiClient.Signal.Add(ctxBatch, (*models.AddSignalsRequest)(&signals))

return err
}

func (a *apic) Send(ctx context.Context, cacheOrig *models.AddSignalsRequest) {
/*we do have a problem with this :
The apic.Push background routine reads from alertToPush chan.
Expand All @@ -375,44 +384,21 @@ func (a *apic) Send(ctx context.Context, cacheOrig *models.AddSignalsRequest) {
I don't know enough about gin to tell how much of an issue it can be.
*/
var (
cache []*models.AddSignalsRequestItem = *cacheOrig
send models.AddSignalsRequest
)

bulkSize := 50
pageStart := 0
pageEnd := bulkSize

for {
if pageEnd >= len(cache) {
send = cache[pageStart:]
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)
var cache []*models.AddSignalsRequestItem = *cacheOrig

defer cancel()
batchSize := 50

_, _, err := a.apiClient.Signal.Add(ctx, &send)
if err != nil {
log.Errorf("sending signal to central API: %s", err)
return
}
for start := 0; start < len(cache); start += batchSize {
end := start + batchSize

break
if end > len(cache) {
end = len(cache)
}

send = cache[pageStart:pageEnd]
ctx, cancel := context.WithTimeout(ctx, 5*time.Second)

defer cancel()

_, _, err := a.apiClient.Signal.Add(ctx, &send)
if err != nil {
// we log it here as well, because the return value of func might be discarded
if err := a.sendBatch(ctx, cache[start:end]); err != nil {
log.Errorf("sending signal to central API: %s", err)
return
}

pageStart += bulkSize
pageEnd += bulkSize
}
}

Expand Down
6 changes: 3 additions & 3 deletions pkg/apiserver/apiserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,8 +332,8 @@ func (s *APIServer) papiPull(ctx context.Context) error {
return nil
}

func (s *APIServer) papiSync() error {
if err := s.papi.SyncDecisions(); err != nil {
func (s *APIServer) papiSync(ctx context.Context) error {
if err := s.papi.SyncDecisions(ctx); err != nil {
log.Errorf("capi decisions sync: %s", err)
return err
}
Expand All @@ -351,7 +351,7 @@ func (s *APIServer) initAPIC(ctx context.Context) {
if s.papi.URL != "" {
log.Info("Starting PAPI decision receiver")
s.papi.pullTomb.Go(func() error { return s.papiPull(ctx) })
s.papi.syncTomb.Go(s.papiSync)
s.papi.syncTomb.Go(func() error { return s.papiSync(ctx) })
} else {
log.Warnf("papi_url is not set in online_api_credentials.yaml, can't synchronize with the console. Run cscli console enable console_management to add it.")
}
Expand Down
55 changes: 22 additions & 33 deletions pkg/apiserver/papi.go
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ func (p *Papi) Pull(ctx context.Context) error {
return nil
}

func (p *Papi) SyncDecisions() error {
func (p *Papi) SyncDecisions(ctx context.Context) error {
defer trace.CatchPanic("lapi/syncDecisionsToCAPI")

var cache models.DecisionsDeleteRequest
Expand All @@ -304,7 +304,7 @@ func (p *Papi) SyncDecisions() error {
return nil
}

go p.SendDeletedDecisions(&cache)
go p.SendDeletedDecisions(ctx, &cache)

return nil
case <-ticker.C:
Expand All @@ -315,7 +315,7 @@ func (p *Papi) SyncDecisions() error {
p.mu.Unlock()
p.Logger.Infof("sync decisions: %d deleted decisions to push", len(cacheCopy))

go p.SendDeletedDecisions(&cacheCopy)
go p.SendDeletedDecisions(ctx, &cacheCopy)
}
case deletedDecisions := <-p.Channels.DeleteDecisionChannel:
if (p.consoleConfig.ShareManualDecisions != nil && *p.consoleConfig.ShareManualDecisions) || (p.consoleConfig.ConsoleManagement != nil && *p.consoleConfig.ConsoleManagement) {
Expand All @@ -335,45 +335,34 @@ func (p *Papi) SyncDecisions() error {
}
}

func (p *Papi) SendDeletedDecisions(cacheOrig *models.DecisionsDeleteRequest) {
var (
cache []models.DecisionsDeleteRequestItem = *cacheOrig
send models.DecisionsDeleteRequest
)
func (p *Papi) sendDeletedDecisionsBatch(ctx context.Context, decisions []models.DecisionsDeleteRequestItem) error {
ctxBatch, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()

bulkSize := 50
pageStart := 0
pageEnd := bulkSize

for {
if pageEnd >= len(cache) {
send = cache[pageStart:]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
_, _, err := p.apiClient.DecisionDelete.Add(ctxBatch, (*models.DecisionsDeleteRequest)(&decisions))
if err != nil {
return err
}

defer cancel()
return nil
}

_, _, err := p.apiClient.DecisionDelete.Add(ctx, &send)
if err != nil {
p.Logger.Errorf("sending deleted decisions to central API: %s", err)
return
}
func (p *Papi) SendDeletedDecisions(ctx context.Context, cacheOrig *models.DecisionsDeleteRequest) {
var cache []models.DecisionsDeleteRequestItem = *cacheOrig

break
}
batchSize := 50

send = cache[pageStart:pageEnd]
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
for start := 0; start < len(cache); start += batchSize {
end := start + batchSize

defer cancel()
if end > len(cache) {
end = len(cache)
}

_, _, err := p.apiClient.DecisionDelete.Add(ctx, &send)
if err != nil {
// we log it here as well, because the return value of func might be discarded
if err := p.sendDeletedDecisionsBatch(ctx, cache[start:end]); err != nil {
p.Logger.Errorf("sending deleted decisions to central API: %s", err)
return
}

pageStart += bulkSize
pageEnd += bulkSize
}
}

Expand Down
Loading

0 comments on commit 8a10e2c

Please sign in to comment.