Skip to content
This repository has been archived by the owner on Jan 2, 2025. It is now read-only.

Commit

Permalink
fix(backend): wrap database writes into explicit transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
burdiyan committed Nov 29, 2023
1 parent 212e6a0 commit ba057e1
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 68 deletions.
89 changes: 39 additions & 50 deletions backend/cmd/mintter-site/sites/sites.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
rpcpeer "google.golang.org/grpc/peer"
"google.golang.org/grpc/status"

"crawshaw.io/sqlite"
"crawshaw.io/sqlite/sqlitex"
"github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -125,13 +126,7 @@ func (ws *Website) GetSiteInfo(ctx context.Context, in *groups.GetSiteInfoReques
return nil, err
}

conn, release, err := db.Conn(ctx)
if err != nil {
return nil, err
}
defer release()

groupID, err := storage.GetKV(conn, keySiteGroup)
groupID, err := storage.GetKV(ctx, db, keySiteGroup)
if err != nil {
return nil, fmt.Errorf("failed to get group id from the db: %w", err)
}
Expand Down Expand Up @@ -205,23 +200,23 @@ func (ws *Website) InitializeServer(ctx context.Context, in *groups.InitializeSe
return nil, err
}

conn, release, err := db.Conn(ctx)
if err != nil {
return nil, err
}
defer release()
if err := db.WithTx(ctx, func(conn *sqlite.Conn) error {
_, err = hypersql.EntitiesInsertOrIgnore(conn, in.GroupId)
if err != nil {
return err
}

_, err = hypersql.EntitiesInsertOrIgnore(conn, in.GroupId)
if err != nil {
return nil, err
}
if err := storage.SetKV(ctx, conn, keySiteGroup, in.GroupId, false); err != nil {
return fmt.Errorf("failed to save group ID")
}

if err := storage.SetKV(conn, keySiteGroup, in.GroupId, false); err != nil {
return nil, fmt.Errorf("failed to save group ID")
}
if err := storage.SetKV(ctx, conn, keySiteOwner, owner.String(), false); err != nil {
return fmt.Errorf("failed to save owner")
}

if err := storage.SetKV(conn, keySiteOwner, owner.String(), false); err != nil {
return nil, fmt.Errorf("failed to save owner")
return nil
}); err != nil {
return nil, err
}

return &groups.InitializeServerResponse{}, nil
Expand All @@ -235,13 +230,7 @@ func (ws *Website) GetGroupID(ctx context.Context) (string, error) {
return "", err
}

conn, release, err := db.Conn(ctx)
if err != nil {
return "", fmt.Errorf("Failed to get db connection: %w", err)
}
defer release()

groupID, err := storage.GetKV(conn, keySiteGroup)
groupID, err := storage.GetKV(ctx, db, keySiteGroup)
if err != nil {
return "", err
}
Expand Down Expand Up @@ -276,32 +265,32 @@ func (ws *Website) PublishBlobs(ctx context.Context, in *groups.PublishBlobsRequ
if err != nil {
return nil, err
}
conn, release, err := db.Conn(ctx)
if err != nil {
return nil, fmt.Errorf("Failed to get db connection: %w", err)
}
defer release()

// Get the owner's view of the list of members.
groupID, err := storage.GetKV(conn, keySiteGroup)
if err != nil || groupID == "" {
return nil, fmt.Errorf("error getting groupID on the site, is the site initialized?: %w", err)
}
var role groups.Role
if err := db.WithSave(ctx, func(conn *sqlite.Conn) error {
// Get the owner's view of the list of members.
groupID, err := storage.GetKV(ctx, conn, keySiteGroup)
if err != nil || groupID == "" {
return fmt.Errorf("error getting groupID on the site, is the site initialized?: %w", err)
}

groupOwner, err := storage.GetKV(conn, keySiteOwner)
if err != nil || groupOwner == "" {
return nil, fmt.Errorf("error getting group owner on the site, is the site initialized?: %w", err)
}
groupOwner, err := storage.GetKV(ctx, conn, keySiteOwner)
if err != nil || groupOwner == "" {
return fmt.Errorf("error getting group owner on the site, is the site initialized?: %w", err)
}

var role groups.Role
if groupOwner == callerAccount.String() {
role = groups.Role_OWNER
} else {
r, err := hypersql.GetGroupRole(conn, groupID, "hm://a/"+callerAccount.String())
if err != nil {
return nil, err
if groupOwner == callerAccount.String() {
role = groups.Role_OWNER
} else {
r, err := hypersql.GetGroupRole(conn, groupID, "hm://a/"+callerAccount.String())
if err != nil {
return err
}
role = groups.Role(r)
}
role = groups.Role(r)
return nil
}); err != nil {
return nil, err
}

if role != groups.Role_OWNER && role != groups.Role_EDITOR {
Expand Down
24 changes: 10 additions & 14 deletions backend/daemon/api/groups/v1alpha/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,8 @@ func NewSQLiteDB(db *sqlitex.Pool) *DB {
return &DB{db: db}
}

// RecordSiteSync updates the last sync time of a site.
// RecordGroupSiteSync updates the last sync time of a site.
func (db *DB) RecordGroupSiteSync(ctx context.Context, group string, now time.Time, syncErr error, info *groups.PublicSiteInfo) error {
conn, release, err := db.db.Conn(ctx)
if err != nil {
return err
}
defer release()

nowts := now.Unix()

var errmsg string
Expand All @@ -47,15 +41,17 @@ func (db *DB) RecordGroupSiteSync(ctx context.Context, group string, now time.Ti
remoteVersion = info.GroupVersion
}

if err := sqlitex.Exec(conn, qRecordGroupSiteSync(), nil, errmsg, remoteVersion, nowts, group); err != nil {
return err
}
return db.db.WithTx(ctx, func(conn *sqlite.Conn) error {
if err := sqlitex.Exec(conn, qRecordGroupSiteSync(), nil, errmsg, remoteVersion, nowts, group); err != nil {
return err
}

if conn.Changes() == 0 {
return fmt.Errorf("group site %s couldn't update: not found", group)
}
if conn.Changes() == 0 {
return fmt.Errorf("group site %s couldn't update: not found", group)
}

return nil
return nil
})
}

var qRecordGroupSiteSync = dqb.Str(`
Expand Down
30 changes: 28 additions & 2 deletions backend/daemon/storage/sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,20 @@ func MakeTestDB(t testing.TB) *sqlitex.Pool {
}

// SetKV sets a key-value pair in the database.
func SetKV(conn *sqlite.Conn, key, value string, replace bool) error {
func SetKV[T *sqlite.Conn | *sqlitex.Pool](ctx context.Context, db T, key, value string, replace bool) error {
var conn *sqlite.Conn
switch v := any(db).(type) {
case *sqlite.Conn:
conn = v
case *sqlitex.Pool:
c, release, err := v.Conn(ctx)
if err != nil {
return err
}
defer release()
conn = c
}

if replace {
return sqlitex.Exec(conn, "INSERT OR REPLACE INTO kv (key, value) VALUES (?, ?);", nil, key, value)
}
Expand All @@ -103,7 +116,20 @@ func SetKV(conn *sqlite.Conn, key, value string, replace bool) error {
}

// GetKV gets a key-value pair from the database.
func GetKV(conn *sqlite.Conn, key string) (string, error) {
func GetKV[T *sqlite.Conn | *sqlitex.Pool](ctx context.Context, db T, key string) (string, error) {
var conn *sqlite.Conn
switch v := any(db).(type) {
case *sqlite.Conn:
conn = v
case *sqlitex.Pool:
c, release, err := v.Conn(ctx)
if err != nil {
return "", err
}
defer release()
conn = c
}

var value string
err := sqlitex.Exec(conn, "SELECT value FROM kv WHERE key = ?;", func(stmt *sqlite.Stmt) error {
value = stmt.ColumnText(0)
Expand Down
6 changes: 4 additions & 2 deletions backend/mttnet/providing.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var qAllPublicBlobs = dqb.Str(`
AND drafts.blob IS NULL;
`)

var randSrc = rand.NewSource(time.Now().UnixNano())

func makeProvidingStrategy(db *sqlitex.Pool) provider.KeyChanFunc {
// This providing strategy returns all the CID known to the blockstore
// except those which are marked as draft changes.
Expand All @@ -43,19 +45,19 @@ func makeProvidingStrategy(db *sqlitex.Pool) provider.KeyChanFunc {
log.Error("Failed to open db connection", zap.Error(err))
return
}
defer release()

// We want to provide all the entity IDs, so we convert them into raw CIDs,
// similar to how libp2p discovery service is doing.

entities, err := hypersql.EntitiesListByPrefix(conn, "*")
release()
if err != nil {
log.Error("Failed to list entities", zap.Error(err))
return
}
log.Debug("Start reproviding", zap.Int("Number of entities", len(entities)))
// Since reproviding takes long AND is has throttle limits, we are better off randomizing it.
r := rand.New(rand.NewSource(time.Now().UnixNano()))
r := rand.New(randSrc) //nolint:gosec
r.Shuffle(len(entities), func(i, j int) { entities[i], entities[j] = entities[j], entities[i] })
for _, e := range entities {
c, err := hyper.EntityID(e.ResourcesIRI).CID()
Expand Down

0 comments on commit ba057e1

Please sign in to comment.