diff --git a/_benchmarks/suites/table_upsert_suite.go b/_benchmarks/suites/table_upsert_suite.go index 98f7401..9de0789 100644 --- a/_benchmarks/suites/table_upsert_suite.go +++ b/_benchmarks/suites/table_upsert_suite.go @@ -137,7 +137,7 @@ func UpsertInBatchSizeWithBatch(db bond.DB, tbt bond.Table[*TokenBalance], tbs [ return func(b *testing.B) { b.ReportAllocs() - batch := db.Batch() + batch := db.Batch(bond.BatchTypeWriteOnly) for i := 0; i < b.N; i++ { err := tbt.Upsert(context.Background(), tbs[:insertBatchSize], bond.TableUpsertOnConflictReplace[*TokenBalance], batch) if err != nil { diff --git a/batch.go b/batch.go index 8aa62a4..eb73032 100644 --- a/batch.go +++ b/batch.go @@ -18,12 +18,21 @@ type Committer interface { OnClose(func(b Batch)) } +type BatchType int + +const ( + BatchTypeWriteOnly BatchType = iota + BatchTypeReadWrite +) + type Batch interface { ID() uint64 Len() int Empty() bool Reset() + Type() BatchType + Getter Setter Deleter @@ -46,10 +55,16 @@ type _batch struct { onClose []func(b Batch) } -func newBatch(db *_db) Batch { +func newBatch(db *_db, indexed bool) Batch { id, _ := sequenceId.Next() + if indexed { + return &_batch{ + Batch: db.pebble.NewIndexedBatch(), + id: id, + } + } return &_batch{ - Batch: db.pebble.NewIndexedBatch(), + Batch: db.pebble.NewBatch(), id: id, } } @@ -58,6 +73,13 @@ func (b *_batch) ID() uint64 { return b.id } +func (b *_batch) Type() BatchType { + if b.Batch.Indexed() { + return BatchTypeReadWrite + } + return BatchTypeWriteOnly +} + func (b *_batch) Reset() { b.Batch.Reset() diff --git a/batch_test.go b/batch_test.go index 258b459..9041e3b 100644 --- a/batch_test.go +++ b/batch_test.go @@ -14,7 +14,7 @@ func Test_Batch_Callbacks(t *testing.T) { counter := 0 - batch := db.Batch() + batch := db.Batch(BatchTypeWriteOnly) batch.OnCommit(func(b Batch) error { counter++ return nil diff --git a/bond.go b/bond.go index fc0b12c..7e49f6f 100644 --- a/bond.go +++ b/bond.go @@ -63,7 +63,7 @@ type DeleterWithRange interface { } type Batcher interface { - Batch() Batch + Batch(bType BatchType) Batch } type Iterationer interface { // TODO: weird name @@ -280,8 +280,15 @@ func (db *_db) Iter(opt *IterOptions, batch ...Batch) Iterator { } } -func (db *_db) Batch() Batch { - return newBatch(db) +func (db *_db) Batch(bType BatchType) Batch { + if bType == BatchTypeWriteOnly { + return newBatch(db, false) + } + return newBatch(db, true) +} + +func (db *_db) BatchReadWrite() Batch { + return newBatch(db, true) } func (db *_db) Apply(b Batch, opt WriteOptions) error { diff --git a/context_test.go b/context_test.go index 0583b7b..e7e8811 100644 --- a/context_test.go +++ b/context_test.go @@ -18,6 +18,11 @@ func (m *MockBatch) ID() uint64 { panic("implement me") } +func (m *MockBatch) Type() BatchType { + //TODO implement me + panic("implement me") +} + func (m *MockBatch) Len() int { //TODO implement me panic("implement me") diff --git a/filter_test.go b/filter_test.go index 6d3aee9..e2f8681 100644 --- a/filter_test.go +++ b/filter_test.go @@ -114,7 +114,7 @@ func TestFilter_Insert_Batch(t *testing.T) { Balance: 5, } - batch := db.Batch() + batch := db.Batch(BatchTypeReadWrite) mFilter.On("MayContain", mock.Anything, mock.Anything).Return(false).Once() mFilter.On("Add", mock.Anything, mock.Anything).Return().Once() diff --git a/table.go b/table.go index 1ca5501..dd0f4c4 100644 --- a/table.go +++ b/table.go @@ -286,7 +286,7 @@ func (t *_table[T]) ReIndex(idxs []*Index[T]) error { }, }) - batch := t.db.Batch() + batch := t.db.Batch(BatchTypeWriteOnly) defer func() { _ = batch.Close() }() @@ -321,7 +321,7 @@ func (t *_table[T]) ReIndex(idxs []*Index[T]) error { return fmt.Errorf("failed to commit reindex batch: %w", err) } - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) } } @@ -342,18 +342,21 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro t.mutex.RUnlock() var ( - batch Batch - batchCtx context.Context - externalBatch = len(optBatch) > 0 && optBatch[0] != nil + batch Batch + batchReadWrite Batch + externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) defer batch.Close() } - batchCtx = ContextWithBatch(ctx, batch) + + if batch.Type() == BatchTypeReadWrite { + batchReadWrite = batch + } var ( indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0] @@ -388,7 +391,7 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batch) + }, batchReadWrite) defer iter.Close() // process rows @@ -426,7 +429,7 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro // add to bloom filter if t.filter != nil { - t.filter.Add(batchCtx, key) + t.filter.Add(ctx, key) } } @@ -453,17 +456,22 @@ func (t *_table[T]) Update(ctx context.Context, trs []T, optBatch ...Batch) erro t.mutex.RUnlock() var ( - batch Batch - externalBatch = len(optBatch) > 0 && optBatch[0] != nil + batch Batch + batchReadWrite Batch + externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) defer batch.Close() } + if batch.Type() == BatchTypeReadWrite { + batchReadWrite = batch + } + var ( indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0] indexKeyBuffer2 = t.db.getKeyBufferPool().Get()[:0] @@ -502,7 +510,7 @@ func (t *_table[T]) Update(ctx context.Context, trs []T, optBatch ...Batch) erro LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batch) + }, batchReadWrite) defer iter.Close() for i, key := range keys { @@ -580,7 +588,7 @@ func (t *_table[T]) Delete(ctx context.Context, trs []T, optBatch ...Batch) erro if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) defer batch.Close() } @@ -633,18 +641,21 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne t.mutex.RUnlock() var ( - batch Batch - batchCtx context.Context - externalBatch = len(optBatch) > 0 && optBatch[0] != nil + batch Batch + batchReadWrite Batch + externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) defer batch.Close() } - batchCtx = ContextWithBatch(ctx, batch) + + if batch.Type() == BatchTypeReadWrite { + batchReadWrite = batch + } var ( indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0] @@ -684,7 +695,7 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batch) + }, batchReadWrite) defer iter.Close() for i := 0; i < len(keys); { @@ -755,7 +766,7 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne // add to bloom filter if t.filter != nil && !isUpdate { - t.filter.Add(batchCtx, key) + t.filter.Add(ctx, key) } i++ @@ -805,7 +816,7 @@ func (t *_table[T]) Exist(tr T, optBatch ...Batch) bool { } func (t *_table[T]) exist(key []byte, batch Batch, iter Iterator) bool { - if t.filter != nil && !t.filter.MayContain(context.TODO(), key) { + if t.filter != nil && !t.filter.MayContain(context.Background(), key) { return false } diff --git a/table_test.go b/table_test.go index b066440..8df031e 100644 --- a/table_test.go +++ b/table_test.go @@ -1444,7 +1444,7 @@ func TestBond_Batch(t *testing.T) { err := tokenBalanceTable.Insert(context.Background(), []*TokenBalance{tokenBalanceAccount1}) require.NoError(t, err) - batch := db.Batch() + batch := db.Batch(BatchTypeReadWrite) exist := tokenBalanceTable.Exist(&TokenBalance{ID: 1}, batch) require.True(t, exist) diff --git a/table_unsafe.go b/table_unsafe.go index 5a66f22..8a2f8fa 100644 --- a/table_unsafe.go +++ b/table_unsafe.go @@ -32,7 +32,7 @@ func (t *_table[T]) UnsafeUpdate(ctx context.Context, trs []T, oldTrs []T, optBa if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) } // key