Skip to content

Commit

Permalink
Merge pull request #555 from oasisprotocol/mitjat/grpc-lazy-connect
Browse files Browse the repository at this point in the history
Connect to oasis-node lazily
  • Loading branch information
mitjat authored Nov 7, 2023
2 parents 0cb4781 + 086f81d commit ee0d3bd
Show file tree
Hide file tree
Showing 11 changed files with 68 additions and 60 deletions.
7 changes: 2 additions & 5 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,8 @@ test-e2e:

fill-cache-for-e2e-regression: nexus
@./tests/e2e_regression/ensure_consistent_config.sh
cp tests/e2e_regression/e2e_config_1.yml /tmp/nexus_fill_e2e_regression_cache_1.yml
cp tests/e2e_regression/e2e_config_2.yml /tmp/nexus_fill_e2e_regression_cache_2.yml
sed -i -E 's/query_on_cache_miss: false/query_on_cache_miss: true/g' /tmp/nexus_fill_e2e_regression_cache_*.yml
./nexus --config /tmp/nexus_fill_e2e_regression_cache_1.yml analyze
./nexus --config /tmp/nexus_fill_e2e_regression_cache_2.yml analyze
./nexus --config tests/e2e_regression/e2e_config_1.yml analyze
./nexus --config tests/e2e_regression/e2e_config_2.yml analyze

