Skip to content

Commit

Permalink
Implement prepared statements caching for insert row.
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrofranceschi committed Apr 25, 2016
1 parent ee355f2 commit fb7539c
Show file tree
Hide file tree
Showing 22 changed files with 70 additions and 33 deletions.
2 changes: 1 addition & 1 deletion action/action.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package action

type Action interface {
// Execute the given action
Execute(c Context) error
Execute(c *Context) error
// Returns whether current action should be executed
// for a targetExpression
Filter(targetExpression string) bool
Expand Down
2 changes: 1 addition & 1 deletion action/altercolumn.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func init() {
gob.Register(&AlterColumn{})
}

func (a *AlterColumn) Execute(c Context) error {
func (a *AlterColumn) Execute(c *Context) error {
if a.Column.Name != a.NewColumn.Name {
_, err := c.Tx.Exec(
fmt.Sprintf(
Expand Down
2 changes: 1 addition & 1 deletion action/alterschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func init() {
gob.Register(&AlterSchema{})
}

func (a *AlterSchema) Execute(c Context) error {
func (a *AlterSchema) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("ALTER SCHEMA \"%s\" RENAME TO \"%s\";", a.SourceName, a.TargetName),
)
Expand Down
2 changes: 1 addition & 1 deletion action/altertable.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {
gob.Register(&AlterTable{})
}

func (a *AlterTable) Execute(c Context) error {
func (a *AlterTable) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("ALTER TABLE \"%s\".\"%s\" RENAME TO \"%s\";", a.SchemaName, a.SourceName, a.TargetName),
)
Expand Down
2 changes: 1 addition & 1 deletion action/altertype.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {
gob.Register(&AlterType{})
}

func (a *AlterType) Execute(c Context) error {
func (a *AlterType) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("ALTER TYPE \"%s\".\"%s\" RENAME TO \"%s\";", a.SchemaName, a.SourceName, a.TargetName),
)
Expand Down
34 changes: 34 additions & 0 deletions action/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,45 @@ package action

import (
"github.com/jmoiron/sqlx"
"crypto/md5"
)

// Defines the execution context of actions
type Context struct {
Tx *sqlx.Tx
Db *sqlx.DB
stmtCache map[string]*sqlx.Stmt
}

func NewContext(tx *sqlx.Tx, db *sqlx.DB) *Context {
return &Context{
tx,
db,
make(map[string]*sqlx.Stmt),
}
}

func (c *Context) getHash(str string) string {
h := md5.New()
return string(h.Sum([]byte(str)))
}

func (c *Context) GetPreparedStatement(statement string) (*sqlx.Stmt, error) {
hash := c.getHash(statement)

if stmt, ok := c.stmtCache[hash]; ok {
return stmt, nil
} else {
stmt, err := c.Tx.Preparex(statement)

if err != nil {
return nil, err
}

c.stmtCache[hash] = stmt

return stmt, nil
}

return nil, nil
}
2 changes: 1 addition & 1 deletion action/createcolumn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {
gob.Register(&CreateColumn{})
}

func (a *CreateColumn) Execute(c Context) error {
func (a *CreateColumn) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf(
"ALTER TABLE \"%s\".\"%s\" ADD COLUMN \"%s\" %s\"%s\";",
Expand Down
2 changes: 1 addition & 1 deletion action/createenum.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {
gob.Register(&CreateEnum{})
}

func (a *CreateEnum) Execute(c Context) error {
func (a *CreateEnum) Execute(c *Context) error {
// ALTER TYPE... ADD VALUE cannot run inside a transaction
_, err := c.Db.Exec(
fmt.Sprintf(
Expand Down
2 changes: 1 addition & 1 deletion action/createschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func init() {
gob.Register(&CreateSchema{})
}

func (a *CreateSchema) Execute(c Context) error {
func (a *CreateSchema) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("CREATE SCHEMA IF NOT EXISTS \"%s\";", a.SchemaName),
)
Expand Down
2 changes: 1 addition & 1 deletion action/createtable.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ func init() {
gob.Register(&CreateTable{})
}

func (a *CreateTable) Execute(c Context) error {
func (a *CreateTable) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf(
"CREATE TABLE \"%s\".\"%s\" (\"%s\" %s\"%s\" PRIMARY KEY);",
Expand Down
2 changes: 1 addition & 1 deletion action/createtrigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func init() {
gob.Register(&CreateTrigger{})
}

func (a *CreateTrigger) Execute(c Context) error {
func (a *CreateTrigger) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf(
`
Expand Down
2 changes: 1 addition & 1 deletion action/createtype.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func init() {
gob.Register(&CreateType{})
}

func (a *CreateType) Execute(c Context) error {
func (a *CreateType) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf(
"CREATE TYPE \"%s\".\"%s\" AS ENUM ();",
Expand Down
2 changes: 1 addition & 1 deletion action/deleterow.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {
gob.Register(&DeleteRow{})
}

func (a *DeleteRow) Execute(c Context) error {
func (a *DeleteRow) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf(
`DELETE FROM "%s"."%s" WHERE "%s" = $1;`,
Expand Down
2 changes: 1 addition & 1 deletion action/dropcolumn.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {
gob.Register(&DropColumn{})
}

func (a *DropColumn) Execute(c Context) error {
func (a *DropColumn) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("ALTER TABLE \"%s\".\"%s\" DROP COLUMN \"%s\";", a.SchemaName, a.TableName, a.Column.Name),
)
Expand Down
2 changes: 1 addition & 1 deletion action/dropschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ func init() {
gob.Register(&DropSchema{})
}

func (a *DropSchema) Execute(c Context) error {
func (a *DropSchema) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("DROP SCHEMA \"%s\";", a.SchemaName),
)
Expand Down
2 changes: 1 addition & 1 deletion action/droptable.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func init() {
gob.Register(&DropTable{})
}

func (a *DropTable) Execute(c Context) error {
func (a *DropTable) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("DROP TABLE \"%s\".\"%s\";", a.SchemaName, a.TableName),
)
Expand Down
2 changes: 1 addition & 1 deletion action/droptrigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ func init() {
gob.Register(&DropTrigger{})
}

func (a *DropTrigger) Execute(c Context) error {
func (a *DropTrigger) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf(
`DROP TRIGGER IF EXISTS "%s" ON "%s"."%s";`,
Expand Down
2 changes: 1 addition & 1 deletion action/droptype.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func init() {
gob.Register(&DropType{})
}

func (a *DropType) Execute(c Context) error {
func (a *DropType) Execute(c *Context) error {
_, err := c.Tx.Exec(
fmt.Sprintf("DROP TYPE \"%s\".\"%s\";", a.SchemaName, a.TypeName),
)
Expand Down
11 changes: 8 additions & 3 deletions action/insertrow.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ func init() {
gob.Register(&time.Time{})
}

func (a *InsertRow) Execute(c Context) error {
func (a *InsertRow) Execute(c *Context) error {
escapedCols := make([]string, 0)
escapedRows := make([]string, 0)
values := make([]interface{}, 0)
Expand Down Expand Up @@ -62,7 +62,7 @@ func (a *InsertRow) Execute(c Context) error {
return err
}

_, err = c.Tx.Exec(
stmt, err := c.GetPreparedStatement(
fmt.Sprintf(
`
INSERT INTO "%s"."%s" (%s) VALUES (%s);
Expand All @@ -72,9 +72,14 @@ func (a *InsertRow) Execute(c Context) error {
strings.Join(escapedCols, ","),
strings.Join(escapedRows, ","),
),
values...,
)

if err != nil {
return err
}

_, err = stmt.Exec(values...)

// Try to UPDATE (upsert) if INSERT fails...
if err != nil {
// Rollback to SAVEPOINT
Expand Down
2 changes: 1 addition & 1 deletion action/updaterow.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func init() {
gob.Register(&UpdateRow{})
}

func (a *UpdateRow) Execute(c Context) error {
func (a *UpdateRow) Execute(c *Context) error {
escapedRows := make([]string, 0)
replacementsCount := 0
values := make([]interface{}, 0)
Expand Down
17 changes: 9 additions & 8 deletions applier/applier.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package applier

import (
"github.com/jmoiron/sqlx"
"github.com/pagarme/teleport/action"
"github.com/pagarme/teleport/database"
"log"
Expand Down Expand Up @@ -46,12 +45,9 @@ func (a *Applier) Watch(sleepTime time.Duration) {
}
}

func (a *Applier) applyAction(act action.Action, tx *sqlx.Tx) error {
func (a *Applier) applyAction(act action.Action, context *action.Context) error {
// Execute action of the given event
err := act.Execute(action.Context{
Tx: tx,
Db: a.db.Db,
})
err := act.Execute(context)

if err != nil {
log.Printf("Error applying action %#v: %v", act, err)
Expand Down Expand Up @@ -109,8 +105,10 @@ func (a *Applier) applyBatch(batch *database.Batch) (bool, error) {
return false, err
}

context := action.NewContext(tx, a.db.Db)

for _, act := range actions {
err := a.applyAction(act, tx)
err := a.applyAction(act, context)

if err != nil {
return updateBatchStatus(err)
Expand All @@ -130,6 +128,7 @@ func (a *Applier) applyBatch(batch *database.Batch) (bool, error) {
currentBatchSize := 0
previousStatement := batch.LastExecutedStatement

currentContext := action.NewContext(tx, a.db.Db)
act, err = batch.ReadAction(reader);

for err == nil {
Expand All @@ -138,7 +137,7 @@ func (a *Applier) applyBatch(batch *database.Batch) (bool, error) {

// Start applying from previous stop point
if currentStatement > previousStatement {
err = a.applyAction(act, tx)
err = a.applyAction(act, currentContext)

if err != nil {
return updateBatchStatus(err)
Expand All @@ -165,6 +164,8 @@ func (a *Applier) applyBatch(batch *database.Batch) (bool, error) {
// Restart transaction
tx = a.db.NewTransaction()

currentContext = action.NewContext(tx, a.db.Db)

// Reset batch size
currentBatchSize = 0
}
Expand Down
5 changes: 1 addition & 4 deletions database/class.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,7 @@ func (c *Class) InstallTriggers() error {
tx := c.Schema.Db.NewTransaction()

for _, currentAction := range actions {
err := currentAction.Execute(action.Context{
Tx: tx,
Db: c.Schema.Db.Db,
})
err := currentAction.Execute(action.NewContext(tx, c.Schema.Db.Db,))

if err != nil {
log.Printf("Error creating triggers on %s: %v", c.RelationName, err)
Expand Down

0 comments on commit fb7539c

Please sign in to comment.