Skip to content

Commit

Permalink
Implement ConnPool for catch SQL execute
Browse files Browse the repository at this point in the history
  • Loading branch information
huacnlee committed Jan 14, 2022
1 parent d9471b4 commit 586a343
Showing 1 changed file with 113 additions and 0 deletions.
113 changes: 113 additions & 0 deletions conn_pool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package gormsharding

import (
"context"
"database/sql"

"gorm.io/gorm"
)

// ConnPool Implement a ConnPool for replace db.Statement.ConnPool in Gorm
type ConnPool struct {
// db, This is global db instance
sharding *Sharding
gorm.ConnPool
}

// registerConnPool replace Gorm db.ConnPool as custom
func (s *Sharding) registerConnPool(db *gorm.DB) {
// Avoid assign loop
basePool := db.ConnPool
if _, ok := basePool.(ConnPool); ok {
return
}

s.ConnPool = &ConnPool{ConnPool: basePool, sharding: s}
db.ConnPool = s.ConnPool
db.Statement.ConnPool = s.ConnPool
}

func (pool *ConnPool) String() string {
return "gorm:sharding:conn_pool"
}

func (pool ConnPool) PrepareContext(ctx context.Context, query string) (*sql.Stmt, error) {
return pool.ConnPool.PrepareContext(ctx, query)
}

func (pool ConnPool) ExecContext(ctx context.Context, query string, args ...interface{}) (sql.Result, error) {
ftQuery, stQuery, table, err := pool.sharding.resolve(query, args...)
if err != nil {
return nil, err
}

pool.sharding.querys.Store("last_query", stQuery)

if table != "" {
if r, ok := pool.sharding.Resolvers[table]; ok {
if r.EnableFullTable {
pool.ConnPool.ExecContext(ctx, ftQuery, args...)
}
}
}

return pool.ConnPool.ExecContext(ctx, stQuery, args...)
}

// https://github.com/go-gorm/gorm/blob/v1.21.11/callbacks/query.go#L18
func (pool ConnPool) QueryContext(ctx context.Context, query string, args ...interface{}) (*sql.Rows, error) {
ftQuery, stQuery, table, err := pool.sharding.resolve(query, args...)
if err != nil {
return nil, err
}

pool.sharding.querys.Store("last_query", query)

if table != "" {
if r, ok := pool.sharding.Resolvers[table]; ok {
if r.EnableFullTable {
pool.ConnPool.ExecContext(ctx, ftQuery, args...)
}
}
}

return pool.ConnPool.QueryContext(ctx, stQuery, args...)
}

func (pool ConnPool) QueryRowContext(ctx context.Context, query string, args ...interface{}) *sql.Row {
_, query, _, _ = pool.sharding.resolve(query, args...)
pool.sharding.querys.Store("last_query", query)

return pool.ConnPool.QueryRowContext(ctx, query, args...)
}

// BeginTx Implement ConnPoolBeginner.BeginTx
func (pool *ConnPool) BeginTx(ctx context.Context, opt *sql.TxOptions) (gorm.ConnPool, error) {
if basePool, ok := pool.ConnPool.(gorm.ConnPoolBeginner); ok {
return basePool.BeginTx(ctx, opt)
}

return pool, nil
}

// Implement TxCommitter.Commit
func (pool *ConnPool) Commit() error {
if basePool, ok := pool.ConnPool.(gorm.TxCommitter); ok {
return basePool.Commit()
}

return nil
}

// Implement TxCommitter.Rollback
func (pool *ConnPool) Rollback() error {
if basePool, ok := pool.ConnPool.(gorm.TxCommitter); ok {
return basePool.Rollback()
}

return nil
}

func (pool *ConnPool) Ping() error {
return nil
}

0 comments on commit 586a343

Please sign in to comment.