Skip to content

Commit

Permalink
Implement COPY FROM support for DML initial load.
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrofranceschi committed May 1, 2016
1 parent 565655c commit a643933
Show file tree
Hide file tree
Showing 18 changed files with 236 additions and 150 deletions.
2 changes: 1 addition & 1 deletion action/alterattribute.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (

type AlterAttribute struct {
SchemaName string
TypeName string
TypeName string
Column Column
NewColumn Column
}
Expand Down
21 changes: 21 additions & 0 deletions action/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,24 @@ func (c *Context) GetPreparedStatement(statement string) (*sqlx.Stmt, error) {

return nil, nil
}

func (c *Context) FlushStatements() error {
for _, stmt := range c.stmtCache {
// Exec and close each statement
_, err := stmt.Exec()
if err != nil {
return err
}

err = stmt.Close()
if err != nil {
return err
}
}

return nil
}

func (c *Context) Commit() error {
return c.Tx.Commit()
}
4 changes: 2 additions & 2 deletions action/createextension.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import (
)

type CreateExtension struct {
SchemaName string
ExtensionName string
SchemaName string
ExtensionName string
}

// Register type for gob
Expand Down
2 changes: 1 addition & 1 deletion action/dropextension.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
)

type DropExtension struct {
ExtensionName string
ExtensionName string
}

// Register type for gob
Expand Down
162 changes: 92 additions & 70 deletions action/insertrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/gob"
"encoding/json"
"fmt"
"github.com/lib/pq"
"strings"
"time"
)
Expand All @@ -12,7 +13,8 @@ type InsertRow struct {
SchemaName string
TableName string
PrimaryKeyName string
Rows []Row
Rows Rows
BulkInsert bool
}

// Register type for gob
Expand All @@ -22,69 +24,62 @@ func init() {
}

