Skip to content

Commit

Permalink
Move update schema to schema.go
Browse files Browse the repository at this point in the history
Cut-paste, no code has changed
  • Loading branch information
AndrewSisley committed Jun 10, 2024
1 parent d7dfacb commit de54174
Show file tree
Hide file tree
Showing 2 changed files with 201 additions and 200 deletions.
200 changes: 0 additions & 200 deletions internal/db/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,206 +109,6 @@ func (db *db) validateCollectionDefinitionPolicyDesc(
)
}

// updateSchema updates the persisted schema description matching the name of the given
// description, to the values in the given description.
//
// It will validate the given description using [validateUpdateSchema] before updating it.
//
// The schema (including the schema version ID) will only be updated if any changes have actually
// been made, if the given description matches the current persisted description then no changes will be
// applied.
func (db *db) updateSchema(
ctx context.Context,
existingSchemaByName map[string]client.SchemaDescription,
proposedDescriptionsByName map[string]client.SchemaDescription,
schema client.SchemaDescription,
migration immutable.Option[model.Lens],
setAsActiveVersion bool,
) error {
hasChanged, err := db.validateUpdateSchema(
existingSchemaByName,
proposedDescriptionsByName,
schema,
)
if err != nil {
return err
}

if !hasChanged {
return nil
}

for _, field := range schema.Fields {
if field.Kind.IsObject() && !field.Kind.IsArray() {
idFieldName := field.Name + "_id"
if _, ok := schema.GetFieldByName(idFieldName); !ok {
schema.Fields = append(schema.Fields, client.SchemaFieldDescription{
Name: idFieldName,
Kind: client.FieldKind_DocID,
})
}
}
}

for i, field := range schema.Fields {
if field.Typ == client.NONE_CRDT {
// If no CRDT Type has been provided, default to LWW_REGISTER.
field.Typ = client.LWW_REGISTER
schema.Fields[i] = field
}
}

txn := mustGetContextTxn(ctx)
previousVersionID := schema.VersionID
schema, err = description.CreateSchemaVersion(ctx, txn, schema)
if err != nil {
return err
}

// After creating the new schema version, we need to create new collection versions for
// any collection using the previous version. These will be inactive unless [setAsActiveVersion]
// is true.

cols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, previousVersionID)
if err != nil {
return err
}

colSeq, err := db.getSequence(ctx, core.CollectionIDSequenceKey{})
if err != nil {
return err
}

for _, col := range cols {
previousID := col.ID

existingCols, err := description.GetCollectionsBySchemaVersionID(ctx, txn, schema.VersionID)
if err != nil {
return err
}

// The collection version may exist before the schema version was created locally. This is
// because migrations for the globally known schema version may have been registered locally
// (typically to handle documents synced over P2P at higher versions) before the local schema
// was updated. We need to check for them now, and update them instead of creating new ones
// if they exist.
var isExistingCol bool
existingColLoop:
for _, existingCol := range existingCols {
sources := existingCol.CollectionSources()
for _, source := range sources {
// Make sure that this collection is the parent of the current [col], and not part of
// another collection set that happens to be using the same schema.
if source.SourceCollectionID == previousID {
if existingCol.RootID == client.OrphanRootID {
existingCol.RootID = col.RootID
}

fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(existingCol.RootID))
if err != nil {
return err
}

for _, globalField := range schema.Fields {
var fieldID client.FieldID
// We must check the source collection if the field already exists, and take its ID
// from there, otherwise the field must be generated by the sequence.
existingField, ok := col.GetFieldByName(globalField.Name)
if ok {
fieldID = existingField.ID
} else {
nextFieldID, err := fieldSeq.next(ctx)
if err != nil {
return err
}
fieldID = client.FieldID(nextFieldID)
}

existingCol.Fields = append(
existingCol.Fields,
client.CollectionFieldDescription{
Name: globalField.Name,
ID: fieldID,
},
)
}
existingCol, err = description.SaveCollection(ctx, txn, existingCol)
if err != nil {
return err
}
isExistingCol = true
break existingColLoop
}
}
}

if !isExistingCol {
colID, err := colSeq.next(ctx)
if err != nil {
return err
}

fieldSeq, err := db.getSequence(ctx, core.NewFieldIDSequenceKey(col.RootID))
if err != nil {
return err
}

// Create any new collections without a name (inactive), if [setAsActiveVersion] is true
// they will be activated later along with any existing collection versions.
col.Name = immutable.None[string]()
col.ID = uint32(colID)
col.SchemaVersionID = schema.VersionID
col.Sources = []any{
&client.CollectionSource{
SourceCollectionID: previousID,
Transform: migration,
},
}

for _, globalField := range schema.Fields {
_, exists := col.GetFieldByName(globalField.Name)
if !exists {
fieldID, err := fieldSeq.next(ctx)
if err != nil {
return err
}

col.Fields = append(
col.Fields,
client.CollectionFieldDescription{
Name: globalField.Name,
ID: client.FieldID(fieldID),
},
)
}
}

_, err = description.SaveCollection(ctx, txn, col)
if err != nil {
return err
}

if migration.HasValue() {
err = db.LensRegistry().SetMigration(ctx, col.ID, migration.Value())
if err != nil {
return err
}
}
}
}

if setAsActiveVersion {
// activate collection versions using the new schema ID. This call must be made after
// all new collection versions have been saved.
err = db.setActiveSchemaVersion(ctx, schema.VersionID)
if err != nil {
return err
}
}

return nil
}

// validateUpdateSchema validates that the given schema description is a valid update.
//
// Will return true if the given description differs from the current persisted state of the
Expand Down
Loading

0 comments on commit de54174

Please sign in to comment.