From 07fac5edf43a4e449cb9cc95ea0274a7348400ee Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Wed, 17 Apr 2024 14:37:14 +0200 Subject: [PATCH 1/3] add WriteOnly & ReadWrite batches --- _benchmarks/benchmark_test.go | 2 +- _benchmarks/suites/table_upsert_suite.go | 2 +- batch.go | 12 +++++-- batch_test.go | 2 +- bond.go | 20 +++++++++-- context_test.go | 5 +++ filter_test.go | 2 +- table.go | 43 +++++++++++++++--------- table_test.go | 2 +- table_unsafe.go | 2 +- 10 files changed, 65 insertions(+), 27 deletions(-) diff --git a/_benchmarks/benchmark_test.go b/_benchmarks/benchmark_test.go index b6c6852..92e4709 100644 --- a/_benchmarks/benchmark_test.go +++ b/_benchmarks/benchmark_test.go @@ -3,5 +3,5 @@ package main import "testing" func BenchmarkSuites(b *testing.B) { - RunBenchmarks(b, AllTestSuites) + RunBenchmarks(b, "BenchmarkTableInsertSuite") } diff --git a/_benchmarks/suites/table_upsert_suite.go b/_benchmarks/suites/table_upsert_suite.go index 98f7401..9de0789 100644 --- a/_benchmarks/suites/table_upsert_suite.go +++ b/_benchmarks/suites/table_upsert_suite.go @@ -137,7 +137,7 @@ func UpsertInBatchSizeWithBatch(db bond.DB, tbt bond.Table[*TokenBalance], tbs [ return func(b *testing.B) { b.ReportAllocs() - batch := db.Batch() + batch := db.Batch(bond.BatchTypeWriteOnly) for i := 0; i < b.N; i++ { err := tbt.Upsert(context.Background(), tbs[:insertBatchSize], bond.TableUpsertOnConflictReplace[*TokenBalance], batch) if err != nil { diff --git a/batch.go b/batch.go index 8aa62a4..aec4212 100644 --- a/batch.go +++ b/batch.go @@ -24,6 +24,8 @@ type Batch interface { Empty() bool Reset() + Indexed() bool + Getter Setter Deleter @@ -46,10 +48,16 @@ type _batch struct { onClose []func(b Batch) } -func newBatch(db *_db) Batch { +func newBatch(db *_db, indexed bool) Batch { id, _ := sequenceId.Next() + if indexed { + return &_batch{ + Batch: db.pebble.NewIndexedBatch(), + id: id, + } + } return &_batch{ - Batch: db.pebble.NewIndexedBatch(), + Batch: db.pebble.NewBatch(), id: id, } } diff --git a/batch_test.go b/batch_test.go index 258b459..9041e3b 100644 --- a/batch_test.go +++ b/batch_test.go @@ -14,7 +14,7 @@ func Test_Batch_Callbacks(t *testing.T) { counter := 0 - batch := db.Batch() + batch := db.Batch(BatchTypeWriteOnly) batch.OnCommit(func(b Batch) error { counter++ return nil diff --git a/bond.go b/bond.go index fc0b12c..a643d49 100644 --- a/bond.go +++ b/bond.go @@ -62,8 +62,15 @@ type DeleterWithRange interface { DeleteRange(start []byte, end []byte, opt WriteOptions, batch ...Batch) error } +type BatchType int + +const ( + BatchTypeWriteOnly BatchType = iota + BatchTypeReadWrite +) + type Batcher interface { - Batch() Batch + Batch(bType BatchType) Batch } type Iterationer interface { // TODO: weird name @@ -280,8 +287,15 @@ func (db *_db) Iter(opt *IterOptions, batch ...Batch) Iterator { } } -func (db *_db) Batch() Batch { - return newBatch(db) +func (db *_db) Batch(bType BatchType) Batch { + if bType == BatchTypeWriteOnly { + return newBatch(db, false) + } + return newBatch(db, true) +} + +func (db *_db) BatchReadWrite() Batch { + return newBatch(db, true) } func (db *_db) Apply(b Batch, opt WriteOptions) error { diff --git a/context_test.go b/context_test.go index 0583b7b..7fd8cc2 100644 --- a/context_test.go +++ b/context_test.go @@ -38,6 +38,11 @@ func (m *MockBatch) ResetRetained() { panic("implement me") } +func (m *MockBatch) Indexed() bool { + //TODO implement me + panic("implement me") +} + func (m *MockBatch) Get(key []byte, batch ...Batch) (data []byte, closer io.Closer, err error) { //TODO implement me panic("implement me") diff --git a/filter_test.go b/filter_test.go index 6d3aee9..e2f8681 100644 --- a/filter_test.go +++ b/filter_test.go @@ -114,7 +114,7 @@ func TestFilter_Insert_Batch(t *testing.T) { Balance: 5, } - batch := db.Batch() + batch := db.Batch(BatchTypeReadWrite) mFilter.On("MayContain", mock.Anything, mock.Anything).Return(false).Once() mFilter.On("Add", mock.Anything, mock.Anything).Return().Once() diff --git a/table.go b/table.go index 1ca5501..954fb9d 100644 --- a/table.go +++ b/table.go @@ -286,7 +286,7 @@ func (t *_table[T]) ReIndex(idxs []*Index[T]) error { }, }) - batch := t.db.Batch() + batch := t.db.Batch(BatchTypeWriteOnly) defer func() { _ = batch.Close() }() @@ -321,7 +321,7 @@ func (t *_table[T]) ReIndex(idxs []*Index[T]) error { return fmt.Errorf("failed to commit reindex batch: %w", err) } - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) } } @@ -343,17 +343,20 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro var ( batch Batch - batchCtx context.Context + batchIndexed Batch externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) defer batch.Close() } - batchCtx = ContextWithBatch(ctx, batch) + + if batch.Indexed() { + batchIndexed = batch + } var ( indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0] @@ -388,7 +391,7 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batch) + }, batchIndexed) defer iter.Close() // process rows @@ -426,7 +429,7 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro // add to bloom filter if t.filter != nil { - t.filter.Add(batchCtx, key) + t.filter.Add(ctx, key) } } @@ -454,16 +457,21 @@ func (t *_table[T]) Update(ctx context.Context, trs []T, optBatch ...Batch) erro var ( batch Batch + batchIndexed Batch externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) defer batch.Close() } + if batch.Indexed() { + batchIndexed = batch + } + var ( indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0] indexKeyBuffer2 = t.db.getKeyBufferPool().Get()[:0] @@ -502,7 +510,7 @@ func (t *_table[T]) Update(ctx context.Context, trs []T, optBatch ...Batch) erro LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batch) + }, batchIndexed) defer iter.Close() for i, key := range keys { @@ -580,7 +588,7 @@ func (t *_table[T]) Delete(ctx context.Context, trs []T, optBatch ...Batch) erro if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) defer batch.Close() } @@ -634,17 +642,20 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne var ( batch Batch - batchCtx context.Context + batchIndexed Batch externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) defer batch.Close() } - batchCtx = ContextWithBatch(ctx, batch) + + if batch.Indexed() { + batchIndexed = batch + } var ( indexKeyBuffer = t.db.getKeyBufferPool().Get()[:0] @@ -684,7 +695,7 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batch) + }, batchIndexed) defer iter.Close() for i := 0; i < len(keys); { @@ -755,7 +766,7 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne // add to bloom filter if t.filter != nil && !isUpdate { - t.filter.Add(batchCtx, key) + t.filter.Add(ctx, key) } i++ @@ -805,7 +816,7 @@ func (t *_table[T]) Exist(tr T, optBatch ...Batch) bool { } func (t *_table[T]) exist(key []byte, batch Batch, iter Iterator) bool { - if t.filter != nil && !t.filter.MayContain(context.TODO(), key) { + if t.filter != nil && !t.filter.MayContain(context.Background(), key) { return false } diff --git a/table_test.go b/table_test.go index b066440..8df031e 100644 --- a/table_test.go +++ b/table_test.go @@ -1444,7 +1444,7 @@ func TestBond_Batch(t *testing.T) { err := tokenBalanceTable.Insert(context.Background(), []*TokenBalance{tokenBalanceAccount1}) require.NoError(t, err) - batch := db.Batch() + batch := db.Batch(BatchTypeReadWrite) exist := tokenBalanceTable.Exist(&TokenBalance{ID: 1}, batch) require.True(t, exist) diff --git a/table_unsafe.go b/table_unsafe.go index 5a66f22..8a2f8fa 100644 --- a/table_unsafe.go +++ b/table_unsafe.go @@ -32,7 +32,7 @@ func (t *_table[T]) UnsafeUpdate(ctx context.Context, trs []T, oldTrs []T, optBa if externalBatch { batch = optBatch[0] } else { - batch = t.db.Batch() + batch = t.db.Batch(BatchTypeWriteOnly) } // key From 1cf97406c35a43a56f882b2e9e945b0cf24218a6 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Wed, 17 Apr 2024 14:38:19 +0200 Subject: [PATCH 2/3] restore Benchmark all --- _benchmarks/benchmark_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_benchmarks/benchmark_test.go b/_benchmarks/benchmark_test.go index 92e4709..b6c6852 100644 --- a/_benchmarks/benchmark_test.go +++ b/_benchmarks/benchmark_test.go @@ -3,5 +3,5 @@ package main import "testing" func BenchmarkSuites(b *testing.B) { - RunBenchmarks(b, "BenchmarkTableInsertSuite") + RunBenchmarks(b, AllTestSuites) } From eec65c3feddf16077685d82dd7bc1787e297fd48 Mon Sep 17 00:00:00 2001 From: Marcin Gorzynski Date: Wed, 17 Apr 2024 14:43:45 +0200 Subject: [PATCH 3/3] Rename Batch.Indexed to Batch.Type --- batch.go | 16 +++++++++++++++- bond.go | 7 ------- context_test.go | 10 +++++----- table.go | 36 ++++++++++++++++++------------------ 4 files changed, 38 insertions(+), 31 deletions(-) diff --git a/batch.go b/batch.go index aec4212..eb73032 100644 --- a/batch.go +++ b/batch.go @@ -18,13 +18,20 @@ type Committer interface { OnClose(func(b Batch)) } +type BatchType int + +const ( + BatchTypeWriteOnly BatchType = iota + BatchTypeReadWrite +) + type Batch interface { ID() uint64 Len() int Empty() bool Reset() - Indexed() bool + Type() BatchType Getter Setter @@ -66,6 +73,13 @@ func (b *_batch) ID() uint64 { return b.id } +func (b *_batch) Type() BatchType { + if b.Batch.Indexed() { + return BatchTypeReadWrite + } + return BatchTypeWriteOnly +} + func (b *_batch) Reset() { b.Batch.Reset() diff --git a/bond.go b/bond.go index a643d49..7e49f6f 100644 --- a/bond.go +++ b/bond.go @@ -62,13 +62,6 @@ type DeleterWithRange interface { DeleteRange(start []byte, end []byte, opt WriteOptions, batch ...Batch) error } -type BatchType int - -const ( - BatchTypeWriteOnly BatchType = iota - BatchTypeReadWrite -) - type Batcher interface { Batch(bType BatchType) Batch } diff --git a/context_test.go b/context_test.go index 7fd8cc2..e7e8811 100644 --- a/context_test.go +++ b/context_test.go @@ -18,27 +18,27 @@ func (m *MockBatch) ID() uint64 { panic("implement me") } -func (m *MockBatch) Len() int { +func (m *MockBatch) Type() BatchType { //TODO implement me panic("implement me") } -func (m *MockBatch) Empty() bool { +func (m *MockBatch) Len() int { //TODO implement me panic("implement me") } -func (m *MockBatch) Reset() { +func (m *MockBatch) Empty() bool { //TODO implement me panic("implement me") } -func (m *MockBatch) ResetRetained() { +func (m *MockBatch) Reset() { //TODO implement me panic("implement me") } -func (m *MockBatch) Indexed() bool { +func (m *MockBatch) ResetRetained() { //TODO implement me panic("implement me") } diff --git a/table.go b/table.go index 954fb9d..dd0f4c4 100644 --- a/table.go +++ b/table.go @@ -342,9 +342,9 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro t.mutex.RUnlock() var ( - batch Batch - batchIndexed Batch - externalBatch = len(optBatch) > 0 && optBatch[0] != nil + batch Batch + batchReadWrite Batch + externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { @@ -354,8 +354,8 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro defer batch.Close() } - if batch.Indexed() { - batchIndexed = batch + if batch.Type() == BatchTypeReadWrite { + batchReadWrite = batch } var ( @@ -391,7 +391,7 @@ func (t *_table[T]) Insert(ctx context.Context, trs []T, optBatch ...Batch) erro LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batchIndexed) + }, batchReadWrite) defer iter.Close() // process rows @@ -456,9 +456,9 @@ func (t *_table[T]) Update(ctx context.Context, trs []T, optBatch ...Batch) erro t.mutex.RUnlock() var ( - batch Batch - batchIndexed Batch - externalBatch = len(optBatch) > 0 && optBatch[0] != nil + batch Batch + batchReadWrite Batch + externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { @@ -468,8 +468,8 @@ func (t *_table[T]) Update(ctx context.Context, trs []T, optBatch ...Batch) erro defer batch.Close() } - if batch.Indexed() { - batchIndexed = batch + if batch.Type() == BatchTypeReadWrite { + batchReadWrite = batch } var ( @@ -510,7 +510,7 @@ func (t *_table[T]) Update(ctx context.Context, trs []T, optBatch ...Batch) erro LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batchIndexed) + }, batchReadWrite) defer iter.Close() for i, key := range keys { @@ -641,9 +641,9 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne t.mutex.RUnlock() var ( - batch Batch - batchIndexed Batch - externalBatch = len(optBatch) > 0 && optBatch[0] != nil + batch Batch + batchReadWrite Batch + externalBatch = len(optBatch) > 0 && optBatch[0] != nil ) if externalBatch { @@ -653,8 +653,8 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne defer batch.Close() } - if batch.Indexed() { - batchIndexed = batch + if batch.Type() == BatchTypeReadWrite { + batchReadWrite = batch } var ( @@ -695,7 +695,7 @@ func (t *_table[T]) Upsert(ctx context.Context, trs []T, onConflict func(old, ne LowerBound: keys[0], UpperBound: t.dataKeySpaceEnd, }, - }, batchIndexed) + }, batchReadWrite) defer iter.Close() for i := 0; i < len(keys); {