Skip to content

Commit

Permalink
Implement incremental apply.
Browse files Browse the repository at this point in the history
  • Loading branch information
pedrofranceschi committed Apr 25, 2016
1 parent a9720dd commit ee355f2
Show file tree
Hide file tree
Showing 3 changed files with 47 additions and 8 deletions.
44 changes: 40 additions & 4 deletions applier/applier.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,23 +118,59 @@ func (a *Applier) applyBatch(batch *database.Batch) (bool, error) {
}
} else if batch.StorageType == "fs" {
reader, file, err := batch.GetFileReader()
defer file.Close()

if err != nil {
return updateBatchStatus(err)
}

defer file.Close()

var act action.Action
currentStatement := 0
currentBatchSize := 0
previousStatement := batch.LastExecutedStatement

act, err = batch.ReadAction(reader);

for err == nil {
err = a.applyAction(act, tx)
// Increment current statement
currentStatement += 1

if err != nil {
return updateBatchStatus(err)
// Start applying from previous stop point
if currentStatement > previousStatement {
err = a.applyAction(act, tx)

if err != nil {
return updateBatchStatus(err)
}

batch.LastExecutedStatement += 1
currentBatchSize += 1

if currentBatchSize >= a.batchSize {
// Current batch reached the maximum size.
// Commit batch to database.
err = batch.UpdateQuery(tx)

if err != nil {
return false, err
}

err = tx.Commit()

if err != nil {
return false, err
}

// Restart transaction
tx = a.db.NewTransaction()

// Reset batch size
currentBatchSize = 0
}
}

// Read next action
act, err = batch.ReadAction(reader);
}

Expand Down
3 changes: 2 additions & 1 deletion data/sql/setup.sql
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ CREATE TABLE IF NOT EXISTS teleport.batch (
data text,
source text,
target text,
waiting_reexecution boolean not null default false
waiting_reexecution boolean not null default false,
last_executed_statement int default 0
);

-- Create table to store events of a given batch
Expand Down
8 changes: 5 additions & 3 deletions database/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type Batch struct {
Data *string `db:"data" json:"data"`
StorageType string `db:"storage_type" json:"storage_type"`
WaitingReexecution bool `db:"waiting_reexecution" json:"waiting_reexecution"`
LastExecutedStatement int `db:"last_executed_statement" json:"last_executed_statement"`
}

func NewBatch(storageType string) *Batch {
Expand Down Expand Up @@ -189,20 +190,21 @@ func (b *Batch) UpdateQuery(tx *sqlx.Tx) error {

// If there's no id, insert without id
if b.DataStatus == "" {
query = "UPDATE teleport.batch SET status = $1, data = $2, waiting_reexecution = $3 WHERE id = $4"
query = "UPDATE teleport.batch SET status = $1, data = $2, waiting_reexecution = $3, last_executed_statement = $4 WHERE id = $5"
args = append(args, b.Status)
} else if b.Status == "" {
query = "UPDATE teleport.batch SET data_status = $1, data = $2, waiting_reexecution = $3 WHERE id = $4"
query = "UPDATE teleport.batch SET data_status = $1, data = $2, waiting_reexecution = $3, last_executed_statement = $4 WHERE id = $5"
args = append(args, b.DataStatus)
} else {
query = "UPDATE teleport.batch SET data_status = $1, status = $2, data = $3, waiting_reexecution = $4 WHERE id = $5"
query = "UPDATE teleport.batch SET data_status = $1, status = $2, data = $3, waiting_reexecution = $4, last_executed_statement = $5 WHERE id = $6"
args = append(args, b.DataStatus)
args = append(args, b.Status)
}

args = append(args,
b.Data,
b.WaitingReexecution,
b.LastExecutedStatement,
b.Id,
)

Expand Down

0 comments on commit ee355f2

Please sign in to comment.