func (a *InsertRow) Execute(c *Context) error {
escapedCols := make([]string, 0)
escapedRows := make([]string, 0)
values := make([]interface{}, 0)
if a.BulkInsert {
escapedCols := make([]string, 0)
values := make([]interface{}, 0)

var primaryKeyRow *Row
for _, row := range a.Rows {
escapedCols = append(escapedCols, row.Column.Name)
values = append(values, row.GetValue())
}

for i, row := range a.Rows {
escapedCols = append(escapedCols, fmt.Sprintf("\"%s\"", row.Column.Name))
escapedRows = append(escapedRows, fmt.Sprintf("$%d::%s\"%s\"", i+1, row.Column.GetTypeSchemaStr(a.SchemaName), row.Column.Type))
stmt, err := c.GetPreparedStatement(
pq.CopyInSchema(a.SchemaName, a.TableName, escapedCols...),
)

// Marshall JSON objects as pg driver does not support it
if obj, ok := row.Value.(*map[string]interface{}); ok {
jsonStr, err := json.Marshal(obj)
if err != nil {
return err
}

if err != nil {
return err
}
_, err = stmt.Exec(values...)

values = append(values, string(jsonStr))
} else {
values = append(values, row.Value)
if err != nil {
return err
}
} else {
// Perform a single insert (upsert)
escapedCols := make([]string, 0)
escapedRows := make([]string, 0)
values := make([]interface{}, 0)

if row.Column.Name == a.PrimaryKeyName {
primaryKeyRow = &row
}
}
var primaryKeyRow *Row

// Save transaction prior to inserting to rollback
// if INSERT fails, so a UPDATE can be tried
_, err := c.Tx.Exec(fmt.Sprintf(
`SAVEPOINT "%s%s";`,
a.SchemaName,
a.TableName,
))
for i, row := range a.Rows {
escapedCols = append(escapedCols, fmt.Sprintf("\"%s\"", row.Column.Name))
escapedRows = append(escapedRows, fmt.Sprintf("$%d::%s\"%s\"", i+1, row.Column.GetTypeSchemaStr(a.SchemaName), row.Column.Type))

if err != nil {
return err
}
// Marshall JSON objects as pg driver does not support it
if obj, ok := row.Value.(*map[string]interface{}); ok {
jsonStr, err := json.Marshal(obj)

stmt, err := c.GetPreparedStatement(
fmt.Sprintf(
`
INSERT INTO "%s"."%s" (%s) VALUES (%s);
`,
a.SchemaName,
a.TableName,
strings.Join(escapedCols, ","),
strings.Join(escapedRows, ","),
),
)
if err != nil {
return err
}

if err != nil {
return err
}
values = append(values, string(jsonStr))
} else {
values = append(values, row.Value)
}

_, err = stmt.Exec(values...)
if row.Column.Name == a.PrimaryKeyName {
primaryKeyRow = &row
}
}

// Try to UPDATE (upsert) if INSERT fails...
if err != nil {
// Rollback to SAVEPOINT
_, err = c.Tx.Exec(fmt.Sprintf(
`ROLLBACK TO SAVEPOINT "%s%s";`,
// Save transaction prior to inserting to rollback
// if INSERT fails, so a UPDATE can be tried
_, err := c.Tx.Exec(fmt.Sprintf(
`SAVEPOINT "%s%s";`,
a.SchemaName,
a.TableName,
))
Expand All @@ -93,28 +88,55 @@ func (a *InsertRow) Execute(c *Context) error {
return err
}

updateAction := &UpdateRow{
a.SchemaName,
a.TableName,
*primaryKeyRow,
a.Rows,
}
_, err = c.Tx.Exec(
fmt.Sprintf(
`
INSERT INTO "%s"."%s" (%s) VALUES (%s);
`,
a.SchemaName,
a.TableName,
strings.Join(escapedCols, ","),
strings.Join(escapedRows, ","),
),
values...,
)

// Try to UPDATE (upsert) if INSERT fails...
if err != nil {
// Rollback to SAVEPOINT
_, err = c.Tx.Exec(fmt.Sprintf(
`ROLLBACK TO SAVEPOINT "%s%s";`,
a.SchemaName,
a.TableName,
))

err = updateAction.Execute(c)
if err != nil {
return err
}

if err != nil {
return err
}
} else {
// Release SAVEPOINT to avoid "out of shared memory"
_, err := c.Tx.Exec(fmt.Sprintf(
`RELEASE SAVEPOINT "%s%s";`,
a.SchemaName,
a.TableName,
))
updateAction := &UpdateRow{
a.SchemaName,
a.TableName,
*primaryKeyRow,
a.Rows,
}

if err != nil {
return err
err = updateAction.Execute(c)

if err != nil {
return err
}
} else {
// Release SAVEPOINT to avoid "out of shared memory"
_, err := c.Tx.Exec(fmt.Sprintf(
`RELEASE SAVEPOINT "%s%s";`,
a.SchemaName,
a.TableName,
))

if err != nil {
return err
}
}
}

Expand Down
40 changes: 40 additions & 0 deletions action/row.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,46 @@
package action

import (
"encoding/json"
)

type Row struct {
Value interface{}
Column Column
}

type Rows []Row

func (r *Row) GetValue() interface{} {
// pq cannot handle nested JSON objects
if obj, ok := r.Value.(*map[string]interface{}); ok {
jsonStr, err := json.Marshal(obj)

if err != nil {
return err
}

return string(jsonStr)
}

// text/varchar are converted to slice of bytes.
// Convert back to string...
if bytea, ok := r.Value.([]byte); ok {
return string(bytea)
}

return r.Value
}

// Implement Interface
func (slice Rows) Len() int {
return len(slice)
}

func (slice Rows) Less(i, j int) bool {
return slice[i].Column.Name < slice[j].Column.Name
}

func (slice Rows) Swap(i, j int) {
slice[i], slice[j] = slice[j], slice[i]
}
Loading

0 comments on commit a643933

Please sign in to comment.