Skip to content

Commit

Permalink
Merge pull request #106 from EpicStep/main
Browse files Browse the repository at this point in the history
feat: implement connection pool
  • Loading branch information
ernado authored May 25, 2022
2 parents 73ddb29 + 2a3e61c commit 05b12f3
Show file tree
Hide file tree
Showing 10 changed files with 446 additions and 0 deletions.
44 changes: 44 additions & 0 deletions chpool/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package chpool

import (
"context"
"time"

puddle "github.com/jackc/puddle/puddleg"

"github.com/go-faster/ch"
)

// Client is an acquired *ch.Client from a Pool.
type Client struct {
res *puddle.Resource[*connResource]
p *Pool
}

// Release returns client to the pool.
func (c *Client) Release() {
if c.res == nil {
return
}

client := c.client()

if client.IsClosed() || time.Since(c.res.CreationTime()) > c.p.options.MaxConnLifetime {
c.res.Destroy()
return
}

c.res.Release()
}

func (c *Client) Do(ctx context.Context, q ch.Query) (err error) {
return c.client().Do(ctx, q)
}

func (c *Client) Ping(ctx context.Context) error {
return c.client().Ping(ctx)
}

func (c *Client) client() *ch.Client {
return c.res.Value().client
}
29 changes: 29 additions & 0 deletions chpool/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package chpool

import (
"context"
"testing"

"github.com/stretchr/testify/require"
)

func TestClient_Do(t *testing.T) {
t.Parallel()
p := PoolConn(t)
conn, err := p.Acquire(context.Background())
require.NoError(t, err)
defer conn.Release()

testDo(t, conn)
}

func TestClient_Ping(t *testing.T) {
t.Parallel()
p := PoolConn(t)

conn, err := p.Acquire(context.Background())
require.NoError(t, err)
defer conn.Release()

require.NoError(t, conn.Ping(context.Background()))
}
68 changes: 68 additions & 0 deletions chpool/common_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package chpool

import (
"context"
"testing"
"time"

"github.com/stretchr/testify/require"
"go.uber.org/zap/zaptest"

"github.com/go-faster/ch"
"github.com/go-faster/ch/cht"
"github.com/go-faster/ch/proto"
)

func PoolConnOpt(t testing.TB, opt Options) *Pool {
t.Helper()

ctx := context.Background()
server := cht.New(t)

if opt.ClientOptions.Logger == nil {
opt.ClientOptions.Logger = zaptest.NewLogger(t)
}

opt.ClientOptions.Address = server.TCP
pool, err := Dial(ctx, opt)
require.NoError(t, err)

t.Cleanup(func() {
pool.Close()
})

return pool
}

func PoolConn(t testing.TB) *Pool {
return PoolConnOpt(t, Options{})
}

type IDo interface {
Do(ctx context.Context, q ch.Query) (err error)
}

func testDo(t *testing.T, do IDo) {
var (
numbers int
data proto.ColUInt64
)

err := do.Do(context.Background(), ch.Query{
Body: "SELECT number FROM system.numbers LIMIT 10",
OnResult: func(ctx context.Context, b proto.Block) error {
numbers += len(data)
return nil
},
Result: proto.Results{
{Name: "number", Data: &data},
},
})

require.NoError(t, err)
require.Equal(t, 10, numbers)
}

func waitForReleaseToComplete() {
time.Sleep(500 * time.Millisecond)
}
26 changes: 26 additions & 0 deletions chpool/conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package chpool

import (
puddle "github.com/jackc/puddle/puddleg"

"github.com/go-faster/ch"
)

type connResource struct {
client *ch.Client
clients []Client
}

func (cr *connResource) getConn(p *Pool, res *puddle.Resource[*connResource]) *Client {
if len(cr.clients) == 0 {
cr.clients = make([]Client, 128)
}

c := &cr.clients[len(cr.clients)-1]
cr.clients = cr.clients[0 : len(cr.clients)-1]

c.res = res
c.p = p

return c
}
2 changes: 2 additions & 0 deletions chpool/doc.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
// Package chpool is a connection pool for ch.
package chpool
189 changes: 189 additions & 0 deletions chpool/pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
package chpool

