Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: adds support for gRPC streaming and removes dependency on RPC #3915

Open
wants to merge 34 commits into
base: feature/api-breaks
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 22 commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
e831ee3
feat!: use blocks streaming API instead of RPC in fetcher
rach-id Nov 2, 2024
ea9a2d7
test: node builder test using the new grpc endpoints
rach-id Nov 4, 2024
5cd2e94
chore: import
rach-id Nov 4, 2024
c30fdbc
chore: add stop function for fetcher
rach-id Nov 4, 2024
9a194b2
feat!: remove rpc port
rach-id Nov 4, 2024
8360531
chore: remove core rpc tests
rach-id Nov 4, 2024
2b71210
chore: remove core rpc tests
rach-id Nov 4, 2024
13bddc3
fix: test fill blocks failing after node stop
rach-id Nov 4, 2024
a568598
chore: use commits for deps
rach-id Nov 4, 2024
291952b
chore: fmt
rach-id Nov 4, 2024
3087a3a
chore: fix tests
rach-id Nov 5, 2024
cc81362
chore: fmt
rach-id Nov 5, 2024
7d89608
chore: fmt
rach-id Nov 5, 2024
51096af
chore: memory optimisation
rach-id Nov 6, 2024
fe54137
chore: rename to first part
rach-id Nov 8, 2024
62c1dc7
feat!: use a local signed block
rach-id Nov 8, 2024
184391b
chore: remove unnecessary select
rach-id Nov 8, 2024
e11d333
chore: fmt
rach-id Nov 8, 2024
b37c4b2
Merge branch 'main' into support-grpc-endpoints
rach-id Nov 11, 2024
b14cb16
chore: go mod tidy
rach-id Nov 11, 2024
e51f237
chore: remove unnecessary =nil
rach-id Nov 12, 2024
221253f
chore: fix test
rach-id Nov 12, 2024
fe20093
chore: rename grpc port to port
rach-id Nov 12, 2024
f371c33
chore: use net.JoinHostPort as suggested by @cristaloleg
rach-id Nov 20, 2024
92c4f94
chore: context timeout as suggested by @walldiss
rach-id Nov 20, 2024
af7f2be
chore: remove unnecessary comment
rach-id Nov 21, 2024
7a62570
feat!: remove pointers in requests
rach-id Nov 21, 2024
1b41b56
Merge branch 'feature/api-breaks' into support-grpc-endpoints
rach-id Nov 21, 2024
a863f6b
chore: bump version
rach-id Nov 21, 2024
b5a545f
chore: fix remaining implementation to support removing pointers
rach-id Nov 21, 2024
a10ac73
chore: lint
rach-id Nov 21, 2024
d4243fa
chore: check if fetcher is listening for new blocks
rach-id Nov 21, 2024
511068a
fix: correctly set the is listening for blocks to false
rach-id Nov 22, 2024
a033feb
feat: support stopping the gRPC connection + subscription retry
rach-id Nov 22, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 4 additions & 15 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,13 @@ package core
import (
"fmt"

retryhttp "github.com/hashicorp/go-retryablehttp"
"github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/rpc/client/http"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
)

// Client is an alias to Core Client.
type Client = client.Client
type Client = coregrpc.BlockAPIClient

// NewRemote creates a new Client that communicates with a remote Core endpoint over HTTP.
// NewRemote creates a new Client that communicates with a remote Core endpoint over gRPC.
func NewRemote(ip, port string) (Client, error) {
httpClient := retryhttp.NewClient()
httpClient.RetryMax = 2
// suppress logging
httpClient.Logger = nil

return http.NewWithClient(
fmt.Sprintf("tcp://%s:%s", ip, port),
"/websocket",
httpClient.StandardClient(),
)
return coregrpc.StartBlockAPIGRPCClient(fmt.Sprintf("tcp://%s:%s", ip, port))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we reuse the client across state and core modules?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. I'm planning to do it in a subsequent PR

}
4 changes: 2 additions & 2 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,12 +173,12 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
}
log.Debugw("fetched signed block from core", "height", b.Header.Height)

eds, err := extendBlock(b.Data, b.Header.Version.App)
eds, err := extendBlock(*b.Data, b.Header.Version.App)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As we already know, Data is quite big, let's avoid copying it here. It should be easy to change extendBlock to take ptr instead.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this will be GC at some point. The thing is extendBlock also makes a couple other calls by value instead of reference. So multiple copies will be done regardless, this will be just one less. What do you think of creating an issue that checks wherever Data is used, it passes it by reference, even in app and maybe other repos so that we have everything fixed?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are we passing by value here? Just curious

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because extendBlock expects the parameter to be passed by value. I didn't want to change that behavior since Data is already being copied in a lot of places, one more copy is fine. Related: #3915 (comment)

if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}
// create extended header
eh, err := ce.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds)
eh, err := ce.construct(b.Header, b.Commit, b.ValidatorSet, eds)
if err != nil {
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}
Expand Down
14 changes: 11 additions & 3 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"bytes"
"context"
"net"
"testing"
"time"

Expand Down Expand Up @@ -62,6 +63,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
require.NoError(t, err)
assert.True(t, has)
}
require.NoError(t, fetcher.Stop(ctx))
}

// TestExchange_DoNotStoreHistoric tests that the CoreExchange will not
Expand Down Expand Up @@ -118,7 +120,11 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
// flakiness with accessing account state)
_, err := cctx.WaitForHeightWithTimeout(2, time.Second*2) // TODO @renaynay: configure?
require.NoError(t, err)
return NewBlockFetcher(cctx.Client), cctx
host, port, err := net.SplitHostPort(cctx.GRPCClient.Target())
require.NoError(t, err)
blockAPIClient, err := NewRemote(host, port)
require.NoError(t, err)
return NewBlockFetcher(blockAPIClient), cctx
}

// fillBlocks fills blocks until the context is canceled.
Expand All @@ -136,7 +142,9 @@ func fillBlocks(
}

_, err := cctx.FillBlock(16, cfg.Genesis.Accounts()[0].Name, flags.BroadcastBlock)
require.NoError(t, err)
if err != nil && ctx.Err() == nil {
require.NoError(t, err)
Comment on lines +147 to +148
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not just require.NoError? we do not expect one here

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there is a funny behavior happening there that I guess will be fixed if we support closing the connection manually. Related: #3915 (comment)

}
}
}

Expand All @@ -154,7 +162,7 @@ func generateNonEmptyBlocks(
sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
defer func() {
err = fetcher.UnsubscribeNewBlockEvent(ctx)
err = fetcher.Stop(ctx)
require.NoError(t, err)
}()

Expand Down
Loading
Loading