Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proposer): SQLite changes #159

Merged
merged 2 commits into from
Oct 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 52 additions & 28 deletions proposer/op/proposer/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,17 @@ import (
"path/filepath"
"time"

"entgo.io/ent/dialect/sql"

"github.com/succinctlabs/op-succinct-go/proposer/db/ent"
"github.com/succinctlabs/op-succinct-go/proposer/db/ent/proofrequest"

_ "github.com/mattn/go-sqlite3"
)

type ProofDB struct {
client *ent.Client
writeClient *ent.Client
readClient *ent.Client
}

// InitDB initializes the database and returns a handle to it.
Expand All @@ -32,24 +35,47 @@ func InitDB(dbPath string, useCachedDb bool) (*ProofDB, error) {
return nil, fmt.Errorf("failed to create directories for DB: %w", err)
}

connectionString := fmt.Sprintf("file:%s?_fk=1", dbPath)
client, err := ent.Open("sqlite3", connectionString)
connectionUrl := fmt.Sprintf("file:%s?_fk=1", dbPath)

writeDrv, err := sql.Open("sqlite3", connectionUrl)
if err != nil {
return nil, fmt.Errorf("failed opening connection to sqlite: %v", err)
}
writeDb := writeDrv.DB()
// The write lock will be managed behind a Mutex.
writeDb.SetMaxOpenConns(1)
writeDb.SetConnMaxLifetime(time.Hour)

readDrv, err := sql.Open("sqlite3", connectionUrl)
if err != nil {
return nil, fmt.Errorf("failed opening connection to sqlite: %v", err)
}
readDb := readDrv.DB()
readDb.SetMaxOpenConns(4)
readDb.SetConnMaxLifetime(time.Hour)

// Run the auto migration tool.
if err := client.Schema.Create(context.Background()); err != nil {
readClient := ent.NewClient(ent.Driver(readDrv))
writeClient := ent.NewClient(ent.Driver(writeDrv))

if err := readClient.Schema.Create(context.Background()); err != nil {
return nil, fmt.Errorf("failed creating schema resources: %v", err)
}
if err := writeClient.Schema.Create(context.Background()); err != nil {
return nil, fmt.Errorf("failed creating schema resources: %v", err)
}

return &ProofDB{client: client}, nil
return &ProofDB{writeClient: writeClient, readClient: readClient}, nil
}

