Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support state change subscription #40

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 12 additions & 2 deletions pkg/eth/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"math/big"
"time"

"github.com/vulcanize/ipld-eth-server/pkg/events"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
Expand Down Expand Up @@ -52,24 +54,32 @@ type PublicEthAPI struct {
// Local db backend
B *Backend

// Event subscription backend
events *events.Notifier

// Proxy node for forwarding cache misses
supportsStateDiff bool // Whether or not the remote node supports the statediff_writeStateDiffAt endpoint, if it does we can fill the local cache when we hit a miss
rpc *rpc.Client
ethClient *ethclient.Client
}

// NewPublicEthAPI creates a new PublicEthAPI with the provided underlying Backend
func NewPublicEthAPI(b *Backend, client *rpc.Client, supportsStateDiff bool) *PublicEthAPI {
func NewPublicEthAPI(b *Backend, client *rpc.Client, supportsStateDiff bool) (*PublicEthAPI, error) {
var ethClient *ethclient.Client
if client != nil {
ethClient = ethclient.NewClient(client)
}
notifier, err := events.NewNotifier(b.DB, channelName)
if err != nil {
return nil, err
}
return &PublicEthAPI{
B: b,
events: notifier,
supportsStateDiff: supportsStateDiff,
rpc: client,
ethClient: ethClient,
}
}, nil
}

