From b8c61736b3926d1de1eee028e9b75e75730edd3d Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Thu, 11 Jan 2024 01:17:31 -0800 Subject: [PATCH 01/16] [CF-557] Changelog updates should wait for transaction commit --- pkg/cmd/ctlstore/main.go | 2 +- pkg/executive/db_executive.go | 2 +- pkg/ldb/ldbs.go | 4 + pkg/ldbwriter/ldb_callback_writer.go | 35 +++++- pkg/ldbwriter/ldb_callback_writer_test.go | 147 ++++++++++++++++++++++ pkg/sqlite/sqlite_watch.go | 7 +- 6 files changed, 191 insertions(+), 6 deletions(-) create mode 100644 pkg/ldbwriter/ldb_callback_writer_test.go diff --git a/pkg/cmd/ctlstore/main.go b/pkg/cmd/ctlstore/main.go index 31bbb99a..dee8be11 100644 --- a/pkg/cmd/ctlstore/main.go +++ b/pkg/cmd/ctlstore/main.go @@ -114,7 +114,7 @@ type supervisorCliConfig struct { type ledgerHealthConfig struct { Disable bool `conf:"disable" help:"disable ledger latency health attributing (DEPRECATED: use disable-ecs-behavior instead)"` DisableECSBehavior bool `conf:"disable-ecs-behavior" help:"disable ledger latency health attributing"` - MaxHealthyLatency time.Duration `conf:"max-healty-latency" help:"Max latency considered healthy"` + MaxHealthyLatency time.Duration `conf:"max-healthy-latency" help:"Max latency considered healthy"` AttributeName string `conf:"attribute-name" help:"The name of the attribute"` HealthyAttributeValue string `conf:"healthy-attribute-value" help:"The value of the attribute if healthy"` UnhealthyAttributeValue string `conf:"unhealth-attribute-value" help:"The value of the attribute if unhealthy"` diff --git a/pkg/executive/db_executive.go b/pkg/executive/db_executive.go index fc2bbf4f..07066d17 100644 --- a/pkg/executive/db_executive.go +++ b/pkg/executive/db_executive.go @@ -319,7 +319,7 @@ func (e *dbExecutive) AddFields(familyName string, tableName string, fieldNames } // We first write the column modification to the DML ledger within the transaction. 
- // It's important that this is done befored the DDL is applied to the ctldb, as + // It's important that this is done before the DDL is applied to the ctldb, as // the DDL is not able to be rolled back. In this way, if the DDL fails, the DML // can be rolled back. dlw := dmlLedgerWriter{Tx: tx, TableName: dmlLedgerTableName} diff --git a/pkg/ldb/ldbs.go b/pkg/ldb/ldbs.go index 9fca941d..6e7b38f0 100644 --- a/pkg/ldb/ldbs.go +++ b/pkg/ldb/ldbs.go @@ -121,3 +121,7 @@ func FetchSeqFromLdb(ctx context.Context, db *sql.DB) (schema.DMLSequence, error } return schema.DMLSequence(seq), err } + +func IsInternalTable(name string) bool { + return name == LDBSeqTableName || name == LDBLastUpdateTableName +} diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index ad1a4148..ceba645f 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -7,15 +7,19 @@ import ( "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/sqlite" "github.com/segmentio/events/v2" + "github.com/segmentio/stats/v4" ) // CallbackWriter is an LDBWriter that delegates to another // writer and then, upon a successful write, executes N callbacks. 
type CallbackWriter struct { - DB *sql.DB - Delegate LDBWriter - Callbacks []LDBWriteCallback + DB *sql.DB + Delegate LDBWriter + Callbacks []LDBWriteCallback + // Buffer between SQLite Callback and our code ChangeBuffer *sqlite.SQLChangeBuffer + // Accumulated changes across multiple ApplyDMLStatement calls + transactionChanges []sqlite.SQLiteWatchChange } func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error { @@ -23,7 +27,32 @@ func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema if err != nil { return err } + + // If beginning a transaction then start accumulating changes + if statement.Statement == schema.DMLTxBeginKey { + w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) + stats.Set("ldb_changes_accumulated", len(w.transactionChanges)) + return nil + } + changes := w.ChangeBuffer.Pop() + + // Are we in a transaction? + if w.transactionChanges != nil { + if statement.Statement == schema.DMLTxEndKey { + // Transaction done! Send out the accumulated changes + changes = append(w.transactionChanges, changes...) + stats.Set("ldb_changes_accumulated", len(changes)) + w.transactionChanges = nil + } else { + // Transaction isn't over yet, save the latest changes + w.transactionChanges = append(w.transactionChanges, changes...) 
+ stats.Set("ldb_changes_accumulated", len(w.transactionChanges)) + return nil + } + } + + stats.Observe("ldb_changes_written", len(changes)) for _, callback := range w.Callbacks { events.Debug("Writing DML callback for %{cb}T", callback) callback.LDBWritten(ctx, LDBWriteMetadata{ diff --git a/pkg/ldbwriter/ldb_callback_writer_test.go b/pkg/ldbwriter/ldb_callback_writer_test.go new file mode 100644 index 00000000..ada2e720 --- /dev/null +++ b/pkg/ldbwriter/ldb_callback_writer_test.go @@ -0,0 +1,147 @@ +package ldbwriter + +import ( + "context" + "database/sql" + "github.com/segmentio/ctlstore/pkg/ldb" + "github.com/segmentio/ctlstore/pkg/schema" + "github.com/segmentio/ctlstore/pkg/sqlite" + "github.com/stretchr/testify/assert" + "testing" +) + +/* + * Simple LDBWriteCallback handler that just stores the changes it gets. + */ +type TestUpdateCallbackHandler struct { + Changes []sqlite.SQLiteWatchChange +} + +func (u *TestUpdateCallbackHandler) LDBWritten(ctx context.Context, data LDBWriteMetadata) { + // The [:0] slice operation will reuse the underlying array of u.Changes if it's large enough + // to hold all elements of data.Changes, otherwise it will allocate a new one. + u.Changes = append(u.Changes[:0], data.Changes...) +} + +func (u *TestUpdateCallbackHandler) UpdateCount() int { + return len(u.Changes) +} + +func (u *TestUpdateCallbackHandler) Reset() { + u.Changes = u.Changes[:0] + return +} + +/* + * Test strategy: + * Check how many times we get callbacks while applying DML statements, + * and how many updates we get per callback. 
+ */ +func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { + // Begin boilerplate + var err error + ctx := context.Background() + var changeBuffer sqlite.SQLChangeBuffer + dbName := "test_ldb_callback_writer" + _ = sqlite.RegisterSQLiteWatch(dbName, &changeBuffer) + + db, err := sql.Open(dbName, ":memory:") + if err != nil { + t.Fatalf("Unexpected error: %+v", err) + } + defer db.Close() + + err = ldb.EnsureLdbInitialized(ctx, db) + if err != nil { + t.Fatalf("Couldn't initialize SQLite db, error %v", err) + } + // End boilerplate + + // Set up the callback writer with our test callback handler + ldbWriteCallback := &TestUpdateCallbackHandler{} + + writer := CallbackWriter{ + DB: db, + Delegate: &SqlLdbWriter{Db: db}, + Callbacks: []LDBWriteCallback{ldbWriteCallback}, + ChangeBuffer: &changeBuffer, + } + + err = writer.ApplyDMLStatement(ctx, schema.NewTestDMLStatement("CREATE TABLE foo (bar VARCHAR);")) + if err != nil { + t.Fatalf("Could not issue CREATE TABLE statements, error %v", err) + } + + type args struct { + ctx context.Context + statements []schema.DMLStatement + } + tests := []struct { + name string + args args + expectedCallbacks int + expectedUpdatesPerCallback int + wantErr bool + }{ + { + name: "Test 1", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{schema.NewTestDMLStatement("INSERT INTO foo VALUES('dummy');")}, + }, + expectedCallbacks: 1, + expectedUpdatesPerCallback: 1, + wantErr: false, + }, + { + name: "Test 2", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement("INSERT INTO foo VALUES('boston');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('detroit');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('chicago');"), + }, + }, + // bare statements outside of a transaction should get a callback each time + expectedCallbacks: 3, + expectedUpdatesPerCallback: 1, + wantErr: false, + }, + { + name: "Test 3", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + 
schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('asdf');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('foo');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('bar');"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + }, + }, + // since it's a transaction, we expect only one callback, and it should have all 3 updates + expectedCallbacks: 1, + expectedUpdatesPerCallback: 3, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + callbackCount := 0 + for _, statement := range tt.args.statements { + if err := writer.ApplyDMLStatement(tt.args.ctx, statement); (err != nil) != tt.wantErr { + t.Errorf("ApplyDMLStatement() error = %v, wantErr %v", err, tt.wantErr) + } + // did we get a callback from that statement being applied? + if ldbWriteCallback.UpdateCount() > 0 { + callbackCount++ + assert.Equal(t, tt.expectedUpdatesPerCallback, ldbWriteCallback.UpdateCount()) + // delete previous callback's update entries since we "handled" the callback + ldbWriteCallback.Reset() + } + } + assert.Equal(t, tt.expectedCallbacks, callbackCount) + }) + } +} diff --git a/pkg/sqlite/sqlite_watch.go b/pkg/sqlite/sqlite_watch.go index 9d351ce7..40f02332 100644 --- a/pkg/sqlite/sqlite_watch.go +++ b/pkg/sqlite/sqlite_watch.go @@ -3,8 +3,8 @@ package sqlite import ( "context" "database/sql" - "github.com/pkg/errors" + "github.com/segmentio/ctlstore/pkg/ldb" "github.com/segmentio/ctlstore/pkg/scanfunc" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/go-sqlite3" @@ -40,6 +40,11 @@ func RegisterSQLiteWatch(dbName string, buffer *SQLChangeBuffer) error { var newRow []interface{} var oldRow []interface{} + // Don't bother propagating updates of our internal bookkeeping tables + if ldb.IsInternalTable(pud.TableName) { + return + } + if pud.Op == sqlite3.SQLITE_UPDATE || pud.Op == sqlite3.SQLITE_DELETE { oldRow = make([]interface{}, cnt) err := pud.Old(oldRow...) 
From 8574a1397d3a548dca9207a5eb6b646a432d7ca0 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Thu, 11 Jan 2024 01:47:30 -0800 Subject: [PATCH 02/16] add back newline that got autoremoved somehow --- pkg/ldbwriter/ldb_callback_writer.go | 19 ++++++++++++++++--- pkg/sqlite/sqlite_watch.go | 1 + 2 files changed, 17 insertions(+), 3 deletions(-) diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index ceba645f..5e7f9f9b 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -22,6 +22,8 @@ type CallbackWriter struct { transactionChanges []sqlite.SQLiteWatchChange } +// TODO: write a small struct with a couple receiver methods to make the below code more clean & simple + func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error { err := w.Delegate.ApplyDMLStatement(ctx, statement) if err != nil { @@ -30,8 +32,18 @@ func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema // If beginning a transaction then start accumulating changes if statement.Statement == schema.DMLTxBeginKey { - w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) - stats.Set("ldb_changes_accumulated", len(w.transactionChanges)) + if w.transactionChanges == nil { + w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) + } else { + if len(w.transactionChanges) > 0 { + // This should never happen, but just in case... + stats.Add("ldb_changes_abandoned", len(w.transactionChanges)) + events.Log("error: abandoned %{count}d changes from incomplete transaction", len(w.transactionChanges)) + } + // Reset to size 0, but keep the underlying array + w.transactionChanges = w.transactionChanges[:0] + } + stats.Set("ldb_changes_accumulated", 0) return nil } @@ -43,7 +55,8 @@ func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema // Transaction done! Send out the accumulated changes changes = append(w.transactionChanges, changes...) 
stats.Set("ldb_changes_accumulated", len(changes)) - w.transactionChanges = nil + // Reset to size 0, but keep the underlying array + w.transactionChanges = w.transactionChanges[:0] } else { // Transaction isn't over yet, save the latest changes w.transactionChanges = append(w.transactionChanges, changes...) diff --git a/pkg/sqlite/sqlite_watch.go b/pkg/sqlite/sqlite_watch.go index 40f02332..cf6a8e96 100644 --- a/pkg/sqlite/sqlite_watch.go +++ b/pkg/sqlite/sqlite_watch.go @@ -3,6 +3,7 @@ package sqlite import ( "context" "database/sql" + "github.com/pkg/errors" "github.com/segmentio/ctlstore/pkg/ldb" "github.com/segmentio/ctlstore/pkg/scanfunc" From 5c5b5d8ca98f4d9f708bdc802d74191f505d151f Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Thu, 11 Jan 2024 01:54:01 -0800 Subject: [PATCH 03/16] pin alpine version to avoid musl problems See: https://github.com/mattn/go-sqlite3/issues/1164 --- Dockerfile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Dockerfile b/Dockerfile index a70bf5d3..f3891f68 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,4 +1,4 @@ -FROM golang:1.20-alpine +FROM golang:1.20-alpine3.18 ENV SRC github.com/segmentio/ctlstore ARG VERSION From 4221eb96b96f9bed05354fd09e52e98f1dcd446e Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Thu, 11 Jan 2024 16:21:07 -0800 Subject: [PATCH 04/16] refactor to make code easier to read, and add another test case --- pkg/ldbwriter/ldb_callback_writer.go | 75 +++++++++++++++-------- pkg/ldbwriter/ldb_callback_writer_test.go | 22 ++++++- 2 files changed, 69 insertions(+), 28 deletions(-) diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index 5e7f9f9b..2926e4ab 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -16,51 +16,76 @@ type CallbackWriter struct { DB *sql.DB Delegate LDBWriter Callbacks []LDBWriteCallback - // Buffer between SQLite Callback and our code + // Buffer between SQLite preupdate Hook and this 
code ChangeBuffer *sqlite.SQLChangeBuffer // Accumulated changes across multiple ApplyDMLStatement calls transactionChanges []sqlite.SQLiteWatchChange } -// TODO: write a small struct with a couple receiver methods to make the below code more clean & simple +func (w *CallbackWriter) InTransaction() bool { + return w.transactionChanges != nil +} + +func (w *CallbackWriter) BeginTransaction() { + if w.transactionChanges == nil { + w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) + } else { + if len(w.transactionChanges) > 0 { + // This should never happen, but just in case... + stats.Add("ldb_changes_abandoned", len(w.transactionChanges)) + events.Log("error: abandoned %{count}d changes from incomplete transaction", len(w.transactionChanges)) + } + // Reset to size 0, but keep the underlying array + w.transactionChanges = w.transactionChanges[:0] + } + stats.Set("ldb_changes_accumulated", 0) +} +// Transaction done! Return the accumulated changes including the latest ones +func (w *CallbackWriter) EndTransaction(changes *[]sqlite.SQLiteWatchChange) { + *changes = append(w.transactionChanges, *changes...) + stats.Set("ldb_changes_accumulated", len(*changes)) + // Reset to size 0, but keep the underlying array + w.transactionChanges = w.transactionChanges[:0] +} + +// Transaction isn't over yet, save the latest changes +func (w *CallbackWriter) AccumulateChanges(changes []sqlite.SQLiteWatchChange) { + w.transactionChanges = append(w.transactionChanges, changes...) + stats.Set("ldb_changes_accumulated", len(w.transactionChanges)) +} + +// ApplyDMLStatement +// +// It is not obvious, but this code executes synchronously: +// 1. Delegate.AppyDMLStatement executes the DML statement against the SQLite LDB. +// (WARNING: That's what the code is wired up to do today, January 2024, though the Delegate +// could be doing other things since the code is so flexible.) +// 2. When SQLite processes the statement it invokes our preupdate hook (see sqlite_watch.go). +// 3. 
Our preupdate hook writes the changes to the change buffer. +// 4. The code returns here, and we decide whether to process the change buffer immediately or +// wait until the end of the ledger transaction. func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error { err := w.Delegate.ApplyDMLStatement(ctx, statement) if err != nil { return err } - // If beginning a transaction then start accumulating changes + // If beginning a transaction then start accumulating changes, don't send them out yet if statement.Statement == schema.DMLTxBeginKey { - if w.transactionChanges == nil { - w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) - } else { - if len(w.transactionChanges) > 0 { - // This should never happen, but just in case... - stats.Add("ldb_changes_abandoned", len(w.transactionChanges)) - events.Log("error: abandoned %{count}d changes from incomplete transaction", len(w.transactionChanges)) - } - // Reset to size 0, but keep the underlying array - w.transactionChanges = w.transactionChanges[:0] - } - stats.Set("ldb_changes_accumulated", 0) + w.BeginTransaction() return nil } changes := w.ChangeBuffer.Pop() - // Are we in a transaction? - if w.transactionChanges != nil { + if w.InTransaction() { if statement.Statement == schema.DMLTxEndKey { - // Transaction done! Send out the accumulated changes - changes = append(w.transactionChanges, changes...) - stats.Set("ldb_changes_accumulated", len(changes)) - // Reset to size 0, but keep the underlying array - w.transactionChanges = w.transactionChanges[:0] + // Transaction done, let's send what we have accumulated + w.EndTransaction(&changes) } else { - // Transaction isn't over yet, save the latest changes - w.transactionChanges = append(w.transactionChanges, changes...) 
- stats.Set("ldb_changes_accumulated", len(w.transactionChanges)) + // Transaction not over, continue accumulating + w.AccumulateChanges(changes) return nil } } diff --git a/pkg/ldbwriter/ldb_callback_writer_test.go b/pkg/ldbwriter/ldb_callback_writer_test.go index ada2e720..520b769a 100644 --- a/pkg/ldbwriter/ldb_callback_writer_test.go +++ b/pkg/ldbwriter/ldb_callback_writer_test.go @@ -84,7 +84,7 @@ func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { wantErr bool }{ { - name: "Test 1", + name: "Test one bare statement", args: args{ ctx: ctx, statements: []schema.DMLStatement{schema.NewTestDMLStatement("INSERT INTO foo VALUES('dummy');")}, @@ -94,7 +94,7 @@ func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { wantErr: false, }, { - name: "Test 2", + name: "Test three bare statements", args: args{ ctx: ctx, statements: []schema.DMLStatement{ @@ -109,7 +109,7 @@ func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { wantErr: false, }, { - name: "Test 3", + name: "Test three statements in a ledger transaction", args: args{ ctx: ctx, statements: []schema.DMLStatement{ @@ -125,6 +125,22 @@ func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { expectedUpdatesPerCallback: 3, wantErr: false, }, + { + name: "Test two statements in a ledger transaction", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('blue');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('green');"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + }, + }, + // since it's a transaction, we expect only one callback, and it should have both updates + expectedCallbacks: 1, + expectedUpdatesPerCallback: 2, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From e8a2d852cd3fa2855755043f84c61962800d57cd Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Thu, 11 Jan 2024 16:55:55 -0800 Subject: [PATCH 05/16] 
=?UTF-8?q?add=20=E2=9A=A0=EF=B8=8F=20to=20comment?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/ldbwriter/ldb_callback_writer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index 2926e4ab..7ef7c686 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -59,7 +59,7 @@ func (w *CallbackWriter) AccumulateChanges(changes []sqlite.SQLiteWatchChange) { // // It is not obvious, but this code executes synchronously: // 1. Delegate.AppyDMLStatement executes the DML statement against the SQLite LDB. -// (WARNING: That's what the code is wired up to do today, January 2024, though the Delegate +// (⚠️ WARNING: That's what the code is wired up to do today, January 2024, though the Delegate // could be doing other things since the code is so flexible.) // 2. When SQLite processes the statement it invokes our preupdate hook (see sqlite_watch.go). // 3. Our preupdate hook writes the changes to the change buffer. 
From 209e26c3b0deec34088fea993f1a62dfd8f74884 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Thu, 11 Jan 2024 22:47:08 -0800 Subject: [PATCH 06/16] changelog: propagate change type --- pkg/changelog/changelog_writer.go | 4 ++++ pkg/event/entry.go | 4 +++- pkg/sqlite/sqlite_watch.go | 40 ++++++++++++++++++++++++++++++- 3 files changed, 46 insertions(+), 2 deletions(-) diff --git a/pkg/changelog/changelog_writer.go b/pkg/changelog/changelog_writer.go index e3c2fba4..9539f6e1 100644 --- a/pkg/changelog/changelog_writer.go +++ b/pkg/changelog/changelog_writer.go @@ -2,6 +2,7 @@ package changelog import ( "encoding/json" + "github.com/segmentio/ctlstore/pkg/sqlite" "github.com/pkg/errors" "github.com/segmentio/events/v2" @@ -17,6 +18,7 @@ type ( } ChangelogEntry struct { Seq int64 + Type sqlite.ChangeType Family string Table string Key []interface{} @@ -30,11 +32,13 @@ func NewChangelogEntry(seq int64, family string, table string, key []interface{} func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error { structure := struct { Seq int64 `json:"seq"` + Type string `json:"type"` Family string `json:"family"` Table string `json:"table"` Key []interface{} `json:"key"` }{ e.Seq, + e.Type.String(), e.Family, e.Table, e.Key, diff --git a/pkg/event/entry.go b/pkg/event/entry.go index e175153d..cc62cdc1 100644 --- a/pkg/event/entry.go +++ b/pkg/event/entry.go @@ -2,9 +2,11 @@ package event // entry represents a single row in the changelog // e.g. 
-// {"seq":1,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} +// +// {"seq":1,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} type entry struct { Seq int64 `json:"seq"` + Type string `json:"type"` Family string `json:"family"` Table string `json:"table"` Key []Key `json:"key"` diff --git a/pkg/sqlite/sqlite_watch.go b/pkg/sqlite/sqlite_watch.go index cf6a8e96..c220f4ee 100644 --- a/pkg/sqlite/sqlite_watch.go +++ b/pkg/sqlite/sqlite_watch.go @@ -3,7 +3,6 @@ package sqlite import ( "context" "database/sql" - "github.com/pkg/errors" "github.com/segmentio/ctlstore/pkg/ldb" "github.com/segmentio/ctlstore/pkg/scanfunc" @@ -11,6 +10,16 @@ import ( "github.com/segmentio/go-sqlite3" ) +type ChangeType int + +const ( + CHANGE_INSERT ChangeType = iota + CHANGE_UPDATE + CHANGE_DELETE + CHANGE_UNKNOWN + // Add other statement types here +) + type ( SQLiteWatchChange struct { Op int @@ -18,6 +27,7 @@ type ( TableName string OldRowID int64 NewRowID int64 + Type ChangeType OldRow []interface{} NewRow []interface{} } @@ -29,6 +39,33 @@ type ( } ) +// Map SQLite's update operation types to our own internal type +func determineChangeType(op int) ChangeType { + switch op { + case sqlite3.SQLITE_INSERT: + return CHANGE_INSERT + case sqlite3.SQLITE_UPDATE: + return CHANGE_UPDATE + case sqlite3.SQLITE_DELETE: + return CHANGE_DELETE + default: + return CHANGE_UNKNOWN + } +} + +func (c ChangeType) String() string { + switch c { + case CHANGE_INSERT: + return "insert" + case CHANGE_UPDATE: + return "update" + case CHANGE_DELETE: + return "delete" + default: + return "N/A" + } +} + // Registers a hook against dbName that will populate the passed buffer with // sqliteWatchChange messages each time a change is executed against the // database. 
These messages are pre-update, so the buffer will be populated @@ -70,6 +107,7 @@ func RegisterSQLiteWatch(dbName string, buffer *SQLChangeBuffer) error { NewRowID: pud.NewRowID, OldRow: oldRow, NewRow: newRow, + Type: determineChangeType(pud.Op), }) }) return nil From 73d259aa82b296d4f772c38388e103a7b7627957 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Thu, 11 Jan 2024 22:56:35 -0800 Subject: [PATCH 07/16] add another stop for the propagation trainline --- pkg/event/entry.go | 1 + pkg/event/event.go | 1 + 2 files changed, 2 insertions(+) diff --git a/pkg/event/entry.go b/pkg/event/entry.go index cc62cdc1..c5c2b029 100644 --- a/pkg/event/entry.go +++ b/pkg/event/entry.go @@ -16,6 +16,7 @@ type entry struct { func (e entry) event() Event { return Event{ Sequence: e.Seq, + Type: e.Type, RowUpdate: RowUpdate{ FamilyName: e.Family, TableName: e.Table, diff --git a/pkg/event/event.go b/pkg/event/event.go index 4cfc2d7d..99522018 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -3,6 +3,7 @@ package event // Event is the type that the Iterator produces type Event struct { Sequence int64 + Type string RowUpdate RowUpdate } From 6b11bd8ffc661b329491f752521c36cc4c42ca5c Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Thu, 11 Jan 2024 23:02:08 -0800 Subject: [PATCH 08/16] update comment --- pkg/event/entry.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/event/entry.go b/pkg/event/entry.go index c5c2b029..95bf9360 100644 --- a/pkg/event/entry.go +++ b/pkg/event/entry.go @@ -3,7 +3,7 @@ package event // entry represents a single row in the changelog // e.g. 
// -// {"seq":1,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} +// {"seq":1,"family":"fam","type":"insert","table":"foo","key":[{"name":"id","type":"int","value":1}]} type entry struct { Seq int64 `json:"seq"` Type string `json:"type"` From 690d90652bdf8bfe9e42ae9ec24170e35f8ddb55 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Fri, 12 Jan 2024 02:47:13 -0800 Subject: [PATCH 09/16] rename ChangeType to ChangeOp, defer mapping of SQLite Op until we are writing out the changes, and include the ledger sequence in the changelog --- pkg/changelog/changelog_writer.go | 67 ++++++++++++++++++++++------ pkg/event/entry.go | 18 ++++---- pkg/event/event.go | 7 +-- pkg/ldbwriter/changelog_callback.go | 10 +++-- pkg/ldbwriter/ldb_callback_writer.go | 2 +- pkg/ldbwriter/ldb_writer.go | 1 + pkg/sqlite/sqlite_watch.go | 39 ---------------- 7 files changed, 76 insertions(+), 68 deletions(-) diff --git a/pkg/changelog/changelog_writer.go b/pkg/changelog/changelog_writer.go index 9539f6e1..7b8d3cbd 100644 --- a/pkg/changelog/changelog_writer.go +++ b/pkg/changelog/changelog_writer.go @@ -2,12 +2,50 @@ package changelog import ( "encoding/json" - "github.com/segmentio/ctlstore/pkg/sqlite" - "github.com/pkg/errors" + "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/events/v2" + "github.com/segmentio/go-sqlite3" ) +// ChangeOp is the type of operation that was done to the DB +type ChangeOp int + +const ( + INSERT_OP ChangeOp = iota + UPDATE_OP + DELETE_OP + UNKNOWN_OP + // Add other statement types here +) + +// Map SQLite's update operation types to our own internal type +func MapSQLiteOpToChangeOp(op int) ChangeOp { + switch op { + case sqlite3.SQLITE_INSERT: + return INSERT_OP + case sqlite3.SQLITE_UPDATE: + return UPDATE_OP + case sqlite3.SQLITE_DELETE: + return DELETE_OP + default: + return UNKNOWN_OP + } +} + +func (c ChangeOp) String() string { + switch c { + case INSERT_OP: + return "insert" + case UPDATE_OP: + return "update" + 
case DELETE_OP: + return "delete" + default: + return "unknown" + } +} + type ( // WriteLine writes a line to something WriteLine interface { @@ -17,11 +55,12 @@ type ( WriteLine WriteLine } ChangelogEntry struct { - Seq int64 - Type sqlite.ChangeType - Family string - Table string - Key []interface{} + Seq int64 + Op ChangeOp + Family string + Table string + Key []interface{} + LedgerSeq schema.DMLSequence } ) @@ -31,14 +70,16 @@ func NewChangelogEntry(seq int64, family string, table string, key []interface{} func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error { structure := struct { - Seq int64 `json:"seq"` - Type string `json:"type"` - Family string `json:"family"` - Table string `json:"table"` - Key []interface{} `json:"key"` + Seq int64 `json:"seq"` + LedgerSeq int64 `json:"ledgerSeq"` + Op string `json:"op"` + Family string `json:"family"` + Table string `json:"table"` + Key []interface{} `json:"key"` }{ e.Seq, - e.Type.String(), + e.LedgerSeq.Int(), + e.Op.String(), e.Family, e.Table, e.Key, diff --git a/pkg/event/entry.go b/pkg/event/entry.go index 95bf9360..d70f1e47 100644 --- a/pkg/event/entry.go +++ b/pkg/event/entry.go @@ -3,20 +3,22 @@ package event // entry represents a single row in the changelog // e.g. 
// -// {"seq":1,"family":"fam","type":"insert","table":"foo","key":[{"name":"id","type":"int","value":1}]} +// {"seq":1,"ledgerSeq":42,"op":"insert","family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} type entry struct { - Seq int64 `json:"seq"` - Type string `json:"type"` - Family string `json:"family"` - Table string `json:"table"` - Key []Key `json:"key"` + Seq int64 `json:"seq"` + LedgerSeq int64 `json:"ledgerSeq"` + Op string `json:"op"` + Family string `json:"family"` + Table string `json:"table"` + Key []Key `json:"key"` } // event converts the entry into an event for the iterator to return func (e entry) event() Event { return Event{ - Sequence: e.Seq, - Type: e.Type, + Sequence: e.Seq, + Op: e.Op, + LedgerSequence: e.LedgerSeq, RowUpdate: RowUpdate{ FamilyName: e.Family, TableName: e.Table, diff --git a/pkg/event/event.go b/pkg/event/event.go index 99522018..c11f77b4 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -2,9 +2,10 @@ package event // Event is the type that the Iterator produces type Event struct { - Sequence int64 - Type string - RowUpdate RowUpdate + Sequence int64 + LedgerSequence int64 + Op string + RowUpdate RowUpdate } // RowUpdate represents a single row update diff --git a/pkg/ldbwriter/changelog_callback.go b/pkg/ldbwriter/changelog_callback.go index be6d58fb..ac8698e9 100644 --- a/pkg/ldbwriter/changelog_callback.go +++ b/pkg/ldbwriter/changelog_callback.go @@ -37,10 +37,12 @@ func (c *ChangelogCallback) LDBWritten(ctx context.Context, data LDBWriteMetadat for _, key := range keys { seq := atomic.AddInt64(&c.Seq, 1) err = c.ChangelogWriter.WriteChange(changelog.ChangelogEntry{ - Seq: seq, - Family: fam.Name, - Table: tbl.Name, - Key: key, + Seq: seq, + Op: changelog.MapSQLiteOpToChangeOp(change.Op), + LedgerSeq: data.Statement.Sequence, + Family: fam.Name, + Table: tbl.Name, + Key: key, }) if err != nil { events.Log("Skipped logging change to %{family}s.%{table}s:%{key}v: %{err}v", diff --git 
a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index 7ef7c686..0fe1bebd 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -23,7 +23,7 @@ type CallbackWriter struct { } func (w *CallbackWriter) InTransaction() bool { - return w.transactionChanges != nil + return len(w.transactionChanges) > 0 } func (w *CallbackWriter) BeginTransaction() { diff --git a/pkg/ldbwriter/ldb_writer.go b/pkg/ldbwriter/ldb_writer.go index a3dfe84f..43cac68d 100644 --- a/pkg/ldbwriter/ldb_writer.go +++ b/pkg/ldbwriter/ldb_writer.go @@ -31,6 +31,7 @@ type LDBWriteMetadata struct { DB *sql.DB Statement schema.DMLStatement Changes []sqlite.SQLiteWatchChange + // TODO: sequence } // ldbWriter applies statements to a SQL database diff --git a/pkg/sqlite/sqlite_watch.go b/pkg/sqlite/sqlite_watch.go index c220f4ee..40f02332 100644 --- a/pkg/sqlite/sqlite_watch.go +++ b/pkg/sqlite/sqlite_watch.go @@ -10,16 +10,6 @@ import ( "github.com/segmentio/go-sqlite3" ) -type ChangeType int - -const ( - CHANGE_INSERT ChangeType = iota - CHANGE_UPDATE - CHANGE_DELETE - CHANGE_UNKNOWN - // Add other statement types here -) - type ( SQLiteWatchChange struct { Op int @@ -27,7 +17,6 @@ type ( TableName string OldRowID int64 NewRowID int64 - Type ChangeType OldRow []interface{} NewRow []interface{} } @@ -39,33 +28,6 @@ type ( } ) -// Map SQLite's update operation types to our own internal type -func determineChangeType(op int) ChangeType { - switch op { - case sqlite3.SQLITE_INSERT: - return CHANGE_INSERT - case sqlite3.SQLITE_UPDATE: - return CHANGE_UPDATE - case sqlite3.SQLITE_DELETE: - return CHANGE_DELETE - default: - return CHANGE_UNKNOWN - } -} - -func (c ChangeType) String() string { - switch c { - case CHANGE_INSERT: - return "insert" - case CHANGE_UPDATE: - return "update" - case CHANGE_DELETE: - return "delete" - default: - return "N/A" - } -} - // Registers a hook against dbName that will populate the passed buffer with // 
sqliteWatchChange messages each time a change is executed against the // database. These messages are pre-update, so the buffer will be populated @@ -107,7 +69,6 @@ func RegisterSQLiteWatch(dbName string, buffer *SQLChangeBuffer) error { NewRowID: pud.NewRowID, OldRow: oldRow, NewRow: newRow, - Type: determineChangeType(pud.Op), }) }) return nil From 9679b404e806db6d71038d756cef5b75a6759789 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Mon, 15 Jan 2024 00:14:22 -0800 Subject: [PATCH 10/16] fix transaction tracking by simplyifying code to just allocate new slices, propagate ledger sequence for each statement by storing it into the SQLiteWatchChange, also add a boolean for whether the changelog entry is from a transaction or not, and add another test case --- pkg/changelog/changelog_writer.go | 27 ++++++----- pkg/event/entry.go | 18 ++++---- pkg/event/event.go | 1 + pkg/ldbwriter/changelog_callback.go | 13 +++--- pkg/ldbwriter/ldb_callback_writer.go | 55 +++++++++++++---------- pkg/ldbwriter/ldb_callback_writer_test.go | 20 ++++++++- pkg/ldbwriter/ldb_writer.go | 8 ++-- pkg/sqlite/sqlite_watch.go | 15 ++++--- 8 files changed, 95 insertions(+), 62 deletions(-) diff --git a/pkg/changelog/changelog_writer.go b/pkg/changelog/changelog_writer.go index 7b8d3cbd..34a9e147 100644 --- a/pkg/changelog/changelog_writer.go +++ b/pkg/changelog/changelog_writer.go @@ -55,12 +55,13 @@ type ( WriteLine WriteLine } ChangelogEntry struct { - Seq int64 - Op ChangeOp - Family string - Table string - Key []interface{} - LedgerSeq schema.DMLSequence + Seq int64 + Op ChangeOp + Family string + Table string + Key []interface{} + LedgerSeq schema.DMLSequence + Transaction bool } ) @@ -70,15 +71,17 @@ func NewChangelogEntry(seq int64, family string, table string, key []interface{} func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error { structure := struct { - Seq int64 `json:"seq"` - LedgerSeq int64 `json:"ledgerSeq"` - Op string `json:"op"` - Family string `json:"family"` - 
Table string `json:"table"` - Key []interface{} `json:"key"` + Seq int64 `json:"seq"` + LedgerSeq int64 `json:"ledgerSeq"` + Transaction bool `json:"tx"` + Op string `json:"op"` + Family string `json:"family"` + Table string `json:"table"` + Key []interface{} `json:"key"` }{ e.Seq, e.LedgerSeq.Int(), + e.Transaction, e.Op.String(), e.Family, e.Table, diff --git a/pkg/event/entry.go b/pkg/event/entry.go index d70f1e47..356d4e58 100644 --- a/pkg/event/entry.go +++ b/pkg/event/entry.go @@ -3,22 +3,24 @@ package event // entry represents a single row in the changelog // e.g. // -// {"seq":1,"ledgerSeq":42,"op":"insert","family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} +// {"seq":1,"ledgerSeq":42,"tx":false,"op":"insert","family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} type entry struct { - Seq int64 `json:"seq"` - LedgerSeq int64 `json:"ledgerSeq"` - Op string `json:"op"` - Family string `json:"family"` - Table string `json:"table"` - Key []Key `json:"key"` + Seq int64 `json:"seq"` + LedgerSeq int64 `json:"ledgerSeq"` + Transaction bool `json:"tx"` + Op string `json:"op"` + Family string `json:"family"` + Table string `json:"table"` + Key []Key `json:"key"` } // event converts the entry into an event for the iterator to return func (e entry) event() Event { return Event{ Sequence: e.Seq, - Op: e.Op, LedgerSequence: e.LedgerSeq, + Transaction: e.Transaction, + Op: e.Op, RowUpdate: RowUpdate{ FamilyName: e.Family, TableName: e.Table, diff --git a/pkg/event/event.go b/pkg/event/event.go index c11f77b4..36746bd5 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -4,6 +4,7 @@ package event type Event struct { Sequence int64 LedgerSequence int64 + Transaction bool Op string RowUpdate RowUpdate } diff --git a/pkg/ldbwriter/changelog_callback.go b/pkg/ldbwriter/changelog_callback.go index ac8698e9..e2cdfb8a 100644 --- a/pkg/ldbwriter/changelog_callback.go +++ b/pkg/ldbwriter/changelog_callback.go @@ -37,12 +37,13 @@ 
func (c *ChangelogCallback) LDBWritten(ctx context.Context, data LDBWriteMetadat for _, key := range keys { seq := atomic.AddInt64(&c.Seq, 1) err = c.ChangelogWriter.WriteChange(changelog.ChangelogEntry{ - Seq: seq, - Op: changelog.MapSQLiteOpToChangeOp(change.Op), - LedgerSeq: data.Statement.Sequence, - Family: fam.Name, - Table: tbl.Name, - Key: key, + Seq: seq, + Op: changelog.MapSQLiteOpToChangeOp(change.Op), + LedgerSeq: change.LedgerSequence, + Transaction: data.Transaction, + Family: fam.Name, + Table: tbl.Name, + Key: key, }) if err != nil { events.Log("Skipped logging change to %{family}s.%{table}s:%{key}v: %{err}v", diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index 0fe1bebd..404dd87e 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -3,6 +3,7 @@ package ldbwriter import ( "context" "database/sql" + "fmt" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/sqlite" @@ -22,35 +23,30 @@ type CallbackWriter struct { transactionChanges []sqlite.SQLiteWatchChange } -func (w *CallbackWriter) InTransaction() bool { - return len(w.transactionChanges) > 0 +func (w *CallbackWriter) inTransaction() bool { + return w.transactionChanges != nil } -func (w *CallbackWriter) BeginTransaction() { - if w.transactionChanges == nil { - w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) - } else { - if len(w.transactionChanges) > 0 { - // This should never happen, but just in case... - stats.Add("ldb_changes_abandoned", len(w.transactionChanges)) - events.Log("error: abandoned %{count}d changes from incomplete transaction", len(w.transactionChanges)) - } - // Reset to size 0, but keep the underlying array - w.transactionChanges = w.transactionChanges[:0] +func (w *CallbackWriter) beginTransaction(ledgerSequence schema.DMLSequence) { + if len(w.transactionChanges) > 0 { + // This should never happen, but just in case... 
+ stats.Add("ldb_changes_abandoned", len(w.transactionChanges)) + events.Log("error: abandoned %{count}d changes from incomplete transaction, current statement's ledger sequence: %{sequence}d", + len(w.transactionChanges), ledgerSequence) } + w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) stats.Set("ldb_changes_accumulated", 0) } // Transaction done! Return the accumulated changes including the latest ones -func (w *CallbackWriter) EndTransaction(changes *[]sqlite.SQLiteWatchChange) { +func (w *CallbackWriter) endTransaction(changes *[]sqlite.SQLiteWatchChange) { *changes = append(w.transactionChanges, *changes...) stats.Set("ldb_changes_accumulated", len(*changes)) - // Reset to size 0, but keep the underlying array - w.transactionChanges = w.transactionChanges[:0] + w.transactionChanges = nil } // Transaction isn't over yet, save the latest changes -func (w *CallbackWriter) AccumulateChanges(changes []sqlite.SQLiteWatchChange) { +func (w *CallbackWriter) accumulateChanges(changes []sqlite.SQLiteWatchChange) { w.transactionChanges = append(w.transactionChanges, changes...) 
stats.Set("ldb_changes_accumulated", len(w.transactionChanges)) } @@ -73,30 +69,41 @@ func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema // If beginning a transaction then start accumulating changes, don't send them out yet if statement.Statement == schema.DMLTxBeginKey { - w.BeginTransaction() + w.beginTransaction(statement.Sequence) return nil } changes := w.ChangeBuffer.Pop() - if w.InTransaction() { + // Record the responsible ledger sequence in each change so that the callback can use it + for i := range changes { + changes[i].LedgerSequence = statement.Sequence + } + + var transaction bool + if w.inTransaction() { + transaction = true if statement.Statement == schema.DMLTxEndKey { // Transaction done, let's send what we have accumulated - w.EndTransaction(&changes) + w.endTransaction(&changes) } else { // Transaction not over, continue accumulating - w.AccumulateChanges(changes) + w.accumulateChanges(changes) return nil } } + // DEBUG: + fmt.Printf("CallbackWriter.ApplyDMLStatement: invoking callback: len(changes)=%d statement.ledgerSequence=%d\n", len(changes), statement.Sequence) + //fmt.Printf("CallbackWriter.ApplyDMLStatement: len(changes)=%d changes=%+v\n", len(changes), changes) stats.Observe("ldb_changes_written", len(changes)) for _, callback := range w.Callbacks { events.Debug("Writing DML callback for %{cb}T", callback) callback.LDBWritten(ctx, LDBWriteMetadata{ - DB: w.DB, - Statement: statement, - Changes: changes, + DB: w.DB, + Statement: statement, + Changes: changes, + Transaction: transaction, }) } return nil diff --git a/pkg/ldbwriter/ldb_callback_writer_test.go b/pkg/ldbwriter/ldb_callback_writer_test.go index 520b769a..a813b186 100644 --- a/pkg/ldbwriter/ldb_callback_writer_test.go +++ b/pkg/ldbwriter/ldb_callback_writer_test.go @@ -136,11 +136,29 @@ func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { schema.NewTestDMLStatement(schema.DMLTxEndKey), }, }, - // since it's a transaction, we expect only one 
callback, and it should have all 3 updates + // since it's a transaction, we expect only one callback, and it should have all 2 updates expectedCallbacks: 1, expectedUpdatesPerCallback: 2, wantErr: false, }, + { + name: "Test 4 statements in a ledger transaction, using REPLACE INTO", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("REPLACE INTO foo('bar') VALUES('lion');"), + schema.NewTestDMLStatement("REPLACE INTO foo('bar') VALUES('green');"), + schema.NewTestDMLStatement("REPLACE INTO foo('bar') VALUES('boston');"), + schema.NewTestDMLStatement("REPLACE INTO foo('bar') VALUES('detroit')"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + }, + }, + // since it's a transaction, we expect only one callback, and it should have all 4 updates + expectedCallbacks: 1, + expectedUpdatesPerCallback: 4, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { diff --git a/pkg/ldbwriter/ldb_writer.go b/pkg/ldbwriter/ldb_writer.go index 43cac68d..1c10d81c 100644 --- a/pkg/ldbwriter/ldb_writer.go +++ b/pkg/ldbwriter/ldb_writer.go @@ -28,10 +28,10 @@ type LDBWriteCallback interface { // LDBWriteMetadata contains the metadata about a statement that was written // to the LDB. 
type LDBWriteMetadata struct { - DB *sql.DB - Statement schema.DMLStatement - Changes []sqlite.SQLiteWatchChange - // TODO: sequence + DB *sql.DB + Statement schema.DMLStatement + Changes []sqlite.SQLiteWatchChange + Transaction bool } // ldbWriter applies statements to a SQL database diff --git a/pkg/sqlite/sqlite_watch.go b/pkg/sqlite/sqlite_watch.go index 40f02332..75f8773c 100644 --- a/pkg/sqlite/sqlite_watch.go +++ b/pkg/sqlite/sqlite_watch.go @@ -12,13 +12,14 @@ import ( type ( SQLiteWatchChange struct { - Op int - DatabaseName string - TableName string - OldRowID int64 - NewRowID int64 - OldRow []interface{} - NewRow []interface{} + Op int + DatabaseName string + TableName string + OldRowID int64 + NewRowID int64 + OldRow []interface{} + NewRow []interface{} + LedgerSequence schema.DMLSequence } // pkAndMeta is a primary key value with name and type metadata to boot pkAndMeta struct { From 5e21030534def8eb81562b63d7b19d9587a2f034 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Mon, 15 Jan 2024 23:20:26 -0800 Subject: [PATCH 11/16] undo Op propagation --- pkg/changelog/changelog_writer.go | 46 ---------------------------- pkg/event/entry.go | 4 +-- pkg/event/event.go | 1 - pkg/ldbwriter/changelog_callback.go | 1 - pkg/ldbwriter/ldb_callback_writer.go | 3 ++ 5 files changed, 4 insertions(+), 51 deletions(-) diff --git a/pkg/changelog/changelog_writer.go b/pkg/changelog/changelog_writer.go index 34a9e147..cb58e322 100644 --- a/pkg/changelog/changelog_writer.go +++ b/pkg/changelog/changelog_writer.go @@ -5,47 +5,8 @@ import ( "github.com/pkg/errors" "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/events/v2" - "github.com/segmentio/go-sqlite3" ) -// ChangeOp is the type of operation that was done to the DB -type ChangeOp int - -const ( - INSERT_OP ChangeOp = iota - UPDATE_OP - DELETE_OP - UNKNOWN_OP - // Add other statement types here -) - -// Map SQLite's update operation types to our own internal type -func MapSQLiteOpToChangeOp(op int) 
ChangeOp { - switch op { - case sqlite3.SQLITE_INSERT: - return INSERT_OP - case sqlite3.SQLITE_UPDATE: - return UPDATE_OP - case sqlite3.SQLITE_DELETE: - return DELETE_OP - default: - return UNKNOWN_OP - } -} - -func (c ChangeOp) String() string { - switch c { - case INSERT_OP: - return "insert" - case UPDATE_OP: - return "update" - case DELETE_OP: - return "delete" - default: - return "unknown" - } -} - type ( // WriteLine writes a line to something WriteLine interface { @@ -56,7 +17,6 @@ type ( } ChangelogEntry struct { Seq int64 - Op ChangeOp Family string Table string Key []interface{} @@ -65,16 +25,11 @@ type ( } ) -func NewChangelogEntry(seq int64, family string, table string, key []interface{}) *ChangelogEntry { - return &ChangelogEntry{Seq: seq, Family: family, Table: table, Key: key} -} - func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error { structure := struct { Seq int64 `json:"seq"` LedgerSeq int64 `json:"ledgerSeq"` Transaction bool `json:"tx"` - Op string `json:"op"` Family string `json:"family"` Table string `json:"table"` Key []interface{} `json:"key"` @@ -82,7 +37,6 @@ func (w *ChangelogWriter) WriteChange(e ChangelogEntry) error { e.Seq, e.LedgerSeq.Int(), e.Transaction, - e.Op.String(), e.Family, e.Table, e.Key, diff --git a/pkg/event/entry.go b/pkg/event/entry.go index 356d4e58..b12e7c23 100644 --- a/pkg/event/entry.go +++ b/pkg/event/entry.go @@ -3,12 +3,11 @@ package event // entry represents a single row in the changelog // e.g. 
// -// {"seq":1,"ledgerSeq":42,"tx":false,"op":"insert","family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} +// {"seq":1,"ledgerSeq":42,"tx":false,"family":"fam","table":"foo","key":[{"name":"id","type":"int","value":1}]} type entry struct { Seq int64 `json:"seq"` LedgerSeq int64 `json:"ledgerSeq"` Transaction bool `json:"tx"` - Op string `json:"op"` Family string `json:"family"` Table string `json:"table"` Key []Key `json:"key"` @@ -20,7 +19,6 @@ func (e entry) event() Event { Sequence: e.Seq, LedgerSequence: e.LedgerSeq, Transaction: e.Transaction, - Op: e.Op, RowUpdate: RowUpdate{ FamilyName: e.Family, TableName: e.Table, diff --git a/pkg/event/event.go b/pkg/event/event.go index 36746bd5..0acef6b4 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -5,7 +5,6 @@ type Event struct { Sequence int64 LedgerSequence int64 Transaction bool - Op string RowUpdate RowUpdate } diff --git a/pkg/ldbwriter/changelog_callback.go b/pkg/ldbwriter/changelog_callback.go index e2cdfb8a..17559f21 100644 --- a/pkg/ldbwriter/changelog_callback.go +++ b/pkg/ldbwriter/changelog_callback.go @@ -38,7 +38,6 @@ func (c *ChangelogCallback) LDBWritten(ctx context.Context, data LDBWriteMetadat seq := atomic.AddInt64(&c.Seq, 1) err = c.ChangelogWriter.WriteChange(changelog.ChangelogEntry{ Seq: seq, - Op: changelog.MapSQLiteOpToChangeOp(change.Op), LedgerSeq: change.LedgerSequence, Transaction: data.Transaction, Family: fam.Name, diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index 404dd87e..1c348520 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -35,12 +35,14 @@ func (w *CallbackWriter) beginTransaction(ledgerSequence schema.DMLSequence) { len(w.transactionChanges), ledgerSequence) } w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) + // TODO: Figure out if we wanna use a gauge or a counter here stats.Set("ldb_changes_accumulated", 0) } // Transaction done! 
Return the accumulated changes including the latest ones func (w *CallbackWriter) endTransaction(changes *[]sqlite.SQLiteWatchChange) { *changes = append(w.transactionChanges, *changes...) + // TODO: Figure out if we wanna use a gauge or a counter here stats.Set("ldb_changes_accumulated", len(*changes)) w.transactionChanges = nil } @@ -48,6 +50,7 @@ func (w *CallbackWriter) endTransaction(changes *[]sqlite.SQLiteWatchChange) { // Transaction isn't over yet, save the latest changes func (w *CallbackWriter) accumulateChanges(changes []sqlite.SQLiteWatchChange) { w.transactionChanges = append(w.transactionChanges, changes...) + // TODO: Figure out if we wanna use a gauge or a counter here stats.Set("ldb_changes_accumulated", len(w.transactionChanges)) } From 4dc0a359bf842e60098e217d5bf1fc5c307986d0 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Mon, 15 Jan 2024 23:31:37 -0800 Subject: [PATCH 12/16] add test case for two ledger transactions --- pkg/ldbwriter/ldb_callback_writer_test.go | 25 ++++++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/pkg/ldbwriter/ldb_callback_writer_test.go b/pkg/ldbwriter/ldb_callback_writer_test.go index a813b186..ba011e8a 100644 --- a/pkg/ldbwriter/ldb_callback_writer_test.go +++ b/pkg/ldbwriter/ldb_callback_writer_test.go @@ -17,6 +17,7 @@ type TestUpdateCallbackHandler struct { Changes []sqlite.SQLiteWatchChange } +// Record the changes propagated from ApplyDMLStatement func (u *TestUpdateCallbackHandler) LDBWritten(ctx context.Context, data LDBWriteMetadata) { // The [:0] slice operation will reuse the underlying array of u.Changes if it's large enough // to hold all elements of data.Changes, otherwise it will allocate a new one. 
@@ -142,7 +143,7 @@ func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { wantErr: false, }, { - name: "Test 4 statements in a ledger transaction, using REPLACE INTO", + name: "Test four statements in a ledger transaction, using REPLACE INTO", args: args{ ctx: ctx, statements: []schema.DMLStatement{ @@ -159,6 +160,28 @@ func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { expectedUpdatesPerCallback: 4, wantErr: false, }, + { + name: "Test six statements in two ledger transactions", + args: args{ + ctx: ctx, + statements: []schema.DMLStatement{ + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('cat');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('dog');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('hamster');"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + schema.NewTestDMLStatement(schema.DMLTxBeginKey), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('fish');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('rabbit');"), + schema.NewTestDMLStatement("INSERT INTO foo VALUES('ferret');"), + schema.NewTestDMLStatement(schema.DMLTxEndKey), + }, + }, + // since it's a transaction, we expect only one callback, and it should have all 3 updates + expectedCallbacks: 2, + expectedUpdatesPerCallback: 3, + wantErr: false, + }, } for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { From 4d4de9b90a2063a6ca6bdd2c7c8a0e5309c235c8 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Tue, 16 Jan 2024 00:35:42 -0800 Subject: [PATCH 13/16] add warning to ldb_writer_with_changelog.go and remove debug printouts --- pkg/ldbwriter/ldb_callback_writer.go | 7 +------ pkg/ldbwriter/ldb_writer_with_changelog.go | 3 ++- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index 1c348520..45236a6b 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ b/pkg/ldbwriter/ldb_callback_writer.go @@ -3,8 
+3,6 @@ package ldbwriter import ( "context" "database/sql" - "fmt" - "github.com/segmentio/ctlstore/pkg/schema" "github.com/segmentio/ctlstore/pkg/sqlite" "github.com/segmentio/events/v2" @@ -96,12 +94,9 @@ func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema } } - // DEBUG: - fmt.Printf("CallbackWriter.ApplyDMLStatement: invoking callback: len(changes)=%d statement.ledgerSequence=%d\n", len(changes), statement.Sequence) - //fmt.Printf("CallbackWriter.ApplyDMLStatement: len(changes)=%d changes=%+v\n", len(changes), changes) stats.Observe("ldb_changes_written", len(changes)) for _, callback := range w.Callbacks { - events.Debug("Writing DML callback for %{cb}T", callback) + events.Debug("Writing DML callback for %{cb}T with %{changeCount}d changes", callback, len(changes)) callback.LDBWritten(ctx, LDBWriteMetadata{ DB: w.DB, Statement: statement, diff --git a/pkg/ldbwriter/ldb_writer_with_changelog.go b/pkg/ldbwriter/ldb_writer_with_changelog.go index 6f3bc9e5..f527d017 100644 --- a/pkg/ldbwriter/ldb_writer_with_changelog.go +++ b/pkg/ldbwriter/ldb_writer_with_changelog.go @@ -19,6 +19,8 @@ type LDBWriterWithChangelog struct { Seq int64 } +// ⚠️ WARNING: This code is not used except in a unit test. +// See ldb_callback_writer.go for the real code and more description. // // NOTE: How does the changelog work? // @@ -33,7 +35,6 @@ type LDBWriterWithChangelog struct { // This is pretty complex, but after enumerating about 8 different options, it // ended up actually being the most simple. Other options involved not-so-great // options like parsing SQL or maintaining triggers on every table. 
-// func (w *LDBWriterWithChangelog) ApplyDMLStatement(ctx context.Context, statement schema.DMLStatement) error { err := w.LdbWriter.ApplyDMLStatement(ctx, statement) if err != nil { From 4479f75cd88f9dff3553de7484632b30d0a616cf Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Tue, 16 Jan 2024 01:28:40 -0800 Subject: [PATCH 14/16] adjust tests to account for changelog changes --- pkg/changelog/changelog_writer_test.go | 2 +- pkg/event/changelog_test.go | 2 +- pkg/reflector/reflector_test.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/changelog/changelog_writer_test.go b/pkg/changelog/changelog_writer_test.go index bf70a948..7ea7e2f4 100644 --- a/pkg/changelog/changelog_writer_test.go +++ b/pkg/changelog/changelog_writer_test.go @@ -32,5 +32,5 @@ func TestWriteChange(t *testing.T) { }) require.NoError(t, err) require.EqualValues(t, 1, len(mock.Lines)) - require.Equal(t, `{"seq":42,"family":"family1","table":"table1","key":[18014398509481984,"foo"]}`, mock.Lines[0]) + require.Equal(t, `{"seq":42,"ledgerSeq":0,"tx":false,"family":"family1","table":"table1","key":[18014398509481984,"foo"]}`, mock.Lines[0]) } diff --git a/pkg/event/changelog_test.go b/pkg/event/changelog_test.go index 3e813caa..3aa7b13e 100644 --- a/pkg/event/changelog_test.go +++ b/pkg/event/changelog_test.go @@ -135,7 +135,7 @@ func TestChangelog(t *testing.T) { numEvents: 10000, rotateAfterBytes: 1024 * 128, writeDelay: 100 * time.Microsecond, - mustRotateN: 8, + mustRotateN: 9, }, } { t.Run(test.name, func(t *testing.T) { diff --git a/pkg/reflector/reflector_test.go b/pkg/reflector/reflector_test.go index 9d8fda93..c87c3ab6 100644 --- a/pkg/reflector/reflector_test.go +++ b/pkg/reflector/reflector_test.go @@ -208,7 +208,7 @@ func TestReflector(t *testing.T) { clBytes, err := ioutil.ReadFile(changelogPath) require.NoError(t, err) - expectChangelog := 
"{\"seq\":1,\"family\":\"family1\",\"table\":\"table1234\",\"key\":[{\"name\":\"field1\",\"type\":\"INTEGER\",\"value\":1234}]}\n" + expectChangelog := "{\"seq\":1,\"ledgerSeq\":2,\"tx\":false,\"family\":\"family1\",\"table\":\"table1234\",\"key\":[{\"name\":\"field1\",\"type\":\"INTEGER\",\"value\":1234}]}\n" if diff := cmp.Diff(expectChangelog, string(clBytes)); diff != "" { t.Errorf("Changelog contents differ\n%s", diff) } From a616e623080c8f1ae4005aa64b346665f429a11c Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Tue, 16 Jan 2024 11:53:21 -0800 Subject: [PATCH 15/16] fix incorrect copy-pasta comment --- pkg/ldbwriter/ldb_callback_writer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/ldbwriter/ldb_callback_writer_test.go b/pkg/ldbwriter/ldb_callback_writer_test.go index ba011e8a..e57c23dc 100644 --- a/pkg/ldbwriter/ldb_callback_writer_test.go +++ b/pkg/ldbwriter/ldb_callback_writer_test.go @@ -177,7 +177,7 @@ func TestCallbackWriter_ApplyDMLStatement(t *testing.T) { schema.NewTestDMLStatement(schema.DMLTxEndKey), }, }, - // since it's a transaction, we expect only one callback, and it should have all 3 updates + // since these are transactions, we expect two callbacks, and each should have all 3 updates expectedCallbacks: 2, expectedUpdatesPerCallback: 3, wantErr: false, From 7f3a2027e84913eadbaa87d433566be621b8f1d1 Mon Sep 17 00:00:00 2001 From: Erik Weathers Date: Tue, 16 Jan 2024 13:45:13 -0800 Subject: [PATCH 16/16] make API for ending transaction more clear and less subtle (remove possibly premature optimization of updating passed-in slice; also remove metric for ldb_changes_accumulated in favor of just using the ldb_changes_written one --- pkg/ldbwriter/ldb_callback_writer.go | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/pkg/ldbwriter/ldb_callback_writer.go b/pkg/ldbwriter/ldb_callback_writer.go index 45236a6b..ce4464ab 100644 --- a/pkg/ldbwriter/ldb_callback_writer.go +++ 
b/pkg/ldbwriter/ldb_callback_writer.go @@ -33,23 +33,19 @@ func (w *CallbackWriter) beginTransaction(ledgerSequence schema.DMLSequence) { len(w.transactionChanges), ledgerSequence) } w.transactionChanges = make([]sqlite.SQLiteWatchChange, 0) - // TODO: Figure out if we wanna use a gauge or a counter here - stats.Set("ldb_changes_accumulated", 0) } // Transaction done! Return the accumulated changes including the latest ones -func (w *CallbackWriter) endTransaction(changes *[]sqlite.SQLiteWatchChange) { - *changes = append(w.transactionChanges, *changes...) - // TODO: Figure out if we wanna use a gauge or a counter here - stats.Set("ldb_changes_accumulated", len(*changes)) +func (w *CallbackWriter) endTransaction(changes []sqlite.SQLiteWatchChange) (transactionChanges []sqlite.SQLiteWatchChange) { + w.accumulateChanges(changes) + transactionChanges = w.transactionChanges w.transactionChanges = nil + return } // Transaction isn't over yet, save the latest changes func (w *CallbackWriter) accumulateChanges(changes []sqlite.SQLiteWatchChange) { w.transactionChanges = append(w.transactionChanges, changes...) - // TODO: Figure out if we wanna use a gauge or a counter here - stats.Set("ldb_changes_accumulated", len(w.transactionChanges)) } // ApplyDMLStatement @@ -86,7 +82,7 @@ func (w *CallbackWriter) ApplyDMLStatement(ctx context.Context, statement schema transaction = true if statement.Statement == schema.DMLTxEndKey { // Transaction done, let's send what we have accumulated - w.endTransaction(&changes) + changes = w.endTransaction(changes) } else { // Transaction not over, continue accumulating w.accumulateChanges(changes)