Merge pull request #633 from oasisprotocol/mitjat/e2e-cache-http
e2e regression tests: HTTP caching proxy, new cases
mitjat authored Feb 23, 2024
2 parents (702f22e + 534a3b1), commit 908a841
Showing 31 changed files with 10,441 additions and 157 deletions.
1 change: 0 additions & 1 deletion .dockerignore
@@ -9,6 +9,5 @@ docker/node/etc/genesis.json
 tests/e2e/testnet/net-runner

 # Local workdirs
-cache
 .*
 *.log
1 change: 0 additions & 1 deletion .gitignore
@@ -14,7 +14,6 @@ tests/e2e_regression/*/actual/*

 # Log output.
 **/*.log
-/cache
 /logs

 # API files.
17 changes: 9 additions & 8 deletions Makefile
@@ -110,14 +110,15 @@ accept-e2e-regression-suite:
 ifndef SUITE
 	$(error SUITE is undefined)
 endif
-	@# Delete all old expected files first, in case any test cases were renamed or removed.
-	rm -rf ./tests/e2e_regression/$(SUITE)/expected
-	@# Copy the actual outputs to the expected outputs.
-	cp -r ./tests/e2e_regression/$(SUITE)/{actual,expected}
-	@# The result of the "spec" test is a special case. It should always match the
-	@# current openAPI spec file, so we symlink it to avoid having to update the expected
-	@# output every time the spec changes.
-	rm ./tests/e2e_regression/$(SUITE)/expected/spec.body
+	[[ -d ./tests/e2e_regression/$(SUITE)/actual ]] || { echo "Note: No actual outputs found for suite $(SUITE). Nothing to accept."; exit 0; } \
+	# Delete all old expected files first, in case any test cases were renamed or removed. \
+	rm -rf ./tests/e2e_regression/$(SUITE)/expected; \
+	# Copy the actual outputs to the expected outputs. \
+	cp -r ./tests/e2e_regression/$(SUITE)/{actual,expected}; \
+	# The result of the "spec" test is a special case. It should always match the \
+	# current openAPI spec file, so we symlink it to avoid having to update the expected \
+	# output every time the spec changes. \
+	rm -f ./tests/e2e_regression/$(SUITE)/expected/spec.body; \
 	ln -s ../../../../api/spec/v1.yaml ./tests/e2e_regression/$(SUITE)/expected/spec.body

 # Format code.
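After accepting, re-running the same suite should produce an empty diff between actual and expected outputs. A typical invocation looks like the following, where the suite name is illustrative (suites are directories under tests/e2e_regression/):

	make accept-e2e-regression-suite SUITE=eden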
41 changes: 27 additions & 14 deletions analyzer/evmverifier/evmverifier.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"strings"
 	"time"

 	ethCommon "github.com/ethereum/go-ethereum/common"
@@ -44,20 +45,28 @@ func NewAnalyzer(
 		logger.Warn("EVM contracts verifier only supports testnet/mainnet, stopping", "chain_name", chain)
 		return nil, fmt.Errorf("invalid chainName %s, expected one of testnet/mainnet", chain)
 	}
-	// Default interval is 5 minutes.
-	if cfg.Interval == 0 {
-		cfg.Interval = 5 * time.Minute
-	}
-	if cfg.Interval < time.Minute {
-		return nil, fmt.Errorf("invalid interval %s, evm contracts verifier interval must be at least 1 minute to meet Sourcify rate limits", cfg.Interval.String())
-	}
-	// interItemDelay should be at least 1 second to meet Sourcify rate limits.
-	if cfg.InterItemDelay == 0 {
-		cfg.InterItemDelay = time.Second
-	}
-	if cfg.InterItemDelay < time.Second {
-		return nil, fmt.Errorf("invalid interItemDelay %s, evm contracts verifier inter item delay must be at least 1 second to meet sourcify rate limits", cfg.InterItemDelay.String())
+
+	// Assuming we're running against "real" Sourcify, impose conservative request rates so we don't get banned.
+	// If we're running against localhost, assume it's backed by a local cache and proceed quickly.
+	// NOTE: We might hit Sourcify at very high rates if the local cache is empty, and the default intervals (0) are used.
+	//       In my experiment with 26 contracts, that was not a problem - Sourcify did not have time to ban me.
+	if !(strings.Contains(sourcifyServerUrl, "localhost") || strings.Contains(sourcifyServerUrl, "127.0.0.1")) {
+		// Default interval is 5 minutes.
+		if cfg.Interval == 0 {
+			cfg.Interval = 5 * time.Minute
+		}
+		if cfg.Interval < time.Minute {
+			return nil, fmt.Errorf("invalid interval %s, evm contracts verifier interval must be at least 1 minute to meet Sourcify rate limits", cfg.Interval.String())
+		}
+		// interItemDelay should be at least 1 second to meet Sourcify rate limits.
+		if cfg.InterItemDelay == 0 {
+			cfg.InterItemDelay = time.Second
+		}
+		if cfg.InterItemDelay < time.Second {
+			return nil, fmt.Errorf("invalid interItemDelay %s, evm contracts verifier inter item delay must be at least 1 second to meet sourcify rate limits", cfg.InterItemDelay.String())
+		}
+	}

 	client, err := sourcify.NewClient(sourcifyServerUrl, chain, logger)
 	if err != nil {
 		return nil, err
@@ -194,7 +203,11 @@ func (p *processor) ProcessItem(ctx context.Context, batch *storage.QueryBatch,
 	}

 	batch.Queue(
-		queries.RuntimeEVMVerifyContractUpdate,
+		// NOTE: This also updates `verification_info_downloaded_at`, causing the `evm_abi` to re-parse
+		// the contract's txs and events.
+		// NOTE: We upsert rather than update; if the contract is not in the db yet, UPDATE would ignore the
+		// contract and this analyzer would keep retrying it on every iteration.
+		queries.RuntimeEVMVerifyContractUpsert,
 		p.runtime,
 		item.Addr,
 		abi.Output.ABI,
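The rate-limit logic above keys off the Sourcify server URL alone: a URL containing "localhost" or "127.0.0.1" is assumed to be a cache-backed test instance, and the interval floors are skipped. A self-contained sketch of that decision rule (the helper name is hypothetical; the commit inlines the condition):

	package main

	import (
		"fmt"
		"strings"
	)

	// isLocalSourcify mirrors the substring check in NewAnalyzer above.
	func isLocalSourcify(serverURL string) bool {
		return strings.Contains(serverURL, "localhost") || strings.Contains(serverURL, "127.0.0.1")
	}

	func main() {
		fmt.Println(isLocalSourcify("http://localhost:9191"))       // true: no rate-limit floors
		fmt.Println(isLocalSourcify("https://sourcify.dev/server")) // false: floors enforced
	}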
78 changes: 41 additions & 37 deletions analyzer/item/item.go
@@ -32,11 +32,11 @@ const (
 var ErrEmptyBatch = errors.New("no items in batch")

 type itemBasedAnalyzer[Item any] struct {
-	maxBatchSize     uint64
-	stopOnEmptyQueue bool
-	fixedInterval    time.Duration
-	interItemDelay   time.Duration
-	analyzerName     string
+	maxBatchSize        uint64
+	stopIfQueueEmptyFor time.Duration
+	fixedInterval       time.Duration
+	interItemDelay      time.Duration
+	analyzerName        string

 	processor ItemProcessor[Item]

@@ -61,8 +61,8 @@ type ItemProcessor[Item any] interface {

 // NewAnalyzer returns a new item based analyzer using the provided item processor.
 //
-// If stopOnEmptyQueue is true, the analyzer will process batches of items until its
-// work queue is empty, at which point it will terminate and return. Likely to
+// If stopIfQueueEmptyFor is a non-zero duration, the analyzer will process batches of items until its
+// work queue is empty for `stopIfQueueEmptyFor`, at which point it will terminate and return. Likely to
 // be used in the regression tests.
 //
 // If fixedInterval is provided, the analyzer will process one batch every fixedInterval.
@@ -79,15 +79,15 @@ func NewAnalyzer[Item any](
 		cfg.BatchSize = defaultBatchSize
 	}
 	a := &itemBasedAnalyzer[Item]{
-		cfg.BatchSize,
-		cfg.StopOnEmptyQueue,
-		cfg.Interval,
-		cfg.InterItemDelay,
-		name,
-		processor,
-		target,
-		logger,
-		metrics.NewDefaultAnalysisMetrics(name),
+		maxBatchSize:        cfg.BatchSize,
+		stopIfQueueEmptyFor: cfg.StopIfQueueEmptyFor,
+		fixedInterval:       cfg.Interval,
+		interItemDelay:      cfg.InterItemDelay,
+		analyzerName:        name,
+		processor:           processor,
+		target:              target,
+		logger:              logger,
+		metrics:             metrics.NewDefaultAnalysisMetrics(name),
 	}

 	return a, nil
@@ -195,41 +195,45 @@ func (a *itemBasedAnalyzer[Item]) Start(ctx context.Context) {
 		)
 		return
 	}
+	mostRecentTask := time.Now()

 	for {
-		delay := backoff.Timeout()
-		if a.fixedInterval != 0 {
-			delay = a.fixedInterval
-		}
-		select {
-		case <-time.After(delay):
-			// Process another batch of items.
-		case <-ctx.Done():
-			a.logger.Warn("shutting down item analyzer", "reason", ctx.Err())
-			return
-		}
 		// Update queueLength
 		queueLength, err := a.sendQueueLengthMetric(ctx)
-		if err == nil && queueLength == 0 && a.stopOnEmptyQueue {
-			a.logger.Warn("item analyzer work queue is empty; shutting down")
+		// Stop if queue has been empty for a while, and configured to do so.
+		if err == nil && queueLength == 0 && a.stopIfQueueEmptyFor != 0 && time.Since(mostRecentTask) > a.stopIfQueueEmptyFor {
+			a.logger.Warn("item analyzer work queue has been empty for a while; shutting down",
+				"queue_empty_since", mostRecentTask,
+				"queue_empty_for", time.Since(mostRecentTask),
+				"stop_if_queue_empty_for", a.stopIfQueueEmptyFor)
 			return
 		}
 		a.logger.Info("work queue length", "num_items", queueLength)

 		numProcessed, err := a.processBatch(ctx)
-		if err != nil {
+		if err != nil { //nolint:gocritic
 			a.logger.Error("error processing batch", "err", err)
 			backoff.Failure()
-			continue
-		}
-		if numProcessed == 0 {
-			// Count this as a failure to reduce the polling when we are
-			// running faster than the block analyzer can find new tokens.
+		} else if numProcessed == 0 {
+			// We are running faster than work is being created. Reduce needless GetItems() calls.
 			backoff.Failure()
-			continue
+		} else {
+			mostRecentTask = time.Now()
+			backoff.Success()
 		}

-		backoff.Success()
+		// Sleep a little before the next batch.
+		delay := backoff.Timeout()
+		if a.fixedInterval != 0 {
+			delay = a.fixedInterval
+		}
+		select {
+		case <-time.After(delay):
+			// Process another batch of items.
+		case <-ctx.Done():
+			a.logger.Warn("shutting down item analyzer", "reason", ctx.Err())
+			return
+		}
 	}
 }
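The net effect of replacing the stopOnEmptyQueue flag with stopIfQueueEmptyFor: a momentarily empty queue no longer terminates the analyzer. The queue must stay empty for the configured duration, measured from the last successfully processed batch (mostRecentTask), before Start returns. With StopIfQueueEmptyFor set to time.Second, as in the test config below, an analyzer shuts down roughly one polling cycle after its queue has been idle for a second.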
8 changes: 4 additions & 4 deletions analyzer/item/item_test.go
@@ -50,10 +50,10 @@ const (

 // Default item based config.
 var testItemBasedConfig = &config.ItemBasedAnalyzerConfig{
-	BatchSize:        3,
-	StopOnEmptyQueue: true,
-	Interval:         0, // use backoff
-	InterItemDelay:   0,
+	BatchSize:           3,
+	StopIfQueueEmptyFor: time.Second,
+	Interval:            0, // use backoff
+	InterItemDelay:      0,
 }

 type mockItem struct {
22 changes: 15 additions & 7 deletions analyzer/queries/queries.go
@@ -925,15 +925,23 @@ var (
 		WHERE
 			runtime = $1 AND verification_info_downloaded_at IS NULL`

-	RuntimeEVMVerifyContractUpdate = `
-		UPDATE chain.evm_contracts
+	RuntimeEVMVerifiedContracts = `
+		SELECT
+			contracts.contract_address,
+			contracts.verification_level
+		FROM chain.evm_contracts AS contracts
+		WHERE
+			runtime = $1 AND verification_level IS NOT NULL`
+
+	RuntimeEVMVerifyContractUpsert = `
+		INSERT INTO chain.evm_contracts (runtime, contract_address, verification_info_downloaded_at, abi, compilation_metadata, source_files)
+		VALUES ($1, $2, CURRENT_TIMESTAMP, $3, $4, $5)
+		ON CONFLICT (runtime, contract_address) DO UPDATE
 		SET
 			verification_info_downloaded_at = CURRENT_TIMESTAMP,
-			abi = $3,
-			compilation_metadata = $4,
-			source_files = $5
-		WHERE
-			runtime = $1 AND contract_address = $2`
+			abi = EXCLUDED.abi,
+			compilation_metadata = EXCLUDED.compilation_metadata,
+			source_files = EXCLUDED.source_files`

 	RuntimeEvmVerifiedContractTxs = `
 		WITH abi_contracts AS (
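A note on the upsert: in PostgreSQL, EXCLUDED refers to the row that was proposed for insertion, so the conflict branch records the same abi, compilation_metadata, and source_files values as a fresh insert, and verification_info_downloaded_at is refreshed on both paths. This is what lets the verifier stop retrying contracts that were missing from the table, as explained in the evmverifier.go comments above.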
106 changes: 106 additions & 0 deletions cache/httpproxy/caching_http_proxy.go
@@ -0,0 +1,106 @@
+package httpproxy
+
+import (
+	"bytes"
+	"fmt"
+	"io"
+	"net/http"
+	"net/url"
+	"path/filepath"
+	"regexp"
+	"time"
+
+	"github.com/oasisprotocol/nexus/cache/kvstore"
+	cmdCommon "github.com/oasisprotocol/nexus/cmd/common"
+	"github.com/oasisprotocol/nexus/config"
+)
+
+// cachingHttpProxy is a minimal HTTP handler that caches responses; it is intended *for testing only*
+// and comes with several caveats:
+//   - The cache is never invalidated.
+//   - All requests are forwarded to the same target host.
+//   - Requests are cached based on the full URL, NOT the headers or the method (GET vs POST) or the request body.
+//   - Low-likelihood errors (e.g. malformed URLs) result in panics.
+type cachingHttpProxy struct {
+	client     *http.Client
+	cache      kvstore.KVStore
+	targetHost string // Must include the protocol (e.g. "http://")
+}
+
+var _ http.Handler = (*cachingHttpProxy)(nil)
+
+type CacheableResponse struct {
+	StatusCode int
+	Header     http.Header
+	Body       []byte
+}
+
+func (p cachingHttpProxy) ServeHTTP(w http.ResponseWriter, req *http.Request) {
+	// Rewrite the request URL to new host
+	targetURL, _ := url.JoinPath(p.targetHost, req.URL.RequestURI())
+	req, err := http.NewRequestWithContext(req.Context(), req.Method, targetURL, req.Body)
+	if err != nil {
+		panic(fmt.Sprintf("error rewriting request to %s %s: %v", req.Method, req.URL.String(), err))
+	}
+
+	// Get the response to the modified request (via cache)
+	cResp, err := kvstore.GetFromCacheOrCall(p.cache, false, kvstore.CacheKey(req.URL.RequestURI()), func() (*CacheableResponse, error) {
+		resp, err2 := p.client.Do(req)
+		if err2 != nil {
+			return nil, err2
+		}
+		defer resp.Body.Close()
+		body, err2 := io.ReadAll(resp.Body)
+		if err2 != nil {
+			return nil, fmt.Errorf("error reading response body from %s: %v", resp.Request.URL.String(), err2)
+		}
+		return &CacheableResponse{
+			StatusCode: resp.StatusCode,
+			Header:     resp.Header,
+			Body:       body,
+		}, nil
+	})
+	if err != nil {
+		cResp = &CacheableResponse{
+			StatusCode: http.StatusInternalServerError,
+			Header:     http.Header{},
+			Body:       []byte(fmt.Sprintf("error proxying request: %v", err)),
+		}
+	}
+
+	// Write out the response to `w`.
+	for key, values := range cResp.Header {
+		for _, value := range values {
+			w.Header().Add(key, value)
+		}
+	}
+	w.WriteHeader(cResp.StatusCode) // 200, 404, etc
+	if _, err = io.Copy(w, bytes.NewReader(cResp.Body)); err != nil {
+		panic(err)
+	}
+}
+
+// Creates a http.Server that proxies all requests to the target URL.
+// The server caches all responses in a persisted key-value store, located in an
+// autogenerated subdirectory of the cache root dir.
+func NewHttpServer(cacheCfg config.CacheConfig, proxyCfg config.HttpCachingProxyConfig) (*http.Server, error) {
+	// Derive the cache root dir from the target URL.
+	cleanTargetUrl := regexp.MustCompile("[^a-zA-Z0-9]+").ReplaceAllString(proxyCfg.TargetURL, "_")
+	cachePath := filepath.Join(cacheCfg.CacheDir, cleanTargetUrl)
+	kvStore, err := kvstore.OpenKVStore(cmdCommon.RootLogger().WithModule("caching-http-proxy"), cachePath, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	handler := cachingHttpProxy{
+		client:     &http.Client{},
+		cache:      kvStore,
+		targetHost: proxyCfg.TargetURL,
+	}
+
+	return &http.Server{
+		Addr:              proxyCfg.HostAddr,
+		Handler:           handler,
+		ReadHeaderTimeout: time.Second,
+	}, nil
+}
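The heart of the proxy is the cache-or-call step in ServeHTTP above. A minimal in-memory sketch of that control flow (a stand-in for kvstore.GetFromCacheOrCall, whose real implementation persists serialized values in an on-disk key-value store; all names here are illustrative):

	package main

	import "fmt"

	// getOrCall mimics the control flow of kvstore.GetFromCacheOrCall as used
	// in ServeHTTP: return a cached value if present, otherwise invoke the
	// callback and cache its result.
	func getOrCall[V any](cache map[string]V, key string, call func() (V, error)) (V, error) {
		if v, ok := cache[key]; ok {
			return v, nil // Cache hit: the upstream host is never contacted.
		}
		v, err := call()
		if err != nil {
			var zero V
			return zero, err // Errors are not cached (mirrors the 500 fallback above).
		}
		cache[key] = v // Cache the successful result forever (no invalidation).
		return v, nil
	}

	func main() {
		cache := map[string]string{}
		fetch := func() (string, error) { return "response body", nil }
		v, _ := getOrCall(cache, "/api/v1/status", fetch) // miss: invokes fetch
		v, _ = getOrCall(cache, "/api/v1/status", fetch)  // hit: served from cache
		fmt.Println(v)
	}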
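And a minimal usage sketch for wiring the proxy up. The field names follow the usages above (cacheCfg.CacheDir, proxyCfg.TargetURL, proxyCfg.HostAddr); whether the config structs have further required fields is not shown in this diff, and the addresses and paths are illustrative:

	package main

	import (
		"log"

		"github.com/oasisprotocol/nexus/cache/httpproxy"
		"github.com/oasisprotocol/nexus/config"
	)

	func main() {
		srv, err := httpproxy.NewHttpServer(
			config.CacheConfig{CacheDir: "/tmp/nexus-test-cache"},
			config.HttpCachingProxyConfig{
				TargetURL: "https://sourcify.dev/server", // upstream host, protocol included
				HostAddr:  "localhost:9191",              // address the proxy listens on
			},
		)
		if err != nil {
			log.Fatal(err)
		}
		// Responses are persisted under /tmp/nexus-test-cache/https_sourcify_dev_server
		// (the target URL sanitized by the regexp in NewHttpServer).
		log.Fatal(srv.ListenAndServe())
	}

Pointing the EVM verifier at such a localhost proxy also disables its Sourcify rate-limit floors, per the strings.Contains check earlier in this commit, which is what makes the cached e2e regression runs fast.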