Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

L1 and pending subscriptions #2122

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions blockchain/blockchain.go
Original file line number Diff line number Diff line change
Expand Up @@ -956,8 +956,8 @@ func (b *Blockchain) storeEmptyPending(txn db.Transaction, latestHeader *core.He
}

// StorePending stores a pending block given that it is for the next height
func (b *Blockchain) StorePending(pending *Pending) error {
return b.database.Update(func(txn db.Transaction) error {
func (b *Blockchain) StorePending(pending *Pending) (bool, error) {
err := b.database.Update(func(txn db.Transaction) error {
expectedParentHash := new(felt.Felt)
h, err := headsHeader(txn)
if err != nil && !errors.Is(err, db.ErrKeyNotFound) {
Expand All @@ -978,9 +978,9 @@ func (b *Blockchain) StorePending(pending *Pending) error {
} else if !errors.Is(err, db.ErrKeyNotFound) { // Allow StorePending before block zero.
return err
}

return b.storePending(txn, pending)
})
return err == nil, err
}

func (b *Blockchain) storePending(txn db.Transaction, pending *Pending) error {
Expand Down
30 changes: 21 additions & 9 deletions blockchain/blockchain_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,10 +459,12 @@ func TestEvents(t *testing.T) {
if b.Number < 6 {
require.NoError(t, chain.Store(b, &emptyCommitments, s, nil))
} else {
require.NoError(t, chain.StorePending(&blockchain.Pending{
stored, err := chain.StorePending(&blockchain.Pending{
Block: b,
StateUpdate: s,
}))
})
require.True(t, stored)
require.NoError(t, err)
}
}

Expand Down Expand Up @@ -678,7 +680,9 @@ func TestPending(t *testing.T) {
Block: b,
StateUpdate: su,
}
require.NoError(t, chain.StorePending(&pendingGenesis))
stored, err := chain.StorePending(&pendingGenesis)
require.True(t, stored)
require.NoError(t, err)

gotPending, pErr := chain.Pending()
require.NoError(t, pErr)
Expand Down Expand Up @@ -755,7 +759,9 @@ func TestPending(t *testing.T) {
Block: b,
StateUpdate: su,
}
require.ErrorIs(t, chain.StorePending(&notExpectedPending), blockchain.ErrParentDoesNotMatchHead)
stored, err := chain.StorePending(&notExpectedPending)
require.False(t, stored)
require.ErrorIs(t, err, blockchain.ErrParentDoesNotMatchHead)
})

t.Run("store expected pending block", func(t *testing.T) {
Expand All @@ -768,7 +774,9 @@ func TestPending(t *testing.T) {
Block: b,
StateUpdate: su,
}
require.NoError(t, chain.StorePending(&expectedPending))
stored, err := chain.StorePending(&expectedPending)
require.True(t, stored)
require.NoError(t, err)

gotPending, pErr := chain.Pending()
require.NoError(t, pErr)
Expand Down Expand Up @@ -803,14 +811,16 @@ func TestStorePendingIncludesNumber(t *testing.T) {
chain := blockchain.New(pebble.NewMemTest(t), &network)

// Store pending genesis.
require.NoError(t, chain.StorePending(&blockchain.Pending{
stored, err := chain.StorePending(&blockchain.Pending{
Block: &core.Block{
Header: &core.Header{
ParentHash: new(felt.Felt),
Hash: new(felt.Felt),
},
},
}))
})
require.True(t, stored)
require.NoError(t, err)
pending, err := chain.Pending()
require.NoError(t, err)
require.Equal(t, uint64(0), pending.Block.Number)
Expand All @@ -824,14 +834,16 @@ func TestStorePendingIncludesNumber(t *testing.T) {
require.NoError(t, chain.Store(b, nil, su, nil))

// Store pending.
require.NoError(t, chain.StorePending(&blockchain.Pending{
stored, err = chain.StorePending(&blockchain.Pending{
Block: &core.Block{
Header: &core.Header{
ParentHash: b.Hash,
Hash: new(felt.Felt),
},
},
}))
})
require.NoError(t, err)
require.True(t, stored)
pending, err = chain.Pending()
require.NoError(t, err)
require.Equal(t, uint64(1), pending.Block.Number)
Expand Down
21 changes: 20 additions & 1 deletion l1/l1.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,14 @@ import (
"github.com/NethermindEth/juno/blockchain"
"github.com/NethermindEth/juno/core"
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/feed"
"github.com/NethermindEth/juno/l1/contract"
"github.com/NethermindEth/juno/service"
"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/event"
)

//go:generate mockgen -destination=../mocks/mock_subscriber.go -package=mocks github.com/NethermindEth/juno/l1 Subscriber
//go:generate mockgen -destination=./mocks/mock_subscriber.go -package=mocks github.com/NethermindEth/juno/l1 Subscriber
type Subscriber interface {
FinalisedHeight(ctx context.Context) (uint64, error)
WatchLogStateUpdate(ctx context.Context, sink chan<- *contract.StarknetLogStateUpdate) (event.Subscription, error)
Expand All @@ -24,6 +25,15 @@ type Subscriber interface {
Close()
}

type L1HeadSubscription struct {
*feed.Subscription[*core.L1Head]
}

//go:generate mockgen -destination=../mocks/mock_l1reader.go -package=mocks -mock_names Reader=L1Reader github.com/NethermindEth/juno/l1 Reader
type Reader interface {
SubscribeL1Heads() L1HeadSubscription
}

type Client struct {
l1 Subscriber
l2Chain *blockchain.Blockchain
Expand All @@ -33,6 +43,7 @@ type Client struct {
pollFinalisedInterval time.Duration
nonFinalisedLogs map[uint64]*contract.StarknetLogStateUpdate
listener EventListener
l1Heads *feed.Feed[*core.L1Head]
}

var _ service.Service = (*Client)(nil)
Expand All @@ -47,6 +58,7 @@ func NewClient(l1 Subscriber, chain *blockchain.Blockchain, log utils.SimpleLogg
pollFinalisedInterval: time.Minute,
nonFinalisedLogs: make(map[uint64]*contract.StarknetLogStateUpdate, 0),
listener: SelectiveListener{},
l1Heads: feed.New[*core.L1Head](),
}
}

Expand Down Expand Up @@ -213,10 +225,17 @@ func (c *Client) setL1Head(ctx context.Context) error {
return fmt.Errorf("l1 head for block %d and state root %s: %w", head.BlockNumber, head.StateRoot.String(), err)
}
c.listener.OnNewL1Head(head)
c.l1Heads.Send(head)
c.log.Infow("Updated l1 head",
"blockNumber", head.BlockNumber,
"blockHash", head.BlockHash.ShortString(),
"stateRoot", head.StateRoot.ShortString())

return nil
}

func (c *Client) SubscribeL1Heads() L1HeadSubscription {
return L1HeadSubscription{
Subscription: c.l1Heads.Subscribe(),
}
}
61 changes: 58 additions & 3 deletions l1/l1_pkg_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/NethermindEth/juno/core/felt"
"github.com/NethermindEth/juno/db/pebble"
"github.com/NethermindEth/juno/l1/contract"
"github.com/NethermindEth/juno/mocks"
"github.com/NethermindEth/juno/l1/mocks"
"github.com/NethermindEth/juno/utils"
"github.com/ethereum/go-ethereum/core/types"
"github.com/stretchr/testify/assert"
Expand All @@ -25,7 +25,13 @@ type fakeSubscription struct {
closed bool
}

func newFakeSubscription(errs ...error) *fakeSubscription {
func newFakeSubscription() *fakeSubscription {
return &fakeSubscription{
errChan: make(chan error),
}
}

func newFakeSubscriptionGivenErr(errs ...error) *fakeSubscription {
errChan := make(chan error, 1)
if len(errs) >= 1 {
errChan <- errs[0]
Expand Down Expand Up @@ -407,7 +413,7 @@ func TestUnreliableSubscription(t *testing.T) {
// The subscription returns an error on each block.
// Each time, a second subscription succeeds.

failedUpdateSub := newFakeSubscription(err)
failedUpdateSub := newFakeSubscriptionGivenErr(err)
failedUpdateCall := subscriber.
EXPECT().
WatchLogStateUpdate(gomock.Any(), gomock.Any()).
Expand Down Expand Up @@ -466,3 +472,52 @@ func TestUnreliableSubscription(t *testing.T) {
}
}
}

func TestSubscribeL1Heads(t *testing.T) {
t.Parallel()

network := &utils.Mainnet
ctrl := gomock.NewController(t)
nopLog := utils.NewNopZapLogger()
chain := blockchain.New(pebble.NewMemTest(t), network)

subscriber := mocks.NewMockSubscriber(ctrl)
subscriber.EXPECT().Close().Times(1)
subscriber.EXPECT().FinalisedHeight(gomock.Any()).Return(uint64(0), nil).AnyTimes()
subscriber.
EXPECT().
ChainID(gomock.Any()).
Return(network.L1ChainID, nil).
Times(1)
subscriber.
EXPECT().
WatchLogStateUpdate(gomock.Any(), gomock.Any()).
Do(func(_ context.Context, sink chan<- *contract.StarknetLogStateUpdate) {
sink <- &contract.StarknetLogStateUpdate{
GlobalRoot: new(big.Int),
BlockNumber: new(big.Int),
BlockHash: new(big.Int),
Raw: types.Log{
BlockNumber: 0,
},
}
}).
Return(newFakeSubscription(), nil)

client := NewClient(subscriber, chain, nopLog).WithResubscribeDelay(0).WithPollFinalisedInterval(time.Nanosecond)

sub := client.SubscribeL1Heads()
t.Cleanup(sub.Unsubscribe)

ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
require.NoError(t, client.Run(ctx))
cancel()

got, ok := <-sub.Recv()
require.True(t, ok)
require.Equal(t, &core.L1Head{
BlockNumber: 0,
BlockHash: new(felt.Felt),
StateRoot: new(felt.Felt),
}, got)
}
100 changes: 100 additions & 0 deletions l1/mocks/mock_subscriber.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading