diff --git a/sql/database.go b/sql/database.go index 94a9322563..c12ab1a87a 100644 --- a/sql/database.go +++ b/sql/database.go @@ -572,21 +572,68 @@ type Interceptor func(query string) error type Database interface { Executor QueryCache + // Close closes the database. Close() error + // QueryCount returns the number of queries executed on the database. QueryCount() int + // QueryCache returns the query cache for this database, if it's present, + // or nil otherwise. QueryCache() QueryCache + // Tx creates deferred sqlite transaction. + // + // Deferred transactions are not started until the first statement. + // Transaction may be started in read mode and automatically upgraded to write mode + // after one of the write statements. + // + // https://www.sqlite.org/lang_transaction.html Tx(ctx context.Context) (Transaction, error) + // WithTx starts a new transaction and passes it to the exec function. + // It then commits the transaction if the exec function doesn't return an error, + // and rolls it back otherwise. + // If the context is canceled, the currently running SQL statement is interrupted. WithTx(ctx context.Context, exec func(Transaction) error) error + // TxImmediate begins a new immediate transaction on the database, that is, + // a transaction that starts a write immediately without waiting for a write + // statement. + // The transaction returned from this function must always be released by calling + // its Release method. Release rolls back the transaction if it hasn't been + // committed. + // If the context is canceled, the currently running SQL statement is interrupted. TxImmediate(ctx context.Context) (Transaction, error) + // WithTxImmediate starts a new immediate transaction and passes it to the exec + // function. + // An immediate transaction is started immediately, without waiting for a write + // statement. + // It then commits the transaction if the exec function doesn't return an error, + // and rolls it back otherwise. + // If the context is canceled, the currently running SQL statement is interrupted. WithTxImmediate(ctx context.Context, exec func(Transaction) error) error + // WithConnection executes the provided function with a connection from the + // database pool. + // If many queries are to be executed in a row, but there's no need for an + // explicit transaction which may be long-running and thus block + // WAL checkpointing, it may be preferable to use a single connection for + // it to avoid database pool overhead. + // The connection is released back to the pool after the function returns. + // If the context is canceled, the currently running SQL statement is interrupted. + WithConnection(ctx context.Context, exec func(Executor) error) error + // Intercept adds an interceptor function to the database. The interceptor + // functions are invoked upon each query on the database, including queries + // executed within transactions. + // The query will fail if the interceptor returns an error. + // The interceptor can later be removed using RemoveInterceptor with the same key. Intercept(key string, fn Interceptor) + // RemoveInterceptor removes the interceptor function with specified key from the database. RemoveInterceptor(key string) } // Transaction represents a transaction. type Transaction interface { Executor + // Commit commits the transaction. Commit() error + // Release releases the transaction. If the transaction hasn't been committed, + // it's rolled back. Release() error } @@ -684,34 +731,22 @@ func (db *sqliteDatabase) startExclusive() error { return nil } -// Tx creates deferred sqlite transaction. -// -// Deferred transactions are not started until the first statement. -// Transaction may be started in read mode and automatically upgraded to write mode -// after one of the write statements. -// -// https://www.sqlite.org/lang_transaction.html +// Tx implements Database. func (db *sqliteDatabase) Tx(ctx context.Context) (Transaction, error) { return db.getTx(ctx, beginDefault) } -// WithTx will pass initialized deferred transaction to exec callback. -// Will commit only if error is nil. +// WithTx implements Database. func (db *sqliteDatabase) WithTx(ctx context.Context, exec func(Transaction) error) error { return db.withTx(ctx, beginDefault, exec) } -// TxImmediate creates immediate transaction. -// -// IMMEDIATE cause the database connection to start a new write immediately, without waiting -// for a write statement. The BEGIN IMMEDIATE might fail with SQLITE_BUSY if another write -// transaction is already active on another database connection. +// TxImmediate implements Database. func (db *sqliteDatabase) TxImmediate(ctx context.Context) (Transaction, error) { return db.getTx(ctx, beginImmediate) } -// WithTxImmediate will pass initialized immediate transaction to exec callback. -// Will commit only if error is nil. +// WithTxImmediate implements Database. func (db *sqliteDatabase) WithTxImmediate(ctx context.Context, exec func(Transaction) error) error { return db.withTx(ctx, beginImmediate, exec) } @@ -727,7 +762,7 @@ func (db *sqliteDatabase) runInterceptors(query string) error { return nil } -// Exec statement using one of the connection from the pool. +// Exec implements Executor. // // If you care about atomicity of the operation (for example writing rewards to multiple accounts) // Tx should be used. Otherwise sqlite will not guarantee that all side-effects of operations are @@ -758,7 +793,7 @@ func (db *sqliteDatabase) Exec(query string, encoder Encoder, decoder Decoder) ( return exec(conn, query, encoder, decoder) } -// Close closes all pooled connections. +// Close implements Database. func (db *sqliteDatabase) Close() error { db.closeMux.Lock() defer db.closeMux.Unlock() @@ -772,6 +807,23 @@ func (db *sqliteDatabase) Close() error { return nil } +// WithConnection implements Database. +func (db *sqliteDatabase) WithConnection(ctx context.Context, exec func(Executor) error) error { + if db.closed { + return ErrClosed + } + conCtx, cancel := context.WithCancel(ctx) + conn := db.getConn(conCtx) + defer func() { + cancel() + db.pool.Put(conn) + }() + if conn == nil { + return ErrNoConnection + } + return exec(&sqliteConn{queryCache: db.queryCache, db: db, conn: conn}) +} + // Intercept adds an interceptor function to the database. The interceptor functions // are invoked upon each query. The query will fail if the interceptor returns an error. // The interceptor can later be removed using RemoveInterceptor with the same key. @@ -1093,6 +1145,27 @@ func (tx *sqliteTx) Exec(query string, encoder Encoder, decoder Decoder) (int, e return exec(tx.conn, query, encoder, decoder) } +type sqliteConn struct { + *queryCache + db *sqliteDatabase + conn *sqlite.Conn +} + +func (c *sqliteConn) Exec(query string, encoder Encoder, decoder Decoder) (int, error) { + if err := c.db.runInterceptors(query); err != nil { + return 0, fmt.Errorf("running query interceptors: %w", err) + } + + c.db.queryCount.Add(1) + if c.db.latency != nil { + start := time.Now() + defer func() { + c.db.latency.WithLabelValues(query).Observe(float64(time.Since(start))) + }() + } + return exec(c.conn, query, encoder, decoder) +} + func mapSqliteError(err error) error { switch sqlite.ErrCode(err) { case sqlite.SQLITE_CONSTRAINT_PRIMARYKEY, sqlite.SQLITE_CONSTRAINT_UNIQUE: diff --git a/sql/database_test.go b/sql/database_test.go index d197d5e497..60b81ff9cf 100644 --- a/sql/database_test.go +++ b/sql/database_test.go @@ -638,3 +638,24 @@ func TestExclusive(t *testing.T) { }) } } + +func TestConnection(t *testing.T) { + db := InMemoryTest(t) + var r int + require.NoError(t, db.WithConnection(context.Background(), func(ex Executor) error { + n, err := ex.Exec("select ?", func(stmt *Statement) { + stmt.BindInt64(1, 42) + }, func(stmt *Statement) bool { + r = stmt.ColumnInt(0) + return true + }) + require.NoError(t, err) + require.Equal(t, 1, n) + require.Equal(t, 42, r) + return nil + })) + + require.Error(t, db.WithConnection(context.Background(), func(Executor) error { + return errors.New("error") + })) +} diff --git a/sql/interface.go b/sql/interface.go index c9b0ee1441..13b2a1e0ce 100644 --- a/sql/interface.go +++ b/sql/interface.go @@ -6,13 +6,18 @@ import "go.uber.org/zap" // Executor is an interface for executing raw statement. type Executor interface { + // Exec executes a statement. Exec(string, Encoder, Decoder) (int, error) } // Migration is interface for migrations provider. type Migration interface { + // Apply applies the migration. Apply(db Executor, logger *zap.Logger) error + // Rollback rolls back the migration. Rollback() error + // Name returns the name of the migration. Name() string + // Order returns the sequential number of the migration. Order() int } diff --git a/sql/schema.go b/sql/schema.go index f393d7534f..aa05416206 100644 --- a/sql/schema.go +++ b/sql/schema.go @@ -31,14 +31,17 @@ func LoadDBSchemaScript(db Executor) (string, error) { return "", err } fmt.Fprintf(&sb, "PRAGMA user_version = %d;\n", version) + // The following SQL query ensures that tables are listed first, + // ordered by name, and then all other objects, ordered by their table name + // and then by their own name. if _, err = db.Exec(` SELECT tbl_name, sql || ';' FROM sqlite_master WHERE sql IS NOT NULL AND tbl_name NOT LIKE 'sqlite_%' ORDER BY - CASE WHEN type = 'table' THEN 1 ELSE 2 END, -- ensures tables are first - tbl_name, -- tables are sorted by name, then all other objects - name -- (indexes, triggers, etc.) also by name + CASE WHEN type = 'table' THEN 1 ELSE 2 END, + tbl_name, + name `, nil, func(st *Statement) bool { fmt.Fprintln(&sb, st.ColumnText(1)) return true