/*
Expand Down
3 changes: 2 additions & 1 deletion pkg/eth/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,7 +189,8 @@ var _ = Describe("API", func() {
indexAndPublisher := eth2.NewIPLDPublisher(db)
backend, err := eth.NewEthBackend(db, &eth.Config{})
Expect(err).ToNot(HaveOccurred())
api = eth.NewPublicEthAPI(backend, nil, false)
api, err = eth.NewPublicEthAPI(backend, nil, false)
Expect(err).ToNot(HaveOccurred())
err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
err = publishCode(db, test_helpers.ContractCodeHash, test_helpers.ContractCode)
Expand Down
6 changes: 4 additions & 2 deletions pkg/eth/eth_state_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ var _ = Describe("eth state reading tests", func() {
RPCGasCap: big.NewInt(10000000000),
})
Expect(err).ToNot(HaveOccurred())
api = eth.NewPublicEthAPI(backend, nil, false)
api, err = eth.NewPublicEthAPI(backend, nil, false)
Expect(err).ToNot(HaveOccurred())

// make the test blockchain (and state)
blocks, receipts, chain = test_helpers.MakeChain(5, test_helpers.Genesis, test_helpers.TestChainGen)
Expand Down Expand Up @@ -153,7 +154,8 @@ var _ = Describe("eth state reading tests", func() {

// Insert some non-canonical data into the database so that we test our ability to discern canonicity
indexAndPublisher := eth2.NewIPLDPublisher(db)
api = eth.NewPublicEthAPI(backend, nil, false)
api, err = eth.NewPublicEthAPI(backend, nil, false)
Expect(err).ToNot(HaveOccurred())
err = indexAndPublisher.Publish(test_helpers.MockConvertedPayload)
Expect(err).ToNot(HaveOccurred())
// The non-canonical header has a child
Expand Down
3 changes: 2 additions & 1 deletion pkg/eth/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,10 @@ package eth
import (
"encoding/json"
"fmt"
"os"

"github.com/ethereum/go-ethereum/cmd/utils"
log "github.com/sirupsen/logrus"
"os"

sdtypes "github.com/ethereum/go-ethereum/statediff/types"

Expand Down
2 changes: 1 addition & 1 deletion pkg/eth/subscription_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type StorageFilter struct {
IntermediateNodes bool
}

// Init is used to initialize a EthSubscription struct with env variables
// NewEthSubscriptionConfig initializes and returns an EthSubscription struct with env variables
func NewEthSubscriptionConfig() (*SubscriptionSettings, error) {
sc := new(SubscriptionSettings)
// Below default to false, which means we do not backfill by default
Expand Down
46 changes: 46 additions & 0 deletions pkg/eth/subscriptions.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package eth

import (
"context"
"github.com/ethereum/go-ethereum/eth/filters"
"github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-server/pkg/events"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/rpc"
)

const (
channelName = "postgraphile:header_cids"
)

func (api *PublicEthAPI) NewStateChanges(ctx context.Context, crit filters.FilterCriteria) (*rpc.Subscription, error) {
notifier, supported := rpc.NotifierFromContext(ctx)
if !supported {
return &rpc.Subscription{}, rpc.ErrNotificationsUnsupported
}
rpcSub := notifier.CreateSubscription()
go func() {
errChan := make(chan error)
stateChanges := make(chan events.Payload)
api.events.SubscribeStateChanges(ethereum.FilterQuery(crit), stateChanges, errChan)

for {
select {
case s := <-stateChanges:
notifier.Notify(rpcSub.ID, s)
case <-rpcSub.Err():
api.events.Close()
return
case <-notifier.Closed():
api.events.Close()
return
case err := <- errChan:
logrus.Errorf("error from NewStateChanges notifier: %v", err)
return
}
}
}()

return rpcSub, nil
}
233 changes: 233 additions & 0 deletions pkg/events/notifier.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
// VulcanizeDB
// Copyright © 2021 Vulcanize

// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.

// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.

// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

package events

import (
"encoding/json"
"math/big"
"time"

"github.com/ethereum/go-ethereum"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/rlp"
"github.com/jmoiron/sqlx"
"github.com/lib/pq"
log "github.com/sirupsen/logrus"
"github.com/vulcanize/ipld-eth-indexer/pkg/postgres"

"github.com/vulcanize/ipld-eth-server/pkg/shared"
)

var (
getBlockInfoPgStr = `SELECT block_number, block_hash FROM eth.header_cids WHERE id = $1`
getStateLeafsPgStr = `SELECT state_cids.id, state_leaf_key, data FROM eth.state_cids
INNER JOIN eth.header_cids ON (state_cids.header_id = header_cids.id)
INNER JOIN public.blocks ON (state_cids.mh_key = blocks.key)
WHERE header_cids.id = $1 AND state_leaf_key IS NOT NULL`
getStorageLeafsPgStr = `SELECT storage_leaf_key, data FROM eth.storage_cids
INNER JOIN eth.state_cids ON (storage_cids.state_id = state_cids.id)
INNER JOIN public.blocks ON (storage_cids.mh_key = blocks.key)
WHERE state_cids.id = $1 AND storage_leaf_key IS NOT NULL`
)

// Notifier listens to inserts on Postgres tables and forwards the data
type Notifier struct {
db *postgres.DB
listener *pq.Listener
failed chan error
}

// NewNotifier creates a new notifier for given PostgreSQL credentials.
func NewNotifier(db *postgres.DB, channelName string) (*Notifier, error) {
n := &Notifier{db: db, failed: make(chan error)}

listener := pq.NewListener(
db.Config.DbConnectionString(),
10*time.Second, time.Minute,
n.callBack)

if err := listener.Listen(channelName); err != nil {
listener.Close()
log.Println("ERROR!:", err)
return nil, err
}

n.listener = listener
return n, nil
}

func (n *Notifier) SubscribeStateChanges(query ethereum.FilterQuery, payloadChan chan Payload, errChan chan error) {
outChan := make(chan []byte)
doneChan := make(chan struct{})
n.notify(doneChan, outChan, errChan)
go func() {
for {
select {
case out := <-outChan:
jsonPayload := new(JSONPayload)
json.Unmarshal(out, jsonPayload)
if len(jsonPayload.Node) < 2 {
log.Warn("pushed JSON payload does not contain expected number of entrie in __node__ array")
continue
}
payload, err := n.getStateChanges(jsonPayload.Node[1])
if err != nil {
errChan <- err
continue
}
payloadChan <- *payload
case <-doneChan:
return
}
}
}()
}

func (n *Notifier) getStateChanges(headerID string) (*Payload, error) {
tx, err := n.db.Beginx()
if err != nil {
return nil, err
}
blockInfo := new(BlockInfoPayload)
if err := tx.Select(blockInfo, getBlockInfoPgStr, headerID); err != nil {
shared.Rollback(tx)
return nil, err
}
blockNum := new(big.Int)
blockNum.SetString(blockInfo.BlockNumber, 10)
stateLeafPayloads, err := n.getStateLeafs(tx, headerID)
if err != nil {
shared.Rollback(tx)
return nil, err
}
stateAccounts := make([]AccountDiff, len(stateLeafPayloads))
for i, slp := range stateLeafPayloads {
storageLeafPayloads, err := n.getStorageLeafs(tx, slp.ID)
if err != nil {
shared.Rollback(tx)
return nil, err
}
stateAccounts[i] = AccountDiff{
Key: common.Hex2Bytes(slp.StateLeafKey),
Value: slp.RLPData,
Storage: storageLeafPayloads,
}
}
stateChangePayload := new(StateDiff)
stateChangePayload.BlockHash = common.HexToHash(blockInfo.BlockHash)
stateChangePayload.BlockNumber = blockNum
stateChangePayload.UpdatedAccounts = stateAccounts
if err := tx.Commit(); err != nil {
return nil, err
}

by, err := rlp.EncodeToBytes(stateChangePayload)
if err != nil {
return nil, err
}
return &Payload{
StateDiffRlp: by,
}, nil
}

func (n *Notifier) getStateLeafs(tx *sqlx.Tx, headerID string) ([]StateLeafPayload, error) {
rows, err := tx.Queryx(getStateLeafsPgStr, headerID)
if err != nil {
return nil, err
}
stateLeafPayloads := make([]StateLeafPayload, 0)
defer rows.Close()
for rows.Next() {
stateLeafPayload := new(StateLeafPayload)
if err := rows.StructScan(stateLeafPayload); err != nil {
return nil, err
}
stateLeafPayloads = append(stateLeafPayloads, *stateLeafPayload)
}
if rows.Err() != nil {
return nil, err
}
return stateLeafPayloads, err
}

func (n *Notifier) getStorageLeafs(tx *sqlx.Tx, stateID int64) ([]StorageDiff, error) {
rows, err := tx.Queryx(getStorageLeafsPgStr, stateID)
if err != nil {
return nil, err
}
storageLeafPayloads := make([]StorageDiff, 0)
defer rows.Close()
for rows.Next() {
storageLeafPayload := new(StorageLeafPayload)
if err := rows.StructScan(storageLeafPayload); err != nil {
return nil, err
}
storageLeafPayloads = append(storageLeafPayloads, StorageDiff{
Key: common.Hex2Bytes(storageLeafPayload.StorageLeaf),
Value: storageLeafPayload.RLPData,
})
}
if rows.Err() != nil {
return nil, err
}
return storageLeafPayloads, err
}

// notify is the main loop of the notifier to receive data from
// the database in JSON-FORMAT and send it down the provided channel.
func (n *Notifier) notify(doneChan chan struct{}, outChan chan []byte, errChan chan error) {
go func() {
defer close(doneChan)
for {
select {
case e := <-n.listener.Notify:
if e == nil {
continue
}
outChan <- []byte(e.Extra)
case err := <-n.failed:
if err != nil {
errChan <- err
}
return
case <-time.After(time.Minute):
if err := n.listener.Ping(); err != nil {
errChan <- err
return
}
}
}
}()
}

// callBack
func (n *Notifier) callBack(event pq.ListenerEventType, err error) {
if err != nil {
log.Errorf("listener error: %s\n", err)
}
if event == pq.ListenerEventConnectionAttemptFailed {
n.failed <- err
}
if event == pq.ListenerEventDisconnected {
n.failed <- err
}
}

// Close closes the notifier.
func (n *Notifier) Close() error {
return n.listener.Close()
}
Loading