# Run the api tests locally, assuming the environment is set up with an oasis-node that is
# accessible as specified in the config file.
Expand Down
4 changes: 0 additions & 4 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,10 +212,6 @@ func SingleNetworkLookup(rpc string) map[string]*ArchiveConfig {
type CacheConfig struct {
// CacheDir is the directory where the cache data is stored
CacheDir string `koanf:"cache_dir"`

// If set, the analyzer will query the node upon any cache
// misses.
QueryOnCacheMiss bool `koanf:"query_on_cache_miss"`
}

func (cfg *CacheConfig) Validate() error {
Expand Down
21 changes: 0 additions & 21 deletions storage/oasis/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,6 @@ import (

// NewConsensusClient creates a new ConsensusClient.
func NewConsensusClient(ctx context.Context, sourceConfig *config.SourceConfig) (nodeapi.ConsensusApiLite, error) {
// If we are using purely file-backed analyzers, do not connect to the node.
if sourceConfig.Cache != nil && !sourceConfig.Cache.QueryOnCacheMiss {
cachePath := filepath.Join(sourceConfig.Cache.CacheDir, "consensus")
nodeApi, err := file.NewFileConsensusApiLite(cachePath, nil)
if err != nil {
return nil, fmt.Errorf("error instantiating cache-based consensusApi: %w", err)
}
return nodeApi, nil
}

// Create an API that connects to the real node, then wrap it in a caching layer.
var nodeApi nodeapi.ConsensusApiLite
nodeApi, err := history.NewHistoryConsensusApiLite(ctx, sourceConfig.History(), sourceConfig.Nodes, sourceConfig.FastStartup)
Expand All @@ -44,17 +34,6 @@ func NewConsensusClient(ctx context.Context, sourceConfig *config.SourceConfig)

// NewRuntimeClient creates a new RuntimeClient.
func NewRuntimeClient(ctx context.Context, sourceConfig *config.SourceConfig, runtime common.Runtime) (nodeapi.RuntimeApiLite, error) {
// If we are using purely file-backed analyzers, do not connect to the node.
if sourceConfig.Cache != nil && !sourceConfig.Cache.QueryOnCacheMiss {
cachePath := filepath.Join(sourceConfig.Cache.CacheDir, string(runtime))
nodeApi, err := file.NewFileRuntimeApiLite(runtime, cachePath, nil)
if err != nil {
return nil, fmt.Errorf("error instantiating cache-based runtimeApi: %w", err)
}
return nodeApi, nil
}

// Create an API that connects to the real node, then wrap it in a caching layer.
var nodeApi nodeapi.RuntimeApiLite
nodeApi, err := history.NewHistoryRuntimeApiLite(ctx, sourceConfig.History(), sourceConfig.SDKParaTime(runtime), sourceConfig.Nodes, sourceConfig.FastStartup, runtime)
if err != nil {
Expand Down
52 changes: 52 additions & 0 deletions storage/oasis/connections/raw_grpc.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
package connections

import (
"context"
"crypto/tls"
"sync"

cmnGrpc "github.com/oasisprotocol/oasis-core/go/common/grpc"
sdkConfig "github.com/oasisprotocol/oasis-sdk/client-sdk/go/config"
Expand Down Expand Up @@ -33,3 +35,53 @@ func RawConnect(nodeConfig *config.NodeConfig) (*grpc.ClientConn, error) {

return cmnGrpc.Dial(nodeConfig.RPC, dialOpts...)
}

func LazyGrpcConnect(nodeConfig config.NodeConfig) *LazyGrpcConn {
return &LazyGrpcConn{
inner: nil, // The underlying connection will be initialized lazily.
lock: sync.Mutex{},
nodeConfig: nodeConfig,
}
}

type LazyGrpcConn struct {
inner *grpc.ClientConn
lock sync.Mutex // For lazy initialization.

nodeConfig config.NodeConfig // The node to connect to.
}

// ensureConn initializes `inner` if it hasn't been initialized yet.
// This function is thread-safe. If it returns nil, `inner` is guaranteed to be non-nil.
func (c *LazyGrpcConn) ensureConn() error {
c.lock.Lock()
defer c.lock.Unlock()

if c.inner != nil {
return nil
}

// Initialize `inner`.
var err error
c.inner, err = RawConnect(&c.nodeConfig)
if err != nil {
return err
}

return nil
}

func (c *LazyGrpcConn) Close() error {
if c.inner == nil {
return nil
}
return c.inner.Close()
}

func (c *LazyGrpcConn) Invoke(ctx context.Context, method string, args interface{}, reply interface{}, opts ...grpc.CallOption) error {
if err := c.ensureConn(); err != nil {
return err
}

return c.inner.Invoke(ctx, method, args, reply, opts...)
}
7 changes: 3 additions & 4 deletions storage/oasis/nodeapi/cobalt/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"

"google.golang.org/grpc"

"github.com/oasisprotocol/oasis-core/go/common/cbor"

// nexus-internal data types.
Expand All @@ -19,6 +17,7 @@ import (
scheduler "github.com/oasisprotocol/nexus/coreapi/v22.2.11/scheduler/api"

"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage/oasis/connections"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"

// data types for Cobalt gRPC APIs.
Expand All @@ -37,12 +36,12 @@ import (
// Cobalt node. To be able to use the old gRPC API, this struct uses gRPC
// directly, skipping the convenience wrappers provided by oasis-core.
type CobaltConsensusApiLite struct {
grpcConn *grpc.ClientConn
grpcConn *connections.LazyGrpcConn
}

var _ nodeapi.ConsensusApiLite = (*CobaltConsensusApiLite)(nil)

func NewCobaltConsensusApiLite(grpcConn *grpc.ClientConn) *CobaltConsensusApiLite {
func NewCobaltConsensusApiLite(grpcConn *connections.LazyGrpcConn) *CobaltConsensusApiLite {
return &CobaltConsensusApiLite{
grpcConn: grpcConn,
}
Expand Down
7 changes: 3 additions & 4 deletions storage/oasis/nodeapi/damask/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"fmt"

"google.golang.org/grpc"

"github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"

Expand All @@ -21,6 +19,7 @@ import (
staking "github.com/oasisprotocol/nexus/coreapi/v22.2.11/staking/api"

"github.com/oasisprotocol/nexus/log"
"github.com/oasisprotocol/nexus/storage/oasis/connections"
"github.com/oasisprotocol/nexus/storage/oasis/nodeapi"

// data types for Damask gRPC APIs.
Expand All @@ -32,12 +31,12 @@ import (
// compatible with Damask gRPC API, this struct just trivially wraps the
// convenience methods provided by oasis-core.
type DamaskConsensusApiLite struct {
grpcConn *grpc.ClientConn
grpcConn *connections.LazyGrpcConn
}

var _ nodeapi.ConsensusApiLite = (*DamaskConsensusApiLite)(nil)

func NewDamaskConsensusApiLite(grpcConn *grpc.ClientConn) *DamaskConsensusApiLite {
func NewDamaskConsensusApiLite(grpcConn *connections.LazyGrpcConn) *DamaskConsensusApiLite {
return &DamaskConsensusApiLite{
grpcConn: grpcConn,
}
Expand Down
10 changes: 2 additions & 8 deletions storage/oasis/nodeapi/history/history.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,12 @@ var _ nodeapi.ConsensusApiLite = (*HistoryConsensusApiLite)(nil)
type APIConstructor func(ctx context.Context, chainContext string, archiveConfig *config.ArchiveConfig, fastStartup bool) (nodeapi.ConsensusApiLite, error)

func damaskAPIConstructor(ctx context.Context, chainContext string, archiveConfig *config.ArchiveConfig, fastStartup bool) (nodeapi.ConsensusApiLite, error) {
rawConn, err := connections.RawConnect(archiveConfig.ResolvedConsensusNode())
if err != nil {
return nil, fmt.Errorf("oasis-node RawConnect: %w", err)
}
rawConn := connections.LazyGrpcConnect(*archiveConfig.ResolvedConsensusNode())
return damask.NewDamaskConsensusApiLite(rawConn), nil
}

func cobaltAPIConstructor(ctx context.Context, chainContext string, archiveConfig *config.ArchiveConfig, fastStartup bool) (nodeapi.ConsensusApiLite, error) {
rawConn, err := connections.RawConnect(archiveConfig.ResolvedConsensusNode())
if err != nil {
return nil, fmt.Errorf("oasis-node RawConnect: %w", err)
}
rawConn := connections.LazyGrpcConnect(*archiveConfig.ResolvedConsensusNode())
return cobalt.NewCobaltConsensusApiLite(rawConn), nil
}

Expand Down
5 changes: 1 addition & 4 deletions storage/oasis/nodeapi/history/runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,7 @@ func NewHistoryRuntimeApiLite(ctx context.Context, history *config.History, sdkP
return nil, err
}
sdkClient := sdkConn.Runtime(sdkPT)
rawConn, err := connections.RawConnect(archiveConfig.ResolvedRuntimeNode(runtime))
if err != nil {
return nil, fmt.Errorf("oasis-node RawConnect: %w", err)
}
rawConn := connections.LazyGrpcConnect(*archiveConfig.ResolvedRuntimeNode(runtime))
apis[record.ArchiveName] = nodeapi.NewUniversalRuntimeApiLite(sdkPT.Namespace(), rawConn, &sdkClient)
}
}
Expand Down
7 changes: 3 additions & 4 deletions storage/oasis/nodeapi/universal_runtime.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ import (
"fmt"
"time"

"google.golang.org/grpc"

coreCommon "github.com/oasisprotocol/oasis-core/go/common"
"github.com/oasisprotocol/oasis-core/go/common/cbor"
"github.com/oasisprotocol/oasis-core/go/common/crypto/hash"
Expand All @@ -16,6 +14,7 @@ import (

roothash "github.com/oasisprotocol/nexus/coreapi/v22.2.11/roothash/api/block"
coreRuntimeClient "github.com/oasisprotocol/nexus/coreapi/v22.2.11/runtime/client/api"
"github.com/oasisprotocol/nexus/storage/oasis/connections"

common "github.com/oasisprotocol/nexus/common"
cobaltRoothash "github.com/oasisprotocol/nexus/coreapi/v21.1.1/roothash/api/block"
Expand All @@ -36,7 +35,7 @@ type UniversalRuntimeApiLite struct {
// A raw gRPC connection to the node. Used for fetching raw CBOR-encoded
// responses for RPCs whose encodings changed over time, and this class
// needs to handle the various formats/types.
grpcConn *grpc.ClientConn
grpcConn *connections.LazyGrpcConn

// An oasis-sdk managed connection to the node. Used for RPCs that have
// had a stable ABI over time. That is the majority of them, and oasis-sdk
Expand All @@ -47,7 +46,7 @@ type UniversalRuntimeApiLite struct {

var _ RuntimeApiLite = (*UniversalRuntimeApiLite)(nil)

func NewUniversalRuntimeApiLite(runtimeID coreCommon.Namespace, grpcConn *grpc.ClientConn, sdkClient *connection.RuntimeClient) *UniversalRuntimeApiLite {
func NewUniversalRuntimeApiLite(runtimeID coreCommon.Namespace, grpcConn *connections.LazyGrpcConn, sdkClient *connection.RuntimeClient) *UniversalRuntimeApiLite {
return &UniversalRuntimeApiLite{
runtimeID: runtimeID,
grpcConn: grpcConn,
Expand Down
4 changes: 1 addition & 3 deletions tests/e2e_regression/e2e_config_1.yml
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,7 @@

analysis:
source:
cache:
cache_dir: tests/e2e_regression/rpc-cache
query_on_cache_miss: false
cache: { cache_dir: tests/e2e_regression/rpc-cache }
chain_name: mainnet
nodes:
damask:
Expand Down
4 changes: 1 addition & 3 deletions tests/e2e_regression/e2e_config_2.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,7 @@

analysis:
source:
cache:
cache_dir: tests/e2e_regression/rpc-cache
query_on_cache_miss: false
cache: { cache_dir: tests/e2e_regression/rpc-cache }
chain_name: mainnet
nodes:
damask:
Expand Down

0 comments on commit ee0d3bd

Please sign in to comment.