diff --git a/common/kvstore/Batch.go b/common/kvstore/Batch.go deleted file mode 100644 index 7d54890ba4..0000000000 --- a/common/kvstore/Batch.go +++ /dev/null @@ -1,14 +0,0 @@ -package kvstore - -// Batch is a collection of key / value pairs that can be written atomically to a database. -type Batch[K any] interface { - // Put stores the given key / value pair in the batch, overwriting any existing value for that key. - // If nil is passed as the value, a byte slice of length 0 will be stored. - Put(key K, value []byte) - // Delete removes the key from the batch. - Delete(key K) - // Apply atomically writes all the key / value pairs in the batch to the database. - Apply() error - // Size returns the number of operations in the batch. - Size() uint32 -} diff --git a/common/kvstore/batch.go b/common/kvstore/batch.go new file mode 100644 index 0000000000..5838ddce3f --- /dev/null +++ b/common/kvstore/batch.go @@ -0,0 +1,32 @@ +package kvstore + +import "time" + +// Batch is a collection of key / value pairs that will be written atomically to a database. +// Although it is thread safe to modify different batches in parallel or to modify a batch while +// the store is being modified, it is not thread safe to concurrently modify the same batch. +type Batch[K any] interface { + // Put stores the given key / value pair in the batch, overwriting any existing value for that key. + // If nil is passed as the value, a byte slice of length 0 will be stored. + Put(key K, value []byte) + // Delete removes the key from the batch. + Delete(key K) + // Apply atomically writes all the key / value pairs in the batch to the database. + Apply() error + // Size returns the number of operations in the batch. + Size() uint32 +} + +// TTLBatch is a collection of key / value pairs that will be written atomically to a database with +// time-to-live (TTL) or expiration times. 
Although it is thread safe to modify different batches in +// parallel or to modify a batch while the store is being modified, it is not thread safe to concurrently +// modify the same batch. +type TTLBatch[K any] interface { + Batch[K] + // PutWithTTL stores the given key / value pair in the batch with a time-to-live (TTL) or expiration time. + // If nil is passed as the value, a byte slice of length 0 will be stored. + PutWithTTL(key K, value []byte, ttl time.Duration) + // PutWithExpiration stores the given key / value pair in the batch with an expiration time. + // If nil is passed as the value, a byte slice of length 0 will be stored. + PutWithExpiration(key K, value []byte, expiryTime time.Time) +} diff --git a/common/kvstore/key.go b/common/kvstore/key.go new file mode 100644 index 0000000000..518c782e79 --- /dev/null +++ b/common/kvstore/key.go @@ -0,0 +1,27 @@ +package kvstore + +import "errors" + +// ErrInvalidKey is returned when a key cannot be interpreted as the requested type. +var ErrInvalidKey = errors.New("invalid key") + +// Key represents a key in a TableStore. Each key is scoped to a specific table. +type Key interface { + // Bytes returns the key as a byte slice. Does not include internal metadata (i.e. the table). + Bytes() []byte + // Raw returns the raw byte slice that represents the key. This value + // may not be equal to the byte slice that was used to create the key, and + // should be treated as an opaque value. + Raw() []byte + // Builder returns the KeyBuilder that created this key. + Builder() KeyBuilder +} + +// KeyBuilder is used to create keys for a TableStore. Each KeyBuilder is scoped to a particular table, +// and can be used to create keys that are within that table. +type KeyBuilder interface { + // TableName returns the name of the table that this KeyBuilder is scoped to. + TableName() string + // Key creates a key from a byte slice. 
+ Key(key []byte) Key +} diff --git a/common/kvstore/leveldb/leveldb_store.go b/common/kvstore/leveldb/leveldb_store.go index 1c0bab6364..747f0e49c0 100644 --- a/common/kvstore/leveldb/leveldb_store.go +++ b/common/kvstore/leveldb/leveldb_store.go @@ -12,7 +12,7 @@ import ( "github.com/syndtr/goleveldb/leveldb/util" ) -var _ kvstore.Store = &levelDBStore{} +var _ kvstore.Store[[]byte] = &levelDBStore{} // levelDBStore implements kvstore.Store interfaces with levelDB as the backend engine. type levelDBStore struct { @@ -25,7 +25,7 @@ type levelDBStore struct { } // NewStore returns a new levelDBStore built using LevelDB. -func NewStore(logger logging.Logger, path string) (kvstore.Store, error) { +func NewStore(logger logging.Logger, path string) (kvstore.Store[[]byte], error) { levelDB, err := leveldb.OpenFile(path, nil) if err != nil { @@ -88,7 +88,7 @@ func (store *levelDBStore) WriteBatch(keys [][]byte, values [][]byte) error { } // NewBatch creates a new batch for the store. -func (store *levelDBStore) NewBatch() kvstore.StoreBatch { +func (store *levelDBStore) NewBatch() kvstore.Batch[[]byte] { return &levelDBBatch{ store: store, batch: new(leveldb.Batch), diff --git a/common/kvstore/mapstore/map_store.go b/common/kvstore/mapstore/map_store.go index f8882b324f..4ff932a538 100644 --- a/common/kvstore/mapstore/map_store.go +++ b/common/kvstore/mapstore/map_store.go @@ -9,7 +9,7 @@ import ( "sync" ) -var _ kvstore.Store = &mapStore{} +var _ kvstore.Store[[]byte] = &mapStore{} // mapStore is a simple in-memory implementation of KVStore. Designed more as a correctness test than a // production implementation -- there are things that may not be performant with this implementation. @@ -19,7 +19,7 @@ type mapStore struct { } // NewStore creates a new mapStore. 
-func NewStore() kvstore.Store { +func NewStore() kvstore.Store[[]byte] { return &mapStore{ data: make(map[string][]byte), } @@ -79,7 +79,7 @@ func (store *mapStore) WriteBatch(keys, values [][]byte) error { } // NewBatch creates a new batch for the store. -func (store *mapStore) NewBatch() kvstore.StoreBatch { +func (store *mapStore) NewBatch() kvstore.Batch[[]byte] { return &batch{ store: store, keys: make([][]byte, 0), diff --git a/common/kvstore/store.go b/common/kvstore/store.go index 6aa8903e77..76f734629d 100644 --- a/common/kvstore/store.go +++ b/common/kvstore/store.go @@ -8,33 +8,31 @@ import ( // ErrNotFound is returned when a key is not found in the database. var ErrNotFound = errors.New("not found") -// StoreBatch is a collection of operations that can be applied atomically to a Store. -type StoreBatch Batch[[]byte] - // Store implements a key-value store. May be backed by a database like LevelDB. +// The generic type K is the type of the keys in the store. // // Implementations of this interface are expected to be thread-safe. -type Store interface { +type Store[K any] interface { // Put stores the given key / value pair in the database, overwriting any existing value for that key. // If nil is passed as the value, a byte slice of length 0 will be stored. - Put(key []byte, value []byte) error + Put(k K, value []byte) error // Get retrieves the value for the given key. Returns a ErrNotFound error if the key does not exist. // The value returned is safe to modify. - Get(key []byte) ([]byte, error) + Get(k K) ([]byte, error) // Delete removes the key from the database. Does not return an error if the key does not exist. - Delete(key []byte) error + Delete(k K) error // NewBatch creates a new batch that can be used to perform multiple operations atomically. - NewBatch() StoreBatch + NewBatch() Batch[K] // NewIterator returns an iterator that can be used to iterate over a subset of the keys in the database. 
// Only keys with the given prefix will be iterated. The iterator must be closed by calling Release() when done. // The iterator will return keys in lexicographically sorted order. The iterator walks over a consistent snapshot // of the database, so it will not see any writes that occur after the iterator is created. - NewIterator(prefix []byte) (iterator.Iterator, error) + NewIterator(prefix K) (iterator.Iterator, error) // Shutdown shuts down the store, flushing any remaining data to disk. // diff --git a/common/kvstore/table.go b/common/kvstore/table.go index a1357128a5..e44bd63cac 100644 --- a/common/kvstore/table.go +++ b/common/kvstore/table.go @@ -1,32 +1,14 @@ package kvstore -import "errors" - -// ErrTableLimitExceeded is returned when the maximum number of tables has been reached. -var ErrTableLimitExceeded = errors.New("table limit exceeded") +import ( + "errors" + "github.com/syndtr/goleveldb/leveldb/iterator" + "time" +) // ErrTableNotFound is returned when a table is not found. var ErrTableNotFound = errors.New("table not found") -// Table can be used to operate on data in a specific table in a TableStore. -type Table interface { - // Store permits access to the table as if it were a store. - Store - - // Name returns the name of the table. - Name() string - - // TableKey creates a new key scoped to this table that can be used for batch operations that modify this table. - TableKey(key []byte) TableKey -} - -// TableKey is a key scoped to a particular table. It can be used to perform batch operations that modify multiple -// table keys atomically. -type TableKey []byte - -// TableBatch is a collection of operations that can be applied atomically to a TableStore. -type TableBatch Batch[TableKey] - // TableStore implements a key-value store, with the addition of the abstraction of tables. // A "table" in this context is a disjoint keyspace. 
Keys in one table to not collide with keys in another table, // and keys within a particular table can be iterated over efficiently. @@ -36,20 +18,40 @@ type TableBatch Batch[TableKey] // // Implementations of this interface are expected to be thread-safe, except where noted. type TableStore interface { - - // GetTable gets the table with the given name. If the table does not exist, it is first created. - // Returns ErrTableNotFound if the table does not exist and cannot be created. - GetTable(name string) (Table, error) - - // GetTables returns a list of all tables in the store in no particular order. - GetTables() []Table - - // NewBatch creates a new batch that can be used to perform multiple operations across tables atomically. - NewBatch() TableBatch - - // Shutdown shuts down the store, flushing any remaining data to disk. - Shutdown() error - - // Destroy shuts down and permanently deletes all data in the store. - Destroy() error + Store[Key] + + // GetKeyBuilder gets the key builder for a particular table. Returns ErrTableNotFound if the table does not exist. + // The returned KeyBuilder can be used to interact with the table. + // + // Warning: Do not use key builders created by one TableStore instance with another TableStore instance. + // This may result in odd and undefined behavior. + GetKeyBuilder(name string) (KeyBuilder, error) + + // GetKeyBuilders returns all key builders in the store. + GetKeyBuilders() []KeyBuilder + + // GetTables returns a list of the table names currently in the store. + GetTables() []string + + // PutWithTTL adds a key-value pair to the store that expires after a specified duration. + // Key is eventually deleted after the TTL elapses. + // + // Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern + // may be implemented in the future if a use case is identified. 
+ PutWithTTL(key Key, value []byte, ttl time.Duration) error + + // PutWithExpiration adds a key-value pair to the store that expires at a specified time. + // Key is eventually deleted after the expiry time. + // + // Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern + // may be implemented in the future if a use case is identified. + PutWithExpiration(key Key, value []byte, expiryTime time.Time) error + + // NewTTLBatch creates a new TTLBatch that can be used to perform multiple operations atomically. + // Use this instead of NewBatch to create a batch that supports TTL/expiration. + NewTTLBatch() TTLBatch[Key] + + // NewTableIterator returns an iterator that can be used to iterate over all keys in a table. + // Equivalent to NewIterator(keyBuilder.Key([]byte{})). + NewTableIterator(keyBuilder KeyBuilder) (iterator.Iterator, error) } diff --git a/common/kvstore/tablestore/config.go b/common/kvstore/tablestore/config.go new file mode 100644 index 0000000000..964932a9ee --- /dev/null +++ b/common/kvstore/tablestore/config.go @@ -0,0 +1,60 @@ +package tablestore + +import "time" + +// StoreType describes the underlying store implementation. +type StoreType int + +const ( + // LevelDB is a LevelDB-backed store. + LevelDB StoreType = iota + // MapStore is an in-memory store. This store does not preserve data across restarts. + MapStore +) + +// Config is the configuration for a TableStore. +type Config struct { + // The type of the base store. Default is LevelDB. + Type StoreType + // The path to the file system directory where the store will write its data. Default is nil. + // Some store implementations may ignore this field (e.g. MapStore). Other store implementations may require + // this field to be set (e.g. LevelDB). + Path *string + // If true, the store will perform garbage collection on a background goroutine. Default is true. 
+ GarbageCollectionEnabled bool + // If garbage collection is enabled, this is the interval at which it will run. Default is 5 minutes. + GarbageCollectionInterval time.Duration + // If garbage collection is enabled, this is the maximum number of entries to delete in a single batch during + // garbage collection. Default is 1024. + GarbageCollectionBatchSize uint32 + // The list of tables to create on startup. Any pre-existing table not in this list will be deleted. If + // this list is nil, the previous schema will be carried forward with no modifications. Default is nil. + Schema []string +} + +// DefaultConfig returns a Config with default values. +func DefaultConfig() *Config { + return &Config{ + Type: LevelDB, + Path: nil, + GarbageCollectionEnabled: true, + GarbageCollectionInterval: 5 * time.Minute, + GarbageCollectionBatchSize: 1024, + Schema: nil, + } +} + +// DefaultLevelDBConfig returns a Config with default values for a LevelDB store. +func DefaultLevelDBConfig(path string) *Config { + config := DefaultConfig() + config.Type = LevelDB + config.Path = &path + return config +} + +// DefaultMapStoreConfig returns a Config with default values for a MapStore. 
+func DefaultMapStoreConfig() *Config { + config := DefaultConfig() + config.Type = MapStore + return config +} diff --git a/common/kvstore/tablestore/key_test.go b/common/kvstore/tablestore/key_test.go new file mode 100644 index 0000000000..771132a552 --- /dev/null +++ b/common/kvstore/tablestore/key_test.go @@ -0,0 +1,27 @@ +package tablestore + +import ( + tu "github.com/Layr-Labs/eigenda/common/testutils" + "github.com/stretchr/testify/assert" + "testing" +) + +func TestGetName(t *testing.T) { + tu.InitializeRandom() + + tableName := tu.RandomString(10) + + kb := newKeyBuilder(tableName, 0) + assert.Equal(t, tableName, kb.TableName()) +} + +func TestBytesRoundTrip(t *testing.T) { + tu.InitializeRandom() + + tableName := tu.RandomString(10) + b := tu.RandomBytes(10) + + kb := newKeyBuilder(tableName, 0) + k := kb.Key(b) + assert.Equal(t, b, k.Bytes()) +} diff --git a/common/kvstore/tablestore/table_store.go b/common/kvstore/tablestore/table_store.go index ac92199f0a..5669bf6af0 100644 --- a/common/kvstore/tablestore/table_store.go +++ b/common/kvstore/tablestore/table_store.go @@ -1,21 +1,39 @@ package tablestore import ( + "context" "github.com/Layr-Labs/eigenda/common/kvstore" "github.com/Layr-Labs/eigensdk-go/logging" + "github.com/syndtr/goleveldb/leveldb/iterator" + "sync" + "time" ) var _ kvstore.TableStore = &tableStore{} // tableStore is an implementation of TableStore that wraps a Store. type tableStore struct { + // the context for the store + ctx context.Context + + // the cancel function for the store + cancel context.CancelFunc + + // this wait group is completed when the garbage collection goroutine is done + waitGroup *sync.WaitGroup + + // the logger for the store logger logging.Logger // A base store implementation that this TableStore wraps. - base kvstore.Store + base kvstore.Store[[]byte] - // A map from table names to tables. - tableMap map[string]kvstore.Table + // A map from table names to key builders. 
+ keyBuilderMap map[string]kvstore.KeyBuilder + + // A key builder for the expiration table. Keys in this table are made up of a timestamp prepended to a key. + // The value is an empty byte slice. Iterating over this table will return keys in order of expiration time. + expirationKeyBuilder kvstore.KeyBuilder } // wrapper wraps the given Store to create a TableStore. @@ -25,19 +43,40 @@ type tableStore struct { // in undefined behavior. func newTableStore( logger logging.Logger, - base kvstore.Store, - tables map[string]kvstore.Table) kvstore.TableStore { + base kvstore.Store[[]byte], + tableIDMap map[uint32]string, + expirationKeyBuilder kvstore.KeyBuilder, + gcEnabled bool, + gcPeriod time.Duration, + gcBatchSize uint32) kvstore.TableStore { + + ctx, cancel := context.WithCancel(context.Background()) + waitGroup := &sync.WaitGroup{} - return &tableStore{ - logger: logger, - base: base, - tableMap: tables, + store := &tableStore{ + ctx: ctx, + cancel: cancel, + waitGroup: waitGroup, + logger: logger, + base: base, + keyBuilderMap: make(map[string]kvstore.KeyBuilder), + expirationKeyBuilder: expirationKeyBuilder, } + + for prefix, name := range tableIDMap { + store.keyBuilderMap[name] = newKeyBuilder(name, prefix) + } + + if gcEnabled { + store.expireKeysInBackground(gcPeriod, gcBatchSize) + } + + return store } -// GetTable gets the table with the given name. If the table does not exist, it is first created. -func (t *tableStore) GetTable(name string) (kvstore.Table, error) { - table, ok := t.tableMap[name] +// GetKeyBuilder gets the key builder for a particular table. Returns ErrTableNotFound if the table does not exist. 
+func (t *tableStore) GetKeyBuilder(name string) (kvstore.KeyBuilder, error) { + table, ok := t.keyBuilderMap[name] if !ok { return nil, kvstore.ErrTableNotFound } @@ -45,30 +84,144 @@ func (t *tableStore) GetTable(name string) (kvstore.Table, error) { return table, nil } +// GetKeyBuilders returns a list of all key builders in the store in no particular order. +func (t *tableStore) GetKeyBuilders() []kvstore.KeyBuilder { + tables := make([]kvstore.KeyBuilder, 0, len(t.keyBuilderMap)) + for _, kb := range t.keyBuilderMap { + tables = append(tables, kb) + } + + return tables +} + // GetTables returns a list of all tables in the store in no particular order. -func (t *tableStore) GetTables() []kvstore.Table { - tables := make([]kvstore.Table, 0, len(t.tableMap)) - for _, table := range t.tableMap { - tables = append(tables, table) +func (t *tableStore) GetTables() []string { + tables := make([]string, 0, len(t.keyBuilderMap)) + for _, kb := range t.keyBuilderMap { + tables = append(tables, kb.TableName()) } return tables } // NewBatch creates a new batch for writing to the store. -func (t *tableStore) NewBatch() kvstore.TableBatch { - return &tableStoreBatch{ - store: t, - batch: t.base.NewBatch(), +func (t *tableStore) NewBatch() kvstore.Batch[kvstore.Key] { + return newTableStoreBatch(t.base, t.expirationKeyBuilder) +} + +// NewTTLBatch creates a new batch for writing to the store with TTLs. +func (t *tableStore) NewTTLBatch() kvstore.TTLBatch[kvstore.Key] { + return newTableStoreBatch(t.base, t.expirationKeyBuilder) +} + +// Put adds a key-value pair to the store. +func (t *tableStore) Put(k kvstore.Key, value []byte) error { + return t.base.Put(k.Raw(), value) +} + +// Get retrieves the value for a key from the store. +func (t *tableStore) Get(k kvstore.Key) ([]byte, error) { + return t.base.Get(k.Raw()) +} + +// Delete removes a key from the store. 
+func (t *tableStore) Delete(k kvstore.Key) error { + return t.base.Delete(k.Raw()) +} + +// PutWithTTL adds a key-value pair to the store that expires after a specified duration. +func (t *tableStore) PutWithTTL(key kvstore.Key, value []byte, ttl time.Duration) error { + batch := t.NewTTLBatch() + batch.PutWithTTL(key, value, ttl) + return batch.Apply() +} + +// PutWithExpiration adds a key-value pair to the store that expires at a specified time. +func (t *tableStore) PutWithExpiration(key kvstore.Key, value []byte, expiryTime time.Time) error { + batch := t.NewTTLBatch() + batch.PutWithExpiration(key, value, expiryTime) + return batch.Apply() +} + +// NewIterator returns an iterator that can be used to iterate over a subset of the keys in the store. +func (t *tableStore) NewIterator(prefix kvstore.Key) (iterator.Iterator, error) { + return newTableStoreIterator(t.base, prefix) +} + +// NewTableIterator returns an iterator that can be used to iterate over all keys in a table. +func (t *tableStore) NewTableIterator(builder kvstore.KeyBuilder) (iterator.Iterator, error) { + return newTableStoreIterator(t.base, builder.Key([]byte{})) +} + +// expireKeysInBackground spawns a background goroutine that periodically checks for expired keys and deletes them. +func (t *tableStore) expireKeysInBackground(gcPeriod time.Duration, gcBatchSize uint32) { + t.waitGroup.Add(1) + ticker := time.NewTicker(gcPeriod) + go func() { + defer t.waitGroup.Done() + for { + select { + case now := <-ticker.C: + err := t.expireKeys(now, gcBatchSize) + if err != nil { + t.logger.Error("Error expiring keys", err) + continue + } + case <-t.ctx.Done(): + ticker.Stop() + return + } + } + }() +} + +// Delete all keys with a TTL that has expired. 
+func (t *tableStore) expireKeys(now time.Time, gcBatchSize uint32) error { + it, err := t.NewTableIterator(t.expirationKeyBuilder) + if err != nil { + return err + } + defer it.Release() + + batch := t.base.NewBatch() + + for it.Next() { + expiryKey := it.Key() + expiryTimestamp, baseKey := parsePrependedTimestamp(expiryKey) + + if expiryTimestamp.After(now) { + // No more values to expire + break + } + + batch.Delete(baseKey) + batch.Delete(expiryKey) + + if batch.Size() >= gcBatchSize { + err = batch.Apply() + if err != nil { + return err + } + batch = t.base.NewBatch() + } + } + + if batch.Size() > 0 { + return batch.Apply() } + return nil } // Shutdown shuts down the store, flushing any remaining cached data to disk. func (t *tableStore) Shutdown() error { + t.cancel() + t.waitGroup.Wait() return t.base.Shutdown() } // Destroy shuts down and permanently deletes all data in the store. func (t *tableStore) Destroy() error { + t.cancel() + t.waitGroup.Wait() return t.base.Destroy() } diff --git a/common/kvstore/tablestore/table_store_batch.go b/common/kvstore/tablestore/table_store_batch.go index 46399409a6..91f19c6fb2 100644 --- a/common/kvstore/tablestore/table_store_batch.go +++ b/common/kvstore/tablestore/table_store_batch.go @@ -1,32 +1,59 @@ package tablestore -import "github.com/Layr-Labs/eigenda/common/kvstore" +import ( + "github.com/Layr-Labs/eigenda/common/kvstore" + "time" +) + +var _ kvstore.TTLBatch[kvstore.Key] = &tableStoreBatch{} // tableStoreBatch is a batch for writing to a table store. type tableStoreBatch struct { - store *tableStore - batch kvstore.StoreBatch + baseBatch kvstore.Batch[[]byte] + expirationKeyBuilder kvstore.KeyBuilder } -// Put adds a key-value pair to the batch. -func (t *tableStoreBatch) Put(key kvstore.TableKey, value []byte) { - if value == nil { - value = []byte{} +// newTableStoreBatch creates a new batch for writing to a table store. 
+func newTableStoreBatch( + base kvstore.Store[[]byte], + expirationKeyBuilder kvstore.KeyBuilder) kvstore.TTLBatch[kvstore.Key] { + + return &tableStoreBatch{ + baseBatch: base.NewBatch(), + expirationKeyBuilder: expirationKeyBuilder, } - t.batch.Put(key, value) +} + +// PutWithTTL adds a key-value pair to the batch that expires after a specified duration. +func (t *tableStoreBatch) PutWithTTL(k kvstore.Key, value []byte, ttl time.Duration) { + expirationTime := time.Now().Add(ttl) + t.PutWithExpiration(k, value, expirationTime) +} + +// PutWithExpiration adds a key-value pair to the batch that expires at a specified time. +func (t *tableStoreBatch) PutWithExpiration(k kvstore.Key, value []byte, expiryTime time.Time) { + expirationKey := t.expirationKeyBuilder.Key(prependTimestamp(expiryTime, k.Raw())) + + t.baseBatch.Put(k.Raw(), value) + t.baseBatch.Put(expirationKey.Raw(), []byte{}) +} + +// Put adds a key-value pair to the batch. +func (t *tableStoreBatch) Put(k kvstore.Key, value []byte) { + t.baseBatch.Put(k.Raw(), value) } // Delete removes a key-value pair from the batch. -func (t *tableStoreBatch) Delete(key kvstore.TableKey) { - t.batch.Delete(key) +func (t *tableStoreBatch) Delete(k kvstore.Key) { + t.baseBatch.Delete(k.Raw()) } // Apply applies the batch to the store. func (t *tableStoreBatch) Apply() error { - return t.batch.Apply() + return t.baseBatch.Apply() } // Size returns the number of operations in the batch. func (t *tableStoreBatch) Size() uint32 { - return t.batch.Size() + return t.baseBatch.Size() } diff --git a/common/kvstore/tablestore/table_store_builder.go b/common/kvstore/tablestore/table_store_builder.go index 582362e924..117c1e95c1 100644 --- a/common/kvstore/tablestore/table_store_builder.go +++ b/common/kvstore/tablestore/table_store_builder.go @@ -24,6 +24,12 @@ const metadataTableID uint32 = math.MaxUint32 // once a table is dropped, future tables may be instantiated with the same name and the table ID may be reused. 
const namespaceTableID uint32 = math.MaxUint32 - 1 +// The table ID reserved for the expiration table. The expiration table is used to store expiration times for keys. +// The keys in the expiration table are created by prepending the expiration time to the key of the data to be expired. +// The value of the key in this table is an empty byte slice. By iterating over this table in lexicographical order, +// keys are encountered in order of expiration time. +const expirationTableID uint32 = math.MaxUint32 - 2 + // This key is used to store the schema version in the metadata table. const metadataSchemaVersionKey = "schema_version" @@ -31,42 +37,24 @@ const metadataSchemaVersionKey = "schema_version" // This is required for atomic deletion of tables. const metadataDeletionKey = "deletion" +// When a new table is created, the ID used for that table is stored in the metadata table under this key. +const nextTableIDKey = "next_table_id" + // The current schema version of the metadata table. const currentSchemaVersion uint64 = 0 -// StoreType describes the underlying store implementation. -type StoreType int - -const ( - // LevelDB is a LevelDB-backed store. - LevelDB StoreType = iota - // MapStore is an in-memory store. This store does not preserve data across restarts. - MapStore -) - -// Start creates a new TableStore instance of the given type. The store will be created at the given path. -// This method can be used to instantiate a new store or to load an existing store. -// This method will set up a table for each table name provided, and will drop all tables not in the list. -// Dropping a table is irreversible and will delete all data in the table, so be very careful not to call -// this method with table names omitted by mistake. 
-func (t StoreType) Start(logger logging.Logger, path string, tables ...string) (kvstore.TableStore, error) { - base, err := buildBaseStore(t, logger, path) - if err != nil { - return nil, fmt.Errorf("error building base store: %w", err) +// Start creates a new TableStore. This method can be used to instantiate a new store or to load an existing store. +func Start(logger logging.Logger, config *Config) (kvstore.TableStore, error) { + if config == nil { + return nil, errors.New("config is required") } - return start(logger, base, true, tables...) -} - -// Load loads a table store from disk without modifying the table schema. If there is no existing store at the given -// path, this method will create one and return a store without any tables. -func (t StoreType) Load(logger logging.Logger, path string) (kvstore.TableStore, error) { - base, err := buildBaseStore(t, logger, path) + base, err := buildBaseStore(config.Type, logger, config.Path) if err != nil { return nil, fmt.Errorf("error building base store: %w", err) } - return start(logger, base, false) + return start(logger, base, config) } // Future work: if we ever decide to permit third parties to provide custom store implementations not in this module, @@ -77,47 +65,64 @@ func (t StoreType) Load(logger logging.Logger, path string) (kvstore.TableStore, // modifySchema is false, the tables are loaded as is, and any tables in the provided list are ignored. 
func start( logger logging.Logger, - base kvstore.Store, - modifySchema bool, - tables ...string) (kvstore.TableStore, error) { + base kvstore.Store[[]byte], + config *Config) (kvstore.TableStore, error) { - metadataTable := newTableView(base, "metadata", metadataTableID) - namespaceTable := newTableView(base, "namespace", namespaceTableID) + metadataKeyBuilder := newKeyBuilder("metadata", metadataTableID) + namespaceKeyBuilder := newKeyBuilder("namespace", namespaceTableID) + expirationKeyBuilder := newKeyBuilder("expiration", expirationTableID) - err := validateSchema(metadataTable) + err := validateSchema(base, metadataKeyBuilder) if err != nil { return nil, fmt.Errorf("error validating schema: %w", err) } - err = handleIncompleteDeletion(logger, base, metadataTable, namespaceTable) + err = handleIncompleteDeletion(logger, base, metadataKeyBuilder, namespaceKeyBuilder) if err != nil { return nil, fmt.Errorf("error handling incomplete deletion: %w", err) } - tableIDMap, err := loadNamespaceTable(namespaceTable) + tableIDMap, err := loadNamespaceTable(base, namespaceKeyBuilder) if err != nil { return nil, fmt.Errorf("error loading namespace table: %w", err) } - if modifySchema { - err = addAndRemoveTables(base, metadataTable, namespaceTable, tableIDMap, tables) + if config.Schema != nil { + err = addAndRemoveTables( + base, + metadataKeyBuilder, + namespaceKeyBuilder, + tableIDMap, + config.Schema) if err != nil { return nil, fmt.Errorf("error adding and removing tables: %w", err) } } - tableMap := make(map[string]kvstore.Table, len(tableIDMap)) - for tableID, tableName := range tableIDMap { - tableMap[tableName] = newTableView(base, tableName, tableID) - } - return newTableStore(logger, base, tableMap), nil + store := newTableStore( + logger, + base, + tableIDMap, + expirationKeyBuilder, + config.GarbageCollectionEnabled, + config.GarbageCollectionInterval, + config.GarbageCollectionBatchSize) + + return store, nil } // buildBaseStore creates a new base store of 
the given type. -func buildBaseStore(storeType StoreType, logger logging.Logger, path string) (kvstore.Store, error) { +func buildBaseStore( + storeType StoreType, + logger logging.Logger, + path *string) (kvstore.Store[[]byte], error) { + switch storeType { case LevelDB: - return leveldb.NewStore(logger, path) + if path == nil { + return nil, errors.New("path is required for LevelDB store") + } + return leveldb.NewStore(logger, *path) case MapStore: return mapstore.NewStore(), nil default: @@ -126,10 +131,12 @@ func buildBaseStore(storeType StoreType, logger logging.Logger, path string) (kv } // validateSchema loads/initiates the schema version in the metadata table. -func validateSchema(metadataTable kvstore.Table) error { +func validateSchema( + base kvstore.Store[[]byte], + metadataKeyBuilder kvstore.KeyBuilder) error { - schemaKey := []byte(metadataSchemaVersionKey) - onDiskSchemaBytes, err := metadataTable.Get(schemaKey) + schemaKey := metadataKeyBuilder.Key([]byte(metadataSchemaVersionKey)) + onDiskSchemaBytes, err := base.Get(schemaKey.Raw()) if err != nil { if !errors.Is(err, kvstore.ErrNotFound) { @@ -140,7 +147,7 @@ func validateSchema(metadataTable kvstore.Table) error { onDiskSchemaBytes = make([]byte, 8) binary.BigEndian.PutUint64(onDiskSchemaBytes, currentSchemaVersion) - err = metadataTable.Put(schemaKey, onDiskSchemaBytes) + err = base.Put(schemaKey.Raw(), onDiskSchemaBytes) if err != nil { return fmt.Errorf("error setting schema version in metadata table: %w", err) } @@ -163,25 +170,25 @@ func validateSchema(metadataTable kvstore.Table) error { // This method adds and removes tables as needed to match the given list of tables. The table ID map is updated // to reflect the new state of the tables. 
func addAndRemoveTables( - base kvstore.Store, - metadataTable kvstore.Table, - namespaceTable kvstore.Table, + base kvstore.Store[[]byte], + metadataKeyBuilder kvstore.KeyBuilder, + namespaceKeyBuilder kvstore.KeyBuilder, tableIDMap map[uint32]string, currentTables []string) error { tablesToAdd, tablesToDrop := computeSchemaChange(tableIDMap, currentTables) - err := dropTables(base, metadataTable, namespaceTable, tableIDMap, tablesToDrop) + err := dropTables(base, metadataKeyBuilder, namespaceKeyBuilder, tableIDMap, tablesToDrop) if err != nil { return fmt.Errorf("error dropping tables: %w", err) } - err = addTables(namespaceTable, tableIDMap, tablesToAdd) + err = addTables(base, metadataKeyBuilder, namespaceKeyBuilder, tableIDMap, tablesToAdd) if err != nil { return fmt.Errorf("error adding tables: %w", err) } - err = sanityCheckNamespaceTable(namespaceTable, tableIDMap, currentTables) + err = sanityCheckNamespaceTable(base, namespaceKeyBuilder, tableIDMap, currentTables) if err != nil { return fmt.Errorf("error sanity checking namespace table: %w", err) } @@ -221,9 +228,9 @@ func computeSchemaChange( // Drop a list of tables. Updates the table ID map as well as data within the store. func dropTables( - base kvstore.Store, - metadataTable kvstore.Table, - namespaceTable kvstore.Table, + base kvstore.Store[[]byte], + metadataKeyBuilder kvstore.KeyBuilder, + namespaceKeyBuilder kvstore.KeyBuilder, tableIDMap map[uint32]string, tablesToDrop []string) error { @@ -237,7 +244,7 @@ func dropTables( reverseTableIDMap[tableID] = tableName } for _, tableName := range tablesToDrop { - err := dropTable(base, metadataTable, namespaceTable, tableName, reverseTableIDMap[tableName]) + err := dropTable(base, metadataKeyBuilder, namespaceKeyBuilder, tableName, reverseTableIDMap[tableName]) if err != nil { return fmt.Errorf("error dropping table %s: %w", tableName, err) } @@ -249,7 +256,9 @@ func dropTables( // Add tables to the store. 
Updates the table ID map as well as data within the store. func addTables( - namespaceTable kvstore.Table, + base kvstore.Store[[]byte], + metadataKeyBuilder kvstore.KeyBuilder, + namespaceKeyBuilder kvstore.KeyBuilder, tableIDMap map[uint32]string, tablesToAdd []string) error { @@ -258,24 +267,12 @@ func addTables( return nil } - // Determine the table IDs for the new tables to be added. - // We want to fill gaps prior to assigning new IDs. - newTableIDs := make([]uint32, 0, len(tablesToAdd)) - nextID := uint32(0) - for len(newTableIDs) < len(tablesToAdd) { - if _, alreadyUsed := tableIDMap[nextID]; !alreadyUsed { - newTableIDs = append(newTableIDs, nextID) - } - nextID++ - } - // Sort tables to add. Ensures deterministic table IDs given the same input. sort.Strings(tablesToAdd) // Add new tables. - for i, tableName := range tablesToAdd { - tableID := newTableIDs[i] - err := createTable(namespaceTable, tableName, tableID) + for _, tableName := range tablesToAdd { + tableID, err := createTable(base, metadataKeyBuilder, namespaceKeyBuilder, tableName) if err != nil { return fmt.Errorf("error creating table %s: %w", tableName, err) } @@ -288,11 +285,14 @@ func addTables( // Perform sanity checks on the namespace table. // This method makes potential logic in the namespace errors fail fast and visibly. func sanityCheckNamespaceTable( - namespaceTable kvstore.Table, + base kvstore.Store[[]byte], + namespaceKeyBuilder kvstore.KeyBuilder, tableIDMap map[uint32]string, currentTableList []string) error { - parsedNamespaceTable, err := loadNamespaceTable(namespaceTable) + // TODO also check that no table has ID greater than next table ID + + parsedNamespaceTable, err := loadNamespaceTable(base, namespaceKeyBuilder) if err != nil { return fmt.Errorf("error loading namespace table: %w", err) } @@ -324,11 +324,12 @@ func sanityCheckNamespaceTable( // This method makes sure that any such incomplete deletions are completed. 
func handleIncompleteDeletion( logger logging.Logger, - base kvstore.Store, - metadataTable kvstore.Table, - namespaceTable kvstore.Table) error { + base kvstore.Store[[]byte], + metadataKeyBuilder kvstore.KeyBuilder, + namespaceKeyBuilder kvstore.KeyBuilder) error { - deletionValue, err := metadataTable.Get([]byte(metadataDeletionKey)) + deletionKey := metadataKeyBuilder.Key([]byte(metadataDeletionKey)) + deletionValue, err := base.Get(deletionKey.Raw()) if errors.Is(err, kvstore.ErrNotFound) { // No deletion in progress, nothing to do. return nil @@ -342,7 +343,7 @@ func handleIncompleteDeletion( deletionTableName := string(deletionValue[4:]) logger.Errorf("found incomplete deletion of table %s, completing deletion", deletionTableName) - return dropTable(base, metadataTable, namespaceTable, deletionTableName, deletionTableID) + return dropTable(base, metadataKeyBuilder, namespaceKeyBuilder, deletionTableName, deletionTableID) } // Get the prefix for the given table ID and prefix length. @@ -354,11 +355,13 @@ func getPrefix(tableID uint32) []byte { // loadNamespaceTable loads the namespace table from disk into the given map. // Returns a map from table IDs to table names. 
-func loadNamespaceTable(namespaceTable kvstore.Table) (map[uint32]string, error) { +func loadNamespaceTable( + base kvstore.Store[[]byte], + namespaceKeyBuilder kvstore.KeyBuilder) (map[uint32]string, error) { tableIDMap := make(map[uint32]string) - it, err := namespaceTable.NewIterator(nil) + it, err := base.NewIterator(namespaceKeyBuilder.Key([]byte{}).Raw()) if err != nil { return nil, fmt.Errorf("error creating namespace table iterator: %w", err) } @@ -368,49 +371,73 @@ func loadNamespaceTable(namespaceTable kvstore.Table) (map[uint32]string, error) keyBytes := it.Key() valueBytes := it.Value() - tableID := binary.BigEndian.Uint32(keyBytes) + tableID := binary.BigEndian.Uint32(keyBytes[prefixLength:]) tableName := string(valueBytes) tableIDMap[tableID] = tableName } return tableIDMap, nil } -// CreateTable creates a new table with the given name. If a table with the given name already exists, -// this method becomes a no-op. Returns ErrTableLimitExceeded if the maximum number of tables has been reached. +// createTable creates a new table with the given name. +// Returns ErrTableLimitExceeded if the maximum number of tables has been reached. 
func createTable( - namespaceTable kvstore.Table, - name string, - tableID uint32) error { + base kvstore.Store[[]byte], + metadataKeyBuilder kvstore.KeyBuilder, + namespaceKeyBuilder kvstore.KeyBuilder, + name string) (uint32, error) { - tableKey := make([]byte, 4) - binary.BigEndian.PutUint32(tableKey, tableID) - err := namespaceTable.Put(tableKey, []byte(name)) + batch := base.NewBatch() + var tableID uint32 + tableIDBytes, err := base.Get(metadataKeyBuilder.Key([]byte(nextTableIDKey)).Raw()) if err != nil { - return fmt.Errorf("error updating namespace table: %w", err) + if errors.Is(err, kvstore.ErrNotFound) { + tableIDBytes = make([]byte, 4) + } else { + return 0, fmt.Errorf("error reading next table ID: %w", err) + } } + tableID = binary.BigEndian.Uint32(tableIDBytes) - return nil + if tableID == expirationTableID { + return 0, errors.New("table limit exceeded") + } + + nextTableID := tableID + 1 + nextTableIDBytes := make([]byte, 4) + binary.BigEndian.PutUint32(nextTableIDBytes, nextTableID) + + keyBytes := make([]byte, 4) + binary.BigEndian.PutUint32(keyBytes, tableID) + batch.Put(namespaceKeyBuilder.Key(keyBytes).Raw(), []byte(name)) + batch.Put(metadataKeyBuilder.Key([]byte(nextTableIDKey)).Raw(), nextTableIDBytes) + + err = batch.Apply() + if err != nil { + return 0, fmt.Errorf("error updating namespace table: %w", err) + } + + return tableID, nil } // DropTable deletes the table with the given name. This is a no-op if the table does not exist. func dropTable( - base kvstore.Store, - metadataTable kvstore.Table, - namespaceTable kvstore.Table, + base kvstore.Store[[]byte], + metadataKeyBuilder kvstore.KeyBuilder, + namespaceKeyBuilder kvstore.KeyBuilder, name string, tableID uint32) error { // This single atomic operation ensures that the table is deleted completely, even if we crash // in the middle of the operation. When next starting up, if an entry is observed in this location, // then the interrupted deletion can be completed. 
- deletionKey := []byte(metadataDeletionKey) + deletionKey := metadataKeyBuilder.Key([]byte(metadataDeletionKey)) deletionValue := make([]byte, 4+len(name)) binary.BigEndian.PutUint32(deletionValue, tableID) copy(deletionValue[4:], name) - err := metadataTable.Put(deletionKey, deletionValue) + err := base.Put(deletionKey.Raw(), deletionValue) if err != nil { return fmt.Errorf("error updating metadata table for deletion: %w", err) } @@ -430,13 +457,14 @@ func dropTable( } // All table entries have been deleted. Now delete the table from the namespace table. - tableKey := make([]byte, 4) - binary.BigEndian.PutUint32(tableKey, tableID) - err = namespaceTable.Delete(tableKey) + keyBytes := make([]byte, 4) + binary.BigEndian.PutUint32(keyBytes, tableID) + tableKey := namespaceKeyBuilder.Key(keyBytes) + err = base.Delete(tableKey.Raw()) if err != nil { return fmt.Errorf("error deleting from namespace table: %w", err) } // Finally, remove the deletion key from the metadata table. - return metadataTable.Delete(deletionKey) + return base.Delete(deletionKey.Raw()) } diff --git a/common/kvstore/tablestore/table_store_iterator.go b/common/kvstore/tablestore/table_store_iterator.go new file mode 100644 index 0000000000..c556690cdd --- /dev/null +++ b/common/kvstore/tablestore/table_store_iterator.go @@ -0,0 +1,73 @@ +package tablestore + +import ( + "github.com/Layr-Labs/eigenda/common/kvstore" + "github.com/syndtr/goleveldb/leveldb/iterator" + "github.com/syndtr/goleveldb/leveldb/util" +) + +var _ iterator.Iterator = &tableStoreIterator{} + +type tableStoreIterator struct { + baseIterator iterator.Iterator + keyBuilder kvstore.KeyBuilder +} + +// newTableStoreIterator creates a new table store iterator that iterates over a table. 
+func newTableStoreIterator(base kvstore.Store[[]byte], k kvstore.Key) (*tableStoreIterator, error) { + + baseIterator, err := base.NewIterator(k.Raw()) + if err != nil { + return nil, err + } + + return &tableStoreIterator{ + baseIterator: baseIterator, + keyBuilder: k.Builder(), + }, nil +} + +func (t *tableStoreIterator) First() bool { + return t.baseIterator.First() +} + +func (t *tableStoreIterator) Last() bool { + return t.baseIterator.Last() +} + +func (t *tableStoreIterator) Seek(key []byte) bool { + return t.baseIterator.Seek(t.keyBuilder.Key(key).Raw()) +} + +func (t *tableStoreIterator) Next() bool { + return t.baseIterator.Next() +} + +func (t *tableStoreIterator) Prev() bool { + return t.baseIterator.Prev() +} + +func (t *tableStoreIterator) Release() { + t.baseIterator.Release() +} + +func (t *tableStoreIterator) SetReleaser(releaser util.Releaser) { + t.baseIterator.SetReleaser(releaser) +} + +func (t *tableStoreIterator) Valid() bool { + return t.baseIterator.Valid() +} + +func (t *tableStoreIterator) Error() error { + return t.baseIterator.Error() +} + +func (t *tableStoreIterator) Key() []byte { + baseKey := t.baseIterator.Key() + return baseKey[prefixLength:] +} + +func (t *tableStoreIterator) Value() []byte { + return t.baseIterator.Value() +} diff --git a/common/kvstore/tablestore/table_store_keys.go b/common/kvstore/tablestore/table_store_keys.go new file mode 100644 index 0000000000..a8a4ee76ee --- /dev/null +++ b/common/kvstore/tablestore/table_store_keys.go @@ -0,0 +1,63 @@ +package tablestore + +import ( + "encoding/binary" + "github.com/Layr-Labs/eigenda/common/kvstore" +) + +const prefixLength = 4 + +var _ kvstore.Key = (*key)(nil) + +type key struct { + keyBuilder kvstore.KeyBuilder + data []byte +} + +// Builder returns the KeyBuilder that was used to create the key. +func (k *key) Builder() kvstore.KeyBuilder { + return k.keyBuilder +} + +// Raw returns the raw byte slice that represents the key. 
+func (k *key) Raw() []byte { + return k.data +} + +// Bytes interprets the key as a byte slice and returns it. +func (k *key) Bytes() []byte { + return k.data[prefixLength:] +} + +var _ kvstore.KeyBuilder = (*keyBuilder)(nil) + +type keyBuilder struct { + tableName string + prefix []byte +} + +// newKeyBuilder creates a new KeyBuilder for the given table name and prefix. +func newKeyBuilder(tableName string, prefix uint32) kvstore.KeyBuilder { + prefixBytes := make([]byte, 4) + binary.BigEndian.PutUint32(prefixBytes, prefix) + return &keyBuilder{ + tableName: tableName, + prefix: prefixBytes, + } +} + +// TableName returns the name of the table that this KeyBuilder is scoped to. +func (k *keyBuilder) TableName() string { + return k.tableName +} + +// Key creates a key from a byte slice. +func (k *keyBuilder) Key(data []byte) kvstore.Key { + result := make([]byte, prefixLength+len(data)) + copy(result, k.prefix) + copy(result[prefixLength:], data) + return &key{ + keyBuilder: k, + data: result, + } +} diff --git a/common/kvstore/tablestore/table_store_test.go b/common/kvstore/tablestore/table_store_test.go index 17d6d94d83..b34c81678a 100644 --- a/common/kvstore/tablestore/table_store_test.go +++ b/common/kvstore/tablestore/table_store_test.go @@ -33,7 +33,8 @@ func TestTableList(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - tStore, err := LevelDB.Start(logger, dbPath) + config := DefaultLevelDBConfig(dbPath) + tStore, err := Start(logger, config) assert.NoError(t, err) tables := tStore.GetTables() @@ -44,89 +45,102 @@ func TestTableList(t *testing.T) { // Add some tables - tStore, err = LevelDB.Start(logger, dbPath, "table1") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1"} + tStore, err = Start(logger, config) assert.NoError(t, err) tables = tStore.GetTables() assert.Equal(t, 1, len(tables)) - assert.Equal(t, "table1", tables[0].Name()) + assert.Equal(t, "table1", tables[0]) err 
= tStore.Shutdown() assert.NoError(t, err) - tStore, err = LevelDB.Start(logger, dbPath, "table1", "table2") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table2"} + tStore, err = Start(logger, config) assert.NoError(t, err) tables = tStore.GetTables() assert.Equal(t, 2, len(tables)) sort.SliceStable(tables, func(i, j int) bool { - return tables[i].Name() < tables[j].Name() + return tables[i] < tables[j] }) - assert.Equal(t, "table1", tables[0].Name()) - assert.Equal(t, "table2", tables[1].Name()) + assert.Equal(t, "table1", tables[0]) + assert.Equal(t, "table2", tables[1]) err = tStore.Shutdown() assert.NoError(t, err) - tStore, err = LevelDB.Start(logger, dbPath, "table1", "table2", "table3") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table2", "table3"} + tStore, err = Start(logger, config) assert.NoError(t, err) tables = tStore.GetTables() assert.Equal(t, 3, len(tables)) sort.SliceStable(tables, func(i, j int) bool { - return tables[i].Name() < tables[j].Name() + return tables[i] < tables[j] }) - assert.Equal(t, "table1", tables[0].Name()) - assert.Equal(t, "table2", tables[1].Name()) - assert.Equal(t, "table3", tables[2].Name()) + assert.Equal(t, "table1", tables[0]) + assert.Equal(t, "table2", tables[1]) + assert.Equal(t, "table3", tables[2]) err = tStore.Shutdown() assert.NoError(t, err) // Restarting with the same tables should work. 
- tStore, err = LevelDB.Start(logger, dbPath, "table1", "table2", "table3") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table2", "table3"} + tStore, err = Start(logger, config) assert.NoError(t, err) tables = tStore.GetTables() assert.Equal(t, 3, len(tables)) sort.SliceStable(tables, func(i, j int) bool { - return tables[i].Name() < tables[j].Name() + return tables[i] < tables[j] }) - assert.Equal(t, "table1", tables[0].Name()) - assert.Equal(t, "table2", tables[1].Name()) - assert.Equal(t, "table3", tables[2].Name()) + assert.Equal(t, "table1", tables[0]) + assert.Equal(t, "table2", tables[1]) + assert.Equal(t, "table3", tables[2]) err = tStore.Shutdown() assert.NoError(t, err) // Delete a table - - tStore, err = LevelDB.Start(logger, dbPath, "table1", "table3") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table3"} + tStore, err = Start(logger, config) assert.NoError(t, err) tables = tStore.GetTables() assert.Equal(t, 2, len(tables)) sort.SliceStable(tables, func(i, j int) bool { - return tables[i].Name() < tables[j].Name() + return tables[i] < tables[j] }) - assert.Equal(t, "table1", tables[0].Name()) + assert.Equal(t, "table1", tables[0]) err = tStore.Shutdown() assert.NoError(t, err) // Add a table back in (this uses a different code path) - tStore, err = LevelDB.Start(logger, dbPath, "table1", "table3", "table4") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table3", "table4"} + tStore, err = Start(logger, config) assert.NoError(t, err) tables = tStore.GetTables() assert.Equal(t, 3, len(tables)) sort.SliceStable(tables, func(i, j int) bool { - return tables[i].Name() < tables[j].Name() + return tables[i] < tables[j] }) - assert.Equal(t, "table1", tables[0].Name()) - assert.Equal(t, "table3", tables[1].Name()) - assert.Equal(t, "table4", tables[2].Name()) + assert.Equal(t, "table1", tables[0]) + assert.Equal(t, "table3", tables[1]) + assert.Equal(t, "table4", 
tables[2]) err = tStore.Shutdown() assert.NoError(t, err) // Delete the rest of the tables - tStore, err = LevelDB.Start(logger, dbPath) + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{} + tStore, err = Start(logger, config) assert.NoError(t, err) tables = tStore.GetTables() @@ -141,57 +155,64 @@ func TestUniqueKeySpace(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := MapStore.Start(logger, dbPath, "table1", "table2") + config := DefaultMapStoreConfig() + config.Schema = []string{"table1", "table2"} + store, err := Start(logger, config) assert.NoError(t, err) - table1, err := store.GetTable("table1") + kb1, err := store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err := store.GetTable("table2") + kb2, err := store.GetKeyBuilder("table2") assert.NoError(t, err) // Write to the tables - err = table1.Put([]byte("key1"), []byte("value1")) + err = store.Put(kb1.Key([]byte("key1")), []byte("value1")) assert.NoError(t, err) - err = table2.Put([]byte("key1"), []byte("value2")) + err = store.Put(kb2.Key([]byte("key1")), []byte("value2")) assert.NoError(t, err) - value, err := table1.Get([]byte("key1")) + value, err := store.Get(kb1.Key([]byte("key1"))) assert.NoError(t, err) assert.Equal(t, []byte("value1"), value) - value, err = table2.Get([]byte("key1")) + value, err = store.Get(kb2.Key([]byte("key1"))) assert.NoError(t, err) assert.Equal(t, []byte("value2"), value) // Delete a key from one table but not the other - err = table1.Delete([]byte("key1")) + err = store.Delete(kb1.Key([]byte("key1"))) assert.NoError(t, err) - _, err = table1.Get([]byte("key1")) + _, err = store.Get(kb1.Key([]byte("key1"))) assert.Equal(t, kvstore.ErrNotFound, err) - value, err = table2.Get([]byte("key1")) + value, err = store.Get(kb2.Key([]byte("key1"))) assert.NoError(t, err) assert.Equal(t, []byte("value2"), value) + + err = store.Destroy() + assert.NoError(t, err) } func TestBatchOperations(t 
*testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := MapStore.Start(logger, dbPath, "table1", "table2", "table3") + config := DefaultMapStoreConfig() + config.Schema = []string{"table1", "table2", "table3"} + store, err := Start(logger, config) assert.NoError(t, err) - table1, err := store.GetTable("table1") + kb1, err := store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err := store.GetTable("table2") + kb2, err := store.GetKeyBuilder("table2") assert.NoError(t, err) - table3, err := store.GetTable("table3") + kb3, err := store.GetKeyBuilder("table3") assert.NoError(t, err) // Test a batch with just puts @@ -203,15 +224,15 @@ func TestBatchOperations(t *testing.T) { v := make([]byte, 8) binary.BigEndian.PutUint64(v, uint64(i)) - batch.Put(table1.TableKey(k), v) + batch.Put(kb1.Key(k), v) v = make([]byte, 8) binary.BigEndian.PutUint64(v, uint64(i+10)) - batch.Put(table2.TableKey(k), v) + batch.Put(kb2.Key(k), v) v = make([]byte, 8) binary.BigEndian.PutUint64(v, uint64(i+20)) - batch.Put(table3.TableKey(k), v) + batch.Put(kb3.Key(k), v) } err = batch.Apply() @@ -221,15 +242,15 @@ func TestBatchOperations(t *testing.T) { k := make([]byte, 8) binary.BigEndian.PutUint64(k, uint64(i)) - value, err := table1.Get(k) + value, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i), binary.BigEndian.Uint64(value)) - value, err = table2.Get(k) + value, err = store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i+10), binary.BigEndian.Uint64(value)) - value, err = table3.Get(k) + value, err = store.Get(kb3.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i+20), binary.BigEndian.Uint64(value)) } @@ -243,9 +264,9 @@ func TestBatchOperations(t *testing.T) { k := make([]byte, 8) binary.BigEndian.PutUint64(k, uint64(i)) - batch.Delete(table1.TableKey(k)) - batch.Delete(table2.TableKey(k)) - batch.Delete(table3.TableKey(k)) + batch.Delete(kb1.Key(k)) + 
batch.Delete(kb2.Key(k)) + batch.Delete(kb3.Key(k)) } } @@ -257,24 +278,24 @@ func TestBatchOperations(t *testing.T) { binary.BigEndian.PutUint64(k, uint64(i)) if i%2 == 1 { - _, err = table1.Get(k) + _, err = store.Get(kb1.Key(k)) assert.Equal(t, kvstore.ErrNotFound, err) - _, err = table2.Get(k) + _, err = store.Get(kb2.Key(k)) assert.Equal(t, kvstore.ErrNotFound, err) - _, err = table3.Get(k) + _, err = store.Get(kb3.Key(k)) assert.Equal(t, kvstore.ErrNotFound, err) } else { - value, err := table1.Get(k) + value, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i), binary.BigEndian.Uint64(value)) - value, err = table2.Get(k) + value, err = store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i+10), binary.BigEndian.Uint64(value)) - value, err = table3.Get(k) + value, err = store.Get(kb3.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i+20), binary.BigEndian.Uint64(value)) } @@ -288,21 +309,21 @@ func TestBatchOperations(t *testing.T) { k := make([]byte, 8) binary.BigEndian.PutUint64(k, uint64(i)) if i%4 == 0 { - batch.Delete(table1.TableKey(k)) - batch.Delete(table2.TableKey(k)) - batch.Delete(table3.TableKey(k)) + batch.Delete(kb1.Key(k)) + batch.Delete(kb2.Key(k)) + batch.Delete(kb3.Key(k)) } else if i%2 == 1 { v := make([]byte, 8) binary.BigEndian.PutUint64(v, uint64(2*i)) - batch.Put(table1.TableKey(k), v) + batch.Put(kb1.Key(k), v) v = make([]byte, 8) binary.BigEndian.PutUint64(v, uint64(2*i+10)) - batch.Put(table2.TableKey(k), v) + batch.Put(kb2.Key(k), v) v = make([]byte, 8) binary.BigEndian.PutUint64(v, uint64(2*i+20)) - batch.Put(table3.TableKey(k), v) + batch.Put(kb3.Key(k), v) } } @@ -314,36 +335,36 @@ func TestBatchOperations(t *testing.T) { binary.BigEndian.PutUint64(k, uint64(i)) if i%4 == 0 { - _, err = table1.Get(k) + _, err = store.Get(kb1.Key(k)) assert.Equal(t, kvstore.ErrNotFound, err) - _, err = table2.Get(k) + _, err = store.Get(kb2.Key(k)) assert.Equal(t, kvstore.ErrNotFound, err) - _, err = 
table3.Get(k) + _, err = store.Get(kb3.Key(k)) assert.Equal(t, kvstore.ErrNotFound, err) } else if i%2 == 1 { - val, err := table1.Get(k) + val, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(2*i), binary.BigEndian.Uint64(val)) - val, err = table2.Get(k) + val, err = store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(2*i+10), binary.BigEndian.Uint64(val)) - val, err = table3.Get(k) + val, err = store.Get(kb3.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(2*i+20), binary.BigEndian.Uint64(val)) } else { - val, err := table1.Get(k) + val, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i), binary.BigEndian.Uint64(val)) - val, err = table2.Get(k) + val, err = store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i+10), binary.BigEndian.Uint64(val)) - val, err = table3.Get(k) + val, err = store.Get(kb3.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i+20), binary.BigEndian.Uint64(val)) } @@ -359,16 +380,18 @@ func TestDropTable(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := LevelDB.Start(logger, dbPath, "table1", "table2", "table3") + config := DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table2", "table3"} + store, err := Start(logger, config) assert.NoError(t, err) - table1, err := store.GetTable("table1") + kb1, err := store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err := store.GetTable("table2") + kb2, err := store.GetKeyBuilder("table2") assert.NoError(t, err) - table3, err := store.GetTable("table3") + kb3, err := store.GetKeyBuilder("table3") assert.NoError(t, err) // Insert some data into the tables @@ -379,13 +402,13 @@ func TestDropTable(t *testing.T) { k := make([]byte, 8) binary.BigEndian.PutUint64(k, uint64(i)) - err = table1.Put(k, value) + err = store.Put(kb1.Key(k), value) assert.NoError(t, err) - err = table2.Put(k, value) + err = 
store.Put(kb2.Key(k), value) assert.NoError(t, err) - err = table3.Put(k, value) + err = store.Put(kb3.Key(k), value) assert.NoError(t, err) } @@ -397,15 +420,15 @@ func TestDropTable(t *testing.T) { expectedValue := make([]byte, 8) binary.BigEndian.PutUint64(expectedValue, uint64(i)) - value, err := table1.Get(k) + value, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) - value, err = table2.Get(k) + value, err = store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) - _, err = table3.Get(k) + _, err = store.Get(kb3.Key(k)) assert.NoError(t, err) } @@ -413,16 +436,18 @@ func TestDropTable(t *testing.T) { err = store.Shutdown() assert.NoError(t, err) - store, err = LevelDB.Start(logger, dbPath, "table1", "table3") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table3"} + store, err = Start(logger, config) assert.NoError(t, err) - table1, err = store.GetTable("table1") + kb1, err = store.GetKeyBuilder("table1") assert.NoError(t, err) - _, err = store.GetTable("table2") + _, err = store.GetKeyBuilder("table2") assert.Equal(t, kvstore.ErrTableNotFound, err) - table3, err = store.GetTable("table3") + kb3, err = store.GetKeyBuilder("table3") assert.NoError(t, err) for i := 0; i < 100; i++ { @@ -432,11 +457,11 @@ func TestDropTable(t *testing.T) { expectedValue := make([]byte, 8) binary.BigEndian.PutUint64(expectedValue, uint64(i)) - value, err := table1.Get(k) + value, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) - value, err = table3.Get(k) + value, err = store.Get(kb3.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) } @@ -445,16 +470,18 @@ func TestDropTable(t *testing.T) { err = store.Shutdown() assert.NoError(t, err) - store, err = LevelDB.Start(logger, dbPath, "table3") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table3"} + store, err = Start(logger, config) assert.NoError(t, 
err) - _, err = store.GetTable("table1") + _, err = store.GetKeyBuilder("table1") assert.Equal(t, kvstore.ErrTableNotFound, err) - _, err = store.GetTable("table2") + _, err = store.GetKeyBuilder("table2") assert.Equal(t, kvstore.ErrTableNotFound, err) - table3, err = store.GetTable("table3") + kb3, err = store.GetKeyBuilder("table3") assert.NoError(t, err) for i := 0; i < 100; i++ { @@ -464,7 +491,7 @@ func TestDropTable(t *testing.T) { expectedValue := make([]byte, 8) binary.BigEndian.PutUint64(expectedValue, uint64(i)) - value, err := table3.Get(k) + value, err := store.Get(kb3.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) } @@ -473,16 +500,18 @@ func TestDropTable(t *testing.T) { err = store.Shutdown() assert.NoError(t, err) - store, err = LevelDB.Start(logger, dbPath) + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{} + store, err = Start(logger, config) assert.NoError(t, err) - _, err = store.GetTable("table1") + _, err = store.GetKeyBuilder("table1") assert.Equal(t, kvstore.ErrTableNotFound, err) - _, err = store.GetTable("table2") + _, err = store.GetKeyBuilder("table2") assert.Equal(t, kvstore.ErrTableNotFound, err) - _, err = store.GetTable("table3") + _, err = store.GetKeyBuilder("table3") assert.Equal(t, kvstore.ErrTableNotFound, err) err = store.Destroy() @@ -497,22 +526,24 @@ func TestSimultaneousAddAndDrop(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := LevelDB.Start(logger, dbPath, "table1", "table2", "table3", "table4", "table5") + config := DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table2", "table3", "table4", "table5"} + store, err := Start(logger, config) assert.NoError(t, err) - table1, err := store.GetTable("table1") + kb1, err := store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err := store.GetTable("table2") + kb2, err := store.GetKeyBuilder("table2") assert.NoError(t, err) - table3, err := 
store.GetTable("table3") + kb3, err := store.GetKeyBuilder("table3") assert.NoError(t, err) - table4, err := store.GetTable("table4") + kb4, err := store.GetKeyBuilder("table4") assert.NoError(t, err) - table5, err := store.GetTable("table5") + kb5, err := store.GetKeyBuilder("table5") assert.NoError(t, err) // Insert some data into the tables @@ -523,19 +554,19 @@ func TestSimultaneousAddAndDrop(t *testing.T) { k := make([]byte, 8) binary.BigEndian.PutUint64(k, uint64(i)) - err = table1.Put(k, value) + err = store.Put(kb1.Key(k), value) assert.NoError(t, err) - err = table2.Put(k, value) + err = store.Put(kb2.Key(k), value) assert.NoError(t, err) - err = table3.Put(k, value) + err = store.Put(kb3.Key(k), value) assert.NoError(t, err) - err = table4.Put(k, value) + err = store.Put(kb4.Key(k), value) assert.NoError(t, err) - err = table5.Put(k, value) + err = store.Put(kb5.Key(k), value) assert.NoError(t, err) } @@ -547,23 +578,23 @@ func TestSimultaneousAddAndDrop(t *testing.T) { expectedValue := make([]byte, 8) binary.BigEndian.PutUint64(expectedValue, uint64(i)) - value, err := table1.Get(k) + value, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) - value, err = table2.Get(k) + value, err = store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) - value, err = table3.Get(k) + value, err = store.Get(kb3.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) - value, err = table4.Get(k) + value, err = store.Get(kb4.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) - value, err = table5.Get(k) + value, err = store.Get(kb5.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) } @@ -572,34 +603,36 @@ func TestSimultaneousAddAndDrop(t *testing.T) { err = store.Shutdown() assert.NoError(t, err) - store, err = LevelDB.Start(logger, dbPath, "table1", "table5", "table6", "table7", "table8", "table9") + config = DefaultLevelDBConfig(dbPath) + config.Schema = 
[]string{"table1", "table5", "table6", "table7", "table8", "table9"} + store, err = Start(logger, config) assert.NoError(t, err) - table1, err = store.GetTable("table1") + kb1, err = store.GetKeyBuilder("table1") assert.NoError(t, err) - _, err = store.GetTable("table2") + _, err = store.GetKeyBuilder("table2") assert.Equal(t, kvstore.ErrTableNotFound, err) - _, err = store.GetTable("table3") + _, err = store.GetKeyBuilder("table3") assert.Equal(t, kvstore.ErrTableNotFound, err) - _, err = store.GetTable("table4") + _, err = store.GetKeyBuilder("table4") assert.Equal(t, kvstore.ErrTableNotFound, err) - table5, err = store.GetTable("table5") + kb2, err = store.GetKeyBuilder("table5") assert.NoError(t, err) - table6, err := store.GetTable("table6") + table6, err := store.GetKeyBuilder("table6") assert.NoError(t, err) - table7, err := store.GetTable("table7") + table7, err := store.GetKeyBuilder("table7") assert.NoError(t, err) - table8, err := store.GetTable("table8") + table8, err := store.GetKeyBuilder("table8") assert.NoError(t, err) - table9, err := store.GetTable("table9") + table9, err := store.GetKeyBuilder("table9") assert.NoError(t, err) // Check data in the tables that were not dropped @@ -610,34 +643,22 @@ func TestSimultaneousAddAndDrop(t *testing.T) { expectedValue := make([]byte, 8) binary.BigEndian.PutUint64(expectedValue, uint64(i)) - value, err := table1.Get(k) + value, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) - value, err = table5.Get(k) + value, err = store.Get(kb5.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue, value) } - // Verify the table IDs. There should be no gaps, and the tables that did not get dropped should have the same IDs - // as before. 
- tableView1 := table1.(*tableView) - assert.Equal(t, uint32(0), tableView1.prefix) - - tableView5 := table5.(*tableView) - assert.Equal(t, uint32(4), tableView5.prefix) - - tableView6 := table6.(*tableView) - assert.Equal(t, uint32(1), tableView6.prefix) - - tableView7 := table7.(*tableView) - assert.Equal(t, uint32(2), tableView7.prefix) - - tableView8 := table8.(*tableView) - assert.Equal(t, uint32(3), tableView8.prefix) - - tableView9 := table9.(*tableView) - assert.Equal(t, uint32(5), tableView9.prefix) + // Verify the table IDs. + assert.Equal(t, uint32(0), getTableID(kb1)) + assert.Equal(t, uint32(4), getTableID(kb2)) + assert.Equal(t, uint32(5), getTableID(table6)) + assert.Equal(t, uint32(6), getTableID(table7)) + assert.Equal(t, uint32(7), getTableID(table8)) + assert.Equal(t, uint32(8), getTableID(table9)) err = store.Destroy() assert.NoError(t, err) @@ -645,17 +666,24 @@ func TestSimultaneousAddAndDrop(t *testing.T) { verifyDBIsDeleted(t) } +func getTableID(kb kvstore.KeyBuilder) uint32 { + prefix := kb.(*keyBuilder).prefix + return binary.BigEndian.Uint32(prefix) +} + func TestIteration(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := MapStore.Start(logger, dbPath, "table1", "table2") + config := DefaultMapStoreConfig() + config.Schema = []string{"table1", "table2"} + store, err := Start(logger, config) assert.NoError(t, err) - table1, err := store.GetTable("table1") + kb1, err := store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err := store.GetTable("table2") + kb2, err := store.GetKeyBuilder("table2") assert.NoError(t, err) // Prefix "qwer" @@ -664,12 +692,12 @@ func TestIteration(t *testing.T) { value := make([]byte, 8) binary.BigEndian.PutUint64(value, uint64(i)) - err = table1.Put(k, value) + err = store.Put(kb1.Key(k), value) assert.NoError(t, err) value = make([]byte, 8) binary.BigEndian.PutUint64(value, uint64(2*i)) - err = table2.Put(k, value) + err = 
store.Put(kb2.Key(k), value) assert.NoError(t, err) } @@ -679,17 +707,17 @@ func TestIteration(t *testing.T) { value := make([]byte, 8) binary.BigEndian.PutUint64(value, uint64(i)) - err = table1.Put(k, value) + err = store.Put(kb1.Key(k), value) assert.NoError(t, err) value = make([]byte, 8) binary.BigEndian.PutUint64(value, uint64(2*i)) - err = table2.Put(k, value) + err = store.Put(kb2.Key(k), value) assert.NoError(t, err) } // Iterate table 1 with no prefix filter - it, err := table1.NewIterator(nil) + it, err := store.NewTableIterator(kb1) assert.NoError(t, err) count := 0 @@ -721,7 +749,7 @@ func TestIteration(t *testing.T) { it.Release() // Iterate table 2 with no prefix filter - it, err = table2.NewIterator(nil) + it, err = store.NewTableIterator(kb2) assert.NoError(t, err) count = 0 @@ -753,7 +781,7 @@ func TestIteration(t *testing.T) { it.Release() // Iterate over the "qwer" keys from table 1 - it, err = table1.NewIterator([]byte("qwer")) + it, err = store.NewIterator(kb1.Key([]byte("qwer"))) assert.NoError(t, err) count = 0 @@ -773,7 +801,7 @@ func TestIteration(t *testing.T) { it.Release() // Iterate over the "asdf" keys from table 2 - it, err = table2.NewIterator([]byte("asdf")) + it, err = store.NewIterator(kb2.Key([]byte("asdf"))) assert.NoError(t, err) count = 0 @@ -802,13 +830,15 @@ func TestRestart(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := LevelDB.Start(logger, dbPath, "table1", "table2") + config := DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table2"} + store, err := Start(logger, config) assert.NoError(t, err) - table1, err := store.GetTable("table1") + kb1, err := store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err := store.GetTable("table2") + kb2, err := store.GetKeyBuilder("table2") assert.NoError(t, err) for i := 0; i < 100; i++ { @@ -821,22 +851,23 @@ func TestRestart(t *testing.T) { value2 := make([]byte, 8) 
binary.BigEndian.PutUint64(value2, uint64(i*2)) - err = table1.Put(k, value1) + err = store.Put(kb1.Key(k), value1) assert.NoError(t, err) - err = table2.Put(k, value2) + err = store.Put(kb2.Key(k), value2) assert.NoError(t, err) } err = store.Shutdown() assert.NoError(t, err) - store, err = LevelDB.Start(logger, dbPath, "table1", "table2") + // Restart the store + store, err = Start(logger, config) assert.NoError(t, err) - table1, err = store.GetTable("table1") + kb1, err = store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err = store.GetTable("table2") + kb2, err = store.GetKeyBuilder("table2") assert.NoError(t, err) for i := 0; i < 100; i++ { @@ -849,11 +880,11 @@ func TestRestart(t *testing.T) { expectedValue2 := make([]byte, 8) binary.BigEndian.PutUint64(expectedValue2, uint64(i*2)) - value1, err := table1.Get(k) + value1, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue1, value1) - value2, err := table2.Get(k) + value2, err := store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue2, value2) } @@ -871,7 +902,7 @@ type expectedTableData map[string][]byte type expectedStoreData map[string]expectedTableData // getTableNameList returns a list of table names from a table map. 
-func getTableNameList(tableMap map[string]kvstore.Table) []string { +func getTableNameList(tableMap map[string]kvstore.KeyBuilder) []string { names := make([]string, 0, len(tableMap)) for name := range tableMap { names = append(names, name) @@ -887,10 +918,11 @@ func TestRandomOperations(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := LevelDB.Start(logger, dbPath) + config := DefaultLevelDBConfig(dbPath) + store, err := Start(logger, config) assert.NoError(t, err) - tables := make(map[string]kvstore.Table) + tables := make(map[string]kvstore.KeyBuilder) expectedData := make(expectedStoreData) for i := 0; i < 10000; i++ { @@ -902,11 +934,13 @@ func TestRandomOperations(t *testing.T) { err = store.Shutdown() assert.NoError(t, err) - store, err = LevelDB.Start(logger, dbPath, getTableNameList(tables)...) + config = DefaultLevelDBConfig(dbPath) + config.Schema = getTableNameList(tables) + store, err = Start(logger, config) assert.NoError(t, err) for tableName := range tables { - table, err := store.GetTable(tableName) + table, err := store.GetKeyBuilder(tableName) assert.NoError(t, err) tables[tableName] = table } @@ -920,7 +954,9 @@ func TestRandomOperations(t *testing.T) { name := tu.RandomString(8) tableNames = append(tableNames, name) - store, err = LevelDB.Start(logger, dbPath, tableNames...) + config = DefaultLevelDBConfig(dbPath) + config.Schema = tableNames + store, err = Start(logger, config) assert.NoError(t, err) expectedData[name] = make(expectedTableData) @@ -928,7 +964,7 @@ func TestRandomOperations(t *testing.T) { tables[name] = nil for tableName := range tables { - table, err := store.GetTable(tableName) + table, err := store.GetKeyBuilder(tableName) assert.NoError(t, err) tables[tableName] = table } @@ -945,14 +981,16 @@ func TestRandomOperations(t *testing.T) { } delete(tables, name) - store, err = LevelDB.Start(logger, dbPath, getTableNameList(tables)...) 
+ config = DefaultLevelDBConfig(dbPath) + config.Schema = getTableNameList(tables) + store, err = Start(logger, config) assert.NoError(t, err) // Delete all expected data for the table delete(expectedData, name) for tableName := range tables { - table, err := store.GetTable(tableName) + table, err := store.GetKeyBuilder(tableName) assert.NoError(t, err) tables[tableName] = table } @@ -971,7 +1009,7 @@ func TestRandomOperations(t *testing.T) { expectedData[tableName][string(k)] = v - err = table.Put(k, v) + err = store.Put(table.Key(k), v) assert.NoError(t, err) } else { // Delete a value @@ -994,7 +1032,7 @@ func TestRandomOperations(t *testing.T) { } delete(expectedData[tableName], k) - err = table.Delete([]byte(k)) + err = store.Delete(table.Key([]byte(k))) assert.NoError(t, err) } @@ -1006,7 +1044,7 @@ func TestRandomOperations(t *testing.T) { for k := range tableData { expectedValue := tableData[k] - value, err := table.Get([]byte(k)) + value, err := store.Get(table.Key([]byte(k))) assert.NoError(t, err) assert.Equal(t, expectedValue, value) } @@ -1018,12 +1056,12 @@ func TestRandomOperations(t *testing.T) { assert.NoError(t, err) } -var _ kvstore.Store = &explodingStore{} +var _ kvstore.Store[[]byte] = &explodingStore{} // explodingStore is a store that returns an error after a certain number of operations. // Used to intentionally crash table deletion to exercise table deletion recovery. 
type explodingStore struct { - base kvstore.Store + base kvstore.Store[[]byte] deletionsRemaining int } @@ -1043,7 +1081,7 @@ func (e *explodingStore) Delete(key []byte) error { return e.base.Delete(key) } -func (e *explodingStore) NewBatch() kvstore.StoreBatch { +func (e *explodingStore) NewBatch() kvstore.Batch[[]byte] { panic("not used") } @@ -1065,13 +1103,15 @@ func TestInterruptedTableDeletion(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := LevelDB.Start(logger, dbPath, "table1", "table2") + config := DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table2"} + store, err := Start(logger, config) assert.NoError(t, err) - table1, err := store.GetTable("table1") + kb1, err := store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err := store.GetTable("table2") + kb2, err := store.GetKeyBuilder("table2") assert.NoError(t, err) // Write some data to the tables @@ -1082,10 +1122,10 @@ func TestInterruptedTableDeletion(t *testing.T) { value := make([]byte, 8) binary.BigEndian.PutUint64(value, uint64(i)) - err = table1.Put(k, value) + err = store.Put(kb1.Key(k), value) assert.NoError(t, err) - err = table2.Put(k, value) + err = store.Put(kb2.Key(k), value) assert.NoError(t, err) } @@ -1101,27 +1141,31 @@ func TestInterruptedTableDeletion(t *testing.T) { deletionsRemaining: 50, } - _, err = start(logger, explodingBase, true, "table2") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table2"} + _, err = start(logger, explodingBase, config) assert.Error(t, err) err = explodingBase.Shutdown() assert.NoError(t, err) // Restart the store. The table should be gone by the time the method returns. 
- store, err = LevelDB.Start(logger, dbPath, "table2") + config = DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table2"} + store, err = Start(logger, config) assert.NoError(t, err) tables := store.GetTables() assert.Equal(t, 1, len(tables)) - assert.Equal(t, "table2", tables[0].Name()) - table2, err = store.GetTable("table2") + assert.Equal(t, "table2", tables[0]) + kb2, err = store.GetKeyBuilder("table2") assert.NoError(t, err) // Check that the data in the remaining table is still there. We shouldn't see any data from the deleted table. for i := 0; i < 100; i++ { k := make([]byte, 8) binary.BigEndian.PutUint64(k, uint64(i)) - value, err := table2.Get(k) + value, err := store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, uint64(i), binary.BigEndian.Uint64(value)) } @@ -1138,13 +1182,15 @@ func TestLoadWithoutModifiedSchema(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - store, err := LevelDB.Start(logger, dbPath, "table1", "table2") + config := DefaultLevelDBConfig(dbPath) + config.Schema = []string{"table1", "table2"} + store, err := Start(logger, config) assert.NoError(t, err) - table1, err := store.GetTable("table1") + kb1, err := store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err := store.GetTable("table2") + kb2, err := store.GetKeyBuilder("table2") assert.NoError(t, err) for i := 0; i < 100; i++ { @@ -1157,22 +1203,24 @@ func TestLoadWithoutModifiedSchema(t *testing.T) { value2 := make([]byte, 8) binary.BigEndian.PutUint64(value2, uint64(i*2)) - err = table1.Put(k, value1) + err = store.Put(kb1.Key(k), value1) assert.NoError(t, err) - err = table2.Put(k, value2) + err = store.Put(kb2.Key(k), value2) assert.NoError(t, err) } err = store.Shutdown() assert.NoError(t, err) - store, err = LevelDB.Load(logger, dbPath) + // Load the store without the schema + config = DefaultLevelDBConfig(dbPath) + store, err = Start(logger, config) assert.NoError(t, err) - table1, err = 
store.GetTable("table1") + kb1, err = store.GetKeyBuilder("table1") assert.NoError(t, err) - table2, err = store.GetTable("table2") + kb2, err = store.GetKeyBuilder("table2") assert.NoError(t, err) for i := 0; i < 100; i++ { @@ -1185,11 +1233,11 @@ func TestLoadWithoutModifiedSchema(t *testing.T) { expectedValue2 := make([]byte, 8) binary.BigEndian.PutUint64(expectedValue2, uint64(i*2)) - value1, err := table1.Get(k) + value1, err := store.Get(kb1.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue1, value1) - value2, err := table2.Get(k) + value2, err := store.Get(kb2.Key(k)) assert.NoError(t, err) assert.Equal(t, expectedValue2, value2) } diff --git a/common/kvstore/tablestore/table_view.go b/common/kvstore/tablestore/table_view.go deleted file mode 100644 index 62791870cd..0000000000 --- a/common/kvstore/tablestore/table_view.go +++ /dev/null @@ -1,183 +0,0 @@ -package tablestore - -import ( - "encoding/binary" - "github.com/Layr-Labs/eigenda/common/kvstore" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/util" -) - -var _ kvstore.Table = &tableView{} - -// tableView allows data in a table to be accessed as if it were a Store. -type tableView struct { - // base is the underlying store. - base kvstore.Store - // name is the name of the table. - name string - // prefix is the prefix for all keys in the table. - prefix uint32 -} - -// NewTableView creates a new view into a table in a New. -func newTableView( - base kvstore.Store, - name string, - prefix uint32) kvstore.Table { - - return &tableView{ - base: base, - name: name, - prefix: prefix, - } -} - -// Name returns the name of the table. -func (t *tableView) Name() string { - return t.name -} - -// Put adds a key-value pair to the table. -func (t *tableView) Put(key []byte, value []byte) error { - if value == nil { - value = []byte{} - } - - k := t.TableKey(key) - return t.base.Put(k, value) -} - -// Get retrieves a value from the table. 
-func (t *tableView) Get(key []byte) ([]byte, error) { - k := t.TableKey(key) - return t.base.Get(k) -} - -// Delete removes a key-value pair from the table. -func (t *tableView) Delete(key []byte) error { - k := t.TableKey(key) - return t.base.Delete(k) -} - -// iteratorWrapper wraps the base iterator to iterate only over keys in the table. -type iteratorWrapper struct { - base iterator.Iterator -} - -func (i *iteratorWrapper) First() bool { - return i.base.First() -} - -func (i *iteratorWrapper) Last() bool { - return i.base.Last() -} - -func (i *iteratorWrapper) Seek(key []byte) bool { - return i.base.Seek(key) -} - -func (i *iteratorWrapper) Next() bool { - return i.base.Next() -} - -func (i *iteratorWrapper) Prev() bool { - return i.base.Prev() -} - -func (i *iteratorWrapper) Release() { - i.base.Release() -} - -func (i *iteratorWrapper) SetReleaser(releaser util.Releaser) { - i.base.SetReleaser(releaser) -} - -func (i *iteratorWrapper) Valid() bool { - return i.base.Valid() -} - -func (i *iteratorWrapper) Error() error { - return i.base.Error() -} - -func (i *iteratorWrapper) Key() []byte { - baseKey := i.base.Key() - return baseKey[4:] -} - -func (i *iteratorWrapper) Value() []byte { - return i.base.Value() -} - -// NewIterator creates a new iterator. Only keys prefixed with the given prefix will be iterated. -func (t *tableView) NewIterator(prefix []byte) (iterator.Iterator, error) { - if prefix == nil { - prefix = []byte{} - } - - it, err := t.base.NewIterator(t.TableKey(prefix)) - if err != nil { - return nil, err - } - - return &iteratorWrapper{ - base: it, - }, nil -} - -// Shutdown shuts down the table. -func (t *tableView) Shutdown() error { - return t.base.Shutdown() -} - -// Destroy shuts down a table and deletes all data in it. -func (t *tableView) Destroy() error { - return t.base.Destroy() -} - -// tableBatch is a batch for a table in a New. 
-type tableBatch struct { - table kvstore.Table - batch kvstore.StoreBatch -} - -// Put schedules a key-value pair to be added to the table. -func (t *tableBatch) Put(key []byte, value []byte) { - if value == nil { - value = []byte{} - } - k := t.table.TableKey(key) - t.batch.Put(k, value) -} - -// Delete schedules a key-value pair to be removed from the table. -func (t *tableBatch) Delete(key []byte) { - k := t.table.TableKey(key) - t.batch.Delete(k) -} - -// Apply applies the batch to the table. -func (t *tableBatch) Apply() error { - return t.batch.Apply() -} - -// Size returns the number of operations in the batch. -func (t *tableBatch) Size() uint32 { - return t.batch.Size() -} - -// NewBatch creates a new batch for the table. -func (t *tableView) NewBatch() kvstore.StoreBatch { - return &tableBatch{ - table: t, - batch: t.base.NewBatch(), - } -} - -// TableKey creates a key scoped to this table. -func (t *tableView) TableKey(key []byte) kvstore.TableKey { - result := make([]byte, 4+len(key)) - binary.BigEndian.PutUint32(result, t.prefix) - copy(result[4:], key) - return result -} diff --git a/common/kvstore/ttl/ttl_wrapper_test.go b/common/kvstore/tablestore/ttl_test.go similarity index 51% rename from common/kvstore/ttl/ttl_wrapper_test.go rename to common/kvstore/tablestore/ttl_test.go index 37b850a584..136fa90cbe 100644 --- a/common/kvstore/ttl/ttl_wrapper_test.go +++ b/common/kvstore/tablestore/ttl_test.go @@ -1,9 +1,7 @@ -package ttl +package tablestore import ( - "context" "github.com/Layr-Labs/eigenda/common" - "github.com/Layr-Labs/eigenda/common/kvstore/mapstore" tu "github.com/Layr-Labs/eigenda/common/testutils" "github.com/stretchr/testify/assert" "math/rand" @@ -18,8 +16,8 @@ func TestExpiryKeyParsing(t *testing.T) { for i := 0; i < 1000; i++ { key := tu.RandomBytes(rand.Intn(100)) expiryTime := tu.RandomTime() - expiryKey := buildExpiryKey(key, expiryTime) - parsedKey, parsedExpiryTime := parseExpiryKey(expiryKey) + expiryKey := 
prependTimestamp(expiryTime, key) + parsedExpiryTime, parsedKey := parsePrependedTimestamp(expiryKey) assert.Equal(t, key, parsedKey) assert.Equal(t, expiryTime, parsedExpiryTime) } @@ -27,8 +25,8 @@ func TestExpiryKeyParsing(t *testing.T) { // Try a very large key. key := tu.RandomBytes(100) expiryTime := time.Unix(0, 1<<62-1) - expiryKey := buildExpiryKey(key, expiryTime) - parsedKey, parsedExpiryTime := parseExpiryKey(expiryKey) + expiryKey := prependTimestamp(expiryTime, key) + parsedExpiryTime, parsedKey := parsePrependedTimestamp(expiryKey) assert.Equal(t, key, parsedKey) assert.Equal(t, expiryTime, parsedExpiryTime) } @@ -40,14 +38,14 @@ func TestExpiryKeyOrdering(t *testing.T) { for i := 0; i < 1000; i++ { expiryTime := tu.RandomTime() - expiryKey := buildExpiryKey(tu.RandomBytes(10), expiryTime) + expiryKey := prependTimestamp(expiryTime, tu.RandomBytes(10)) expiryKeys = append(expiryKeys, expiryKey) } // Add some keys with very large expiry times. for i := 0; i < 1000; i++ { expiryTime := tu.RandomTime().Add(time.Duration(1<<62 - 1)) - expiryKey := buildExpiryKey(tu.RandomBytes(10), expiryTime) + expiryKey := prependTimestamp(expiryTime, tu.RandomBytes(10)) expiryKeys = append(expiryKeys, expiryKey) } @@ -61,8 +59,8 @@ func TestExpiryKeyOrdering(t *testing.T) { a := expiryKeys[i-1] b := expiryKeys[i] - _, aTime := parseExpiryKey(a) - _, bTime := parseExpiryKey(b) + aTime, _ := parsePrependedTimestamp(a) + bTime, _ := parsePrependedTimestamp(b) assert.True(t, aTime.Before(bTime)) } @@ -74,12 +72,14 @@ func TestRandomDataExpired(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - baseStore := mapstore.NewStore() - store := ttlStore{ - store: baseStore, - ctx: context.Background(), - logger: logger, - } + config := DefaultMapStoreConfig() + config.Schema = []string{"test"} + config.GarbageCollectionEnabled = false + tStore, err := Start(logger, config) + assert.NoError(t, err) + + kb, err := 
tStore.GetKeyBuilder("test") + assert.NoError(t, err) data := make(map[string][]byte) expiryTimes := make(map[string]time.Time) @@ -98,7 +98,7 @@ func TestRandomDataExpired(t *testing.T) { data[stringifiedKey] = value expiryTimes[stringifiedKey] = expiryTime - err := store.PutWithExpiration(key, value, expiryTime) + err := tStore.PutWithExpiration(kb.Key(key), value, expiryTime) assert.NoError(t, err) } @@ -110,7 +110,7 @@ func TestRandomDataExpired(t *testing.T) { elapsedSeconds := rand.Intn(simulatedSeconds / 10) currentTime = currentTime.Add(time.Duration(elapsedSeconds) * time.Second) - err = store.expireKeys(currentTime) + err = (tStore.(*tableStore)).expireKeys(currentTime, uint32(1024)) assert.NoError(t, err) for key := range data { @@ -118,17 +118,20 @@ func TestRandomDataExpired(t *testing.T) { expired := !currentTime.Before(keyExpirationTime) if expired { - value, err := store.Get([]byte(key)) + value, err := tStore.Get(kb.Key([]byte(key))) assert.Error(t, err) assert.Nil(t, value) } else { - value, err := store.Get([]byte(key)) + value, err := tStore.Get(kb.Key([]byte(key))) assert.NoError(t, err) expectedValue := data[key] assert.Equal(t, expectedValue, value) } } } + + err = tStore.Shutdown() + assert.NoError(t, err) } func TestBatchRandomDataExpired(t *testing.T) { @@ -137,12 +140,14 @@ func TestBatchRandomDataExpired(t *testing.T) { logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - baseStore := mapstore.NewStore() - store := ttlStore{ - store: baseStore, - ctx: context.Background(), - logger: logger, - } + config := DefaultMapStoreConfig() + config.Schema = []string{"test"} + config.GarbageCollectionEnabled = false + tStore, err := Start(logger, config) + assert.NoError(t, err) + + kb, err := tStore.GetKeyBuilder("test") + assert.NoError(t, err) data := make(map[string][]byte) expiryTimes := make(map[string]time.Time) @@ -156,22 +161,21 @@ func TestBatchRandomDataExpired(t *testing.T) { expiryTime := 
startingTime.Add(time.Duration(rand.Intn(simulatedSeconds)) * time.Second) - keys := make([][]byte, 0) - values := make([][]byte, 0) + batch := tStore.NewTTLBatch() // Generate a batch of random data for j := 0; j < 10; j++ { key := tu.RandomBytes(10) - keys = append(keys, key) stringifiedKey := string(key) value := tu.RandomBytes(10) - values = append(values, value) + + batch.PutWithExpiration(kb.Key(key), value, expiryTime) data[stringifiedKey] = value expiryTimes[stringifiedKey] = expiryTime } - err := store.PutBatchWithExpiration(keys, values, expiryTime) + err := batch.Apply() assert.NoError(t, err) } @@ -183,7 +187,7 @@ func TestBatchRandomDataExpired(t *testing.T) { elapsedSeconds := rand.Intn(simulatedSeconds / 10) currentTime = currentTime.Add(time.Duration(elapsedSeconds) * time.Second) - err = store.expireKeys(currentTime) + err = (tStore.(*tableStore)).expireKeys(currentTime, 1024) assert.NoError(t, err) for key := range data { @@ -191,32 +195,135 @@ func TestBatchRandomDataExpired(t *testing.T) { expired := !currentTime.Before(keyExpirationTime) if expired { - value, err := store.Get([]byte(key)) + value, err := tStore.Get(kb.Key([]byte(key))) assert.Error(t, err) assert.Nil(t, value) } else { - value, err := store.Get([]byte(key)) + value, err := tStore.Get(kb.Key([]byte(key))) assert.NoError(t, err) expectedValue := data[key] assert.Equal(t, expectedValue, value) } } } + + err = tStore.Shutdown() + assert.NoError(t, err) } -func TestBigBatchOfDeletions(t *testing.T) { +func TestMultiTableBatchRandomDataExpired(t *testing.T) { tu.InitializeRandom() logger, err := common.NewLogger(common.DefaultLoggerConfig()) assert.NoError(t, err) - baseStore := mapstore.NewStore() - store := ttlStore{ - store: baseStore, - ctx: context.Background(), - logger: logger, + config := DefaultMapStoreConfig() + config.Schema = []string{"test1", "test2", "test3"} + config.GarbageCollectionEnabled = false + tStore, err := Start(logger, config) + assert.NoError(t, err) + + 
keyBuilders := tStore.GetKeyBuilders() + + type tableData map[string][]byte + data := make(map[string] /* table name */ tableData) + for _, kb := range keyBuilders { + data[kb.TableName()] = make(tableData) + } + expiryTimes := make(map[string] /* fully qualified table key */ time.Time) + + startingTime := tu.RandomTime() + simulatedSeconds := 1000 + endingTime := startingTime.Add(time.Duration(simulatedSeconds) * time.Second) + + // Generate some random data + for i := 0; i < 100; i++ { + + expiryTime := startingTime.Add(time.Duration(rand.Intn(simulatedSeconds)) * time.Second) + + batch := tStore.NewTTLBatch() + + // Generate a batch of random data + for j := 0; j < 10; j++ { + + tableIndex := rand.Intn(len(keyBuilders)) + kb := keyBuilders[tableIndex] + + key := tu.RandomBytes(10) + stringifiedKey := string(key) + + fullyQualifiedKey := kb.Key(key) + stringifiedFullyQualifiedKey := string(fullyQualifiedKey.Raw()) + + value := tu.RandomBytes(10) + + batch.PutWithExpiration(fullyQualifiedKey, value, expiryTime) + + data[kb.TableName()][stringifiedKey] = value + expiryTimes[stringifiedFullyQualifiedKey] = expiryTime + } + + err := batch.Apply() + assert.NoError(t, err) } + currentTime := startingTime + + // Simulate time passing + for currentTime.Before(endingTime) { + + elapsedSeconds := rand.Intn(simulatedSeconds / 10) + currentTime = currentTime.Add(time.Duration(elapsedSeconds) * time.Second) + + err = (tStore.(*tableStore)).expireKeys(currentTime, 1024) + assert.NoError(t, err) + + for tableName := range data { + for stringifiedKey := range data[tableName] { + + key := []byte(stringifiedKey) + expectedValue := data[tableName][stringifiedKey] + + kb, err := tStore.GetKeyBuilder(tableName) + assert.NoError(t, err) + + fullyQualifiedKey := kb.Key(key) + + keyExpirationTime := expiryTimes[string(fullyQualifiedKey.Raw())] + expired := !currentTime.Before(keyExpirationTime) + + if expired { + value, err := tStore.Get(kb.Key(key)) + assert.Error(t, err) + 
assert.Nil(t, value) + } else { + value, err := tStore.Get(kb.Key(key)) + assert.NoError(t, err) + assert.Equal(t, expectedValue, value) + } + } + } + } + + err = tStore.Shutdown() + assert.NoError(t, err) +} + +func TestBigBatchOfDeletions(t *testing.T) { + tu.InitializeRandom() + + logger, err := common.NewLogger(common.DefaultLoggerConfig()) + assert.NoError(t, err) + + config := DefaultMapStoreConfig() + config.Schema = []string{"test"} + config.GarbageCollectionEnabled = false + tStore, err := Start(logger, config) + assert.NoError(t, err) + + kb, err := tStore.GetKeyBuilder("test") + assert.NoError(t, err) + data := make(map[string][]byte) expiryTimes := make(map[string]time.Time) @@ -233,7 +340,7 @@ func TestBigBatchOfDeletions(t *testing.T) { data[stringifiedKey] = value expiryTimes[stringifiedKey] = expiryTime - err = store.PutWithExpiration(key, value, expiryTime) + err = tStore.PutWithExpiration(kb.Key(key), value, expiryTime) assert.NoError(t, err) } @@ -241,13 +348,16 @@ func TestBigBatchOfDeletions(t *testing.T) { elapsedSeconds := simulatedSeconds * 2 currentTime := startingTime.Add(time.Duration(elapsedSeconds) * time.Second) - err = store.expireKeys(currentTime) + err = (tStore.(*tableStore)).expireKeys(currentTime, 1024) assert.NoError(t, err) // All keys should be expired for key := range data { - value, err := store.Get([]byte(key)) + value, err := tStore.Get(kb.Key([]byte(key))) assert.Error(t, err) assert.Nil(t, value) } + + err = tStore.Shutdown() + assert.NoError(t, err) } diff --git a/common/kvstore/tablestore/util.go b/common/kvstore/tablestore/util.go new file mode 100644 index 0000000000..815c6931f1 --- /dev/null +++ b/common/kvstore/tablestore/util.go @@ -0,0 +1,30 @@ +package tablestore + +import ( + "encoding/binary" + "time" +) + +// prependTimestamp prepends the given timestamp to the given base byte slice. The timestamp is +// stored as an 8-byte big-endian integer. 
+func prependTimestamp( + timestamp time.Time, + baseValue []byte) []byte { + + result := make([]byte, len(baseValue)+8) + unixNano := timestamp.UnixNano() + binary.BigEndian.PutUint64(result, uint64(unixNano)) + + copy(result[8:], baseValue) + + return result +} + +// parsePrependedTimestamp extracts the timestamp and base key from the given byte slice. This method +// is the inverse of prependTimestamp. +func parsePrependedTimestamp(data []byte) (timestamp time.Time, baseValue []byte) { + expiryUnixNano := int64(binary.BigEndian.Uint64(data)) + timestamp = time.Unix(0, expiryUnixNano) + baseValue = data[8:] + return timestamp, baseValue +} diff --git a/common/kvstore/test/store_performance_test.go b/common/kvstore/test/store_performance_test.go index 8505cbde20..a1602fe91d 100644 --- a/common/kvstore/test/store_performance_test.go +++ b/common/kvstore/test/store_performance_test.go @@ -13,7 +13,7 @@ import ( "time" ) -func writeThenReadBenchmark(b *testing.B, store kvstore.Store) { +func writeThenReadBenchmark(b *testing.B, store kvstore.Store[[]byte]) { keySize := 8 valueSize := 1024 diff --git a/common/kvstore/test/store_test.go b/common/kvstore/test/store_test.go index 033c362c70..6e0855ecac 100644 --- a/common/kvstore/test/store_test.go +++ b/common/kvstore/test/store_test.go @@ -1,13 +1,11 @@ package test import ( - "context" "github.com/Layr-Labs/eigenda/common" "github.com/Layr-Labs/eigenda/common/kvstore" "github.com/Layr-Labs/eigenda/common/kvstore/leveldb" "github.com/Layr-Labs/eigenda/common/kvstore/mapstore" "github.com/Layr-Labs/eigenda/common/kvstore/tablestore" - "github.com/Layr-Labs/eigenda/common/kvstore/ttl" tu "github.com/Layr-Labs/eigenda/common/testutils" "github.com/Layr-Labs/eigensdk-go/logging" "github.com/stretchr/testify/assert" @@ -17,45 +15,30 @@ import ( ) // A list of builders for various stores to be tested. 
-var storeBuilders = []func(logger logging.Logger, path string) (kvstore.Store, error){ - func(logger logging.Logger, path string) (kvstore.Store, error) { +var storeBuilders = []func(logger logging.Logger, path string) (kvstore.Store[[]byte], error){ + func(logger logging.Logger, path string) (kvstore.Store[[]byte], error) { return mapstore.NewStore(), nil }, - - func(logger logging.Logger, path string) (kvstore.Store, error) { - return ttl.TTLWrapper(context.Background(), logger, mapstore.NewStore(), 0), nil - }, - func(logger logging.Logger, path string) (kvstore.Store, error) { + func(logger logging.Logger, path string) (kvstore.Store[[]byte], error) { return leveldb.NewStore(logger, path) }, - func(logger logging.Logger, path string) (kvstore.Store, error) { - store, err := leveldb.NewStore(logger, path) - if err != nil { - return nil, err - } - return ttl.TTLWrapper(context.Background(), logger, store, 0), nil - }, - func(logger logging.Logger, path string) (kvstore.Store, error) { - tableStore, err := tablestore.MapStore.Start(logger, path, "test") + func(logger logging.Logger, path string) (kvstore.Store[[]byte], error) { + config := tablestore.DefaultMapStoreConfig() + config.Schema = []string{"test"} + tableStore, err := tablestore.Start(logger, config) if err != nil { return nil, err } - store, err := tableStore.GetTable("test") - if err != nil { - return nil, err - } - return store, nil + return NewTableAsAStore(tableStore) }, - func(logger logging.Logger, path string) (kvstore.Store, error) { - tableStore, err := tablestore.LevelDB.Start(logger, path, "test") - if err != nil { - return nil, err - } - store, err := tableStore.GetTable("test") + func(logger logging.Logger, path string) (kvstore.Store[[]byte], error) { + config := tablestore.DefaultLevelDBConfig(path) + config.Schema = []string{"test"} + tableStore, err := tablestore.Start(logger, config) if err != nil { return nil, err } - return store, nil + return NewTableAsAStore(tableStore) }, } @@ 
-71,7 +54,7 @@ func verifyDBIsDeleted(t *testing.T) { assert.True(t, os.IsNotExist(err)) } -func randomOperationsTest(t *testing.T, store kvstore.Store) { +func randomOperationsTest(t *testing.T, store kvstore.Store[[]byte]) { tu.InitializeRandom() deleteDBDirectory(t) @@ -155,7 +138,7 @@ func TestRandomOperations(t *testing.T) { } } -func writeBatchTest(t *testing.T, store kvstore.Store) { +func writeBatchTest(t *testing.T, store kvstore.Store[[]byte]) { tu.InitializeRandom() deleteDBDirectory(t) @@ -222,7 +205,7 @@ func TestWriteBatch(t *testing.T) { } } -func deleteBatchTest(t *testing.T, store kvstore.Store) { +func deleteBatchTest(t *testing.T, store kvstore.Store[[]byte]) { tu.InitializeRandom() deleteDBDirectory(t) @@ -288,7 +271,7 @@ func TestDeleteBatch(t *testing.T) { } } -func iterationTest(t *testing.T, store kvstore.Store) { +func iterationTest(t *testing.T, store kvstore.Store[[]byte]) { tu.InitializeRandom() deleteDBDirectory(t) @@ -340,7 +323,7 @@ func TestIteration(t *testing.T) { } } -func iterationWithPrefixTest(t *testing.T, store kvstore.Store) { +func iterationWithPrefixTest(t *testing.T, store kvstore.Store[[]byte]) { tu.InitializeRandom() deleteDBDirectory(t) @@ -426,7 +409,7 @@ func TestIterationWithPrefix(t *testing.T) { } } -func putNilTest(t *testing.T, store kvstore.Store) { +func putNilTest(t *testing.T, store kvstore.Store[[]byte]) { tu.InitializeRandom() deleteDBDirectory(t) diff --git a/common/kvstore/test/table_as_a_store.go b/common/kvstore/test/table_as_a_store.go new file mode 100644 index 0000000000..86daeba2a1 --- /dev/null +++ b/common/kvstore/test/table_as_a_store.go @@ -0,0 +1,81 @@ +package test + +import ( + "github.com/Layr-Labs/eigenda/common/kvstore" + "github.com/syndtr/goleveldb/leveldb/iterator" +) + +var _ kvstore.Store[[]byte] = &tableAsAStore{} + +// tableAsAStore allows a TableStore to masquerade as a Store. Useful for reusing unit tests that operate on a Store +// against a TableStore. 
Base TableStore is required to have a table called "test". +type tableAsAStore struct { + tableStore kvstore.TableStore + kb kvstore.KeyBuilder +} + +func NewTableAsAStore(tableStore kvstore.TableStore) (kvstore.Store[[]byte], error) { + kb, err := tableStore.GetKeyBuilder("test") + if err != nil { + return nil, err + } + + return &tableAsAStore{ + tableStore: tableStore, + kb: kb, + }, nil +} + +func (t *tableAsAStore) Put(k []byte, value []byte) error { + return t.tableStore.Put(t.kb.Key(k), value) +} + +func (t *tableAsAStore) Get(k []byte) ([]byte, error) { + return t.tableStore.Get(t.kb.Key(k)) +} + +func (t *tableAsAStore) Delete(k []byte) error { + return t.tableStore.Delete(t.kb.Key(k)) +} + +func (t *tableAsAStore) NewBatch() kvstore.Batch[[]byte] { + return &wrappedTableStoreBatch{ + tableStoreBatch: t.tableStore.NewBatch(), + kb: t.kb, + } +} + +func (t *tableAsAStore) NewIterator(prefix []byte) (iterator.Iterator, error) { + return t.tableStore.NewIterator(t.kb.Key(prefix)) +} + +func (t *tableAsAStore) Shutdown() error { + return t.tableStore.Shutdown() +} + +func (t *tableAsAStore) Destroy() error { + return t.tableStore.Destroy() +} + +var _ kvstore.Batch[[]byte] = &wrappedTableStoreBatch{} + +type wrappedTableStoreBatch struct { + tableStoreBatch kvstore.Batch[kvstore.Key] + kb kvstore.KeyBuilder +} + +func (w *wrappedTableStoreBatch) Put(k []byte, value []byte) { + w.tableStoreBatch.Put(w.kb.Key(k), value) +} + +func (w *wrappedTableStoreBatch) Delete(k []byte) { + w.tableStoreBatch.Delete(w.kb.Key(k)) +} + +func (w *wrappedTableStoreBatch) Apply() error { + return w.tableStoreBatch.Apply() +} + +func (w *wrappedTableStoreBatch) Size() uint32 { + return w.tableStoreBatch.Size() +} diff --git a/common/kvstore/ttl/ttl_wrapper.go b/common/kvstore/ttl/ttl_wrapper.go deleted file mode 100644 index 0883b962e1..0000000000 --- a/common/kvstore/ttl/ttl_wrapper.go +++ /dev/null @@ -1,313 +0,0 @@ -package ttl - -import ( - "context" - "encoding/binary" - 
"fmt" - "github.com/Layr-Labs/eigenda/common/kvstore" - "github.com/Layr-Labs/eigensdk-go/logging" - "github.com/syndtr/goleveldb/leveldb/iterator" - "github.com/syndtr/goleveldb/leveldb/util" - "time" -) - -var _ kvstore.Store = &ttlStore{} - -// ttlStore adds a time-to-live (TTL) capability to the store. -// -// This store utilizes the properties of store iteration. Namely, that the keys are returned in lexicographical order, -// as well as the ability to filter keys by prefix. "Regular" keys are stored in the store with a prefix "k", while -// special expiry keys are stored with a prefix "e". The expiry key also contains the expiry time in hexadecimal format, -// such that when iterating over expiry keys in lexicographical order, the keys are ordered by expiry time. The value -// each expiry key points to is the regular key that is to be deleted when the expiry time is reached. In order to -// efficiently delete expired keys, the expiry keys must be iterated over periodically to find and delete expired keys. -type ttlStore struct { - store kvstore.Store - ctx context.Context - cancel context.CancelFunc - - logger logging.Logger -} - -// TTLWrapper extends the given store with TTL capabilities. Periodically checks for expired keys and deletes them -// with a period of gcPeriod. If gcPeriod is 0, no background goroutine is spawned to check for expired keys. -// -// Note: it is unsafe to access the wrapped store directly while the TTLStore is in use. The TTLStore uses special -// key formatting, and direct access to the wrapped store may violate the TTLStore's invariants, resulting in -// undefined behavior. 
-func TTLWrapper( - ctx context.Context, - logger logging.Logger, - store kvstore.Store, - gcPeriod time.Duration) kvstore.TTLStore { - - ctx, cancel := context.WithCancel(ctx) - - ttlStore := &ttlStore{ - store: store, - ctx: ctx, - cancel: cancel, - logger: logger, - } - if gcPeriod > 0 { - ttlStore.expireKeysInBackground(gcPeriod) - } - return ttlStore -} - -var keyPrefix = []byte("k") -var expiryPrefix = []byte("e") -var maxDeletionBatchSize uint32 = 1024 - -// PutWithTTL adds a key-value pair to the store that expires after a specified time-to-live (TTL). -// Key is eventually deleted after the TTL elapses. -func (store *ttlStore) PutWithTTL(key []byte, value []byte, ttl time.Duration) error { - expiryTime := time.Now().Add(ttl) - return store.PutWithExpiration(key, value, expiryTime) -} - -// PutBatchWithTTL adds multiple key-value pairs to the store that expire after a specified time-to-live (TTL). -func (store *ttlStore) PutBatchWithTTL(keys [][]byte, values [][]byte, ttl time.Duration) error { - expiryTime := time.Now().Add(ttl) - return store.PutBatchWithExpiration(keys, values, expiryTime) -} - -// buildExpiryKey creates an expiry key from the given expiry time. -// The expiry key is composed of the following 3 components appended one after the other: -// - a one byte "e" prefix -// - the expiry time in hexadecimal format (8 bytes) -// - and the base key. -func buildExpiryKey( - baseKey []byte, - expiryTime time.Time) []byte { - - expiryKeyLength := 1 /* prefix */ + 8 /* expiry timestamp */ + len(baseKey) - expiryKey := make([]byte, expiryKeyLength) - - expiryKey[0] = 'e' - expiryUnixNano := expiryTime.UnixNano() - binary.BigEndian.PutUint64(expiryKey[1:], uint64(expiryUnixNano)) - - copy(expiryKey[9:], baseKey) - - return expiryKey -} - -// parseExpiryKey extracts the expiry time and base key from the given expiry key. 
-func parseExpiryKey(expiryKey []byte) (baseKey []byte, expiryTime time.Time) { - expiryUnixNano := int64(binary.BigEndian.Uint64(expiryKey[1:])) - expiryTime = time.Unix(0, expiryUnixNano) - - baseKey = expiryKey[9:] - return -} - -// PutWithExpiration adds a key-value pair to the store that expires at a specified time. -// Key is eventually deleted after the expiry time. -func (store *ttlStore) PutWithExpiration(key []byte, value []byte, expiryTime time.Time) error { - batch := store.store.NewBatch() - - prefixedKey := append(keyPrefix, key...) - batch.Put(prefixedKey, value) - batch.Put(buildExpiryKey(key, expiryTime), nil) - - return batch.Apply() -} - -// PutBatchWithExpiration adds multiple key-value pairs to the store that expire at a specified time. -func (store *ttlStore) PutBatchWithExpiration(keys [][]byte, values [][]byte, expiryTime time.Time) error { - if len(keys) != len(values) { - return fmt.Errorf("keys and values must have the same length (keys: %d, values: %d)", len(keys), len(values)) - } - - batch := store.store.NewBatch() - - for i, key := range keys { - prefixedKey := append(keyPrefix, key...) - - batch.Put(prefixedKey, values[i]) - batch.Put(buildExpiryKey(key, expiryTime), nil) - } - - return batch.Apply() -} - -// Spawns a background goroutine that periodically checks for expired keys and deletes them. -func (store *ttlStore) expireKeysInBackground(gcPeriod time.Duration) { - ticker := time.NewTicker(gcPeriod) - go func() { - for { - select { - case now := <-ticker.C: - err := store.expireKeys(now) - if err != nil { - store.logger.Error("Error expiring keys", err) - continue - } - case <-store.ctx.Done(): - ticker.Stop() - return - } - } - }() -} - -// Delete all keys with a TTL that has expired. 
-func (store *ttlStore) expireKeys(now time.Time) error { - it, err := store.store.NewIterator(expiryPrefix) - if err != nil { - return err - } - defer it.Release() - - batch := store.store.NewBatch() - - for it.Next() { - expiryKey := it.Key() - baseKey, expiryTimestamp := parseExpiryKey(expiryKey) - - if expiryTimestamp.After(now) { - // No more values to expire - break - } - - prefixedBaseKey := append(keyPrefix, baseKey...) - batch.Delete(prefixedBaseKey) - batch.Delete(expiryKey) - - if batch.Size() >= maxDeletionBatchSize { - err = batch.Apply() - if err != nil { - return err - } - batch = store.store.NewBatch() - } - } - - if batch.Size() > 0 { - return batch.Apply() - } - return nil -} - -func (store *ttlStore) Put(key []byte, value []byte) error { - if value == nil { - value = []byte{} - } - - prefixedKey := append(keyPrefix, key...) - return store.store.Put(prefixedKey, value) -} - -func (store *ttlStore) Get(key []byte) ([]byte, error) { - prefixedKey := append(keyPrefix, key...) - return store.store.Get(prefixedKey) -} - -func (store *ttlStore) Delete(key []byte) error { - prefixedKey := append(keyPrefix, key...) - return store.store.Delete(prefixedKey) -} - -var _ kvstore.StoreBatch = &batch{} - -type batch struct { - base kvstore.StoreBatch -} - -func (b *batch) Put(key []byte, value []byte) { - if value == nil { - value = []byte{} - } - prefixedKey := append(keyPrefix, key...) - b.base.Put(prefixedKey, value) -} - -func (b *batch) Delete(key []byte) { - prefixedKey := append(keyPrefix, key...) - b.base.Delete(prefixedKey) -} - -func (b *batch) Apply() error { - return b.base.Apply() -} - -func (b *batch) Size() uint32 { - return b.base.Size() -} - -// NewBatch creates a new batch for the store. 
-func (store *ttlStore) NewBatch() kvstore.StoreBatch { - return &batch{ - base: store.store.NewBatch(), - } -} - -type ttlIterator struct { - baseIterator iterator.Iterator -} - -func (it *ttlIterator) First() bool { - return it.baseIterator.First() -} - -func (it *ttlIterator) Last() bool { - return it.baseIterator.Last() -} - -func (it *ttlIterator) Seek(key []byte) bool { - return it.baseIterator.Seek(key) -} - -func (it *ttlIterator) Next() bool { - return it.baseIterator.Next() -} - -func (it *ttlIterator) Prev() bool { - return it.baseIterator.Prev() -} - -func (it *ttlIterator) Release() { - it.baseIterator.Release() -} - -func (it *ttlIterator) SetReleaser(releaser util.Releaser) { - it.baseIterator.SetReleaser(releaser) -} - -func (it *ttlIterator) Valid() bool { - return it.baseIterator.Valid() -} - -func (it *ttlIterator) Error() error { - return it.baseIterator.Error() -} - -func (it *ttlIterator) Key() []byte { - baseKey := it.baseIterator.Key() - return baseKey[len(keyPrefix):] -} - -func (it *ttlIterator) Value() []byte { - return it.baseIterator.Value() -} - -func (store *ttlStore) NewIterator(prefix []byte) (iterator.Iterator, error) { - prefixedPrefix := append(keyPrefix, prefix...) - baseIterator, err := store.store.NewIterator(prefixedPrefix) - if err != nil { - return nil, err - } - - return &ttlIterator{ - baseIterator: baseIterator, - }, nil -} - -func (store *ttlStore) Shutdown() error { - return store.store.Shutdown() -} - -func (store *ttlStore) Destroy() error { - return store.store.Destroy() -} diff --git a/common/kvstore/ttl_store.go b/common/kvstore/ttl_store.go deleted file mode 100644 index 71410eb878..0000000000 --- a/common/kvstore/ttl_store.go +++ /dev/null @@ -1,36 +0,0 @@ -package kvstore - -import "time" - -// TTLStore is a store that supports key-value pairs with time-to-live (TTL) or expiration time. 
-type TTLStore interface { - Store - - // PutWithTTL adds a key-value pair to the store that expires after a specified duration. - // Key is eventually deleted after the TTL elapses. - // - // Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern - // may be implemented in the future if a use case is identified. - PutWithTTL(key []byte, value []byte, ttl time.Duration) error - - // PutWithExpiration adds a key-value pair to the store that expires at a specified time. - // Key is eventually deleted after the expiry time. - // - // Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern - // may be implemented in the future if a use case is identified. - PutWithExpiration(key []byte, value []byte, expiryTime time.Time) error - - // PutBatchWithTTL atomically adds multiple key-value pairs to the store that expire after a specified duration. - // Keys are eventually deleted after the TTL elapses. - // - // Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern - // may be implemented in the future if a use case is identified. - PutBatchWithTTL(keys [][]byte, values [][]byte, ttl time.Duration) error - - // PutBatchWithExpiration atomically adds multiple key-value pairs to the store that expire at a specified time. - // Keys are eventually deleted after the expiry time. - // - // Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern - // may be implemented in the future if a use case is identified. 
- PutBatchWithExpiration(keys [][]byte, values [][]byte, expiryTime time.Time) error -} diff --git a/node/store.go b/node/store.go index 67368c185a..752557463b 100644 --- a/node/store.go +++ b/node/store.go @@ -29,7 +29,7 @@ var ErrBatchAlreadyExist = errors.New("batch already exists") // Store is a key-value database to store blob data (blob header, blob chunks etc). type Store struct { - db kvstore.Store + db kvstore.Store[[]byte] logger logging.Logger blockStaleMeasure uint32