Skip to content

Commit

Permalink
TTL + table store (#811)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <[email protected]>
  • Loading branch information
cody-littley authored Oct 30, 2024
1 parent 2ed86a0 commit 410419a
Show file tree
Hide file tree
Showing 24 changed files with 1,211 additions and 1,015 deletions.
14 changes: 0 additions & 14 deletions common/kvstore/Batch.go

This file was deleted.

32 changes: 32 additions & 0 deletions common/kvstore/batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package kvstore

import "time"

// Batch is a collection of key / value pairs that will be written atomically to a database.
// Although it is thread safe to modify different batches in parallel or to modify a batch while
// the store is being modified, it is not thread safe to concurrently modify the same batch.
type Batch[K any] interface {
// Put stores the given key / value pair in the batch, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key K, value []byte)
// Delete removes the key from the batch.
Delete(key K)
// Apply atomically writes all the key / value pairs in the batch to the database.
Apply() error
// Size returns the number of operations in the batch.
Size() uint32
}

// TTLBatch is a collection of key / value pairs that will be written atomically to a database with
// time-to-live (TTL) or expiration times. Although it is thread safe to modify different batches in
// parallel or to modify a batch while the store is being modified, it is not thread safe to concurrently
// modify the same batch.
type TTLBatch[K any] interface {
Batch[K]
// PutWithTTL stores the given key / value pair in the batch with a time-to-live (TTL) or expiration time.
// If nil is passed as the value, a byte slice of length 0 will be stored.
PutWithTTL(key K, value []byte, ttl time.Duration)
// PutWithExpiration stores the given key / value pair in the batch with an expiration time.
// If nil is passed as the value, a byte slice of length 0 will be stored.
PutWithExpiration(key K, value []byte, expiryTime time.Time)
}
27 changes: 27 additions & 0 deletions common/kvstore/key.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package kvstore

import "errors"

// ErrInvalidKey is returned when a key cannot be interpreted as the requested type.
var ErrInvalidKey = errors.New("invalid key")

// Key represents a key in a TableStore. Each key is scoped to a specific table.
type Key interface {
// Bytes returns the key as a byte slice. Does not include internal metadata (i.e. the table).
Bytes() []byte
// Raw returns the raw byte slice that represents the key. This value
// may not be equal to the byte slice that was used to create the key, and
// should be treated as an opaque value.
Raw() []byte
// Builder returns the KeyBuilder that created this key.
Builder() KeyBuilder
}

// KeyBuilder is used to create keys for a TableStore. Each KeyBuilder is scoped to a particular table,
// and can be used to create keys that are within that table.
type KeyBuilder interface {
// TableName returns the name of the table that this KeyBuilder is scoped to.
TableName() string
// Key creates a key from a byte slice.
Key(key []byte) Key
}
6 changes: 3 additions & 3 deletions common/kvstore/leveldb/leveldb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (
"github.com/syndtr/goleveldb/leveldb/util"
)

var _ kvstore.Store = &levelDBStore{}
var _ kvstore.Store[[]byte] = &levelDBStore{}

// levelDBStore implements kvstore.Store interfaces with levelDB as the backend engine.
type levelDBStore struct {
Expand All @@ -25,7 +25,7 @@ type levelDBStore struct {
}

// NewStore returns a new levelDBStore built using LevelDB.
func NewStore(logger logging.Logger, path string) (kvstore.Store, error) {
func NewStore(logger logging.Logger, path string) (kvstore.Store[[]byte], error) {
levelDB, err := leveldb.OpenFile(path, nil)

if err != nil {
Expand Down Expand Up @@ -88,7 +88,7 @@ func (store *levelDBStore) WriteBatch(keys [][]byte, values [][]byte) error {
}

// NewBatch creates a new batch for the store.
func (store *levelDBStore) NewBatch() kvstore.StoreBatch {
func (store *levelDBStore) NewBatch() kvstore.Batch[[]byte] {
return &levelDBBatch{
store: store,
batch: new(leveldb.Batch),
Expand Down
6 changes: 3 additions & 3 deletions common/kvstore/mapstore/map_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"sync"
)

var _ kvstore.Store = &mapStore{}
var _ kvstore.Store[[]byte] = &mapStore{}

// mapStore is a simple in-memory implementation of KVStore. Designed more as a correctness test than a
// production implementation -- there are things that may not be performant with this implementation.
Expand All @@ -19,7 +19,7 @@ type mapStore struct {
}

// NewStore creates a new mapStore.
func NewStore() kvstore.Store {
func NewStore() kvstore.Store[[]byte] {
return &mapStore{
data: make(map[string][]byte),
}
Expand Down Expand Up @@ -79,7 +79,7 @@ func (store *mapStore) WriteBatch(keys, values [][]byte) error {
}

// NewBatch creates a new batch for the store.
func (store *mapStore) NewBatch() kvstore.StoreBatch {
func (store *mapStore) NewBatch() kvstore.Batch[[]byte] {
return &batch{
store: store,
keys: make([][]byte, 0),
Expand Down
16 changes: 7 additions & 9 deletions common/kvstore/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,33 +8,31 @@ import (
// ErrNotFound is returned when a key is not found in the database.
var ErrNotFound = errors.New("not found")

// StoreBatch is a collection of operations that can be applied atomically to a Store.
type StoreBatch Batch[[]byte]

// Store implements a key-value store. May be backed by a database like LevelDB.
// The generic type K is the type of the keys in the store.
//
// Implementations of this interface are expected to be thread-safe.
type Store interface {
type Store[K any] interface {

// Put stores the given key / value pair in the database, overwriting any existing value for that key.
// If nil is passed as the value, a byte slice of length 0 will be stored.
Put(key []byte, value []byte) error
Put(k K, value []byte) error

// Get retrieves the value for the given key. Returns a ErrNotFound error if the key does not exist.
// The value returned is safe to modify.
Get(key []byte) ([]byte, error)
Get(k K) ([]byte, error)

// Delete removes the key from the database. Does not return an error if the key does not exist.
Delete(key []byte) error
Delete(k K) error

// NewBatch creates a new batch that can be used to perform multiple operations atomically.
NewBatch() StoreBatch
NewBatch() Batch[K]

// NewIterator returns an iterator that can be used to iterate over a subset of the keys in the database.
// Only keys with the given prefix will be iterated. The iterator must be closed by calling Release() when done.
// The iterator will return keys in lexicographically sorted order. The iterator walks over a consistent snapshot
// of the database, so it will not see any writes that occur after the iterator is created.
NewIterator(prefix []byte) (iterator.Iterator, error)
NewIterator(prefix K) (iterator.Iterator, error)

// Shutdown shuts down the store, flushing any remaining data to disk.
//
Expand Down
80 changes: 41 additions & 39 deletions common/kvstore/table.go
Original file line number Diff line number Diff line change
@@ -1,32 +1,14 @@
package kvstore

import "errors"

// ErrTableLimitExceeded is returned when the maximum number of tables has been reached.
var ErrTableLimitExceeded = errors.New("table limit exceeded")
import (
"errors"
"github.com/syndtr/goleveldb/leveldb/iterator"
"time"
)

// ErrTableNotFound is returned when a table is not found.
var ErrTableNotFound = errors.New("table not found")

// Table can be used to operate on data in a specific table in a TableStore.
type Table interface {
// Store permits access to the table as if it were a store.
Store

// Name returns the name of the table.
Name() string

// TableKey creates a new key scoped to this table that can be used for batch operations that modify this table.
TableKey(key []byte) TableKey
}

// TableKey is a key scoped to a particular table. It can be used to perform batch operations that modify multiple
// table keys atomically.
type TableKey []byte

// TableBatch is a collection of operations that can be applied atomically to a TableStore.
type TableBatch Batch[TableKey]

// TableStore implements a key-value store, with the addition of the abstraction of tables.
// A "table" in this context is a disjoint keyspace. Keys in one table to not collide with keys in another table,
// and keys within a particular table can be iterated over efficiently.
Expand All @@ -36,20 +18,40 @@ type TableBatch Batch[TableKey]
//
// Implementations of this interface are expected to be thread-safe, except where noted.
type TableStore interface {

// GetTable gets the table with the given name. If the table does not exist, it is first created.
// Returns ErrTableNotFound if the table does not exist and cannot be created.
GetTable(name string) (Table, error)

// GetTables returns a list of all tables in the store in no particular order.
GetTables() []Table

// NewBatch creates a new batch that can be used to perform multiple operations across tables atomically.
NewBatch() TableBatch

// Shutdown shuts down the store, flushing any remaining data to disk.
Shutdown() error

// Destroy shuts down and permanently deletes all data in the store.
Destroy() error
Store[Key]

// GetKeyBuilder gets the key builder for a particular table. Returns ErrTableNotFound if the table does not exist.
// The returned KeyBuilder can be used to interact with the table.
//
// Warning: Do not use key builders created by one TableStore instance with another TableStore instance.
// This may result in odd and undefined behavior.
GetKeyBuilder(name string) (KeyBuilder, error)

// GetKeyBuilders returns all key builders in the store.
GetKeyBuilders() []KeyBuilder

// GetTables returns a list of the table names currently in the store.
GetTables() []string

// PutWithTTL adds a key-value pair to the store that expires after a specified duration.
// Key is eventually deleted after the TTL elapses.
//
// Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern
// may be implemented in the future if a use case is identified.
PutWithTTL(key Key, value []byte, ttl time.Duration) error

// PutWithExpiration adds a key-value pair to the store that expires at a specified time.
// Key is eventually deleted after the expiry time.
//
// Warning: updating the value of a key with a ttl/expiration has undefined behavior. Support for this pattern
// may be implemented in the future if a use case is identified.
PutWithExpiration(key Key, value []byte, expiryTime time.Time) error

// NewTTLBatch creates a new TTLBatch that can be used to perform multiple operations atomically.
// Use this instead of NewBatch to create a batch that supports TTL/expiration.
NewTTLBatch() TTLBatch[Key]

// NewTableIterator returns an iterator that can be used to iterate over all keys in a table.
// Equivalent to NewIterator(keyBuilder.Key([]byte{})).
NewTableIterator(keyBuilder KeyBuilder) (iterator.Iterator, error)
}
60 changes: 60 additions & 0 deletions common/kvstore/tablestore/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package tablestore

import "time"

// StoreType describes the underlying store implementation.
type StoreType int

const (
// LevelDB is a LevelDB-backed store.
LevelDB StoreType = iota
// MapStore is an in-memory store. This store does not preserve data across restarts.
MapStore
)

// Config is the configuration for a TableStore.
type Config struct {
// The type of the base store. Default is LevelDB.
Type StoreType
// The path to the file system directory where the store will write its data. Default is nil.
// Some store implementations may ignore this field (e.g. MapStore). Other store implementations may require
// this field to be set (e.g. LevelDB).
Path *string
// If true, the store will perform garbage collection on a background goroutine. Default is true.
GarbageCollectionEnabled bool
// If garbage collection is enabled, this is the interval at which it will run. Default is 5 minutes.
GarbageCollectionInterval time.Duration
// If garbage collection is enabled, this is the maximum number of entries to delete in a single batch during
// garbage collection. Default is 1024.
GarbageCollectionBatchSize uint32
// The list of tables to create on startup. Any pre-existing table not in this list will be deleted. If
// this list is nil, the previous schema will be carried forward with no modifications. Default is nil.
Schema []string
}

// DefaultConfig returns a Config with default values.
func DefaultConfig() *Config {
return &Config{
Type: LevelDB,
Path: nil,
GarbageCollectionEnabled: true,
GarbageCollectionInterval: 5 * time.Minute,
GarbageCollectionBatchSize: 1024,
Schema: nil,
}
}

// DefaultLevelDBConfig returns a Config with default values for a LevelDB store.
func DefaultLevelDBConfig(path string) *Config {
config := DefaultConfig()
config.Type = LevelDB
config.Path = &path
return config
}

// DefaultMapStoreConfig returns a Config with default values for a MapStore.
func DefaultMapStoreConfig() *Config {
config := DefaultConfig()
config.Type = MapStore
return config
}
27 changes: 27 additions & 0 deletions common/kvstore/tablestore/key_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package tablestore

import (
tu "github.com/Layr-Labs/eigenda/common/testutils"
"github.com/stretchr/testify/assert"
"testing"
)

func TestGetName(t *testing.T) {
tu.InitializeRandom()

tableName := tu.RandomString(10)

kb := newKeyBuilder(tableName, 0)
assert.Equal(t, tableName, kb.TableName())
}

func TestBytesRoundTrip(t *testing.T) {
tu.InitializeRandom()

tableName := tu.RandomString(10)
b := tu.RandomBytes(10)

kb := newKeyBuilder(tableName, 0)
k := kb.Key(b)
assert.Equal(t, b, k.Bytes())
}
Loading

0 comments on commit 410419a

Please sign in to comment.