Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Merged by Bors] - sql: add Database.WithConnection #6445

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
109 changes: 91 additions & 18 deletions sql/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,21 +572,68 @@
type Database interface {
Executor
QueryCache
// Close closes the database.
Close() error
// QueryCount returns the number of queries executed on the database.
QueryCount() int
// QueryCache returns the query cache for this database, if it's present,
// or nil otherwise.
QueryCache() QueryCache
// Tx creates deferred sqlite transaction.
//
// Deferred transactions are not started until the first statement.
// Transaction may be started in read mode and automatically upgraded to write mode
// after one of the write statements.
//
// https://www.sqlite.org/lang_transaction.html
Tx(ctx context.Context) (Transaction, error)
// WithTx starts a new transaction and passes it to the exec function.
// It then commits the transaction if the exec function doesn't return an error,
// and rolls it back otherwise.
// If the context is canceled, the currently running SQL statement is interrupted.
WithTx(ctx context.Context, exec func(Transaction) error) error
// TxImmediate begins a new immediate transaction on the database, that is,
// a transaction that starts a write immediately without waiting for a write
// statement.
// The transaction returned from this function must always be released by calling
// its Release method. Release rolls back the transaction if it hasn't been
// committed.
// If the context is canceled, the currently running SQL statement is interrupted.
TxImmediate(ctx context.Context) (Transaction, error)
// WithTxImmediate starts a new immediate transaction and passes it to the exec
// function.
// An immediate transaction is started immediately, without waiting for a write
// statement.
// It then commits the transaction if the exec function doesn't return an error,
// and rolls it back otherwise.
// If the context is canceled, the currently running SQL statement is interrupted.
WithTxImmediate(ctx context.Context, exec func(Transaction) error) error
// WithConnection executes the provided function with a connection from the
// database pool.
// If many queries are to be executed in a row, but there's no need for an
// explicit transaction which may be long-running and thus block
// WAL checkpointing, it may be preferable to use a single connection for
// it to avoid database pool overhead.
// The connection is released back to the pool after the function returns.
// If the context is canceled, the currently running SQL statement is interrupted.
WithConnection(ctx context.Context, exec func(Executor) error) error
// Intercept adds an interceptor function to the database. The interceptor
// functions are invoked upon each query on the database, including queries
// executed within transactions.
// The query will fail if the interceptor returns an error.
// The interceptor can later be removed using RemoveInterceptor with the same key.
Intercept(key string, fn Interceptor)
// RemoveInterceptor removes the interceptor function with specified key from the database.
RemoveInterceptor(key string)
}

// Transaction represents a transaction.
type Transaction interface {
Executor
// Commit commits the transaction.
Commit() error
// Release releases the transaction. If the transaction hasn't been committed,
// it's rolled back.
Release() error
}

Expand Down Expand Up @@ -684,34 +731,22 @@
return nil
}

// Tx creates deferred sqlite transaction.
//
// Deferred transactions are not started until the first statement.
// Transaction may be started in read mode and automatically upgraded to write mode
// after one of the write statements.
//
// https://www.sqlite.org/lang_transaction.html
// Tx implements Database.
func (db *sqliteDatabase) Tx(ctx context.Context) (Transaction, error) {
return db.getTx(ctx, beginDefault)
}

// WithTx will pass initialized deferred transaction to exec callback.
// Will commit only if error is nil.
// WithTx implements Database.
func (db *sqliteDatabase) WithTx(ctx context.Context, exec func(Transaction) error) error {
return db.withTx(ctx, beginDefault, exec)
}

// TxImmediate creates immediate transaction.
//
// IMMEDIATE cause the database connection to start a new write immediately, without waiting
// for a write statement. The BEGIN IMMEDIATE might fail with SQLITE_BUSY if another write
// transaction is already active on another database connection.
// TxImmediate implements Database.
func (db *sqliteDatabase) TxImmediate(ctx context.Context) (Transaction, error) {
return db.getTx(ctx, beginImmediate)
}

// WithTxImmediate will pass initialized immediate transaction to exec callback.
// Will commit only if error is nil.
// WithTxImmediate implements Database.
func (db *sqliteDatabase) WithTxImmediate(ctx context.Context, exec func(Transaction) error) error {
return db.withTx(ctx, beginImmediate, exec)
}
Expand All @@ -727,7 +762,7 @@
return nil
}

// Exec statement using one of the connection from the pool.
// Exec implements Executor.
//
// If you care about atomicity of the operation (for example writing rewards to multiple accounts)
// Tx should be used. Otherwise sqlite will not guarantee that all side-effects of operations are
Expand Down Expand Up @@ -758,7 +793,7 @@
return exec(conn, query, encoder, decoder)
}

