Skip to content

Commit

Permalink
WIP - Add cached views
Browse files Browse the repository at this point in the history
  • Loading branch information
AndrewSisley committed Sep 9, 2024
1 parent 921b708 commit 8f1ed60
Show file tree
Hide file tree
Showing 48 changed files with 1,263 additions and 79 deletions.
11 changes: 11 additions & 0 deletions client/collection_description.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,15 @@ type CollectionDescription struct {
// parsing is done, to avoid storing an invalid policyID or policy resource
// that may not even exist on acp.
Policy immutable.Option[PolicyDescription]

// IsMaterialized defines whether the items in this collection are cached or not.
//
// If it is true, they will be, if false, the data returned on query will be calculated
// at query-time from source.
//
// At the moment this can only be set to `false` if this collection sources it's data from
// another collection/query (is a View).
IsMaterialized bool
}

// QuerySource represents a collection data source from a query.
Expand Down Expand Up @@ -179,6 +188,7 @@ type collectionDescription struct {
ID uint32
RootID uint32
SchemaVersionID string
IsMaterialized bool
Policy immutable.Option[PolicyDescription]
Indexes []IndexDescription
Fields []CollectionFieldDescription
Expand All @@ -198,6 +208,7 @@ func (c *CollectionDescription) UnmarshalJSON(bytes []byte) error {
c.ID = descMap.ID
c.RootID = descMap.RootID
c.SchemaVersionID = descMap.SchemaVersionID
c.IsMaterialized = descMap.IsMaterialized
c.Indexes = descMap.Indexes
c.Fields = descMap.Fields
c.Sources = make([]any, len(descMap.Sources))
Expand Down
10 changes: 10 additions & 0 deletions client/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,8 @@ type Store interface {
transform immutable.Option[model.Lens],
) ([]CollectionDefinition, error)

RefreshViews(context.Context, CollectionFetchOptions) error

// SetMigration sets the migration for all collections using the given source-destination schema version IDs.
//
// There may only be one migration per collection version. If another migration was registered it will be
Expand Down Expand Up @@ -304,3 +306,11 @@ type SchemaFetchOptions struct {
// If provided, only the schema with this id will be returned.
ID immutable.Option[string]
}

/*
// ViewFetchOptions represents a set of options used for fetching views.
type ViewFetchOptions struct {
// If provided, only views with this name will be returned.
Name immutable.Option[string]
}
*/
4 changes: 4 additions & 0 deletions http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,10 @@ func (c *Client) AddView(
return descriptions, nil
}

func (c *Client) RefreshViews(ctx context.Context, opts client.CollectionFetchOptions) error {
panic("todo")
}

func (c *Client) SetMigration(ctx context.Context, config client.LensConfig) error {
methodURL := c.http.baseURL.JoinPath("lens")

Expand Down
75 changes: 69 additions & 6 deletions internal/core/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/errors"
"github.com/sourcenetwork/defradb/internal/encoding"
)

var (
Expand All @@ -43,12 +44,14 @@ const (
)

const (
COLLECTION = "collection"
COLLECTION_ID = "/collection/id"
COLLECTION_NAME = "/collection/name"
COLLECTION_SCHEMA_VERSION = "/collection/version"
COLLECTION_ROOT = "/collection/root"
COLLECTION_INDEX = "/collection/index"
COLLECTION = "collection"
COLLECTION_ID = "/collection/id"
COLLECTION_NAME = "/collection/name"
COLLECTION_SCHEMA_VERSION = "/collection/version"
COLLECTION_ROOT = "/collection/root"
COLLECTION_INDEX = "/collection/index"
// todo- doc temp-ness
COLLECTION_VIEW_ITEMS = "/collection/vi"
SCHEMA_VERSION = "/schema/version/v"
SCHEMA_VERSION_ROOT = "/schema/version/r"
COLLECTION_SEQ = "/seq/collection"
Expand Down Expand Up @@ -77,6 +80,15 @@ type DataStoreKey struct {

var _ Key = (*DataStoreKey)(nil)

// todo: note this may be removed/merged into DataStoreKey
type ViewCacheKey struct {
CollectionRootID uint32
// todo - note this is temp type:
ItemID uint
}

var _ Key = (*ViewCacheKey)(nil)

// IndexedField contains information necessary for storing a single
// value of a field in an index.
type IndexedField struct {
Expand Down Expand Up @@ -527,6 +539,57 @@ func (k DataStoreKey) ToPrimaryDataStoreKey() PrimaryDataStoreKey {
}
}

func NewViewCacheColPrefix(rootID uint32) ViewCacheKey {
return ViewCacheKey{
CollectionRootID: rootID,
}
}

func NewViewCacheKey(rootID uint32, itemID uint) ViewCacheKey {
return ViewCacheKey{
CollectionRootID: rootID,
ItemID: itemID,
}
}

func (k ViewCacheKey) ToString() string {
return string(k.Bytes())
}

func (k ViewCacheKey) Bytes() []byte {
result := []byte(COLLECTION_VIEW_ITEMS)

if k.CollectionRootID != 0 {
result = append(result, '/')
result = encoding.EncodeUvarintAscending(result, uint64(k.CollectionRootID))
//result = []byte(string(result) + "/" + strconv.Itoa(int(k.CollectionRootID)))
}

if k.ItemID != 0 {
result = append(result, '/')
result = encoding.EncodeUvarintAscending(result, uint64(k.ItemID))
}

return result
}

func (k ViewCacheKey) ToDS() ds.Key {
return ds.NewKey(k.ToString())
}

func (k ViewCacheKey) PrettyPrint() string {
result := COLLECTION_VIEW_ITEMS

if k.CollectionRootID != 0 {
result = result + "/" + strconv.Itoa(int(k.CollectionRootID))
}
if k.ItemID != 0 {
result = result + "/" + strconv.Itoa(int(k.ItemID))
}

return result
}

// NewIndexDataStoreKey creates a new IndexDataStoreKey from a collection ID, index ID and fields.
// It also validates values of the fields.
func NewIndexDataStoreKey(collectionID, indexID uint32, fields []IndexedField) IndexDataStoreKey {
Expand Down
8 changes: 8 additions & 0 deletions internal/db/collection_define.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,14 @@ func (db *db) patchCollection(

existingCol, ok := existingColsByID[col.ID]
if ok {
if existingCol.IsMaterialized && !col.IsMaterialized {
// todo - doc
err := db.clearViewCache(ctx, col)
if err != nil {
return err
}
}

// Clear any existing migrations in the registry, using this semi-hacky way
// to avoid adding more functions to a public interface that we wish to remove.

Expand Down
34 changes: 34 additions & 0 deletions internal/db/definition_validation.go
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ var globalValidators = []definitionValidator{
validateTypeAndKindCompatible,
validateFieldNotDuplicated,
validateSelfReferences,
validateCollectionMaterialized,
validateMaterializedHasNoACP,
}

var createValidators = append(
Expand Down Expand Up @@ -978,3 +980,35 @@ func validateSchemaNameNotEmpty(

return nil
}

func validateCollectionMaterialized(
ctx context.Context,
db *db,
newState *definitionState,
oldState *definitionState,
) error {
for _, col := range newState.collections {
if len(col.QuerySources()) == 0 && !col.IsMaterialized {
return NewErrColNotMaterialized(col.Name.Value())
}

}

return nil
}

func validateMaterializedHasNoACP(
ctx context.Context,
db *db,
newState *definitionState,
oldState *definitionState,
) error {
for _, col := range newState.collections {
if col.IsMaterialized && len(col.QuerySources()) != 0 && col.Policy.HasValue() {
return NewErrMaterializedViewAndACPNotSupported(col.Name.Value())
}

}

return nil
}
18 changes: 18 additions & 0 deletions internal/db/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ const (
errReplicatorNotFound string = "replicator not found"
errCanNotEncryptBuiltinField string = "can not encrypt build-in field"
errSelfReferenceWithoutSelf string = "must specify 'Self' kind for self referencing relations"
errColNotMaterialized string = "non-materialized collections (only views) are not supported"
errMaterializedViewAndACPNotSupported string = "materialized views do not support ACP"
)

var (
Expand Down Expand Up @@ -143,6 +145,8 @@ var (
ErrReplicatorNotFound = errors.New(errReplicatorNotFound)
ErrCanNotEncryptBuiltinField = errors.New(errCanNotEncryptBuiltinField)
ErrSelfReferenceWithoutSelf = errors.New(errSelfReferenceWithoutSelf)
ErrColNotMaterialized = errors.New(errColNotMaterialized)
ErrMaterializedViewAndACPNotSupported = errors.New(errMaterializedViewAndACPNotSupported)
)

// NewErrFailedToGetHeads returns a new error indicating that the heads of a document
Expand Down Expand Up @@ -659,3 +663,17 @@ func NewErrSelfReferenceWithoutSelf(fieldName string) error {
errors.NewKV("Field", fieldName),
)
}

func NewErrColNotMaterialized(collection string) error {
return errors.New(
errColNotMaterialized,
errors.NewKV("Collection", collection),
)
}

func NewErrMaterializedViewAndACPNotSupported(collection string) error {
return errors.New(
errMaterializedViewAndACPNotSupported,
errors.NewKV("Collection", collection),
)
}
2 changes: 2 additions & 0 deletions internal/db/lens.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func (db *db) setMigration(ctx context.Context, cfg client.LensConfig) error {
ID: uint32(colID),
RootID: client.OrphanRootID,
SchemaVersionID: cfg.SourceSchemaVersionID,
IsMaterialized: true,
}

col, err := description.SaveCollection(ctx, txn, desc)
Expand Down Expand Up @@ -96,6 +97,7 @@ func (db *db) setMigration(ctx context.Context, cfg client.LensConfig) error {
ID: uint32(colID),
RootID: sourceCol.RootID,
SchemaVersionID: cfg.DestinationSchemaVersionID,
IsMaterialized: true,
Sources: []any{
&client.CollectionSource{
SourceCollectionID: sourceCol.ID,
Expand Down
17 changes: 17 additions & 0 deletions internal/db/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,6 +237,23 @@ func (db *db) AddView(
return defs, nil
}

func (db *db) RefreshViews(ctx context.Context, opts client.CollectionFetchOptions) error {
ctx, txn, err := ensureContextTxn(ctx, db, false)
if err != nil {
return err
}
defer txn.Discard(ctx)

db.refreshViews(ctx, opts)

Check failure on line 247 in internal/db/store.go

View workflow job for this annotation

GitHub Actions / Lint GoLang job

Error return value of `db.refreshViews` is not checked (errcheck)

err = txn.Commit(ctx)
if err != nil {
return err
}

return nil
}

// BasicImport imports a json dataset.
// filepath must be accessible to the node.
func (db *db) BasicImport(ctx context.Context, filepath string) error {
Expand Down
Loading

0 comments on commit 8f1ed60

Please sign in to comment.