Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature(spanner): Implement DML Support for Spanner #1197

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 2 additions & 8 deletions database/spanner/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,11 @@ as described in [README.md#database-urls](../../README.md#database-urls)
> 1496601752/u add_index_on_user_emails (2m12.155787369s)
> 1496602638/u create_books_table (2m30.77299181s)

## DDL with comments
## DDL & DML with comments

At the moment the GCP Spanner backed does not seem to allow for comments (See https://issuetracker.google.com/issues/159730604)
so in order to be able to use migration with DDL containing comments `x-clean-statements` is required

## Multiple statements

In order to be able to use more than 1 DDL statement in the same migration file, the file has to be parsed and therefore the `x-clean-statements` flag is required

## Testing

To unit test the `spanner` driver, `SPANNER_DATABASE` needs to be set. You'll
need to sign-up to Google Cloud Platform (GCP) and have a running Spanner
instance since it is not possible to run Google Spanner outside GCP.
In order to be able to use more than 1 DDL or DML statement in the same migration file, the file has to be parsed and therefore the `x-clean-statements` flag is required
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
DROP TABLE Users;
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
-- Create a table
CREATE TABLE Users (
UserId INT64,
Name STRING(40),
Email STRING(83)
) PRIMARY KEY(UserId /* even inline comments */);

CREATE UNIQUE INDEX UsersEmailIndex ON Users (Email);

-- Comments are okay

INSERT INTO Users(UserId, Name, Email)
VALUES
(100, "Username", "[email protected]"),
(200, "Username2", "[email protected]");
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE Users DROP COLUMN city;
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE Users ADD COLUMN city STRING(100);
162 changes: 134 additions & 28 deletions database/spanner/spanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@ import (

"cloud.google.com/go/spanner"
sdb "cloud.google.com/go/spanner/admin/database/apiv1"
"cloud.google.com/go/spanner/spansql"

"github.com/cloudspannerecosystem/memefish"
"github.com/cloudspannerecosystem/memefish/token"
"github.com/golang-migrate/migrate/v4"
"github.com/golang-migrate/migrate/v4/database"

Expand Down Expand Up @@ -60,22 +61,22 @@ type Config struct {

// Spanner implements database.Driver for Google Cloud Spanner
type Spanner struct {
db *DB

db *DB
config *Config

lock *uatomic.Uint32
lock *uatomic.Uint32
}

type DB struct {
admin *sdb.DatabaseAdminClient
data *spanner.Client
admin *sdb.DatabaseAdminClient
data *spanner.Client
shared bool
}

func NewDB(admin sdb.DatabaseAdminClient, data spanner.Client) *DB {
return &DB{
admin: &admin,
data: &data,
admin: &admin,
data: &data,
shared: true,
}
}

Expand Down Expand Up @@ -146,6 +147,9 @@ func (s *Spanner) Open(url string) (database.Driver, error) {

// Close implements database.Driver
func (s *Spanner) Close() error {
if s.db.shared {
return nil
}
s.db.data.Close()
return s.db.admin.Close()
}
Expand Down Expand Up @@ -174,26 +178,65 @@ func (s *Spanner) Run(migration io.Reader) error {
return err
}

stmts := []string{string(migr)}
if s.config.CleanStatements {
stmts, err = cleanStatements(migr)
if err != nil {
return err
ctx := context.Background()

if !s.config.CleanStatements {
return s.runDdl(ctx, []string{string(migr)})
}

stmtGroups, err := statementGroups(migr)
if err != nil {
return err
}

for _, group := range stmtGroups {
switch group.typ {
case statementTypeDDL:
if err := s.runDdl(ctx, group.stmts); err != nil {
return err
}
case statementTypeDML:
if err := s.runDml(ctx, group.stmts); err != nil {
return err
}
default:
return fmt.Errorf("unknown statement type: %s", group.typ)
}
}

ctx := context.Background()
return nil
}

func (s *Spanner) runDdl(ctx context.Context, stmts []string) error {
op, err := s.db.admin.UpdateDatabaseDdl(ctx, &adminpb.UpdateDatabaseDdlRequest{
Database: s.config.DatabaseName,
Statements: stmts,
})

if err != nil {
return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(strings.Join(stmts, ";\n"))}
}

if err := op.Wait(ctx); err != nil {
return &database.Error{OrigErr: err, Err: "migration failed", Query: migr}
return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(strings.Join(stmts, ";\n"))}
}

return nil
}

func (s *Spanner) runDml(ctx context.Context, stmts []string) error {
_, err := s.db.data.ReadWriteTransaction(ctx,
func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
for _, s := range stmts {
_, err := txn.Update(ctx, spanner.Statement{SQL: s})
if err != nil {
return err
}
}
return nil
})
if err != nil {
return &database.Error{OrigErr: err, Err: "migration failed", Query: []byte(strings.Join(stmts, ";\n"))}
}

return nil
Expand Down Expand Up @@ -340,17 +383,80 @@ func (s *Spanner) ensureVersionTable() (err error) {
return nil
}

func cleanStatements(migration []byte) ([]string, error) {
// The Spanner GCP backend does not yet support comments for the UpdateDatabaseDdl RPC
// (see https://issuetracker.google.com/issues/159730604) we use
// spansql to parse the DDL and output valid stamements without comments
ddl, err := spansql.ParseDDL("", string(migration))
if err != nil {
return nil, err
type statementType string

const (
statementTypeUnknown statementType = ""
statementTypeDDL statementType = "DDL"
statementTypeDML statementType = "DML"
)

type statementGroup struct {
typ statementType
stmts []string
}

func statementGroups(migr []byte) (groups []*statementGroup, err error) {
lex := &memefish.Lexer{
File: &token.File{Buffer: string(migr)},
}
stmts := make([]string, 0, len(ddl.List))
for _, stmt := range ddl.List {
stmts = append(stmts, stmt.SQL())

group := &statementGroup{}
var stmtTyp statementType
var stmt strings.Builder
for {
if err := lex.NextToken(); err != nil {
return nil, err
}

if stmtTyp == statementTypeUnknown {
switch {
case lex.Token.IsKeywordLike("INSERT") || lex.Token.IsKeywordLike("DELETE") || lex.Token.IsKeywordLike("UPDATE"):
stmtTyp = statementTypeDML
default:
stmtTyp = statementTypeDDL
}
if group.typ != stmtTyp {
if len(group.stmts) > 0 {
groups = append(groups, group)
}
group = &statementGroup{typ: stmtTyp}
}
}

if lex.Token.Kind == token.TokenEOF || lex.Token.Kind == ";" {
if stmt.Len() > 0 {
group.stmts = append(group.stmts, stmt.String())
}
stmtTyp = statementTypeUnknown
stmt.Reset()

if lex.Token.Kind == token.TokenEOF {
if len(group.stmts) > 0 {
groups = append(groups, group)
}

break
}

continue
}

if len(lex.Token.Comments) > 0 && strings.HasPrefix(lex.Token.Comments[0].Raw, "--") {
// standard comment Token consumes a \n, so we need to add it back
if _, err := stmt.WriteString("\n"); err != nil {
return nil, err
}
}
if stmt.Len() > 0 {
if _, err := stmt.WriteString(lex.Token.Space); err != nil {
return nil, err
}
}
if _, err := stmt.WriteString(lex.Token.Raw); err != nil {
return nil, err
}
}
return stmts, nil

return groups, nil
}
Loading
Loading