Skip to content

Commit

Permalink
alter table support
Browse files Browse the repository at this point in the history
  • Loading branch information
myhau committed Jun 13, 2024
1 parent 2bf3f5e commit 0959e50
Show file tree
Hide file tree
Showing 13 changed files with 527 additions and 44 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -96,3 +96,5 @@ require (
google.golang.org/genproto v0.0.0-20240213162025-012b6fc9bca9 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20240314234333-6e1732d8331c // indirect
)

replace github.com/goccy/go-zetasqlite v0.19.2 => github.com/myhau/go-zetasqlite v0.19.2-fork.1
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,6 @@ github.com/goccy/go-yaml v1.11.0 h1:n7Z+zx8S9f9KgzG6KtQKf+kwqXZlLNR2F6018Dgau54=
github.com/goccy/go-yaml v1.11.0/go.mod h1:H+mJrWtjPTJAHvRbV09MCK9xYwODM+wRTVFFTWckfng=
github.com/goccy/go-zetasql v0.5.5 h1:3JpXt3p2533rnZMu09upCcI7YJ2KfjKgdo2Lu0xo+fU=
github.com/goccy/go-zetasql v0.5.5/go.mod h1:xvvooX2RG404vnbdFZuAM8bTFksYwVUlqeIUrUNuo40=
github.com/goccy/go-zetasqlite v0.19.2 h1:3eVwY7J2Gj0VtBW/9z8MAuEphuZxj0zLj7rnrZ/PERE=
github.com/goccy/go-zetasqlite v0.19.2/go.mod h1:ThavIQcmI6a/sVTpvJtj+idUui32gwbnXe1jeb7pISY=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
Expand Down Expand Up @@ -169,6 +167,8 @@ github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8 h1:AMFGa4R4MiIpsp
github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3 h1:+n/aFZefKZp7spd8DFdX7uMikMLXX4oubIzJF4kv/wI=
github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE=
github.com/myhau/go-zetasqlite v0.19.2-fork.1 h1:u7qOHLdSrQxx06pNYgaOtKVWTfyn2I9OvDf+90IK0s4=
github.com/myhau/go-zetasqlite v0.19.2-fork.1/go.mod h1:ThavIQcmI6a/sVTpvJtj+idUui32gwbnXe1jeb7pISY=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
github.com/pierrec/lz4/v4 v4.1.18 h1:xaKrnTkyoqfh1YItXl56+6KJNVYWlEEPuAQW9xsplYQ=
Expand Down
37 changes: 37 additions & 0 deletions internal/contentdata/errors.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package contentdata

type ContentDataError struct {
Cause error
Message string
ErrorReason ContentDataErrorReason
}

type ContentDataErrorReason string

func Error(errorReason ContentDataErrorReason) *ContentDataError {
return &ContentDataError{
Cause: nil,
Message: "",
ErrorReason: errorReason,
}
}

func ErrorWithMessage(errorReason ContentDataErrorReason, message string) *ContentDataError {
return &ContentDataError{
Message: message,
ErrorReason: errorReason,
}
}

func ErrorWithCause(errorReason ContentDataErrorReason, cause error) *ContentDataError {
return &ContentDataError{
Cause: cause,
ErrorReason: errorReason,
}
}

const (
AlterTableExistingFieldRemoved ContentDataErrorReason = "alterTableExistingFieldRemoved"
AlterTableExistingFieldChanged ContentDataErrorReason = "alterTableExistingFieldChanged"
Unknown ContentDataErrorReason = "unknown"
)
78 changes: 78 additions & 0 deletions internal/contentdata/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,84 @@ func (r *Repository) routinePath(projectID, datasetID, routineID string) string
return strings.Join(routinePath, ".")
}

func stringReference(ref *bigqueryv2.TableReference) string {
return fmt.Sprintf("%s:%s.%s", ref.ProjectId, ref.DatasetId, ref.TableId)
}