// Close closes all pooled connections.
// Close implements Database.
func (db *sqliteDatabase) Close() error {
db.closeMux.Lock()
defer db.closeMux.Unlock()
Expand All @@ -772,6 +807,23 @@
return nil
}

// WithConnection implements Database.
func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Executor) error) error {
if db.closed {
return ErrClosed
}

Check warning on line 814 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L813-L814

Added lines #L813 - L814 were not covered by tests
conCtx, cancel := context.WithCancel(ctx)
conn := db.getConn(conCtx)
defer func() {
cancel()
db.pool.Put(conn)
}()
if conn == nil {
return ErrNoConnection
}

Check warning on line 823 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L822-L823

Added lines #L822 - L823 were not covered by tests
return exec(&sqliteConn{queryCache: db.queryCache, db: db, conn: conn})
}

// Intercept adds an interceptor function to the database. The interceptor functions
// are invoked upon each query. The query will fail if the interceptor returns an error.
// The interceptor can later be removed using RemoveInterceptor with the same key.
Expand Down Expand Up @@ -1093,6 +1145,27 @@
return exec(tx.conn, query, encoder, decoder)
}

type sqliteConn struct {
*queryCache
db *sqliteDatabase
conn *sqlite.Conn
}

func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) {
if err := c.db.runInterceptors(query); err != nil {
return 0, fmt.Errorf("running query interceptors: %w", err)
}

Check warning on line 1157 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L1156-L1157

Added lines #L1156 - L1157 were not covered by tests

c.db.queryCount.Add(1)
if c.db.latency != nil {
start := time.Now()
defer func() {
c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start)))
}()

Check warning on line 1164 in sql/database.go

View check run for this annotation

Codecov / codecov/patch

sql/database.go#L1161-L1164

Added lines #L1161 - L1164 were not covered by tests
}
return exec(c.conn, query, encoder, decoder)
}

func mapSqliteError(err error) error {
switch sqlite.ErrCode(err) {
case sqlite.SQLITE_CONSTRAINT_PRIMARYKEY, sqlite.SQLITE_CONSTRAINT_UNIQUE:
Expand Down
21 changes: 21 additions & 0 deletions sql/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -638,3 +638,24 @@ func TestExclusive(t *testing.T) {
})
}
}

func TestConnection(t *testing.T) {
db := InMemoryTest(t)
var r int
require.NoError(t, db.WithConnection(context.Background(), func(ex Executor) error {
n, err := ex.Exec("select ?", func(stmt *Statement) {
stmt.BindInt64(1, 42)
}, func(stmt *Statement) bool {
r = stmt.ColumnInt(0)
return true
})
require.NoError(t, err)
require.Equal(t, 1, n)
require.Equal(t, 42, r)
return nil
}))

require.Error(t, db.WithConnection(context.Background(), func(Executor) error {
return errors.New("error")
}))
}
5 changes: 5 additions & 0 deletions sql/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,18 @@ import "go.uber.org/zap"

// Executor is an interface for executing raw statement.
type Executor interface {
// Exec executes a statement.
Exec(string, Encoder, Decoder) (int, error)
}

// Migration is interface for migrations provider.
type Migration interface {
// Apply applies the migration.
Apply(db Executor, logger *zap.Logger) error
// Rollback rolls back the migration.
Rollback() error
fasmat marked this conversation as resolved.
Show resolved Hide resolved
// Name returns the name of the migration.
Name() string
// Order returns the sequential number of the migration.
Order() int
}
9 changes: 6 additions & 3 deletions sql/schema.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,17 @@ func LoadDBSchemaScript(db Executor) (string, error) {
return "", err
}
fmt.Fprintf(&sb, "PRAGMA user_version = %d;\n", version)
// The following SQL query ensures that tables are listed first,
// ordered by name, and then all other objects, ordered by their table name
// and then by their own name.
if _, err = db.Exec(`
SELECT tbl_name, sql || ';'
FROM sqlite_master
WHERE sql IS NOT NULL AND tbl_name NOT LIKE 'sqlite_%'
ORDER BY
CASE WHEN type = 'table' THEN 1 ELSE 2 END, -- ensures tables are first
tbl_name, -- tables are sorted by name, then all other objects
name -- (indexes, triggers, etc.) also by name
CASE WHEN type = 'table' THEN 1 ELSE 2 END,
tbl_name,
name
`, nil, func(st *Statement) bool {
fmt.Fprintln(&sb, st.ColumnText(1))
return true
Expand Down
Loading