diff --git a/beacon/client/http-provider.go b/beacon/client/http-provider.go index 7c2ff4a..9175e1c 100644 --- a/beacon/client/http-provider.go +++ b/beacon/client/http-provider.go @@ -5,18 +5,20 @@ import ( "context" "fmt" "io" + "log/slog" "net/http" + "net/http/httptrace" + "net/url" "strconv" "strings" "sync" "time" "github.com/goccy/go-json" - "github.com/rocket-pool/node-manager-core/beacon" + "github.com/rocket-pool/node-manager-core/log" ) const ( - RequestUrlFormat = "%s%s" RequestContentType = "application/json" RequestSyncStatusPath = "/eth/v1/node/syncing" @@ -36,24 +38,53 @@ const ( RequestWithdrawalCredentialsChangePath = "/eth/v1/beacon/pool/bls_to_execution_changes" MaxRequestValidatorsCount = 600 + + DefaultFastTimeout time.Duration = 5 * time.Second + DefaultSlowTimeout time.Duration = 30 * time.Second ) +type BeaconHttpProviderOpts struct { + DefaultFastTimeout time.Duration + DefaultSlowTimeout time.Duration +} + type BeaconHttpProvider struct { - providerAddress string - client http.Client + baseUrl *url.URL + httpClient http.Client + defaultFastTimeout time.Duration + defaultSlowTimeout time.Duration } -func NewBeaconHttpProvider(providerAddress string, timeout time.Duration) *BeaconHttpProvider { - return &BeaconHttpProvider{ - providerAddress: providerAddress, - client: http.Client{ - Timeout: timeout, - }, +// Creates a new HTTP provider for the Beacon API +func NewBeaconHttpProvider(providerAddress string, opts *BeaconHttpProviderOpts) (*BeaconHttpProvider, error) { + baseUrl, err := url.Parse(providerAddress) + if err != nil { + return nil, fmt.Errorf("error parsing provider address [%s] into URL: %w", providerAddress, err) + } + provider := &BeaconHttpProvider{ + baseUrl: baseUrl, + httpClient: http.Client{}, + } + + // Specify the default timeouts to use for calls that aren't given one by the caller + if opts != nil { + provider.defaultFastTimeout = opts.DefaultFastTimeout + provider.defaultSlowTimeout = opts.DefaultSlowTimeout + } else { + provider.defaultFastTimeout = DefaultFastTimeout + provider.defaultSlowTimeout = DefaultSlowTimeout } + return provider, nil } func (p *BeaconHttpProvider) Beacon_Attestations(ctx context.Context, blockId string) (AttestationsResponse, bool, error) { - responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestAttestationsPath, blockId)) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(fmt.Sprintf(RequestAttestationsPath, blockId)) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return AttestationsResponse{}, false, fmt.Errorf("error getting attestations data for slot %s: %w", blockId, err) } @@ -71,7 +102,13 @@ func (p *BeaconHttpProvider) Beacon_Attestations(ctx context.Context, blockId st } func (p *BeaconHttpProvider) Beacon_Block(ctx context.Context, blockId string) (BeaconBlockResponse, bool, error) { - responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestBeaconBlockPath, blockId)) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(fmt.Sprintf(RequestBeaconBlockPath, blockId)) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return BeaconBlockResponse{}, false, fmt.Errorf("error getting beacon block data: %w", err) } @@ -89,8 +126,14 @@ func (p *BeaconHttpProvider) Beacon_Block(ctx context.Context, blockId string) ( } func (p *BeaconHttpProvider) Beacon_BlsToExecutionChanges_Post(ctx context.Context, request BLSToExecutionChangeRequest) error { + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Perform the post request requestArray := []BLSToExecutionChangeRequest{request} // This route must be wrapped in an array - responseBody, status, err := p.postRequest(ctx, RequestWithdrawalCredentialsChangePath, requestArray) + url := p.baseUrl.JoinPath(RequestWithdrawalCredentialsChangePath) + responseBody, status, err := p.postRequest(ctx, url, requestArray) if err != nil { return fmt.Errorf("error broadcasting withdrawal credentials change for validator %s: %w", request.Message.ValidatorIndex, err) } @@ -101,16 +144,20 @@ func (p *BeaconHttpProvider) Beacon_BlsToExecutionChanges_Post(ctx context.Conte } func (p *BeaconHttpProvider) Beacon_Committees(ctx context.Context, stateId string, epoch *uint64) (CommitteesResponse, error) { - var committees CommitteesResponse + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultSlowTimeout) + defer cancel() - query := "" + // Create the URL + url := p.baseUrl.JoinPath(fmt.Sprintf(RequestCommitteePath, stateId)) if epoch != nil { - query = fmt.Sprintf("?epoch=%d", *epoch) + query := url.Query() + query.Add("epoch", strconv.FormatUint(*epoch, 10)) + url.RawQuery = query.Encode() } // Committees responses are large, so let the json decoder read it in a buffered fashion - clientWithoutTimeout := http.Client{} - reader, status, err := getRequestReader(ctx, fmt.Sprintf(RequestCommitteePath, stateId)+query, p.providerAddress, clientWithoutTimeout) + reader, status, err := p.getRequestReader(ctx, url) if err != nil { return CommitteesResponse{}, fmt.Errorf("error getting committees: %w", err) } @@ -132,6 +179,7 @@ func (p *BeaconHttpProvider) Beacon_Committees(ctx context.Context, stateId stri d.currentReader = &reader // Begin decoding + var committees CommitteesResponse if err := d.decoder.Decode(&committees); err != nil { return CommitteesResponse{}, fmt.Errorf("error decoding committees: %w", err) } @@ -140,7 +188,13 @@ func (p *BeaconHttpProvider) Beacon_Committees(ctx context.Context, stateId stri } func (p *BeaconHttpProvider) Beacon_FinalityCheckpoints(ctx context.Context, stateId string) (FinalityCheckpointsResponse, error) { - responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestFinalityCheckpointsPath, stateId)) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(fmt.Sprintf(RequestFinalityCheckpointsPath, stateId)) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return FinalityCheckpointsResponse{}, fmt.Errorf("error getting finality checkpoints: %w", err) } @@ -155,7 +209,13 @@ func (p *BeaconHttpProvider) Beacon_FinalityCheckpoints(ctx context.Context, sta } func (p *BeaconHttpProvider) Beacon_Genesis(ctx context.Context) (GenesisResponse, error) { - responseBody, status, err := p.getRequest(ctx, RequestGenesisPath) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(RequestGenesisPath) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return GenesisResponse{}, fmt.Errorf("error getting genesis data: %w", err) } @@ -170,7 +230,13 @@ func (p *BeaconHttpProvider) Beacon_Genesis(ctx context.Context) (GenesisRespons } func (p *BeaconHttpProvider) Beacon_Header(ctx context.Context, blockId string) (BeaconBlockHeaderResponse, bool, error) { - responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestBeaconBlockHeaderPath, blockId)) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(fmt.Sprintf(RequestBeaconBlockHeaderPath, blockId)) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return BeaconBlockHeaderResponse{}, false, fmt.Errorf("error getting beacon block header data: %w", err) } @@ -188,11 +254,20 @@ func (p *BeaconHttpProvider) Beacon_Header(ctx context.Context, blockId string) } func (p *BeaconHttpProvider) Beacon_Validators(ctx context.Context, stateId string, ids []string) (ValidatorsResponse, error) { - var query string + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultSlowTimeout) + defer cancel() + + // Create the URL + url := p.baseUrl.JoinPath(fmt.Sprintf(RequestValidatorsPath, stateId)) if len(ids) > 0 { - query = fmt.Sprintf("?id=%s", strings.Join(ids, ",")) + query := url.Query() + query.Add("id", strings.Join(ids, ",")) + url.RawQuery = query.Encode() } - responseBody, status, err := p.getRequestWithoutTimeout(ctx, fmt.Sprintf(RequestValidatorsPath, stateId)+query) + + // Run the request + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return ValidatorsResponse{}, fmt.Errorf("error getting validators: %w", err) } @@ -207,7 +282,13 @@ func (p *BeaconHttpProvider) Beacon_Validators(ctx context.Context, stateId stri } func (p *BeaconHttpProvider) Beacon_VoluntaryExits_Post(ctx context.Context, request VoluntaryExitRequest) error { - responseBody, status, err := p.postRequest(ctx, RequestVoluntaryExitPath, request) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Perform the post request + url := p.baseUrl.JoinPath(RequestVoluntaryExitPath) + responseBody, status, err := p.postRequest(ctx, url, request) if err != nil { return fmt.Errorf("error broadcasting exit for validator at index %s: %w", request.Message.ValidatorIndex, err) } @@ -218,7 +299,13 @@ func (p *BeaconHttpProvider) Beacon_VoluntaryExits_Post(ctx context.Context, req } func (p *BeaconHttpProvider) Config_DepositContract(ctx context.Context) (Eth2DepositContractResponse, error) { - responseBody, status, err := p.getRequest(ctx, RequestEth2DepositContractMethod) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(RequestEth2DepositContractMethod) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return Eth2DepositContractResponse{}, fmt.Errorf("error getting eth2 deposit contract: %w", err) } @@ -233,13 +320,21 @@ func (p *BeaconHttpProvider) Config_DepositContract(ctx context.Context) (Eth2De } func (p *BeaconHttpProvider) Config_Spec(ctx context.Context) (Eth2ConfigResponse, error) { - responseBody, status, err := p.getRequest(ctx, RequestEth2ConfigPath) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(RequestEth2ConfigPath) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return Eth2ConfigResponse{}, fmt.Errorf("error getting eth2 config: %w", err) } if status != http.StatusOK { return Eth2ConfigResponse{}, fmt.Errorf("error getting eth2 config: HTTP status %d; response body: '%s'", status, string(responseBody)) } + + // Unmarshal the response var eth2Config Eth2ConfigResponse if err := json.Unmarshal(responseBody, ð2Config); err != nil { return Eth2ConfigResponse{}, fmt.Errorf("error decoding eth2 config: %w", err) @@ -248,13 +343,21 @@ func (p *BeaconHttpProvider) Config_Spec(ctx context.Context) (Eth2ConfigRespons } func (p *BeaconHttpProvider) Node_Syncing(ctx context.Context) (SyncStatusResponse, error) { - responseBody, status, err := p.getRequest(ctx, RequestSyncStatusPath) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(RequestSyncStatusPath) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return SyncStatusResponse{}, fmt.Errorf("error getting node sync status: %w", err) } if status != http.StatusOK { return SyncStatusResponse{}, fmt.Errorf("error getting node sync status: HTTP status %d; response body: '%s'", status, string(responseBody)) } + + // Unmarshal the response var syncStatus SyncStatusResponse if err := json.Unmarshal(responseBody, &syncStatus); err != nil { return SyncStatusResponse{}, fmt.Errorf("error decoding node sync status: %w", err) @@ -263,7 +366,13 @@ func (p *BeaconHttpProvider) Node_Syncing(ctx context.Context) (SyncStatusRespon } func (p *BeaconHttpProvider) Validator_DutiesProposer(ctx context.Context, indices []string, epoch uint64) (ProposerDutiesResponse, error) { - responseBody, status, err := p.getRequest(ctx, fmt.Sprintf(RequestValidatorProposerDuties, strconv.FormatUint(epoch, 10))) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + + // Run the request + url := p.baseUrl.JoinPath(fmt.Sprintf(RequestValidatorProposerDuties, strconv.FormatUint(epoch, 10))) + responseBody, status, err := p.getRequest(ctx, url) if err != nil { return ProposerDutiesResponse{}, fmt.Errorf("error getting validator proposer duties: %w", err) } @@ -271,6 +380,7 @@ func (p *BeaconHttpProvider) Validator_DutiesProposer(ctx context.Context, indic return ProposerDutiesResponse{}, fmt.Errorf("error getting validator proposer duties: HTTP status %d; response body: '%s'", status, string(responseBody)) } + // Unmarshal the response var syncDuties ProposerDutiesResponse if err := json.Unmarshal(responseBody, &syncDuties); err != nil { return ProposerDutiesResponse{}, fmt.Errorf("error decoding validator proposer duties data: %w", err) @@ -279,9 +389,13 @@ func (p *BeaconHttpProvider) Validator_DutiesProposer(ctx context.Context, indic } func (p *BeaconHttpProvider) Validator_DutiesSync_Post(ctx context.Context, indices []string, epoch uint64) (SyncDutiesResponse, error) { - // Perform the post request - responseBody, status, err := p.postRequest(ctx, fmt.Sprintf(RequestValidatorSyncDuties, strconv.FormatUint(epoch, 10)), indices) + // Prep the context + ctx, cancel := p.prepareContext(ctx, p.defaultFastTimeout) + defer cancel() + // Perform the post request + url := p.baseUrl.JoinPath(fmt.Sprintf(RequestValidatorSyncDuties, strconv.FormatUint(epoch, 10))) + responseBody, status, err := p.postRequest(ctx, url, indices) if err != nil { return SyncDutiesResponse{}, fmt.Errorf("error getting validator sync duties: %w", err) } @@ -289,6 +403,7 @@ func (p *BeaconHttpProvider) Validator_DutiesSync_Post(ctx context.Context, indi return SyncDutiesResponse{}, fmt.Errorf("error getting validator sync duties: HTTP status %d; response body: '%s'", status, string(responseBody)) } + // Unmarshal the response var syncDuties SyncDutiesResponse if err := json.Unmarshal(responseBody, &syncDuties); err != nil { return SyncDutiesResponse{}, fmt.Errorf("error decoding validator sync duties data: %w", err) @@ -301,20 +416,9 @@ func (p *BeaconHttpProvider) Validator_DutiesSync_Post(ctx context.Context, indi // ========================== // Make a GET request to the beacon node and read the body of the response -func (p *BeaconHttpProvider) getRequest(ctx context.Context, requestPath string) ([]byte, int, error) { - return getRequestImpl(ctx, requestPath, p.providerAddress, p.client) -} - -// Make a GET request to the beacon node and read the body of the response -func (p *BeaconHttpProvider) getRequestWithoutTimeout(ctx context.Context, requestPath string) ([]byte, int, error) { - clientWithoutTimeout := http.Client{} - return getRequestImpl(ctx, requestPath, p.providerAddress, clientWithoutTimeout) -} - -// Make a GET request to the beacon node and read the body of the response -func getRequestImpl(ctx context.Context, requestPath string, providerAddress string, client http.Client) ([]byte, int, error) { +func (p *BeaconHttpProvider) getRequest(ctx context.Context, url *url.URL) ([]byte, int, error) { // Send request - reader, status, err := getRequestReader(ctx, requestPath, providerAddress, client) + reader, status, err := p.getRequestReader(ctx, url) if err != nil { return []byte{}, 0, err } @@ -333,7 +437,10 @@ func getRequestImpl(ctx context.Context, requestPath string, providerAddress str } // Make a POST request to the beacon node -func (p *BeaconHttpProvider) postRequest(ctx context.Context, requestPath string, requestBody any) ([]byte, int, error) { +func (p *BeaconHttpProvider) postRequest(ctx context.Context, url *url.URL, requestBody any) ([]byte, int, error) { + // Log the request and add tracing if enabled + ctx = p.logRequest(ctx, http.MethodPost, url) + // Get request body requestBodyBytes, err := json.Marshal(requestBody) if err != nil { @@ -342,17 +449,18 @@ func (p *BeaconHttpProvider) postRequest(ctx context.Context, requestPath string requestBodyReader := bytes.NewReader(requestBodyBytes) // Create the request - path := fmt.Sprintf(RequestUrlFormat, p.providerAddress, requestPath) - request, err := http.NewRequestWithContext(ctx, http.MethodPost, path, requestBodyReader) + request, err := http.NewRequestWithContext(ctx, http.MethodPost, url.String(), requestBodyReader) if err != nil { - return nil, 0, fmt.Errorf("error creating POST request to [%s]: %w", path, err) + return nil, 0, fmt.Errorf("error creating POST request to [%s]: %w", url, err) } request.Header.Set("Content-Type", RequestContentType) // Submit the request - response, err := p.client.Do(request) + response, err := p.httpClient.Do(request) if err != nil { - return []byte{}, 0, fmt.Errorf("error running POST request to [%s]: %w", path, err) + // Remove the query for readability + trimmedPath := url.JoinPath(url.Host, url.Path) + return []byte{}, 0, fmt.Errorf("error running POST request to [%s]: %w", trimmedPath, err) } defer func() { _ = response.Body.Close() @@ -368,31 +476,72 @@ func (p *BeaconHttpProvider) postRequest(ctx context.Context, requestPath string return body, response.StatusCode, nil } -// Get an eth2 epoch number by time -func epochAt(config beacon.Eth2Config, time uint64) uint64 { - return config.GenesisEpoch + (time-config.GenesisTime)/config.SecondsPerEpoch -} - // Make a GET request but do not read its body yet (allows buffered decoding) -func getRequestReader(ctx context.Context, requestPath string, providerAddress string, client http.Client) (io.ReadCloser, int, error) { +func (p *BeaconHttpProvider) getRequestReader(ctx context.Context, url *url.URL) (io.ReadCloser, int, error) { + // Log the request and add tracing if enabled + ctx = p.logRequest(ctx, http.MethodGet, url) + // Make the request - path := fmt.Sprintf(RequestUrlFormat, providerAddress, requestPath) - req, err := http.NewRequestWithContext(ctx, http.MethodGet, path, nil) + path := url.String() + req, err := http.NewRequestWithContext(ctx, http.MethodGet, url.String(), nil) if err != nil { return nil, 0, fmt.Errorf("error creating GET request to [%s]: %w", path, err) } req.Header.Set("Content-Type", RequestContentType) // Submit the request - response, err := client.Do(req) + response, err := p.httpClient.Do(req) if err != nil { // Remove the query for readability - trimmedPath, _, _ := strings.Cut(path, "?") + trimmedPath := url.JoinPath(url.Host, url.Path) return nil, 0, fmt.Errorf("error running GET request to [%s]: %w", trimmedPath, err) } return response.Body, response.StatusCode, nil } +// Adds a timeout to the context if one didn't already exist +func (p *BeaconHttpProvider) prepareContext(ctx context.Context, defaultTimeout time.Duration) (context.Context, context.CancelFunc) { + // Make a new context if it wasn't provided + if ctx == nil { + ctx = context.Background() + } + + // Return if there was already a deadline + _, hasDeadline := ctx.Deadline() + if hasDeadline { + return ctx, func() {} + } + + // Add a default timeout if there isn't one + return context.WithTimeout(ctx, defaultTimeout) +} + +// Log a request and prepare the context by adding HTTP tracing if the logger has it enabled +func (p *BeaconHttpProvider) logRequest(ctx context.Context, methodName string, url *url.URL) context.Context { + logger, _ := log.FromContext(ctx) + if logger == nil { + return ctx + } + + // Log the request + args := []any{ + slog.String(log.MethodKey, methodName), + slog.String("host", url.Host), + slog.String("path", url.Path), + } + deadline, hasDeadline := ctx.Deadline() + if hasDeadline { + args = append(args, slog.Time("deadline", deadline.UTC())) + } + logger.Debug("Running BN request", args...) + tracer := logger.HttpTracer + if tracer != nil { + // Enable HTTP tracing if requested + ctx = httptrace.WithClientTrace(ctx, tracer) + } + return ctx +} + // ========================== // === Committees Decoder === // ========================== diff --git a/beacon/client/std-client.go b/beacon/client/std-client.go index b70ee36..39e5853 100644 --- a/beacon/client/std-client.go +++ b/beacon/client/std-client.go @@ -586,3 +586,8 @@ func (c *StandardClient) getValidatorsByOpts(ctx context.Context, pubkeysOrIndic return ValidatorsResponse{Data: trueData}, nil } + +// Get an eth2 epoch number by time +func epochAt(config beacon.Eth2Config, time uint64) uint64 { + return config.GenesisEpoch + (time-config.GenesisTime)/config.SecondsPerEpoch +} diff --git a/beacon/client/std-http-client.go b/beacon/client/std-http-client.go index a19a8b6..ea6b52d 100644 --- a/beacon/client/std-http-client.go +++ b/beacon/client/std-http-client.go @@ -2,14 +2,37 @@ package client import "time" +// Options for the standard HTTP client +type StandardHttpClientOpts struct { + // The time to wait for a request that is expected to return quickly + FastTimeout time.Duration + + // The time to wait for a request that is expected to take a lot of processing on the BN and return slowly + SlowTimeout time.Duration +} + +// Standard high-level client for interacting with a Beacon Node over HTTP type StandardHttpClient struct { *StandardClient } -// Create a new client instance -func NewStandardHttpClient(providerAddress string, timeout time.Duration) *StandardHttpClient { - provider := NewBeaconHttpProvider(providerAddress, timeout) +// Create a new client instance. +func NewStandardHttpClient(providerAddress string, opts *StandardHttpClientOpts) (*StandardHttpClient, error) { + var provider *BeaconHttpProvider + var err error + if opts != nil { + provider, err = NewBeaconHttpProvider(providerAddress, &BeaconHttpProviderOpts{ + DefaultFastTimeout: opts.FastTimeout, + DefaultSlowTimeout: opts.SlowTimeout, + }) + } else { + provider, err = NewBeaconHttpProvider(providerAddress, nil) + } + if err != nil { + return nil, err + } + return &StandardHttpClient{ StandardClient: NewStandardClient(provider), - } + }, nil } diff --git a/config/external-beacon-config.go b/config/external-beacon-config.go index 80b636e..ff73286 100644 --- a/config/external-beacon-config.go +++ b/config/external-beacon-config.go @@ -14,6 +14,12 @@ type ExternalBeaconConfig struct { // The URL of the Prysm gRPC endpoint (only needed if using Prysm VCs) PrysmRpcUrl Parameter[string] + + // Number of milliseconds to wait for a fast request to complete + FastTimeoutMs Parameter[uint64] + + // Number of milliseconds to wait for a slow request to complete + SlowTimeoutMs Parameter[uint64] } // Generates a new ExternalBeaconConfig configuration @@ -92,6 +98,34 @@ func NewExternalBeaconConfig() *ExternalBeaconConfig { Network_All: "", }, }, + + FastTimeoutMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.FastTimeoutID, + Name: "Fast Timeout", + Description: "Number of milliseconds to wait for a request to complete that is expected to be fast and light before timing out the request.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 5000, + }, + }, + + SlowTimeoutMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.SlowTimeoutID, + Name: "Slow Timeout", + Description: "Number of milliseconds to wait for a request to complete that is expected to be slow and heavy, either taking a long time to process or returning a large amount of data, before timing out the request. Examples include querying the Beacon Node for the state of a large number of validators.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 30000, + }, + }, } } @@ -106,6 +140,8 @@ func (cfg *ExternalBeaconConfig) GetParameters() []IParameter { &cfg.BeaconNode, &cfg.HttpUrl, &cfg.PrysmRpcUrl, + &cfg.FastTimeoutMs, + &cfg.SlowTimeoutMs, } } diff --git a/config/external-execution-config.go b/config/external-execution-config.go index 425f60e..478a33c 100644 --- a/config/external-execution-config.go +++ b/config/external-execution-config.go @@ -14,6 +14,12 @@ type ExternalExecutionConfig struct { // The URL of the Websocket endpoint WebsocketUrl Parameter[string] + + // Number of milliseconds to wait for a fast request to complete + FastTimeoutMs Parameter[uint64] + + // Number of milliseconds to wait for a slow request to complete + SlowTimeoutMs Parameter[uint64] } // Generates a new ExternalExecutionConfig configuration @@ -85,6 +91,34 @@ func NewExternalExecutionConfig() *ExternalExecutionConfig { Network_All: "", }, }, + + FastTimeoutMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.FastTimeoutID, + Name: "Fast Timeout", + Description: "Number of milliseconds to wait for a request to complete that is expected to be fast and light before timing out the request.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 5000, + }, + }, + + SlowTimeoutMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.SlowTimeoutID, + Name: "Slow Timeout", + Description: "Number of milliseconds to wait for a request to complete that is expected to be slow and heavy, either taking a long time to process or returning a large amount of data, before timing out the request. Examples include filtering through Ethereum event logs.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 30000, + }, + }, } } @@ -99,6 +133,8 @@ func (cfg *ExternalExecutionConfig) GetParameters() []IParameter { &cfg.ExecutionClient, &cfg.HttpUrl, &cfg.WebsocketUrl, + &cfg.FastTimeoutMs, + &cfg.SlowTimeoutMs, } } diff --git a/config/fallback-config.go b/config/fallback-config.go index 98e3425..aed1ba8 100644 --- a/config/fallback-config.go +++ b/config/fallback-config.go @@ -15,6 +15,9 @@ type FallbackConfig struct { // The URL of the Prysm gRPC endpoint (only needed if using Prysm VCs) PrysmRpcUrl Parameter[string] + + // The delay in milliseconds when checking a client again after it disconnects during a request + ReconnectDelayMs Parameter[uint64] } // Generates a new FallbackConfig configuration @@ -75,6 +78,20 @@ func NewFallbackConfig() *FallbackConfig { Network_All: "", }, }, + + ReconnectDelayMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.FallbackReconnectDelayID, + Name: "Reconnect Delay", + Description: "The delay, in milliseconds, to wait after the primary Execution Client or primary Beacon Node disconnects during a request before trying it again.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 60000, + }, + }, } } @@ -90,6 +107,7 @@ func (cfg *FallbackConfig) GetParameters() []IParameter { &cfg.EcHttpUrl, &cfg.BnHttpUrl, &cfg.PrysmRpcUrl, + &cfg.ReconnectDelayMs, } } diff --git a/config/iconfig.go b/config/iconfig.go index 8e659dc..83dba80 100644 --- a/config/iconfig.go +++ b/config/iconfig.go @@ -1,6 +1,22 @@ package config -import "github.com/rocket-pool/node-manager-core/log" +import ( + "time" + + "github.com/rocket-pool/node-manager-core/log" +) + +// Timeout settings for the client +type ClientTimeouts struct { + // The timeout for requests that are expected to be fast + FastTimeout time.Duration + + // The timeout for requests that are expected to be slow and require either significant processing or a large return size from the server + SlowTimeout time.Duration + + // The delay before rechecking the primary client, if fallbacks support is enabled + RecheckDelay time.Duration +} // NMC servers typically provide some kind of persistent configuration; it must implement this interface. type IConfig interface { @@ -27,9 +43,15 @@ type IConfig interface { // The URLs for the Execution clients to use GetExecutionClientUrls() (string, string) + // The timeouts for the Execution clients and manager to use + GetExecutionClientTimeouts() ClientTimeouts + // The URLs for the Beacon nodes to use GetBeaconNodeUrls() (string, string) + // The timeouts for the Beacon nodes and manager to use + GetBeaconNodeTimeouts() ClientTimeouts + // The configuration for the daemon loggers GetLoggerOptions() log.LoggerOptions } diff --git a/config/ids/ids.go b/config/ids/ids.go index 0248c14..c465249 100644 --- a/config/ids/ids.go +++ b/config/ids/ids.go @@ -11,6 +11,8 @@ const ( PortID string = "port" OpenPortID string = "openPort" HttpUrlID string = "httpUrl" + FastTimeoutID string = "fastTimeout" + SlowTimeoutID string = "slowTimeout" EcID string = "executionClient" BnID string = "beaconNode" GraffitiID string = "graffiti" @@ -19,14 +21,15 @@ const ( CacheSizeID string = "cacheSize" // Logger - LoggerLevelID string = "level" - LoggerFormatID string = "format" - LoggerAddSourceID string = "addSource" - LoggerMaxSizeID string = "maxSize" - LoggerMaxBackupsID string = "maxBackups" - LoggerMaxAgeID string = "maxAge" - LoggerLocalTimeID string = "localTime" - LoggerCompressID string = "compress" + LoggerLevelID string = "level" + LoggerFormatID string = "format" + LoggerAddSourceID string = "addSource" + LoggerMaxSizeID string = "maxSize" + LoggerMaxBackupsID string = "maxBackups" + LoggerMaxAgeID string = "maxAge" + LoggerLocalTimeID string = "localTime" + LoggerCompressID string = "compress" + LoggerEnableHttpTracingID string = "enableHttpTracing" // Besu BesuJvmHeapSizeID string = "jvmHeapSize" @@ -48,6 +51,7 @@ const ( FallbackUseFallbackClientsID string = "useFallbackClients" FallbackEcHttpUrlID string = "ecHttpUrl" FallbackBnHttpUrlID string = "bnHttpUrl" + FallbackReconnectDelayID string = "reconnectDelay" // Geth GethEvmTimeoutID string = "evmTimeout" diff --git a/config/local-beacon-config.go b/config/local-beacon-config.go index 29dfcec..4a9d9c2 100644 --- a/config/local-beacon-config.go +++ b/config/local-beacon-config.go @@ -23,6 +23,12 @@ type LocalBeaconConfig struct { // Toggle for forwarding the HTTP API port outside of Docker OpenHttpPort Parameter[RpcPortMode] + // Number of milliseconds to wait for a fast request to complete + FastTimeoutMs Parameter[uint64] + + // Number of milliseconds to wait for a slow request to complete + SlowTimeoutMs Parameter[uint64] + // Subconfigs Lighthouse *LighthouseBnConfig Lodestar *LodestarBnConfig @@ -138,6 +144,34 @@ func NewLocalBeaconConfig() *LocalBeaconConfig { Network_All: RpcPortMode_Closed, }, }, + + FastTimeoutMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.FastTimeoutID, + Name: "Fast Timeout", + Description: "Number of milliseconds to wait for a request to complete that is expected to be fast and light before timing out the request.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 5000, + }, + }, + + SlowTimeoutMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.SlowTimeoutID, + Name: "Slow Timeout", + Description: "Number of milliseconds to wait for a request to complete that is expected to be slow and heavy, either taking a long time to process or returning a large amount of data, before timing out the request. Examples include querying the Beacon Node for the state of a large number of validators.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 30000, + }, + }, } cfg.Lighthouse = NewLighthouseBnConfig() @@ -162,6 +196,8 @@ func (cfg *LocalBeaconConfig) GetParameters() []IParameter { &cfg.P2pPort, &cfg.HttpPort, &cfg.OpenHttpPort, + &cfg.FastTimeoutMs, + &cfg.SlowTimeoutMs, } } diff --git a/config/local-execution-config.go b/config/local-execution-config.go index 758e8f8..02cad62 100644 --- a/config/local-execution-config.go +++ b/config/local-execution-config.go @@ -26,6 +26,12 @@ type LocalExecutionConfig struct { // P2P traffic port P2pPort Parameter[uint16] + // Number of milliseconds to wait for a fast request to complete + FastTimeoutMs Parameter[uint64] + + // Number of milliseconds to wait for a slow request to complete + SlowTimeoutMs Parameter[uint64] + // Subconfigs Geth *GethConfig Nethermind *NethermindConfig @@ -146,6 +152,34 @@ func NewLocalExecutionConfig() *LocalExecutionConfig { Network_All: 30303, }, }, + + FastTimeoutMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.FastTimeoutID, + Name: "Fast Timeout", + Description: "Number of milliseconds to wait for a request to complete that is expected to be fast and light before timing out the request.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 5000, + }, + }, + + SlowTimeoutMs: Parameter[uint64]{ + ParameterCommon: &ParameterCommon{ + ID: ids.SlowTimeoutID, + Name: "Slow Timeout", + Description: "Number of milliseconds to wait for a request to complete that is expected to be slow and heavy, either taking a long time to process or returning a large amount of data, before timing out the request. Examples include filtering through Ethereum event logs.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + CanBeBlank: false, + OverwriteOnUpgrade: false, + }, + Default: map[Network]uint64{ + Network_All: 30000, + }, + }, } // Create the subconfigs @@ -171,6 +205,8 @@ func (cfg *LocalExecutionConfig) GetParameters() []IParameter { &cfg.EnginePort, &cfg.OpenApiPorts, &cfg.P2pPort, + &cfg.FastTimeoutMs, + &cfg.SlowTimeoutMs, } } diff --git a/config/logger-config.go b/config/logger-config.go index 4372cb1..f413b45 100644 --- a/config/logger-config.go +++ b/config/logger-config.go @@ -32,6 +32,9 @@ type LoggerConfig struct { // Toggle for compressing rotated logs Compress Parameter[bool] + + // Toggle for enabling HTTP request tracing + EnableHttpTracing Parameter[bool] } // Generates a new Logger configuration @@ -174,6 +177,18 @@ func NewLoggerConfig() *LoggerConfig { Network_All: true, }, }, + + EnableHttpTracing: Parameter[bool]{ + ParameterCommon: &ParameterCommon{ + ID: ids.LoggerEnableHttpTracingID, + Name: "Enable HTTP Tracing", + Description: "When enabled, each step of every HTTP request for the Execution Client and Beacon Node will be logged. This results in very verbose log files, so only enable this when you need to debug HTTP requests to your clients specifically.", + AffectsContainers: []ContainerID{ContainerID_Daemon}, + }, + Default: map[Network]bool{ + Network_All: false, + }, + }, } } @@ -193,6 +208,7 @@ func (cfg *LoggerConfig) GetParameters() []IParameter { &cfg.MaxAge, &cfg.LocalTime, &cfg.Compress, + &cfg.EnableHttpTracing, } } @@ -204,13 +220,14 @@ func (cfg *LoggerConfig) GetSubconfigs() map[string]IConfigSection { // Calculate the default number of Geth peers func (cfg *LoggerConfig) GetOptions() log.LoggerOptions { return log.LoggerOptions{ - MaxSize: int(cfg.MaxSize.Value), - MaxBackups: int(cfg.MaxBackups.Value), - MaxAge: int(cfg.MaxAge.Value), - LocalTime: cfg.LocalTime.Value, - Compress: cfg.Compress.Value, - Format: cfg.Format.Value, - Level: cfg.Level.Value, - AddSource: cfg.AddSource.Value, + MaxSize: int(cfg.MaxSize.Value), + MaxBackups: int(cfg.MaxBackups.Value), + MaxAge: int(cfg.MaxAge.Value), + LocalTime: cfg.LocalTime.Value, + Compress: cfg.Compress.Value, + Format: cfg.Format.Value, + Level: cfg.Level.Value, + AddSource: cfg.AddSource.Value, + EnableHttpTracing: cfg.EnableHttpTracing.Value, } } diff --git a/eth/std-rpc-client.go b/eth/std-rpc-client.go new file mode 100644 index 0000000..f09f469 --- /dev/null +++ b/eth/std-rpc-client.go @@ -0,0 +1,308 @@ +package eth + +import ( + "context" + "fmt" + "log/slog" + "math/big" + "net/http/httptrace" + "time" + + "github.com/ethereum/go-ethereum" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/ethclient" + "github.com/rocket-pool/node-manager-core/log" +) + +const ( + // Default timeout for fast requests + DefaultFastTimeout = 5 * time.Second + + // Default timeout for slow requests + DefaultSlowTimeout = 30 * time.Second +) + +// Options for creating a new StandardRpcClient +type StandardRpcClientOptions struct { + // Timeout to use for requests that should return quickly + FastTimeout time.Duration + + // Timeout to use for requests that are expected to take longer to process + SlowTimeout time.Duration +} + +// Standard RPC-based Execution Client binding with logging support, using Geth as the backing client implementation. +type StandardRpcClient struct { + client *ethclient.Client + defaultFastTimeout time.Duration + defaultSlowTimeout time.Duration +} + +// Creates a new StandardRpcClient instance +func NewStandardRpcClient(address string, opts *StandardRpcClientOptions) (*StandardRpcClient, error) { + client, err := ethclient.Dial(address) + if err != nil { + return nil, fmt.Errorf("error creating EC binding for [%s]: %w", address, err) + } + wrapper := &StandardRpcClient{ + client: client, + } + if opts != nil { + wrapper.defaultFastTimeout = opts.FastTimeout + wrapper.defaultSlowTimeout = opts.SlowTimeout + } else { + wrapper.defaultFastTimeout = DefaultFastTimeout + wrapper.defaultSlowTimeout = DefaultSlowTimeout + } + return wrapper, nil +} + +// CodeAt returns the code of the given account. This is needed to differentiate +// between contract internal errors and the local chain being out of sync. +func (c *StandardRpcClient) CodeAt(ctx context.Context, contract common.Address, blockNumber *big.Int) ([]byte, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "CodeAt") + return c.client.CodeAt(ctx, contract, blockNumber) +} + +// CallContract executes an Ethereum contract call with the specified data as the +// input. +func (c *StandardRpcClient) CallContract(ctx context.Context, call ethereum.CallMsg, blockNumber *big.Int) ([]byte, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "CallContract") + return c.client.CallContract(ctx, call, blockNumber) +} + +// HeaderByHash returns the block header with the given hash. +func (c *StandardRpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (*types.Header, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "HeaderByHash") + return c.client.HeaderByHash(ctx, hash) +} + +// HeaderByNumber returns a block header from the current canonical chain. If number is +// nil, the latest known header is returned. +func (c *StandardRpcClient) HeaderByNumber(ctx context.Context, number *big.Int) (*types.Header, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "HeaderByNumber") + return c.client.HeaderByNumber(ctx, number) +} + +// PendingCodeAt returns the code of the given account in the pending state. +func (c *StandardRpcClient) PendingCodeAt(ctx context.Context, account common.Address) ([]byte, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "PendingCodeAt") + return c.client.PendingCodeAt(ctx, account) +} + +// PendingNonceAt retrieves the current pending nonce associated with an account. +func (c *StandardRpcClient) PendingNonceAt(ctx context.Context, account common.Address) (uint64, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "PendingNonceAt") + return c.client.PendingNonceAt(ctx, account) +} + +// SuggestGasPrice retrieves the currently suggested gas price to allow a timely +// execution of a transaction. +func (c *StandardRpcClient) SuggestGasPrice(ctx context.Context) (*big.Int, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "SuggestGasPrice") + return c.client.SuggestGasPrice(ctx) +} + +// SuggestGasTipCap retrieves the currently suggested 1559 priority fee to allow +// a timely execution of a transaction. +func (c *StandardRpcClient) SuggestGasTipCap(ctx context.Context) (*big.Int, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "SuggestGasTipCap") + return c.client.SuggestGasTipCap(ctx) +} + +// EstimateGas tries to estimate the gas needed to execute a specific +// transaction based on the current pending state of the backend blockchain. +// There is no guarantee that this is the true gas limit requirement as other +// transactions may be added or removed by miners, but it should provide a basis +// for setting a reasonable default. +func (c *StandardRpcClient) EstimateGas(ctx context.Context, call ethereum.CallMsg) (uint64, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "EstimateGas") + return c.client.EstimateGas(ctx, call) +} + +// SendTransaction injects the transaction into the pending pool for execution. +func (c *StandardRpcClient) SendTransaction(ctx context.Context, tx *types.Transaction) error { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "SendTransaction") + return c.client.SendTransaction(ctx, tx) +} + +// FilterLogs executes a log filter operation, blocking during execution and +// returning all the results in one batch. +func (c *StandardRpcClient) FilterLogs(ctx context.Context, query ethereum.FilterQuery) ([]types.Log, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultSlowTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "FilterLogs") + return c.client.FilterLogs(ctx, query) +} + +// SubscribeFilterLogs creates a background log filtering operation, returning +// a subscription immediately, which can be used to stream the found events. +func (c *StandardRpcClient) SubscribeFilterLogs(ctx context.Context, query ethereum.FilterQuery, ch chan<- types.Log) (ethereum.Subscription, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "SubscribeFilterLogs") + return c.client.SubscribeFilterLogs(ctx, query, ch) +} + +// TransactionReceipt returns the receipt of a transaction by transaction hash. +// Note that the receipt is not available for pending transactions. +func (c *StandardRpcClient) TransactionReceipt(ctx context.Context, txHash common.Hash) (*types.Receipt, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "TransactionReceipt") + return c.client.TransactionReceipt(ctx, txHash) +} + +// BlockNumber returns the most recent block number +func (c *StandardRpcClient) BlockNumber(ctx context.Context) (uint64, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "BlockNumber") + return c.client.BlockNumber(ctx) +} + +// BalanceAt returns the wei balance of the given account. +// The block number can be nil, in which case the balance is taken from the latest known block. +func (c *StandardRpcClient) BalanceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (*big.Int, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "BalanceAt") + return c.client.BalanceAt(ctx, account, blockNumber) +} + +// TransactionByHash returns the transaction with the given hash. +func (c *StandardRpcClient) TransactionByHash(ctx context.Context, hash common.Hash) (*types.Transaction, bool, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "TransactionByHash") + return c.client.TransactionByHash(ctx, hash) +} + +// NonceAt returns the account nonce of the given account. +// The block number can be nil, in which case the nonce is taken from the latest known block. +func (c *StandardRpcClient) NonceAt(ctx context.Context, account common.Address, blockNumber *big.Int) (uint64, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "NonceAt") + return c.client.NonceAt(ctx, account, blockNumber) +} + +// SyncProgress retrieves the current progress of the sync algorithm. If there's +// no sync currently running, it returns nil. +func (c *StandardRpcClient) SyncProgress(ctx context.Context) (*ethereum.SyncProgress, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "SyncProgress") + return c.client.SyncProgress(ctx) +} + +func (c *StandardRpcClient) ChainID(ctx context.Context) (*big.Int, error) { + // Prep the context + ctx, cancel := c.prepareContext(ctx, c.defaultFastTimeout) + defer cancel() + + ctx = c.logRequest(ctx, "ChainID") + return c.client.ChainID(ctx) +} + +/// ======================== +/// == Internal Functions == +/// ======================== + +// Adds a timeout to the context if one didn't already exist +func (c *StandardRpcClient) prepareContext(ctx context.Context, defaultTimeout time.Duration) (context.Context, context.CancelFunc) { + // Make a new context if it wasn't provided + if ctx == nil { + ctx = context.Background() + } + + // Return if there was already a deadline + _, hasDeadline := ctx.Deadline() + if hasDeadline { + return ctx, func() {} + } + + // Add a default timeout if there isn't one + return context.WithTimeout(ctx, defaultTimeout) +} + +// Logs the request and returns a context with the provided timeout and HTTP tracing enabled if requested +func (c *StandardRpcClient) logRequest(ctx context.Context, methodName string) context.Context { + logger, _ := log.FromContext(ctx) + if logger == nil { + return ctx + } + + // Log the request + args := []any{ + slog.String(log.MethodKey, methodName), + } + deadline, hasDeadline := ctx.Deadline() + if hasDeadline { + args = append(args, slog.Time("deadline", deadline.UTC())) + } + logger.Debug("Running EC request", args...) + tracer := logger.HttpTracer + if tracer != nil { + // Enable HTTP tracing if requested + ctx = httptrace.WithClientTrace(ctx, tracer) + } + return ctx +} diff --git a/log/logger.go b/log/logger.go index 1bdd6df..2cb96d6 100644 --- a/log/logger.go +++ b/log/logger.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "log/slog" + "net/http/httptrace" "os" "path/filepath" @@ -13,6 +14,11 @@ import ( // Logger is a simple wrapper for a slog Logger that writes to a file on disk. type Logger struct { *slog.Logger + + // The HTTP client tracer for this logger if HTTP tracing was enabled + HttpTracer *httptrace.ClientTrace + + // Internal fields logFile *lumberjack.Logger path string } @@ -51,11 +57,16 @@ func NewLogger(logFilePath string, options LoggerOptions) (*Logger, error) { case LogFormat_Logfmt: handler = slog.NewTextHandler(logFile, logOptions) } - return &Logger{ + logger := &Logger{ Logger: slog.New(handler), logFile: logFile, path: logFilePath, - }, nil + } + + if options.EnableHttpTracing { + logger.HttpTracer = logger.createHttpClientTracer() + } + return logger, nil } // Creates a new logger that uses the slog default logger, which writes to the terminal instead of a file. @@ -107,3 +118,55 @@ func FromContext(ctx context.Context) (*Logger, bool) { log, ok := ctx.Value(ContextLogKey).(*Logger) return log, ok } + +// ======================== +// === Internal Methods === +// ======================== + +// Creates an HTTP client tracer for logging HTTP client events +func (l *Logger) createHttpClientTracer() *httptrace.ClientTrace { + tracer := &httptrace.ClientTrace{} + tracer.ConnectDone = func(network, addr string, err error) { + l.Debug("HTTP Connect Done", + slog.String("network", network), + slog.String("addr", addr), + Err(err), + ) + } + tracer.DNSDone = func(dnsInfo httptrace.DNSDoneInfo) { + l.Debug("HTTP DNS Done", + slog.String("addrs", fmt.Sprint(dnsInfo.Addrs)), + slog.Bool("coalesced", dnsInfo.Coalesced), + Err(dnsInfo.Err), + ) + } + tracer.DNSStart = func(dnsInfo httptrace.DNSStartInfo) { + l.Debug("HTTP DNS Start", + slog.String("host", dnsInfo.Host), + ) + } + tracer.GotConn = func(connInfo httptrace.GotConnInfo) { + l.Debug("HTTP Got Connection", + slog.Bool("reused", connInfo.Reused), + slog.Bool("wasIdle", connInfo.WasIdle), + slog.Duration("idleTime", connInfo.IdleTime), + slog.String("localAddr", connInfo.Conn.LocalAddr().String()), + slog.String("remoteAddr", connInfo.Conn.RemoteAddr().String()), + ) + } + tracer.GotFirstResponseByte = func() { + l.Debug("HTTP Got First Response Byte") + } + tracer.PutIdleConn = func(err error) { + l.Debug("HTTP Put Idle Connection", + Err(err), + ) + } + tracer.WroteRequest = func(wroteInfo httptrace.WroteRequestInfo) { + l.Debug("HTTP Wrote Request", + Err(wroteInfo.Err), + ) + } + + return tracer +} diff --git a/log/options.go b/log/options.go index 2dcc7ae..a61adcd 100644 --- a/log/options.go +++ b/log/options.go @@ -44,4 +44,7 @@ type LoggerOptions struct { // True to include the source code position of the log statement in log messages AddSource bool + + // True to enable HTTP request tracing + EnableHttpTracing bool } diff --git a/node/services/bn-manager.go b/node/services/bn-manager.go index e0f2a89..bffdacf 100644 --- a/node/services/bn-manager.go +++ b/node/services/bn-manager.go @@ -3,25 +3,30 @@ package services import ( "context" "fmt" + "log/slog" "time" "github.com/ethereum/go-ethereum/common" "github.com/rocket-pool/node-manager-core/api/types" "github.com/rocket-pool/node-manager-core/beacon" + "github.com/rocket-pool/node-manager-core/log" ) // This is a proxy for multiple Beacon clients, providing natural fallback support if one of them fails. type BeaconClientManager struct { - primaryBc beacon.IBeaconClient - fallbackBc beacon.IBeaconClient - primaryReady bool - fallbackReady bool - expectedChainID uint - fallbackEnabled bool + primaryBc beacon.IBeaconClient + fallbackBc beacon.IBeaconClient + primaryReady bool + fallbackReady bool + expectedChainID uint + fallbackEnabled bool + primaryFailTime time.Time + fallbackFailTime time.Time + recheckTime time.Duration } // Creates a new BeaconClientManager instance -func NewBeaconClientManager(primaryBc beacon.IBeaconClient, chainID uint, clientTimeout time.Duration) *BeaconClientManager { +func NewBeaconClientManager(primaryBc beacon.IBeaconClient, chainID uint) *BeaconClientManager { return &BeaconClientManager{ primaryBc: primaryBc, primaryReady: true, @@ -32,7 +37,7 @@ func NewBeaconClientManager(primaryBc beacon.IBeaconClient, chainID uint, client } // Creates a new BeaconClientManager instance with a fallback client -func NewBeaconClientManagerWithFallback(primaryBc beacon.IBeaconClient, fallbackBc beacon.IBeaconClient, chainID uint, clientTimeout time.Duration) *BeaconClientManager { +func NewBeaconClientManagerWithFallback(primaryBc beacon.IBeaconClient, fallbackBc beacon.IBeaconClient, chainID uint, clientRecheckTime time.Duration) *BeaconClientManager { return &BeaconClientManager{ primaryBc: primaryBc, fallbackBc: fallbackBc, @@ -40,6 +45,7 @@ func NewBeaconClientManagerWithFallback(primaryBc beacon.IBeaconClient, fallback fallbackReady: true, expectedChainID: chainID, fallbackEnabled: true, + recheckTime: clientRecheckTime, } } @@ -64,7 +70,7 @@ func (m *BeaconClientManager) IsFallbackReady() bool { } func (m *BeaconClientManager) IsFallbackEnabled() bool { - return m.fallbackBc != nil + return m.fallbackEnabled } func (m *BeaconClientManager) GetClientTypeName() string { @@ -73,10 +79,49 @@ func (m *BeaconClientManager) GetClientTypeName() string { func (m *BeaconClientManager) SetPrimaryReady(ready bool) { m.primaryReady = ready + if ready { + m.primaryFailTime = time.Time{} + } else { + m.primaryFailTime = time.Now() + } } func (m *BeaconClientManager) SetFallbackReady(ready bool) { m.fallbackReady = ready + if ready { + m.fallbackFailTime = time.Time{} + } else { + m.fallbackFailTime = time.Now() + } +} + +func (m *BeaconClientManager) RecheckFailTimes(logger *log.Logger) { + if !m.fallbackEnabled { + return + } + + if !m.primaryReady { + timeSincePrimaryFail := time.Since(m.primaryFailTime) + if time.Since(m.primaryFailTime) > m.recheckTime { + if logger != nil { + logger.Info("Reconnecting primary Beacon client", + slog.Duration("elapsed", timeSincePrimaryFail), + ) + } + m.SetPrimaryReady(true) + } + } + if !m.fallbackReady { + timeSinceFallbackFail := time.Since(m.fallbackFailTime) + if time.Since(m.fallbackFailTime) > m.recheckTime { + if logger != nil { + logger.Info("Reconnecting fallback Beacon client", + slog.Duration("elapsed", timeSinceFallbackFail), + ) + } + m.SetFallbackReady(true) + } + } } /// ======================= diff --git a/node/services/ec-manager.go b/node/services/ec-manager.go index bf6189f..5f0ac34 100644 --- a/node/services/ec-manager.go +++ b/node/services/ec-manager.go @@ -3,6 +3,7 @@ package services import ( "context" "fmt" + "log/slog" "math" "math/big" "time" @@ -12,41 +13,43 @@ import ( "github.com/ethereum/go-ethereum/core/types" apitypes "github.com/rocket-pool/node-manager-core/api/types" "github.com/rocket-pool/node-manager-core/eth" + "github.com/rocket-pool/node-manager-core/log" ) // This is a proxy for multiple ETH clients, providing natural fallback support if one of them fails. type ExecutionClientManager struct { - primaryEc eth.IExecutionClient - fallbackEc eth.IExecutionClient - primaryReady bool - fallbackReady bool - expectedChainID uint - timeout time.Duration - fallbackEnabled bool + primaryEc eth.IExecutionClient + fallbackEc eth.IExecutionClient + primaryReady bool + fallbackReady bool + expectedChainID uint + fallbackEnabled bool + primaryFailTime time.Time + fallbackFailTime time.Time + recheckTime time.Duration } // Creates a new ExecutionClientManager instance -func NewExecutionClientManager(primaryEc eth.IExecutionClient, chainID uint, clientTimeout time.Duration) *ExecutionClientManager { +func NewExecutionClientManager(primaryEc eth.IExecutionClient, chainID uint) *ExecutionClientManager { return &ExecutionClientManager{ primaryEc: primaryEc, primaryReady: true, fallbackReady: false, expectedChainID: chainID, - timeout: clientTimeout, fallbackEnabled: false, } } // Creates a new ExecutionClientManager instance that includes a fallback client -func NewExecutionClientManagerWithFallback(primaryEc eth.IExecutionClient, fallbackEc eth.IExecutionClient, chainID uint, clientTimeout time.Duration) *ExecutionClientManager { +func NewExecutionClientManagerWithFallback(primaryEc eth.IExecutionClient, fallbackEc eth.IExecutionClient, chainID uint, clientRecheckTime time.Duration) *ExecutionClientManager { return &ExecutionClientManager{ primaryEc: primaryEc, fallbackEc: fallbackEc, primaryReady: true, fallbackReady: true, expectedChainID: chainID, - timeout: clientTimeout, fallbackEnabled: true, + recheckTime: clientRecheckTime, } } @@ -71,7 +74,7 @@ func (m *ExecutionClientManager) IsFallbackReady() bool { } func (m *ExecutionClientManager) IsFallbackEnabled() bool { - return m.fallbackEc != nil + return m.fallbackEnabled } func (m *ExecutionClientManager) GetClientTypeName() string { @@ -80,10 +83,49 @@ func (m *ExecutionClientManager) GetClientTypeName() string { func (m *ExecutionClientManager) SetPrimaryReady(ready bool) { m.primaryReady = ready + if ready { + m.primaryFailTime = time.Time{} + } else { + m.primaryFailTime = time.Now() + } } func (m *ExecutionClientManager) SetFallbackReady(ready bool) { m.fallbackReady = ready + if ready { + m.fallbackFailTime = time.Time{} + } else { + m.fallbackFailTime = time.Now() + } +} + +func (m *ExecutionClientManager) RecheckFailTimes(logger *log.Logger) { + if !m.fallbackEnabled { + return + } + + if !m.primaryReady { + timeSincePrimaryFail := time.Since(m.primaryFailTime) + if time.Since(m.primaryFailTime) > m.recheckTime { + if logger != nil { + logger.Info("Reconnecting primary Execution client", + slog.Duration("elapsed", timeSincePrimaryFail), + ) + } + m.SetPrimaryReady(true) + } + } + if !m.fallbackReady { + timeSinceFallbackFail := time.Since(m.fallbackFailTime) + if time.Since(m.fallbackFailTime) > m.recheckTime { + if logger != nil { + logger.Info("Reconnecting fallback Execution client", + slog.Duration("elapsed", timeSinceFallbackFail), + ) + } + m.SetFallbackReady(true) + } + } } /// ======================== diff --git a/node/services/function-runners.go b/node/services/function-runners.go index 5db6330..96635c3 100644 --- a/node/services/function-runners.go +++ b/node/services/function-runners.go @@ -19,52 +19,67 @@ type function2[ClientType any, ReturnType1 any, ReturnType2 any] func(ClientType // Attempts to run a function progressively through each client until one succeeds or they all fail. // Expects functions with 1 output and an error; for functions with other signatures, see the other runFunctionX functions. func runFunction1[ClientType any, ReturnType any](m iClientManagerImpl[ClientType], ctx context.Context, function function1[ClientType, ReturnType]) (ReturnType, error) { - logger, _ := log.FromContext(ctx) + // If there's no fallback, just run the function on the primary + if !m.IsFallbackEnabled() { + return function(m.GetPrimaryClient()) + } + var blank ReturnType + logger, _ := log.FromContext(ctx) typeName := m.GetClientTypeName() + // Check the clients for recovery + m.RecheckFailTimes(logger) + // Check if we can use the primary if m.IsPrimaryReady() { // Try to run the function on the primary result, err := function(m.GetPrimaryClient()) - if err != nil { - if isDisconnected(err) { - // If it's disconnected, log it and try the fallback - m.SetPrimaryReady(false) - if m.IsFallbackEnabled() { - logger.Warn("Primary "+typeName+" client disconnected, using fallback...", log.Err(err)) - return runFunction1[ClientType, ReturnType](m, ctx, function) - } else { - logger.Warn("Primary "+typeName+" disconnected and no fallback is configured.", log.Err(err)) - return blank, fmt.Errorf("all " + typeName + "s failed") - } - } - // If it's a different error, just return it + if err == nil { + // If there's no error, return the result + return result, nil + } + + // If it's not a disconnect error, just return it + if !isDisconnected(err) { return blank, err } - // If there's no error, return the result - return result, nil + + // Log the disconnect and try the fallback if available + m.SetPrimaryReady(false) + if logger != nil { + logger.Warn("Primary "+typeName+" client disconnected, using fallback...", log.Err(err)) + } + return runFunction1[ClientType, ReturnType](m, ctx, function) } + // Check if we can use the fallback if m.IsFallbackReady() { // Try to run the function on the fallback result, err := function(m.GetFallbackClient()) - if err != nil { - if isDisconnected(err) { - // If it's disconnected, log it and try the fallback - logger.Warn("Fallback "+typeName+" disconnected", log.Err(err)) - m.SetFallbackReady(false) - return blank, fmt.Errorf("all " + typeName + "s failed") - } - - // If it's a different error, just return it + if err == nil { + // If there's no error, return the result + return result, nil + } + + // If it's not a disconnect error, just return it + if !isDisconnected(err) { return blank, err } - // If there's no error, return the result - return result, nil + + // If Log the disconnect and return an error + if logger != nil { + logger.Warn("Fallback "+typeName+" disconnected", log.Err(err)) + } + m.SetFallbackReady(false) + return blank, fmt.Errorf("all " + typeName + "s failed") } - return blank, fmt.Errorf("no " + typeName + "s were ready") + // If neither client is ready, just run the primary + if logger != nil { + logger.Warn("No " + typeName + "s are ready, forcing use of primary...") + } + return function(m.GetPrimaryClient()) } // Run a function with 0 outputs and an error diff --git a/node/services/manager.go b/node/services/manager.go index b57dfae..a086025 100644 --- a/node/services/manager.go +++ b/node/services/manager.go @@ -1,5 +1,7 @@ package services +import "github.com/rocket-pool/node-manager-core/log" + type IClientManager[ClientType any] interface { GetPrimaryClient() ClientType GetFallbackClient() ClientType @@ -15,4 +17,5 @@ type iClientManagerImpl[ClientType any] interface { // Internal functions SetPrimaryReady(bool) SetFallbackReady(bool) + RecheckFailTimes(logger *log.Logger) } diff --git a/node/services/service-provider.go b/node/services/service-provider.go index a3c004d..8daaf37 100644 --- a/node/services/service-provider.go +++ b/node/services/service-provider.go @@ -5,10 +5,8 @@ import ( "fmt" "path/filepath" "runtime" - "time" dclient "github.com/docker/docker/client" - "github.com/ethereum/go-ethereum/ethclient" "github.com/rocket-pool/node-manager-core/beacon/client" "github.com/rocket-pool/node-manager-core/config" "github.com/rocket-pool/node-manager-core/eth" @@ -42,36 +40,52 @@ type ServiceProvider struct { } // Creates a new ServiceProvider instance based on the given config -func NewServiceProvider(cfg config.IConfig, clientTimeout time.Duration) (*ServiceProvider, error) { +func NewServiceProvider(cfg config.IConfig) (*ServiceProvider, error) { resources := cfg.GetNetworkResources() // EC Manager var ecManager *ExecutionClientManager primaryEcUrl, fallbackEcUrl := cfg.GetExecutionClientUrls() - primaryEc, err := ethclient.Dial(primaryEcUrl) + timeouts := cfg.GetExecutionClientTimeouts() + ecOpts := ð.StandardRpcClientOptions{ + FastTimeout: timeouts.FastTimeout, + SlowTimeout: timeouts.SlowTimeout, + } + primaryEc, err := eth.NewStandardRpcClient(primaryEcUrl, ecOpts) if err != nil { return nil, fmt.Errorf("error connecting to primary EC at [%s]: %w", primaryEcUrl, err) } if fallbackEcUrl != "" { // Get the fallback EC url, if applicable - fallbackEc, err := ethclient.Dial(fallbackEcUrl) + fallbackEc, err := eth.NewStandardRpcClient(fallbackEcUrl, ecOpts) if err != nil { return nil, fmt.Errorf("error connecting to fallback EC at [%s]: %w", fallbackEcUrl, err) } - ecManager = NewExecutionClientManagerWithFallback(primaryEc, fallbackEc, resources.ChainID, clientTimeout) + ecManager = NewExecutionClientManagerWithFallback(primaryEc, fallbackEc, resources.ChainID, timeouts.RecheckDelay) } else { - ecManager = NewExecutionClientManager(primaryEc, resources.ChainID, clientTimeout) + ecManager = NewExecutionClientManager(primaryEc, resources.ChainID) } // Beacon manager var bcManager *BeaconClientManager primaryBnUrl, fallbackBnUrl := cfg.GetBeaconNodeUrls() - primaryBc := client.NewStandardHttpClient(primaryBnUrl, clientTimeout) + timeouts = cfg.GetBeaconNodeTimeouts() + bnOpts := &client.StandardHttpClientOpts{ + FastTimeout: timeouts.FastTimeout, + SlowTimeout: timeouts.SlowTimeout, + } + primaryBc, err := client.NewStandardHttpClient(primaryBnUrl, bnOpts) + if err != nil { + return nil, fmt.Errorf("error connecting to primary BC at [%s]: %w", primaryBnUrl, err) + } if fallbackBnUrl != "" { - fallbackBc := client.NewStandardHttpClient(fallbackBnUrl, clientTimeout) - bcManager = NewBeaconClientManagerWithFallback(primaryBc, fallbackBc, resources.ChainID, clientTimeout) + fallbackBc, err := client.NewStandardHttpClient(fallbackBnUrl, bnOpts) + if err != nil { + return nil, fmt.Errorf("error connecting to fallback BC at [%s]: %w", fallbackBnUrl, err) + } + bcManager = NewBeaconClientManagerWithFallback(primaryBc, fallbackBc, resources.ChainID, timeouts.RecheckDelay) } else { - bcManager = NewBeaconClientManager(primaryBc, resources.ChainID, clientTimeout) + bcManager = NewBeaconClientManager(primaryBc, resources.ChainID) } // Docker client