Skip to content

Commit

Permalink
Add a rate limit in worker (#7)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vinyas Malagaudanavar authored Feb 20, 2020
1 parent ba9231d commit 31704f4
Showing 1 changed file with 4 additions and 1 deletion.
5 changes: 4 additions & 1 deletion worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type CeleryWorker struct {
taskLock sync.RWMutex
stopChannel chan struct{}
workWG sync.WaitGroup
rateLimitPeriod time.Duration
}

// NewCeleryWorker returns new celery worker
Expand All @@ -26,6 +27,7 @@ func NewCeleryWorker(broker CeleryBroker, backend CeleryBackend, numWorkers int)
backend: backend,
numWorkers: numWorkers,
registeredTasks: make(map[string]interface{}),
rateLimitPeriod: 100 * time.Millisecond,
}
}

Expand All @@ -38,11 +40,12 @@ func (w *CeleryWorker) StartWorker() {
for i := 0; i < w.numWorkers; i++ {
go func(workerID int) {
defer w.workWG.Done()
ticker := time.NewTicker(w.rateLimitPeriod)
for {
select {
case <-w.stopChannel:
return
default:
case <-ticker.C:

// process messages
taskMessage, err := w.broker.GetTaskMessage()
Expand Down

0 comments on commit 31704f4

Please sign in to comment.