From 551d26ea4758907aadaac7065f06ad74eb0b4766 Mon Sep 17 00:00:00 2001 From: Yawning Angel Date: Wed, 8 Jun 2022 15:28:51 +0000 Subject: [PATCH] feat: Add a way to follow a SQL db instead of indexing This should allow multiple non-archival instances to share the same database, with one and only one instance writing to the db. --- conf/config.go | 5 +- indexer/indexer.go | 177 ++++++++++++++++++++++++++++++++++++++++++++- main.go | 2 +- 3 files changed, 177 insertions(+), 7 deletions(-) diff --git a/conf/config.go b/conf/config.go index 0b2afcc1..a89c775c 100644 --- a/conf/config.go +++ b/conf/config.go @@ -22,8 +22,9 @@ type Config struct { // blocks that the node doesn't have data for, such as by skipping them in checkpoint sync. // For sensible reasons, indexing may actually start at an even later block, such as if // this block is already indexed or the node indicates that it doesn't have this block. - IndexingStart uint64 `koanf:"indexing_start"` - IndexingDisable bool `koanf:"indexing_disable"` + IndexingStart uint64 `koanf:"indexing_start"` + IndexingDisable bool `koanf:"indexing_disable"` + IndexingSQLFollow bool `koanf:"indexing_sql_follow"` Log *LogConfig `koanf:"log"` Cache *CacheConfig `koanf:"cache"` diff --git a/indexer/indexer.go b/indexer/indexer.go index cabc109d..7fa36e69 100644 --- a/indexer/indexer.go +++ b/indexer/indexer.go @@ -4,10 +4,15 @@ import ( "context" "errors" "fmt" + "math/big" "sync/atomic" "time" + ethCommon "github.com/ethereum/go-ethereum/common" + ethTypes "github.com/ethereum/go-ethereum/core/types" + "github.com/oasisprotocol/oasis-core/go/common" + "github.com/oasisprotocol/oasis-core/go/common/quantity" "github.com/oasisprotocol/oasis-core/go/common/service" "github.com/oasisprotocol/oasis-core/go/roothash/api/block" "github.com/oasisprotocol/oasis-sdk/client-sdk/go/client" @@ -16,6 +21,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/oasisprotocol/emerald-web3-gateway/conf" + "github.com/oasisprotocol/emerald-web3-gateway/db/model" + "github.com/oasisprotocol/emerald-web3-gateway/filters" "github.com/oasisprotocol/emerald-web3-gateway/storage" ) @@ -48,9 +55,11 @@ type Service struct { pruningStep uint64 indexingStart uint64 indexingDisable bool + indexingFollow bool backend Backend client client.RuntimeClient + storage storage.Storage core core.V1 queryBlockGasLimit bool @@ -302,14 +311,172 @@ func (s *Service) indexingWorker() { } } +func (s *Service) followingWorker() { + // This is gross but there is no other way to dispatch pubsub events + // which is the whole point of this routine. + ib := s.backend.(*indexBackend) + + var lastIndexed uint64 +pollLoop: + for { + select { + case <-s.ctx.Done(): + return + case <-time.After(indexerLoopDelay): + } + + // Poll the db for new blocks. + latest, err := s.storage.GetLatestBlockNumber(s.ctx) + if err != nil { + s.Logger.Warn("failed to query latest block from DB", + "err", err, + ) + continue + } + + // Do the "right thing(TM)". + var startAt uint64 + switch lastIndexed { + case latest: + // No new blocks. + continue + case 0: + // First time through the loop. + startAt = latest + default: + startAt = lastIndexed + 1 + } + + for round := startAt; round <= latest; round++ { + block, err := s.storage.GetBlockByNumber(s.ctx, round) + if err != nil { + s.Logger.Warn("failed to get block from DB", + "err", err, + "round", round, + ) + continue pollLoop + } + + logs, err := s.storage.GetLogs(s.ctx, round, round) + if err != nil { + s.Logger.Warn("failed to get logs from DB", + "err", err, + "round", round, + ) + continue pollLoop + } + + chainEvent := filters.ChainEvent{ + Block: block, + Hash: ethCommon.HexToHash(block.Hash), + Logs: logs, + } + ib.subscribe.ChainChan() <- chainEvent + + // Reconstruct the transactions and receipts that were actually + // stored for this block. The replacement of prior failure + // nonsense is surprisingly annoying to deal with. + numTxes := len(block.Transactions) + uniqueTxes := make([]*model.Transaction, 0, numTxes) + uniqueReceipts := make([]*model.Receipt, 0, numTxes) + var lastTransactionPrice quantity.Quantity + for i, tx := range block.Transactions { + // In the Emerald Paratime it can happen that an already committed + // transaction is re-proposed at a later block. The duplicate transaction fails, + // but it is still included in the block. The gateway should drop these + // transactions to remain compatible with ETH semantics. + if tx.Status == uint(ethTypes.ReceiptStatusFailed) { + continue + } + + dbTx, err := s.storage.GetTransaction(s.ctx, tx.Hash) + if err != nil { + // Unlike the indexer version of this, the tx (by hash) + // MUST be in the database by the point that the block + // is in the database. + // + // This is an invariant violation so recovery is unlikely. + s.Logger.Warn("failed to get tx from DB", + "err", err, + "round", round, + "tx_hash", tx.Hash, + ) + continue pollLoop + } + if dbTx.Round != tx.Round && dbTx.Status != uint(ethTypes.ReceiptStatusFailed) { + // Earlier tx exists, and it didn't fail. + // + // This is an invariant violation so recovery is unlikely. + ib.logger.Error("duplicate tx", + "round", round, + "earlier_tx", dbTx, + "tx", tx, + ) + continue pollLoop + } + + // If the tx passes the deduplication/replacement checks, + // presumably the receipt does as well? + receipt, err := s.storage.GetTransactionReceipt(s.ctx, tx.Hash) + if err != nil { + s.Logger.Warn("failed to get tx receipt from DB", + "err", err, + "round", round, + "tx_hash", tx.Hash, + ) + continue pollLoop + } + + // Done this way because quantity.Quantity.UnmarshalText + // can have a side-effect on error. + if err = func() error { + var gp big.Int + if err = gp.UnmarshalText([]byte(tx.GasPrice)); err != nil { + return err + } + return lastTransactionPrice.FromBigInt(&gp) + }(); err != nil { + s.Logger.Warn("failed to parse tx gas price", + "err", err, + "round", round, + "tx_hash", tx.Hash, + ) + } + + uniqueTxes = append(uniqueTxes, block.Transactions[i]) + uniqueReceipts = append(uniqueReceipts, receipt) + } + + blockData := &BlockData{ + Block: block, + Receipts: uniqueReceipts, + UniqueTxes: uniqueTxes, + LastTransactionPrice: &lastTransactionPrice, + } + + if ib.observer != nil { + ib.observer.OnBlockIndexed(blockData) + } + ib.blockNotifier.Broadcast(blockData) + } + + lastIndexed = latest // Caught up! + } +} + // Start starts service. func (s *Service) Start() { - // TODO/NotYawning: Non-archive nodes that have the indexer disabled - // likey want to use a different notion of healthy, and probably also - // want to start a worker that monitors the database for changes. - if s.indexingDisable { + switch { + case s.indexingDisable: + // Archive mode disables the worker entirely. + s.updateHealth(true) + return + case s.indexingFollow: + // DB follow mode follows another indexer instance via the DB. s.updateHealth(true) + go s.followingWorker() return + default: } go s.indexingWorker() @@ -342,6 +509,7 @@ func New( runtimeID: runtimeID, backend: cachingBackend, client: client, + storage: storage, core: core.NewV1(client), ctx: ctx, cancelCtx: cancelCtx, @@ -349,6 +517,7 @@ func New( pruningStep: cfg.PruningStep, indexingStart: cfg.IndexingStart, indexingDisable: cfg.IndexingDisable, + indexingFollow: cfg.IndexingSQLFollow, } s.Logger = s.Logger.With("runtime_id", s.runtimeID.String()) diff --git a/main.go b/main.go index 08c2d512..0440d535 100644 --- a/main.go +++ b/main.go @@ -195,7 +195,7 @@ func runRoot() error { // For now, "disable" write access to the DB in a kind of kludgy way // if the indexer is disabled. Yes this means that no migrations // can be done. Deal with it. - dbReadOnly := cfg.IndexingDisable + dbReadOnly := cfg.IndexingDisable || cfg.IndexingSQLFollow // Initialize db for migrations (higher timeouts). db, err := psql.InitDB(ctx, cfg.Database, true, dbReadOnly)