Skip to content

Commit

Permalink
use bloom filter on get calls (#135)
Browse files Browse the repository at this point in the history
* use bloom filter on get calls

* add unsafe insert

* minor

* Table.Get points always in single batch

---------

Co-authored-by: Marcin Gorzynski <[email protected]>
  • Loading branch information
poonai and marino39 authored Apr 19, 2024
1 parent f6bcc06 commit ea1a806
Show file tree
Hide file tree
Showing 3 changed files with 158 additions and 4 deletions.
14 changes: 10 additions & 4 deletions table.go
Original file line number Diff line number Diff line change
Expand Up @@ -892,17 +892,14 @@ func (t *_table[T]) Get(ctx context.Context, sel Selector[T], optBatch ...Batch)
defer t.db.putKeyArray(valueArray)

var trs []T
err := batched[T](selPoints, t.scanPrefetchSize, func(selPoints []T) error {
err := batched[T](selPoints, len(selPoints), func(selPoints []T) error {
keys := t.keysExternal(selPoints, keyArray)

order := t.sortKeys(keys)

values, err := t.get(keys, batch, valueArray, false)
if err != nil {
return err
}

t.reorderValues(values, order)
for _, value := range values {
if len(value) == 0 {
trs = append(trs, t.valueNil)
Expand Down Expand Up @@ -958,6 +955,15 @@ func (t *_table[T]) get(keys [][]byte, batch Batch, values [][]byte, errorOnNotE
defer iter.Close()

for i := 0; i < len(keys); i++ {
if t.filter != nil && !t.filter.MayContain(context.Background(), keys[i]) {
if errorOnNotExist {
return nil, ErrNotFound
} else {
values[i] = values[i][:0]
continue
}
}

if !iter.SeekGE(keys[i]) || !bytes.Equal(iter.Key(), keys[i]) {
if errorOnNotExist {
return nil, ErrNotFound
Expand Down
100 changes: 100 additions & 0 deletions table_unsafe.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,103 @@ func (t *_table[T]) UnsafeUpdate(ctx context.Context, trs []T, oldTrs []T, optBa

return nil
}

// TableUnsafeInserter provides access to the UnsafeInsert method, which inserts
// records without checking whether they already exist in the database.
//
// Warning: The indices of the records won't be updated properly if the records already exist.
type TableUnsafeInserter[T any] interface {
// UnsafeInsert writes trs without an existence check. An optional batch may
// be passed via optBatch.
UnsafeInsert(ctx context.Context, trs []T, optBatch ...Batch) error
}

// UnsafeInsert writes trs to the table without checking whether records with
// the same primary keys already exist.
//
// When optBatch carries a non-nil batch, all writes go into that batch and the
// caller remains responsible for committing it. Otherwise a write-only batch is
// created here, committed with Sync on success and closed on return.
//
// For every row the primary record is set, OnInsert is invoked on each
// secondary index, and the key is added to the table filter when one is
// configured. It returns the first error encountered, or a wrapped context
// error if ctx is done before a row is processed.
func (t *_table[T]) UnsafeInsert(ctx context.Context, trs []T, optBatch ...Batch) error {
	// Snapshot the secondary indexes so the read lock is released before any
	// write work starts.
	secondary := make(map[IndexID]*Index[T])
	t.mutex.RLock()
	maps.Copy(secondary, t.secondaryIndexes)
	t.mutex.RUnlock()

	// Decide who owns the batch: the caller (external) or this call.
	ownsBatch := len(optBatch) == 0 || optBatch[0] == nil

	var batch Batch
	if ownsBatch {
		batch = t.db.Batch(BatchTypeWriteOnly)
		defer batch.Close()
	} else {
		batch = optBatch[0]
	}

	// Scratch buffer handed to the index key builders.
	idxKeyScratch := t.db.getKeyBufferPool().Get()[:0]
	defer t.db.getKeyBufferPool().Put(idxKeyScratch[:0])

	// Scratch array for the primary keys of one chunk.
	keyScratch := t.db.getKeyArray(minInt(len(trs), persistentBatchSize))
	defer t.db.putKeyArray(keyScratch)

	// Pooled storage backing the serialization buffer.
	rawValue := t.db.getValueBufferPool().Get()[:0]
	valueBuf := bytes.NewBuffer(rawValue)
	defer t.db.getValueBufferPool().Put(rawValue[:0])

	// Prefer the buffer-aware serializer when the configured one offers it.
	serialize := t.serializer.Serializer.Serialize
	if withBuffer, ok := t.serializer.Serializer.(SerializerWithBuffer[any]); ok {
		serialize = withBuffer.SerializeFuncWithBuffer(valueBuf)
	}

	if err := batched[T](trs, persistentBatchSize, func(chunk []T) error {
		keys := t.keysExternal(chunk, keyScratch)

		for i := range keys {
			// Bail out between rows once the context is done.
			select {
			case <-ctx.Done():
				return fmt.Errorf("context done: %w", ctx.Err())
			default:
			}

			row := chunk[i]

			encoded, err := serialize(&row)
			if err != nil {
				return err
			}

			if err = batch.Set(keys[i], encoded, Sync); err != nil {
				return err
			}

			// Write the secondary index entries for this row.
			for _, index := range secondary {
				if err = index.OnInsert(t, row, batch, idxKeyScratch[:0]); err != nil {
					return err
				}
			}

			// Record the key in the bloom filter, if the table has one.
			if t.filter != nil {
				t.filter.Add(ctx, keys[i])
			}
		}

		return nil
	}); err != nil {
		return err
	}

	// Only a batch created here is committed here.
	if ownsBatch {
		return batch.Commit(Sync)
	}

	return nil
}
48 changes: 48 additions & 0 deletions table_unsafe_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,51 @@ func TestBondTable_UnsafeUpdate(t *testing.T) {

_ = it.Close()
}

// TestBondTable_UnsafeInsert verifies that a record written through
// UnsafeInsert is stored within the table's key range and deserializes back to
// the value that was inserted.
func TestBondTable_UnsafeInsert(t *testing.T) {
	db := setupDatabase()
	defer tearDownDatabase(db)

	const (
		TokenBalanceTableID = TableID(1)
	)

	tokenBalanceTable := NewTable[*TokenBalance](TableOptions[*TokenBalance]{
		DB:        db,
		TableID:   TokenBalanceTableID,
		TableName: "token_balance",
		TablePrimaryKeyFunc: func(builder KeyBuilder, tb *TokenBalance) []byte {
			return builder.AddUint64Field(tb.ID).Bytes()
		},
	})

	tokenBalanceAccount := &TokenBalance{
		ID:              1,
		AccountID:       1,
		ContractAddress: "0xtestContract",
		AccountAddress:  "0xtestAccount",
		Balance:         5,
	}

	// Use the two-value assertion so a missing implementation fails the test
	// instead of panicking.
	tableUnsafeInserter, ok := tokenBalanceTable.(TableUnsafeInserter[*TokenBalance])
	require.True(t, ok, "table does not implement TableUnsafeInserter")

	err := tableUnsafeInserter.UnsafeInsert(context.Background(), []*TokenBalance{tokenBalanceAccount})
	require.NoError(t, err)

	it, err := db.Backend().NewIter(&pebble.IterOptions{
		LowerBound: []byte{byte(TokenBalanceTableID)},
		UpperBound: []byte{byte(TokenBalanceTableID + 1)},
	})
	require.NoError(t, err)
	// Deferred so the iterator is released before the database is torn down,
	// even when a require assertion aborts the test early.
	defer func() { _ = it.Close() }()

	// Count the rows so the test cannot pass vacuously when nothing was written.
	rows := 0
	for it.First(); it.Valid(); it.Next() {
		rawData := it.Value()

		var tokenBalanceAccount1FromDB TokenBalance
		err = db.Serializer().Deserialize(rawData, &tokenBalanceAccount1FromDB)
		require.NoError(t, err)
		assert.Equal(t, tokenBalanceAccount, &tokenBalanceAccount1FromDB)

		rows++
	}
	require.NoError(t, it.Error())
	require.Equal(t, 1, rows, "expected exactly one stored record in the table key range")
}

0 comments on commit ea1a806

Please sign in to comment.