Skip to content

Commit

Permalink
Optimize epoch range inference by reusing existing field (#240)
Browse files Browse the repository at this point in the history
For oversized `getLogs` requests:

- Remove extra db index for epoch range inference
- Use existing `epoch` field to suggest epoch ranges instead
- Simplify code and reduce overhead
  • Loading branch information
wanliqun authored Nov 4, 2024
1 parent 5b67d8c commit 913e7e4
Show file tree
Hide file tree
Showing 10 changed files with 142 additions and 89 deletions.
33 changes: 20 additions & 13 deletions rpc/handler/cfx_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,20 +146,25 @@ func (handler *CfxLogsApiHandler) getLogsReorgGuard(
continue
}

// convert block number back to epoch number for log filter with epoch range
// convert block number range back to epoch number range for log filter with epoch range
if filter.FromEpoch != nil {
var valErr *store.DataSetTooLargeError
if errors.As(err, &valErr) && valErr.SuggestedRange != nil {
var valErr *store.SuggestedFilterOversizedError[store.SuggestedBlockRange]
if errors.As(err, &valErr) {
if valErr.SuggestedRange.MaxEndEpoch == 0 {
return nil, false, valErr.Unwrap()
}

fromEpoch, _ := filter.FromEpoch.ToInt()
suggstedEpoch, ok, err := handler.ms.ClosestEpochUpToBlock(valErr.SuggestedRange.To)
maxPossibleEpochNum := valErr.SuggestedRange.MaxEndEpoch
endBlockNum := valErr.SuggestedRange.To

if err == nil && ok && suggstedEpoch >= fromEpoch.Uint64() {
valErr.SuggestedRange.From = fromEpoch.Uint64()
valErr.SuggestedRange.To = suggstedEpoch
} else {
valErr.SuggestedRange = nil
suggstedEndEpoch, ok, err := handler.ms.ClosestEpochUpToBlock(maxPossibleEpochNum, endBlockNum)
if err != nil || !ok || suggstedEndEpoch < fromEpoch.Uint64() {
return nil, false, valErr.Unwrap()
}
return nil, false, valErr

suggestedEpochRange := store.NewSuggestedEpochRange(fromEpoch.Uint64(), suggstedEndEpoch)
return nil, false, store.NewSuggestedFilterOversizeError(valErr.Unwrap(), suggestedEpochRange)
}
}

Expand Down Expand Up @@ -225,7 +230,7 @@ func (handler *CfxLogsApiHandler) getLogsReorgGuard(

// ensure result set never oversized
if len(logs) > int(store.MaxLogLimit) {
return nil, false, store.NewResultSetTooLargeError()
return nil, false, store.ErrFilterResultSetTooLarge
}

return logs, len(dbFilters) > 0, nil
Expand Down Expand Up @@ -435,7 +440,8 @@ func (handler *CfxLogsApiHandler) checkFullnodeLogFilter(filter *types.LogFilter
numEpochs := epochRange.To - epochRange.From + 1
if numEpochs > uint64(store.MaxLogEpochRange) {
epochRange.To = epochRange.From + uint64(store.MaxLogEpochRange) - 1
return store.NewQuerySetTooLargeError(&epochRange)
suggestedRange := store.NewSuggestedEpochRange(epochRange.From, epochRange.To)
return store.NewSuggestedFilterQuerySetTooLargeError(&suggestedRange)
}
}

Expand All @@ -444,7 +450,8 @@ func (handler *CfxLogsApiHandler) checkFullnodeLogFilter(filter *types.LogFilter
numBlocks := blockRange.To - blockRange.From + 1
if numBlocks > uint64(store.MaxLogBlockRange) {
blockRange.To = blockRange.From + uint64(store.MaxLogBlockRange) - 1
return store.NewQuerySetTooLargeError(&blockRange)
suggestedRange := store.SuggestedBlockRange{RangeUint64: blockRange}
return store.NewSuggestedFilterQuerySetTooLargeError(&suggestedRange)
}
}

Expand Down
5 changes: 3 additions & 2 deletions rpc/handler/eth_logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ func (handler *EthLogsApiHandler) getLogsReorgGuard(
}

if len(logs) > int(store.MaxLogLimit) {
return nil, false, store.NewResultSetTooLargeError()
return nil, false, store.ErrFilterResultSetTooLarge
}

return logs, dbFilter != nil, nil
Expand Down Expand Up @@ -261,7 +261,8 @@ func (handler *EthLogsApiHandler) checkFnEthLogFilter(filter *types.FilterQuery)
numBlocks := blockRange.To - blockRange.From + 1
if numBlocks > uint64(store.MaxLogBlockRange) {
blockRange.To = blockRange.From + uint64(store.MaxLogBlockRange) - 1
return store.NewQuerySetTooLargeError(&blockRange)
suggestedRange := store.SuggestedBlockRange{RangeUint64: blockRange}
return store.NewSuggestedFilterQuerySetTooLargeError(&suggestedRange)
}
}

Expand Down
83 changes: 62 additions & 21 deletions store/log_filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ const (
)

var ( // common errors
errMsgLogsQuerySetTooLarge = "the query set is too large, please narrow down your filter condition"
ErrFilterQuerySetTooLarge = errors.New("the query set is too large, please narrow down your filter condition")

errMsgLogsResultSetTooLarge = fmt.Sprintf(
ErrFilterResultSetTooLarge = errors.Errorf(
"the result set exceeds the max limit of %v logs, please narrow down your filter conditions", MaxLogLimit,
)

Expand All @@ -42,37 +42,78 @@ var ( // Log filter constants
MaxLogBlockRange uint64
)

type DataSetTooLargeError struct {
Msg string
SuggestedRange *citypes.RangeUint64
type SuggestedBlockRange struct {
citypes.RangeUint64
// the maximum possible epoch for suggesting an epoch range
// a value of 0 indicates that no maximum epoch is provided
MaxEndEpoch uint64
}

var _ error = (*DataSetTooLargeError)(nil)
// NewSuggestedBlockRange constructs a SuggestedBlockRange covering the
// inclusive block range [from, to]. maxEndEpoch is the maximum possible
// epoch corresponding to the end of the range; 0 means no maximum epoch
// is provided.
func NewSuggestedBlockRange(from, to, maxEndEpoch uint64) SuggestedBlockRange {
	r := SuggestedBlockRange{MaxEndEpoch: maxEndEpoch}
	r.From, r.To = from, to
	return r
}

func NewQuerySetTooLargeError(suggestions ...*citypes.RangeUint64) *DataSetTooLargeError {
return NewDataSetTooLargeError(errMsgLogsQuerySetTooLarge, suggestions...)
type SuggestedEpochRange struct {
citypes.RangeUint64
}

func NewResultSetTooLargeError(suggestions ...*citypes.RangeUint64) *DataSetTooLargeError {
return NewDataSetTooLargeError(errMsgLogsResultSetTooLarge, suggestions...)
// NewSuggestedEpochRange constructs a SuggestedEpochRange covering the
// inclusive epoch range [from, to].
func NewSuggestedEpochRange(from, to uint64) SuggestedEpochRange {
	var r SuggestedEpochRange
	r.From, r.To = from, to
	return r
}

func NewDataSetTooLargeError(msg string, suggestions ...*citypes.RangeUint64) *DataSetTooLargeError {
var suggestion *citypes.RangeUint64
if len(suggestions) > 0 && suggestions[0] != nil {
suggestion = suggestions[0]
// SuggestedFilterRange constrains the range types that may accompany an
// oversized-filter error: a suggested block range or a suggested epoch range.
type SuggestedFilterRange interface {
SuggestedBlockRange | SuggestedEpochRange
}

// Compile-time checks that both generic instantiations satisfy the error
// interface.
var (
_ error = (*SuggestedFilterOversizedError[SuggestedBlockRange])(nil)
_ error = (*SuggestedFilterOversizedError[SuggestedEpochRange])(nil)
)

// SuggestedFilterOversizedError wraps an oversize sentinel error together
// with a suggested narrower filter range of type T that the caller may
// retry with.
type SuggestedFilterOversizedError[T SuggestedFilterRange] struct {
inner error // wrapped sentinel error, exposed via Unwrap
SuggestedRange T // suggested narrower block or epoch range
}

func NewSuggestedFilterQuerySetTooLargeError[T SuggestedFilterRange](suggestedRange *T) error {
if suggestedRange == nil {
return ErrFilterQuerySetTooLarge
}
return &DataSetTooLargeError{
Msg: msg,
SuggestedRange: suggestion,

return NewSuggestedFilterOversizeError(ErrFilterQuerySetTooLarge, *suggestedRange)
}

// NewSuggestedFilterResultSetTooLargeError returns an error indicating the
// filter result set exceeds the max log limit. When a suggested narrower
// range is provided it is attached to the error; otherwise the plain
// sentinel ErrFilterResultSetTooLarge is returned.
func NewSuggestedFilterResultSetTooLargeError[T SuggestedFilterRange](suggestedRange *T) error {
	if suggestedRange != nil {
		return NewSuggestedFilterOversizeError(ErrFilterResultSetTooLarge, *suggestedRange)
	}
	return ErrFilterResultSetTooLarge
}

// NewSuggestedFilterOversizeError wraps the given sentinel error together
// with a suggested narrower filter range.
// NOTE(review): the constructor says "Oversize" while the type says
// "Oversized" — consider aligning the names, though renaming now would
// break existing callers.
func NewSuggestedFilterOversizeError[T SuggestedFilterRange](inner error, suggestedRange T) *SuggestedFilterOversizedError[T] {
return &SuggestedFilterOversizedError[T]{
inner: inner,
SuggestedRange: suggestedRange,
}
}

func (e *DataSetTooLargeError) Error() string {
if e.SuggestedRange == nil {
return e.Msg
func (e *SuggestedFilterOversizedError[T]) Error() string {
switch v := any(e.SuggestedRange).(type) {
case SuggestedBlockRange:
return fmt.Sprintf("%v: a suggested block range is %v", e.inner.Error(), v)
case SuggestedEpochRange:
return fmt.Sprintf("%v: a suggested epoch range is %v", e.inner.Error(), v)
default:
return e.inner.Error()
}
return fmt.Sprintf("%v: suggested filter range is %s", e.Msg, *e.SuggestedRange)
}

// Unwrap returns the wrapped sentinel error so that errors.Is/errors.As
// can match against it.
func (e *SuggestedFilterOversizedError[T]) Unwrap() error {
return e.inner
}

func initLogFilter() {
Expand Down
4 changes: 2 additions & 2 deletions store/mysql/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -311,7 +311,7 @@ func (ms *MysqlStore) GetLogs(ctx context.Context, storeFilter store.LogFilter)

// check log count
if len(result) > int(store.MaxLogLimit) {
return nil, store.NewResultSetTooLargeError()
return nil, store.ErrFilterResultSetTooLarge
}

continue
Expand Down Expand Up @@ -342,7 +342,7 @@ func (ms *MysqlStore) GetLogs(ctx context.Context, storeFilter store.LogFilter)

// check log count
if len(result) > int(store.MaxLogLimit) {
return nil, store.NewResultSetTooLargeError()
return nil, store.ErrFilterResultSetTooLarge
}
}

Expand Down
2 changes: 1 addition & 1 deletion store/mysql/store_log.go
Original file line number Diff line number Diff line change
Expand Up @@ -207,7 +207,7 @@ func (ls *logStore) GetLogs(ctx context.Context, storeFilter store.LogFilter) ([

// check log count
if len(result) > int(store.MaxLogLimit) {
return nil, store.NewResultSetTooLargeError()
return nil, store.ErrFilterResultSetTooLarge
}
}

Expand Down
2 changes: 1 addition & 1 deletion store/mysql/store_log_big_contract.go
Original file line number Diff line number Diff line change
Expand Up @@ -410,7 +410,7 @@ func (bcls *bigContractLogStore) GetContractLogs(

// check log count
if len(result) > int(store.MaxLogLimit) {
return nil, store.NewResultSetTooLargeError()
return nil, store.ErrFilterResultSetTooLarge
}
}

Expand Down
84 changes: 43 additions & 41 deletions store/mysql/store_log_filter.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package mysql

import (
"database/sql"
"fmt"

"github.com/Conflux-Chain/confura/store"
Expand Down Expand Up @@ -93,8 +92,8 @@ func (filter *LogFilter) calculateQuerySetSize(db *gorm.DB) (types.RangeUint64,
// define main query to retrieve the auto-increment ID range within the filtered block range
mainQuery := db.
Select("MIN(t0.id) AS `from`, MAX(t0.id) AS `to`").
Table(fmt.Sprintf("`%v` AS t0, (?) AS t1", filter.TableName), subQuery).
Where("t0.bn IN (t1.minb, t1.maxb)")
Table(fmt.Sprintf("`%v` AS t0", filter.TableName)).
Joins("INNER JOIN (?) AS t1 ON t0.bn IN (t1.minb, t1.maxb)", subQuery)

// execute the main query to fetch the ID range
var pidRange types.RangeUint64
Expand All @@ -110,34 +109,38 @@ func (filter *LogFilter) calculateQuerySetSize(db *gorm.DB) (types.RangeUint64,
}

// suggestBlockRange returns an adjusted block range that limits the query result size to be within `limitSize` records.
// If the original filter conditions result in a query range exceeding `limitSize`, this function identifies the first block
// that causes the result size to surpass the limit within `queryRange`. It then returns a range starting from `filter.BlockFrom`
// up to (but not including) that block number, or nil if no valid range can be suggested (e.g., empty filter).
func (filter *LogFilter) suggestBlockRange(
db *gorm.DB, queryRange types.RangeUint64, limitSize uint64) (*types.RangeUint64, error) {
// If the original filter condition would produce more than `limitSize` records, this function identifies the first block
// within `queryRange` where the result size exceeds the limit. It then returns a range from `filter.BlockFrom` up to
// (but not including) that block. If no valid range can be suggested, it returns nil.
//
// It also includes the maximum possible epoch corresponding to the end of the suggested block range, which can be used by
// the caller for further inference, such as deriving a suggested epoch range if needed.
func (filter *LogFilter) suggestBlockRange(db *gorm.DB, queryRange types.RangeUint64, limitSize uint64) (*store.SuggestedBlockRange, error) {
// no possible block range to suggest
if filter.BlockFrom >= filter.BlockTo {
return nil, nil
}

// find the first block exceeding `limitSize` within `queryRange`
var firstExceedingBlock sql.NullInt64
// fetch info on the first block exceeding `limitSize` within `queryRange`
var exceedingBlock struct{ Bn, Epoch uint64 }
err := db.Table(filter.TableName).
Select("bn").
Select("bn, epoch").
Where("id >= ?", queryRange.From+limitSize).
Order("id ASC").
Limit(1).
Scan(&firstExceedingBlock).Error
if err != nil || !firstExceedingBlock.Valid {
Take(&exceedingBlock).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil, nil
}
return nil, err
}

// if a valid exceeding block is found, return the adjusted range
if bn := uint64(firstExceedingBlock.Int64); bn > filter.BlockFrom {
return &types.RangeUint64{
From: filter.BlockFrom,
To: bn - 1,
}, nil
// suggest a narrower block range if possible
if exceedingBlock.Bn > filter.BlockFrom {
blockRange := store.NewSuggestedBlockRange(
filter.BlockFrom, exceedingBlock.Bn-1, exceedingBlock.Epoch,
)
return &blockRange, nil
}

// no available block range to suggest
Expand All @@ -160,7 +163,7 @@ func (filter *LogFilter) validateQuerySetSize(db *gorm.DB) error {
if err != nil {
return err
}
return store.NewQuerySetTooLargeError(suggestedRange)
return store.NewSuggestedFilterQuerySetTooLargeError(suggestedRange)
}
// otherwise defer to result set size validation
}
Expand All @@ -173,7 +176,7 @@ func (filter *LogFilter) validateQuerySetSize(db *gorm.DB) error {
if err != nil {
return err
}
return store.NewResultSetTooLargeError(suggestedRange)
return store.NewSuggestedFilterResultSetTooLargeError(suggestedRange)
}

// otherwise validate the count directly
Expand All @@ -185,33 +188,32 @@ func (filter *LogFilter) validateQuerySetSize(db *gorm.DB) error {

// validateCount validates the result set count against the configured max limit.
func (filter *LogFilter) validateCount(db *gorm.DB) error {
db = db.Select("bn").
db = db.Select("bn, epoch").
Table(filter.TableName).
Where("bn BETWEEN ? AND ?", filter.BlockFrom, filter.BlockTo).
Order("bn ASC").
Offset(int(store.MaxLogLimit)).
Limit(1)

Offset(int(store.MaxLogLimit))
db = applyTopicsFilter(db, filter.Topics)

var blockNums []uint64
if err := db.Find(&blockNums).Error; err != nil {
// fetch info on the first block exceeding `store.MaxLogLimit`
var exceedingBlock struct{ Bn, Epoch uint64 }
err := db.Take(&exceedingBlock).Error
if err != nil {
if errors.Is(err, gorm.ErrRecordNotFound) {
return nil
}
return err
}

if len(blockNums) == 0 {
return nil
}

if blockNums[0] <= filter.BlockFrom {
return store.NewResultSetTooLargeError()
// suggest a narrower block range if possible
if exceedingBlock.Bn > filter.BlockFrom {
blockRange := store.NewSuggestedBlockRange(
filter.BlockFrom, exceedingBlock.Bn-1, exceedingBlock.Epoch,
)
return store.NewSuggestedFilterResultSetTooLargeError(&blockRange)
}

// suggest a narrower range if possible
return store.NewResultSetTooLargeError(&types.RangeUint64{
From: filter.BlockFrom,
To: blockNums[0] - 1,
})
return store.ErrFilterResultSetTooLarge
}

func (filter *LogFilter) hasTopicsFilter() bool {
Expand Down Expand Up @@ -245,7 +247,7 @@ func (filter *LogFilter) Find(db *gorm.DB) ([]int, error) {
}

if len(result) > int(store.MaxLogLimit) {
return nil, store.NewResultSetTooLargeError()
return nil, store.ErrFilterResultSetTooLarge
}

return result, nil
Expand Down Expand Up @@ -280,7 +282,7 @@ func (filter *AddressIndexedLogFilter) Find(db *gorm.DB) ([]*AddressIndexedLog,
}

if len(result) > int(store.MaxLogLimit) {
return nil, store.NewResultSetTooLargeError()
return nil, store.ErrFilterResultSetTooLarge
}

return result, nil
Expand Down
Loading

0 comments on commit 913e7e4

Please sign in to comment.