forked from fatih/pool
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathchannel.go
159 lines (134 loc) · 3.62 KB
/
channel.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
package pool
import (
"errors"
"fmt"
"net"
"sync"
"time"
)
type PoolConfig struct {
InitialCap int // inital capacity
MaxCap int // max capacity
Factory Factory
IdleTimeout time.Duration
WaitTimeout time.Duration
}
// channelPool implements the Pool interface based on buffered channels.
type channelPool struct {
// storage for our net.Conn connections
mu sync.RWMutex
conns chan *idleConn
// net.Conn generator
factory Factory
idleTimeout time.Duration
waitTimeout time.Duration
}
type idleConn struct {
conn net.Conn
t time.Time
}
// Factory is a function to create new connections.
type Factory func() (net.Conn, error)
// NewChannelPool returns a new pool based on buffered channels with an initial
// capacity and maximum capacity. Factory is used when initial capacity is
// greater than zero to fill the pool. A zero initialCap doesn't fill the Pool
// until a new Get() is called. During a Get(), If there is no new connection
// available in the pool, a new connection will be created via the Factory()
// method.
func NewChannelPool(config *PoolConfig) (Pool, error) {
if config.InitialCap < 0 || config.MaxCap <= 0 || config.InitialCap > config.MaxCap {
return nil, errors.New("invalid capacity settings")
}
c := &channelPool{
conns: make(chan *idleConn, config.MaxCap),
factory: config.Factory,
idleTimeout: config.IdleTimeout,
}
// create initial connections, if something goes wrong,
// just close the pool error out.
for i := 0; i < config.InitialCap; i++ {
conn, err := config.Factory()
if err != nil {
c.Close()
return nil, fmt.Errorf("factory is not able to fill the pool: %s", err)
}
c.conns <- &idleConn{conn: conn, t: time.Now()}
}
return c, nil
}
func (c *channelPool) getConnsAndFactory() (chan *idleConn, Factory) {
c.mu.RLock()
conns := c.conns
factory := c.factory
c.mu.RUnlock()
return conns, factory
}
// Get implements the Pool interfaces Get() method. If there is no new
// connection available in the pool, a new connection will be created via the
// Factory() method.
func (c *channelPool) Get() (net.Conn, error) {
conns, factory := c.getConnsAndFactory()
// todo: test if it is ok like this
if conns == nil {
fmt.Println("Get, conns is nil")
return nil, ErrClosed
}
// wrap our connections with out custom net.Conn implementation (wrapConn
// method) that puts the connection back to the pool if it's closed.
// TRY:
select {
// case <-time.After(c.waitTimeout):
// goto TRY
case conn := <-conns:
if conn == nil {
return nil, ErrClosed
}
return c.wrapConn(conn.conn), nil
default:
conn, err := factory()
if err != nil {
return nil, err
}
return c.wrapConn(conn), nil
}
}
// put puts the connection back to the pool. If the pool is full or closed,
// conn is simply closed. A nil conn will be rejected.
func (c *channelPool) put(conn net.Conn) error {
if conn == nil {
return errors.New("connection is nil. rejecting")
}
c.mu.RLock()
defer c.mu.RUnlock()
if c.conns == nil {
// pool is closed, close passed connection
return conn.Close()
}
// put the resource back into the pool. If the pool is full, this will
// block and the default case will be executed.
select {
case c.conns <- &idleConn{conn: conn, t: time.Now()}:
return nil
default:
// pool is full, close passed connection
return conn.Close()
}
}
func (c *channelPool) Close() {
c.mu.Lock()
conns := c.conns
c.conns = nil
c.factory = nil
c.mu.Unlock()
if conns == nil {
return
}
close(conns)
for conn := range conns {
conn.conn.Close()
}
}
func (c *channelPool) Len() int {
conns, _ := c.getConnsAndFactory()
return len(conns)
}