// CloseDB closes the connection to the database.
func (db *ProofDB) CloseDB() error {
if db.client != nil {
if err := db.client.Close(); err != nil {
if db.writeClient != nil {
if err := db.writeClient.Close(); err != nil {
return fmt.Errorf("error closing database: %w", err)
}
}
if db.readClient != nil {
if err := db.readClient.Close(); err != nil {
return fmt.Errorf("error closing database: %w", err)
}
}
Expand All @@ -59,7 +85,7 @@ func (db *ProofDB) CloseDB() error {
// NewEntry creates a new proof request entry in the database.
func (db *ProofDB) NewEntry(proofType proofrequest.Type, start, end uint64) error {
now := uint64(time.Now().Unix())
_, err := db.client.ProofRequest.
_, err := db.writeClient.ProofRequest.
Create().
SetType(proofType).
SetStartBlock(start).
Expand All @@ -78,7 +104,7 @@ func (db *ProofDB) NewEntry(proofType proofrequest.Type, start, end uint64) erro

// UpdateProofStatus updates the status of a proof request in the database.
func (db *ProofDB) UpdateProofStatus(id int, proofStatus proofrequest.Status) error {
_, err := db.client.ProofRequest.Update().
_, err := db.writeClient.ProofRequest.Update().
Where(proofrequest.ID(id)).
SetStatus(proofStatus).
SetLastUpdatedTime(uint64(time.Now().Unix())).
Expand All @@ -89,7 +115,7 @@ func (db *ProofDB) UpdateProofStatus(id int, proofStatus proofrequest.Status) er

// SetProverRequestID sets the prover request ID for a proof request in the database.
func (db *ProofDB) SetProverRequestID(id int, proverRequestID string) error {
_, err := db.client.ProofRequest.Update().
_, err := db.writeClient.ProofRequest.Update().
Where(proofrequest.ID(id)).
SetProverRequestID(proverRequestID).
SetProofRequestTime(uint64(time.Now().Unix())).
Expand All @@ -106,7 +132,7 @@ func (db *ProofDB) SetProverRequestID(id int, proverRequestID string) error {
// AddFulfilledProof adds a proof to a proof request in the database and sets the status to COMPLETE.
func (db *ProofDB) AddFulfilledProof(id int, proof []byte) error {
// Start a transaction
tx, err := db.client.Tx(context.Background())
tx, err := db.writeClient.Tx(context.Background())
if err != nil {
return fmt.Errorf("failed to start transaction: %w", err)
}
Expand Down Expand Up @@ -153,7 +179,7 @@ func (db *ProofDB) AddFulfilledProof(id int, proof []byte) error {

// GetNumberOfProofsWithStatuses returns the number of proofs with the given status(es).
func (db *ProofDB) GetNumberOfRequestsWithStatuses(statuses ...proofrequest.Status) (int, error) {
count, err := db.client.ProofRequest.Query().
count, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusIn(statuses...),
).
Expand All @@ -169,7 +195,7 @@ func (db *ProofDB) GetNumberOfRequestsWithStatuses(statuses ...proofrequest.Stat
// AddL1BlockInfoToAggRequest adds the L1 block info to the existing AGG proof request.
func (db *ProofDB) AddL1BlockInfoToAggRequest(startBlock, endBlock, l1BlockNumber uint64, l1BlockHash string) (*ent.ProofRequest, error) {
// Perform the update
rowsAffected, err := db.client.ProofRequest.Update().
rowsAffected, err := db.writeClient.ProofRequest.Update().
Where(
proofrequest.TypeEQ(proofrequest.TypeAGG),
proofrequest.StatusEQ(proofrequest.StatusUNREQ),
Expand All @@ -190,7 +216,7 @@ func (db *ProofDB) AddL1BlockInfoToAggRequest(startBlock, endBlock, l1BlockNumbe
}

// Fetch the updated ProofRequest
updatedProof, err := db.client.ProofRequest.Query().
updatedProof, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.TypeEQ(proofrequest.TypeAGG),
proofrequest.StatusEQ(proofrequest.StatusUNREQ),
Expand All @@ -210,7 +236,7 @@ func (db *ProofDB) AddL1BlockInfoToAggRequest(startBlock, endBlock, l1BlockNumbe

// GetLatestEndBlock returns the latest end block of a proof request in the database.
func (db *ProofDB) GetLatestEndBlock() (uint64, error) {
maxEnd, err := db.client.ProofRequest.Query().
maxEnd, err := db.readClient.ProofRequest.Query().
Order(ent.Desc(proofrequest.FieldEndBlock)).
Select(proofrequest.FieldEndBlock).
First(context.Background())
Expand All @@ -229,7 +255,7 @@ func (db *ProofDB) GetWitnessGenerationTimeoutProofsOnServer() ([]*ent.ProofRequ
currentTime := time.Now().Unix()
twentyMinutesAgo := currentTime - 20*60

proofs, err := db.client.ProofRequest.Query().
proofs, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusEQ(proofrequest.StatusWITNESSGEN),
proofrequest.ProverRequestIDIsNil(),
Expand All @@ -247,7 +273,7 @@ func (db *ProofDB) GetWitnessGenerationTimeoutProofsOnServer() ([]*ent.ProofRequ
// If a proof failed to be sent to the prover network, it's status will be set to FAILED, but the prover request ID will be empty.
// This function returns all such proofs.
func (db *ProofDB) GetProofsFailedOnServer() ([]*ent.ProofRequest, error) {
proofs, err := db.client.ProofRequest.Query().
proofs, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusEQ(proofrequest.StatusFAILED),
proofrequest.ProverRequestIDEQ(""),
Expand All @@ -266,7 +292,7 @@ func (db *ProofDB) GetProofsFailedOnServer() ([]*ent.ProofRequest, error) {

// Get all pending proofs with a status of requested and a prover ID that is not empty.
func (db *ProofDB) GetAllPendingProofs() ([]*ent.ProofRequest, error) {
proofs, err := db.client.ProofRequest.Query().
proofs, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusEQ(proofrequest.StatusPROVING),
).
Expand All @@ -280,7 +306,7 @@ func (db *ProofDB) GetAllPendingProofs() ([]*ent.ProofRequest, error) {

// GetAllProofsWithStatus returns all proofs with the given status.
func (db *ProofDB) GetAllProofsWithStatus(status proofrequest.Status) ([]*ent.ProofRequest, error) {
proofs, err := db.client.ProofRequest.Query().
proofs, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusEQ(status),
).
Expand All @@ -299,7 +325,7 @@ func (db *ProofDB) GetAllProofsWithStatus(status proofrequest.Status) ([]*ent.Pr
// GetNextUnrequestedProof returns the next unrequested proof in the database.
func (db *ProofDB) GetNextUnrequestedProof() (*ent.ProofRequest, error) {
// Get the unrequested AGG proof with the lowest start block.
aggProof, err := db.client.ProofRequest.Query().
aggProof, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusEQ(proofrequest.StatusUNREQ),
proofrequest.TypeEQ(proofrequest.TypeAGG),
Expand All @@ -316,7 +342,7 @@ func (db *ProofDB) GetNextUnrequestedProof() (*ent.ProofRequest, error) {
}

// If there's no AGG proof available, get the unrequested SPAN proof with the lowest start block.
spanProof, err := db.client.ProofRequest.Query().
spanProof, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.StatusEQ(proofrequest.StatusUNREQ),
proofrequest.TypeEQ(proofrequest.TypeSPAN),
Expand All @@ -338,7 +364,7 @@ func (db *ProofDB) GetNextUnrequestedProof() (*ent.ProofRequest, error) {

// GetAllCompletedAggProofs returns all completed AGG proofs for a given start block.
func (db *ProofDB) GetAllCompletedAggProofs(startBlock uint64) ([]*ent.ProofRequest, error) {
proofs, err := db.client.ProofRequest.Query().
proofs, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.TypeEQ(proofrequest.TypeAGG),
proofrequest.StartBlockEQ(startBlock),
Expand All @@ -360,7 +386,7 @@ func (db *ProofDB) GetAllCompletedAggProofs(startBlock uint64) ([]*ent.ProofRequ
// Returns true if a new AGG proof was created, false otherwise.
func (db *ProofDB) TryCreateAggProofFromSpanProofs(from, minTo uint64) (bool, uint64, error) {
// If there's already an AGG proof in progress/completed with the same start block, return.
count, err := db.client.ProofRequest.Query().
count, err := db.readClient.ProofRequest.Query().
Where(
proofrequest.TypeEQ(proofrequest.TypeAGG),
proofrequest.StartBlockEQ(from),
Expand Down Expand Up @@ -398,9 +424,8 @@ func (db *ProofDB) TryCreateAggProofFromSpanProofs(from, minTo uint64) (bool, ui
// GetMaxContiguousSpanProofRange returns the start and end of the contiguous span proof chain.
func (db *ProofDB) GetMaxContiguousSpanProofRange(start uint64) (uint64, error) {
ctx := context.Background()
client := db.client

query := client.ProofRequest.Query().
query := db.readClient.ProofRequest.Query().
Where(
proofrequest.TypeEQ(proofrequest.TypeSPAN),
proofrequest.StatusEQ(proofrequest.StatusCOMPLETE),
Expand Down Expand Up @@ -430,10 +455,9 @@ func (db *ProofDB) GetMaxContiguousSpanProofRange(start uint64) (uint64, error)
// If there's a gap in the proofs, or the proofs don't fully cover the range, return an error.
func (db *ProofDB) GetConsecutiveSpanProofs(start, end uint64) ([][]byte, error) {
ctx := context.Background()
client := db.client

// Query the DB for the span proofs that cover the range [start, end].
query := client.ProofRequest.Query().
query := db.readClient.ProofRequest.Query().
Where(
proofrequest.TypeEQ(proofrequest.TypeSPAN),
proofrequest.StatusEQ(proofrequest.StatusCOMPLETE),
Expand Down
2 changes: 1 addition & 1 deletion proposer/op/proposer/prove.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (l *L2OutputSubmitter) RequestOPSuccinctProof(p ent.ProofRequest) error {
// Set the proof status to PROVING once the prover ID has been retrieved. Only proofs with status PROVING, SUCCESS or FAILED have a prover request ID.
err = l.db.UpdateProofStatus(p.ID, proofrequest.StatusPROVING)
if err != nil {
return fmt.Errorf("failed to set proof status to PROVING: %w", err)
return fmt.Errorf("failed to set proof status to proving: %w", err)
}

err = l.db.SetProverRequestID(p.ID, proofId)
Expand Down
Loading