import (
"context"
"runtime"
"sync"
"time"

"github.com/go-faster/ch"

puddle "github.com/jackc/puddle/puddleg"
)

// Pool of connections to ClickHouse.
type Pool struct {
pool *puddle.Pool[*connResource]
options Options

closeOnce sync.Once
closeChan chan struct{}
}

// Options for Pool.
type Options struct {
ClientOptions ch.Options
MaxConnLifetime time.Duration
MaxConnIdleTime time.Duration
MaxConns int32
MinConns int32
HealthCheckPeriod time.Duration
}

// Defaults for pool.
const (
DefaultMaxConnLifetime = time.Hour
DefaultMaxConnIdleTime = time.Minute * 30
DefaultHealthCheckPeriod = time.Minute
)

func (o *Options) setDefaults() {
if o.MaxConnLifetime == 0 {
o.MaxConnLifetime = DefaultMaxConnLifetime
}
if o.MaxConnIdleTime == 0 {
o.MaxConnIdleTime = DefaultMaxConnIdleTime
}
if o.MaxConns == 0 {
o.MaxConns = int32(runtime.NumCPU())
}
if o.HealthCheckPeriod == 0 {
o.HealthCheckPeriod = DefaultHealthCheckPeriod
}
}

// Dial returns a pool of connections to ClickHouse.
func Dial(ctx context.Context, opt Options) (*Pool, error) {
opt.setDefaults()

p := &Pool{
options: opt,
closeChan: make(chan struct{}),
}

p.pool = puddle.NewPool(
func(ctx context.Context) (*connResource, error) {
c, err := ch.Dial(ctx, p.options.ClientOptions)
if err != nil {
return nil, err
}

return &connResource{
client: c,
clients: make([]Client, 64),
}, nil
},
func(c *connResource) {
_ = c.client.Close()
}, opt.MaxConns)

if err := p.createIdleResources(ctx, int(p.options.MinConns)); err != nil {
p.Close()
return nil, err
}

res, err := p.pool.Acquire(ctx)
if err != nil {
p.Close()
return nil, err
}
res.Release()

go p.backgroundHealthCheck()

return p, nil
}

// Acquire connection from pool.
func (p *Pool) Acquire(ctx context.Context) (*Client, error) {
res, err := p.pool.Acquire(ctx)
if err != nil {
return nil, err
}

return res.Value().getConn(p, res), nil
}

func (p *Pool) Do(ctx context.Context, q ch.Query) (err error) {
c, err := p.Acquire(ctx)
if err != nil {
return err
}
defer c.Release()

return c.Do(ctx, q)
}

func (p *Pool) Ping(ctx context.Context) error {
c, err := p.Acquire(ctx)
if err != nil {
return err
}
defer c.Release()

return c.Ping(ctx)
}

func (p *Pool) backgroundHealthCheck() {
ticker := time.NewTicker(p.options.HealthCheckPeriod)

for {
select {
case <-p.closeChan:
ticker.Stop()
return
case <-ticker.C:
p.checkIdleConnsHealth()
p.checkMinConns()
}
}
}

func (p *Pool) checkIdleConnsHealth() {
resources := p.pool.AcquireAllIdle()

now := time.Now()
for _, res := range resources {
if now.Sub(res.CreationTime()) > p.options.MaxConnLifetime {
res.Destroy()
} else if res.IdleDuration() > p.options.MaxConnIdleTime {
res.Destroy()
} else {
res.ReleaseUnused()
}
}
}

func (p *Pool) checkMinConns() {
for i := p.options.MinConns - p.pool.Stat().TotalResources(); i > 0; i-- {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
_ = p.pool.CreateResource(ctx)
}()
}
}

func (p *Pool) createIdleResources(ctx context.Context, resourcesCount int) error {
for i := 0; i < resourcesCount; i++ {
err := p.pool.CreateResource(ctx)
if err != nil {
return err
}
}

return nil
}

// Stat return pool statistic.
func (p *Pool) Stat() *puddle.Stat {
return p.pool.Stat()
}

// Close pool.
func (p *Pool) Close() {
p.closeOnce.Do(func() {
close(p.closeChan)
p.pool.Close()
})
}
Loading

0 comments on commit 05b12f3

Please sign in to comment.