Skip to content

Commit

Permalink
Merge pull request #6 from khunafin/add_interface
Browse files Browse the repository at this point in the history
extract Pool interface
  • Loading branch information
shmel1k authored Apr 6, 2021
2 parents fe097c6 + 15a2707 commit fbb9992
Show file tree
Hide file tree
Showing 3 changed files with 29 additions and 20 deletions.
17 changes: 17 additions & 0 deletions gop.go
Original file line number Diff line number Diff line change
@@ -1,2 +1,19 @@
// Package gop contains golang worker pool helpers.
package gop

import "context"

// TaskFn is a wrapper for task function.
type TaskFn func()

type Pool interface {
// Add adds tasks to the pool.
Add(TaskFn) error
AddContext(context.Context, TaskFn) error
// QueueSize is a current queue size.
QueueSize() int32
// Shutdown closes pool and stops workers.
//
// If any tasks in a queue left, pool will not take them, so the tasks will be lost.
Shutdown() error
}
28 changes: 10 additions & 18 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,8 @@ import (
"time"
)

// TaskFn is a wrapper for task function.
type TaskFn func()

// Pool represents worker pool.
type Pool struct {
// pool represents worker pool.
type pool struct {
conf Config
tasks chan TaskFn
quit chan struct{}
Expand All @@ -21,9 +18,9 @@ type Pool struct {
}

// NewPool creates a new pool with given configuration params.
func NewPool(conf Config) *Pool {
func NewPool(conf Config) Pool {
conf = conf.withDefaults()
p := &Pool{
p := &pool{
conf: conf,
quit: make(chan struct{}),
tasks: make(chan TaskFn, conf.MaxQueueSize),
Expand Down Expand Up @@ -57,16 +54,15 @@ func NewPool(conf Config) *Pool {
return p
}

// Add adds tasks to the pool.
func (p *Pool) Add(t TaskFn) error {
func (p *pool) Add(t TaskFn) error {
return p.add(context.Background(), t)
}

func (p *Pool) AddContext(ctx context.Context, t TaskFn) error {
func (p *pool) AddContext(ctx context.Context, t TaskFn) error {
return p.add(ctx, t)
}

func (p *Pool) add(ctx context.Context, t TaskFn) error {
func (p *pool) add(ctx context.Context, t TaskFn) error {
select {
case <-p.quit:
return ErrPoolClosed
Expand Down Expand Up @@ -122,7 +118,7 @@ func (p *Pool) add(ctx context.Context, t TaskFn) error {
}
}

func (p *Pool) spawnExtraWorker(t TaskFn) error {
func (p *pool) spawnExtraWorker(t TaskFn) error {
// FIXME: possible optimization. Add check if
// additional workers are enabled in configuration.
var swapped bool
Expand Down Expand Up @@ -160,16 +156,12 @@ func (p *Pool) spawnExtraWorker(t TaskFn) error {
return nil
}

// QueueSize is a current queue size.
func (p *Pool) QueueSize() int32 {
func (p *pool) QueueSize() int32 {
res := atomic.LoadInt32(&p.realQueueSize)
return res
}

// Shutdown closes pool and stops workers.
//
// If any tasks in a queue left, pool will not take them, so the tasks will be lost.
func (p *Pool) Shutdown() error {
func (p *pool) Shutdown() error {
select {
case <-p.quit:
return ErrPoolClosed
Expand Down
4 changes: 2 additions & 2 deletions pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"time"
)

func shutdownPool(t *testing.T, p *Pool) {
func shutdownPool(t *testing.T, p Pool) {
if err := p.Shutdown(); err != nil {
t.Errorf("failed to shutdown pool: %s", err)
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func TestPoolQueueOverfilled(t *testing.T) {

func TestPoolScheduleTimeout(t *testing.T) {
var testData = []struct {
pool *Pool
pool Pool
expectedErr error
}{
{
Expand Down

0 comments on commit fbb9992

Please sign in to comment.