From bf3dbd2a5d82baabb685331f3d76cf24e3d89c0b Mon Sep 17 00:00:00 2001 From: Ratan Kaliani Date: Thu, 10 Oct 2024 13:19:39 -0700 Subject: [PATCH 1/2] fix: db lock --- proposer/op/proposer/db/db.go | 78 +++++++++++++++++++++++------------ proposer/op/proposer/prove.go | 2 +- 2 files changed, 53 insertions(+), 27 deletions(-) diff --git a/proposer/op/proposer/db/db.go b/proposer/op/proposer/db/db.go index 3fdeb4ce..6779415f 100644 --- a/proposer/op/proposer/db/db.go +++ b/proposer/op/proposer/db/db.go @@ -7,6 +7,8 @@ import ( "path/filepath" "time" + "entgo.io/ent/dialect/sql" + "github.com/succinctlabs/op-succinct-go/proposer/db/ent" "github.com/succinctlabs/op-succinct-go/proposer/db/ent/proofrequest" @@ -14,7 +16,8 @@ import ( ) type ProofDB struct { - client *ent.Client + writeClient *ent.Client + readClient *ent.Client } // InitDB initializes the database and returns a handle to it. @@ -32,24 +35,47 @@ func InitDB(dbPath string, useCachedDb bool) (*ProofDB, error) { return nil, fmt.Errorf("failed to create directories for DB: %w", err) } - connectionString := fmt.Sprintf("file:%s?_fk=1", dbPath) - client, err := ent.Open("sqlite3", connectionString) + connectionUrl := fmt.Sprintf("file:%s?_fk=1", dbPath) + + writeDrv, err := sql.Open("sqlite3", connectionUrl) + if err != nil { + return nil, fmt.Errorf("failed opening connection to sqlite: %v", err) + } + writeDb := writeDrv.DB() + // The write lock will be managed behind a Mutex. + writeDb.SetMaxOpenConns(1) + writeDb.SetConnMaxLifetime(time.Hour) + + readDrv, err := sql.Open("sqlite3", connectionUrl) if err != nil { return nil, fmt.Errorf("failed opening connection to sqlite: %v", err) } + readDb := readDrv.DB() + readDb.SetMaxOpenConns(4) + readDb.SetConnMaxLifetime(time.Hour) - // Run the auto migration tool. - if err := client.Schema.Create(context.Background()); err != nil { + readClient := ent.NewClient(ent.Driver(readDrv)) + writeClient := ent.NewClient(ent.Driver(writeDrv)) + + if err := readClient.Schema.Create(context.Background()); err != nil { + return nil, fmt.Errorf("failed creating schema resources: %v", err) + } + if err := writeClient.Schema.Create(context.Background()); err != nil { return nil, fmt.Errorf("failed creating schema resources: %v", err) } - return &ProofDB{client: client}, nil + return &ProofDB{writeClient: writeClient, readClient: readClient}, nil } // CloseDB closes the connection to the database. func (db *ProofDB) CloseDB() error { - if db.client != nil { - if err := db.client.Close(); err != nil { + if db.writeClient != nil { + if err := db.writeClient.Close(); err != nil { + return fmt.Errorf("error closing database: %w", err) + } + } + if db.readClient != nil { + if err := db.readClient.Close(); err != nil { return fmt.Errorf("error closing database: %w", err) } } @@ -59,7 +85,7 @@ func (db *ProofDB) CloseDB() error { // NewEntry creates a new proof request entry in the database. func (db *ProofDB) NewEntry(proofType proofrequest.Type, start, end uint64) error { now := uint64(time.Now().Unix()) - _, err := db.client.ProofRequest. + _, err := db.writeClient.ProofRequest. Create(). SetType(proofType). SetStartBlock(start). @@ -78,7 +104,7 @@ func (db *ProofDB) NewEntry(proofType proofrequest.Type, start, end uint64) erro // UpdateProofStatus updates the status of a proof request in the database. func (db *ProofDB) UpdateProofStatus(id int, proofStatus proofrequest.Status) error { - _, err := db.client.ProofRequest.Update(). + _, err := db.writeClient.ProofRequest.Update(). Where(proofrequest.ID(id)). SetStatus(proofStatus). SetLastUpdatedTime(uint64(time.Now().Unix())). @@ -89,7 +115,7 @@ func (db *ProofDB) UpdateProofStatus(id int, proofStatus proofrequest.Status) er // SetProverRequestID sets the prover request ID for a proof request in the database. func (db *ProofDB) SetProverRequestID(id int, proverRequestID string) error { - _, err := db.client.ProofRequest.Update(). + _, err := db.writeClient.ProofRequest.Update(). Where(proofrequest.ID(id)). SetProverRequestID(proverRequestID). SetProofRequestTime(uint64(time.Now().Unix())). @@ -106,7 +132,7 @@ func (db *ProofDB) SetProverRequestID(id int, proverRequestID string) error { // AddFulfilledProof adds a proof to a proof request in the database and sets the status to COMPLETE. func (db *ProofDB) AddFulfilledProof(id int, proof []byte) error { // Start a transaction - tx, err := db.client.Tx(context.Background()) + tx, err := db.writeClient.Tx(context.Background()) if err != nil { return fmt.Errorf("failed to start transaction: %w", err) } @@ -153,7 +179,7 @@ func (db *ProofDB) AddFulfilledProof(id int, proof []byte) error { // GetNumberOfProofsWithStatuses returns the number of proofs with the given status(es). func (db *ProofDB) GetNumberOfRequestsWithStatuses(statuses ...proofrequest.Status) (int, error) { - count, err := db.client.ProofRequest.Query(). + count, err := db.readClient.ProofRequest.Query(). Where( proofrequest.StatusIn(statuses...), ). @@ -169,7 +195,7 @@ func (db *ProofDB) GetNumberOfRequestsWithStatuses(statuses ...proofrequest.Stat // AddL1BlockInfoToAggRequest adds the L1 block info to the existing AGG proof request. func (db *ProofDB) AddL1BlockInfoToAggRequest(startBlock, endBlock, l1BlockNumber uint64, l1BlockHash string) (*ent.ProofRequest, error) { // Perform the update - rowsAffected, err := db.client.ProofRequest.Update(). + rowsAffected, err := db.writeClient.ProofRequest.Update(). Where( proofrequest.TypeEQ(proofrequest.TypeAGG), proofrequest.StatusEQ(proofrequest.StatusUNREQ), @@ -190,7 +216,7 @@ func (db *ProofDB) AddL1BlockInfoToAggRequest(startBlock, endBlock, l1BlockNumbe } // Fetch the updated ProofRequest - updatedProof, err := db.client.ProofRequest.Query(). + updatedProof, err := db.readClient.ProofRequest.Query(). Where( proofrequest.TypeEQ(proofrequest.TypeAGG), proofrequest.StatusEQ(proofrequest.StatusUNREQ), @@ -210,7 +236,7 @@ func (db *ProofDB) AddL1BlockInfoToAggRequest(startBlock, endBlock, l1BlockNumbe // GetLatestEndBlock returns the latest end block of a proof request in the database. func (db *ProofDB) GetLatestEndBlock() (uint64, error) { - maxEnd, err := db.client.ProofRequest.Query(). + maxEnd, err := db.readClient.ProofRequest.Query(). Order(ent.Desc(proofrequest.FieldEndBlock)). Select(proofrequest.FieldEndBlock). First(context.Background()) @@ -229,7 +255,7 @@ func (db *ProofDB) GetWitnessGenerationTimeoutProofsOnServer() ([]*ent.ProofRequ currentTime := time.Now().Unix() twentyMinutesAgo := currentTime - 20*60 - proofs, err := db.client.ProofRequest.Query(). + proofs, err := db.readClient.ProofRequest.Query(). Where( proofrequest.StatusEQ(proofrequest.StatusWITNESSGEN), proofrequest.ProverRequestIDIsNil(), @@ -247,7 +273,7 @@ func (db *ProofDB) GetWitnessGenerationTimeoutProofsOnServer() ([]*ent.ProofRequ // If a proof failed to be sent to the prover network, it's status will be set to FAILED, but the prover request ID will be empty. // This function returns all such proofs. func (db *ProofDB) GetProofsFailedOnServer() ([]*ent.ProofRequest, error) { - proofs, err := db.client.ProofRequest.Query(). + proofs, err := db.readClient.ProofRequest.Query(). Where( proofrequest.StatusEQ(proofrequest.StatusFAILED), proofrequest.ProverRequestIDEQ(""), @@ -266,7 +292,7 @@ func (db *ProofDB) GetProofsFailedOnServer() ([]*ent.ProofRequest, error) { // Get all pending proofs with a status of requested and a prover ID that is not empty. func (db *ProofDB) GetAllPendingProofs() ([]*ent.ProofRequest, error) { - proofs, err := db.client.ProofRequest.Query(). + proofs, err := db.readClient.ProofRequest.Query(). Where( proofrequest.StatusEQ(proofrequest.StatusPROVING), ). @@ -280,7 +306,7 @@ func (db *ProofDB) GetAllPendingProofs() ([]*ent.ProofRequest, error) { // GetAllProofsWithStatus returns all proofs with the given status. func (db *ProofDB) GetAllProofsWithStatus(status proofrequest.Status) ([]*ent.ProofRequest, error) { - proofs, err := db.client.ProofRequest.Query(). + proofs, err := db.readClient.ProofRequest.Query(). Where( proofrequest.StatusEQ(status), ). @@ -299,7 +325,7 @@ func (db *ProofDB) GetAllProofsWithStatus(status proofrequest.Status) ([]*ent.Pr // GetNextUnrequestedProof returns the next unrequested proof in the database. func (db *ProofDB) GetNextUnrequestedProof() (*ent.ProofRequest, error) { // Get the unrequested AGG proof with the lowest start block. - aggProof, err := db.client.ProofRequest.Query(). + aggProof, err := db.readClient.ProofRequest.Query(). Where( proofrequest.StatusEQ(proofrequest.StatusUNREQ), proofrequest.TypeEQ(proofrequest.TypeAGG), @@ -316,7 +342,7 @@ func (db *ProofDB) GetNextUnrequestedProof() (*ent.ProofRequest, error) { } // If there's no AGG proof available, get the unrequested SPAN proof with the lowest start block. - spanProof, err := db.client.ProofRequest.Query(). + spanProof, err := db.readClient.ProofRequest.Query(). Where( proofrequest.StatusEQ(proofrequest.StatusUNREQ), proofrequest.TypeEQ(proofrequest.TypeSPAN), @@ -338,7 +364,7 @@ func (db *ProofDB) GetNextUnrequestedProof() (*ent.ProofRequest, error) { // GetAllCompletedAggProofs returns all completed AGG proofs for a given start block. func (db *ProofDB) GetAllCompletedAggProofs(startBlock uint64) ([]*ent.ProofRequest, error) { - proofs, err := db.client.ProofRequest.Query(). + proofs, err := db.readClient.ProofRequest.Query(). Where( proofrequest.TypeEQ(proofrequest.TypeAGG), proofrequest.StartBlockEQ(startBlock), @@ -360,7 +386,7 @@ func (db *ProofDB) GetAllCompletedAggProofs(startBlock uint64) ([]*ent.ProofRequ // Returns true if a new AGG proof was created, false otherwise. func (db *ProofDB) TryCreateAggProofFromSpanProofs(from, minTo uint64) (bool, uint64, error) { // If there's already an AGG proof in progress/completed with the same start block, return. - count, err := db.client.ProofRequest.Query(). + count, err := db.readClient.ProofRequest.Query(). Where( proofrequest.TypeEQ(proofrequest.TypeAGG), proofrequest.StartBlockEQ(from), @@ -398,7 +424,7 @@ func (db *ProofDB) TryCreateAggProofFromSpanProofs(from, minTo uint64) (bool, ui // GetMaxContiguousSpanProofRange returns the start and end of the contiguous span proof chain. func (db *ProofDB) GetMaxContiguousSpanProofRange(start uint64) (uint64, error) { ctx := context.Background() - client := db.client + client := db.readClient query := client.ProofRequest.Query(). Where( @@ -430,7 +456,7 @@ func (db *ProofDB) GetMaxContiguousSpanProofRange(start uint64) (uint64, error) // If there's a gap in the proofs, or the proofs don't fully cover the range, return an error. func (db *ProofDB) GetConsecutiveSpanProofs(start, end uint64) ([][]byte, error) { ctx := context.Background() - client := db.client + client := db.readClient // Query the DB for the span proofs that cover the range [start, end]. query := client.ProofRequest.Query(). diff --git a/proposer/op/proposer/prove.go b/proposer/op/proposer/prove.go index 1468faab..40c00081 100644 --- a/proposer/op/proposer/prove.go +++ b/proposer/op/proposer/prove.go @@ -224,7 +224,7 @@ func (l *L2OutputSubmitter) RequestOPSuccinctProof(p ent.ProofRequest) error { // Set the proof status to PROVING once the prover ID has been retrieved. Only proofs with status PROVING, SUCCESS or FAILED have a prover request ID. err = l.db.UpdateProofStatus(p.ID, proofrequest.StatusPROVING) if err != nil { - return fmt.Errorf("failed to set proof status to PROVING: %w", err) + return fmt.Errorf("failed to set proof status to proving: %w", err) } err = l.db.SetProverRequestID(p.ID, proofId) From e4a8c598b1e3aaa9f0ed3029e39bc367434e3434 Mon Sep 17 00:00:00 2001 From: Ratan Kaliani Date: Thu, 10 Oct 2024 13:23:14 -0700 Subject: [PATCH 2/2] nit --- proposer/op/proposer/db/db.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/proposer/op/proposer/db/db.go b/proposer/op/proposer/db/db.go index 6779415f..94cae46d 100644 --- a/proposer/op/proposer/db/db.go +++ b/proposer/op/proposer/db/db.go @@ -424,9 +424,8 @@ func (db *ProofDB) TryCreateAggProofFromSpanProofs(from, minTo uint64) (bool, ui // GetMaxContiguousSpanProofRange returns the start and end of the contiguous span proof chain. func (db *ProofDB) GetMaxContiguousSpanProofRange(start uint64) (uint64, error) { ctx := context.Background() - client := db.readClient - query := client.ProofRequest.Query(). + query := db.readClient.ProofRequest.Query(). Where( proofrequest.TypeEQ(proofrequest.TypeSPAN), proofrequest.StatusEQ(proofrequest.StatusCOMPLETE), @@ -456,10 +455,9 @@ func (db *ProofDB) GetMaxContiguousSpanProofRange(start uint64) (uint64, error) // If there's a gap in the proofs, or the proofs don't fully cover the range, return an error. func (db *ProofDB) GetConsecutiveSpanProofs(start, end uint64) ([][]byte, error) { ctx := context.Background() - client := db.readClient // Query the DB for the span proofs that cover the range [start, end]. - query := client.ProofRequest.Query(). + query := db.readClient.ProofRequest.Query(). Where( proofrequest.TypeEQ(proofrequest.TypeSPAN), proofrequest.StatusEQ(proofrequest.StatusCOMPLETE),