func (r *Repository) AlterTable(ctx context.Context, tx *connection.Tx, table *bigqueryv2.Table, newSchema *bigqueryv2.TableSchema) *ContentDataError {
if newSchema == nil {
return nil
}

if err := tx.ContentRepoMode(); err != nil {
return ErrorWithCause(Unknown, err)
}

defer func() {
_ = tx.MetadataRepoMode()
}()

ref := table.TableReference
if ref == nil {
return ErrorWithMessage(Unknown, "TableReference is nil")
}
tablePath := r.tablePath(ref.ProjectId, ref.DatasetId, ref.TableId)

alterStatements := make([]string, 0)
oldFieldMap := make(map[string]*bigqueryv2.TableFieldSchema)
for _, field := range table.Schema.Fields {
oldFieldMap[field.Name] = field
}

for _, newField := range newSchema.Fields {
oldField, exists := oldFieldMap[newField.Name]
if !exists {
alterStatements = append(alterStatements, fmt.Sprintf("ADD COLUMN `%s` %s", newField.Name, r.encodeSchemaField(newField)))
} else if !reflect.DeepEqual(oldField, newField) {
oldFieldAfterUpdates := *oldField
if oldField.Description != newField.Description {
// it's ok, ignore
oldFieldAfterUpdates.Description = newField.Description
//alterStatements = append(alterStatements, fmt.Sprintf("ALTER COLUMN `%s` SET DEFAULT %s", newField.Name, newField.DefaultValueExpression))
//oldFieldAfterUpdates.Description = newField.Description
//if !reflect.DeepEqual(&oldFieldAfterUpdates, newField) {
// message := fmt.Sprintf(
// "(Todo) one change allowed at a time",
// )
// return ErrorWithMessage(Unknown, message)
//}
} else if oldField.DefaultValueExpression != newField.DefaultValueExpression {
alterStatements = append(alterStatements, fmt.Sprintf("ALTER COLUMN `%s` SET DEFAULT %s", newField.Name, newField.DefaultValueExpression))
oldFieldAfterUpdates.DefaultValueExpression = newField.DefaultValueExpression
if !reflect.DeepEqual(&oldFieldAfterUpdates, newField) {
message := fmt.Sprintf(
"(Todo) one change allowed at a time",
)
return ErrorWithMessage(Unknown, message)
}
} else {
return ErrorWithMessage(Unknown, "unknown")
}
// FIXME: what about field description?

}
delete(oldFieldMap, newField.Name)
}

if len(oldFieldMap) > 0 {
return Error(AlterTableExistingFieldRemoved)
}

if len(alterStatements) > 0 {
query := fmt.Sprintf("ALTER TABLE `%s` %s", tablePath, strings.Join(alterStatements, ", "))
if _, err := tx.Tx().ExecContext(ctx, query); err != nil {
return ErrorWithCause(Unknown, fmt.Errorf("failed to alter table %s: %w", query, err))
}
}

return nil
}

func (r *Repository) CreateTable(ctx context.Context, tx *connection.Tx, table *bigqueryv2.Table) error {
if err := tx.ContentRepoMode(); err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions internal/metadata/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,12 @@ import (

var ErrDuplicatedTable = errors.New("table is already created")

var ErrDuplicatedDataset = errors.New("dataset is already created")

func ErrDuplicatedDatasetWithId(id string) error {
return fmt.Errorf("dataset with id %s, %w", id, ErrDuplicatedDataset)
}

type Dataset struct {
ID string
ProjectID string
Expand Down
2 changes: 1 addition & 1 deletion internal/metadata/project.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (p *Project) AddDataset(ctx context.Context, tx *sql.Tx, dataset *Dataset)
p.mu.Lock()
defer p.mu.Unlock()
if _, exists := p.datasetMap[dataset.ID]; exists {
return fmt.Errorf("dataset %s is already created", dataset.ID)
return ErrDuplicatedDatasetWithId(dataset.ID)
}
if err := dataset.Insert(ctx, tx); err != nil {
return err
Expand Down
7 changes: 4 additions & 3 deletions internal/metadata/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -630,8 +630,9 @@ func (r *Repository) AddTable(ctx context.Context, tx *sql.Tx, table *Table) err
return nil
}

func (r *Repository) UpdateTable(ctx context.Context, tx *sql.Tx, table *Table) error {
metadata, err := json.Marshal(table.metadata)
func (r *Repository) UpdateTable(ctx context.Context, tx *sql.Tx, table *Table, metadata map[string]interface{}) error {
marshalledMetadata, err := json.Marshal(metadata)

if err != nil {
return err
}
Expand All @@ -640,7 +641,7 @@ func (r *Repository) UpdateTable(ctx context.Context, tx *sql.Tx, table *Table)
sql.Named("id", table.ID),
sql.Named("projectID", table.ProjectID),
sql.Named("datasetID", table.DatasetID),
sql.Named("metadata", string(metadata)),
sql.Named("metadata", string(marshalledMetadata)),
); err != nil {
return err
}
Expand Down
13 changes: 12 additions & 1 deletion internal/metadata/table.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"database/sql"
"fmt"
"maps"

"github.com/goccy/go-json"
bigqueryv2 "google.golang.org/api/bigquery/v2"
Expand All @@ -18,7 +19,17 @@ type Table struct {
}

func (t *Table) Update(ctx context.Context, tx *sql.Tx, metadata map[string]interface{}) error {
return t.repo.UpdateTable(ctx, tx, t)
mergedMetadata := map[string]interface{}{}
maps.Copy(mergedMetadata, t.metadata)
for key, value := range metadata {
mergedMetadata[key] = value
}

err := t.repo.UpdateTable(ctx, tx, t, mergedMetadata)
if err == nil {
t.metadata = mergedMetadata
}
return err
}

func (t *Table) Insert(ctx context.Context, tx *sql.Tx) error {
Expand Down
13 changes: 13 additions & 0 deletions server/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,19 @@ const (
Timeout ErrorReason = "timeout"
)

func withDebugInfo(existingError *ServerError, cause error) *ServerError {
if cause == nil {
return existingError
}
return &ServerError{
Status: existingError.Status,
Reason: existingError.Reason,
Location: existingError.Location,
Message: existingError.Message,
DebugInfo: cause.Error(),
}
}

func errAccessDenied(msg string) *ServerError {
return &ServerError{
Status: http.StatusForbidden,
Expand Down
Loading

0 comments on commit 0959e50

Please sign in to comment.