From ea1a806a3e32d3e2168431a7522a4cfc9a3352a1 Mon Sep 17 00:00:00 2001
From: Poonai
Date: Fri, 19 Apr 2024 05:38:51 -0700
Subject: [PATCH] use bloom filter on get calls (#135)

* use bloom filter on get calls

* add unsafe insert

* minor

* Table.Get points always in single batch

---------

Co-authored-by: Marcin Gorzynski
---
 table.go             |  14 ++++--
 table_unsafe.go      | 100 +++++++++++++++++++++++++++++++++++++++++++
 table_unsafe_test.go |  48 +++++++++++++++++++++
 3 files changed, 158 insertions(+), 4 deletions(-)

diff --git a/table.go b/table.go
index dd0f4c4..2a25b0a 100644
--- a/table.go
+++ b/table.go
@@ -892,17 +892,14 @@ func (t *_table[T]) Get(ctx context.Context, sel Selector[T], optBatch ...Batch)
 	defer t.db.putKeyArray(valueArray)
 
 	var trs []T
-	err := batched[T](selPoints, t.scanPrefetchSize, func(selPoints []T) error {
+	err := batched[T](selPoints, len(selPoints), func(selPoints []T) error {
 		keys := t.keysExternal(selPoints, keyArray)
 
-		order := t.sortKeys(keys)
-
 		values, err := t.get(keys, batch, valueArray, false)
 		if err != nil {
 			return err
 		}
 
-		t.reorderValues(values, order)
 		for _, value := range values {
 			if len(value) == 0 {
 				trs = append(trs, t.valueNil)
@@ -958,6 +955,15 @@ func (t *_table[T]) get(keys [][]byte, batch Batch, values [][]byte, errorOnNotE
 	defer iter.Close()
 
 	for i := 0; i < len(keys); i++ {
+		if t.filter != nil && !t.filter.MayContain(context.Background(), keys[i]) {
+			if errorOnNotExist {
+				return nil, ErrNotFound
+			} else {
+				values[i] = values[i][:0]
+				continue
+			}
+		}
+
 		if !iter.SeekGE(keys[i]) || !bytes.Equal(iter.Key(), keys[i]) {
 			if errorOnNotExist {
 				return nil, ErrNotFound
diff --git a/table_unsafe.go b/table_unsafe.go
index 8a2f8fa..e6ba916 100644
--- a/table_unsafe.go
+++ b/table_unsafe.go
@@ -101,3 +101,103 @@ func (t *_table[T]) UnsafeUpdate(ctx context.Context, trs []T, oldTrs []T, optBa
 
 	return nil
 }
+
+// TableUnsafeInserter provides access to the UnsafeInsert method, which inserts
+// records without checking whether they already exist in the database.
+//
+// Warning: The indices of the records won't be updated properly if the records already exist.
+type TableUnsafeInserter[T any] interface {
+	UnsafeInsert(ctx context.Context, trs []T, optBatch ...Batch) error
+}
+
+func (t *_table[T]) UnsafeInsert(ctx context.Context, trs []T, optBatch ...Batch) error {
+	t.mutex.RLock()
+	indexes := make(map[IndexID]*Index[T])
+	maps.Copy(indexes, t.secondaryIndexes)
+	t.mutex.RUnlock()
+
+	var (
+		batch         Batch
+		externalBatch = len(optBatch) > 0 && optBatch[0] != nil
+	)
+
+	if externalBatch {
+		batch = optBatch[0]
+	} else {
+		batch = t.db.Batch(BatchTypeWriteOnly)
+		defer batch.Close()
+	}
+
+	var (
+		indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0]
+	)
+	defer t.db.getKeyBufferPool().Put(indexKeyBuffer[:0])
+
+	// key buffers
+	keysBuffer := t.db.getKeyArray(minInt(len(trs), persistentBatchSize))
+	defer t.db.putKeyArray(keysBuffer)
+
+	// value
+	value := t.db.getValueBufferPool().Get()[:0]
+	valueBuffer := bytes.NewBuffer(value)
+	defer t.db.getValueBufferPool().Put(value[:0])
+
+	// serializer
+	var serialize = t.serializer.Serializer.Serialize
+	if sw, ok := t.serializer.Serializer.(SerializerWithBuffer[any]); ok {
+		serialize = sw.SerializeFuncWithBuffer(valueBuffer)
+	}
+
+	err := batched[T](trs, persistentBatchSize, func(trs []T) error {
+		// keys
+		keys := t.keysExternal(trs, keysBuffer)
+
+		// process rows
+		for i, key := range keys {
+			select {
+			case <-ctx.Done():
+				return fmt.Errorf("context done: %w", ctx.Err())
+			default:
+			}
+
+			tr := trs[i]
+			// serialize
+			data, err := serialize(&tr)
+			if err != nil {
+				return err
+			}
+
+			err = batch.Set(key, data, Sync)
+			if err != nil {
+				return err
+			}
+
+			// index keys
+			for _, idx := range indexes {
+				err = idx.OnInsert(t, tr, batch, indexKeyBuffer[:0])
+				if err != nil {
+					return err
+				}
+			}
+
+			// add to bloom filter
+			if t.filter != nil {
+				t.filter.Add(ctx, key)
+			}
+		}
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	if !externalBatch {
+		err = batch.Commit(Sync)
+		if err != nil {
+			return err
+		}
+	}
+
+	return nil
+}
diff --git a/table_unsafe_test.go b/table_unsafe_test.go
index 8b52cbf..0025608 100644
--- a/table_unsafe_test.go
+++ b/table_unsafe_test.go
@@ -89,3 +89,51 @@ func TestBondTable_UnsafeUpdate(t *testing.T) {
 
 	_ = it.Close()
 }
+
+func TestBondTable_UnsafeInsert(t *testing.T) {
+	db := setupDatabase()
+	defer tearDownDatabase(db)
+
+	const (
+		TokenBalanceTableID = TableID(1)
+	)
+
+	tokenBalanceTable := NewTable[*TokenBalance](TableOptions[*TokenBalance]{
+		DB:        db,
+		TableID:   TokenBalanceTableID,
+		TableName: "token_balance",
+		TablePrimaryKeyFunc: func(builder KeyBuilder, tb *TokenBalance) []byte {
+			return builder.AddUint64Field(tb.ID).Bytes()
+		},
+	})
+
+	tokenBalanceAccount := &TokenBalance{
+		ID:              1,
+		AccountID:       1,
+		ContractAddress: "0xtestContract",
+		AccountAddress:  "0xtestAccount",
+		Balance:         5,
+	}
+
+	tableUnsafeInserter := tokenBalanceTable.(TableUnsafeInserter[*TokenBalance])
+	err := tableUnsafeInserter.UnsafeInsert(context.Background(), []*TokenBalance{tokenBalanceAccount})
+	require.NoError(t, err)
+
+	it, err := db.Backend().NewIter(&pebble.IterOptions{
+		LowerBound: []byte{byte(TokenBalanceTableID)},
+		UpperBound: []byte{byte(TokenBalanceTableID + 1)},
+	})
+	require.NoError(t, err)
+
+	for it.First(); it.Valid(); it.Next() {
+		rawData := it.Value()
+
+		var tokenBalanceAccount1FromDB TokenBalance
+		err = db.Serializer().Deserialize(rawData, &tokenBalanceAccount1FromDB)
+		require.NoError(t, err)
+		assert.Equal(t, tokenBalanceAccount, &tokenBalanceAccount1FromDB)
+	}
+
+	_ = it.Close()
+
+}
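
Note: the two halves of this change meet at the table's bloom filter. UnsafeInsert adds every written primary key to the filter (when one is configured), and the reworked get() checks MayContain before seeking, so point Gets for keys that were never written return without an iterator seek. The sketch below shows the intended call pattern for the new interface; it is illustrative only, sits alongside the in-package test above, and assumes NewTable returns the exported Table[T] interface. The bulkLoad helper is hypothetical and not part of this patch.

package bond

import (
	"context"
	"errors"
)

// bulkLoad loads rows through the new TableUnsafeInserter interface, reached by
// type assertion exactly as TestBondTable_UnsafeInsert does. The rows must not
// already exist, otherwise their secondary indexes won't be updated properly.
func bulkLoad(ctx context.Context, table Table[*TokenBalance], rows []*TokenBalance) error {
	inserter, ok := table.(TableUnsafeInserter[*TokenBalance])
	if !ok {
		return errors.New("table does not implement TableUnsafeInserter")
	}

	// UnsafeInsert serializes and writes the rows and their index entries in
	// batches, and adds each primary key to the table's bloom filter, keeping
	// the MayContain short-circuit in get() consistent with the stored data.
	return inserter.UnsafeInsert(ctx, rows)
}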