Skip to content

Commit

Permalink
fix: fixed concurrency parallel execution & sending in closed chan issue
Browse files Browse the repository at this point in the history
  • Loading branch information
hokamsingh committed Oct 11, 2024
1 parent 571d6b5 commit c92316c
Showing 1 changed file with 29 additions and 22 deletions.
51 changes: 29 additions & 22 deletions internal/core/concurrency/concurrency.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ type WorkerPool struct {
resultChan chan result
workerCount int
wg sync.WaitGroup
once sync.Once // Used to ensure resultChan is closed only once
}

type result struct {
Expand All @@ -55,7 +56,7 @@ func NewWorkerPool(workerCount int) *WorkerPool {
}

// Run starts the workers in the pool.
func (wp *WorkerPool) Run(ctx context.Context) {
func (wp *WorkerPool) Run(ctx context.Context, taskIndexes map[*Task]int) {
for i := 0; i < wp.workerCount; i++ {
wp.wg.Add(1)
go func() {
Expand All @@ -66,22 +67,24 @@ func (wp *WorkerPool) Run(ctx context.Context) {
return
default:
output, err := task.Execute(ctx)
wp.resultChan <- result{output: output, err: err}
wp.resultChan <- result{index: taskIndexes[task], output: output, err: err}
}
}
}()
}
}

// Stop waits for all workers to finish.
// Stop closes the task channel and waits for all workers to finish.
func (wp *WorkerPool) Stop() {
close(wp.taskChan)
wp.wg.Wait()
close(wp.resultChan)
close(wp.taskChan) // No more tasks can be submitted
wp.wg.Wait() // Wait for all workers to finish
wp.once.Do(func() {
close(wp.resultChan) // Close result channel only once after workers are done
})
}

// Submit adds a task to the task channel.
func (wp *WorkerPool) Submit(task *Task, index int) {
func (wp *WorkerPool) Submit(task *Task) {
wp.taskChan <- task
}

Expand Down Expand Up @@ -122,46 +125,50 @@ func (tm *TaskManager) Run(ctx context.Context) ([]interface{}, error) {
func (tm *TaskManager) runParallel(ctx context.Context) ([]interface{}, error) {
pool := NewWorkerPool(tm.workerCount)
results := make([]interface{}, len(tm.tasks))
errChan := make(chan error, 1) // Buffer size 1 for first error
errChan := make(chan error, 1) // Buffer size 1 for first error
var mu sync.Mutex // Protects access to the results slice
doneChan := make(chan struct{}) // To signal when results collection is done

// Start worker pool
pool.Run(ctx)
taskIndexes := make(map[*Task]int)
for i, task := range tm.tasks {
taskIndexes[task] = i
}
pool.Run(ctx, taskIndexes)

// Submit tasks to the worker pool
for i, task := range tm.tasks {
go func(index int, task *Task) {
select {
case <-ctx.Done():
errChan <- ctx.Err()
default:
pool.Submit(task, index)
}
}(i, task)
for _, task := range tm.tasks {
pool.Submit(task)
}

// Collect results
go func() {
for res := range pool.Results() {
mu.Lock()
if res.err != nil {
select {
case errChan <- res.err: // pass only first error
case errChan <- res.err:
default:
}
} else {
results[res.index] = res.output
}
mu.Unlock()
}
close(doneChan) // Close doneChan when results collection is complete
}()

// Stop the worker pool and wait for results
pool.Stop()
close(errChan)

// Check for errors
if len(errChan) > 0 {
return nil, <-errChan
select {
case err := <-errChan:
return nil, err
default:
return results, nil
}
return results, nil
}

// runSequential executes all tasks one by one and collects the results.
Expand Down

0 comments on commit c92316c

Please sign in to comment.